@ -201,12 +201,12 @@ import java.util.regex.Pattern;
@@ -201,12 +201,12 @@ import java.util.regex.Pattern;
* < p >
* < pre >
* Properties props = new Properties ( ) ;
* props . put ( & quot ; bootstrap . servers & quot ; , & quot ; localhost : 9092 & quot ; ) ;
* props . put ( & quot ; group . id & quot ; , & quot ; test & quot ; ) ;
* props . put ( & quot ; enable . auto . commit & quot ; , & quot ; true & quot ; ) ;
* props . put ( & quot ; auto . commit . interval . ms & quot ; , & quot ; 1000 & quot ; ) ;
* props . put ( & quot ; key . deserializer & quot ; , & quot ; org . apache . kafka . common . serialization . StringDeserializer & quot ; ) ;
* props . put ( & quot ; value . deserializer & quot ; , & quot ; org . apache . kafka . common . serialization . StringDeserializer & quot ; ) ;
* props . setProperty ( & quot ; bootstrap . servers & quot ; , & quot ; localhost : 9092 & quot ; ) ;
* props . setProperty ( & quot ; group . id & quot ; , & quot ; test & quot ; ) ;
* props . setProperty ( & quot ; enable . auto . commit & quot ; , & quot ; true & quot ; ) ;
* props . setProperty ( & quot ; auto . commit . interval . ms & quot ; , & quot ; 1000 & quot ; ) ;
* props . setProperty ( & quot ; key . deserializer & quot ; , & quot ; org . apache . kafka . common . serialization . StringDeserializer & quot ; ) ;
* props . setProperty ( & quot ; value . deserializer & quot ; , & quot ; org . apache . kafka . common . serialization . StringDeserializer & quot ; ) ;
* KafkaConsumer & lt ; String , String & gt ; consumer = new KafkaConsumer & lt ; & gt ; ( props ) ;
* consumer . subscribe ( Arrays . asList ( & quot ; foo & quot ; , & quot ; bar & quot ; ) ) ;
* while ( true ) {
@ -239,17 +239,17 @@ import java.util.regex.Pattern;
@@ -239,17 +239,17 @@ import java.util.regex.Pattern;
* < p >
* < pre >
* Properties props = new Properties ( ) ;
* props . put ( & quot ; bootstrap . servers & quot ; , & quot ; localhost : 9092 & quot ; ) ;
* props . put ( & quot ; group . id & quot ; , & quot ; test & quot ; ) ;
* props . put ( & quot ; enable . auto . commit & quot ; , & quot ; false & quot ; ) ;
* props . put ( & quot ; key . deserializer & quot ; , & quot ; org . apache . kafka . common . serialization . StringDeserializer & quot ; ) ;
* props . put ( & quot ; value . deserializer & quot ; , & quot ; org . apache . kafka . common . serialization . StringDeserializer & quot ; ) ;
* props . setProperty ( & quot ; bootstrap . servers & quot ; , & quot ; localhost : 9092 & quot ; ) ;
* props . setProperty ( & quot ; group . id & quot ; , & quot ; test & quot ; ) ;
* props . setProperty ( & quot ; enable . auto . commit & quot ; , & quot ; false & quot ; ) ;
* props . setProperty ( & quot ; key . deserializer & quot ; , & quot ; org . apache . kafka . common . serialization . StringDeserializer & quot ; ) ;
* props . setProperty ( & quot ; value . deserializer & quot ; , & quot ; org . apache . kafka . common . serialization . StringDeserializer & quot ; ) ;
* KafkaConsumer & lt ; String , String & gt ; consumer = new KafkaConsumer & lt ; & gt ; ( props ) ;
* consumer . subscribe ( Arrays . asList ( & quot ; foo & quot ; , & quot ; bar & quot ; ) ) ;
* final int minBatchSize = 200 ;
* List & lt ; ConsumerRecord & lt ; String , String & gt ; & gt ; buffer = new ArrayList & lt ; & gt ; ( ) ;
* while ( true ) {
* ConsumerRecords & lt ; String , String & gt ; records = consumer . poll ( Duration . ofMillis ( 100 ) ;
* ConsumerRecords & lt ; String , String & gt ; records = consumer . poll ( Duration . ofMillis ( 100 ) ) ;
* for ( ConsumerRecord & lt ; String , String & gt ; record : records ) {
* buffer . add ( record ) ;
* }
@ -470,6 +470,10 @@ import java.util.regex.Pattern;
@@ -470,6 +470,10 @@ import java.util.regex.Pattern;
* private final AtomicBoolean closed = new AtomicBoolean ( false ) ;
* private final KafkaConsumer consumer ;
*
* public KafkaConsumerRunner ( KafkaConsumer consumer ) {
* this . consumer = consumer ;
* }
*
* public void run ( ) {
* try {
* consumer . subscribe ( Arrays . asList ( "topic" ) ) ;