From 96132e2dbb69a0c6c11cb183bb05cefef4e30557 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 21 Sep 2018 13:55:56 -0500 Subject: [PATCH] MINOR: rename InternalProcessorContext.initialized (#5672) Reviewers: Bill Bejeck , Matthias J. Sax --- .../streams/processor/internals/AbstractProcessorContext.java | 2 +- .../streams/processor/internals/GlobalStateUpdateTask.java | 2 +- .../streams/processor/internals/InternalProcessorContext.java | 2 +- .../apache/kafka/streams/processor/internals/StandbyTask.java | 2 +- .../apache/kafka/streams/processor/internals/StreamTask.java | 2 +- .../processor/internals/AbstractProcessorContextTest.java | 2 +- .../org/apache/kafka/test/InternalMockProcessorContext.java | 2 +- .../test/java/org/apache/kafka/test/NoOpProcessorContext.java | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 0c3fcf20146..0753b2a8e96 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -203,7 +203,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte } @Override - public void initialized() { + public void initialize() { initialized = true; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index d38771362ed..202fa36b5a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -74,7 +74,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { ); } initTopology(); - processorContext.initialized(); + processorContext.initialize(); return stateMgr.checkpointed(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 6db7a3dc640..98511fd53de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -57,7 +57,7 @@ public interface InternalProcessorContext extends ProcessorContext { /** * Mark this context as being initialized */ - void initialized(); + void initialize(); /** * Mark this context as being uninitialized diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 3ac64146187..6f4e61787de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -69,7 +69,7 @@ public class StandbyTask extends AbstractTask { log.trace("Initializing state stores"); registerStateStores(); checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed()); - processorContext.initialized(); + processorContext.initialize(); taskInitialized = true; return true; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 2f97b7f27ba..2ad0acc89d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -274,7 +274,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator transactionInFlight = true; } - processorContext.initialized(); + processorContext.initialize(); taskInitialized = true; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 2869826a421..070dba8efa0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -58,7 +58,7 @@ public class AbstractProcessorContextTest { @Test public void shouldThrowIllegalStateExceptionOnRegisterWhenContextIsInitialized() { - context.initialized(); + context.initialize(); try { context.register(stateStore, null); fail("should throw illegal state exception when context already initialized"); diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 6d4f5e25419..bedf8ebf8c4 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -170,7 +170,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple // state mgr will be overridden by the state dir and store maps @Override - public void initialized() {} + public void initialize() {} public void setStreamTime(final long currentTime) { streamTime = currentTime; diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 42ee8fb33ac..ce9838919f0 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -81,7 +81,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext { } @Override - public void initialized() { + public void initialize() { initialized = true; }