From 37be6d98da842512367ab0b31d8f0244afafda92 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 14 Jan 2016 17:20:08 -0800 Subject: [PATCH] KAFKA-3108: custom StreamParitioner for Windowed key guozhangwang When ```WindowedSerializer``` is specified in ```to(...)``` or ```through(...)``` for a key, we use ```WindowedStreamPartitioner```. Author: Yasuhiro Matsuda Reviewers: Guozhang Wang Closes #779 from ymatsuda/partitioner --- .../kstream/internals/KStreamImpl.java | 10 ++- .../kstream/internals/WindowedSerializer.java | 6 +- .../internals/WindowedStreamPartitioner.java | 52 ++++++++++++ .../WindowedStreamPartitionerTest.java | 84 +++++++++++++++++++ 4 files changed, 150 insertions(+), 2 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index f53c0d0c849..2459f0d5660 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier; import org.apache.kafka.streams.state.Serdes; @@ -210,11 +211,18 @@ public class KStreamImpl extends AbstractStream implements KStream keySerializer, Serializer valSerializer) { String name = topology.newName(SINK_NAME); + StreamPartitioner streamPartitioner = null; + + if (keySerializer != null && 
keySerializer instanceof WindowedSerializer) { + WindowedSerializer windowedSerializer = (WindowedSerializer) keySerializer; + streamPartitioner = (StreamPartitioner) new WindowedStreamPartitioner(windowedSerializer); + } - topology.addSink(name, topic, keySerializer, valSerializer, this.name); + topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java index 4407a5b9bb2..0afcad131d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java @@ -49,9 +49,13 @@ public class WindowedSerializer implements Serializer> { return buf.array(); } - @Override public void close() { inner.close(); } + + public byte[] serializeBaseKey(String topic, Windowed data) { + return inner.serialize(topic, data.value()); + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java new file mode 100644 index 00000000000..10e69cc776f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.StreamPartitioner; + +public class WindowedStreamPartitioner implements StreamPartitioner, V> { + + private final WindowedSerializer serializer; + + public WindowedStreamPartitioner(WindowedSerializer serializer) { + this.serializer = serializer; + } + + /** + * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value + * and the current number of partitions. The partition number is determined by the original key of the windowed key + * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key. 
+ * + * @param windowedKey the key of the message + * @param value the value of the message + * @param numPartitions the total number of partitions + * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used + */ + public Integer partition(Windowed windowedKey, V value, int numPartitions) { + byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey); + + // hash the keyBytes to choose a partition + return toPositive(Utils.murmur2(keyBytes)) % numPartitions; + } + + private static int toPositive(int number) { + return number & 0x7fffffff; + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java new file mode 100644 index 00000000000..1b8cbb8a449 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +public class WindowedStreamPartitionerTest { + + private String topicName = "topic"; + + private IntegerSerializer keySerializer = new IntegerSerializer(); + private StringSerializer valSerializer = new StringSerializer(); + + private List infos = Arrays.asList( + new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0]) + ); + + private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.emptySet()); + + @Test + public void testCopartitioning() { + + Random rand = new Random(); + + DefaultPartitioner defaultPartitioner = new DefaultPartitioner(); + + WindowedSerializer windowedSerializer = new WindowedSerializer<>(keySerializer); + WindowedStreamPartitioner streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer); + + for (int k = 0; k < 10; k++) { + Integer key = rand.nextInt(); + byte[] keyBytes = keySerializer.serialize(topicName, key); + + String value = key.toString(); + byte[] valueBytes = 
valSerializer.serialize(topicName, value); + + Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); + + for (int w = 0; w < 10; w++) { + HoppingWindow window = new HoppingWindow(10 * w, 20 * w); + + Windowed windowedKey = new Windowed<>(key, window); + Integer actual = streamPartitioner.partition(windowedKey, value, infos.size()); + + assertEquals(expected, actual); + } + } + } + +}