Browse Source

KAFKA-4175: Can't have StandbyTasks in KafkaStreams where NUM_STREAM_THREADS_CONFIG > 1

standby tasks should be assigned per consumer not per process

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #1862 from dguy/kafka-4175
pull/1862/merge
Damian Guy 8 years ago committed by Guozhang Wang
parent
commit
70afd5f9dd
  1. 3
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
  2. 5
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
  3. 2
      streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java

3
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java

@ -418,10 +418,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable @@ -418,10 +418,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
final int numConsumers = consumers.size();
Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
int i = 0;
for (String consumer : consumers) {
Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
final int numTaskIds = taskIds.size();

5
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java

@ -52,6 +52,7 @@ import java.util.Set; @@ -52,6 +52,7 @@ import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
public class StreamPartitionAssignorTest {
@ -385,6 +386,8 @@ public class StreamPartitionAssignorTest { @@ -385,6 +386,8 @@ public class StreamPartitionAssignorTest {
allActiveTasks.addAll(info11.activeTasks);
allStandbyTasks.addAll(info11.standbyTasks.keySet());
assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet());
// check active tasks assigned to the first client
assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
@ -650,7 +653,7 @@ public class StreamPartitionAssignorTest { @@ -650,7 +653,7 @@ public class StreamPartitionAssignorTest {
// pass
}
}
@Test
public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
final Properties properties = configProps();

2
streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java

@ -85,7 +85,7 @@ public class SmokeTestClient extends SmokeTestUtil { @@ -85,7 +85,7 @@ public class SmokeTestClient extends SmokeTestUtil {
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class.getName());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);

Loading…
Cancel
Save