Browse Source

MINOR: Handle task migrated inside corruption path (#8667)

Reviewers: John Roesler <vvcephei@apache.org>
pull/8674/head
Boyang Chen 5 years ago committed by GitHub
parent
commit
b558287c0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  2. 64
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

41
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -555,28 +555,35 @@ public class StreamThread extends Thread { @@ -555,28 +555,35 @@ public class StreamThread extends Thread {
} catch (final TaskCorruptedException e) {
log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e);
taskManager.commit(
taskManager.tasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
.collect(Collectors.toSet())
);
taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
try {
taskManager.commit(
taskManager.tasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
.collect(Collectors.toSet())
);
taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
} catch (final TaskMigratedException taskMigrated) {
handleTaskMigrated(taskMigrated);
}
} catch (final TaskMigratedException e) {
log.warn("Detected that the thread is being fenced. " +
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will close out all assigned tasks and rejoin the consumer group.", e);
taskManager.handleLostAll();
mainConsumer.unsubscribe();
subscribeConsumer();
handleTaskMigrated(e);
}
}
}
private void handleTaskMigrated(final TaskMigratedException e) {
log.warn("Detected that the thread is being fenced. " +
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will close out all assigned tasks and rejoin the consumer group.", e);
taskManager.handleLostAll();
mainConsumer.unsubscribe();
subscribeConsumer();
}
private void subscribeConsumer() {
if (builder.usesPatternSubscription()) {
mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);

64
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

@ -107,6 +107,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet; @@ -107,6 +107,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
@ -1932,6 +1933,69 @@ public class StreamThreadTest { @@ -1932,6 +1933,69 @@ public class StreamThreadTest {
)).anyTimes();
expect(taskManager.commit(singleton(task2))).andReturn(0);
taskManager.handleCorruption(singletonMap(taskId1, emptySet()));
expectLastCall();
EasyMock.replay(task1, task2, taskManager);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
null,
consumer,
consumer,
null,
null,
taskManager,
streamsMetrics,
internalTopologyBuilder,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE)
) {
@Override
void runOnce() {
setState(State.PENDING_SHUTDOWN);
throw new TaskCorruptedException(corruptedTasksWithChangelogs);
}
}.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
thread.setState(StreamThread.State.STARTING);
thread.runLoop();
verify(taskManager);
}
@Test
public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath() {
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
final Task task1 = mock(Task.class);
final Task task2 = mock(Task.class);
final TaskId taskId1 = new TaskId(0, 0);
final TaskId taskId2 = new TaskId(0, 2);
final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
mkEntry(taskId1, emptySet())
);
expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(task1.id()).andReturn(taskId1).anyTimes();
expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(task2.id()).andReturn(taskId2).anyTimes();
expect(taskManager.tasks()).andReturn(mkMap(
mkEntry(taskId1, task1),
mkEntry(taskId2, task2)
)).anyTimes();
expect(taskManager.commit(singleton(task2))).andThrow(new TaskMigratedException("Task migrated",
new RuntimeException("non-corrupted task migrated")));
taskManager.handleLostAll();
expectLastCall();
EasyMock.replay(task1, task2, taskManager);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);

Loading…
Cancel
Save