Browse Source

KAFKA-8972 (2.4 blocker): bug fix for restoring task (#7617)

This is a typo bug which is due to calling a wrong map.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/7632/head
Boyang Chen 5 years ago committed by Guozhang Wang
parent
commit
f65c2acad7
  1. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
  2. 143
      streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java

@ -281,7 +281,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin @@ -281,7 +281,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
firstException.compareAndSet(null, closeNonRunning(true, created.get(id), lostTaskChangelogs));
} else if (restoring.containsKey(id)) {
log.debug("Closing the zombie restoring stream task {}.", id);
firstException.compareAndSet(null, closeRestoring(true, created.get(id), lostTaskChangelogs));
firstException.compareAndSet(null, closeRestoring(true, restoring.get(id), lostTaskChangelogs));
} else if (running.containsKey(id)) {
log.debug("Closing the zombie running stream task {}.", id);
firstException.compareAndSet(null, closeRunning(true, running.get(id), lostTaskChangelogs));

143
streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java

@ -41,6 +41,7 @@ import org.junit.Test; @@ -41,6 +41,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.function.ThrowingRunnable;
@ -49,6 +50,7 @@ import static org.hamcrest.CoreMatchers.not; @@ -49,6 +50,7 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
@ -517,6 +519,147 @@ public class AssignedStreamsTasksTest { @@ -517,6 +519,147 @@ public class AssignedStreamsTasksTest {
assignedTasks.shutdown(true);
}
@Test
public void shouldClearZombieCreatedTasks() {
new TaskTestSuite() {
@Override
public void additionalSetup(final StreamTask task) {
task.close(false, true);
}
@Override
public void action(final StreamTask task) {
assignedTasks.addNewTask(task);
}
@Override
public Set<TaskId> taskIds() {
return assignedTasks.created.keySet();
}
@Override
public List<TopicPartition> expectedLostChangelogs() {
return clearingPartitions;
}
}.createTaskAndClear();
}
@Test
public void shouldClearZombieRestoringTasks() {
new TaskTestSuite() {
@Override
public void additionalSetup(final StreamTask task) {
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.closeStateManager(false);
}
@Override
public void action(final StreamTask task) {
assignedTasks.addTaskToRestoring(task);
}
@Override
public Set<TaskId> taskIds() {
return assignedTasks.restoringTaskIds();
}
@Override
public List<TopicPartition> expectedLostChangelogs() {
return clearingPartitions;
}
}.createTaskAndClear();
}
@Test
public void shouldClearZombieRunningTasks() {
new TaskTestSuite() {
@Override
public void additionalSetup(final StreamTask task) {
task.initializeTopology();
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.close(false, true);
}
@Override
public void action(final StreamTask task) {
assignedTasks.transitionToRunning(task);
}
@Override
public Set<TaskId> taskIds() {
return assignedTasks.runningTaskIds();
}
@Override
public List<TopicPartition> expectedLostChangelogs() {
return clearingPartitions;
}
}.createTaskAndClear();
}
@Test
public void shouldClearZombieSuspendedTasks() {
new TaskTestSuite() {
@Override
public void additionalSetup(final StreamTask task) {
task.initializeTopology();
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.suspend();
task.closeSuspended(false, null);
}
@Override
public void action(final StreamTask task) {
assignedTasks.transitionToRunning(task);
final List<TopicPartition> revokedChangelogs = new ArrayList<>();
final List<TaskId> ids = Collections.singletonList(task.id());
assignedTasks.suspendOrCloseTasks(new HashSet<>(ids), revokedChangelogs);
assertEquals(clearingPartitions, revokedChangelogs);
}
@Override
public Set<TaskId> taskIds() {
return assignedTasks.suspendedTaskIds();
}
@Override
public List<TopicPartition> expectedLostChangelogs() {
return Collections.emptyList();
}
}.createTaskAndClear();
}
abstract class TaskTestSuite {
TaskId clearingTaskId = new TaskId(0, 0);
List<TopicPartition> clearingPartitions = Collections.singletonList(new TopicPartition("topic", 0));
abstract void additionalSetup(final StreamTask task);
abstract void action(final StreamTask task);
abstract Set<TaskId> taskIds();
abstract List<TopicPartition> expectedLostChangelogs();
void createTaskAndClear() {
final StreamTask task = EasyMock.createMock(StreamTask.class);
EasyMock.expect(task.id()).andReturn(clearingTaskId).anyTimes();
EasyMock.expect(task.changelogPartitions()).andReturn(clearingPartitions).anyTimes();
additionalSetup(task);
EasyMock.replay(task);
action(task);
final List<TopicPartition> changelogs = new ArrayList<>();
final Set<TaskId> ids = new HashSet<>(Collections.singleton(task.id()));
assertEquals(ids, taskIds());
assignedTasks.closeZombieTasks(ids, changelogs);
assertEquals(Collections.emptySet(), taskIds());
assertEquals(expectedLostChangelogs(), changelogs);
}
}
private void addAndInitTask() {
assignedTasks.addNewTask(t1);
assignedTasks.initializeNewTasks();

Loading…
Cancel
Save