@ -17,17 +17,26 @@
@@ -17,17 +17,26 @@
package kafka.examples ;
import org.apache.kafka.clients.consumer.ConsumerConfig ;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener ;
import org.apache.kafka.clients.consumer.ConsumerRecord ;
import org.apache.kafka.clients.consumer.ConsumerRecords ;
import org.apache.kafka.clients.consumer.KafkaConsumer ;
import org.apache.kafka.common.TopicPartition ;
import org.apache.kafka.common.errors.WakeupException ;
import java.time.Duration ;
import java.util.Collection ;
import java.util.Collections ;
import java.util.Optional ;
import java.util.Properties ;
import java.util.concurrent.CountDownLatch ;
public class Consumer extends Thread {
/ * *
* A simple consumer thread that demonstrate subscribe and poll use case . The thread subscribes to a topic ,
* then runs a loop to poll new messages , and print the message out . The thread closes until the target { @code
* numMessageToConsume } is hit or catching an exception .
* /
public class Consumer extends Thread implements ConsumerRebalanceListener {
private final KafkaConsumer < Integer , String > consumer ;
private final String topic ;
private final String groupId ;
@ -48,8 +57,6 @@ public class Consumer extends Thread {
@@ -48,8 +57,6 @@ public class Consumer extends Thread {
props . put ( ConsumerConfig . GROUP_ID_CONFIG , groupId ) ;
instanceId . ifPresent ( id - > props . put ( ConsumerConfig . GROUP_INSTANCE_ID_CONFIG , id ) ) ;
props . put ( ConsumerConfig . ENABLE_AUTO_COMMIT_CONFIG , "true" ) ;
props . put ( ConsumerConfig . AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000" ) ;
props . put ( ConsumerConfig . SESSION_TIMEOUT_MS_CONFIG , "30000" ) ;
props . put ( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.IntegerDeserializer" ) ;
props . put ( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringDeserializer" ) ;
if ( readCommitted ) {
@ -71,10 +78,14 @@ public class Consumer extends Thread {
@@ -71,10 +78,14 @@ public class Consumer extends Thread {
@Override
public void run ( ) {
try {
System . out . println ( "Subscribe to:" + this . topic ) ;
consumer . subscribe ( Collections . singletonList ( this . topic ) , this ) ;
do {
doWork ( ) ;
} while ( messageRemaining > 0 ) ;
System . out . println ( groupId + " finished reading " + numMessageToConsume + " messages" ) ;
} catch ( WakeupException e ) {
// swallow the wakeup
} catch ( Exception e ) {
System . out . println ( "Unexpected termination, exception thrown:" + e ) ;
} finally {
@ -82,7 +93,6 @@ public class Consumer extends Thread {
@@ -82,7 +93,6 @@ public class Consumer extends Thread {
}
}
public void doWork ( ) {
consumer . subscribe ( Collections . singletonList ( this . topic ) ) ;
ConsumerRecords < Integer , String > records = consumer . poll ( Duration . ofSeconds ( 1 ) ) ;
for ( ConsumerRecord < Integer , String > record : records ) {
System . out . println ( groupId + " received message : from partition " + record . partition ( ) + ", (" + record . key ( ) + ", " + record . value ( ) + ") at offset " + record . offset ( ) ) ;
@ -91,6 +101,17 @@ public class Consumer extends Thread {
@@ -91,6 +101,17 @@ public class Consumer extends Thread {
}
public void shutdown ( ) {
this . consumer . close ( ) ;
latch . countDown ( ) ;
}
@Override
public void onPartitionsRevoked ( Collection < TopicPartition > partitions ) {
System . out . println ( "Revoking partitions:" + partitions ) ;
}
@Override
public void onPartitionsAssigned ( Collection < TopicPartition > partitions ) {
System . out . println ( "Assigning partitions:" + partitions ) ;
}
}