
KAFKA-10247: Correctly reset state when task is corrupted (#8994)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
John Roesler committed 4 years ago via GitHub
commit cec5f377b5
  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (69 lines changed)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java (4 lines changed)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (38 lines changed)
  4. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java (105 lines changed)
  5. streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java (44 lines changed)

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (69 lines changed)

@@ -95,7 +95,7 @@ public class StreamThread extends Thread {
* | | Assigned (3)| <----+
* | +-----+-------+ |
* | | |
* | | |
* | |--------------+
* | v |
* | +-----+-------+ |
* | | Running (4) | ---->+
@@ -137,7 +137,7 @@ public class StreamThread extends Thread {
STARTING(2, 3, 5), // 1
PARTITIONS_REVOKED(2, 3, 5), // 2
PARTITIONS_ASSIGNED(2, 3, 4, 5), // 3
RUNNING(2, 3, 5), // 4
RUNNING(2, 3, 4, 5), // 4
PENDING_SHUTDOWN(6), // 5
DEAD; // 6
@@ -359,9 +359,10 @@ public class StreamThread extends Thread {
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
String originalReset = null;
if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
final String originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
// If there are any overrides, we never fall through to the consumer, but only handle offset management ourselves.
if (!builder.latestResetTopicsPattern().pattern().isEmpty() || !builder.earliestResetTopicsPattern().pattern().isEmpty()) {
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
}
@@ -386,6 +387,8 @@ public class StreamThread extends Thread {
nextScheduledRebalanceMs
);
taskManager.setPartitionResetter(partitions -> streamThread.resetOffsets(partitions, null));
return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
}
@@ -648,7 +651,9 @@ public class StreamThread extends Thread {
// only try to initialize the assigned tasks
// if the state is still in PARTITION_ASSIGNED after the poll call
if (state == State.PARTITIONS_ASSIGNED) {
if (state == State.PARTITIONS_ASSIGNED
|| state == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
// transit to restore active is idempotent so we can call it multiple times
changelogReader.enforceRestoreActive();
@@ -760,17 +765,17 @@ public class StreamThread extends Thread {
try {
records = mainConsumer.poll(pollTime);
} catch (final InvalidOffsetException e) {
resetInvalidOffsets(e);
resetOffsets(e.partitions(), e);
}
return records;
}
private void resetInvalidOffsets(final InvalidOffsetException e) {
final Set<TopicPartition> partitions = e.partitions();
private void resetOffsets(final Set<TopicPartition> partitions, final Exception cause) {
final Set<String> loggedTopics = new HashSet<>();
final Set<TopicPartition> seekToBeginning = new HashSet<>();
final Set<TopicPartition> seekToEnd = new HashSet<>();
final Set<TopicPartition> notReset = new HashSet<>();
for (final TopicPartition partition : partitions) {
if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
@@ -778,26 +783,44 @@ public class StreamThread extends Thread {
} else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
} else {
if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
" You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
"policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))";
throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), e);
}
if (originalReset.equals("earliest")) {
if ("earliest".equals(originalReset)) {
addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
} else { // can only be "latest"
} else if ("latest".equals(originalReset)) {
addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
} else {
notReset.add(partition);
}
}
}
if (!seekToBeginning.isEmpty()) {
mainConsumer.seekToBeginning(seekToBeginning);
}
if (!seekToEnd.isEmpty()) {
mainConsumer.seekToEnd(seekToEnd);
if (notReset.isEmpty()) {
if (!seekToBeginning.isEmpty()) {
mainConsumer.seekToBeginning(seekToBeginning);
}
if (!seekToEnd.isEmpty()) {
mainConsumer.seekToEnd(seekToEnd);
}
} else {
final String notResetString =
notReset.stream()
.map(TopicPartition::topic)
.distinct()
.collect(Collectors.joining(","));
final String format = String.format(
"No valid committed offset found for input [%s] and no valid reset policy configured." +
" You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
"policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or " +
"StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))",
notResetString
);
if (cause == null) {
throw new StreamsException(format);
} else {
throw new StreamsException(format, cause);
}
}
}
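
Note on the change above: the reworked resetOffsets no longer throws on the first partition that lacks a policy; it collects such partitions into notReset and, only after classifying everything, either performs the seeks or fails once with an aggregated error (chaining the original InvalidOffsetException when there is one). Below is a minimal, standalone sketch of that decision pattern using only the public Consumer API; the parameter names and the simplified error message are illustrative, not the Streams-internal signature.

// Standalone sketch of the reset decision pattern: topic-specific patterns win,
// then the consumer's original "auto.offset.reset", and anything left over fails
// in a single aggregated exception. Illustrative only, under stated assumptions.
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;

import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class OffsetResetSketch {
    static void resetOffsets(final Consumer<?, ?> consumer,
                             final Set<TopicPartition> partitions,
                             final Pattern earliestPattern,
                             final Pattern latestPattern,
                             final String originalReset,
                             final Exception cause) {
        final Set<TopicPartition> seekToBeginning = new HashSet<>();
        final Set<TopicPartition> seekToEnd = new HashSet<>();
        final Set<TopicPartition> notReset = new HashSet<>();

        for (final TopicPartition partition : partitions) {
            if (earliestPattern.matcher(partition.topic()).matches()) {
                seekToBeginning.add(partition);          // topic-specific "earliest"
            } else if (latestPattern.matcher(partition.topic()).matches()) {
                seekToEnd.add(partition);                // topic-specific "latest"
            } else if ("earliest".equals(originalReset)) {
                seekToBeginning.add(partition);          // fall back to the consumer config
            } else if ("latest".equals(originalReset)) {
                seekToEnd.add(partition);
            } else {
                notReset.add(partition);                 // no applicable reset policy
            }
        }

        if (!notReset.isEmpty()) {
            // Fail once, naming every topic that could not be reset.
            final String topics = notReset.stream()
                .map(TopicPartition::topic)
                .distinct()
                .collect(Collectors.joining(","));
            throw new StreamsException(
                "No valid committed offset found and no reset policy configured for [" + topics + "]", cause);
        }
        if (!seekToBeginning.isEmpty()) {
            consumer.seekToBeginning(seekToBeginning);
        }
        if (!seekToEnd.isEmpty()) {
            consumer.seekToEnd(seekToEnd);
        }
    }
}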

streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java (4 lines changed)

@@ -101,6 +101,10 @@ public interface Task {
State state();
default boolean needsInitializationOrRestoration() {
return state() == State.CREATED || state() == State.RESTORING;
}
boolean isActive();
boolean isClosed();
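
Note on the change above: the new default on Task lets callers derive "needs initialization or restoration" from task state alone, which matters because a task revived after corruption drops back to CREATED while the thread itself may already be RUNNING. A minimal self-contained sketch of the same predicate, using simplified stand-ins rather than the internal Task interface:

// Simplified stand-ins for illustration; not the internal Task API.
import java.util.List;

final class RestorationCheckSketch {
    enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    interface SketchTask {
        TaskState state();

        default boolean needsInitializationOrRestoration() {
            return state() == TaskState.CREATED || state() == TaskState.RESTORING;
        }
    }

    // The stream thread re-enters the restoration phase if any task needs it,
    // even when the thread-level state is already RUNNING.
    static boolean mustRestore(final List<SketchTask> tasks) {
        return tasks.stream().anyMatch(SketchTask::needsInitializationOrRestoration);
    }
}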

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (38 lines changed)

@@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.intersection;
import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_BETA;
@@ -88,6 +89,7 @@ public class TaskManager {
// includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
private java.util.function.Consumer<Set<TopicPartition>> resetter;
TaskManager(final ChangelogReader changelogReader,
final UUID processId,
@@ -193,6 +195,34 @@ public class TaskManager {
log.error("Error suspending corrupted task {} ", task.id(), swallow);
}
task.closeDirty();
if (task.isActive()) {
// Pause so we won't poll any more records for this task until it has been re-initialized
// Note, closeDirty already clears the partitiongroup for the task.
final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
final Set<TopicPartition> taskInputPartitions = task.inputPartitions();
final Set<TopicPartition> assignedToPauseAndReset =
intersection(HashSet::new, currentAssignment, taskInputPartitions);
if (!assignedToPauseAndReset.equals(taskInputPartitions)) {
log.warn(
"Expected the current consumer assignment {} to contain the input partitions {}. " +
"Will proceed to recover.",
currentAssignment,
taskInputPartitions
);
}
mainConsumer().pause(assignedToPauseAndReset);
final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer().committed(assignedToPauseAndReset);
for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committed.entrySet()) {
final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue();
if (offsetAndMetadata != null) {
mainConsumer().seek(committedEntry.getKey(), offsetAndMetadata);
assignedToPauseAndReset.remove(committedEntry.getKey());
}
}
// throws if anything has no configured reset policy
resetter.accept(assignedToPauseAndReset);
}
task.revive();
}
}
@@ -1140,4 +1170,12 @@ public class TaskManager {
throw e; },
e -> log.debug("Ignoring error in unclean {}", name));
}
boolean needsInitializationOrRestoration() {
return tasks().values().stream().anyMatch(Task::needsInitializationOrRestoration);
}
public void setPartitionResetter(final java.util.function.Consumer<Set<TopicPartition>> resetter) {
this.resetter = resetter;
}
}
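
Note on the change above: for an active corrupted task, handleCorruption now pauses the task's input partitions so no further records are polled, rewinds them to their committed offsets where one exists, and delegates any partition without a committed offset to the resetter installed via setPartitionResetter (which throws if no reset policy is configured). The following standalone sketch shows that sequence using only the public Consumer API; the class and method names are illustrative, not the internal TaskManager code.

// Standalone sketch of the pause-and-rewind step performed for a corrupted active task.
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class CorruptedTaskRecoverySketch {
    static void pauseAndRewind(final Consumer<?, ?> mainConsumer,
                               final Set<TopicPartition> taskInputPartitions,
                               final java.util.function.Consumer<Set<TopicPartition>> resetter) {
        // Only partitions currently assigned to this consumer can be paused and sought.
        final Set<TopicPartition> toPauseAndReset = new HashSet<>(mainConsumer.assignment());
        toPauseAndReset.retainAll(taskInputPartitions);

        // Stop polling these partitions until the task has been re-initialized.
        mainConsumer.pause(toPauseAndReset);

        // Rewind to the last committed offset where one exists ...
        final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer.committed(toPauseAndReset);
        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
            if (entry.getValue() != null) {
                mainConsumer.seek(entry.getKey(), entry.getValue());
                toPauseAndReset.remove(entry.getKey());
            }
        }

        // ... and let the configured reset policy handle the rest
        // (the Streams resetter throws if no policy is configured).
        resetter.accept(toPauseAndReset);
    }
}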

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java (105 lines changed)

@@ -67,6 +67,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
@@ -94,12 +95,14 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -1177,6 +1180,108 @@ public class StreamThreadTest {
assertEquals(1, thread.activeTasks().size());
}
@Test
public void shouldReinitializeRevivedTasksInAnyState() {
final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(false)), false);
final String storeName = "store";
final String storeChangelog = "stream-thread-test-store-changelog";
final TopicPartition storeChangelogTopicPartition = new TopicPartition(storeChangelog, 1);
internalTopologyBuilder.addSource(null, "name", null, null, null, topic1);
final AtomicBoolean shouldThrow = new AtomicBoolean(false);
final AtomicBoolean processed = new AtomicBoolean(false);
internalTopologyBuilder.addProcessor("proc", new ProcessorSupplier<Object, Object>() {
@Override
public Processor<Object, Object> get() {
return new Processor<Object, Object>() {
private ProcessorContext context;
@Override
public void init(final ProcessorContext context) {
this.context = context;
}
@Override
public void process(final Object key, final Object value) {
if (shouldThrow.get()) {
throw new TaskCorruptedException(singletonMap(task1, new HashSet<TopicPartition>(singleton(storeChangelogTopicPartition))));
} else {
processed.set(true);
}
}
@Override
public void close() {
}
};
}
}, "name");
internalTopologyBuilder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
Serdes.String(),
Serdes.String()
),
"proc"
);
thread.setState(StreamThread.State.STARTING);
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final List<TopicPartition> assignedPartitions = new ArrayList<>();
// assign single partition
assignedPartitions.add(t1p1);
activeTasks.put(task1, Collections.singleton(t1p1));
thread.taskManager().handleAssignment(activeTasks, emptyMap());
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
mockConsumer.assign(assignedPartitions);
mockConsumer.updateBeginningOffsets(mkMap(
mkEntry(t1p1, 0L)
));
final MockConsumer<byte[], byte[]> restoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer();
restoreConsumer.updateBeginningOffsets(mkMap(
mkEntry(storeChangelogTopicPartition, 0L)
));
final MockAdminClient admin = (MockAdminClient) thread.adminClient();
admin.updateEndOffsets(singletonMap(storeChangelogTopicPartition, 0L));
thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
// the first iteration completes the restoration
thread.runOnce();
assertThat(thread.activeTasks().size(), equalTo(1));
// the second transits to running and unpause the input
thread.runOnce();
// the third actually polls, processes the record, and throws the corruption exception
addRecord(mockConsumer, 0L);
shouldThrow.set(true);
final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, thread::runOnce);
// Now, we can handle the corruption
thread.taskManager().handleCorruption(taskCorruptedException.corruptedTaskWithChangelogs());
// again, complete the restoration
thread.runOnce();
// transit to running and unpause
thread.runOnce();
// process the record
addRecord(mockConsumer, 0L);
shouldThrow.set(false);
assertThat(processed.get(), is(false));
thread.runOnce();
assertThat(processed.get(), is(true));
}
@Test
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting() {
// only have source but no sink so that we would not get fenced in producer.send

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java (44 lines changed)

@@ -92,6 +92,7 @@ import static org.easymock.EasyMock.resetToStrict;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@@ -566,8 +567,15 @@ public class TaskManagerTest {
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expect(consumer.assignment()).andReturn(taskId00Partitions);
consumer.pause(taskId00Partitions);
expectLastCall();
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
consumer.seek(t1p0, offsetAndMetadata);
expectLastCall();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@@ -578,6 +586,7 @@ public class TaskManagerTest {
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(stateManager);
verify(consumer);
}
@Test
@@ -598,7 +607,14 @@ public class TaskManagerTest {
expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00)).anyTimes();
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expect(consumer.assignment()).andReturn(taskId00Partitions);
consumer.pause(taskId00Partitions);
expectLastCall();
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
consumer.seek(t1p0, offsetAndMetadata);
expectLastCall();
taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
@@ -611,6 +627,7 @@ public class TaskManagerTest {
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(stateManager);
verify(consumer);
}
@Test
@@ -633,9 +650,15 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer, changeLogReader);
consumer.commitSync(eq(emptyMap()));
expect(consumer.assignment()).andReturn(taskId00Partitions);
consumer.pause(taskId00Partitions);
expectLastCall();
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
consumer.seek(t1p0, offsetAndMetadata);
expectLastCall();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(), is(true));
@@ -645,6 +668,7 @@ public class TaskManagerTest {
taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
assertTrue(nonCorruptedTask.commitPrepared);
verify(consumer);
}
@Test
@@ -667,8 +691,16 @@ public class TaskManagerTest {
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
expect(consumer.assignment()).andReturn(taskId00Partitions);
consumer.pause(taskId00Partitions);
expectLastCall();
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, offsetAndMetadata));
consumer.seek(t1p0, offsetAndMetadata);
expectLastCall();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.setPartitionResetter(tp -> assertThat(tp, is(empty())));
taskManager.handleAssignment(assignment, emptyMap());
assertThat(nonRunningNonCorruptedTask.state(), is(Task.State.CREATED));
@@ -676,6 +708,7 @@ public class TaskManagerTest {
verify(activeTaskCreator);
assertFalse(nonRunningNonCorruptedTask.commitPrepared);
verify(consumer);
}
@Test
@@ -714,6 +747,7 @@ public class TaskManagerTest {
assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions)));
assertThat(corruptedStandby.state(), is(Task.State.CREATED));
verify(consumer);
}
@Test
