diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 79e33aa72c9..b1d407ee4dc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -201,12 +201,12 @@ import java.util.regex.Pattern; *
*
* Properties props = new Properties(); - * props.put("bootstrap.servers", "localhost:9092"); - * props.put("group.id", "test"); - * props.put("enable.auto.commit", "true"); - * props.put("auto.commit.interval.ms", "1000"); - * props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - * props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * props.setProperty("bootstrap.servers", "localhost:9092"); + * props.setProperty("group.id", "test"); + * props.setProperty("enable.auto.commit", "true"); + * props.setProperty("auto.commit.interval.ms", "1000"); + * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); * KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); * consumer.subscribe(Arrays.asList("foo", "bar")); * while (true) { @@ -239,17 +239,17 @@ import java.util.regex.Pattern; **
* Properties props = new Properties(); - * props.put("bootstrap.servers", "localhost:9092"); - * props.put("group.id", "test"); - * props.put("enable.auto.commit", "false"); - * props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - * props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * props.setProperty("bootstrap.servers", "localhost:9092"); + * props.setProperty("group.id", "test"); + * props.setProperty("enable.auto.commit", "false"); + * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); * KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); * consumer.subscribe(Arrays.asList("foo", "bar")); * final int minBatchSize = 200; * List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); * while (true) { - * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100); + * ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); * for (ConsumerRecord<String, String> record : records) { * buffer.add(record); * } @@ -470,6 +470,10 @@ import java.util.regex.Pattern; * private final AtomicBoolean closed = new AtomicBoolean(false); * private final KafkaConsumer consumer; * + * public KafkaConsumerRunner(KafkaConsumer consumer) { + * this.consumer = consumer; + * } + * * public void run() { * try { * consumer.subscribe(Arrays.asList("topic"));