From 70afd5f9dd2eddc784a24fa2518992ef3371f0a4 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 19 Sep 2016 10:30:58 -0700 Subject: [PATCH] KAFKA-4175: Can't have StandbyTasks in KafkaStreams where NUM_STREAM_THREADS_CONFIG > 1 standby tasks should be assigned per consumer not per process Author: Damian Guy Reviewers: Eno Thereska, Guozhang Wang Closes #1862 from dguy/kafka-4175 --- .../streams/processor/internals/StreamPartitionAssignor.java | 3 ++- .../processor/internals/StreamPartitionAssignorTest.java | 5 ++++- .../org/apache/kafka/streams/smoketest/SmokeTestClient.java | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index b6cebf4f66f..3be9c114ba7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -418,10 +418,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } final int numConsumers = consumers.size(); - Map> standby = new HashMap<>(); + int i = 0; for (String consumer : consumers) { + Map> standby = new HashMap<>(); ArrayList assignedPartitions = new ArrayList<>(); final int numTaskIds = taskIds.size(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 4f4d2ebbc80..e46a016447d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -52,6 +52,7 @@ import java.util.Set; import java.util.UUID; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; public class StreamPartitionAssignorTest { @@ -385,6 +386,8 @@ public class StreamPartitionAssignorTest { allActiveTasks.addAll(info11.activeTasks); allStandbyTasks.addAll(info11.standbyTasks.keySet()); + assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet()); + // check active tasks assigned to the first client assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks)); @@ -650,7 +653,7 @@ public class StreamPartitionAssignorTest { // pass } } - + @Test public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception { final Properties properties = configProps(); diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java index 53029008ff9..ba71e055550 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java @@ -85,7 +85,7 @@ public class SmokeTestClient extends SmokeTestUtil { props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class.getName()); - props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);