From d637ad0dafb727bb73c63f1b187d9f83abcd4ec1 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 6 Nov 2017 10:26:52 -0800 Subject: [PATCH] KAFKA-6115: TaskManager should be type aware - remove type specific methods from Task interface - add generics to preserve task type - add sub classes for different task types Author: Matthias J. Sax Reviewers: Bill Bejeck , Damian Guy , Guozhang Wang Closes #4129 from mjsax/kafka-6115-taskManager-should-be-type-aware --- .../processor/internals/AbstractTask.java | 8 +- .../internals/AssignedStandbyTasks.java | 27 +++ .../internals/AssignedStreamsTasks.java | 128 +++++++++++++ .../processor/internals/AssignedTasks.java | 174 +++++------------- .../processor/internals/RestoringTasks.java | 2 +- .../processor/internals/StandbyTask.java | 25 --- .../processor/internals/StreamTask.java | 11 -- .../processor/internals/StreamThread.java | 36 ++-- .../streams/processor/internals/Task.java | 59 +++--- .../processor/internals/TaskAction.java | 4 +- .../processor/internals/TaskManager.java | 32 ++-- .../processor/internals/AbstractTaskTest.java | 37 ---- ...est.java => AssignedStreamsTasksTest.java} | 12 +- .../internals/StoreChangelogReaderTest.java | 2 +- .../processor/internals/TaskManagerTest.java | 22 ++- .../StreamThreadStateStoreProviderTest.java | 3 +- 16 files changed, 278 insertions(+), 304 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java rename streams/src/test/java/org/apache/kafka/streams/processor/internals/{AssignedTasksTest.java => AssignedStreamsTasksTest.java} (97%) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 52465ed0045..b0ae23c48d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -102,22 +102,22 @@ public abstract class AbstractTask implements Task { } @Override - public final String applicationId() { + public String applicationId() { return applicationId; } @Override - public final Set partitions() { + public Set partitions() { return partitions; } @Override - public final ProcessorTopology topology() { + public ProcessorTopology topology() { return topology; } @Override - public final ProcessorContext context() { + public ProcessorContext context() { return processorContext; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java new file mode 100644 index 00000000000..a99e45147b9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.utils.LogContext; + +class AssignedStandbyTasks extends AssignedTasks { + + AssignedStandbyTasks(final LogContext logContext) { + super(logContext, "standby task"); + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java new file mode 100644 index 00000000000..5ef404f361a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.processor.TaskId; +import org.slf4j.Logger; + +import java.util.Iterator; +import java.util.Map; + +class AssignedStreamsTasks extends AssignedTasks implements RestoringTasks { + private final Logger log; + private final TaskAction maybeCommitAction; + private int committed = 0; + + AssignedStreamsTasks(final LogContext logContext) { + super(logContext, "stream task"); + + this.log = logContext.logger(getClass()); + + maybeCommitAction = new TaskAction() { + @Override + public String name() { + return "maybeCommit"; + } + + @Override + public void apply(final StreamTask task) { + if (task.commitNeeded()) { + committed++; + task.commit(); + log.debug("Committed active task {} per user request in", task.id()); + } + } + }; + } + + @Override + public StreamTask restoringTaskFor(final TopicPartition partition) { + return restoringByPartition.get(partition); + } + + /** + * @throws TaskMigratedException if committing offsets failed (non-EOS) + * or if the task producer got fenced (EOS) + */ + int maybeCommit() { + committed = 0; + applyToRunningTasks(maybeCommitAction); + return committed; + } + + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ + int process() { + int processed = 0; + final Iterator> it = running.entrySet().iterator(); + while (it.hasNext()) { + final StreamTask task = it.next().getValue(); + try { + if (task.process()) { + processed++; + } + } catch (final TaskMigratedException e) { + final RuntimeException fatalException = closeZombieTask(task); + if (fatalException != null) { + throw fatalException; + } + it.remove(); + throw e; + } catch (final RuntimeException e) { + log.error("Failed to process stream task {} due to the following error:", task.id(), e); + throw e; + } + } + return processed; + } + + /** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ + int punctuate() { + int punctuated = 0; + final Iterator> it = running.entrySet().iterator(); + while (it.hasNext()) { + final StreamTask task = it.next().getValue(); + try { + if (task.maybePunctuateStreamTime()) { + punctuated++; + } + if (task.maybePunctuateSystemTime()) { + punctuated++; + } + } catch (final TaskMigratedException e) { + final RuntimeException fatalException = closeZombieTask(task); + if (fatalException != null) { + throw fatalException; + } + it.remove(); + throw e; + } catch (final KafkaException e) { + log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e); + throw e; + } + } + return punctuated; + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index a3c9c2ff889..b90ec10dafd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.LockException; @@ -37,22 +36,19 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -class AssignedTasks implements RestoringTasks { +abstract class AssignedTasks { private final Logger log; private final String taskTypeName; - private final TaskAction maybeCommitAction; - private final TaskAction commitAction; - private Map created = new HashMap<>(); - private Map suspended = new HashMap<>(); - private Map restoring = new HashMap<>(); + private final TaskAction commitAction; + private Map created = new HashMap<>(); + private Map suspended = new HashMap<>(); + private Map restoring = new HashMap<>(); private Set restoredPartitions = new HashSet<>(); private Set previousActiveTasks = new HashSet<>(); // IQ may access this map. - private Map running = new ConcurrentHashMap<>(); - private Map runningByPartition = new HashMap<>(); - private Map restoringByPartition = new HashMap<>(); - private int committed = 0; - + Map running = new ConcurrentHashMap<>(); + private Map runningByPartition = new HashMap<>(); + Map restoringByPartition = new HashMap<>(); AssignedTasks(final LogContext logContext, final String taskTypeName) { @@ -60,36 +56,20 @@ class AssignedTasks implements RestoringTasks { this.log = logContext.logger(getClass()); - maybeCommitAction = new TaskAction() { - @Override - public String name() { - return "maybeCommit"; - } - - @Override - public void apply(final Task task) { - if (task.commitNeeded()) { - committed++; - task.commit(); - log.debug("Committed active task {} per user request in", task.id()); - } - } - }; - - commitAction = new TaskAction() { + commitAction = new TaskAction() { @Override public String name() { return "commit"; } @Override - public void apply(final Task task) { + public void apply(final T task) { task.commit(); } }; } - void addNewTask(final Task task) { + void addNewTask(final T task) { created.put(task.id(), task); } @@ -98,7 +78,7 @@ class AssignedTasks implements RestoringTasks { return Collections.emptySet(); } final Set partitions = new HashSet<>(); - for (final Map.Entry entry : created.entrySet()) { + for (final Map.Entry entry : created.entrySet()) { if (entry.getValue().hasStateStores()) { partitions.addAll(entry.getValue().partitions()); } @@ -116,8 +96,8 @@ class AssignedTasks implements RestoringTasks { if (!created.isEmpty()) { log.debug("Initializing {}s {}", taskTypeName, created.keySet()); } - for (final Iterator> it = created.entrySet().iterator(); it.hasNext(); ) { - final Map.Entry entry = it.next(); + for (final Iterator> it = created.entrySet().iterator(); it.hasNext(); ) { + final Map.Entry entry = it.next(); try { if (!entry.getValue().initialize()) { log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey()); @@ -141,9 +121,9 @@ class AssignedTasks implements RestoringTasks { log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored); final Set resume = new HashSet<>(); restoredPartitions.addAll(restored); - for (final Iterator> it = restoring.entrySet().iterator(); it.hasNext(); ) { - final Map.Entry entry = it.next(); - final Task task = entry.getValue(); + for (final Iterator> it = restoring.entrySet().iterator(); it.hasNext(); ) { + final Map.Entry entry = it.next(); + final T task = entry.getValue(); if (restoredPartitions.containsAll(task.changelogPartitions())) { transitionToRunning(task, resume); it.remove(); @@ -174,7 +154,7 @@ class AssignedTasks implements RestoringTasks { && restoring.isEmpty(); } - Collection running() { + Collection running() { return running.values(); } @@ -196,9 +176,9 @@ class AssignedTasks implements RestoringTasks { return firstException.get(); } - private RuntimeException closeNonRunningTasks(final Collection tasks) { + private RuntimeException closeNonRunningTasks(final Collection tasks) { RuntimeException exception = null; - for (final Task task : tasks) { + for (final T task : tasks) { try { task.close(false, false); } catch (final RuntimeException e) { @@ -211,10 +191,10 @@ class AssignedTasks implements RestoringTasks { return exception; } - private RuntimeException suspendTasks(final Collection tasks) { + private RuntimeException suspendTasks(final Collection tasks) { final AtomicReference firstException = new AtomicReference<>(null); - for (Iterator it = tasks.iterator(); it.hasNext(); ) { - final Task task = it.next(); + for (Iterator it = tasks.iterator(); it.hasNext(); ) { + final T task = it.next(); try { task.suspend(); suspended.put(task.id(), task); @@ -235,7 +215,7 @@ class AssignedTasks implements RestoringTasks { return firstException.get(); } - private RuntimeException closeZombieTask(final Task task) { + RuntimeException closeZombieTask(final T task) { log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id()); try { task.close(false, true); @@ -255,7 +235,7 @@ class AssignedTasks implements RestoringTasks { */ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set partitions) { if (suspended.containsKey(taskId)) { - final Task task = suspended.get(taskId); + final T task = suspended.get(taskId); log.trace("found suspended {} {}", taskTypeName, taskId); if (task.partitions().equals(partitions)) { suspended.remove(taskId); @@ -279,7 +259,7 @@ class AssignedTasks implements RestoringTasks { return false; } - private void addToRestoring(final Task task) { + private void addToRestoring(final T task) { restoring.put(task.id(), task); for (TopicPartition topicPartition : task.partitions()) { restoringByPartition.put(topicPartition, task); @@ -289,7 +269,7 @@ class AssignedTasks implements RestoringTasks { } } - private void transitionToRunning(final Task task, final Set readyPartitions) { + private void transitionToRunning(final T task, final Set readyPartitions) { log.debug("transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); for (TopicPartition topicPartition : task.partitions()) { @@ -303,12 +283,7 @@ class AssignedTasks implements RestoringTasks { } } - @Override - public Task restoringTaskFor(final TopicPartition partition) { - return restoringByPartition.get(partition); - } - - Task runningTaskFor(final TopicPartition partition) { + T runningTaskFor(final TopicPartition partition) { return runningByPartition.get(partition); } @@ -316,7 +291,7 @@ class AssignedTasks implements RestoringTasks { return running.keySet(); } - Map runningTaskMap() { + Map runningTaskMap() { return Collections.unmodifiableMap(running); } @@ -330,18 +305,18 @@ class AssignedTasks implements RestoringTasks { } private void describe(final StringBuilder builder, - final Collection tasks, + final Collection tasks, final String indent, final String name) { builder.append(indent).append(name); - for (final Task t : tasks) { + for (final T t : tasks) { builder.append(indent).append(t.toString(indent + "\t\t")); } builder.append("\n"); } - private List allTasks() { - final List tasks = new ArrayList<>(); + private List allTasks() { + final List tasks = new ArrayList<>(); tasks.addAll(running.values()); tasks.addAll(suspended.values()); tasks.addAll(restoring.values()); @@ -349,7 +324,7 @@ class AssignedTasks implements RestoringTasks { return tasks; } - Collection restoringTasks() { + Collection restoringTasks() { return Collections.unmodifiableCollection(restoring.values()); } @@ -384,78 +359,11 @@ class AssignedTasks implements RestoringTasks { return running.size(); } - /** - * @throws TaskMigratedException if committing offsets failed (non-EOS) - * or if the task producer got fenced (EOS) - */ - int maybeCommit() { - committed = 0; - applyToRunningTasks(maybeCommitAction); - return committed; - } - - /** - * @throws TaskMigratedException if the task producer got fenced (EOS only) - */ - int process() { - int processed = 0; - final Iterator> it = running.entrySet().iterator(); - while (it.hasNext()) { - final Task task = it.next().getValue(); - try { - if (task.process()) { - processed++; - } - } catch (final TaskMigratedException e) { - final RuntimeException fatalException = closeZombieTask(task); - if (fatalException != null) { - throw fatalException; - } - it.remove(); - throw e; - } catch (final RuntimeException e) { - log.error("Failed to process {} {} due to the following error:", taskTypeName, task.id(), e); - throw e; - } - } - return processed; - } - - /** - * @throws TaskMigratedException if the task producer got fenced (EOS only) - */ - int punctuate() { - int punctuated = 0; - final Iterator> it = running.entrySet().iterator(); - while (it.hasNext()) { - final Task task = it.next().getValue(); - try { - if (task.maybePunctuateStreamTime()) { - punctuated++; - } - if (task.maybePunctuateSystemTime()) { - punctuated++; - } - } catch (final TaskMigratedException e) { - final RuntimeException fatalException = closeZombieTask(task); - if (fatalException != null) { - throw fatalException; - } - it.remove(); - throw e; - } catch (final KafkaException e) { - log.error("Failed to punctuate {} {} due to the following error:", taskTypeName, task.id(), e); - throw e; - } - } - return punctuated; - } - - private void applyToRunningTasks(final TaskAction action) { + void applyToRunningTasks(final TaskAction action) { RuntimeException firstException = null; - for (Iterator it = running().iterator(); it.hasNext(); ) { - final Task task = it.next(); + for (Iterator it = running().iterator(); it.hasNext(); ) { + final T task = it.next(); try { action.apply(task); } catch (final TaskMigratedException e) { @@ -485,9 +393,9 @@ class AssignedTasks implements RestoringTasks { } void closeNonAssignedSuspendedTasks(final Map> newAssignment) { - final Iterator standByTaskIterator = suspended.values().iterator(); + final Iterator standByTaskIterator = suspended.values().iterator(); while (standByTaskIterator.hasNext()) { - final Task suspendedTask = standByTaskIterator.next(); + final T suspendedTask = standByTaskIterator.next(); if (!newAssignment.containsKey(suspendedTask.id()) || !suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()))) { log.debug("Closing suspended and not re-assigned {} {}", taskTypeName, suspendedTask.id()); try { @@ -503,7 +411,7 @@ class AssignedTasks implements RestoringTasks { void close(final boolean clean) { final AtomicReference firstException = new AtomicReference<>(null); - for (final Task task : allTasks()) { + for (final T task : allTasks()) { try { task.close(clean, false); } catch (final TaskMigratedException e) { @@ -531,7 +439,7 @@ class AssignedTasks implements RestoringTasks { } } - private boolean closeUnclean(final Task task) { + private boolean closeUnclean(final T task) { log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id()); try { task.close(false, false); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java index 6ed28fdf63c..3671b493f19 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java @@ -19,5 +19,5 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.TopicPartition; public interface RestoringTasks { - Task restoringTaskFor(final TopicPartition partition); + StreamTask restoringTaskFor(final TopicPartition partition); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 98ec8101180..fbbb3570623 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -147,11 +147,6 @@ public class StandbyTask extends AbstractTask { close(clean, isZombie); } - @Override - public boolean commitNeeded() { - return false; - } - /** * Updates a state store using records from one change log partition * @@ -163,28 +158,8 @@ public class StandbyTask extends AbstractTask { return stateMgr.updateStandbyStates(partition, records); } - @Override - public int addRecords(final TopicPartition partition, final Iterable> records) { - throw new UnsupportedOperationException("add records not supported by StandbyTasks"); - } - public Map checkpointedOffsets() { return checkpointedOffsets; } - @Override - public boolean maybePunctuateStreamTime() { - throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask"); - } - - @Override - public boolean maybePunctuateSystemTime() { - throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask"); - } - - @Override - public boolean process() { - throw new UnsupportedOperationException("process not supported by StandbyTasks"); - } - } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 06f45ed24f4..4b78e2716b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -42,7 +42,6 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import static java.lang.String.format; @@ -493,11 +492,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } } - @Override - public Map checkpointedOffsets() { - throw new UnsupportedOperationException("checkpointedOffsets is not supported by StreamTasks"); - } - /** *
      * - {@link #suspend(boolean) suspend(clean)}
@@ -619,11 +613,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
     }
 
