diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java index 36482aacefc..c3064688498 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java @@ -41,7 +41,7 @@ abstract class AbstractStateManager implements StateManager { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; final File baseDir; - private final boolean eosEnabled; + final boolean eosEnabled; OffsetCheckpoint checkpoint; final Map checkpointableOffsets = new HashMap<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 1838eb7e2aa..33878017b7a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -240,14 +240,13 @@ public abstract class AbstractTask implements Task { } /** - * @param writeCheckpoint boolean indicating if a checkpoint file should be written * @throws ProcessorStateException if there is an error while closing the state manager */ - void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException { + void closeStateManager(final boolean clean) throws ProcessorStateException { ProcessorStateException exception = null; log.trace("Closing state manager"); try { - stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() : null); + stateMgr.close(clean); } catch (final ProcessorStateException e) { exception = e; } finally { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 48319a54e4c..768281428b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -318,7 +318,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @Override - public void close(final Map offsets) throws IOException { + public void close(final boolean clean) throws IOException { try { if (globalStores.isEmpty()) { return; @@ -341,7 +341,6 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob if (closeFailed.length() > 0) { throw new ProcessorStateException("Exceptions caught during close of 1 or more global state globalStores\n" + closeFailed); } - checkpoint(offsets); } finally { stateDirectory.unlockGlobalState(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 202fa36b5a9..29c861e4291 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -105,7 +105,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { } public void close() throws IOException { - stateMgr.close(offsets); + stateMgr.close(true); } private void initTopology() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index f75f185b37d..5c54ee7b4fc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -89,6 +89,9 @@ public class ProcessorStateManager extends AbstractStateManager { // load the checkpoint information checkpointableOffsets.putAll(checkpoint.read()); + + log.trace("Checkpointable offsets read from checkpoint: {}", checkpointableOffsets); + if (eosEnabled) { // delete the checkpoint file after finish loading its stored offsets checkpoint.delete(); @@ -140,7 +143,7 @@ public class ProcessorStateManager extends AbstractStateManager { restoreCallbacks.put(topic, stateRestoreCallback); recordConverters.put(topic, recordConverter); } else { - log.trace("Restoring state store {} from changelog topic {}", storeName, topic); + log.trace("Restoring state store {} from changelog topic {} at checkpoint {}", storeName, topic, checkpointableOffsets.get(storePartition)); final StateRestorer restorer = new StateRestorer( storePartition, @@ -254,7 +257,7 @@ public class ProcessorStateManager extends AbstractStateManager { * @throws ProcessorStateException if any error happens when closing the state stores */ @Override - public void close(final Map ackedOffsets) throws ProcessorStateException { + public void close(final boolean clean) throws ProcessorStateException { ProcessorStateException firstException = null; // attempting to close the stores, just in case they // are not closed by a ProcessorNode yet @@ -271,11 +274,17 @@ public class ProcessorStateManager extends AbstractStateManager { log.error("Failed to close state store {}: ", store.name(), e); } } + stores.clear(); + } - if (ackedOffsets != null) { - checkpoint(ackedOffsets); + if (!clean && eosEnabled && checkpoint != null) { + // delete the checkpoint file if this is an unclean close + try { + checkpoint.delete(); + checkpoint = null; + } catch (final IOException e) { + throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", logPrefix), e); } - stores.clear(); } if (firstException != null) { @@ -287,6 +296,7 @@ public class ProcessorStateManager extends AbstractStateManager { @Override public void checkpoint(final Map checkpointableOffsets) { this.checkpointableOffsets.putAll(changelogReader.restoredOffsets()); + log.trace("Checkpointable offsets updated with restored offsets: {}", this.checkpointableOffsets); for (final StateStore store : stores.values()) { final String storeName = store.name(); // only checkpoint the offset to the offsets file if @@ -302,6 +312,9 @@ public class ProcessorStateManager extends AbstractStateManager { } } } + + log.trace("Checkpointable offsets updated with active acked offsets: {}", this.checkpointableOffsets); + // write the checkpoint file before closing if (checkpoint == null) { checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index bd8ba34cb15..749b2ed6cbf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -127,8 +127,6 @@ public class StandbyTask extends AbstractTask { * - {@link #commit()} * - close state *
-     * @param clean ignored by {@code StandbyTask} as it can always try to close cleanly
-     *              (ie, commit, flush, and write checkpoint file)
      * @param isZombie ignored by {@code StandbyTask} as it can never be a zombie
      */
     @Override
@@ -138,14 +136,12 @@ public class StandbyTask extends AbstractTask {
             return;
         }
         log.debug("Closing");
-        boolean committedSuccessfully = false;
         try {
             if (clean) {
                 commit();
-                committedSuccessfully = true;
             }
         } finally {
-            closeStateManager(committedSuccessfully);
+            closeStateManager(true);
         }
 
         taskClosed = true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index f6efde6c5d6..74cf12bd0b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Map;
 
 interface StateManager extends Checkpointable {
     File baseDir();
@@ -41,7 +40,7 @@ interface StateManager extends Checkpointable {
     void reinitializeStateStoresForPartitions(final Collection partitions,
                                               final InternalProcessorContext processorContext);
 
-    void close(final Map offsets) throws IOException;
+    void close(final boolean clean) throws IOException;
 
     StateStore getGlobalStore(final String name);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 59fb36fa6a2..c8a9842ff9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -195,6 +195,8 @@ public class StoreChangelogReader implements ChangelogReader {
         for (final TopicPartition partition : initialized) {
             final StateRestorer restorer = stateRestorers.get(partition);
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
+                log.trace("Found checkpoint {} from changelog {} for store {}.", restorer.checkpoint(), partition, restorer.storeName());
+
                 restoreConsumer.seek(partition, restorer.checkpoint());
                 logRestoreOffsets(partition,
                         restorer.checkpoint(),
@@ -202,6 +204,8 @@ public class StoreChangelogReader implements ChangelogReader {
                 restorer.setStartingOffset(restoreConsumer.position(partition));
                 restorer.restoreStarted();
             } else {
+                log.trace("Did not find checkpoint from changelog {} for store {}, rewinding to beginning.", partition, restorer.storeName());
+
                 restoreConsumer.seekToBeginning(Collections.singletonList(partition));
                 needsPositionUpdate.add(restorer);
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c4fecef4285..6dbf394ae4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -45,6 +46,7 @@ import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -296,6 +298,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             producer = producerSupplier.get();
             producer.initTransactions();
             recordCollector.init(producer);
+
+            if (stateMgr.checkpoint != null) {
+                try {
+                    stateMgr.checkpoint.delete();
+                    stateMgr.checkpoint = null;
+                } catch (final IOException e) {
+                    throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", logPrefix), e);
+                }
+            }
         }
     }
 
@@ -567,6 +578,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                 commit(false);
             } finally {
                 if (eosEnabled) {
+
+                    stateMgr.checkpoint(activeTaskCheckpointableOffsets());
+
                     try {
                         recordCollector.close();
                     } catch (final ProducerFencedException e) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 809a0b679d3..5dd5f8651cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -371,22 +371,11 @@ public class GlobalStateManagerImplTest {
         initializeConsumer(1, 0, t2);
         stateManager.register(store2, stateRestoreCallback);
 
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
         assertFalse(store1.isOpen());
         assertFalse(store2.isOpen());
     }
 
-    @Test
-    public void shouldWriteCheckpointsOnClose() throws IOException {
-        stateManager.initialize();
-        initializeConsumer(1, 0, t1);
-        stateManager.register(store1, stateRestoreCallback);
-        final Map expected = Collections.singletonMap(t1, 25L);
-        stateManager.close(expected);
-        final Map result = readOffsetsCheckpoint();
-        assertEquals(expected, result);
-    }
-
     @Test(expected = ProcessorStateException.class)
     public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
         stateManager.initialize();
@@ -398,7 +387,7 @@ public class GlobalStateManagerImplTest {
             }
         }, stateRestoreCallback);
 
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
     }
 
     @Test
@@ -415,7 +404,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
         stateManager.initialize();
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
         final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true);
         try {
             // should be able to get the lock now as it should've been released in close
@@ -438,9 +427,9 @@ public class GlobalStateManagerImplTest {
                 super.close();
             }
         }, stateRestoreCallback);
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
 
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
     }
 
     @Test
@@ -460,7 +449,7 @@ public class GlobalStateManagerImplTest {
         stateManager.register(store2, stateRestoreCallback);
 
         try {
-            stateManager.close(Collections.emptyMap());
+            stateManager.close(true);
         } catch (final ProcessorStateException e) {
             // expected
         }
@@ -539,7 +528,8 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize();
         initializeConsumer(10, 0, t1);
         stateManager.register(store1, stateRestoreCallback);
-        stateManager.close(Collections.emptyMap());
+        stateManager.checkpoint(Collections.emptyMap());
+        stateManager.close(true);
 
         final Map checkpointMap = stateManager.checkpointed();
         assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 10L)));
@@ -551,7 +541,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize();
         initializeConsumer(10, 0, t3);
         stateManager.register(store3, stateRestoreCallback);
-        stateManager.close(Collections.emptyMap());
+        stateManager.close(true);
 
         assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap()));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 53d374b862b..90012552795 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -204,15 +204,14 @@ public class GlobalStateTaskTest {
 
 
     @Test
-    public void shouldCloseStateManagerWithOffsets() throws IOException {
+    public void shouldFlushStateManagerWithOffsets() throws IOException {
         final Map expectedOffsets = new HashMap<>();
         expectedOffsets.put(t1, 52L);
         expectedOffsets.put(t2, 100L);
         globalStateTask.initialize();
         globalStateTask.update(new ConsumerRecord<>(topic1, 1, 51, "foo".getBytes(), "foo".getBytes()));
-        globalStateTask.close();
+        globalStateTask.flushState();
         assertEquals(expectedOffsets, stateMgr.checkpointed());
-        assertTrue(stateMgr.closed);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 119790f7788..7d8298296a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -130,7 +130,7 @@ public class ProcessorStateManagerTest {
             assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1));
             assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -153,7 +153,7 @@ public class ProcessorStateManagerTest {
             assertTrue(persistentStore.keys.contains(intKey));
             assertEquals(9, persistentStore.values.get(0).length);
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -176,7 +176,7 @@ public class ProcessorStateManagerTest {
             assertTrue(persistentStore.keys.contains(intKey));
             assertEquals(17, persistentStore.values.get(0).length);
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -204,7 +204,7 @@ public class ProcessorStateManagerTest {
             stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2)));
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -231,7 +231,7 @@ public class ProcessorStateManagerTest {
             stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2)));
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -292,7 +292,7 @@ public class ProcessorStateManagerTest {
             assertEquals(-1L, (long) changeLogOffsets.get(partition3));
 
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -315,7 +315,7 @@ public class ProcessorStateManagerTest {
             assertEquals(mockKeyValueStore, stateMgr.getStore(nonPersistentStoreName));
 
         } finally {
-            stateMgr.close(Collections.emptyMap());
+            stateMgr.close(true);
         }
     }
 
@@ -352,7 +352,8 @@ public class ProcessorStateManagerTest {
         } finally {
             // close the state manager with the ack'ed offsets
             stateMgr.flush();
-            stateMgr.close(ackedOffsets);
+            stateMgr.checkpoint(ackedOffsets);
+            stateMgr.close(true);
         }
         // make sure all stores are closed, and the checkpoint file is written.
         assertTrue(persistentStore.flushed);
@@ -398,7 +399,7 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
         stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
-        stateMgr.close(null);
+        stateMgr.close(true);
         final Map read = checkpoint.read();
         assertThat(read, equalTo(offsets));
     }
@@ -583,7 +584,7 @@ public class ProcessorStateManagerTest {
         stateManager.register(stateStore, stateStore.stateRestoreCallback);
 
         try {
-            stateManager.close(Collections.emptyMap());
+            stateManager.close(true);
             fail("Should throw ProcessorStateException if store close throws exception");
         } catch (final ProcessorStateException e) {
             // pass
@@ -696,7 +697,7 @@ public class ProcessorStateManagerTest {
         stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
 
         try {
-            stateManager.close(Collections.emptyMap());
+            stateManager.close(true);
         } catch (final ProcessorStateException expected) { /* ignode */ }
         Assert.assertTrue(closedStore.get());
     }
@@ -721,7 +722,7 @@ public class ProcessorStateManagerTest {
             assertFalse(checkpointFile.exists());
         } finally {
             if (stateManager != null) {
-                stateManager.close(null);
+                stateManager.close(true);
             }
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 52599c67a6d..a57c4070d79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -366,6 +366,7 @@ public class StandbyTaskTest {
             singletonList(makeWindowedConsumerRecord(changelogName, 10, 1, 0L, 60_000L))
         );
 
+        task.suspend();
         task.closeStateManager(true);
 
         final File taskDir = stateDirectory.directoryForTask(taskId);
@@ -592,7 +593,7 @@ public class StandbyTaskTest {
             }
 
             @Override
-            void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
+            void closeStateManager(final boolean clean) throws ProcessorStateException {
                 closedStateManager.set(true);
             }
         };
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
index b14731d62e0..37b44d35bff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
@@ -45,7 +45,7 @@ public class StateManagerStub implements StateManager {
     public void flush() {}
 
     @Override
-    public void close(final Map offsets) throws IOException {}
+    public void close(final boolean clean) throws IOException {}
 
     @Override
     public StateStore getGlobalStore(final String name) {
diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index a97a91ead9c..ccb17622b52 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -66,7 +66,7 @@ public class GlobalStateManagerStub implements GlobalStateManager {
     public void flush() {}
 
     @Override
-    public void close(final Map offsets) throws IOException {
+    public void close(final boolean clean) throws IOException {
         this.offsets.putAll(offsets);
         closed = true;
     }