
KAFKA-6115: TaskManager should be type aware

- remove type-specific methods from the Task interface
- add generics to preserve task type
- add subclasses for the different task types
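
The shape of the change in the three bullets above, as a minimal self-contained sketch (simplified names and bodies, not the actual Kafka classes; see the diffs below for the real code):

    // Base type keeps only the methods every task supports.
    interface Task {
        String id();
        void commit();
    }

    // Active task: carries behavior that standby tasks lack.
    class StreamTask implements Task {
        public String id() { return "0_0"; }
        public void commit() { /* commit offsets */ }
        boolean process() { return true; }   // stream-task-only method
    }

    // Standby task: no process() at all, instead of a throwing stub.
    class StandbyTask implements Task {
        public String id() { return "0_0"; }
        public void commit() { /* write checkpoint */ }
    }

    // Generic container: the type parameter preserves the concrete task type.
    abstract class AssignedTasks<T extends Task> {
        protected final java.util.Map<String, T> running = new java.util.HashMap<>();
        void addNewTask(final T task) { running.put(task.id(), task); }
    }

    // Type-specific operations move into the matching subclass.
    class AssignedStreamsTasks extends AssignedTasks<StreamTask> {
        int process() {
            int processed = 0;
            for (final StreamTask task : running.values()) {
                if (task.process()) {   // no cast: values() yields StreamTask
                    processed++;
                }
            }
            return processed;
        }
    }

    class AssignedStandbyTasks extends AssignedTasks<StandbyTask> { }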

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #4129 from mjsax/kafka-6115-taskManager-should-be-type-aware
Authored by Matthias J. Sax, 7 years ago; committed by Guozhang Wang
Commit d637ad0daf
16 changed files:
  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java (8 lines changed)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java (27 lines changed)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java (128 lines changed)
  4. streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java (174 lines changed)
  5. streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java (2 lines changed)
  6. streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java (25 lines changed)
  7. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (11 lines changed)
  8. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (36 lines changed)
  9. streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java (59 lines changed)
  10. streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java (4 lines changed)
  11. streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (32 lines changed)
  12. streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java (37 lines changed)
  13. streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java (12 lines changed)
  14. streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java (2 lines changed)
  15. streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java (22 lines changed)
  16. streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java (3 lines changed)

streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java (8 lines changed)

@@ -102,22 +102,22 @@ public abstract class AbstractTask implements Task {
}
@Override
public final String applicationId() {
public String applicationId() {
return applicationId;
}
@Override
public final Set<TopicPartition> partitions() {
public Set<TopicPartition> partitions() {
return partitions;
}
@Override
public final ProcessorTopology topology() {
public ProcessorTopology topology() {
return topology;
}
@Override
public final ProcessorContext context() {
public ProcessorContext context() {
return processorContext;
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java (27 lines changed)

@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.utils.LogContext;
class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
AssignedStandbyTasks(final LogContext logContext) {
super(logContext, "standby task");
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java (128 lines changed)

@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import java.util.Iterator;
import java.util.Map;
class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
private final Logger log;
private final TaskAction<StreamTask> maybeCommitAction;
private int committed = 0;
AssignedStreamsTasks(final LogContext logContext) {
super(logContext, "stream task");
this.log = logContext.logger(getClass());
maybeCommitAction = new TaskAction<StreamTask>() {
@Override
public String name() {
return "maybeCommit";
}
@Override
public void apply(final StreamTask task) {
if (task.commitNeeded()) {
committed++;
task.commit();
log.debug("Committed active task {} per user request in", task.id());
}
}
};
}
@Override
public StreamTask restoringTaskFor(final TopicPartition partition) {
return restoringByPartition.get(partition);
}
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
*/
int maybeCommit() {
committed = 0;
applyToRunningTasks(maybeCommitAction);
return committed;
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
int process() {
int processed = 0;
final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator();
while (it.hasNext()) {
final StreamTask task = it.next().getValue();
try {
if (task.process()) {
processed++;
}
} catch (final TaskMigratedException e) {
final RuntimeException fatalException = closeZombieTask(task);
if (fatalException != null) {
throw fatalException;
}
it.remove();
throw e;
} catch (final RuntimeException e) {
log.error("Failed to process stream task {} due to the following error:", task.id(), e);
throw e;
}
}
return processed;
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
int punctuate() {
int punctuated = 0;
final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator();
while (it.hasNext()) {
final StreamTask task = it.next().getValue();
try {
if (task.maybePunctuateStreamTime()) {
punctuated++;
}
if (task.maybePunctuateSystemTime()) {
punctuated++;
}
} catch (final TaskMigratedException e) {
final RuntimeException fatalException = closeZombieTask(task);
if (fatalException != null) {
throw fatalException;
}
it.remove();
throw e;
} catch (final KafkaException e) {
log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
throw e;
}
}
return punctuated;
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java (174 lines changed)

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
@@ -37,22 +36,19 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
class AssignedTasks implements RestoringTasks {
abstract class AssignedTasks<T extends Task> {
private final Logger log;
private final String taskTypeName;
private final TaskAction maybeCommitAction;
private final TaskAction commitAction;
private Map<TaskId, Task> created = new HashMap<>();
private Map<TaskId, Task> suspended = new HashMap<>();
private Map<TaskId, Task> restoring = new HashMap<>();
private final TaskAction<T> commitAction;
private Map<TaskId, T> created = new HashMap<>();
private Map<TaskId, T> suspended = new HashMap<>();
private Map<TaskId, T> restoring = new HashMap<>();
private Set<TopicPartition> restoredPartitions = new HashSet<>();
private Set<TaskId> previousActiveTasks = new HashSet<>();
// IQ may access this map.
private Map<TaskId, Task> running = new ConcurrentHashMap<>();
private Map<TopicPartition, Task> runningByPartition = new HashMap<>();
private Map<TopicPartition, Task> restoringByPartition = new HashMap<>();
private int committed = 0;
Map<TaskId, T> running = new ConcurrentHashMap<>();
private Map<TopicPartition, T> runningByPartition = new HashMap<>();
Map<TopicPartition, T> restoringByPartition = new HashMap<>();
AssignedTasks(final LogContext logContext,
final String taskTypeName) {
@@ -60,36 +56,20 @@ class AssignedTasks implements RestoringTasks {
this.log = logContext.logger(getClass());
maybeCommitAction = new TaskAction() {
@Override
public String name() {
return "maybeCommit";
}
@Override
public void apply(final Task task) {
if (task.commitNeeded()) {
committed++;
task.commit();
log.debug("Committed active task {} per user request in", task.id());
}
}
};
commitAction = new TaskAction() {
commitAction = new TaskAction<T>() {
@Override
public String name() {
return "commit";
}
@Override
public void apply(final Task task) {
public void apply(final T task) {
task.commit();
}
};
}
void addNewTask(final Task task) {
void addNewTask(final T task) {
created.put(task.id(), task);
}
@@ -98,7 +78,7 @@ class AssignedTasks implements RestoringTasks {
return Collections.emptySet();
}
final Set<TopicPartition> partitions = new HashSet<>();
for (final Map.Entry<TaskId, Task> entry : created.entrySet()) {
for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
if (entry.getValue().hasStateStores()) {
partitions.addAll(entry.getValue().partitions());
}
@@ -116,8 +96,8 @@ class AssignedTasks implements RestoringTasks {
if (!created.isEmpty()) {
log.debug("Initializing {}s {}", taskTypeName, created.keySet());
}
for (final Iterator<Map.Entry<TaskId, Task>> it = created.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<TaskId, Task> entry = it.next();
for (final Iterator<Map.Entry<TaskId, T>> it = created.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<TaskId, T> entry = it.next();
try {
if (!entry.getValue().initialize()) {
log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
@@ -141,9 +121,9 @@ class AssignedTasks implements RestoringTasks {
log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
final Set<TopicPartition> resume = new HashSet<>();
restoredPartitions.addAll(restored);
for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<TaskId, Task> entry = it.next();
final Task task = entry.getValue();
for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<TaskId, T> entry = it.next();
final T task = entry.getValue();
if (restoredPartitions.containsAll(task.changelogPartitions())) {
transitionToRunning(task, resume);
it.remove();
@@ -174,7 +154,7 @@ class AssignedTasks implements RestoringTasks {
&& restoring.isEmpty();
}
Collection<Task> running() {
Collection<T> running() {
return running.values();
}
@@ -196,9 +176,9 @@ class AssignedTasks implements RestoringTasks {
return firstException.get();
}
private RuntimeException closeNonRunningTasks(final Collection<Task> tasks) {
private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
RuntimeException exception = null;
for (final Task task : tasks) {
for (final T task : tasks) {
try {
task.close(false, false);
} catch (final RuntimeException e) {
@@ -211,10 +191,10 @@ class AssignedTasks implements RestoringTasks {
return exception;
}
private RuntimeException suspendTasks(final Collection<Task> tasks) {
private RuntimeException suspendTasks(final Collection<T> tasks) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
final Task task = it.next();
for (Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
final T task = it.next();
try {
task.suspend();
suspended.put(task.id(), task);
@@ -235,7 +215,7 @@ class AssignedTasks implements RestoringTasks {
return firstException.get();
}
private RuntimeException closeZombieTask(final Task task) {
RuntimeException closeZombieTask(final T task) {
log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
try {
task.close(false, true);
@@ -255,7 +235,7 @@ class AssignedTasks implements RestoringTasks {
*/
boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
if (suspended.containsKey(taskId)) {
final Task task = suspended.get(taskId);
final T task = suspended.get(taskId);
log.trace("found suspended {} {}", taskTypeName, taskId);
if (task.partitions().equals(partitions)) {
suspended.remove(taskId);
@@ -279,7 +259,7 @@ class AssignedTasks implements RestoringTasks {
return false;
}
private void addToRestoring(final Task task) {
private void addToRestoring(final T task) {
restoring.put(task.id(), task);
for (TopicPartition topicPartition : task.partitions()) {
restoringByPartition.put(topicPartition, task);
@@ -289,7 +269,7 @@ class AssignedTasks implements RestoringTasks {
}
}
private void transitionToRunning(final Task task, final Set<TopicPartition> readyPartitions) {
private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
for (TopicPartition topicPartition : task.partitions()) {
@@ -303,12 +283,7 @@ class AssignedTasks implements RestoringTasks {
}
}
@Override
public Task restoringTaskFor(final TopicPartition partition) {
return restoringByPartition.get(partition);
}
Task runningTaskFor(final TopicPartition partition) {
T runningTaskFor(final TopicPartition partition) {
return runningByPartition.get(partition);
}
@@ -316,7 +291,7 @@ class AssignedTasks implements RestoringTasks {
return running.keySet();
}
Map<TaskId, Task> runningTaskMap() {
Map<TaskId, T> runningTaskMap() {
return Collections.unmodifiableMap(running);
}
@@ -330,18 +305,18 @@ class AssignedTasks implements RestoringTasks {
}
private void describe(final StringBuilder builder,
final Collection<Task> tasks,
final Collection<T> tasks,
final String indent,
final String name) {
builder.append(indent).append(name);
for (final Task t : tasks) {
for (final T t : tasks) {
builder.append(indent).append(t.toString(indent + "\t\t"));
}
builder.append("\n");
}
private List<Task> allTasks() {
final List<Task> tasks = new ArrayList<>();
private List<T> allTasks() {
final List<T> tasks = new ArrayList<>();
tasks.addAll(running.values());
tasks.addAll(suspended.values());
tasks.addAll(restoring.values());
@@ -349,7 +324,7 @@ class AssignedTasks implements RestoringTasks {
return tasks;
}
Collection<Task> restoringTasks() {
Collection<T> restoringTasks() {
return Collections.unmodifiableCollection(restoring.values());
}
@@ -384,78 +359,11 @@ class AssignedTasks implements RestoringTasks {
return running.size();
}
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
*/
int maybeCommit() {
committed = 0;
applyToRunningTasks(maybeCommitAction);
return committed;
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
int process() {
int processed = 0;
final Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator();
while (it.hasNext()) {
final Task task = it.next().getValue();
try {
if (task.process()) {
processed++;
}
} catch (final TaskMigratedException e) {
final RuntimeException fatalException = closeZombieTask(task);
if (fatalException != null) {
throw fatalException;
}
it.remove();
throw e;
} catch (final RuntimeException e) {
log.error("Failed to process {} {} due to the following error:", taskTypeName, task.id(), e);
throw e;
}
}
return processed;
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
int punctuate() {
int punctuated = 0;
final Iterator<Map.Entry<TaskId, Task>> it = running.entrySet().iterator();
while (it.hasNext()) {
final Task task = it.next().getValue();
try {
if (task.maybePunctuateStreamTime()) {
punctuated++;
}
if (task.maybePunctuateSystemTime()) {
punctuated++;
}
} catch (final TaskMigratedException e) {
final RuntimeException fatalException = closeZombieTask(task);
if (fatalException != null) {
throw fatalException;
}
it.remove();
throw e;
} catch (final KafkaException e) {
log.error("Failed to punctuate {} {} due to the following error:", taskTypeName, task.id(), e);
throw e;
}
}
return punctuated;
}
private void applyToRunningTasks(final TaskAction action) {
void applyToRunningTasks(final TaskAction<T> action) {
RuntimeException firstException = null;
for (Iterator<Task> it = running().iterator(); it.hasNext(); ) {
final Task task = it.next();
for (Iterator<T> it = running().iterator(); it.hasNext(); ) {
final T task = it.next();
try {
action.apply(task);
} catch (final TaskMigratedException e) {
@@ -485,9 +393,9 @@ class AssignedTasks implements RestoringTasks {
}
void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAssignment) {
final Iterator<Task> standByTaskIterator = suspended.values().iterator();
final Iterator<T> standByTaskIterator = suspended.values().iterator();
while (standByTaskIterator.hasNext()) {
final Task suspendedTask = standByTaskIterator.next();
final T suspendedTask = standByTaskIterator.next();
if (!newAssignment.containsKey(suspendedTask.id()) || !suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()))) {
log.debug("Closing suspended and not re-assigned {} {}", taskTypeName, suspendedTask.id());
try {
@@ -503,7 +411,7 @@ class AssignedTasks implements RestoringTasks {
void close(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (final Task task : allTasks()) {
for (final T task : allTasks()) {
try {
task.close(clean, false);
} catch (final TaskMigratedException e) {
@@ -531,7 +439,7 @@ class AssignedTasks implements RestoringTasks {
}
}
private boolean closeUnclean(final Task task) {
private boolean closeUnclean(final T task) {
log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
try {
task.close(false, false);

streams/src/main/java/org/apache/kafka/streams/processor/internals/RestoringTasks.java (2 lines changed)

@@ -19,5 +19,5 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
public interface RestoringTasks {
Task restoringTaskFor(final TopicPartition partition);
StreamTask restoringTaskFor(final TopicPartition partition);
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java (25 lines changed)

@@ -147,11 +147,6 @@ public class StandbyTask extends AbstractTask {
close(clean, isZombie);
}
@Override
public boolean commitNeeded() {
return false;
}
/**
* Updates a state store using records from one change log partition
*
@@ -163,28 +158,8 @@ public class StandbyTask extends AbstractTask {
return stateMgr.updateStandbyStates(partition, records);
}
@Override
public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
throw new UnsupportedOperationException("add records not supported by StandbyTasks");
}
public Map<TopicPartition, Long> checkpointedOffsets() {
return checkpointedOffsets;
}
@Override
public boolean maybePunctuateStreamTime() {
throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask");
}
@Override
public boolean maybePunctuateSystemTime() {
throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask");
}
@Override
public boolean process() {
throw new UnsupportedOperationException("process not supported by StandbyTasks");
}
}
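
Note why these deletions are safe: every removed method was a stub whose only behavior was to throw UnsupportedOperationException, so any caller reaching it was already broken. With the type-aware design the same mistake is caught at compile time. A hypothetical call site (illustrative, not code from this patch):

    final StandbyTask task = taskManager.standbyTask(partition);  // assumed in scope
    // task.process();  // before: compiled, then threw UnsupportedOperationException at runtime
    //                  // after: rejected by the compiler — process() exists only on StreamTask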

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (11 lines changed)

@@ -42,7 +42,6 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.lang.String.format;
@@ -493,11 +492,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
}
@Override
public Map<TopicPartition, Long> checkpointedOffsets() {
throw new UnsupportedOperationException("checkpointedOffsets is not supported by StreamTasks");
}
/**
* <pre>
* - {@link #suspend(boolean) suspend(clean)}
@@ -619,11 +613,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
}
@Override
public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> remaining) {
throw new UnsupportedOperationException("update is not implemented");
}
/**
* Request committing the current task's state
*/

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (36 lines changed)

@@ -308,7 +308,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
}
}
static abstract class AbstractTaskCreator {
static abstract class AbstractTaskCreator<T extends Task> {
final String applicationId;
final InternalTopologyBuilder builder;
final StreamsConfig config;
@@ -342,12 +342,12 @@ public class StreamThread extends Thread implements ThreadDataProvider {
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
final List<Task> createdTasks = new ArrayList<>();
Collection<T> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
final List<T> createdTasks = new ArrayList<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
final TaskId taskId = newTaskAndPartitions.getKey();
final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
Task task = createTask(consumer, taskId, partitions);
T task = createTask(consumer, taskId, partitions);
if (task != null) {
log.trace("Created task {} with assigned partitions {}", taskId, partitions);
createdTasks.add(task);
@@ -357,12 +357,12 @@ public class StreamThread extends Thread implements ThreadDataProvider {
return createdTasks;
}
abstract Task createTask(final Consumer<byte[], byte[]> consumer, final TaskId id, final Set<TopicPartition> partitions);
abstract T createTask(final Consumer<byte[], byte[]> consumer, final TaskId id, final Set<TopicPartition> partitions);
public void close() {}
}
static class TaskCreator extends AbstractTaskCreator {
static class TaskCreator extends AbstractTaskCreator<StreamTask> {
private final ThreadCache cache;
private final KafkaClientSupplier clientSupplier;
private final String threadClientId;
@@ -441,7 +441,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
}
}
static class StandbyTaskCreator extends AbstractTaskCreator {
static class StandbyTaskCreator extends AbstractTaskCreator<StandbyTask> {
StandbyTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
final StreamsMetrics streamsMetrics,
@@ -706,12 +706,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
restoreConsumer,
activeTaskCreator,
standbyTaskCreator,
new AssignedTasks(logContext,
"stream task"
),
new AssignedTasks(logContext,
"standby task"
));
new AssignedStreamsTasks(logContext),
new AssignedStandbyTasks(logContext));
return new StreamThread(builder,
clientId,
@@ -916,7 +912,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
int numAddedRecords = 0;
for (final TopicPartition partition : records.partitions()) {
final Task task = taskManager.activeTask(partition);
final StreamTask task = taskManager.activeTask(partition);
numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
@@ -1040,7 +1036,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
final TopicPartition partition = entry.getKey();
List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue();
if (remaining != null) {
final Task task = taskManager.standbyTask(partition);
final StandbyTask task = taskManager.standbyTask(partition);
remaining = task.update(partition, remaining);
if (remaining != null) {
remainingStandbyRecords.put(partition, remaining);
@@ -1061,7 +1057,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
if (!records.isEmpty()) {
for (final TopicPartition partition : records.partitions()) {
final Task task = taskManager.standbyTask(partition);
final StandbyTask task = taskManager.standbyTask(partition);
if (task == null) {
throw new StreamsException(logPrefix + "Missing standby task for partition " + partition);
@@ -1101,7 +1097,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
setState(State.PENDING_SHUTDOWN);
}
public Map<TaskId, Task> tasks() {
public Map<TaskId, StreamTask> tasks() {
return taskManager.activeTasks();
}
@@ -1248,16 +1244,16 @@ public class StreamThread extends Thread implements ThreadDataProvider {
return threadMetadata;
}
private void updateThreadMetadata(final Map<TaskId, Task> activeTasks, final Map<TaskId, Task> standbyTasks) {
private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks, final Map<TaskId, StandbyTask> standbyTasks) {
final Set<TaskMetadata> activeTasksMetadata = new HashSet<>();
if (activeTasks != null) {
for (Map.Entry<TaskId, Task> task : activeTasks.entrySet()) {
for (Map.Entry<TaskId, StreamTask> task : activeTasks.entrySet()) {
activeTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
}
}
final Set<TaskMetadata> standbyTasksMetadata = new HashSet<>();
if (standbyTasks != null) {
for (Map.Entry<TaskId, Task> task : standbyTasks.entrySet()) {
for (Map.Entry<TaskId, StandbyTask> task : standbyTasks.entrySet()) {
standbyTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java (59 lines changed)

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -24,61 +23,49 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface Task {
void resume();
/**
* Initialize the task and return {@code true} if the task is ready to run, i.e., it has no state stores
* @return true if this task has no state stores that may need restoring.
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
boolean initialize();
void commit();
void suspend();
void close(boolean clean, boolean isZombie);
void resume();
TaskId id();
void closeSuspended(final boolean clean,
final boolean isZombie,
final RuntimeException e);
String applicationId();
void close(final boolean clean,
final boolean isZombie);
Set<TopicPartition> partitions();
StateStore getStore(final String name);
String applicationId();
ProcessorTopology topology();
ProcessorContext context();
StateStore getStore(String name);
void closeSuspended(boolean clean, boolean isZombie, RuntimeException e);
Map<TopicPartition, Long> checkpointedOffsets();
boolean process();
boolean commitNeeded();
boolean maybePunctuateStreamTime();
boolean maybePunctuateSystemTime();
List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> remaining);
String toString(String indent);
int addRecords(TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records);
boolean hasStateStores();
TaskId id();
/**
* initialize the task and return true if the task is ready to run, i.e., it has no state stores
* @return true if this task has no state stores that may need restoring.
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
boolean initialize();
Set<TopicPartition> partitions();
/**
* @return any changelog partitions associated with this task
*/
Collection<TopicPartition> changelogPartitions();
boolean hasStateStores();
String toString(final String indent);
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskAction.java (4 lines changed)

@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
interface TaskAction {
interface TaskAction<T extends Task> {
String name();
void apply(final Task task);
void apply(final T task);
}
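
With the type parameter on TaskAction, an action body receives the concrete subtype directly. A small illustrative implementation (the action name "commitIfNeeded" is ours; it mirrors the maybeCommitAction in AssignedStreamsTasks above):

    final TaskAction<StreamTask> commitIfNeeded = new TaskAction<StreamTask>() {
        @Override
        public String name() {
            return "commitIfNeeded";
        }

        @Override
        public void apply(final StreamTask task) {
            // apply() gets a StreamTask, so StreamTask-only members
            // such as commitNeeded() are reachable without a cast
            if (task.commitNeeded()) {
                task.commit();
            }
        }
    };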

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (32 lines changed)

@@ -37,23 +37,23 @@ class TaskManager {
// activeTasks needs to be concurrent as it can be accessed
// by QueryableState
private final Logger log;
private final AssignedTasks active;
private final AssignedTasks standby;
private final AssignedStreamsTasks active;
private final AssignedStandbyTasks standby;
private final ChangelogReader changelogReader;
private final String logPrefix;
private final Consumer<byte[], byte[]> restoreConsumer;
private final StreamThread.AbstractTaskCreator taskCreator;
private final StreamThread.AbstractTaskCreator standbyTaskCreator;
private final StreamThread.AbstractTaskCreator<StreamTask> taskCreator;
private final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
private ThreadMetadataProvider threadMetadataProvider;
private Consumer<byte[], byte[]> consumer;
TaskManager(final ChangelogReader changelogReader,
final String logPrefix,
final Consumer<byte[], byte[]> restoreConsumer,
final StreamThread.AbstractTaskCreator taskCreator,
final StreamThread.AbstractTaskCreator standbyTaskCreator,
final AssignedTasks active,
final AssignedTasks standby) {
final StreamThread.AbstractTaskCreator<StreamTask> taskCreator,
final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator,
final AssignedStreamsTasks active,
final AssignedStandbyTasks standby) {
this.changelogReader = changelogReader;
this.logPrefix = logPrefix;
this.restoreConsumer = restoreConsumer;
@@ -133,7 +133,7 @@ class TaskManager {
// -> other thread will call removeSuspendedTasks(); eventually
log.trace("New active tasks to be created: {}", newTasks);
for (final Task task : taskCreator.createTasks(consumer, newTasks)) {
for (final StreamTask task : taskCreator.createTasks(consumer, newTasks)) {
active.addNewTask(task);
}
}
@@ -166,7 +166,7 @@ class TaskManager {
// -> other thread will call removeSuspendedStandbyTasks(); eventually
log.trace("New standby tasks to be created: {}", newStandbyTasks);
for (final Task task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
for (final StandbyTask task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
standby.addNewTask(task);
}
}
@@ -240,20 +240,20 @@ class TaskManager {
return standby.previousTaskIds();
}
Task activeTask(final TopicPartition partition) {
StreamTask activeTask(final TopicPartition partition) {
return active.runningTaskFor(partition);
}
Task standbyTask(final TopicPartition partition) {
StandbyTask standbyTask(final TopicPartition partition) {
return standby.runningTaskFor(partition);
}
Map<TaskId, Task> activeTasks() {
Map<TaskId, StreamTask> activeTasks() {
return active.runningTaskMap();
}
Map<TaskId, Task> standbyTasks() {
Map<TaskId, StandbyTask> standbyTasks() {
return standby.runningTaskMap();
}
@@ -293,9 +293,9 @@ class TaskManager {
}
private void assignStandbyPartitions() {
final Collection<Task> running = standby.running();
final Collection<StandbyTask> running = standby.running();
final Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
for (final Task standbyTask : running) {
for (final StandbyTask standbyTask : running) {
checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
}
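
The effect at the thread level: since activeTask() and standbyTask() now return the concrete types, call sites can use subtype-specific methods directly. A condensed sketch of the StreamThread call sites from the hunks above (variables partition, records, and taskManager assumed in scope):

    final StreamTask active = taskManager.activeTask(partition);
    active.addRecords(partition, records.records(partition));      // StreamTask-only method

    final StandbyTask standby = taskManager.standbyTask(partition);
    final List<ConsumerRecord<byte[], byte[]>> remaining =
        standby.update(partition, records.records(partition));     // StandbyTask-only method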

streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java (37 lines changed)

@@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -40,7 +39,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.fail;
@@ -141,41 +139,6 @@ public class AbstractTaskTest {
@Override
public void closeSuspended(final boolean clean, final boolean isZombie, final RuntimeException e) {}
@Override
public Map<TopicPartition, Long> checkpointedOffsets() {
return null;
}
@Override
public boolean process() {
return false;
}
@Override
public boolean commitNeeded() {
return false;
}
@Override
public boolean maybePunctuateStreamTime() {
return false;
}
@Override
public boolean maybePunctuateSystemTime() {
return false;
}
@Override
public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> remaining) {
return null;
}
@Override
public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
return 0;
}
@Override
public boolean initialize() {
return false;

streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java → streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java (12 lines changed)

@@ -38,21 +38,21 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class AssignedTasksTest {
public class AssignedStreamsTasksTest {
private final Task t1 = EasyMock.createMock(Task.class);
private final Task t2 = EasyMock.createMock(Task.class);
private final StreamTask t1 = EasyMock.createMock(StreamTask.class);
private final StreamTask t2 = EasyMock.createMock(StreamTask.class);
private final TopicPartition tp1 = new TopicPartition("t1", 0);
private final TopicPartition tp2 = new TopicPartition("t2", 0);
private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
private final TopicPartition changeLog2 = new TopicPartition("cl2", 0);
private final TaskId taskId1 = new TaskId(0, 0);
private final TaskId taskId2 = new TaskId(1, 0);
private AssignedTasks assignedTasks;
private AssignedStreamsTasks assignedTasks;
@Before
public void before() {
assignedTasks = new AssignedTasks(new LogContext("log "), "task");
assignedTasks = new AssignedStreamsTasks(new LogContext("log "));
EasyMock.expect(t1.id()).andReturn(taskId1).anyTimes();
EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
}
@@ -117,7 +117,7 @@ public class AssignedTasksTest {
final Set<TopicPartition> readyPartitions = assignedTasks.initializeNewTasks();
Collection<Task> restoring = assignedTasks.restoringTasks();
Collection<StreamTask> restoring = assignedTasks.restoringTasks();
assertThat(restoring.size(), equalTo(1));
assertSame(restoring.iterator().next(), t1);
assertThat(readyPartitions, equalTo(t2partitions));

streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java (2 lines changed)

@@ -61,7 +61,7 @@ public class StoreChangelogReaderTest {
@Mock(type = MockType.NICE)
private RestoringTasks active;
@Mock(type = MockType.NICE)
private Task task;
private StreamTask task;
private final MockStateRestoreListener callback = new MockStateRestoreListener();
private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java (22 lines changed)

@@ -61,17 +61,19 @@ public class TaskManagerTest {
@Mock(type = MockType.NICE)
private Consumer<byte[], byte[]> consumer;
@Mock(type = MockType.NICE)
private StreamThread.AbstractTaskCreator activeTaskCreator;
private StreamThread.AbstractTaskCreator<StreamTask> activeTaskCreator;
@Mock(type = MockType.NICE)
private StreamThread.AbstractTaskCreator standbyTaskCreator;
private StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
@Mock(type = MockType.NICE)
private ThreadMetadataProvider threadMetadataProvider;
@Mock(type = MockType.NICE)
private Task firstTask;
private StreamTask streamTask;
@Mock(type = MockType.NICE)
private AssignedTasks active;
private StandbyTask standbyTask;
@Mock(type = MockType.NICE)
private AssignedTasks standby;
private AssignedStreamsTasks active;
@Mock(type = MockType.NICE)
private AssignedStandbyTasks standby;
private TaskManager taskManager;
@@ -139,7 +141,7 @@ public class TaskManagerTest {
public void shouldAddNonResumedActiveTasks() {
mockSingleActiveTask();
EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
active.addNewTask(EasyMock.same(firstTask));
active.addNewTask(EasyMock.same(streamTask));
replay();
taskManager.createTasks(taskId0Partitions);
@@ -164,7 +166,7 @@ public class TaskManagerTest {
public void shouldAddNonResumedStandbyTasks() {
mockStandbyTaskExpectations();
EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false);
standby.addNewTask(EasyMock.same(firstTask));
standby.addNewTask(EasyMock.same(standbyTask));
replay();
taskManager.createTasks(taskId0Partitions);
@@ -470,7 +472,7 @@ public class TaskManagerTest {
}
private void mockAssignStandbyPartitions(final long offset) {
final Task task = EasyMock.createNiceMock(Task.class);
final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
EasyMock.expect(active.allTasksRunning()).andReturn(true);
EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
@@ -487,7 +489,7 @@ public class TaskManagerTest {
mockThreadMetadataProvider(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
expect(standbyTaskCreator.createTasks(EasyMock.<Consumer<byte[], byte[]>>anyObject(),
EasyMock.eq(taskId0Assignment)))
.andReturn(Collections.singletonList(firstTask));
.andReturn(Collections.singletonList(standbyTask));
}
@@ -497,7 +499,7 @@ public class TaskManagerTest {
expect(activeTaskCreator.createTasks(EasyMock.anyObject(Consumer.class),
EasyMock.eq(taskId0Assignment)))
.andReturn(Collections.singletonList(firstTask));
.andReturn(Collections.singletonList(streamTask));
}

streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java (3 lines changed)

@@ -33,7 +33,6 @@ import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -68,7 +67,7 @@ public class StreamThreadStateStoreProviderTest {
private File stateDir;
private final String topicName = "topic";
private StreamThread threadMock;
private Map<TaskId, Task> tasks;
private Map<TaskId, StreamTask> tasks;
@SuppressWarnings("deprecation")
@Before