-    @Override
-    public List> update(final TopicPartition partition, final List> remaining) {
-        throw new UnsupportedOperationException("update is not implemented");
-    }
-
     /**
      * Request committing the current task's state
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 72b3a52dd1b..1982afb1d2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -308,7 +308,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         }
     }
 
-    static abstract class AbstractTaskCreator {
+    static abstract class AbstractTaskCreator {
         final String applicationId;
         final InternalTopologyBuilder builder;
         final StreamsConfig config;
@@ -342,12 +342,12 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         /**
          * @throws TaskMigratedException if the task producer got fenced (EOS only)
          */
-        Collection createTasks(final Consumer consumer, final Map> tasksToBeCreated) {
-            final List createdTasks = new ArrayList<>();
+        Collection createTasks(final Consumer consumer, final Map> tasksToBeCreated) {
+            final List createdTasks = new ArrayList<>();
             for (final Map.Entry> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
                 final TaskId taskId = newTaskAndPartitions.getKey();
                 final Set partitions = newTaskAndPartitions.getValue();
-                Task task = createTask(consumer, taskId, partitions);
+                T task = createTask(consumer, taskId, partitions);
                 if (task != null) {
                     log.trace("Created task {} with assigned partitions {}", taskId, partitions);
                     createdTasks.add(task);
@@ -357,12 +357,12 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             return createdTasks;
         }
 
-        abstract Task createTask(final Consumer consumer, final TaskId id, final Set partitions);
+        abstract T createTask(final Consumer consumer, final TaskId id, final Set partitions);
 
         public void close() {}
     }
 
-    static class TaskCreator extends AbstractTaskCreator {
+    static class TaskCreator extends AbstractTaskCreator {
         private final ThreadCache cache;
         private final KafkaClientSupplier clientSupplier;
         private final String threadClientId;
@@ -441,7 +441,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         }
     }
 
-    static class StandbyTaskCreator extends AbstractTaskCreator {
+    static class StandbyTaskCreator extends AbstractTaskCreator {
         StandbyTaskCreator(final InternalTopologyBuilder builder,
                            final StreamsConfig config,
                            final StreamsMetrics streamsMetrics,
@@ -706,12 +706,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                                                         restoreConsumer,
                                                         activeTaskCreator,
                                                         standbyTaskCreator,
-                                                        new AssignedTasks(logContext,
-                                                                          "stream task"
-                                                        ),
-                                                        new AssignedTasks(logContext,
-                                                                          "standby task"
-                                                        ));
+                                                        new AssignedStreamsTasks(logContext),
+                                                        new AssignedStandbyTasks(logContext));
 
         return new StreamThread(builder,
                                 clientId,
@@ -916,7 +912,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             int numAddedRecords = 0;
 
             for (final TopicPartition partition : records.partitions()) {
-                final Task task = taskManager.activeTask(partition);
+                final StreamTask task = taskManager.activeTask(partition);
                 numAddedRecords += task.addRecords(partition, records.records(partition));
             }
             streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
@@ -1040,7 +1036,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                         final TopicPartition partition = entry.getKey();
                         List> remaining = entry.getValue();
                         if (remaining != null) {
-                            final Task task = taskManager.standbyTask(partition);
+                            final StandbyTask task = taskManager.standbyTask(partition);
                             remaining = task.update(partition, remaining);
                             if (remaining != null) {
                                 remainingStandbyRecords.put(partition, remaining);
@@ -1061,7 +1057,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
             if (!records.isEmpty()) {
                 for (final TopicPartition partition : records.partitions()) {
-                    final Task task = taskManager.standbyTask(partition);
+                    final StandbyTask task = taskManager.standbyTask(partition);
 
                     if (task == null) {
                         throw new StreamsException(logPrefix + "Missing standby task for partition " + partition);
@@ -1101,7 +1097,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         setState(State.PENDING_SHUTDOWN);
     }
 
-    public Map tasks() {
+    public Map tasks() {
         return taskManager.activeTasks();
     }
 
@@ -1248,16 +1244,16 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         return threadMetadata;
     }
 
-    private void updateThreadMetadata(final Map activeTasks, final Map standbyTasks) {
+    private void updateThreadMetadata(final Map activeTasks, final Map standbyTasks) {
         final Set activeTasksMetadata = new HashSet<>();
         if (activeTasks != null) {
-            for (Map.Entry task : activeTasks.entrySet()) {
+            for (Map.Entry task : activeTasks.entrySet()) {
                 activeTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
             }
         }
         final Set standbyTasksMetadata = new HashSet<>();
         if (standbyTasks != null) {
-            for (Map.Entry task : standbyTasks.entrySet()) {
+            for (Map.Entry task : standbyTasks.entrySet()) {
                 standbyTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 80e5423761c..f066bffde11 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -24,61 +23,49 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.Collection;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public interface Task {
-    void resume();
+    /**
+     * Initialize the task and return {}true if the task is ready to run, i.e, it has not state stores
+     * @return true if this task has no state stores that may need restoring.
+     * @throws IllegalStateException If store gets registered after initialized is already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    boolean initialize();
 
     void commit();
 
     void suspend();
 
-    void close(boolean clean, boolean isZombie);
+    void resume();
 
-    TaskId id();
+    void closeSuspended(final boolean clean,
+                        final boolean isZombie,
+                        final RuntimeException e);
 
-    String applicationId();
+    void close(final boolean clean,
+               final boolean isZombie);
 
-    Set partitions();
+    StateStore getStore(final String name);
+
+    String applicationId();
 
     ProcessorTopology topology();
 
     ProcessorContext context();
 
-    StateStore getStore(String name);
-
-    void closeSuspended(boolean clean, boolean isZombie, RuntimeException e);
-
-    Map checkpointedOffsets();
-
-    boolean process();
-
-    boolean commitNeeded();
-
-    boolean maybePunctuateStreamTime();
-
-    boolean maybePunctuateSystemTime();
-
-    List> update(TopicPartition partition, List> remaining);
-
-    String toString(String indent);
-
-    int addRecords(TopicPartition partition, final Iterable> records);
-
-    boolean hasStateStores();
+    TaskId id();
 
-    /**
-     * initialize the task and return true if the task is ready to run, i.e, it has not state stores
-     * @return true if this task has no state stores that may need restoring.
-     * @throws IllegalStateException If store gets registered after initialized is already finished
-     * @throws StreamsException if the store's change log does not contain the partition
-     */
-    boolean initialize();
+    Set partitions();
 
     /**
      * @return any changelog partitions associated with this task
      */
     Collection changelogPartitions();
+
+    boolean hasStateStores();
+
+    String toString(final String indent);
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
index 9112594a180..da5f3250ea7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-interface TaskAction {
+interface TaskAction {
     String name();
-    void apply(final Task task);
+    void apply(final T task);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 6f4cc514539..023861506e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -37,23 +37,23 @@ class TaskManager {
     // activeTasks needs to be concurrent as it can be accessed
     // by QueryableState
     private final Logger log;
-    private final AssignedTasks active;
-    private final AssignedTasks standby;
+    private final AssignedStreamsTasks active;
+    private final AssignedStandbyTasks standby;
     private final ChangelogReader changelogReader;
     private final String logPrefix;
     private final Consumer restoreConsumer;
-    private final StreamThread.AbstractTaskCreator taskCreator;
-    private final StreamThread.AbstractTaskCreator standbyTaskCreator;
+    private final StreamThread.AbstractTaskCreator taskCreator;
+    private final StreamThread.AbstractTaskCreator standbyTaskCreator;
     private ThreadMetadataProvider threadMetadataProvider;
     private Consumer consumer;
 
     TaskManager(final ChangelogReader changelogReader,
                 final String logPrefix,
                 final Consumer restoreConsumer,
-                final StreamThread.AbstractTaskCreator taskCreator,
-                final StreamThread.AbstractTaskCreator standbyTaskCreator,
-                final AssignedTasks active,
-                final AssignedTasks standby) {
+                final StreamThread.AbstractTaskCreator taskCreator,
+                final StreamThread.AbstractTaskCreator standbyTaskCreator,
+                final AssignedStreamsTasks active,
+                final AssignedStandbyTasks standby) {
         this.changelogReader = changelogReader;
         this.logPrefix = logPrefix;
         this.restoreConsumer = restoreConsumer;
@@ -133,7 +133,7 @@ class TaskManager {
         // -> other thread will call removeSuspendedTasks(); eventually
         log.trace("New active tasks to be created: {}", newTasks);
 
-        for (final Task task : taskCreator.createTasks(consumer, newTasks)) {
+        for (final StreamTask task : taskCreator.createTasks(consumer, newTasks)) {
             active.addNewTask(task);
         }
     }
@@ -166,7 +166,7 @@ class TaskManager {
         // -> other thread will call removeSuspendedStandbyTasks(); eventually
         log.trace("New standby tasks to be created: {}", newStandbyTasks);
 
-        for (final Task task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
+        for (final StandbyTask task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
             standby.addNewTask(task);
         }
     }
@@ -240,20 +240,20 @@ class TaskManager {
         return standby.previousTaskIds();
     }
 
-    Task activeTask(final TopicPartition partition) {
+    StreamTask activeTask(final TopicPartition partition) {
         return active.runningTaskFor(partition);
     }
 
 
-    Task standbyTask(final TopicPartition partition) {
+    StandbyTask standbyTask(final TopicPartition partition) {
         return standby.runningTaskFor(partition);
     }
 
-    Map activeTasks() {
+    Map activeTasks() {
         return active.runningTaskMap();
     }
 
-    Map standbyTasks() {
+    Map standbyTasks() {
         return standby.runningTaskMap();
     }
 
@@ -293,9 +293,9 @@ class TaskManager {
     }
 
     private void assignStandbyPartitions() {
-        final Collection running = standby.running();
+        final Collection running = standby.running();
         final Map checkpointedOffsets = new HashMap<>();
-        for (final Task standbyTask : running) {
+        for (final StandbyTask standbyTask : running) {
             checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 02aa0a0352a..efc6f79b739 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -40,7 +39,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.fail;
@@ -141,41 +139,6 @@ public class AbstractTaskTest {
             @Override
             public void closeSuspended(final boolean clean, final boolean isZombie, final RuntimeException e) {}
 
-            @Override
-            public Map checkpointedOffsets() {
-                return null;
-            }
-
-            @Override
-            public boolean process() {
-                return false;
-            }
-
-            @Override
-            public boolean commitNeeded() {
-                return false;
-            }
-
-            @Override
-            public boolean maybePunctuateStreamTime() {
-                return false;
-            }
-
-            @Override
-            public boolean maybePunctuateSystemTime() {
-                return false;
-            }
-
-            @Override
-            public List> update(final TopicPartition partition, final List> remaining) {
-                return null;
-            }
-
-            @Override
-            public int addRecords(final TopicPartition partition, final Iterable> records) {
-                return 0;
-            }
-
             @Override
             public boolean initialize() {
                 return false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
similarity index 97%
rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index a721936c693..3d33b0b7646 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -38,21 +38,21 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class AssignedTasksTest {
+public class AssignedStreamsTasksTest {
 
-    private final Task t1 = EasyMock.createMock(Task.class);
-    private final Task t2 = EasyMock.createMock(Task.class);
+    private final StreamTask t1 = EasyMock.createMock(StreamTask.class);
+    private final StreamTask t2 = EasyMock.createMock(StreamTask.class);
     private final TopicPartition tp1 = new TopicPartition("t1", 0);
     private final TopicPartition tp2 = new TopicPartition("t2", 0);
     private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
     private final TopicPartition changeLog2 = new TopicPartition("cl2", 0);
     private final TaskId taskId1 = new TaskId(0, 0);
     private final TaskId taskId2 = new TaskId(1, 0);
-    private AssignedTasks assignedTasks;
+    private AssignedStreamsTasks assignedTasks;
 
     @Before
     public void before() {
-        assignedTasks = new AssignedTasks(new LogContext("log "), "task");
+        assignedTasks = new AssignedStreamsTasks(new LogContext("log "));
         EasyMock.expect(t1.id()).andReturn(taskId1).anyTimes();
         EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
     }
@@ -117,7 +117,7 @@ public class AssignedTasksTest {
 
         final Set readyPartitions = assignedTasks.initializeNewTasks();
 
-        Collection restoring = assignedTasks.restoringTasks();
+        Collection restoring = assignedTasks.restoringTasks();
         assertThat(restoring.size(), equalTo(1));
         assertSame(restoring.iterator().next(), t1);
         assertThat(readyPartitions, equalTo(t2partitions));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 705bcf98a75..342354cd3dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -61,7 +61,7 @@ public class StoreChangelogReaderTest {
     @Mock(type = MockType.NICE)
     private RestoringTasks active;
     @Mock(type = MockType.NICE)
-    private Task task;
+    private StreamTask task;
 
     private final MockStateRestoreListener callback = new MockStateRestoreListener();
     private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index b11b8c27b17..1640f9eae7a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -61,17 +61,19 @@ public class TaskManagerTest {
     @Mock(type = MockType.NICE)
     private Consumer consumer;
     @Mock(type = MockType.NICE)
-    private StreamThread.AbstractTaskCreator activeTaskCreator;
+    private StreamThread.AbstractTaskCreator activeTaskCreator;
     @Mock(type = MockType.NICE)
-    private StreamThread.AbstractTaskCreator standbyTaskCreator;
+    private StreamThread.AbstractTaskCreator standbyTaskCreator;
     @Mock(type = MockType.NICE)
     private ThreadMetadataProvider threadMetadataProvider;
     @Mock(type = MockType.NICE)
-    private Task firstTask;
+    private StreamTask streamTask;
     @Mock(type = MockType.NICE)
-    private AssignedTasks active;
+    private StandbyTask standbyTask;
     @Mock(type = MockType.NICE)
-    private AssignedTasks standby;
+    private AssignedStreamsTasks active;
+    @Mock(type = MockType.NICE)
+    private AssignedStandbyTasks standby;
 
     private TaskManager taskManager;
 
@@ -139,7 +141,7 @@ public class TaskManagerTest {
     public void shouldAddNonResumedActiveTasks() {
         mockSingleActiveTask();
         EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
-        active.addNewTask(EasyMock.same(firstTask));
+        active.addNewTask(EasyMock.same(streamTask));
         replay();
 
         taskManager.createTasks(taskId0Partitions);
@@ -164,7 +166,7 @@ public class TaskManagerTest {
     public void shouldAddNonResumedStandbyTasks() {
         mockStandbyTaskExpectations();
         EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
-        standby.addNewTask(EasyMock.same(firstTask));
+        standby.addNewTask(EasyMock.same(standbyTask));
         replay();
 
         taskManager.createTasks(taskId0Partitions);
@@ -470,7 +472,7 @@ public class TaskManagerTest {
     }
 
     private void mockAssignStandbyPartitions(final long offset) {
-        final Task task = EasyMock.createNiceMock(Task.class);
+        final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
         EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet());
         EasyMock.expect(active.allTasksRunning()).andReturn(true);
         EasyMock.expect(active.updateRestored(EasyMock.>anyObject())).
@@ -487,7 +489,7 @@ public class TaskManagerTest {
         mockThreadMetadataProvider(taskId0Assignment, Collections.>emptyMap());
         expect(standbyTaskCreator.createTasks(EasyMock.>anyObject(),
                                                    EasyMock.eq(taskId0Assignment)))
-                .andReturn(Collections.singletonList(firstTask));
+                .andReturn(Collections.singletonList(standbyTask));
 
     }
 
@@ -497,7 +499,7 @@ public class TaskManagerTest {
 
         expect(activeTaskCreator.createTasks(EasyMock.anyObject(Consumer.class),
                                                   EasyMock.eq(taskId0Assignment)))
-                .andReturn(Collections.singletonList(firstTask));
+                .andReturn(Collections.singletonList(streamTask));
 
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 136805128af..9271ca6a1b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -33,7 +33,6 @@ import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -68,7 +67,7 @@ public class StreamThreadStateStoreProviderTest {
     private File stateDir;
     private final String topicName = "topic";
     private StreamThread threadMock;
-    private Map tasks;
+    private Map tasks;
 
     @SuppressWarnings("deprecation")
     @Before