@ -20,6 +20,9 @@ import org.apache.kafka.clients.producer.Callback;
@@ -20,6 +20,9 @@ import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer ;
import org.apache.kafka.clients.producer.ProducerRecord ;
import org.apache.kafka.clients.producer.RecordMetadata ;
import org.apache.kafka.clients.producer.ProducerConfig ;
import org.apache.kafka.common.serialization.IntegerSerializer ;
import org.apache.kafka.common.serialization.StringSerializer ;
import java.util.Properties ;
import java.util.concurrent.ExecutionException ;
@ -31,10 +34,10 @@ public class Producer extends Thread {
@@ -31,10 +34,10 @@ public class Producer extends Thread {
public Producer ( String topic , Boolean isAsync ) {
Properties props = new Properties ( ) ;
props . put ( "bootstrap.servers" , KafkaProperties . KAFKA_SERVER_URL + ":" + KafkaProperties . KAFKA_SERVER_PORT ) ;
props . put ( "client.id" , "DemoProducer" ) ;
props . put ( "key.serializer" , "org.apache.kafka.common.serialization.IntegerSerializer" ) ;
props . put ( "value.serializer" , "org.apache.kafka.common.serialization.StringSerializer" ) ;
props . put ( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , KafkaProperties . KAFKA_SERVER_URL + ":" + KafkaProperties . KAFKA_SERVER_PORT ) ;
props . put ( ProducerConfig . CLIENT_ID_CONFIG , "DemoProducer" ) ;
props . put ( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , IntegerSerializer . class . getName ( ) ) ;
props . put ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer . class . getName ( ) ) ;
producer = new KafkaProducer < > ( props ) ;
this . topic = topic ;
this . isAsync = isAsync ;