Browse Source

KAFKA-2409; have KafkaConsumer.committed return null when there is no commit

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Onur Karaman, Guozhang Wang

Closes #243 from hachikuji/KAFKA-2409
pull/243/merge
Jason Gustafson 9 years ago committed by Guozhang Wang
parent
commit
7e453df317
  1. 11
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  2. 5
      core/src/test/scala/integration/kafka/api/ConsumerTest.scala
  3. 6
      core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala

11
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@ -775,7 +775,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -775,7 +775,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic
* offset reset policy has been configured.
* @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
*/
@Override
@ -814,7 +814,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -814,7 +814,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* heart-beating, auto-commits, and offset updates.
* @param timeout The maximum time to block in the underlying poll
* @return The fetched records (may be empty)
* @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
@ -1039,9 +1039,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -1039,9 +1039,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* consumer hasn't yet initialized its cache of committed offsets.
*
* @param partition The partition to check
* @return The last committed offset
* @throws NoOffsetForPartitionException If no offset has ever been committed by any process for the given
* partition.
* @return The last committed offset and metadata or null if there was no prior commit
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
@ -1059,9 +1057,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -1059,9 +1057,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
committed = offsets.get(partition);
}
if (committed == null)
throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
return committed;
} finally {
release();

5
core/src/test/scala/integration/kafka/api/ConsumerTest.scala

@ -188,9 +188,8 @@ class ConsumerTest extends IntegrationTestHarness with Logging { @@ -188,9 +188,8 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
val pos2 = this.consumers(0).position(tp2)
this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
assertEquals(3, this.consumers(0).committed(tp).offset)
intercept[NoOffsetForPartitionException] {
this.consumers(0).committed(tp2)
}
assertNull(this.consumers(0).committed(tp2))
// positions should not change
assertEquals(pos1, this.consumers(0).position(tp))
assertEquals(pos2, this.consumers(0).position(tp2))

6
core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala

@ -169,10 +169,8 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging { @@ -169,10 +169,8 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
def testPositionAndCommit() {
sendRecords(5)
// committed() on a partition with no committed offset throws an exception
intercept[NoOffsetForPartitionException] {
this.consumers(0).committed(new TopicPartition(topic, 15))
}
// committed() on a partition with no committed offset returns null
assertNull(this.consumers(0).committed(new TopicPartition(topic, 15)))
// position() on a partition that we aren't subscribed to throws an exception
intercept[IllegalArgumentException] {

Loading…
Cancel
Save