
KAFKA-3960; Committed offset not set after first assign

Author: Alexey Romanchuk <al.romanchuk@2gis.ru>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1629 from 13h3r/kafka-3960
Authored by Alexey Romanchuk, committed by Ismael Juma
Commit 932bb84c83
  1. clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java (+1)
  2. clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java (+58)
  3. clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java (+2)
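For context, the behaviour this commit fixes (per the JIRA title and the new testCommitsFetchedDuringAssign below): after assign(), the consumer did not mark committed offsets as needing a (re)fetch, so the committed offset for an assigned partition could remain unset or stale. A minimal sketch of the user-facing call pattern the test exercises, assuming an already configured consumer and a coordinator that answers OffsetFetch requests (setup and mock responses omitted):

    // Sketch of the scenario covered by testCommitsFetchedDuringAssign; not the full test.
    consumer.assign(Arrays.asList(partition1));
    consumer.committed(partition1);   // expected to return the offset committed for partition1

    consumer.assign(Arrays.asList(partition1, partition2));
    consumer.committed(partition2);   // expected to be fetched for the newly assigned partition too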

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java (+1)

@@ -170,6 +170,7 @@ public class SubscriptionState {
        this.assignment.keySet().retainAll(this.userAssignment);
        this.needsPartitionAssignment = false;
        this.needsFetchCommittedOffsets = true;
    }

    /**
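The fix itself is the single added line: assignFromUser, the path taken by KafkaConsumer.assign(), now sets needsFetchCommittedOffsets, so the next time the consumer needs committed offsets it refetches them from the coordinator. A rough sketch of how that flag is consumed, using only SubscriptionState methods that appear in the tests below (refreshCommitsNeeded, assignedPartitions, committed); fetchCommittedOffsets here is a hypothetical stand-in for the actual OffsetFetch round trip, not a real method name:

    // Illustrative only; not the actual coordinator code.
    if (subscriptions.refreshCommitsNeeded()) {
        // hypothetical helper standing in for the OffsetFetch request/response cycle
        Map<TopicPartition, OffsetAndMetadata> fetched = fetchCommittedOffsets(subscriptions.assignedPartitions());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : fetched.entrySet())
            subscriptions.committed(entry.getKey(), entry.getValue());
    }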

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java (+58)

@@ -45,6 +45,7 @@ import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -453,6 +454,55 @@ public class KafkaConsumerTest {
        assertTrue(heartbeatReceived.get());
    }

    @Test
    public void testCommitsFetchedDuringAssign() {
        String topic = "topic";
        final TopicPartition partition1 = new TopicPartition(topic, 0);
        final TopicPartition partition2 = new TopicPartition(topic, 1);

        long offset1 = 10000;
        long offset2 = 20000;

        int sessionTimeoutMs = 3000;
        int heartbeatIntervalMs = 2000;
        int autoCommitIntervalMs = 1000;

        Time time = new MockTime();
        MockClient client = new MockClient(time);
        Cluster cluster = TestUtils.singletonCluster(topic, 1);
        Node node = cluster.nodes().get(0);
        client.setNode(node);
        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
        metadata.update(cluster, time.milliseconds());
        PartitionAssignor assignor = new RoundRobinAssignor();

        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
        consumer.assign(Arrays.asList(partition1));

        // lookup coordinator
        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());

        // fetch offset for one topic
        client.prepareResponseFrom(
                offsetResponse(Collections.singletonMap(partition1, offset1), Errors.NONE.code()),
                coordinator);
        assertEquals(offset1, consumer.committed(partition1).offset());

        consumer.assign(Arrays.asList(partition1, partition2));

        // fetch offset for two topics
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(partition1, offset1);
        offsets.put(partition2, offset2);
        client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE.code()), coordinator);
        assertEquals(offset1, consumer.committed(partition1).offset());
        assertEquals(offset2, consumer.committed(partition2).offset());
    }

    @Test
    public void testAutoCommitSentBeforePositionUpdate() {
        String topic = "topic";
@@ -611,6 +661,14 @@ public class KafkaConsumerTest {
        return new SyncGroupResponse(error, buf).toStruct();
    }

    private Struct offsetResponse(Map<TopicPartition, Long> offsets, short error) {
        Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionData = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
            partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(), "", error));
        }
        return new OffsetFetchResponse(partitionData).toStruct();
    }

    private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) {
        MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
        for (int i = 0; i < count; i++)

clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java (+2)

@@ -46,6 +46,8 @@ public class SubscriptionStateTest {
        state.assignFromUser(Arrays.asList(tp0));
        assertEquals(Collections.singleton(tp0), state.assignedPartitions());
        assertFalse(state.partitionAssignmentNeeded());
        assertFalse(state.hasAllFetchPositions());
        assertTrue(state.refreshCommitsNeeded());
        state.committed(tp0, new OffsetAndMetadata(1));
        state.seek(tp0, 1);
        assertTrue(state.isFetchable(tp0));
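The two added assertions capture the intent of the fix at the SubscriptionState level: immediately after a user assign there are no fetch positions yet and a committed-offset refresh is pending. Before this patch, assignFromUser left needsFetchCommittedOffsets untouched, so at this point the opposite would have held; a sketch of the pre-patch expectation for contrast:

    state.assignFromUser(Arrays.asList(tp0));
    // pre-patch (sketch): the flag was not raised here, so nothing forced a committed-offset fetch
    // assertFalse(state.refreshCommitsNeeded());
    // post-patch, as asserted above:
    // assertTrue(state.refreshCommitsNeeded());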
