diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index abb69134e72..bd652e0a32e 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -151,15 +151,20 @@ public class Consumer extends Thread implements ConsumerRebalanceListener { @Override public void onPartitionsRevoked(Collection partitions) { Utils.printOut("Revoked partitions: %s", partitions); + // this can be used to commit pending offsets when using manual commit and EOS is disabled } @Override public void onPartitionsAssigned(Collection partitions) { Utils.printOut("Assigned partitions: %s", partitions); + // this can be used to read the offsets from an external store or some other initialization } @Override public void onPartitionsLost(Collection partitions) { Utils.printOut("Lost partitions: %s", partitions); + // this is called when partitions are reassigned before we had a chance to revoke them gracefully + // we can't commit pending offsets because these partitions are probably owned by other consumers already + // nevertheless, we may need to do some other cleanup } }