Browse Source

HOTFIX: safely clear all active state in onPartitionsLost (#7691)

After a number of last minute bugs were found stemming from the incremental closing of lost tasks in StreamsRebalanceListener#onPartitionsLost, a safer approach to this edge case seems warranted. We initially wanted to be as "future-proof" as possible, and avoid baking further protocol assumptions into the code that may be broken as the protocol evolves. This meant that rather than simply closing all active tasks and clearing all associated state in #onPartitionsLost(lostPartitions) we would loop through the lostPartitions/lost tasks and remove them one by one from the various data structures/assignments, then verify that everything was empty in the end. This verification in particular has caused us significant trouble, as it turns out to be nontrivial to determine what should in fact be empty, and if so whether it is also being correctly updated.

Therefore, before worrying about it being "future-proof" it seems we should make sure it is "present-day-proof" and implement this callback in the safest possible way, by blindly clearing and closing all active task state. We log all the relevant state (at debug level) before clearing it, so we can at least tell from the logs whether/which emptiness checks were being violated.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Andrew Choi <andchoi@linkedin.com>
pull/4237/merge
A. Sophie Blee-Goldman 5 years ago committed by Bill Bejeck
parent
commit
41a9e2c7c3
  1. 3
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
  2. 3
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
  3. 128
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
  4. 64
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
  5. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
  6. 6
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
  7. 58
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
  8. 62
      streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java

3
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java

@ -188,7 +188,8 @@ public interface ConsumerRebalanceListener { @@ -188,7 +188,8 @@ public interface ConsumerRebalanceListener {
* necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
*
* @param partitions The list of partitions that were assigned to the consumer and now have been reassigned
* to other consumers (may not include all currently assigned partitions, i.e. there may still
* to other consumers. With the current protocol this will always include all of the consumer's
* previously assigned partitions, but this may change in future protocols (ie there would still
* be some partitions left)
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}

3
streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -79,7 +80,7 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> { @@ -79,7 +80,7 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
} catch (final RuntimeException e) {
log.error("Closing the standby task {} failed due to the following error:", task.id(), e);
} finally {
removeTaskFromRunning(task);
removeTaskFromAllStateMaps(task, Collections.emptyMap());
revokedChangelogs.addAll(task.changelogPartitions());
}
}

128
streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.ArrayList;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
@ -83,6 +84,15 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -83,6 +84,15 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
boolean hasRestoringTasks() {
return !restoring.isEmpty();
}
void clearRestoringPartitions() {
if (!restoring.isEmpty()) {
log.error("Tried to clear restoring partitions but was still restoring the stream tasks {}", restoring);
throw new IllegalStateException("Should not clear restoring partitions while set of restoring tasks is non-empty");
}
restoredPartitions.clear();
restoringByPartition.clear();
}
Set<TaskId> suspendedTaskIds() {
return suspended.keySet();
@ -152,7 +162,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -152,7 +162,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
id, f);
}
} finally {
removeTaskFromRunning(task);
removeTaskFromAllStateMaps(task, suspended);
taskChangelogs.addAll(task.changelogPartitions());
}
}
@ -189,10 +199,8 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -189,10 +199,8 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
}
private RuntimeException closeRunning(final boolean isZombie,
final StreamTask task,
final List<TopicPartition> closedTaskChangelogs) {
removeTaskFromRunning(task);
closedTaskChangelogs.addAll(task.changelogPartitions());
final StreamTask task) {
removeTaskFromAllStateMaps(task, Collections.emptyMap());
try {
final boolean clean = !isZombie;
@ -208,7 +216,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -208,7 +216,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
private RuntimeException closeNonRunning(final boolean isZombie,
final StreamTask task,
final List<TopicPartition> closedTaskChangelogs) {
created.remove(task.id());
removeTaskFromAllStateMaps(task, Collections.emptyMap());
closedTaskChangelogs.addAll(task.changelogPartitions());
try {
@ -221,10 +229,11 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -221,10 +229,11 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
return null;
}
// Since a restoring task has not had its topology initialized yet, we need only close the state manager
private RuntimeException closeRestoring(final boolean isZombie,
final StreamTask task,
final List<TopicPartition> closedTaskChangelogs) {
removeTaskFromRestoring(task);
removeTaskFromAllStateMaps(task, Collections.emptyMap());
closedTaskChangelogs.addAll(task.changelogPartitions());
try {
@ -240,7 +249,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -240,7 +249,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
private RuntimeException closeSuspended(final boolean isZombie,
final StreamTask task) {
suspended.remove(task.id());
removeTaskFromAllStateMaps(task, Collections.emptyMap());
try {
final boolean clean = !isZombie;
@ -269,37 +278,30 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -269,37 +278,30 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
return firstException.get();
}
RuntimeException closeZombieTasks(final Set<TaskId> lostTasks, final List<TopicPartition> lostTaskChangelogs) {
RuntimeException closeAllTasksAsZombies() {
log.debug("Closing all active tasks as zombies, current state of active tasks: {}", toString());
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
final List<TopicPartition> changelogs = new ArrayList<>(); // not used, as we clear/unsubscribe all changelogs
for (final TaskId id : lostTasks) {
if (suspended.containsKey(id)) {
log.debug("Closing the zombie suspended stream task {}.", id);
firstException.compareAndSet(null, closeSuspended(true, suspended.get(id)));
for (final TaskId id : allAssignedTaskIds()) {
if (running.containsKey(id)) {
log.debug("Closing the zombie running stream task {}.", id);
firstException.compareAndSet(null, closeRunning(true, running.get(id)));
} else if (created.containsKey(id)) {
log.debug("Closing the zombie created stream task {}.", id);
firstException.compareAndSet(null, closeNonRunning(true, created.get(id), lostTaskChangelogs));
firstException.compareAndSet(null, closeNonRunning(true, created.get(id), changelogs));
} else if (restoring.containsKey(id)) {
log.debug("Closing the zombie restoring stream task {}.", id);
firstException.compareAndSet(null, closeRestoring(true, restoring.get(id), lostTaskChangelogs));
} else if (running.containsKey(id)) {
log.debug("Closing the zombie running stream task {}.", id);
firstException.compareAndSet(null, closeRunning(true, running.get(id), lostTaskChangelogs));
} else {
log.warn("Skipping closing the zombie stream task {} as it was already removed.", id);
firstException.compareAndSet(null, closeRestoring(true, restoring.get(id), changelogs));
} else if (suspended.containsKey(id)) {
log.debug("Closing the zombie suspended stream task {}.", id);
firstException.compareAndSet(null, closeSuspended(true, suspended.get(id)));
}
}
// We always clear the prevActiveTasks and replace with current set of running tasks to encode in subscription
// We should exclude any tasks that were lost however, they will be counted as standbys for assignment purposes
prevActiveTasks.clear();
prevActiveTasks.addAll(running.keySet());
clear();
// With the current rebalance protocol, there should not be any running tasks left as they were all lost
if (!prevActiveTasks.isEmpty()) {
log.error("Found the still running stream tasks {} after closing all tasks lost as zombies", prevActiveTasks);
firstException.compareAndSet(null, new IllegalStateException("Not all lost tasks were closed as zombies"));
}
return firstException.get();
}
@ -311,7 +313,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -311,7 +313,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
if (suspended.containsKey(taskId)) {
final StreamTask task = suspended.get(taskId);
log.trace("Found suspended stream task {}", taskId);
suspended.remove(taskId);
removeTaskFromAllStateMaps(task, Collections.emptyMap());
if (task.partitions().equals(partitions)) {
task.resume();
@ -346,8 +348,12 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -346,8 +348,12 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
if (restoredPartitions.containsAll(task.changelogPartitions())) {
transitionToRunning(task);
it.remove();
restoringByPartition.keySet().removeAll(task.partitions());
restoringByPartition.keySet().removeAll(task.changelogPartitions());
// Note that because we add back all restored partitions at the top of this loop, clearing them from
// restoredPartitions here doesn't really matter. We do it anyway as it is the correct thing to do,
// and may matter with future changes.
removeFromRestoredPartitions(task);
removeFromRestoringByPartition(task);
log.debug("Stream task {} completed restoration as all its changelog partitions {} have been applied to restore state",
task.id(),
task.changelogPartitions());
@ -372,6 +378,24 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -372,6 +378,24 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
}
}
@Override
void removeTaskFromAllStateMaps(final StreamTask task, final Map<TaskId, StreamTask> currentStateMap) {
super.removeTaskFromAllStateMaps(task, currentStateMap);
final TaskId id = task.id();
final Set<TopicPartition> taskPartitions = new HashSet<>(task.partitions());
taskPartitions.addAll(task.changelogPartitions());
if (currentStateMap != restoring) {
restoring.remove(id);
restoringByPartition.keySet().removeAll(taskPartitions);
restoredPartitions.removeAll(taskPartitions);
}
if (currentStateMap != suspended) {
suspended.remove(id);
}
}
void addTaskToRestoring(final StreamTask task) {
restoring.put(task.id(), task);
for (final TopicPartition topicPartition : task.partitions()) {
@ -382,16 +406,14 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -382,16 +406,14 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
}
}
private void removeTaskFromRestoring(final StreamTask task) {
restoring.remove(task.id());
for (final TopicPartition topicPartition : task.partitions()) {
restoredPartitions.remove(topicPartition);
restoringByPartition.remove(topicPartition);
}
for (final TopicPartition topicPartition : task.changelogPartitions()) {
restoredPartitions.remove(topicPartition);
restoringByPartition.remove(topicPartition);
}
private void removeFromRestoringByPartition(final StreamTask task) {
restoringByPartition.keySet().removeAll(task.partitions());
restoringByPartition.keySet().removeAll(task.changelogPartitions());
}
private void removeFromRestoredPartitions(final StreamTask task) {
restoredPartitions.removeAll(task.partitions());
restoredPartitions.removeAll(task.changelogPartitions());
}
/**
@ -497,6 +519,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -497,6 +519,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
restoringByPartition.clear();
restoredPartitions.clear();
suspended.clear();
prevActiveTasks.clear();
}
@Override
@ -511,26 +534,13 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -511,26 +534,13 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
super.shutdown(clean);
}
@Override
public boolean isEmpty() throws IllegalStateException {
if (restoring.isEmpty() && !restoringByPartition.isEmpty()) {
log.error("Assigned stream tasks in an inconsistent state: the set of restoring tasks is empty but the " +
"restoring by partitions map contained {}", restoringByPartition);
throw new IllegalStateException("Found inconsistent state: no tasks restoring but nonempty restoringByPartition");
} else {
return super.isEmpty()
&& restoring.isEmpty()
&& restoringByPartition.isEmpty()
&& restoredPartitions.isEmpty()
&& suspended.isEmpty();
}
}
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
builder.append(super.toString(indent));
describe(builder, restoring.values(), indent, "Restoring:");
describe(builder, suspended.values(), indent, "Suspended:");
describeTasks(builder, restoring.values(), indent, "Restoring:");
describePartitions(builder, restoringByPartition.keySet(), indent, "Restoring Partitions:");
describePartitions(builder, restoredPartitions, indent, "Restored Partitions:");
describeTasks(builder, suspended.values(), indent, "Suspended:");
return builder.toString();
}

64
streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java

@ -69,12 +69,17 @@ abstract class AssignedTasks<T extends Task> { @@ -69,12 +69,17 @@ abstract class AssignedTasks<T extends Task> {
try {
final T task = entry.getValue();
task.initializeMetadata();
// don't remove from created until the task has been successfully initialized
removeTaskFromAllStateMaps(task, created);
if (!task.initializeStateStores()) {
log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
((AssignedStreamsTasks) this).addTaskToRestoring((StreamTask) task);
} else {
transitionToRunning(task);
}
it.remove();
} catch (final LockException e) {
// If this is a permanent error, then we could spam the log since this is in the run loop. But, other related
@ -121,10 +126,25 @@ abstract class AssignedTasks<T extends Task> { @@ -121,10 +126,25 @@ abstract class AssignedTasks<T extends Task> {
}
}
void removeTaskFromRunning(final T task) {
running.remove(task.id());
runningByPartition.keySet().removeAll(task.partitions());
runningByPartition.keySet().removeAll(task.changelogPartitions());
/**
* Removes the passed in task (and its corresponding partitions) from all state maps and sets,
* except for the one it currently resides in.
*
* @param task the task to be removed
* @param currentStateMap the current state map, which the task should not be removed from
*/
void removeTaskFromAllStateMaps(final T task, final Map<TaskId, T> currentStateMap) {
final TaskId id = task.id();
final Set<TopicPartition> taskPartitions = new HashSet<>(task.partitions());
taskPartitions.addAll(task.changelogPartitions());
if (currentStateMap != running) {
running.remove(id);
runningByPartition.keySet().removeAll(taskPartitions);
}
if (currentStateMap != created) {
created.remove(id);
}
}
T runningTaskFor(final TopicPartition partition) {
@ -146,15 +166,16 @@ abstract class AssignedTasks<T extends Task> { @@ -146,15 +166,16 @@ abstract class AssignedTasks<T extends Task> {
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
describe(builder, running.values(), indent, "Running:");
describe(builder, created.values(), indent, "New:");
describeTasks(builder, running.values(), indent, "Running:");
describePartitions(builder, runningByPartition.keySet(), indent, "Running Partitions:");
describeTasks(builder, created.values(), indent, "New:");
return builder.toString();
}
void describe(final StringBuilder builder,
final Collection<T> tasks,
final String indent,
final String name) {
void describeTasks(final StringBuilder builder,
final Collection<T> tasks,
final String indent,
final String name) {
builder.append(indent).append(name);
for (final T t : tasks) {
builder.append(indent).append(t.toString(indent + "\t\t"));
@ -162,6 +183,17 @@ abstract class AssignedTasks<T extends Task> { @@ -162,6 +183,17 @@ abstract class AssignedTasks<T extends Task> {
builder.append("\n");
}
void describePartitions(final StringBuilder builder,
final Collection<TopicPartition> partitions,
final String indent,
final String name) {
builder.append(indent).append(name);
for (final TopicPartition tp : partitions) {
builder.append(indent).append(tp.toString());
}
builder.append("\n");
}
List<T> allTasks() {
final List<T> tasks = new ArrayList<>();
tasks.addAll(running.values());
@ -182,18 +214,6 @@ abstract class AssignedTasks<T extends Task> { @@ -182,18 +214,6 @@ abstract class AssignedTasks<T extends Task> {
created.clear();
}
boolean isEmpty() throws IllegalStateException {
if (running.isEmpty() && !runningByPartition.isEmpty()) {
log.error("Assigned stream tasks in an inconsistent state: the set of running tasks is empty but the " +
"running by partitions map contained {}", runningByPartition);
throw new IllegalStateException("Found inconsistent state: no tasks running but nonempty runningByPartition");
} else {
return runningByPartition.isEmpty()
&& running.isEmpty()
&& created.isEmpty();
}
}
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java

@ -207,7 +207,7 @@ public class StoreChangelogReader implements ChangelogReader { @@ -207,7 +207,7 @@ public class StoreChangelogReader implements ChangelogReader {
restoreToOffsets.get(partition));
restorer.setStartingOffset(restoreConsumer.position(partition));
log.debug("Calling restorer for partition {} of task {}", partition, active.restoringTaskFor(partition));
log.debug("Calling restorer for partition {}", partition);
restorer.restoreStarted();
} else {
log.trace("Did not find checkpoint from changelog {} for store {}, rewinding to beginning.", partition, restorer.storeName());

6
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java

@ -143,8 +143,8 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener { @@ -143,8 +143,8 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
Set<TaskId> lostTasks = new HashSet<>();
final long start = time.milliseconds();
try {
// close lost active tasks but don't try to commit offsets as we no longer own them
lostTasks = taskManager.closeLostTasks(lostPartitions);
// close all active tasks as lost but don't try to commit offsets as we no longer own them
lostTasks = taskManager.closeLostTasks();
} catch (final Throwable t) {
log.error(
"Error caught during partitions lost, " +
@ -154,7 +154,7 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener { @@ -154,7 +154,7 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
streamThread.setRebalanceException(t);
} finally {
log.info("partitions lost took {} ms.\n" +
"\tsuspended lost active tasks: {}\n",
"\tclosed lost active tasks: {}\n",
time.milliseconds() - start,
lostTasks);
}

58
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@ -257,56 +257,32 @@ public class TaskManager { @@ -257,56 +257,32 @@ public class TaskManager {
/**
* Closes active tasks as zombies, as these partitions have been lost and are no longer owned.
* NOTE this method assumes that when it is called, EVERY task/partition has been lost and must
* be closed as a zombie.
* @return list of lost tasks
*/
Set<TaskId> closeLostTasks(final Collection<TopicPartition> lostPartitions) {
final Set<TaskId> zombieTasks = partitionsToTaskSet(lostPartitions);
log.debug("Closing lost tasks as zombies: {}", zombieTasks);
Set<TaskId> closeLostTasks() {
final Set<TaskId> lostTasks = new HashSet<>(assignedActiveTasks.keySet());
log.debug("Closing lost active tasks as zombies: {}", lostTasks);
final List<TopicPartition> lostTaskChangelogs = new ArrayList<>();
final RuntimeException exception = active.closeAllTasksAsZombies();
final RuntimeException exception = active.closeZombieTasks(zombieTasks, lostTaskChangelogs);
log.debug("Clearing assigned active tasks: {}", assignedActiveTasks);
assignedActiveTasks.clear();
assignedActiveTasks.keySet().removeAll(zombieTasks);
changelogReader.remove(lostTaskChangelogs);
removeChangelogsFromRestoreConsumer(lostTaskChangelogs, false);
log.debug("Clearing the store changelog reader: {}", changelogReader);
changelogReader.clear();
if (exception != null) {
throw exception;
}
verifyActiveTaskStateIsEmpty();
return zombieTasks;
}
private void verifyActiveTaskStateIsEmpty() throws RuntimeException {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
// Verify active has no remaining state, and catch if active.isEmpty throws so we can log any non-empty state
try {
if (!(active.isEmpty())) {
log.error("The set of active tasks was non-empty: {}", active);
firstException.compareAndSet(null, new IllegalStateException("TaskManager found leftover active task state after closing all zombies"));
}
} catch (final IllegalStateException e) {
firstException.compareAndSet(null, e);
}
if (!(assignedActiveTasks.isEmpty())) {
log.error("The set assignedActiveTasks was non-empty: {}", assignedActiveTasks);
firstException.compareAndSet(null, new IllegalStateException("TaskManager found leftover assignedActiveTasks after closing all zombies"));
if (!restoreConsumerAssignedStandbys) {
log.debug("Clearing the restore consumer's assignment: {}", restoreConsumer.assignment());
restoreConsumer.unsubscribe();
}
if (!(changelogReader.isEmpty())) {
log.error("The changelog-reader's internal state was non-empty: {}", changelogReader);
firstException.compareAndSet(null, new IllegalStateException("TaskManager found leftover changelog reader state after closing all zombies"));
if (exception != null) {
throw exception;
}
final RuntimeException fatalException = firstException.get();
if (fatalException != null) {
throw fatalException;
}
return lostTasks;
}
void shutdown(final boolean clean) {
@ -413,6 +389,8 @@ public class TaskManager { @@ -413,6 +389,8 @@ public class TaskManager {
final Collection<TopicPartition> restored = changelogReader.restore(active);
active.updateRestored(restored);
removeChangelogsFromRestoreConsumer(restored, false);
} else {
active.clearRestoringPartitions();
}
if (active.allTasksRunning()) {

62
streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java

@ -84,8 +84,8 @@ public class AssignedStreamsTasksTest { @@ -84,8 +84,8 @@ public class AssignedStreamsTasksTest {
public void shouldInitializeNewTasks() {
t1.initializeMetadata();
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.replay(t1);
addAndInitTask();
@ -99,15 +99,15 @@ public class AssignedStreamsTasksTest { @@ -99,15 +99,15 @@ public class AssignedStreamsTasksTest {
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
t1.initializeTopology();
EasyMock.expectLastCall().once();
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet());
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
t2.initializeMetadata();
EasyMock.expect(t2.initializeStateStores()).andReturn(true);
t2.initializeTopology();
EasyMock.expectLastCall().once();
final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
EasyMock.expect(t2.partitions()).andReturn(t2partitions);
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList());
EasyMock.expect(t2.partitions()).andReturn(t2partitions).anyTimes();
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.replay(t1, t2);
@ -127,8 +127,8 @@ public class AssignedStreamsTasksTest { @@ -127,8 +127,8 @@ public class AssignedStreamsTasksTest {
EasyMock.expect(t2.initializeStateStores()).andReturn(true);
t2.initializeTopology();
EasyMock.expectLastCall().once();
EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList());
EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.replay(t2);
@ -173,8 +173,8 @@ public class AssignedStreamsTasksTest { @@ -173,8 +173,8 @@ public class AssignedStreamsTasksTest {
public void shouldCloseRestoringTasks() {
t1.initializeMetadata();
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).times(2);
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).times(3);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
t1.closeStateManager(true);
EasyMock.expectLastCall();
EasyMock.replay(t1);
@ -186,8 +186,9 @@ public class AssignedStreamsTasksTest { @@ -186,8 +186,9 @@ public class AssignedStreamsTasksTest {
}
@Test
public void shouldClosedUnInitializedTasksOnSuspend() {
EasyMock.expect(t1.changelogPartitions()).andAnswer(Collections::emptyList);
public void shouldCloseUnInitializedTasksOnSuspend() {
EasyMock.expect(t1.partitions()).andAnswer(Collections::emptySet).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andAnswer(Collections::emptyList).anyTimes();
t1.close(false, false);
EasyMock.expectLastCall();
@ -213,8 +214,6 @@ public class AssignedStreamsTasksTest { @@ -213,8 +214,6 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldCloseTaskOnSuspendWhenRuntimeException() {
mockTaskInitialization();
EasyMock.expect(t1.partitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
t1.suspend();
EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
@ -232,8 +231,6 @@ public class AssignedStreamsTasksTest { @@ -232,8 +231,6 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
mockTaskInitialization();
EasyMock.expect(t1.partitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
t1.suspend();
EasyMock.expectLastCall().andThrow(new TaskMigratedException());
@ -281,8 +278,8 @@ public class AssignedStreamsTasksTest { @@ -281,8 +278,8 @@ public class AssignedStreamsTasksTest {
EasyMock.expect(t1.initializeStateStores()).andReturn(true);
t1.initializeTopology();
EasyMock.expectLastCall().once();
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList());
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
}
@Test
@ -537,10 +534,6 @@ public class AssignedStreamsTasksTest { @@ -537,10 +534,6 @@ public class AssignedStreamsTasksTest {
return assignedTasks.created.keySet();
}
@Override
public List<TopicPartition> expectedLostChangelogs() {
return clearingPartitions;
}
}.createTaskAndClear();
}
@ -549,7 +542,6 @@ public class AssignedStreamsTasksTest { @@ -549,7 +542,6 @@ public class AssignedStreamsTasksTest {
new TaskTestSuite() {
@Override
public void additionalSetup(final StreamTask task) {
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.closeStateManager(false);
}
@ -563,10 +555,6 @@ public class AssignedStreamsTasksTest { @@ -563,10 +555,6 @@ public class AssignedStreamsTasksTest {
return assignedTasks.restoringTaskIds();
}
@Override
public List<TopicPartition> expectedLostChangelogs() {
return clearingPartitions;
}
}.createTaskAndClear();
}
@ -576,7 +564,6 @@ public class AssignedStreamsTasksTest { @@ -576,7 +564,6 @@ public class AssignedStreamsTasksTest {
@Override
public void additionalSetup(final StreamTask task) {
task.initializeTopology();
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.close(false, true);
}
@ -590,10 +577,6 @@ public class AssignedStreamsTasksTest { @@ -590,10 +577,6 @@ public class AssignedStreamsTasksTest {
return assignedTasks.runningTaskIds();
}
@Override
public List<TopicPartition> expectedLostChangelogs() {
return clearingPartitions;
}
}.createTaskAndClear();
}
@ -603,7 +586,6 @@ public class AssignedStreamsTasksTest { @@ -603,7 +586,6 @@ public class AssignedStreamsTasksTest {
@Override
public void additionalSetup(final StreamTask task) {
task.initializeTopology();
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.suspend();
task.closeSuspended(false, null);
}
@ -621,11 +603,7 @@ public class AssignedStreamsTasksTest { @@ -621,11 +603,7 @@ public class AssignedStreamsTasksTest {
public Set<TaskId> taskIds() {
return assignedTasks.suspendedTaskIds();
}
@Override
public List<TopicPartition> expectedLostChangelogs() {
return Collections.emptyList();
}
}.createTaskAndClear();
}
@ -640,23 +618,21 @@ public class AssignedStreamsTasksTest { @@ -640,23 +618,21 @@ public class AssignedStreamsTasksTest {
abstract Set<TaskId> taskIds();
abstract List<TopicPartition> expectedLostChangelogs();
void createTaskAndClear() {
final StreamTask task = EasyMock.createMock(StreamTask.class);
EasyMock.expect(task.id()).andReturn(clearingTaskId).anyTimes();
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(task.changelogPartitions()).andReturn(clearingPartitions).anyTimes();
EasyMock.expect(task.toString(EasyMock.anyString())).andReturn("task").anyTimes();
additionalSetup(task);
EasyMock.replay(task);
action(task);
final List<TopicPartition> changelogs = new ArrayList<>();
final Set<TaskId> ids = new HashSet<>(Collections.singleton(task.id()));
assertEquals(ids, taskIds());
assignedTasks.closeZombieTasks(ids, changelogs);
assignedTasks.closeAllTasksAsZombies();
assertEquals(Collections.emptySet(), taskIds());
assertEquals(expectedLostChangelogs(), changelogs);
}
}

Loading…
Cancel
Save