diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 2412d365a15..38660e1b9d3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -170,6 +170,7 @@ public class SubscriptionState { this.assignment.keySet().retainAll(this.userAssignment); this.needsPartitionAssignment = false; + this.needsFetchCommittedOffsets = true; } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index b5a5fcab6aa..9affa79504d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; @@ -453,6 +454,55 @@ public class KafkaConsumerTest { assertTrue(heartbeatReceived.get()); } + @Test + public void testCommitsFetchedDuringAssign() { + String topic = "topic"; + final TopicPartition partition1 = new TopicPartition(topic, 0); + final TopicPartition partition2 = new TopicPartition(topic, 1); + + long offset1 = 10000; + long offset2 = 20000; + + int sessionTimeoutMs = 3000; + int heartbeatIntervalMs = 2000; + int autoCommitIntervalMs = 1000; + + Time time = new MockTime(); + MockClient client = new MockClient(time); + Cluster cluster = TestUtils.singletonCluster(topic, 1); + Node node = cluster.nodes().get(0); + client.setNode(node); + Metadata metadata = new Metadata(0, Long.MAX_VALUE); + metadata.update(cluster, time.milliseconds()); + PartitionAssignor assignor = new RoundRobinAssignor(); + + final KafkaConsumer consumer = newConsumer(time, client, metadata, assignor, + sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); + consumer.assign(Arrays.asList(partition1)); + + // lookup coordinator + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node); + Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); + + // fetch offset for one topic + client.prepareResponseFrom( + offsetResponse(Collections.singletonMap(partition1, offset1), Errors.NONE.code()), + coordinator); + + assertEquals(offset1, consumer.committed(partition1).offset()); + + consumer.assign(Arrays.asList(partition1, partition2)); + + // fetch offset for two topics + Map offsets = new HashMap<>(); + offsets.put(partition1, offset1); + offsets.put(partition2, offset2); + client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE.code()), coordinator); + + assertEquals(offset1, consumer.committed(partition1).offset()); + assertEquals(offset2, consumer.committed(partition2).offset()); + } + @Test public void testAutoCommitSentBeforePositionUpdate() { String topic = "topic"; @@ -611,6 +661,14 @@ public class KafkaConsumerTest { return new SyncGroupResponse(error, buf).toStruct(); } + private Struct offsetResponse(Map offsets, short error) { + Map partitionData = new HashMap<>(); + for (Map.Entry entry : offsets.entrySet()) { + partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(), "", error)); + } + return new OffsetFetchResponse(partitionData).toStruct(); + } + private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) { MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); for (int i = 0; i < count; i++) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 439ded7f79e..3b4b10e7b0c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -46,6 +46,8 @@ public class SubscriptionStateTest { state.assignFromUser(Arrays.asList(tp0)); assertEquals(Collections.singleton(tp0), state.assignedPartitions()); assertFalse(state.partitionAssignmentNeeded()); + assertFalse(state.hasAllFetchPositions()); + assertTrue(state.refreshCommitsNeeded()); state.committed(tp0, new OffsetAndMetadata(1)); state.seek(tp0, 1); assertTrue(state.isFetchable(tp0));