|
|
|
@ -131,8 +131,8 @@ import static org.apache.kafka.common.utils.Utils.min;
@@ -131,8 +131,8 @@ import static org.apache.kafka.common.utils.Utils.min;
|
|
|
|
|
* props.put("enable.auto.commit", "true"); |
|
|
|
|
* props.put("auto.commit.interval.ms", "1000"); |
|
|
|
|
* props.put("session.timeout.ms", "30000"); |
|
|
|
|
* props.put("key.serializer", "org.apache.kafka.common.serializers.StringSerializer"); |
|
|
|
|
* props.put("value.serializer", "org.apache.kafka.common.serializers.StringSerializer"); |
|
|
|
|
* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
|
|
|
|
* props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
|
|
|
|
* KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); |
|
|
|
|
* consumer.subscribe("foo", "bar"); |
|
|
|
|
* while (true) { |
|
|
|
@ -159,8 +159,8 @@ import static org.apache.kafka.common.utils.Utils.min;
@@ -159,8 +159,8 @@ import static org.apache.kafka.common.utils.Utils.min;
|
|
|
|
|
* to it. If it stops heartbeating for a period of time longer than <code>session.timeout.ms</code> then it will be |
|
|
|
|
* considered dead and it's partitions will be assigned to another process. |
|
|
|
|
* <p> |
|
|
|
|
* The serializers settings specify how to turn the objects the user provides into bytes. By specifying the string |
|
|
|
|
* serializers we are saying that our record's key and value will just be simple strings. |
|
|
|
|
* The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we |
|
|
|
|
* are saying that our record's key and value will just be simple strings. |
|
|
|
|
* |
|
|
|
|
* <h4>Controlling When Messages Are Considered Consumed</h4> |
|
|
|
|
* |
|
|
|
@ -183,8 +183,8 @@ import static org.apache.kafka.common.utils.Utils.min;
@@ -183,8 +183,8 @@ import static org.apache.kafka.common.utils.Utils.min;
|
|
|
|
|
* props.put("enable.auto.commit", "false"); |
|
|
|
|
* props.put("auto.commit.interval.ms", "1000"); |
|
|
|
|
* props.put("session.timeout.ms", "30000"); |
|
|
|
|
* props.put("key.serializer", "org.apache.kafka.common.serializers.StringSerializer"); |
|
|
|
|
* props.put("value.serializer", "org.apache.kafka.common.serializers.StringSerializer"); |
|
|
|
|
* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
|
|
|
|
* props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); |
|
|
|
|
* KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); |
|
|
|
|
* consumer.subscribe("foo", "bar"); |
|
|
|
|
* int commitInterval = 200; |
|
|
|
|