Browse Source

KAFKA-3108: custom StreamParitioner for Windowed key

guozhangwang

When ```WindowedSerializer``` is specified in ```to(...)``` or ```through(...)``` for a key, we use ```WindowedStreamPartitioner```.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #779 from ymatsuda/partitioner
pull/779/merge
Yasuhiro Matsuda 9 years ago committed by Guozhang Wang
parent
commit
37be6d98da
  1. 10
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
  2. 6
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
  3. 52
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
  4. 84
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java

10
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Window; @@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
@ -210,11 +211,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -210,11 +211,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
to(topic, null, null);
}
@SuppressWarnings("unchecked")
@Override
public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
String name = topology.newName(SINK_NAME);
StreamPartitioner<K, V> streamPartitioner = null;
if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
}
topology.addSink(name, topic, keySerializer, valSerializer, this.name);
topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
}
@Override

6
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java

@ -49,9 +49,13 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> { @@ -49,9 +49,13 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
return buf.array();
}
@Override
public void close() {
inner.close();
}
public byte[] serializeBaseKey(String topic, Windowed<T> data) {
return inner.serialize(topic, data.value());
}
}

52
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java

@ -0,0 +1,52 @@ @@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StreamPartitioner;
public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
private final WindowedSerializer<K> serializer;
public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
this.serializer = serializer;
}
/**
* WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
* and the current number of partitions. The partition number id determined by the original key of the windowed key
* using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
*
* @param windowedKey the key of the message
* @param value the value of the message
* @param numPartitions the total number of partitions
* @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
*/
public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
// hash the keyBytes to choose a partition
return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
private static int toPositive(int number) {
return number & 0x7fffffff;
}
}

84
streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java

@ -0,0 +1,84 @@ @@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
public class WindowedStreamPartitionerTest {
private String topicName = "topic";
private IntegerSerializer keySerializer = new IntegerSerializer();
private StringSerializer valSerializer = new StringSerializer();
private List<PartitionInfo> infos = Arrays.asList(
new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
);
private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
@Test
public void testCopartitioning() {
Random rand = new Random();
DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
for (int k = 0; k < 10; k++) {
Integer key = rand.nextInt();
byte[] keyBytes = keySerializer.serialize(topicName, key);
String value = key.toString();
byte[] valueBytes = valSerializer.serialize(topicName, value);
Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
for (int w = 0; w < 10; w++) {
HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
Windowed<Integer> windowedKey = new Windowed<>(key, window);
Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
assertEquals(expected, actual);
}
}
}
}
Loading…
Cancel
Save