
KAFKA-7483: Allow streams to pass headers through Serializer. (#5751)

Reviewers: Satish Duggana <sduggana@hortonworks.com>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Authored by Kamal Chandraprakash 6 years ago, committed by Guozhang Wang
commit 9a74569b99
  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java (4 changed lines)
  2. streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java (60 changed lines)
  3. streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java (4 changed lines)
  4. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java (4 changed lines)
  5. streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java (5 changed lines)
  6. streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java (4 changed lines)
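
Every call site below moves from the two-argument serialize(topic, data) to the three-argument serialize(topic, headers, data) overload on org.apache.kafka.common.serialization.Serializer, so a serializer can now read or add record headers. As a minimal sketch (hypothetical, not part of this commit), a headers-aware serializer could look like this:

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical example: stamps a "serialized-by" header on every record it serializes.
// Streams now invokes the three-argument overload, so the header added here ends up
// on the ProducerRecord built by RecordCollectorImpl.
public class TracingStringSerializer implements Serializer<String> {

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) { }

    @Override
    public byte[] serialize(final String topic, final String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serialize(final String topic, final Headers headers, final String data) {
        headers.add("serialized-by", "TracingStringSerializer".getBytes(StandardCharsets.UTF_8));
        return serialize(topic, data);
    }

    @Override
    public void close() { }
}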

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java (4 changed lines)

@@ -153,8 +153,8 @@ public class RecordCollectorImpl implements RecordCollector {
                              final Serializer<K> keySerializer,
                              final Serializer<V> valueSerializer) {
         checkForException();
-        final byte[] keyBytes = keySerializer.serialize(topic, key);
-        final byte[] valBytes = valueSerializer.serialize(topic, value);
+        final byte[] keyBytes = keySerializer.serialize(topic, headers, key);
+        final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
         final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);

streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java (60 changed lines)

@@ -65,17 +65,14 @@ public class RecordCollectorTest {
     );
     private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
-            Collections.<String>emptySet(), Collections.<String>emptySet());
+            Collections.emptySet(), Collections.emptySet());
     private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
     private final StringSerializer stringSerializer = new StringSerializer();
-    private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() {
-        @Override
-        public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
-            return Integer.parseInt(key) % numPartitions;
-        }
+    private final StreamPartitioner<String, Object> streamPartitioner = (topic, key, value, numPartitions) -> {
+        return Integer.parseInt(key) % numPartitions;
     };

     @Test
@@ -362,4 +359,55 @@ public class RecordCollectorTest {
         });
         collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
     }
+
+    @Test
+    public void testRecordHeaderPassThroughSerializer() {
+        final CustomStringSerializer keySerializer = new CustomStringSerializer();
+        final CustomStringSerializer valueSerializer = new CustomStringSerializer();
+        keySerializer.configure(Collections.EMPTY_MAP, true);
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records")
+        );
+        final MockProducer<byte[], byte[]> mockProducer = new MockProducer<>(cluster, true, new DefaultPartitioner(),
+            byteArraySerializer, byteArraySerializer);
+        collector.init(mockProducer);
+
+        collector.send("topic1", "3", "0", new RecordHeaders(), null, keySerializer, valueSerializer, streamPartitioner);
+
+        final List<ProducerRecord<byte[], byte[]>> recordHistory = mockProducer.history();
+        for (final ProducerRecord<byte[], byte[]> sentRecord : recordHistory) {
+            final Headers headers = sentRecord.headers();
+            assertEquals(2, headers.toArray().length);
+            assertEquals(new RecordHeader("key", "key".getBytes()), headers.lastHeader("key"));
+            assertEquals(new RecordHeader("value", "value".getBytes()), headers.lastHeader("value"));
+        }
+    }
+
+    private static class CustomStringSerializer extends StringSerializer {
+
+        private boolean isKey;
+
+        private CustomStringSerializer() {
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+            this.isKey = isKey;
+            super.configure(configs, isKey);
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final Headers headers, final String data) {
+            if (isKey) {
+                headers.add(new RecordHeader("key", "key".getBytes()));
+            } else {
+                headers.add(new RecordHeader("value", "value".getBytes()));
+            }
+            return serialize(topic, data);
+        }
+    }
 }
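
The new test drives headers through RecordCollectorImpl directly. At the topology level the same path is reached when a processor attaches headers to the current record context; a minimal sketch (hypothetical, not from this commit, assuming ProcessorContext#headers() from KIP-244) is:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.streams.processor.AbstractProcessor;

// Hypothetical example: tags each record via the record context's headers and
// forwards it unchanged. A headers-aware serializer at the sink (such as the
// CustomStringSerializer in the test above) will then see the "origin" header.
public class TaggingProcessor extends AbstractProcessor<String, String> {

    @Override
    public void process(final String key, final String value) {
        context().headers().add("origin", "streams".getBytes(StandardCharsets.UTF_8));
        context().forward(key, value);
    }
}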

streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java (4 changed lines)

@@ -206,8 +206,8 @@ public class KeyValueStoreTestDriver<K, V> {
                                      final Serializer<V1> valueSerializer) {
                 // for byte arrays we need to wrap it for comparison
-                final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, key));
-                final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, value));
+                final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, headers, key));
+                final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, headers, value));
                 recordFlushed(keyTest, valueTest);
             }

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java (4 changed lines)

@@ -96,8 +96,8 @@ public class RocksDBWindowStoreTest {
                                  final Serializer<K1> keySerializer,
                                  final Serializer<V1> valueSerializer) {
                 changeLog.add(new KeyValue<>(
-                    keySerializer.serialize(topic, key),
-                    valueSerializer.serialize(topic, value))
+                    keySerializer.serialize(topic, headers, key),
+                    valueSerializer.serialize(topic, headers, value))
                 );
             }
         };

streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java (5 changed lines)

@@ -61,8 +61,9 @@ public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
         recordBuffer.add(
             new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.timestamp(),
                                  record.timestampType(), 0L, 0, 0,
-                                 keySerializer.serialize(record.topic(), record.key()),
-                                 valueSerializer.serialize(record.topic(), record.value())));
+                                 keySerializer.serialize(record.topic(), record.headers(), record.key()),
+                                 valueSerializer.serialize(record.topic(), record.headers(), record.value()),
+                                 record.headers()));

         endOffset = record.offset();
         super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset));

streams/test-utils/src/main/java/org/apache/kafka/streams/test/ConsumerRecordFactory.java (4 changed lines)

@@ -180,8 +180,8 @@ public class ConsumerRecordFactory<K, V> {
                                                  final long timestampMs) {
         Objects.requireNonNull(topicName, "topicName cannot be null.");
         Objects.requireNonNull(headers, "headers cannot be null.");
-        final byte[] serializedKey = keySerializer.serialize(topicName, key);
-        final byte[] serializedValue = valueSerializer.serialize(topicName, value);
+        final byte[] serializedKey = keySerializer.serialize(topicName, headers, key);
+        final byte[] serializedValue = valueSerializer.serialize(topicName, headers, value);
         return new ConsumerRecord<>(
             topicName,
             -1,
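
With the matching test-utils change above, headers attached to generated input records are passed to the serializers as well, mirroring the runtime path. A hypothetical usage fragment (the exact create(...) overload that accepts Headers is assumed here, inferred from the hunk above):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

// Hypothetical fragment: build a test record carrying an "origin" header; the
// key/value serializers configured on the factory now receive these headers too.
final ConsumerRecordFactory<String, String> factory =
    new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
final Headers headers = new RecordHeaders();
headers.add("origin", "test".getBytes(StandardCharsets.UTF_8));
// Assumed overload shape: create(topicName, key, value, headers, timestampMs)
final ConsumerRecord<byte[], byte[]> input =
    factory.create("input-topic", "key", "value", headers, 0L);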
