diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 0c3fcf20146..0753b2a8e96 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -203,7 +203,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte } @Override - public void initialized() { + public void initialize() { initialized = true; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index d38771362ed..202fa36b5a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -74,7 +74,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { ); } initTopology(); - processorContext.initialized(); + processorContext.initialize(); return stateMgr.checkpointed(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 6db7a3dc640..98511fd53de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -57,7 +57,7 @@ public interface InternalProcessorContext extends ProcessorContext { /** * Mark this context as being initialized */ - void initialized(); + void initialize(); /** * Mark this context as being uninitialized diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 3ac64146187..6f4e61787de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -69,7 +69,7 @@ public class StandbyTask extends AbstractTask { log.trace("Initializing state stores"); registerStateStores(); checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed()); - processorContext.initialized(); + processorContext.initialize(); taskInitialized = true; return true; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 2f97b7f27ba..2ad0acc89d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -274,7 +274,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator transactionInFlight = true; } - processorContext.initialized(); + processorContext.initialize(); taskInitialized = true; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 2869826a421..070dba8efa0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -58,7 +58,7 @@ public class AbstractProcessorContextTest { @Test public void shouldThrowIllegalStateExceptionOnRegisterWhenContextIsInitialized() { - context.initialized(); + context.initialize(); try { context.register(stateStore, null); fail("should throw illegal state exception when context already initialized"); diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 6d4f5e25419..bedf8ebf8c4 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -170,7 +170,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple // state mgr will be overridden by the state dir and store maps @Override - public void initialized() {} + public void initialize() {} public void setStreamTime(final long currentTime) { streamTime = currentTime; diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 42ee8fb33ac..ce9838919f0 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -81,7 +81,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext { } @Override - public void initialized() { + public void initialize() { initialized = true; }