diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 7e192973425..5df14ee2815 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -153,8 +153,8 @@ public class RecordCollectorImpl implements RecordCollector { final Serializer keySerializer, final Serializer valueSerializer) { checkForException(); - final byte[] keyBytes = keySerializer.serialize(topic, key); - final byte[] valBytes = valueSerializer.serialize(topic, value); + final byte[] keyBytes = keySerializer.serialize(topic, headers, key); + final byte[] valBytes = valueSerializer.serialize(topic, headers, value); final ProducerRecord serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 4f89a1e756f..c4e58be7c1f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -65,17 +65,14 @@ public class RecordCollectorTest { ); private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos, - Collections.emptySet(), Collections.emptySet()); + Collections.emptySet(), Collections.emptySet()); private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); private final StringSerializer stringSerializer = new StringSerializer(); - private final StreamPartitioner streamPartitioner = new StreamPartitioner() { - @Override - public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { - return Integer.parseInt(key) % numPartitions; - } + private final StreamPartitioner streamPartitioner = (topic, key, value, numPartitions) -> { + return Integer.parseInt(key) % numPartitions; }; @Test @@ -362,4 +359,55 @@ public class RecordCollectorTest { }); collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); } + + @Test + public void testRecordHeaderPassThroughSerializer() { + final CustomStringSerializer keySerializer = new CustomStringSerializer(); + final CustomStringSerializer valueSerializer = new CustomStringSerializer(); + keySerializer.configure(Collections.EMPTY_MAP, true); + + final RecordCollectorImpl collector = new RecordCollectorImpl( + "test", + logContext, + new DefaultProductionExceptionHandler(), + new Metrics().sensor("skipped-records") + ); + final MockProducer mockProducer = new MockProducer<>(cluster, true, new DefaultPartitioner(), + byteArraySerializer, byteArraySerializer); + collector.init(mockProducer); + + collector.send("topic1", "3", "0", new RecordHeaders(), null, keySerializer, valueSerializer, streamPartitioner); + + final List> recordHistory = mockProducer.history(); + for (final ProducerRecord sentRecord : recordHistory) { + final Headers headers = sentRecord.headers(); + assertEquals(2, headers.toArray().length); + assertEquals(new RecordHeader("key", "key".getBytes()), headers.lastHeader("key")); + assertEquals(new RecordHeader("value", "value".getBytes()), headers.lastHeader("value")); + } + } + + private static class CustomStringSerializer extends StringSerializer { + + private boolean isKey; + + private CustomStringSerializer() { + } + + @Override + public void configure(final Map configs, final boolean isKey) { + this.isKey = isKey; + super.configure(configs, isKey); + } + + @Override + public byte[] serialize(final String topic, final Headers headers, final String data) { + if (isKey) { + headers.add(new RecordHeader("key", "key".getBytes())); + } else { + headers.add(new RecordHeader("value", "value".getBytes())); + } + return serialize(topic, data); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 699963395e9..096f792d78d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -206,8 +206,8 @@ public class KeyValueStoreTestDriver { final Serializer valueSerializer) { // for byte arrays we need to wrap it for comparison - final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, key)); - final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, value)); + final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, headers, key)); + final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, headers, value)); recordFlushed(keyTest, valueTest); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 08f019feffa..cd0f49a7372 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -96,8 +96,8 @@ public class RocksDBWindowStoreTest { final Serializer keySerializer, final Serializer valueSerializer) { changeLog.add(new KeyValue<>( - keySerializer.serialize(topic, key), - valueSerializer.serialize(topic, value)) + keySerializer.serialize(topic, headers, key), + valueSerializer.serialize(topic, headers, value)) ); } }; diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java index bc918ea1b2b..244b35f1465 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java +++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java @@ -61,8 +61,9 @@ public class MockRestoreConsumer extends MockConsumer { recordBuffer.add( new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.timestamp(), record.timestampType(), 0L, 0, 0, - keySerializer.serialize(record.topic(), record.key()), - valueSerializer.serialize(record.topic(), record.value()))); + keySerializer.serialize(record.topic(), record.headers(), record.key()), + valueSerializer.serialize(record.topic(), record.headers(), record.value()), + record.headers())); endOffset = record.offset(); super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset)); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java index 108dafdfdba..87ec7c1fcc3 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java @@ -180,8 +180,8 @@ public class ConsumerRecordFactory { final long timestampMs) { Objects.requireNonNull(topicName, "topicName cannot be null."); Objects.requireNonNull(headers, "headers cannot be null."); - final byte[] serializedKey = keySerializer.serialize(topicName, key); - final byte[] serializedValue = valueSerializer.serialize(topicName, value); + final byte[] serializedKey = keySerializer.serialize(topicName, headers, key); + final byte[] serializedValue = valueSerializer.serialize(topicName, headers, value); return new ConsumerRecord<>( topicName, -1,