|
|
|
@ -93,6 +93,8 @@ public class StreamThread extends Thread {
@@ -93,6 +93,8 @@ public class StreamThread extends Thread {
|
|
|
|
|
private final Map<TopicPartition, StreamTask> activeTasksByPartition; |
|
|
|
|
private final Map<TopicPartition, StandbyTask> standbyTasksByPartition; |
|
|
|
|
private final Set<TaskId> prevTasks; |
|
|
|
|
private final Map<TaskId, StreamTask> suspendedTasks; |
|
|
|
|
private final Map<TaskId, StandbyTask> suspendedStandbyTasks; |
|
|
|
|
private final Time time; |
|
|
|
|
private final long pollTimeMs; |
|
|
|
|
private final long cleanTimeMs; |
|
|
|
@ -119,7 +121,6 @@ public class StreamThread extends Thread {
@@ -119,7 +121,6 @@ public class StreamThread extends Thread {
|
|
|
|
|
try { |
|
|
|
|
log.info("stream-thread [{}] New partitions [{}] assigned at the end of consumer rebalance.", |
|
|
|
|
StreamThread.this.getName(), assignment); |
|
|
|
|
|
|
|
|
|
addStreamTasks(assignment); |
|
|
|
|
addStandbyTasks(); |
|
|
|
|
lastCleanMs = time.milliseconds(); // start the cleaning cycle
|
|
|
|
@ -136,16 +137,14 @@ public class StreamThread extends Thread {
@@ -136,16 +137,14 @@ public class StreamThread extends Thread {
|
|
|
|
|
try { |
|
|
|
|
log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.", |
|
|
|
|
StreamThread.this.getName(), assignment); |
|
|
|
|
|
|
|
|
|
initialized.set(false); |
|
|
|
|
lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
|
|
|
|
|
shutdownTasksAndState(true); |
|
|
|
|
// suspend active tasks
|
|
|
|
|
suspendTasksAndState(true); |
|
|
|
|
} catch (Throwable t) { |
|
|
|
|
rebalanceException = t; |
|
|
|
|
throw t; |
|
|
|
|
} finally { |
|
|
|
|
// TODO: right now upon partition revocation, we always remove all the tasks;
|
|
|
|
|
// this behavior can be optimized to only remove affected tasks in the future
|
|
|
|
|
streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), partitionAssignor.clusterMetadata()); |
|
|
|
|
removeStreamTasks(); |
|
|
|
|
removeStandbyTasks(); |
|
|
|
@ -206,6 +205,8 @@ public class StreamThread extends Thread {
@@ -206,6 +205,8 @@ public class StreamThread extends Thread {
|
|
|
|
|
this.activeTasksByPartition = new HashMap<>(); |
|
|
|
|
this.standbyTasksByPartition = new HashMap<>(); |
|
|
|
|
this.prevTasks = new HashSet<>(); |
|
|
|
|
this.suspendedTasks = new HashMap<>(); |
|
|
|
|
this.suspendedStandbyTasks = new HashMap<>(); |
|
|
|
|
|
|
|
|
|
// standby ktables
|
|
|
|
|
this.standbyRecords = new HashMap<>(); |
|
|
|
@ -291,7 +292,23 @@ public class StreamThread extends Thread {
@@ -291,7 +292,23 @@ public class StreamThread extends Thread {
|
|
|
|
|
log.info("{} Stream thread shutdown complete", logPrefix); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void unAssignChangeLogPartitions(final boolean rethrowExceptions) { |
|
|
|
|
try { |
|
|
|
|
// un-assign the change log partitions
|
|
|
|
|
restoreConsumer.assign(Collections.<TopicPartition>emptyList()); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
log.error("{} Failed to un-assign change log partitions: ", logPrefix, e); |
|
|
|
|
if (rethrowExceptions) { |
|
|
|
|
throw e; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void shutdownTasksAndState(final boolean rethrowExceptions) { |
|
|
|
|
log.debug("{} shutdownTasksAndState: shutting down all active tasks [{}] and standby tasks [{}]", logPrefix, |
|
|
|
|
activeTasks.keySet(), standbyTasks.keySet()); |
|
|
|
|
|
|
|
|
|
// Commit first as there may be cached records that have not been flushed yet.
|
|
|
|
|
commitOffsets(rethrowExceptions); |
|
|
|
|
// Close all processors in topology order
|
|
|
|
@ -302,15 +319,33 @@ public class StreamThread extends Thread {
@@ -302,15 +319,33 @@ public class StreamThread extends Thread {
|
|
|
|
|
producer.flush(); |
|
|
|
|
// Close all task state managers
|
|
|
|
|
closeAllStateManagers(rethrowExceptions); |
|
|
|
|
try { |
|
|
|
|
// un-assign the change log partitions
|
|
|
|
|
restoreConsumer.assign(Collections.<TopicPartition>emptyList()); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
log.error("{} Failed to un-assign change log partitions: ", logPrefix, e); |
|
|
|
|
if (rethrowExceptions) { |
|
|
|
|
throw e; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// remove the changelog partitions from restore consumer
|
|
|
|
|
unAssignChangeLogPartitions(rethrowExceptions); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Similar to shutdownTasksAndState, however does not close the task managers, |
|
|
|
|
* in the hope that soon the tasks will be assigned again |
|
|
|
|
* @param rethrowExceptions |
|
|
|
|
*/ |
|
|
|
|
private void suspendTasksAndState(final boolean rethrowExceptions) { |
|
|
|
|
log.debug("{} suspendTasksAndState: suspending all active tasks [{}] and standby tasks [{}]", logPrefix, |
|
|
|
|
activeTasks.keySet(), standbyTasks.keySet()); |
|
|
|
|
|
|
|
|
|
// Commit first as there may be cached records that have not been flushed yet.
|
|
|
|
|
commitOffsets(rethrowExceptions); |
|
|
|
|
// Close all topology nodes
|
|
|
|
|
closeAllTasksTopologies(); |
|
|
|
|
// flush state
|
|
|
|
|
flushAllState(rethrowExceptions); |
|
|
|
|
// flush out any extra data sent during close
|
|
|
|
|
producer.flush(); |
|
|
|
|
// remove the changelog partitions from restore consumer
|
|
|
|
|
unAssignChangeLogPartitions(rethrowExceptions); |
|
|
|
|
|
|
|
|
|
updateSuspendedTasks(); |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
interface AbstractTaskAction { |
|
|
|
@ -632,6 +667,27 @@ public class StreamThread extends Thread {
@@ -632,6 +667,27 @@ public class StreamThread extends Thread {
|
|
|
|
|
return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors, stateDirectory, cache); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private StreamTask findMatchingSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) { |
|
|
|
|
if (suspendedTasks.containsKey(taskId)) { |
|
|
|
|
final StreamTask task = suspendedTasks.get(taskId); |
|
|
|
|
if (task.partitions.equals(partitions)) { |
|
|
|
|
return task; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private StandbyTask findMatchingSuspendedStandbyTask(final TaskId taskId, final Set<TopicPartition> partitions) { |
|
|
|
|
if (suspendedStandbyTasks.containsKey(taskId)) { |
|
|
|
|
final StandbyTask task = suspendedStandbyTasks.get(taskId); |
|
|
|
|
if (task.partitions.equals(partitions)) { |
|
|
|
|
return task; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void addStreamTasks(Collection<TopicPartition> assignment) { |
|
|
|
|
if (partitionAssignor == null) |
|
|
|
|
throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen."); |
|
|
|
@ -643,7 +699,15 @@ public class StreamThread extends Thread {
@@ -643,7 +699,15 @@ public class StreamThread extends Thread {
|
|
|
|
|
|
|
|
|
|
if (assignment.containsAll(partitions)) { |
|
|
|
|
try { |
|
|
|
|
StreamTask task = createStreamTask(taskId, partitions); |
|
|
|
|
StreamTask task = findMatchingSuspendedTask(taskId, partitions); |
|
|
|
|
if (task != null) { |
|
|
|
|
log.debug("{} recycling old task {}", logPrefix, taskId); |
|
|
|
|
suspendedTasks.remove(taskId); |
|
|
|
|
task.initTopology(); |
|
|
|
|
} else { |
|
|
|
|
log.debug("{} creating new task {}", logPrefix, taskId); |
|
|
|
|
task = createStreamTask(taskId, partitions); |
|
|
|
|
} |
|
|
|
|
activeTasks.put(taskId, task); |
|
|
|
|
|
|
|
|
|
for (TopicPartition partition : partitions) |
|
|
|
@ -656,6 +720,9 @@ public class StreamThread extends Thread {
@@ -656,6 +720,9 @@ public class StreamThread extends Thread {
|
|
|
|
|
log.warn("{} Task {} owned partitions {} are not contained in the assignment {}", logPrefix, taskId, partitions, assignment); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// finally destroy any remaining suspended tasks
|
|
|
|
|
removeSuspendedTasks(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) { |
|
|
|
@ -682,7 +749,16 @@ public class StreamThread extends Thread {
@@ -682,7 +749,16 @@ public class StreamThread extends Thread {
|
|
|
|
|
for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.standbyTasks().entrySet()) { |
|
|
|
|
TaskId taskId = entry.getKey(); |
|
|
|
|
Set<TopicPartition> partitions = entry.getValue(); |
|
|
|
|
StandbyTask task = createStandbyTask(taskId, partitions); |
|
|
|
|
StandbyTask task = findMatchingSuspendedStandbyTask(taskId, partitions); |
|
|
|
|
|
|
|
|
|
if (task != null) { |
|
|
|
|
log.debug("{} recycling old standby task {}", logPrefix, taskId); |
|
|
|
|
suspendedStandbyTasks.remove(taskId); |
|
|
|
|
task.initTopology(); |
|
|
|
|
} else { |
|
|
|
|
log.debug("{} creating new standby task {}", logPrefix, taskId); |
|
|
|
|
task = createStandbyTask(taskId, partitions); |
|
|
|
|
} |
|
|
|
|
if (task != null) { |
|
|
|
|
standbyTasks.put(taskId, task); |
|
|
|
|
for (TopicPartition partition : partitions) { |
|
|
|
@ -696,6 +772,8 @@ public class StreamThread extends Thread {
@@ -696,6 +772,8 @@ public class StreamThread extends Thread {
|
|
|
|
|
checkpointedOffsets.putAll(task.checkpointedOffsets()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// finally destroy any remaining suspended tasks
|
|
|
|
|
removeSuspendedStandbyTasks(); |
|
|
|
|
|
|
|
|
|
restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet())); |
|
|
|
|
|
|
|
|
@ -710,6 +788,13 @@ public class StreamThread extends Thread {
@@ -710,6 +788,13 @@ public class StreamThread extends Thread {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void updateSuspendedTasks() { |
|
|
|
|
log.info("{} Updating suspended tasks to contain active tasks [{}]", logPrefix, activeTasks.keySet()); |
|
|
|
|
suspendedTasks.clear(); |
|
|
|
|
suspendedTasks.putAll(activeTasks); |
|
|
|
|
suspendedStandbyTasks.putAll(standbyTasks); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void removeStreamTasks() { |
|
|
|
|
log.info("{} Removing all active tasks [{}]", logPrefix, activeTasks.keySet()); |
|
|
|
|
|
|
|
|
@ -733,6 +818,40 @@ public class StreamThread extends Thread {
@@ -733,6 +818,40 @@ public class StreamThread extends Thread {
|
|
|
|
|
standbyRecords.clear(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void removeSuspendedTasks() { |
|
|
|
|
log.info("{} Removing all suspended tasks [{}]", logPrefix, suspendedTasks.keySet()); |
|
|
|
|
try { |
|
|
|
|
// Close task and state manager
|
|
|
|
|
for (final AbstractTask task : suspendedTasks.values()) { |
|
|
|
|
task.close(); |
|
|
|
|
task.flushState(); |
|
|
|
|
task.closeStateManager(); |
|
|
|
|
// flush out any extra data sent during close
|
|
|
|
|
producer.flush(); |
|
|
|
|
} |
|
|
|
|
suspendedTasks.clear(); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
log.error("{} Failed to remove suspended tasks: ", logPrefix, e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void removeSuspendedStandbyTasks() { |
|
|
|
|
log.info("{} Removing all suspended standby tasks [{}]", logPrefix, suspendedStandbyTasks.keySet()); |
|
|
|
|
try { |
|
|
|
|
// Close task and state manager
|
|
|
|
|
for (final AbstractTask task : suspendedStandbyTasks.values()) { |
|
|
|
|
task.close(); |
|
|
|
|
task.flushState(); |
|
|
|
|
task.closeStateManager(); |
|
|
|
|
// flush out any extra data sent during close
|
|
|
|
|
producer.flush(); |
|
|
|
|
} |
|
|
|
|
suspendedStandbyTasks.clear(); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
log.error("{} Failed to remove suspended tasks: ", logPrefix, e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void closeAllTasks() { |
|
|
|
|
performOnAllTasks(new AbstractTaskAction() { |
|
|
|
|
@Override |
|
|
|
@ -744,6 +863,17 @@ public class StreamThread extends Thread {
@@ -744,6 +863,17 @@ public class StreamThread extends Thread {
|
|
|
|
|
}, "close", false); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void closeAllTasksTopologies() { |
|
|
|
|
performOnAllTasks(new AbstractTaskAction() { |
|
|
|
|
@Override |
|
|
|
|
public void apply(final AbstractTask task) { |
|
|
|
|
log.info("{} Closing a task's topology {}", StreamThread.this.logPrefix, task.id()); |
|
|
|
|
task.closeTopology(); |
|
|
|
|
sensors.taskDestructionSensor.record(); |
|
|
|
|
} |
|
|
|
|
}, "close", false); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Produces a string representation contain useful information about a StreamThread. |
|
|
|
|
* This is useful in debugging scenarios. |
|
|
|
|