
KAFKA-9169: fix standby checkpoint initialization (#7681)

Instead of caching the checkpoint map during StandbyTask
initialization, use the latest checkpoints (which would have
been updated during suspend).

Reviewers: Bill Bejeck <bill@confluent.io>
Authored by John Roesler, committed by GitHub
Commit cac85601a0
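To make the motivation concrete, here is a minimal, self-contained sketch (hypothetical class and names, not the actual Kafka Streams code) of the bug: a checkpoint map snapshotted once at initialization no longer reflects a checkpoint written later (for example during suspend), whereas reading the state manager on every call does.

```java
// Minimal sketch of the stale-cache problem (hypothetical class, not the real ProcessorStateManager).
import java.util.HashMap;
import java.util.Map;

public class StaleCheckpointSketch {

    // Stand-in for the state manager's checkpoint bookkeeping.
    static class FakeStateManager {
        private final Map<String, Long> checkpointFileCache = new HashMap<>();

        void checkpoint(final String changelogPartition, final long offset) {
            checkpointFileCache.put(changelogPartition, offset);   // e.g. written during suspend
        }

        Map<String, Long> checkpointed() {
            return new HashMap<>(checkpointFileCache);             // always the latest offsets
        }
    }

    public static void main(final String[] args) {
        final FakeStateManager stateMgr = new FakeStateManager();
        stateMgr.checkpoint("store-changelog-0", -1L);             // nothing checkpointed yet

        // Old behaviour: the map was snapshotted once, in initializeStateStores().
        final Map<String, Long> cachedAtInit = stateMgr.checkpointed();

        // The standby task makes progress and a new checkpoint is written during suspend.
        stateMgr.checkpoint("store-changelog-0", 31L);

        System.out.println(cachedAtInit.get("store-changelog-0"));            // -1 (stale)
        System.out.println(stateMgr.checkpointed().get("store-changelog-0")); // 31 (what the fix returns)
    }
}
```

Because TaskManager seeds the restore consumer from checkpointedOffsets(), returning the stale snapshot meant standby restoration could start over from the beginning of the changelog instead of resuming from the latest checkpoint, which is what the new system-test validation below guards against.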
  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java (4 changes)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java (4 changes)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (1 change)
  4. streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java (18 changes)
  5. tests/kafkatest/tests/streams/streams_standby_replica_test.py (86 changes)

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java (4 changes)

@@ -348,9 +348,7 @@ public class ProcessorStateManager implements StateManager {
updateCheckpointFileCache(checkpointableOffsetsFromProcessing);
-log.trace("Checkpointable offsets updated with active acked offsets: {}", checkpointFileCache);
-log.trace("Writing checkpoint: {}", checkpointFileCache);
+log.debug("Writing checkpoint: {}", checkpointFileCache);
try {
checkpointFile.write(checkpointFileCache);
} catch (final IOException e) {

streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java (4 changes)

@@ -46,7 +46,6 @@ public class StandbyTask extends AbstractTask {
private boolean updateOffsetLimits;
private final Sensor closeTaskSensor;
private final Map<TopicPartition, Long> offsetLimits = new HashMap<>();
-private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
/**
* Create {@link StandbyTask} with its assigned partitions
@@ -86,7 +85,6 @@ public class StandbyTask extends AbstractTask {
public boolean initializeStateStores() {
log.trace("Initializing state stores");
registerStateStores();
-checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
processorContext.initialize();
taskInitialized = true;
return true;
@@ -195,7 +193,7 @@ public class StandbyTask extends AbstractTask {
}
Map<TopicPartition, Long> checkpointedOffsets() {
-return checkpointedOffsets;
+return Collections.unmodifiableMap(stateMgr.checkpointed());
}
private long updateOffsetLimits(final TopicPartition partition) {

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (1 change)

@@ -440,6 +440,7 @@ public class TaskManager {
checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
}
log.debug("Assigning and seeking restoreConsumer to {}", checkpointedOffsets);
+restoreConsumerAssignedStandbys = true;
restoreConsumer.assign(checkpointedOffsets.keySet());
for (final Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {

streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java (18 changes)

@@ -255,6 +255,8 @@ public class StandbyTaskTest {
streamsMetrics,
stateDirectory);
task.initializeStateStores();
+assertThat(task.checkpointedOffsets(),
+           equalTo(mkMap(mkEntry(partition1, -1L), mkEntry(partition2, -1L))));
final Set<TopicPartition> partition = Collections.singleton(partition2);
restoreStateConsumer.assign(partition);
@@ -293,7 +295,15 @@ public class StandbyTaskTest {
restoreStateConsumer.seekToBeginning(partition);
task.update(partition2, restoreStateConsumer.poll(ofMillis(100)).records(partition2));
+assertThat(
+    task.checkpointedOffsets(),
+    equalTo(
+        mkMap(
+            mkEntry(partition1, -1L),
+            mkEntry(partition2, 31L /*the checkpoint should be 1+ the highest consumed offset*/)
+        )
+    )
+);
final StandbyContextImpl context = (StandbyContextImpl) task.context();
final MockKeyValueStore store1 = (MockKeyValueStore) context.getStateMgr().getStore(storeName1);
final MockKeyValueStore store2 = (MockKeyValueStore) context.getStateMgr().getStore(storeName2);
@@ -416,8 +426,8 @@ public class StandbyTaskTest {
final InternalStreamsBuilder builder = new InternalStreamsBuilder(internalTopologyBuilder);
builder.stream(Collections.singleton("topic"), new ConsumedInternal<>())
-.groupByKey()
-.count(Materialized.as(storeName));
+.groupByKey()
+.count(Materialized.as(storeName));
builder.buildAndOptimizeTopology();
@@ -719,7 +729,7 @@ public class StandbyTaskTest {
globalTopicPartition,
singletonList(new ConsumerRecord<>(globalTopicPartition.topic(),
globalTopicPartition.partition(),
-50L,
+50L,
serializedValue,
serializedValue))
);

tests/kafkatest/tests/streams/streams_standby_replica_test.py (86 changes)

@@ -35,13 +35,15 @@ class StreamsStandbyTask(BaseStreamsTest):
def __init__(self, test_context):
super(StreamsStandbyTask, self).__init__(test_context,
topics={
-self.streams_source_topic: {'partitions': 6, 'replication-factor': 1},
-self.streams_sink_topic_1: {'partitions': 1, 'replication-factor': 1},
-self.streams_sink_topic_2: {'partitions': 1, 'replication-factor': 1}
+self.streams_source_topic: {'partitions': 6,
+                            'replication-factor': 1},
+self.streams_sink_topic_1: {'partitions': 1,
+                            'replication-factor': 1},
+self.streams_sink_topic_2: {'partitions': 1,
+                            'replication-factor': 1}
})
def test_standby_tasks_rebalance(self):
configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s" % (self.streams_source_topic,
self.streams_sink_topic_1,
self.streams_sink_topic_2))
@@ -108,8 +110,10 @@ class StreamsStandbyTask(BaseStreamsTest):
self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE, num_lines=2)
-self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1, self.streams_sink_topic_1, self.num_messages)
-self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2, self.streams_sink_topic_2, self.num_messages)
+self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1,
+                    self.streams_sink_topic_1, self.num_messages)
+self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2,
+                    self.streams_sink_topic_2, self.num_messages)
wait_until(lambda: producer.num_acked >= self.num_messages,
timeout_sec=60,
@@ -117,4 +121,74 @@ class StreamsStandbyTask(BaseStreamsTest):
producer.stop()
processor_1.stop()
processor_2.stop()
processor_3.stop()
# Validate the checkpoint/restore logs for monotonicity
# This was added to ensure that standby restoration resumes from the checkpoint
# rather than the beginning of the changelog, as part of KAFKA-9169
# First, process the logs to look for invariant violations
processor_1.node.account.ssh(validateMonotonicCheckpointsCmd(processor_1.LOG_FILE, processor_1.STDOUT_FILE))
processor_2.node.account.ssh(validateMonotonicCheckpointsCmd(processor_2.LOG_FILE, processor_2.STDOUT_FILE))
processor_3.node.account.ssh(validateMonotonicCheckpointsCmd(processor_3.LOG_FILE, processor_3.STDOUT_FILE))
# Second, check to make sure no invariant violations were reported
processor_1.node.account.ssh("! grep ERROR " + processor_1.STDOUT_FILE, allow_fail=False)
processor_2.node.account.ssh("! grep ERROR " + processor_2.STDOUT_FILE, allow_fail=False)
processor_3.node.account.ssh("! grep ERROR " + processor_3.STDOUT_FILE, allow_fail=False)
def validateMonotonicCheckpointsCmd(log_file, stdout_file):
"""
Enforces an invariant that, if we look at the offsets written to
checkpoint files and offsets used to resume the restore consumer,
for a given topic/partition, we should always observe the offsets
to be non-decreasing.
Note that this specifically would not hold for EOS in an unclean
shutdown, but outside of that, we should be able to rely on it.
"""
# This script gets turned into a one-liner and executed over SSH
#
# The idea here is to parse the logs and enforce an invariant. This
# should be resilient against meaningless variations in what tasks
# exactly get assigned to which instances and other factors that could
# make this test flaky.
#
# A quick overview, which should make this easier to read:
# PK is prior key
# PV is prior value
#
# 1. Extract only the relevant lines from the log (grep)
# 2. The log lines contain a map of topic-partition -> offset,
# so the role of the sed expressions is to extract those map values
# onto one k/v pair per line.
# 3. The sort is a _stable_ sort on the key, which puts all the
# events for a key together, while preserving their relative order
# 4. Now, we use K (key), V (value), PK, and PV to make sure that
# the offset (V) for each topic-partition (K) is non-decreasing.
# 5. If this invariant is violated, log an ERROR (so we can check for it later)
return "PK=''; " \
"PV=-999; " \
"cat %s " \
"| grep 'Assigning and seeking\|Writing checkpoint' " \
"| sed -e 's/.*{\(.*\)}.*/\\1/' -e 's/, /\\n/g' -e 's/=/\\t/g' " \
"| sort --key=1,1 --stable " \
"| while read LINE; do" \
" if [[ ${LINE} ]]; then" \
" K=$(cut -f1 <<< ${LINE});" \
" V=$(cut -f2 <<< ${LINE});" \
" if [[ ${K} != ${PK} ]]; then" \
" PK=${K};" \
" PV=${V};" \
" echo \"INFO: First occurrence of ${K}; set PV=${V}.\";" \
" elif [[ ${V} -lt ${PV} ]]; then" \
" echo \"ERROR: ${K} offset regressed from ${PV} to ${V}.\"; " \
" else" \
" PV=${V};" \
" echo \"INFO: Updated ${K} to ${V}.\";" \
" fi;" \
" fi;" \
" done >> %s" % (log_file, stdout_file)
