
KAFKA-6106: Postpone normal processing of tasks within a thread until restoration of all tasks has completed. (#4651)

Author: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
pull/4716/merge
Authored by Kamal C 7 years ago, committed by Matthias J. Sax
commit a6fad27372
  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java (35 changed lines)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (4 changed lines)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (17 changed lines)
  4. streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java (45 changed lines)
  5. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java (42 changed lines)
  6. streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java (48 changed lines)

streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java (35 changed lines)

@@ -73,27 +73,12 @@ abstract class AssignedTasks<T extends Task> {
         created.put(task.id(), task);
     }
-    Set<TopicPartition> uninitializedPartitions() {
-        if (created.isEmpty()) {
-            return Collections.emptySet();
-        }
-        final Set<TopicPartition> partitions = new HashSet<>();
-        for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
-            if (entry.getValue().hasStateStores()) {
-                partitions.addAll(entry.getValue().partitions());
-            }
-        }
-        return partitions;
-    }
     /**
-     * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    Set<TopicPartition> initializeNewTasks() {
-        final Set<TopicPartition> readyPartitions = new HashSet<>();
+    void initializeNewTasks() {
         if (!created.isEmpty()) {
             log.debug("Initializing {}s {}", taskTypeName, created.keySet());
         }
@@ -104,7 +89,7 @@ abstract class AssignedTasks<T extends Task> {
                     log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
-                    transitionToRunning(entry.getValue(), readyPartitions);
+                    transitionToRunning(entry.getValue());
                 }
                 it.remove();
             } catch (final LockException e) {
@@ -112,21 +97,19 @@ abstract class AssignedTasks<T extends Task> {
                 log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
             }
         }
-        return readyPartitions;
     }
-    Set<TopicPartition> updateRestored(final Collection<TopicPartition> restored) {
+    void updateRestored(final Collection<TopicPartition> restored) {
         if (restored.isEmpty()) {
-            return Collections.emptySet();
+            return;
         }
         log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
-        final Set<TopicPartition> resume = new HashSet<>();
         restoredPartitions.addAll(restored);
         for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
            final Map.Entry<TaskId, T> entry = it.next();
            final T task = entry.getValue();
            if (restoredPartitions.containsAll(task.changelogPartitions())) {
-                transitionToRunning(task, resume);
+                transitionToRunning(task);
                it.remove();
                log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state",
                        taskTypeName,
@@ -146,7 +129,6 @@ abstract class AssignedTasks<T extends Task> {
         if (allTasksRunning()) {
             restoredPartitions.clear();
         }
-        return resume;
     }
     boolean allTasksRunning() {
@@ -243,7 +225,7 @@ abstract class AssignedTasks<T extends Task> {
         suspended.remove(taskId);
         task.resume();
         try {
-            transitionToRunning(task, new HashSet<TopicPartition>());
+            transitionToRunning(task);
         } catch (final TaskMigratedException e) {
             // we need to catch migration exception internally since this function
             // is triggered in the rebalance callback
@@ -278,15 +260,12 @@ abstract class AssignedTasks<T extends Task> {
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
+    private void transitionToRunning(final T task) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         task.initializeTopology();
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
-            if (task.hasStateStores()) {
-                readyPartitions.add(topicPartition);
-            }
         }
         for (TopicPartition topicPartition : task.changelogPartitions()) {
             runningByPartition.put(topicPartition, task);
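The readiness test that survives these changes is purely per-task: a restoring task transitions to running once the set of changelog partitions restored so far covers all of that task's changelog partitions, and the caller learns about overall progress only through allTasksRunning(). A self-contained sketch of that bookkeeping (the TaskRestoreState class and its method names are illustrative assumptions, not the actual AssignedTasks internals):

import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Hypothetical, simplified model of the restoration bookkeeping shown in the diff above.
class TaskRestoreState {

    private final Set<TopicPartition> restoredPartitions = new HashSet<>();

    // record changelog partitions reported as fully restored so far
    void addRestored(final Set<TopicPartition> restored) {
        restoredPartitions.addAll(restored);
    }

    // a task may transition to running once all of its changelog partitions have been restored
    boolean readyToRun(final Set<TopicPartition> taskChangelogPartitions) {
        return restoredPartitions.containsAll(taskChangelogPartitions);
    }
}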

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (4 changed lines)

@@ -899,7 +899,7 @@ public class StreamThread extends Thread {
             final StreamTask task = taskManager.activeTask(partition);
             if (task.isClosed()) {
-                log.warn("Stream task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+                log.warn("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
                         "Notifying the thread to trigger a new rebalance immediately.", task.id());
                 throw new TaskMigratedException(task);
             }
@@ -1065,7 +1065,7 @@
                 }
                 if (task.isClosed()) {
-                    log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+                    log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
                             "Notifying the thread to trigger a new rebalance immediately.", task.id());
                     throw new TaskMigratedException(task);
                 }

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (17 changed lines)

@@ -106,9 +106,9 @@ class TaskManager {
         active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
         addStreamTasks(assignment);
         addStandbyTasks();
-        final Set<TopicPartition> partitions = active.uninitializedPartitions();
-        log.trace("Pausing partitions: {}", partitions);
-        consumer.pause(partitions);
+        // Pause all the partitions until the underlying state store is ready for all the active tasks.
+        log.trace("Pausing partitions: {}", assignment);
+        consumer.pause(assignment);
     }
     private void addStreamTasks(final Collection<TopicPartition> assignment) {
@@ -312,18 +312,17 @@ class TaskManager {
     * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
     */
    boolean updateNewAndRestoringTasks() {
-        final Set<TopicPartition> resumed = active.initializeNewTasks();
+        active.initializeNewTasks();
         standby.initializeNewTasks();
         final Collection<TopicPartition> restored = changelogReader.restore(active);
-        resumed.addAll(active.updateRestored(restored));
-        if (!resumed.isEmpty()) {
-            log.trace("Resuming partitions {}", resumed);
-            consumer.resume(resumed);
-        }
+        active.updateRestored(restored);
         if (active.allTasksRunning()) {
+            Set<TopicPartition> assignment = consumer.assignment();
+            log.trace("Resuming partitions {}", assignment);
+            consumer.resume(assignment);
             assignStandbyPartitions();
             return true;
         }
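Taken together with the AssignedTasks changes, the two hunks above replace fine-grained per-partition resumption with a coarse gate on the consumer: every assigned partition is paused when tasks are created, and nothing is resumed until allTasksRunning() reports that restoration has completed, at which point the whole current assignment is resumed in one call. A minimal sketch of that pattern against the plain Kafka Consumer API (the RestorationGate helper and the allTasksRunning supplier are illustrative assumptions; pause, resume, and assignment are the consumer calls actually used in the diff):

import java.util.Collection;
import java.util.Set;
import java.util.function.BooleanSupplier;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper mirroring the pause/resume gate introduced by this commit.
class RestorationGate {

    private final Consumer<byte[], byte[]> consumer;
    private final BooleanSupplier allTasksRunning; // e.g. active::allTasksRunning

    RestorationGate(final Consumer<byte[], byte[]> consumer, final BooleanSupplier allTasksRunning) {
        this.consumer = consumer;
        this.allTasksRunning = allTasksRunning;
    }

    // rebalance callback: pause everything until the state stores of all active tasks are ready
    void onAssignment(final Collection<TopicPartition> assignment) {
        consumer.pause(assignment);
    }

    // processing loop: resume the full assignment only once restoration has completed
    boolean maybeResume() {
        if (!allTasksRunning.getAsBoolean()) {
            return false; // normal processing stays postponed
        }
        final Set<TopicPartition> assignment = consumer.assignment();
        consumer.resume(assignment);
        return true;
    }
}

One visible consequence, exercised by the tests below, is that partitions of tasks without state stores also stay paused until every task has finished restoring, which is exactly what the commit title describes: normal processing is postponed until restoration of all tasks has completed.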

streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java (45 changed lines)

@@ -57,36 +57,6 @@ public class AssignedStreamsTasksTest {
         EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
     }
-    @Test
-    public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
-        EasyMock.expect(t1.hasStateStores()).andReturn(true);
-        EasyMock.expect(t2.hasStateStores()).andReturn(true);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
-        EasyMock.replay(t1, t2);
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-        final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
-        assertThat(partitions, equalTo(Utils.mkSet(tp1, tp2)));
-        EasyMock.verify(t1, t2);
-    }
-    @Test
-    public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
-        EasyMock.expect(t1.hasStateStores()).andReturn(false);
-        EasyMock.expect(t2.hasStateStores()).andReturn(false);
-        EasyMock.replay(t1, t2);
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-        final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
-        assertTrue(partitions.isEmpty());
-        EasyMock.verify(t1, t2);
-    }
     @Test
     public void shouldInitializeNewTasks() {
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
@@ -112,19 +82,17 @@ public class AssignedStreamsTasksTest {
         final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
         EasyMock.expect(t2.partitions()).andReturn(t2partitions);
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t2.hasStateStores()).andReturn(true);
         EasyMock.replay(t1, t2);
         assignedTasks.addNewTask(t1);
         assignedTasks.addNewTask(t2);
-        final Set<TopicPartition> readyPartitions = assignedTasks.initializeNewTasks();
+        assignedTasks.initializeNewTasks();
         Collection<StreamTask> restoring = assignedTasks.restoringTasks();
         assertThat(restoring.size(), equalTo(1));
         assertSame(restoring.iterator().next(), t1);
-        assertThat(readyPartitions, equalTo(t2partitions));
     }
     @Test
@@ -134,15 +102,13 @@
         EasyMock.expectLastCall().once();
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t2.hasStateStores()).andReturn(false);
         EasyMock.replay(t2);
         assignedTasks.addNewTask(t2);
-        final Set<TopicPartition> toResume = assignedTasks.initializeNewTasks();
+        assignedTasks.initializeNewTasks();
         assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId2)));
-        assertThat(toResume, equalTo(Collections.<TopicPartition>emptySet()));
     }
     @Test
@@ -158,9 +124,9 @@
         addAndInitTask();
-        assertTrue(assignedTasks.updateRestored(Utils.mkSet(changeLog1)).isEmpty());
-        Set<TopicPartition> partitions = assignedTasks.updateRestored(Utils.mkSet(changeLog2));
-        assertThat(partitions, equalTo(task1Partitions));
+        assignedTasks.updateRestored(Utils.mkSet(changeLog1));
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.<TaskId>emptySet()));
+        assignedTasks.updateRestored(Utils.mkSet(changeLog2));
         assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
     }
@@ -282,7 +248,6 @@
         EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t1.hasStateStores()).andReturn(false);
     }
     @Test

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java (42 changed lines)

@@ -158,6 +158,10 @@ public class StreamThreadTest {
         // assign single partition
         assignedPartitions = Collections.singletonList(t1p1);
         thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
         assertEquals(thread.state(), StreamThread.State.RUNNING);
@@ -378,8 +382,13 @@
         activeTasks.put(task2, Collections.singleton(t1p2));
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
         thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions));
         assertEquals(1, clientSupplier.producers.size());
@@ -411,6 +420,12 @@
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
         thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions));
         thread.runOnce(-1);
@@ -439,7 +454,12 @@
         activeTasks.put(task2, Collections.singleton(t1p2));
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
@@ -595,6 +615,9 @@
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
@@ -659,6 +682,10 @@
         activeTasks.put(task1, Collections.singleton(t1p1));
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
@@ -714,8 +741,10 @@
         activeTasks.put(task1, Collections.singleton(t1p1));
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
@@ -883,9 +912,9 @@
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         clientSupplier.consumer.assign(assignedPartitions);
         clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
@@ -1074,17 +1103,18 @@
         thread.setState(StreamThread.State.RUNNING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
-        final Set<TopicPartition> assignedPartitions = Collections.singleton(new TopicPartition(t1p1.topic(), t1p1.partition()));
+        final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
         thread.taskManager().setAssignmentMetadata(
                 Collections.singletonMap(
                         new TaskId(0, t1p1.partition()),
                         assignedPartitions),
                 Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(Collections.singleton(t1p1));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.runOnce(-1);
         final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
         assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
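The recurring MockConsumer setup in these tests exists because the consumer's real assignment is now what gets paused and resumed: each test hands the mock an explicit assignment and beginning offsets before driving the rebalance listener and runOnce, so that pausing, resuming, and polling have partitions to operate on. A compact sketch of just that setup in isolation (the topic and partition values are illustrative; MockConsumer, assign, and updateBeginningOffsets are the same test utilities used above):

import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerSetupSketch {
    public static void main(final String[] args) {
        final TopicPartition t1p1 = new TopicPartition("topic1", 1);
        final List<TopicPartition> assignedPartitions = Collections.singletonList(t1p1);

        // give the mock an assignment and beginning offsets up front,
        // mirroring the setup added to the tests above
        final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        mockConsumer.assign(assignedPartitions);
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));

        // from here a rebalance listener / runOnce-style loop could be driven safely
        System.out.println("assignment: " + mockConsumer.assignment());
    }
}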

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java (48 changed lines)

@@ -117,7 +117,7 @@ public class TaskManagerTest {
     public final TemporaryFolder testFolder = new TemporaryFolder();
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         taskManager = new TaskManager(changeLogReader,
                                       UUID.randomUUID(),
                                       "",
@@ -324,11 +324,9 @@
         verify(standby, standbyTaskCreator);
     }
     @Test
-    public void shouldPauseActiveUninitializedPartitions() {
+    public void shouldPauseActivePartitions() {
         mockSingleActiveTask();
-        EasyMock.expect(active.uninitializedPartitions()).andReturn(taskId0Partitions);
         consumer.pause(taskId0Partitions);
         EasyMock.expectLastCall();
         replay();
@@ -415,21 +413,17 @@
     @Test
     public void shouldInitializeNewActiveTasks() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
-        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
+        active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
         EasyMock.expectLastCall();
         replay();
         taskManager.updateNewAndRestoringTasks();
         verify(active);
     }
     @Test
     public void shouldInitializeNewStandbyTasks() {
-        EasyMock.expect(standby.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
-        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
+        active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
         EasyMock.expectLastCall();
         replay();
@@ -439,22 +433,21 @@
     @Test
     public void shouldRestoreStateFromChangeLogReader() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
-        EasyMock.expect(active.updateRestored(taskId0Partitions)).
-                andReturn(Collections.<TopicPartition>emptySet());
+        active.updateRestored(taskId0Partitions);
+        EasyMock.expectLastCall();
         replay();
         taskManager.updateNewAndRestoringTasks();
         verify(changeLogReader, active);
     }
     @Test
     public void shouldResumeRestoredPartitions() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
-        EasyMock.expect(active.updateRestored(taskId0Partitions)).
-                andReturn(taskId0Partitions);
+        EasyMock.expect(active.allTasksRunning()).andReturn(true);
+        EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions);
+        EasyMock.expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
         consumer.resume(taskId0Partitions);
         EasyMock.expectLastCall();
@@ -475,10 +468,7 @@
     @Test
     public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(false);
-        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
         replay();
         assertFalse(taskManager.updateNewAndRestoringTasks());
@@ -626,16 +616,13 @@
     }
     @Test
-    public void shouldResumeConsumptionOfInitializedPartitions() {
-        final Set<TopicPartition> resumed = Collections.singleton(new TopicPartition("topic", 0));
-        EasyMock.expect(active.initializeNewTasks()).andReturn(resumed);
-        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
-        consumer.resume(resumed);
-        EasyMock.expectLastCall();
+    public void shouldNotResumeConsumptionUntilAllStoresRestored() {
+        EasyMock.expect(active.allTasksRunning()).andReturn(false);
+        Consumer<byte[], byte[]> consumer = (Consumer<byte[], byte[]>) EasyMock.createStrictMock(Consumer.class);
+        taskManager.setConsumer(consumer);
         EasyMock.replay(active, consumer);
+        // shouldn't invoke `resume` method in consumer
         taskManager.updateNewAndRestoringTasks();
         EasyMock.verify(consumer);
     }
@@ -662,10 +649,7 @@
     private void mockAssignStandbyPartitions(final long offset) {
         final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(true);
-        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.expect(standby.running()).andReturn(Collections.singletonList(task));
         EasyMock.expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
         restoreConsumer.assign(taskId0Partitions);
