@ -16,7 +16,6 @@
* /
* /
package kafka.examples ;
package kafka.examples ;
import kafka.utils.ShutdownableThread ;
import org.apache.kafka.clients.consumer.ConsumerConfig ;
import org.apache.kafka.clients.consumer.ConsumerConfig ;
import org.apache.kafka.clients.consumer.ConsumerRecord ;
import org.apache.kafka.clients.consumer.ConsumerRecord ;
import org.apache.kafka.clients.consumer.ConsumerRecords ;
import org.apache.kafka.clients.consumer.ConsumerRecords ;
@ -28,7 +27,7 @@ import java.util.Optional;
import java.util.Properties ;
import java.util.Properties ;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.CountDownLatch ;
public class Consumer extends Shutdownable Thread {
public class Consumer extends Thread {
private final KafkaConsumer < Integer , String > consumer ;
private final KafkaConsumer < Integer , String > consumer ;
private final String topic ;
private final String topic ;
private final String groupId ;
private final String groupId ;
@ -42,7 +41,7 @@ public class Consumer extends ShutdownableThread {
final boolean readCommitted ,
final boolean readCommitted ,
final int numMessageToConsume ,
final int numMessageToConsume ,
final CountDownLatch latch ) {
final CountDownLatch latch ) {
super ( "KafkaConsumerExample" , false ) ;
super ( "KafkaConsumerExample" ) ;
this . groupId = groupId ;
this . groupId = groupId ;
Properties props = new Properties ( ) ;
Properties props = new Properties ( ) ;
props . put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , KafkaProperties . KAFKA_SERVER_URL + ":" + KafkaProperties . KAFKA_SERVER_PORT ) ;
props . put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , KafkaProperties . KAFKA_SERVER_URL + ":" + KafkaProperties . KAFKA_SERVER_PORT ) ;
@ -70,6 +69,18 @@ public class Consumer extends ShutdownableThread {
}
}
@Override
@Override
public void run ( ) {
try {
do {
doWork ( ) ;
} while ( messageRemaining > 0 ) ;
System . out . println ( groupId + " finished reading " + numMessageToConsume + " messages" ) ;
} catch ( Exception e ) {
System . out . println ( "Unexpected termination, exception thrown:" + e ) ;
} finally {
shutdown ( ) ;
}
}
public void doWork ( ) {
public void doWork ( ) {
consumer . subscribe ( Collections . singletonList ( this . topic ) ) ;
consumer . subscribe ( Collections . singletonList ( this . topic ) ) ;
ConsumerRecords < Integer , String > records = consumer . poll ( Duration . ofSeconds ( 1 ) ) ;
ConsumerRecords < Integer , String > records = consumer . poll ( Duration . ofSeconds ( 1 ) ) ;
@ -77,19 +88,9 @@ public class Consumer extends ShutdownableThread {
System . out . println ( groupId + " received message : from partition " + record . partition ( ) + ", (" + record . key ( ) + ", " + record . value ( ) + ") at offset " + record . offset ( ) ) ;
System . out . println ( groupId + " received message : from partition " + record . partition ( ) + ", (" + record . key ( ) + ", " + record . value ( ) + ") at offset " + record . offset ( ) ) ;
}
}
messageRemaining - = records . count ( ) ;
messageRemaining - = records . count ( ) ;
if ( messageRemaining < = 0 ) {
System . out . println ( groupId + " finished reading " + numMessageToConsume + " messages" ) ;
latch . countDown ( ) ;
}
}
}
@Override
public void shutdown ( ) {
public String name ( ) {
latch . countDown ( ) ;
return null ;
}
@Override
public boolean isInterruptible ( ) {
return false ;
}
}
}
}