
KAFKA-10185: Restoration info logging (#8896)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
John Roesler authored 4 years ago, committed by GitHub
commit 68db063aa4
1. streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java (6 changed lines)
2. streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java (81 changed lines)
3. streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java (19 changed lines)

streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java (6 changed lines)

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
@@ -36,7 +37,12 @@ public class TaskCorruptedException extends StreamsException {
public TaskCorruptedException(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs) {
super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence need to be re-initialized");
this.taskWithChangelogs = taskWithChangelogs;
}
public TaskCorruptedException(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs,
final InvalidOffsetException e) {
super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence need to be re-initialized", e);
this.taskWithChangelogs = taskWithChangelogs;
}
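The new overload preserves the InvalidOffsetException that signalled the corruption as the exception cause, so callers further up the stack can still log or inspect the root failure. A minimal usage sketch (the task id and partition are hypothetical, and `cause` stands for an InvalidOffsetException caught from the restore consumer):

    final Map<TaskId, Collection<TopicPartition>> corrupted = new HashMap<>();
    corrupted.computeIfAbsent(new TaskId(0, 0), k -> new HashSet<>())
             .add(new TopicPartition("store-changelog", 0));

    final TaskCorruptedException fatal = new TaskCorruptedException(corrupted, cause);
    // StreamsException forwards the cause to RuntimeException, so the
    // original consumer exception stays attached to the rethrown error:
    assert fatal.getCause() == cause;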

streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java (81 changed lines)

@@ -63,6 +63,8 @@ import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchComm
* be completed, while standby tasks updating changelog would always be in restoring state after being initialized.
*/
public class StoreChangelogReader implements ChangelogReader {
private static final long RESTORE_LOG_INTERVAL_MS = 10_000L;
private long lastRestoreLogTime = 0L;
enum ChangelogState {
// registered but need to be initialized (i.e. set its starting, end, limit offsets)
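The two fields added above implement a simple time gate: restoration progress is summarized at most once every ten seconds, and resetting lastRestoreLogTime to 0 (as enforceRestoreActive() does in the next hunk) forces the following check to log immediately. Stripped of the message building, the pattern used by maybeLogRestorationProgress() further down reduces to:

    if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
        // ... build and emit the one-line progress summary ...
        lastRestoreLogTime = time.milliseconds();
    }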
@@ -291,6 +293,7 @@ public class StoreChangelogReader implements ChangelogReader {
public void enforceRestoreActive() {
if (state != ChangelogReaderState.ACTIVE_RESTORING) {
log.debug("Transiting to restore active tasks: {}", changelogs);
lastRestoreLogTime = 0L;
// pause all partitions that are for standby tasks from the restore consumer
pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
@@ -427,19 +430,20 @@ public class StoreChangelogReader implements ChangelogReader {
// for restoring active tasks and updating standby tasks we may prefer different poll times
// in order to make sure we call the main consumer#poll in time.
// TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
} catch (final InvalidOffsetException e) {
log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
log.warn("Encountered " + e.getClass().getName() +
" fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
"the consumer's position has fallen out of the topic partition offset range because the topic was " +
"truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
" it later.", e.getClass().getName(), e.partitions());
" it later.", e);
final Map<TaskId, Collection<TopicPartition>> taskWithCorruptedChangelogs = new HashMap<>();
for (final TopicPartition partition : e.partitions()) {
final TaskId taskId = changelogs.get(partition).stateManager.taskId();
taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
}
throw new TaskCorruptedException(taskWithCorruptedChangelogs);
throw new TaskCorruptedException(taskWithCorruptedChangelogs, e);
} catch (final KafkaException e) {
throw new StreamsException("Restore consumer got an unexpected error polling records.", e);
}
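Two details in this hunk are easy to miss. First, comparing enum constants with == rather than equals() is idiomatic and null-safe, since enum equality is reference identity:

    final ChangelogReaderState state = null;
    boolean a = state == ChangelogReaderState.STANDBY_UPDATING;      // false, no exception
    boolean b = state.equals(ChangelogReaderState.STANDBY_UPDATING); // NullPointerException

Second, the rewritten log.warn() call now records the stack trace. The old call consumed both arguments as {} placeholders and never passed the exception object itself; after the change the only remaining argument is a Throwable, which SLF4J appends to the log entry in full:

    // before: e was only formatted into the message text, no stack trace
    log.warn("Encountered {} ... for partitions {} ...", e.getClass().getName(), e.partitions());
    // after: a trailing Throwable argument is logged with its stack trace
    log.warn("Encountered " + e.getClass().getName() + " ...", e);

The same exception is also threaded into the new TaskCorruptedException constructor, so the root cause survives the rethrow.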
@@ -448,7 +452,7 @@ public class StoreChangelogReader implements ChangelogReader {
bufferChangelogRecords(restoringChangelogByPartition(partition), polledRecords.records(partition));
}
for (final TopicPartition partition: restoringChangelogs) {
for (final TopicPartition partition : restoringChangelogs) {
// even if some partitions do not have any accumulated data, we still trigger
// restoring since some changelogs may not need to restore anything at all, and the
// restore-to-end check still needs to be executed.
@@ -458,9 +462,48 @@ public class StoreChangelogReader implements ChangelogReader {
}
maybeUpdateLimitOffsetsForStandbyChangelogs();
maybeLogRestorationProgress();
}
}
private void maybeLogRestorationProgress() {
if (state == ChangelogReaderState.ACTIVE_RESTORING) {
if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
final Set<TopicPartition> topicPartitions = activeRestoringChangelogs();
if (!topicPartitions.isEmpty()) {
final StringBuilder builder = new StringBuilder().append("Restoration in progress for ")
.append(topicPartitions.size())
.append(" partitions.");
for (final TopicPartition partition : topicPartitions) {
final ChangelogMetadata changelogMetadata = restoringChangelogByPartition(partition);
builder.append(" {")
.append(partition)
.append(": ")
.append("position=")
.append(getPositionString(partition, changelogMetadata))
.append(", end=")
.append(changelogMetadata.restoreEndOffset)
.append(", totalRestored=")
.append(changelogMetadata.totalRestored)
.append("}");
}
log.info(builder.toString());
lastRestoreLogTime = time.milliseconds();
}
}
} else {
lastRestoreLogTime = 0L;
}
}
private static String getPositionString(final TopicPartition partition,
final ChangelogMetadata changelogMetadata) {
final ProcessorStateManager stateManager = changelogMetadata.stateManager;
final Long offsets = stateManager.changelogOffsets().get(partition);
return offsets == null ? "unknown" : String.valueOf(offsets);
}
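Together the two methods above emit a single throttled INFO line; with hypothetical partition names and offsets it would look roughly like:

    Restoration in progress for 2 partitions. {store-changelog-0: position=42, end=100, totalRestored=40} {store-changelog-1: position=unknown, end=50, totalRestored=0}

The position falls back to "unknown" when the state manager has not yet recorded an offset for the changelog partition, which is exactly the null case getPositionString() guards against.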
private void maybeUpdateLimitOffsetsForStandbyChangelogs() {
// we only consider updating the limit offset for standbys if we are not restoring active tasks
if (state == ChangelogReaderState.STANDBY_UPDATING &&
@@ -496,8 +539,9 @@ public class StoreChangelogReader implements ChangelogReader {
} else {
changelogMetadata.bufferedRecords.add(record);
final long offset = record.offset();
if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset)
if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset) {
changelogMetadata.bufferedLimitIndex = changelogMetadata.bufferedRecords.size();
}
}
}
}
@@ -572,8 +616,9 @@ public class StoreChangelogReader implements ChangelogReader {
}
private Map<TopicPartition, Long> endOffsetForChangelogs(final Set<TopicPartition> partitions) {
if (partitions.isEmpty())
if (partitions.isEmpty()) {
return Collections.emptyMap();
}
try {
final ListOffsetsResult result = adminClient.listOffsets(
@@ -617,8 +662,9 @@ public class StoreChangelogReader implements ChangelogReader {
}
private void initializeChangelogs(final Set<ChangelogMetadata> newPartitionsToRestore) {
if (newPartitionsToRestore.isEmpty())
if (newPartitionsToRestore.isEmpty()) {
return;
}
// for active changelogs, we need to find their end offset before transit to restoring
// if the changelog is on source topic, then its end offset should be the minimum of
@@ -631,11 +677,13 @@ public class StoreChangelogReader implements ChangelogReader {
final TopicPartition partition = metadata.storeMetadata.changelogPartition();
// TODO K9113: when TaskType.GLOBAL is added we need to modify this
if (metadata.stateManager.taskType() == Task.TaskType.ACTIVE)
if (metadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
newPartitionsToFindEndOffset.add(partition);
}
if (metadata.stateManager.changelogAsSource(partition))
if (metadata.stateManager.changelogAsSource(partition)) {
newPartitionsToFindCommittedOffset.add(partition);
}
}
// NOTE we assume that all requested partitions will be included in the returned map for both end/committed
@@ -643,33 +691,36 @@ public class StoreChangelogReader implements ChangelogReader {
final Map<TopicPartition, Long> endOffsets = endOffsetForChangelogs(newPartitionsToFindEndOffset);
final Map<TopicPartition, Long> committedOffsets = committedOffsetForChangelogs(newPartitionsToFindCommittedOffset);
for (final TopicPartition partition: newPartitionsToFindEndOffset) {
for (final TopicPartition partition : newPartitionsToFindEndOffset) {
final ChangelogMetadata changelogMetadata = changelogs.get(partition);
final Long endOffset = endOffsets.get(partition);
final Long committedOffset = newPartitionsToFindCommittedOffset.contains(partition) ?
committedOffsets.get(partition) : Long.valueOf(Long.MAX_VALUE);
if (endOffset != null && committedOffset != null) {
if (changelogMetadata.restoreEndOffset != null)
if (changelogMetadata.restoreEndOffset != null) {
throw new IllegalStateException("End offset for " + partition +
" should only be initialized once. Existing value: " + changelogMetadata.restoreEndOffset +
", new value: (" + endOffset + ", " + committedOffset + ")");
}
changelogMetadata.restoreEndOffset = Math.min(endOffset, committedOffset);
log.debug("End offset for changelog {} initialized as {}.", partition, changelogMetadata.restoreEndOffset);
} else {
if (!newPartitionsToRestore.remove(changelogMetadata))
if (!newPartitionsToRestore.remove(changelogMetadata)) {
throw new IllegalStateException("New changelogs to restore " + newPartitionsToRestore +
" do not contain the one whose end offset is being looked up: " + partition + ", this should not happen.");
}
log.info("End offset for changelog {} cannot be found; will retry next time.", partition);
}
}
// try initialize limit offsets for standby tasks for the first time
if (!committedOffsets.isEmpty())
if (!committedOffsets.isEmpty()) {
updateLimitOffsetsForStandbyChangelogs(committedOffsets);
}
// add new partitions to the restore consumer and transit them to restoring state
addChangelogsToRestoreConsumer(newPartitionsToRestore.stream().map(metadata -> metadata.storeMetadata.changelogPartition())
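The Math.min(endOffset, committedOffset) above bounds restoration correctly in both cases the surrounding comments describe: when the changelog doubles as a source topic, restoring must stop at the committed offset rather than the log-end offset, while for a regular changelog the committed offset is substituted with Long.MAX_VALUE, so the minimum is simply the log-end offset. A small worked example (values hypothetical):

    // changelog that is also a source topic: end = 120, committed = 100
    final long sourceTopicEnd = Math.min(120L, 100L);        // -> 100
    // regular changelog: end = 120, committed = Long.MAX_VALUE
    final long regularEnd = Math.min(120L, Long.MAX_VALUE);  // -> 120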
@@ -743,7 +794,7 @@ public class StoreChangelogReader implements ChangelogReader {
// separate those who do not have the current offset loaded from checkpoint
final Set<TopicPartition> newPartitionsWithoutStartOffset = new HashSet<>();
for (final ChangelogMetadata changelogMetadata: newPartitionsToRestore) {
for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
final TopicPartition partition = storeMetadata.changelogPartition();
final Long currentOffset = storeMetadata.offset();

streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java (19 changed lines)

@@ -59,11 +59,14 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.Task.TaskType.ACTIVE;
import static org.apache.kafka.streams.processor.internals.Task.TaskType.STANDBY;
import static org.apache.kafka.streams.processor.internals.StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING;
import static org.apache.kafka.streams.processor.internals.StoreChangelogReader.ChangelogReaderState.STANDBY_UPDATING;
import static org.apache.kafka.streams.processor.internals.Task.TaskType.ACTIVE;
import static org.apache.kafka.streams.processor.internals.Task.TaskType.STANDBY;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
@@ -223,6 +226,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldPollWithRightTimeout() {
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
EasyMock.replay(stateManager, storeMetadata, store);
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
@@ -249,6 +253,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRestoreFromPositionAndCheckForCompletion() {
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
EasyMock.replay(stateManager, storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@@ -315,6 +320,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRestoreFromBeginningAndCheckCompletion() {
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
EasyMock.replay(stateManager, storeMetadata, store);
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
@@ -411,6 +417,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRequestPositionAndHandleTimeoutException() {
EasyMock.expect(storeMetadata.offset()).andReturn(10L).anyTimes();
EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
EasyMock.replay(activeStateManager, storeMetadata, store);
final AtomicBoolean clearException = new AtomicBoolean(false);
@@ -472,6 +479,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
@Test
public void shouldRequestEndOffsetsAndHandleTimeoutException() {
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
EasyMock.replay(activeStateManager, storeMetadata, store);
final AtomicBoolean functionCalled = new AtomicBoolean(false);
@@ -541,6 +549,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
public void shouldRequestCommittedOffsetsAndHandleTimeoutException() {
EasyMock.expect(stateManager.changelogAsSource(tp)).andReturn(true).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(5L).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
EasyMock.replay(stateManager, storeMetadata, store);
final AtomicBoolean functionCalled = new AtomicBoolean(false);
@@ -849,6 +858,11 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
EasyMock.expect(storeMetadataTwo.offset()).andReturn(0L).anyTimes();
EasyMock.expect(activeStateManager.storeMetadata(tp1)).andReturn(storeMetadataOne).anyTimes();
EasyMock.expect(activeStateManager.storeMetadata(tp2)).andReturn(storeMetadataTwo).anyTimes();
EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(mkMap(
mkEntry(tp, 5L),
mkEntry(tp1, 5L),
mkEntry(tp2, 5L)
)).anyTimes();
EasyMock.replay(activeStateManager, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
setupConsumer(10, tp);
@@ -889,6 +903,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
EasyMock.expect(storeMetadataTwo.offset()).andReturn(5L).anyTimes();
EasyMock.expect(standbyStateManager.storeMetadata(tp1)).andReturn(storeMetadataOne).anyTimes();
EasyMock.expect(standbyStateManager.storeMetadata(tp2)).andReturn(storeMetadataTwo).anyTimes();
EasyMock.expect(activeStateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));
EasyMock.replay(activeStateManager, standbyStateManager, storeMetadata, store, storeMetadataOne, storeMetadataTwo);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
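Each touched test gains a stub for stateManager.changelogOffsets() (or activeStateManager.changelogOffsets()) because restore() now finishes by calling maybeLogRestorationProgress(), which reads the current position through the state manager; without the stub, EasyMock would fail on the unexpected call. A sketch of how the ten-second interval itself could be exercised, assuming the reader under test is constructed with a MockTime (this test is hypothetical, not part of the change):

    final MockTime time = new MockTime();
    // ... build the StoreChangelogReader with `time`, register a changelog, call restore() once ...
    time.sleep(10_001L); // step just past RESTORE_LOG_INTERVAL_MS
    // the next restore() call should emit the "Restoration in progress" INFO line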
