Browse Source

KAFKA-15607: Fix NPE in MirrorCheckpointTask::syncGroupOffset (#14587)

Reviewers: Chris Egerton <chrise@aiven.io>
pull/6541/merge
hudeqi 1 year ago committed by GitHub
parent
commit
4083cd627e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
  2. 27
      connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java

11
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java

@ -330,12 +330,21 @@ public class MirrorCheckpointTask extends SourceTask {
// if translated offset from upstream is smaller than the current consumer offset // if translated offset from upstream is smaller than the current consumer offset
// in the target, skip updating the offset for that partition // in the target, skip updating the offset for that partition
long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset(); OffsetAndMetadata targetOffsetAndMetadata = targetConsumerOffset.get(topicPartition);
if (targetOffsetAndMetadata != null) {
long latestDownstreamOffset = targetOffsetAndMetadata.offset();
if (latestDownstreamOffset >= convertedOffset.offset()) { if (latestDownstreamOffset >= convertedOffset.offset()) {
log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for " log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for "
+ "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition); + "TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
continue; continue;
} }
} else {
// It is possible that when resetting offsets are performed in the java kafka client, the reset to -1 will be intercepted.
// However, there are some other types of clients such as sarama, which can magically reset the group offset to -1, which will cause
// `targetOffsetAndMetadata` here is null. For this case, just sync the offset to target.
log.warn("Group {} offset for partition {} may has been reset to a negative offset, just sync the offset to target.",
consumerGroupId, topicPartition);
}
offsetToSync.put(topicPartition, convertedOffset); offsetToSync.put(topicPartition, convertedOffset);
} }

27
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java

@ -169,6 +169,33 @@ public class MirrorCheckpointTaskTest {
"Consumer 2 " + topic2 + " failed"); "Consumer 2 " + topic2 + " failed");
} }
@Test
public void testSyncOffsetForTargetGroupWithNullOffsetAndMetadata() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
String consumer = "consumer";
String topic = "topic";
Map<TopicPartition, OffsetAndMetadata> ct = new HashMap<>();
TopicPartition tp = new TopicPartition(topic, 0);
// Simulate other clients such as Sarama, which may reset group offsets to -1. This can cause
// the obtained `OffsetAndMetadata` of the target cluster to be null.
ct.put(tp, null);
idleConsumerGroupsOffset.put(consumer, ct);
Checkpoint cp = new Checkpoint(consumer, new TopicPartition(topic, 0), 200, 101, "metadata");
Map<TopicPartition, Checkpoint> checkpointMap = new HashMap<>();
checkpointMap.put(cp.topicPartition(), cp);
checkpointsPerConsumerGroup.put(consumer, checkpointMap);
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source", "target",
new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup);
Map<String, Map<TopicPartition, OffsetAndMetadata>> output = mirrorCheckpointTask.syncGroupOffset();
assertEquals(101, output.get(consumer).get(tp).offset(), "Consumer " + topic + " failed");
}
@Test @Test
public void testNoCheckpointForTopicWithoutOffsetSyncs() { public void testNoCheckpointForTopicWithoutOffsetSyncs() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore(); OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();

Loading…
Cancel
Save