Browse Source

KAFKA-10319: Skip unknown offsets when computing sum of changelog offsets (#9066) (#9097)

In PR #8962 we introduced a sentinel UNKNOWN_OFFSET to mark unknown offsets in checkpoint files. The sentinel was set to -2 which is the same value used for the sentinel LATEST_OFFSET that is used in subscriptions to signal that state stores have been used by an active task. Unfortunately, we missed to skip UNKNOWN_OFFSET when we compute the sum of the changelog offsets.

If a task had only one state store and it did not restore anything before the next rebalance, the stream thread wrote -2 (i.e., UNKNOWN_OFFSET) into the subscription as sum of the changelog offsets. During assignment, the leader interpreted the -2 as if the stream run the task as active although it might have run it as standby. This misinterpretation of the sentinel value resulted in unexpected task assignments.

Ports: KAFKA-10287 / #9066

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, John Roesler <vvcephei@apache.org>, Matthias J. Sax <mjsax@apache.org>
pull/9105/head
Bruno Cadonna 4 years ago committed by GitHub
parent
commit
96e0719e42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
  2. 6
      streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
  3. 30
      streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
  4. 5
      streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@ -696,7 +696,7 @@ public class TaskManager { @@ -696,7 +696,7 @@ public class TaskManager {
// for this case, the offset of all partitions is set to `LATEST_OFFSET`
// and we "forward" the sentinel value directly
return Task.LATEST_OFFSET;
} else {
} else if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) {
if (offset < 0) {
throw new IllegalStateException("Expected not to get a sentinel offset, but got: " + changelogEntry);
}

6
streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java

@ -59,8 +59,10 @@ public class OffsetCheckpoint { @@ -59,8 +59,10 @@ public class OffsetCheckpoint {
private static final int VERSION = 0;
// Use a negative sentinel when we don't know the offset instead of skipping it to distinguish it from dirty state
// and use -2 as the -1 sentinel may be taken by some producer errors
public static final long OFFSET_UNKNOWN = -2;
// and use -4 as the -1 sentinel may be taken by some producer errors and -2 in the
// subscription means that the state is used by an active task and hence caught-up and
// -3 is also used in the subscription.
public static final long OFFSET_UNKNOWN = -4L;
private final File file;
private final Object lock;

30
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@ -255,19 +255,7 @@ public class TaskManagerTest { @@ -255,19 +255,7 @@ public class TaskManagerTest {
);
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, Task.LATEST_OFFSET));
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
final StateMachineTask runningTask = handleAssignment(
taskId00Assignment,
emptyMap(),
emptyMap()
).get(taskId00);
runningTask.setChangelogOffsets(changelogOffsets);
assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
}
@Test
@ -278,6 +266,22 @@ public class TaskManagerTest { @@ -278,6 +266,22 @@ public class TaskManagerTest {
);
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));
computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
}
@Test
public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception {
final Map<TopicPartition, Long> changelogOffsets = mkMap(
mkEntry(new TopicPartition("changelog", 0), OffsetCheckpoint.OFFSET_UNKNOWN),
mkEntry(new TopicPartition("changelog", 1), 10L)
);
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 10L));
computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
}
private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> changelogOffsets,
final Map<TaskId, Long> expectedOffsetSums) throws Exception {
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
replay(stateDirectory);

5
streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java

@ -104,14 +104,15 @@ public class OffsetCheckpointTest { @@ -104,14 +104,15 @@ public class OffsetCheckpointTest {
public void shouldReadAndWriteSentinelOffset() throws IOException {
final File f = TestUtils.tempFile();
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
final long sentinelOffset = -4L;
try {
final Map<TopicPartition, Long> offsetsToWrite = new HashMap<>();
offsetsToWrite.put(new TopicPartition(topic, 1), -2L);
offsetsToWrite.put(new TopicPartition(topic, 1), sentinelOffset);
checkpoint.write(offsetsToWrite);
final Map<TopicPartition, Long> readOffsets = checkpoint.read();
assertThat(readOffsets.get(new TopicPartition(topic, 1)), equalTo(-2L));
assertThat(readOffsets.get(new TopicPartition(topic, 1)), equalTo(sentinelOffset));
} finally {
checkpoint.delete();
}

Loading…
Cancel
Save