Browse Source

KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null OffsetAndMetadata gracefully (#13052)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Greg Harris <gharris1727@gmail.com>
pull/13072/head
csolidum 2 years ago committed by GitHub
parent
commit
ad94dc2134
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
  2. 10
      connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java

14
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java

@ -189,14 +189,16 @@ public class MirrorCheckpointTask extends SourceTask { @@ -189,14 +189,16 @@ public class MirrorCheckpointTask extends SourceTask {
Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition,
OffsetAndMetadata offsetAndMetadata) {
long upstreamOffset = offsetAndMetadata.offset();
OptionalLong downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
if (downstreamOffset.isPresent()) {
return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
if (offsetAndMetadata != null) {
long upstreamOffset = offsetAndMetadata.offset();
OptionalLong downstreamOffset =
offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
if (downstreamOffset.isPresent()) {
return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
} else {
return Optional.empty();
}
}
return Optional.empty();
}
SourceRecord checkpointRecord(Checkpoint checkpoint, long timestamp) {

10
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java

@ -161,4 +161,14 @@ public class MirrorCheckpointTaskTest { @@ -161,4 +161,14 @@ public class MirrorCheckpointTaskTest {
assertFalse(checkpoint1.isPresent());
assertTrue(checkpoint2.isPresent());
}
@Test
public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
Optional<Checkpoint> checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null);
assertFalse(checkpoint.isPresent());
}
}

Loading…
Cancel
Save