Browse Source

MINOR: Resuming Tasks should not be initialized twice (#4562)

Avoids double initialization of resuming tasks
Removes race condition in StreamThreadTest plus code cleanup

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/4565/merge
Matthias J. Sax 7 years ago committed by GitHub
parent
commit
bce8125dcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
  2. 6
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
  3. 37
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

1
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@ -192,7 +192,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @@ -192,7 +192,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
transactionInFlight = true;
}
initTopology();
}
/**

6
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java

@ -129,11 +129,11 @@ public class StreamsMetricsImpl implements StreamsMetrics { @@ -129,11 +129,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
// first add the global operation metrics if not yet, with the global tags only
Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
addLatencyMetrics(scopeName, parent, operationName, allTagMap);
addLatencyAndThroughputMetrics(scopeName, parent, operationName, allTagMap);
// add the operation metrics with additional tags
Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
addLatencyMetrics(scopeName, sensor, operationName, tagMap);
addLatencyAndThroughputMetrics(scopeName, sensor, operationName, tagMap);
parentSensors.put(sensor, parent);
@ -161,7 +161,7 @@ public class StreamsMetricsImpl implements StreamsMetrics { @@ -161,7 +161,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return sensor;
}
private void addLatencyMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
private void addLatencyAndThroughputMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName),
"The average latency of " + opName + " operation.", tags), new Avg());

37
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

@ -133,8 +133,6 @@ public class StreamThreadTest { @@ -133,8 +133,6 @@ public class StreamThreadTest {
};
}
@SuppressWarnings("unchecked")
@Test
public void testPartitionAssignmentChangeForSingleGroup() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
@ -170,14 +168,13 @@ public class StreamThreadTest { @@ -170,14 +168,13 @@ public class StreamThreadTest {
assertTrue(thread.state() == StreamThread.State.PENDING_SHUTDOWN);
}
@SuppressWarnings("unchecked")
@Test
public void testStateChangeStartClose() throws InterruptedException {
final StreamThread thread = createStreamThread(clientId, config, false);
final StateListenerStub stateListener = new StateListenerStub();
thread.setStateListener(stateListener);
thread.start();
TestUtils.waitForCondition(new TestCondition() {
@Override
@ -185,19 +182,20 @@ public class StreamThreadTest { @@ -185,19 +182,20 @@ public class StreamThreadTest {
return thread.state() == StreamThread.State.RUNNING;
}
}, 10 * 1000, "Thread never started.");
thread.shutdown();
assertEquals(thread.state(), StreamThread.State.PENDING_SHUTDOWN);
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
return thread.state() == StreamThread.State.DEAD;
}
}, 10 * 1000, "Thread never shut down.");
thread.shutdown();
assertEquals(thread.state(), StreamThread.State.DEAD);
}
private Cluster createCluster(int numNodes) {
private Cluster createCluster(final int numNodes) {
HashMap<Integer, Node> nodes = new HashMap<>();
for (int i = 0; i < numNodes; ++i) {
nodes.put(i, new Node(i, "localhost", 8121 + i));
@ -362,7 +360,7 @@ public class StreamThreadTest { @@ -362,7 +360,7 @@ public class StreamThreadTest {
}
@Test
public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() throws InterruptedException {
public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final StreamThread thread = createStreamThread(clientId, config, false);
@ -394,7 +392,7 @@ public class StreamThreadTest { @@ -394,7 +392,7 @@ public class StreamThreadTest {
}
@Test
public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() throws InterruptedException {
public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
@ -423,7 +421,7 @@ public class StreamThreadTest { @@ -423,7 +421,7 @@ public class StreamThreadTest {
}
@Test
public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
@ -455,7 +453,7 @@ public class StreamThreadTest { @@ -455,7 +453,7 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
@Test
public void shouldShutdownTaskManagerOnClose() throws InterruptedException {
public void shouldShutdownTaskManagerOnClose() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.<TaskId, StreamTask>emptyMap());
@ -493,6 +491,7 @@ public class StreamThreadTest { @@ -493,6 +491,7 @@ public class StreamThreadTest {
EasyMock.verify(taskManager);
}
@SuppressWarnings("unchecked")
@Test
public void shouldShutdownTaskManagerOnCloseWithoutStart() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@ -521,6 +520,7 @@ public class StreamThreadTest { @@ -521,6 +520,7 @@ public class StreamThreadTest {
EasyMock.verify(taskManager);
}
@SuppressWarnings("unchecked")
@Test
public void shouldOnlyShutdownOnce() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@ -552,7 +552,7 @@ public class StreamThreadTest { @@ -552,7 +552,7 @@ public class StreamThreadTest {
}
@Test
public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException {
public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() {
internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
internalTopologyBuilder.addSink("out", "output", null, null, null);
@ -573,7 +573,7 @@ public class StreamThreadTest { @@ -573,7 +573,7 @@ public class StreamThreadTest {
}
@Test
public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws InterruptedException {
public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source");
@ -682,7 +682,8 @@ public class StreamThreadTest { @@ -682,7 +682,8 @@ public class StreamThreadTest {
ThreadStateTransitionValidator newState = null;
@Override
public void onChange(final Thread thread, final ThreadStateTransitionValidator newState,
public void onChange(final Thread thread,
final ThreadStateTransitionValidator newState,
final ThreadStateTransitionValidator oldState) {
++numChanges;
if (this.newState != null) {
@ -696,7 +697,7 @@ public class StreamThreadTest { @@ -696,7 +697,7 @@ public class StreamThreadTest {
}
@Test
public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException {
public void shouldReturnActiveTaskMetadataWhileRunningState() {
internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
final StreamThread thread = createStreamThread(clientId, config, false);
@ -726,7 +727,7 @@ public class StreamThreadTest { @@ -726,7 +727,7 @@ public class StreamThreadTest {
}
@Test
public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException {
public void shouldReturnStandbyTaskMetadataWhileRunningState() {
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
.groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
@ -911,7 +912,7 @@ public class StreamThreadTest { @@ -911,7 +912,7 @@ public class StreamThreadTest {
}
@Test
public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException {
public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
final StreamThread thread = createStreamThread(clientId, config, false);
ThreadMetadata metadata = thread.threadMetadata();
assertEquals(StreamThread.State.CREATED.name(), metadata.threadState());
@ -922,7 +923,7 @@ public class StreamThreadTest { @@ -922,7 +923,7 @@ public class StreamThreadTest {
}
@Test
public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException {
public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() {
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
.groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
@ -1063,7 +1064,7 @@ public class StreamThreadTest { @@ -1063,7 +1064,7 @@ public class StreamThreadTest {
}
@Test
public void shouldReportSkippedRecordsForInvalidTimestamps() throws Exception {
public void shouldReportSkippedRecordsForInvalidTimestamps() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final Properties config = configProps(false);

Loading…
Cancel
Save