From 359be3a682951fd469d690df8d9e7a5a89a9d03b Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 9 Nov 2015 11:17:18 -0800 Subject: [PATCH] KAFKA-2674: clarify onPartitionsRevoked behavior Author: Jason Gustafson Reviewers: Guozhang Wang Closes #467 from hachikuji/KAFKA-2674 --- .../consumer/ConsumerRebalanceListener.java | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java index 671b6f252ce..8af405cedd3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java @@ -57,22 +57,37 @@ import org.apache.kafka.common.TopicPartition; * this.consumer = consumer; * } * - * public void onPartitionsAssigned(Collection partitions) { - * // read the offsets from an external store using some custom code not described here - * for(TopicPartition partition: partitions) - * consumer.seek(partition, readOffsetFromExternalStore(partition)); - * } * public void onPartitionsRevoked(Collection partitions) { * // save the offsets in an external store using some custom code not described here * for(TopicPartition partition: partitions) * saveOffsetInExternalStore(consumer.position(partition)); * } + * + * public void onPartitionsAssigned(Collection partitions) { + * // read the offsets from an external store using some custom code not described here + * for(TopicPartition partition: partitions) + * consumer.seek(partition, readOffsetFromExternalStore(partition)); + * } * } * } * */ public interface ConsumerRebalanceListener { + /** + * A callback method the user can implement to provide handling of offset commits to a customized store on the start + * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer + * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a + * custom offset store to prevent duplicate data. + *

+ * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} + *

+ * NOTE: This method is only called before rebalances. It is not called prior to {@link KafkaConsumer#close()}. + * + * @param partitions The list of partitions that were assigned to the consumer on the last rebalance + */ + public void onPartitionsRevoked(Collection partitions); + /** * A callback method the user can implement to provide handling of customized offsets on completion of a successful * partition re-assignment. This method will be called after an offset re-assignment completes and before the @@ -86,16 +101,4 @@ public interface ConsumerRebalanceListener { * assigned to the consumer) */ public void onPartitionsAssigned(Collection partitions); - - /** - * A callback method the user can implement to provide handling of offset commits to a customized store on the start - * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer - * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a - * custom offset store to prevent duplicate data - *

- * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} - * - * @param partitions The list of partitions that were assigned to the consumer on the last rebalance - */ - public void onPartitionsRevoked(Collection partitions); }