Browse Source

KAFKA-7672: Restoring tasks need to be closed upon task suspension (#6113)

* In activeTasks.suspend, we should also close all restoring tasks as well. Closing restoring tasks would not require `task.close` as in `closeNonRunningTasks `, since the topology is not initialized yet, instead only state stores are initialized. So we only need to call `task.closeStateManager`.
* Also add @linyli001 's fix.
* Unit tests updated accordingly.

Reviewers: Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>
pull/6311/head
Guozhang Wang 6 years ago committed by GitHub
parent
commit
0d461e4ea0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
  2. 89
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
  3. 13
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
  4. 1
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
  5. 11
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
  6. 6
      streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
  7. 12
      streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

1
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java

@ -243,7 +243,6 @@ public abstract class AbstractTask implements Task { @@ -243,7 +243,6 @@ public abstract class AbstractTask implements Task {
* @param writeCheckpoint boolean indicating if a checkpoint file should be written
* @throws ProcessorStateException if there is an error while closing the state manager
*/
// visible for testing
void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
ProcessorStateException exception = null;
log.trace("Closing state manager");

89
streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java

@ -30,7 +30,6 @@ import java.util.Iterator; @@ -30,7 +30,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks {
private final Map<TaskId, StreamTask> restoring = new HashMap<>();
@ -46,6 +45,52 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -46,6 +45,52 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
return restoringByPartition.get(partition);
}
@Override
List<StreamTask> allTasks() {
final List<StreamTask> tasks = super.allTasks();
tasks.addAll(restoring.values());
return tasks;
}
@Override
Set<TaskId> allAssignedTaskIds() {
final Set<TaskId> taskIds = super.allAssignedTaskIds();
taskIds.addAll(restoring.keySet());
return taskIds;
}
@Override
boolean allTasksRunning() {
return super.allTasksRunning() && restoring.isEmpty();
}
RuntimeException closeAllRestoringTasks() {
RuntimeException exception = null;
log.trace("Closing all restoring stream tasks {}", restoring.keySet());
final Iterator<StreamTask> restoringTaskIterator = restoring.values().iterator();
while (restoringTaskIterator.hasNext()) {
final StreamTask task = restoringTaskIterator.next();
log.debug("Closing restoring task {}", task.id());
try {
task.closeStateManager(true);
} catch (final RuntimeException e) {
log.error("Failed to remove restoring task {} due to the following error:", task.id(), e);
if (exception == null) {
exception = e;
}
} finally {
restoringTaskIterator.remove();
}
}
restoring.clear();
restoredPartitions.clear();
restoringByPartition.clear();
return exception;
}
void updateRestored(final Collection<TopicPartition> restored) {
if (restored.isEmpty()) {
return;
@ -86,20 +131,6 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -86,20 +131,6 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
}
}
@Override
boolean allTasksRunning() {
return super.allTasksRunning() && restoring.isEmpty();
}
RuntimeException suspend() {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(super.suspend());
log.trace("Close restoring stream task {}", restoring.keySet());
firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
restoring.clear();
restoringByPartition.clear();
return firstException.get();
}
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
@ -218,27 +249,6 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -218,27 +249,6 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
return punctuated;
}
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
builder.append(super.toString(indent));
describe(builder, restoring.values(), indent, "Restoring:");
return builder.toString();
}
@Override
List<StreamTask> allTasks() {
final List<StreamTask> tasks = super.allTasks();
tasks.addAll(restoring.values());
return tasks;
}
@Override
Set<TaskId> allAssignedTaskIds() {
final Set<TaskId> taskIds = super.allAssignedTaskIds();
taskIds.addAll(restoring.keySet());
return taskIds;
}
void clear() {
super.clear();
restoring.clear();
@ -246,6 +256,13 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -246,6 +256,13 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
restoredPartitions.clear();
}
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
builder.append(super.toString(indent));
describe(builder, restoring.values(), indent, "Restoring:");
return builder.toString();
}
// for testing only
Collection<StreamTask> restoringTasks() {

13
streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java

@ -84,8 +84,7 @@ abstract class AssignedTasks<T extends Task> { @@ -84,8 +84,7 @@ abstract class AssignedTasks<T extends Task> {
}
boolean allTasksRunning() {
return created.isEmpty()
&& suspended.isEmpty();
return created.isEmpty() && suspended.isEmpty();
}
Collection<T> running() {
@ -106,7 +105,7 @@ abstract class AssignedTasks<T extends Task> { @@ -106,7 +105,7 @@ abstract class AssignedTasks<T extends Task> {
return firstException.get();
}
RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
RuntimeException exception = null;
for (final T task : tasks) {
try {
@ -167,7 +166,7 @@ abstract class AssignedTasks<T extends Task> { @@ -167,7 +166,7 @@ abstract class AssignedTasks<T extends Task> {
boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
if (suspended.containsKey(taskId)) {
final T task = suspended.get(taskId);
log.trace("found suspended {} {}", taskTypeName, taskId);
log.trace("Found suspended {} {}", taskTypeName, taskId);
if (task.partitions().equals(partitions)) {
suspended.remove(taskId);
task.resume();
@ -185,10 +184,10 @@ abstract class AssignedTasks<T extends Task> { @@ -185,10 +184,10 @@ abstract class AssignedTasks<T extends Task> {
}
throw e;
}
log.trace("resuming suspended {} {}", taskTypeName, task.id());
log.trace("Resuming suspended {} {}", taskTypeName, task.id());
return true;
} else {
log.warn("couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
log.warn("Couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
}
}
return false;
@ -198,7 +197,7 @@ abstract class AssignedTasks<T extends Task> { @@ -198,7 +197,7 @@ abstract class AssignedTasks<T extends Task> {
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void transitionToRunning(final T task) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
log.debug("Transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
task.initializeTopology();
for (final TopicPartition topicPartition : task.partitions()) {

1
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java

@ -277,6 +277,7 @@ public class StoreChangelogReader implements ChangelogReader { @@ -277,6 +277,7 @@ public class StoreChangelogReader implements ChangelogReader {
needsRestoring.clear();
endOffsets.clear();
needsInitializing.clear();
completedRestorers.clear();
}
private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,

11
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@ -99,11 +99,11 @@ public class TaskManager { @@ -99,11 +99,11 @@ public class TaskManager {
throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
}
changelogReader.reset();
// do this first as we may have suspended standby tasks that
// will become active or vice versa
standby.closeNonAssignedSuspendedTasks(assignedStandbyTasks);
active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
addStreamTasks(assignment);
addStandbyTasks();
// Pause all the partitions until the underlying state store is ready for all the active tasks.
@ -240,7 +240,14 @@ public class TaskManager { @@ -240,7 +240,14 @@ public class TaskManager {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
firstException.compareAndSet(null, active.suspend());
// close all restoring tasks as well and then reset changelog reader;
// for those restoring and still assigned tasks, they will be re-created
// in addStreamTasks.
firstException.compareAndSet(null, active.closeAllRestoringTasks());
changelogReader.reset();
firstException.compareAndSet(null, standby.suspend());
// remove the changelog partitions from restore consumer
restoreConsumer.unsubscribe();
@ -368,7 +375,7 @@ public class TaskManager { @@ -368,7 +375,7 @@ public class TaskManager {
}
public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks) {
final Map<TaskId, Set<TopicPartition>> standbyTasks) {
this.assignedActiveTasks = activeTasks;
this.assignedStandbyTasks = standbyTasks;
}

6
streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java

@ -146,11 +146,13 @@ public class AssignedStreamsTasksTest { @@ -146,11 +146,13 @@ public class AssignedStreamsTasksTest {
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
t1.close(false, false);
t1.closeStateManager(true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
assertThat(suspendTask(), nullValue());
addAndInitTask();
assertThat(assignedTasks.closeAllRestoringTasks(), nullValue());
EasyMock.verify(t1);
}

12
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@ -260,18 +260,6 @@ public class TaskManagerTest { @@ -260,18 +260,6 @@ public class TaskManagerTest {
verify(active);
}
@Test
public void shouldResetChangeLogReaderOnCreateTasks() {
mockSingleActiveTask();
changeLogReader.reset();
EasyMock.expectLastCall();
replay();
taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
taskManager.createTasks(taskId0Partitions);
verify(changeLogReader);
}
@Test
public void shouldAddNonResumedActiveTasks() {
mockSingleActiveTask();

Loading…
Cancel
Save