|
|
@ -133,8 +133,8 @@ public class KafkaProducerTest { |
|
|
|
Properties props = new Properties(); |
|
|
|
Properties props = new Properties(); |
|
|
|
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); |
|
|
|
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); |
|
|
|
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); |
|
|
|
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); |
|
|
|
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName()); |
|
|
|
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); |
|
|
|
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName()); |
|
|
|
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); |
|
|
|
|
|
|
|
|
|
|
|
ProducerConfig config = new ProducerConfig(props); |
|
|
|
ProducerConfig config = new ProducerConfig(props); |
|
|
|
assertTrue(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)); |
|
|
|
assertTrue(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)); |
|
|
@ -457,7 +457,7 @@ public class KafkaProducerTest { |
|
|
|
|
|
|
|
|
|
|
|
// Four request updates where the topic isn't present, at which point the timeout expires and a
|
|
|
|
// Four request updates where the topic isn't present, at which point the timeout expires and a
|
|
|
|
// TimeoutException is thrown
|
|
|
|
// TimeoutException is thrown
|
|
|
|
Future future = producer.send(record); |
|
|
|
Future<RecordMetadata> future = producer.send(record); |
|
|
|
verify(metadata, times(4)).requestUpdateForTopic(topic); |
|
|
|
verify(metadata, times(4)).requestUpdateForTopic(topic); |
|
|
|
verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong()); |
|
|
|
verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong()); |
|
|
|
verify(metadata, times(5)).fetch(); |
|
|
|
verify(metadata, times(5)).fetch(); |
|
|
@ -533,7 +533,7 @@ public class KafkaProducerTest { |
|
|
|
|
|
|
|
|
|
|
|
// Four request updates where the requested partition is out of range, at which point the timeout expires
|
|
|
|
// Four request updates where the requested partition is out of range, at which point the timeout expires
|
|
|
|
// and a TimeoutException is thrown
|
|
|
|
// and a TimeoutException is thrown
|
|
|
|
Future future = producer.send(record); |
|
|
|
Future<RecordMetadata> future = producer.send(record); |
|
|
|
verify(metadata, times(4)).requestUpdateForTopic(topic); |
|
|
|
verify(metadata, times(4)).requestUpdateForTopic(topic); |
|
|
|
verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong()); |
|
|
|
verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong()); |
|
|
|
verify(metadata, times(5)).fetch(); |
|
|
|
verify(metadata, times(5)).fetch(); |
|
|
@ -644,9 +644,7 @@ public class KafkaProducerTest { |
|
|
|
private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) { |
|
|
|
private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) { |
|
|
|
Map<String, Object> configs = new HashMap<>(); |
|
|
|
Map<String, Object> configs = new HashMap<>(); |
|
|
|
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); |
|
|
|
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); |
|
|
|
@SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
|
|
|
|
|
|
|
|
Serializer<String> keySerializer = mock(serializerClassToMock); |
|
|
|
Serializer<String> keySerializer = mock(serializerClassToMock); |
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
|
|
|
Serializer<String> valueSerializer = mock(serializerClassToMock); |
|
|
|
Serializer<String> valueSerializer = mock(serializerClassToMock); |
|
|
|
|
|
|
|
|
|
|
|
long nowMs = Time.SYSTEM.milliseconds(); |
|
|
|
long nowMs = Time.SYSTEM.milliseconds(); |
|
|
@ -689,7 +687,7 @@ public class KafkaProducerTest { |
|
|
|
public void closeShouldBeIdempotent() { |
|
|
|
public void closeShouldBeIdempotent() { |
|
|
|
Properties producerProps = new Properties(); |
|
|
|
Properties producerProps = new Properties(); |
|
|
|
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); |
|
|
|
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); |
|
|
|
Producer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); |
|
|
|
Producer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); |
|
|
|
producer.close(); |
|
|
|
producer.close(); |
|
|
|
producer.close(); |
|
|
|
producer.close(); |
|
|
|
} |
|
|
|
} |
|
|
@ -698,12 +696,12 @@ public class KafkaProducerTest { |
|
|
|
public void testMetricConfigRecordingLevel() { |
|
|
|
public void testMetricConfigRecordingLevel() { |
|
|
|
Properties props = new Properties(); |
|
|
|
Properties props = new Properties(); |
|
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); |
|
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); |
|
|
|
try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { |
|
|
|
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { |
|
|
|
assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel()); |
|
|
|
assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
props.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); |
|
|
|
props.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); |
|
|
|
try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { |
|
|
|
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { |
|
|
|
assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel()); |
|
|
|
assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|