From a6fad27372f9a931540c2e4345e428b643535d46 Mon Sep 17 00:00:00 2001 From: Kamal C Date: Fri, 16 Mar 2018 01:32:28 +0530 Subject: [PATCH] KAFKA-6106: Postpone normal processing of tasks within a thread until restoration of all tasks have completed. (#4651) Author: Kamal Chandraprakash Reviewer: Matthias J. Sax , Bill Bejeck --- .../processor/internals/AssignedTasks.java | 35 +++----------- .../processor/internals/StreamThread.java | 4 +- .../processor/internals/TaskManager.java | 17 ++++--- .../internals/AssignedStreamsTasksTest.java | 45 ++--------------- .../processor/internals/StreamThreadTest.java | 42 +++++++++++++--- .../processor/internals/TaskManagerTest.java | 48 +++++++------------ 6 files changed, 74 insertions(+), 117 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index c806bfde47e..92045713146 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -73,27 +73,12 @@ abstract class AssignedTasks { created.put(task.id(), task); } - Set uninitializedPartitions() { - if (created.isEmpty()) { - return Collections.emptySet(); - } - final Set partitions = new HashSet<>(); - for (final Map.Entry entry : created.entrySet()) { - if (entry.getValue().hasStateStores()) { - partitions.addAll(entry.getValue().partitions()); - } - } - return partitions; - } - /** - * @return partitions that are ready to be resumed * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition * @throws TaskMigratedException if the task producer got fenced (EOS only) */ - Set initializeNewTasks() { - final Set readyPartitions = new HashSet<>(); + void initializeNewTasks() { if (!created.isEmpty()) { log.debug("Initializing {}s {}", taskTypeName, created.keySet()); } @@ -104,7 +89,7 @@ abstract class AssignedTasks { log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey()); addToRestoring(entry.getValue()); } else { - transitionToRunning(entry.getValue(), readyPartitions); + transitionToRunning(entry.getValue()); } it.remove(); } catch (final LockException e) { @@ -112,21 +97,19 @@ abstract class AssignedTasks { log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage()); } } - return readyPartitions; } - Set updateRestored(final Collection restored) { + void updateRestored(final Collection restored) { if (restored.isEmpty()) { - return Collections.emptySet(); + return; } log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored); - final Set resume = new HashSet<>(); restoredPartitions.addAll(restored); for (final Iterator> it = restoring.entrySet().iterator(); it.hasNext(); ) { final Map.Entry entry = it.next(); final T task = entry.getValue(); if (restoredPartitions.containsAll(task.changelogPartitions())) { - transitionToRunning(task, resume); + transitionToRunning(task); it.remove(); log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state", taskTypeName, @@ -146,7 +129,6 @@ abstract class AssignedTasks { if (allTasksRunning()) { restoredPartitions.clear(); } - return resume; } boolean allTasksRunning() { @@ -243,7 +225,7 @@ abstract class AssignedTasks { suspended.remove(taskId); task.resume(); try { - transitionToRunning(task, new HashSet()); + transitionToRunning(task); } catch (final TaskMigratedException e) { // we need to catch migration exception internally since this function // is triggered in the rebalance callback @@ -278,15 +260,12 @@ abstract class AssignedTasks { /** * @throws TaskMigratedException if the task producer got fenced (EOS only) */ - private void transitionToRunning(final T task, final Set readyPartitions) { + private void transitionToRunning(final T task) { log.debug("transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); task.initializeTopology(); for (TopicPartition topicPartition : task.partitions()) { runningByPartition.put(topicPartition, task); - if (task.hasStateStores()) { - readyPartitions.add(topicPartition); - } } for (TopicPartition topicPartition : task.changelogPartitions()) { runningByPartition.put(topicPartition, task); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 9bbc0da26a5..02a4bb92137 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -899,7 +899,7 @@ public class StreamThread extends Thread { final StreamTask task = taskManager.activeTask(partition); if (task.isClosed()) { - log.warn("Stream task {} is already closed, probably because it got unexpectly migrated to another thread already. " + + log.warn("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " + "Notifying the thread to trigger a new rebalance immediately.", task.id()); throw new TaskMigratedException(task); } @@ -1065,7 +1065,7 @@ public class StreamThread extends Thread { } if (task.isClosed()) { - log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " + + log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " + "Notifying the thread to trigger a new rebalance immediately.", task.id()); throw new TaskMigratedException(task); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 80df5179a37..6308ca7fd80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -106,9 +106,9 @@ class TaskManager { active.closeNonAssignedSuspendedTasks(assignedActiveTasks); addStreamTasks(assignment); addStandbyTasks(); - final Set partitions = active.uninitializedPartitions(); - log.trace("Pausing partitions: {}", partitions); - consumer.pause(partitions); + // Pause all the partitions until the underlying state store is ready for all the active tasks. + log.trace("Pausing partitions: {}", assignment); + consumer.pause(assignment); } private void addStreamTasks(final Collection assignment) { @@ -312,18 +312,17 @@ class TaskManager { * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only) */ boolean updateNewAndRestoringTasks() { - final Set resumed = active.initializeNewTasks(); + active.initializeNewTasks(); standby.initializeNewTasks(); final Collection restored = changelogReader.restore(active); - resumed.addAll(active.updateRestored(restored)); + active.updateRestored(restored); - if (!resumed.isEmpty()) { - log.trace("Resuming partitions {}", resumed); - consumer.resume(resumed); - } if (active.allTasksRunning()) { + Set assignment = consumer.assignment(); + log.trace("Resuming partitions {}", assignment); + consumer.resume(assignment); assignStandbyPartitions(); return true; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index 246d0477eb2..fcd23220bac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -57,36 +57,6 @@ public class AssignedStreamsTasksTest { EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes(); } - @Test - public void shouldGetPartitionsFromNewTasksThatHaveStateStores() { - EasyMock.expect(t1.hasStateStores()).andReturn(true); - EasyMock.expect(t2.hasStateStores()).andReturn(true); - EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); - EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)); - EasyMock.replay(t1, t2); - - assignedTasks.addNewTask(t1); - assignedTasks.addNewTask(t2); - - final Set partitions = assignedTasks.uninitializedPartitions(); - assertThat(partitions, equalTo(Utils.mkSet(tp1, tp2))); - EasyMock.verify(t1, t2); - } - - @Test - public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() { - EasyMock.expect(t1.hasStateStores()).andReturn(false); - EasyMock.expect(t2.hasStateStores()).andReturn(false); - EasyMock.replay(t1, t2); - - assignedTasks.addNewTask(t1); - assignedTasks.addNewTask(t2); - - final Set partitions = assignedTasks.uninitializedPartitions(); - assertTrue(partitions.isEmpty()); - EasyMock.verify(t1, t2); - } - @Test public void shouldInitializeNewTasks() { EasyMock.expect(t1.initializeStateStores()).andReturn(false); @@ -112,19 +82,17 @@ public class AssignedStreamsTasksTest { final Set t2partitions = Collections.singleton(tp2); EasyMock.expect(t2.partitions()).andReturn(t2partitions); EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()); - EasyMock.expect(t2.hasStateStores()).andReturn(true); EasyMock.replay(t1, t2); assignedTasks.addNewTask(t1); assignedTasks.addNewTask(t2); - final Set readyPartitions = assignedTasks.initializeNewTasks(); + assignedTasks.initializeNewTasks(); Collection restoring = assignedTasks.restoringTasks(); assertThat(restoring.size(), equalTo(1)); assertSame(restoring.iterator().next(), t1); - assertThat(readyPartitions, equalTo(t2partitions)); } @Test @@ -134,15 +102,13 @@ public class AssignedStreamsTasksTest { EasyMock.expectLastCall().once(); EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)); EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()); - EasyMock.expect(t2.hasStateStores()).andReturn(false); EasyMock.replay(t2); assignedTasks.addNewTask(t2); - final Set toResume = assignedTasks.initializeNewTasks(); + assignedTasks.initializeNewTasks(); assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId2))); - assertThat(toResume, equalTo(Collections.emptySet())); } @Test @@ -158,9 +124,9 @@ public class AssignedStreamsTasksTest { addAndInitTask(); - assertTrue(assignedTasks.updateRestored(Utils.mkSet(changeLog1)).isEmpty()); - Set partitions = assignedTasks.updateRestored(Utils.mkSet(changeLog2)); - assertThat(partitions, equalTo(task1Partitions)); + assignedTasks.updateRestored(Utils.mkSet(changeLog1)); + assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.emptySet())); + assignedTasks.updateRestored(Utils.mkSet(changeLog2)); assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1))); } @@ -282,7 +248,6 @@ public class AssignedStreamsTasksTest { EasyMock.expectLastCall().once(); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList()); - EasyMock.expect(t1.hasStateStores()).andReturn(false); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 482b764f3ff..b22d98ee41c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -158,6 +158,10 @@ public class StreamThreadTest { // assign single partition assignedPartitions = Collections.singletonList(t1p1); thread.taskManager().setAssignmentMetadata(Collections.>emptyMap(), Collections.>emptyMap()); + + final MockConsumer mockConsumer = (MockConsumer) thread.consumer; + mockConsumer.assign(assignedPartitions); + mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); rebalanceListener.onPartitionsAssigned(assignedPartitions); thread.runOnce(-1); assertEquals(thread.state(), StreamThread.State.RUNNING); @@ -378,8 +382,13 @@ public class StreamThreadTest { activeTasks.put(task2, Collections.singleton(t1p2)); thread.taskManager().setAssignmentMetadata(activeTasks, Collections.>emptyMap()); - thread.taskManager().createTasks(assignedPartitions); + final MockConsumer mockConsumer = (MockConsumer) thread.consumer; + mockConsumer.assign(assignedPartitions); + Map beginOffsets = new HashMap<>(); + beginOffsets.put(t1p1, 0L); + beginOffsets.put(t1p2, 0L); + mockConsumer.updateBeginningOffsets(beginOffsets); thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions)); assertEquals(1, clientSupplier.producers.size()); @@ -411,6 +420,12 @@ public class StreamThreadTest { thread.taskManager().setAssignmentMetadata(activeTasks, Collections.>emptyMap()); + final MockConsumer mockConsumer = (MockConsumer) thread.consumer; + mockConsumer.assign(assignedPartitions); + Map beginOffsets = new HashMap<>(); + beginOffsets.put(t1p1, 0L); + beginOffsets.put(t1p2, 0L); + mockConsumer.updateBeginningOffsets(beginOffsets); thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions)); thread.runOnce(-1); @@ -439,7 +454,12 @@ public class StreamThreadTest { activeTasks.put(task2, Collections.singleton(t1p2)); thread.taskManager().setAssignmentMetadata(activeTasks, Collections.>emptyMap()); - thread.taskManager().createTasks(assignedPartitions); + final MockConsumer mockConsumer = (MockConsumer) thread.consumer; + mockConsumer.assign(assignedPartitions); + Map beginOffsets = new HashMap<>(); + beginOffsets.put(t1p1, 0L); + beginOffsets.put(t1p2, 0L); + mockConsumer.updateBeginningOffsets(beginOffsets); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); @@ -595,6 +615,9 @@ public class StreamThreadTest { thread.taskManager().setAssignmentMetadata(activeTasks, Collections.>emptyMap()); + final MockConsumer mockConsumer = (MockConsumer) thread.consumer; + mockConsumer.assign(assignedPartitions); + mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); thread.runOnce(-1); @@ -659,6 +682,10 @@ public class StreamThreadTest { activeTasks.put(task1, Collections.singleton(t1p1)); thread.taskManager().setAssignmentMetadata(activeTasks, Collections.>emptyMap()); + + final MockConsumer mockConsumer = (MockConsumer) thread.consumer; + mockConsumer.assign(assignedPartitions); + mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); thread.runOnce(-1); @@ -714,8 +741,10 @@ public class StreamThreadTest { activeTasks.put(task1, Collections.singleton(t1p1)); thread.taskManager().setAssignmentMetadata(activeTasks, Collections.>emptyMap()); - thread.taskManager().createTasks(assignedPartitions); + final MockConsumer mockConsumer = (MockConsumer) thread.consumer; + mockConsumer.assign(assignedPartitions); + mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); thread.runOnce(-1); @@ -883,9 +912,9 @@ public class StreamThreadTest { thread.taskManager().setAssignmentMetadata(activeTasks, Collections.>emptyMap()); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); clientSupplier.consumer.assign(assignedPartitions); clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); thread.runOnce(-1); @@ -1074,17 +1103,18 @@ public class StreamThreadTest { thread.setState(StreamThread.State.RUNNING); thread.setState(StreamThread.State.PARTITIONS_REVOKED); - final Set assignedPartitions = Collections.singleton(new TopicPartition(t1p1.topic(), t1p1.partition())); + final Set assignedPartitions = Collections.singleton(t1p1); thread.taskManager().setAssignmentMetadata( Collections.singletonMap( new TaskId(0, t1p1.partition()), assignedPartitions), Collections.>emptyMap()); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); final MockConsumer mockConsumer = (MockConsumer) thread.consumer; mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.runOnce(-1); final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName())); assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 648e9b080cd..e8ef7ea4925 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -117,7 +117,7 @@ public class TaskManagerTest { public final TemporaryFolder testFolder = new TemporaryFolder(); @Before - public void setUp() throws Exception { + public void setUp() { taskManager = new TaskManager(changeLogReader, UUID.randomUUID(), "", @@ -324,11 +324,9 @@ public class TaskManagerTest { verify(standby, standbyTaskCreator); } - @Test - public void shouldPauseActiveUninitializedPartitions() { + public void shouldPauseActivePartitions() { mockSingleActiveTask(); - EasyMock.expect(active.uninitializedPartitions()).andReturn(taskId0Partitions); consumer.pause(taskId0Partitions); EasyMock.expectLastCall(); replay(); @@ -415,21 +413,17 @@ public class TaskManagerTest { @Test public void shouldInitializeNewActiveTasks() { - EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet()); - EasyMock.expect(active.updateRestored(EasyMock.>anyObject())). - andReturn(Collections.emptySet()); + active.updateRestored(EasyMock.>anyObject()); EasyMock.expectLastCall(); replay(); + taskManager.updateNewAndRestoringTasks(); verify(active); } @Test public void shouldInitializeNewStandbyTasks() { - EasyMock.expect(standby.initializeNewTasks()).andReturn(new HashSet()); - EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet()); - EasyMock.expect(active.updateRestored(EasyMock.>anyObject())). - andReturn(Collections.emptySet()); + active.updateRestored(EasyMock.>anyObject()); EasyMock.expectLastCall(); replay(); @@ -439,22 +433,21 @@ public class TaskManagerTest { @Test public void shouldRestoreStateFromChangeLogReader() { - EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet()); EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions); - EasyMock.expect(active.updateRestored(taskId0Partitions)). - andReturn(Collections.emptySet()); - + active.updateRestored(taskId0Partitions); + EasyMock.expectLastCall(); replay(); + taskManager.updateNewAndRestoringTasks(); verify(changeLogReader, active); } @Test public void shouldResumeRestoredPartitions() { - EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet()); EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions); - EasyMock.expect(active.updateRestored(taskId0Partitions)). - andReturn(taskId0Partitions); + EasyMock.expect(active.allTasksRunning()).andReturn(true); + EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions); + EasyMock.expect(standby.running()).andReturn(Collections.emptySet()); consumer.resume(taskId0Partitions); EasyMock.expectLastCall(); @@ -475,10 +468,7 @@ public class TaskManagerTest { @Test public void shouldReturnFalseWhenThereAreStillNonRunningTasks() { - EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet()); EasyMock.expect(active.allTasksRunning()).andReturn(false); - EasyMock.expect(active.updateRestored(EasyMock.>anyObject())). - andReturn(Collections.emptySet()); replay(); assertFalse(taskManager.updateNewAndRestoringTasks()); @@ -626,16 +616,13 @@ public class TaskManagerTest { } @Test - public void shouldResumeConsumptionOfInitializedPartitions() { - final Set resumed = Collections.singleton(new TopicPartition("topic", 0)); - EasyMock.expect(active.initializeNewTasks()).andReturn(resumed); - EasyMock.expect(active.updateRestored(EasyMock.>anyObject())). - andReturn(Collections.emptySet()); - consumer.resume(resumed); - EasyMock.expectLastCall(); - + public void shouldNotResumeConsumptionUntilAllStoresRestored() { + EasyMock.expect(active.allTasksRunning()).andReturn(false); + Consumer consumer = (Consumer) EasyMock.createStrictMock(Consumer.class); + taskManager.setConsumer(consumer); EasyMock.replay(active, consumer); + // shouldn't invoke `resume` method in consumer taskManager.updateNewAndRestoringTasks(); EasyMock.verify(consumer); } @@ -662,10 +649,7 @@ public class TaskManagerTest { private void mockAssignStandbyPartitions(final long offset) { final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class); - EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet()); EasyMock.expect(active.allTasksRunning()).andReturn(true); - EasyMock.expect(active.updateRestored(EasyMock.>anyObject())). - andReturn(Collections.emptySet()); EasyMock.expect(standby.running()).andReturn(Collections.singletonList(task)); EasyMock.expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset)); restoreConsumer.assign(taskId0Partitions);