Browse Source

KAFKA-7192 Follow-up: update checkpoint to the reset beginning offset (#5430)

1. When we reinitialize the state store because no CHECKPOINT exists and EOS is turned on, we should update the checkpoint to the reset beginning offset (i.e. the value of consumer.position() after consumer.seekToBeginning()) to avoid falling into endless iterations.

2. Fixed a few other logic bugs around needsInitializing and needsRestoring.

Reviewers: Jason Gustafson <jason@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
pull/5404/merge
Guozhang Wang 6 years ago committed by GitHub
parent
commit
c8c3a7dc48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
  2. 117
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
  3. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  4. 17
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java

10
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java

@ -26,13 +26,13 @@ public class StateRestorer { @@ -26,13 +26,13 @@ public class StateRestorer {
static final int NO_CHECKPOINT = -1;
private final Long checkpoint;
private final long offsetLimit;
private final boolean persistent;
private final String storeName;
private final TopicPartition partition;
private final CompositeRestoreListener compositeRestoreListener;
private long checkpointOffset;
private long restoredOffset;
private long startingOffset;
private long endingOffset;
@ -45,7 +45,7 @@ public class StateRestorer { @@ -45,7 +45,7 @@ public class StateRestorer {
final String storeName) {
this.partition = partition;
this.compositeRestoreListener = compositeRestoreListener;
this.checkpoint = checkpoint;
this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
this.offsetLimit = offsetLimit;
this.persistent = persistent;
this.storeName = storeName;
@ -60,7 +60,11 @@ public class StateRestorer { @@ -60,7 +60,11 @@ public class StateRestorer {
}
long checkpoint() {
return checkpoint == null ? NO_CHECKPOINT : checkpoint;
return checkpointOffset;
}
void setCheckpointOffset(final long checkpointOffset) {
this.checkpointOffset = checkpointOffset;
}
void restoreStarted() {

117
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java

@ -48,8 +48,9 @@ public class StoreChangelogReader implements ChangelogReader { @@ -48,8 +48,9 @@ public class StoreChangelogReader implements ChangelogReader {
private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
private final Set<TopicPartition> needsRestoring = new HashSet<>();
private final Set<TopicPartition> needsInitializing = new HashSet<>();
private final Set<TopicPartition> completedRestorers = new HashSet<>();
private final Duration pollTime;
public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
@ -64,9 +65,14 @@ public class StoreChangelogReader implements ChangelogReader { @@ -64,9 +65,14 @@ public class StoreChangelogReader implements ChangelogReader {
@Override
public void register(final StateRestorer restorer) {
restorer.setUserRestoreListener(userStateRestoreListener);
stateRestorers.put(restorer.partition(), restorer);
needsInitializing.put(restorer.partition(), restorer);
if (!stateRestorers.containsKey(restorer.partition())) {
restorer.setUserRestoreListener(userStateRestoreListener);
stateRestorers.put(restorer.partition(), restorer);
log.trace("Added restorer for changelog {}", restorer.partition());
}
needsInitializing.add(restorer.partition());
}
public Collection<TopicPartition> restore(final RestoringTasks active) {
@ -81,16 +87,15 @@ public class StoreChangelogReader implements ChangelogReader { @@ -81,16 +87,15 @@ public class StoreChangelogReader implements ChangelogReader {
try {
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(pollTime);
final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
while (iterator.hasNext()) {
final TopicPartition partition = iterator.next();
for (final TopicPartition partition : needsRestoring) {
final StateRestorer restorer = stateRestorers.get(partition);
final long pos = processNext(records.records(partition), restorer, endOffsets.get(partition));
restorer.setRestoredOffset(pos);
if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
restorer.restoreDone();
endOffsets.remove(partition);
iterator.remove();
completedRestorers.add(partition);
}
}
} catch (final InvalidOffsetException recoverableException) {
@ -98,12 +103,18 @@ public class StoreChangelogReader implements ChangelogReader { @@ -98,12 +103,18 @@ public class StoreChangelogReader implements ChangelogReader {
final Set<TopicPartition> partitions = recoverableException.partitions();
for (final TopicPartition partition : partitions) {
final StreamTask task = active.restoringTaskFor(partition);
log.info("Reinitializing StreamTask {}", task);
log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
needsInitializing.remove(partition);
needsRestoring.remove(partition);
task.reinitializeStateStoresForPartitions(recoverableException.partitions());
}
restoreConsumer.seekToBeginning(partitions);
}
needsRestoring.removeAll(completedRestorers);
if (needsRestoring.isEmpty()) {
restoreConsumer.unsubscribe();
}
@ -120,25 +131,24 @@ public class StoreChangelogReader implements ChangelogReader { @@ -120,25 +131,24 @@ public class StoreChangelogReader implements ChangelogReader {
// the needsInitializing map is not empty, meaning we do not know the metadata for some of them yet
refreshChangelogInfo();
final Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
for (final Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) {
final TopicPartition topicPartition = entry.getKey();
final Set<TopicPartition> initializable = new HashSet<>();
for (final TopicPartition topicPartition : needsInitializing) {
if (hasPartition(topicPartition)) {
initializable.put(entry.getKey(), entry.getValue());
initializable.add(topicPartition);
}
}
// try to fetch end offsets for the initializable restorers and remove any partitions
// where we already have all of the data
try {
endOffsets.putAll(restoreConsumer.endOffsets(initializable.keySet()));
endOffsets.putAll(restoreConsumer.endOffsets(initializable));
} catch (final TimeoutException e) {
// if timeout exception gets thrown we just give up this time and retry in the next run loop
log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
return;
}
final Iterator<TopicPartition> iter = initializable.keySet().iterator();
final Iterator<TopicPartition> iter = initializable.iterator();
while (iter.hasNext()) {
final TopicPartition topicPartition = iter.next();
final Long endOffset = endOffsets.get(topicPartition);
@ -146,13 +156,15 @@ public class StoreChangelogReader implements ChangelogReader { @@ -146,13 +156,15 @@ public class StoreChangelogReader implements ChangelogReader {
// offset should not be null; but since the consumer API does not guarantee it
// we add this check just in case
if (endOffset != null) {
final StateRestorer restorer = needsInitializing.get(topicPartition);
final StateRestorer restorer = stateRestorers.get(topicPartition);
if (restorer.checkpoint() >= endOffset) {
restorer.setRestoredOffset(restorer.checkpoint());
iter.remove();
completedRestorers.add(topicPartition);
} else if (restorer.offsetLimit() == 0 || endOffset == 0) {
restorer.setRestoredOffset(0);
iter.remove();
completedRestorers.add(topicPartition);
} else {
restorer.setEndingOffset(endOffset);
}
@ -169,51 +181,59 @@ public class StoreChangelogReader implements ChangelogReader { @@ -169,51 +181,59 @@ public class StoreChangelogReader implements ChangelogReader {
}
}
private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
private void startRestoration(final Set<TopicPartition> initialized,
final RestoringTasks active) {
log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
log.debug("Start restoring state stores from changelog topics {}", initialized);
final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
assignment.addAll(initialized.keySet());
assignment.addAll(initialized);
restoreConsumer.assign(assignment);
final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
for (final StateRestorer restorer : initialized.values()) {
for (final TopicPartition partition : initialized) {
final StateRestorer restorer = stateRestorers.get(partition);
if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
logRestoreOffsets(restorer.partition(),
restorer.checkpoint(),
endOffsets.get(restorer.partition()));
restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
restoreConsumer.seek(partition, restorer.checkpoint());
logRestoreOffsets(partition,
restorer.checkpoint(),
endOffsets.get(partition));
restorer.setStartingOffset(restoreConsumer.position(partition));
restorer.restoreStarted();
} else {
final StreamTask task = active.restoringTaskFor(restorer.partition());
// If checkpoint does not exist it means the task was not shutdown gracefully before;
// and in this case if EOS is turned on we should wipe out the state and re-initialize the task
if (task.isEosEnabled()) {
log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
"Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition());
task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition()));
} else {
log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restorer.partition());
}
restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
restoreConsumer.seekToBeginning(Collections.singletonList(partition));
needsPositionUpdate.add(restorer);
}
}
for (final StateRestorer restorer : needsPositionUpdate) {
final long position = restoreConsumer.position(restorer.partition());
logRestoreOffsets(restorer.partition(),
position,
endOffsets.get(restorer.partition()));
restorer.setStartingOffset(position);
restorer.restoreStarted();
final TopicPartition partition = restorer.partition();
// If checkpoint does not exist it means the task was not shutdown gracefully before;
// and in this case if EOS is turned on we should wipe out the state and re-initialize the task
final StreamTask task = active.restoringTaskFor(partition);
if (task.isEosEnabled()) {
log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
"Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), partition);
needsInitializing.remove(partition);
initialized.remove(partition);
restorer.setCheckpointOffset(restoreConsumer.position(partition));
task.reinitializeStateStoresForPartitions(Collections.singleton(partition));
} else {
log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), partition);
final long position = restoreConsumer.position(restorer.partition());
logRestoreOffsets(restorer.partition(),
position,
endOffsets.get(restorer.partition()));
restorer.setStartingOffset(position);
restorer.restoreStarted();
}
}
needsRestoring.putAll(initialized);
needsRestoring.addAll(initialized);
}
private void logRestoreOffsets(final TopicPartition partition,
@ -226,10 +246,7 @@ public class StoreChangelogReader implements ChangelogReader { @@ -226,10 +246,7 @@ public class StoreChangelogReader implements ChangelogReader {
}
private Collection<TopicPartition> completed() {
final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
completed.removeAll(needsRestoring.keySet());
log.trace("The set of restoration completed partitions so far: {}", completed);
return completed;
return completedRestorers;
}
private void refreshChangelogInfo() {

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -1129,7 +1129,7 @@ public class StreamThread extends Thread { @@ -1129,7 +1129,7 @@ public class StreamThread extends Thread {
throw new TaskMigratedException(task);
}
log.info("Reinitializing StandbyTask {}", task);
log.info("Reinitializing StandbyTask {} from changelogs {}", task, recoverableException.partitions());
task.reinitializeStateStoresForPartitions(recoverableException.partitions());
}
restoreConsumer.seekToBeginning(partitions);

17
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java

@ -100,7 +100,11 @@ public class StoreChangelogReaderTest { @@ -100,7 +100,11 @@ public class StoreChangelogReaderTest {
public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
final StateRestorer mockRestorer = EasyMock.mock(StateRestorer.class);
mockRestorer.setUserRestoreListener(stateRestoreListener);
expect(mockRestorer.partition()).andReturn(new TopicPartition("sometopic", 0)).andReturn(new TopicPartition("sometopic", 0));
expect(mockRestorer.partition())
.andReturn(new TopicPartition("sometopic", 0))
.andReturn(new TopicPartition("sometopic", 0))
.andReturn(new TopicPartition("sometopic", 0))
.andReturn(new TopicPartition("sometopic", 0));
EasyMock.replay(mockRestorer);
changelogReader.register(mockRestorer);
@ -144,6 +148,9 @@ public class StoreChangelogReaderTest { @@ -144,6 +148,9 @@ public class StoreChangelogReaderTest {
// first restore call "fails" but we should not die with an exception
assertEquals(0, changelogReader.restore(active).size());
changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
"storeName"));
// retry restore should succeed
assertEquals(1, changelogReader.restore(active).size());
assertThat(callback.restored.size(), equalTo(messages));
@ -226,9 +233,9 @@ public class StoreChangelogReaderTest { @@ -226,9 +233,9 @@ public class StoreChangelogReaderTest {
setupConsumer(3, two);
changelogReader
.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
.register(new StateRestorer(topicPartition, restoreListener, 0L, Long.MAX_VALUE, true, "storeName1"));
changelogReader.register(new StateRestorer(one, restoreListener1, 0L, Long.MAX_VALUE, true, "storeName2"));
changelogReader.register(new StateRestorer(two, restoreListener2, 0L, Long.MAX_VALUE, true, "storeName3"));
expect(active.restoringTaskFor(one)).andReturn(task);
expect(active.restoringTaskFor(two)).andReturn(task);
@ -257,7 +264,7 @@ public class StoreChangelogReaderTest { @@ -257,7 +264,7 @@ public class StoreChangelogReaderTest {
public void shouldOnlyReportTheLastRestoredOffset() {
setupConsumer(10, topicPartition);
changelogReader
.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
.register(new StateRestorer(topicPartition, restoreListener, 0L, 5, true, "storeName1"));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);

Loading…
Cancel
Save