From bce8125dcf5a87ed29bbe795cd7591d45bdf3d9a Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 15 Feb 2018 10:52:58 -0800 Subject: [PATCH] MINOR: Resuming Tasks should not be initialized twice (#4562) Avoids double initialization of resuming tasks Removes race condition in StreamThreadTest plus code cleanup Author: Matthias J. Sax Reviewers: Bill Bejeck , Guozhang Wang --- .../processor/internals/StreamTask.java | 1 - .../internals/StreamsMetricsImpl.java | 6 +-- .../processor/internals/StreamThreadTest.java | 37 ++++++++++--------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 56c0ab31f4b..6bca02ad9bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -192,7 +192,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } transactionInFlight = true; } - initTopology(); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java index 03bbceb25c4..b2ce2e7dcf8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java @@ -129,11 +129,11 @@ public class StreamsMetricsImpl implements StreamsMetrics { // first add the global operation metrics if not yet, with the global tags only Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel); - addLatencyMetrics(scopeName, parent, operationName, allTagMap); + addLatencyAndThroughputMetrics(scopeName, parent, operationName, allTagMap); // add the operation metrics with additional tags Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent); - addLatencyMetrics(scopeName, sensor, operationName, tagMap); + addLatencyAndThroughputMetrics(scopeName, sensor, operationName, tagMap); parentSensors.put(sensor, parent); @@ -161,7 +161,7 @@ public class StreamsMetricsImpl implements StreamsMetrics { return sensor; } - private void addLatencyMetrics(String scopeName, Sensor sensor, String opName, Map tags) { + private void addLatencyAndThroughputMetrics(String scopeName, Sensor sensor, String opName, Map tags) { maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName), "The average latency of " + opName + " operation.", tags), new Avg()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index cc056044792..482b764f3ff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -133,8 +133,6 @@ public class StreamThreadTest { }; } - - @SuppressWarnings("unchecked") @Test public void testPartitionAssignmentChangeForSingleGroup() { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); @@ -170,14 +168,13 @@ public class StreamThreadTest { assertTrue(thread.state() == StreamThread.State.PENDING_SHUTDOWN); } - @SuppressWarnings("unchecked") @Test public void testStateChangeStartClose() throws InterruptedException { - final StreamThread thread = createStreamThread(clientId, config, false); final StateListenerStub stateListener = new StateListenerStub(); thread.setStateListener(stateListener); + thread.start(); TestUtils.waitForCondition(new TestCondition() { @Override @@ -185,19 +182,20 @@ public class StreamThreadTest { return thread.state() == StreamThread.State.RUNNING; } }, 10 * 1000, "Thread never started."); + thread.shutdown(); - assertEquals(thread.state(), StreamThread.State.PENDING_SHUTDOWN); TestUtils.waitForCondition(new TestCondition() { @Override public boolean conditionMet() { return thread.state() == StreamThread.State.DEAD; } }, 10 * 1000, "Thread never shut down."); + thread.shutdown(); assertEquals(thread.state(), StreamThread.State.DEAD); } - private Cluster createCluster(int numNodes) { + private Cluster createCluster(final int numNodes) { HashMap nodes = new HashMap<>(); for (int i = 0; i < numNodes; ++i) { nodes.put(i, new Node(i, "localhost", 8121 + i)); @@ -362,7 +360,7 @@ public class StreamThreadTest { } @Test - public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() throws InterruptedException { + public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final StreamThread thread = createStreamThread(clientId, config, false); @@ -394,7 +392,7 @@ public class StreamThreadTest { } @Test - public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() throws InterruptedException { + public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); @@ -423,7 +421,7 @@ public class StreamThreadTest { } @Test - public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException { + public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); @@ -455,7 +453,7 @@ public class StreamThreadTest { @SuppressWarnings("unchecked") @Test - public void shouldShutdownTaskManagerOnClose() throws InterruptedException { + public void shouldShutdownTaskManagerOnClose() { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.emptyMap()); @@ -493,6 +491,7 @@ public class StreamThreadTest { EasyMock.verify(taskManager); } + @SuppressWarnings("unchecked") @Test public void shouldShutdownTaskManagerOnCloseWithoutStart() { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); @@ -521,6 +520,7 @@ public class StreamThreadTest { EasyMock.verify(taskManager); } + @SuppressWarnings("unchecked") @Test public void shouldOnlyShutdownOnce() { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); @@ -552,7 +552,7 @@ public class StreamThreadTest { } @Test - public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException { + public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() { internalTopologyBuilder.addSource(null, "name", null, null, null, "topic"); internalTopologyBuilder.addSink("out", "output", null, null, null); @@ -573,7 +573,7 @@ public class StreamThreadTest { } @Test - public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws InterruptedException { + public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception { internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source"); @@ -682,7 +682,8 @@ public class StreamThreadTest { ThreadStateTransitionValidator newState = null; @Override - public void onChange(final Thread thread, final ThreadStateTransitionValidator newState, + public void onChange(final Thread thread, + final ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator oldState) { ++numChanges; if (this.newState != null) { @@ -696,7 +697,7 @@ public class StreamThreadTest { } @Test - public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException { + public void shouldReturnActiveTaskMetadataWhileRunningState() { internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); final StreamThread thread = createStreamThread(clientId, config, false); @@ -726,7 +727,7 @@ public class StreamThreadTest { } @Test - public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException { + public void shouldReturnStandbyTaskMetadataWhileRunningState() { internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) .groupByKey().count(Materialized.>as("count-one")); @@ -911,7 +912,7 @@ public class StreamThreadTest { } @Test - public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException { + public void shouldAlwaysUpdateTasksMetadataAfterChangingState() { final StreamThread thread = createStreamThread(clientId, config, false); ThreadMetadata metadata = thread.threadMetadata(); assertEquals(StreamThread.State.CREATED.name(), metadata.threadState()); @@ -922,7 +923,7 @@ public class StreamThreadTest { } @Test - public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException { + public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() { internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) .groupByKey().count(Materialized.>as("count-one")); @@ -1063,7 +1064,7 @@ public class StreamThreadTest { } @Test - public void shouldReportSkippedRecordsForInvalidTimestamps() throws Exception { + public void shouldReportSkippedRecordsForInvalidTimestamps() { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final Properties config = configProps(false);