Browse Source

KAFKA-7672 : force write checkpoint during StreamTask #suspend (#6115)

This fix is aiming for #2 issue pointed out within https://issues.apache.org/jira/browse/KAFKA-7672
In the current setup, we do offset checkpoint file write when EOS is turned on during #suspend, which introduces the potential race condition during StateManager #closeSuspend call. To mitigate the problem, we attempt to always write checkpoint file in #suspend call.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
pull/6310/head
Boyang Chen 6 years ago committed by Guozhang Wang
parent
commit
1f9aa01a5b
  1. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
  2. 5
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
  3. 3
      streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
  4. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
  5. 23
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
  6. 6
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
  7. 3
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
  8. 4
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
  9. 14
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
  10. 28
      streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
  11. 5
      streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
  12. 25
      streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
  13. 3
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
  14. 2
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
  15. 2
      streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java

@ -41,7 +41,7 @@ abstract class AbstractStateManager implements StateManager { @@ -41,7 +41,7 @@ abstract class AbstractStateManager implements StateManager {
static final String CHECKPOINT_FILE_NAME = ".checkpoint";
final File baseDir;
private final boolean eosEnabled;
final boolean eosEnabled;
OffsetCheckpoint checkpoint;
final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();

5
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java

@ -240,14 +240,13 @@ public abstract class AbstractTask implements Task { @@ -240,14 +240,13 @@ public abstract class AbstractTask implements Task {
}
/**
* @param writeCheckpoint boolean indicating if a checkpoint file should be written
* @throws ProcessorStateException if there is an error while closing the state manager
*/
void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
void closeStateManager(final boolean clean) throws ProcessorStateException {
ProcessorStateException exception = null;
log.trace("Closing state manager");
try {
stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() : null);
stateMgr.close(clean);
} catch (final ProcessorStateException e) {
exception = e;
} finally {

3
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

@ -318,7 +318,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @@ -318,7 +318,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
@Override
public void close(final Map<TopicPartition, Long> offsets) throws IOException {
public void close(final boolean clean) throws IOException {
try {
if (globalStores.isEmpty()) {
return;
@ -341,7 +341,6 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @@ -341,7 +341,6 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
if (closeFailed.length() > 0) {
throw new ProcessorStateException("Exceptions caught during close of 1 or more global state globalStores\n" + closeFailed);
}
checkpoint(offsets);
} finally {
stateDirectory.unlockGlobalState();
}

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java

@ -105,7 +105,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { @@ -105,7 +105,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
}
public void close() throws IOException {
stateMgr.close(offsets);
stateMgr.close(true);
}
private void initTopology() {

23
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java

@ -89,6 +89,9 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -89,6 +89,9 @@ public class ProcessorStateManager extends AbstractStateManager {
// load the checkpoint information
checkpointableOffsets.putAll(checkpoint.read());
log.trace("Checkpointable offsets read from checkpoint: {}", checkpointableOffsets);
if (eosEnabled) {
// delete the checkpoint file after finish loading its stored offsets
checkpoint.delete();
@ -140,7 +143,7 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -140,7 +143,7 @@ public class ProcessorStateManager extends AbstractStateManager {
restoreCallbacks.put(topic, stateRestoreCallback);
recordConverters.put(topic, recordConverter);
} else {
log.trace("Restoring state store {} from changelog topic {}", storeName, topic);
log.trace("Restoring state store {} from changelog topic {} at checkpoint {}", storeName, topic, checkpointableOffsets.get(storePartition));
final StateRestorer restorer = new StateRestorer(
storePartition,
@ -254,7 +257,7 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -254,7 +257,7 @@ public class ProcessorStateManager extends AbstractStateManager {
* @throws ProcessorStateException if any error happens when closing the state stores
*/
@Override
public void close(final Map<TopicPartition, Long> ackedOffsets) throws ProcessorStateException {
public void close(final boolean clean) throws ProcessorStateException {
ProcessorStateException firstException = null;
// attempting to close the stores, just in case they
// are not closed by a ProcessorNode yet
@ -271,11 +274,17 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -271,11 +274,17 @@ public class ProcessorStateManager extends AbstractStateManager {
log.error("Failed to close state store {}: ", store.name(), e);
}
}
stores.clear();
}
if (ackedOffsets != null) {
checkpoint(ackedOffsets);
if (!clean && eosEnabled && checkpoint != null) {
// delete the checkpoint file if this is an unclean close
try {
checkpoint.delete();
checkpoint = null;
} catch (final IOException e) {
throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", logPrefix), e);
}
stores.clear();
}
if (firstException != null) {
@ -287,6 +296,7 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -287,6 +296,7 @@ public class ProcessorStateManager extends AbstractStateManager {
@Override
public void checkpoint(final Map<TopicPartition, Long> checkpointableOffsets) {
this.checkpointableOffsets.putAll(changelogReader.restoredOffsets());
log.trace("Checkpointable offsets updated with restored offsets: {}", this.checkpointableOffsets);
for (final StateStore store : stores.values()) {
final String storeName = store.name();
// only checkpoint the offset to the offsets file if
@ -302,6 +312,9 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -302,6 +312,9 @@ public class ProcessorStateManager extends AbstractStateManager {
}
}
}
log.trace("Checkpointable offsets updated with active acked offsets: {}", this.checkpointableOffsets);
// write the checkpoint file before closing
if (checkpoint == null) {
checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));

6
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java

@ -127,8 +127,6 @@ public class StandbyTask extends AbstractTask { @@ -127,8 +127,6 @@ public class StandbyTask extends AbstractTask {
* - {@link #commit()}
* - close state
* <pre>
* @param clean ignored by {@code StandbyTask} as it can always try to close cleanly
* (ie, commit, flush, and write checkpoint file)
* @param isZombie ignored by {@code StandbyTask} as it can never be a zombie
*/
@Override
@ -138,14 +136,12 @@ public class StandbyTask extends AbstractTask { @@ -138,14 +136,12 @@ public class StandbyTask extends AbstractTask {
return;
}
log.debug("Closing");
boolean committedSuccessfully = false;
try {
if (clean) {
commit();
committedSuccessfully = true;
}
} finally {
closeStateManager(committedSuccessfully);
closeStateManager(true);
}
taskClosed = true;

3
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java

@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.StateStore; @@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.StateStore;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
interface StateManager extends Checkpointable {
File baseDir();
@ -41,7 +40,7 @@ interface StateManager extends Checkpointable { @@ -41,7 +40,7 @@ interface StateManager extends Checkpointable {
void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions,
final InternalProcessorContext processorContext);
void close(final Map<TopicPartition, Long> offsets) throws IOException;
void close(final boolean clean) throws IOException;
StateStore getGlobalStore(final String name);

4
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java

@ -195,6 +195,8 @@ public class StoreChangelogReader implements ChangelogReader { @@ -195,6 +195,8 @@ public class StoreChangelogReader implements ChangelogReader {
for (final TopicPartition partition : initialized) {
final StateRestorer restorer = stateRestorers.get(partition);
if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
log.trace("Found checkpoint {} from changelog {} for store {}.", restorer.checkpoint(), partition, restorer.storeName());
restoreConsumer.seek(partition, restorer.checkpoint());
logRestoreOffsets(partition,
restorer.checkpoint(),
@ -202,6 +204,8 @@ public class StoreChangelogReader implements ChangelogReader { @@ -202,6 +204,8 @@ public class StoreChangelogReader implements ChangelogReader {
restorer.setStartingOffset(restoreConsumer.position(partition));
restorer.restoreStarted();
} else {
log.trace("Did not find checkpoint from changelog {} for store {}, rewinding to beginning.", partition, restorer.storeName());
restoreConsumer.seekToBeginning(Collections.singletonList(partition));
needsPositionUpdate.add(restorer);
}

14
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.stats.Rate; @@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
@ -45,6 +46,7 @@ import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount; @@ -45,6 +46,7 @@ import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@ -296,6 +298,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @@ -296,6 +298,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
producer = producerSupplier.get();
producer.initTransactions();
recordCollector.init(producer);
if (stateMgr.checkpoint != null) {
try {
stateMgr.checkpoint.delete();
stateMgr.checkpoint = null;
} catch (final IOException e) {
throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", logPrefix), e);
}
}
}
}
@ -567,6 +578,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @@ -567,6 +578,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
commit(false);
} finally {
if (eosEnabled) {
stateMgr.checkpoint(activeTaskCheckpointableOffsets());
try {
recordCollector.close();
} catch (final ProducerFencedException e) {

28
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java

@ -371,22 +371,11 @@ public class GlobalStateManagerImplTest { @@ -371,22 +371,11 @@ public class GlobalStateManagerImplTest {
initializeConsumer(1, 0, t2);
stateManager.register(store2, stateRestoreCallback);
stateManager.close(Collections.emptyMap());
stateManager.close(true);
assertFalse(store1.isOpen());
assertFalse(store2.isOpen());
}
@Test
public void shouldWriteCheckpointsOnClose() throws IOException {
stateManager.initialize();
initializeConsumer(1, 0, t1);
stateManager.register(store1, stateRestoreCallback);
final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L);
stateManager.close(expected);
final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
assertEquals(expected, result);
}
@Test(expected = ProcessorStateException.class)
public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
stateManager.initialize();
@ -398,7 +387,7 @@ public class GlobalStateManagerImplTest { @@ -398,7 +387,7 @@ public class GlobalStateManagerImplTest {
}
}, stateRestoreCallback);
stateManager.close(Collections.emptyMap());
stateManager.close(true);
}
@Test
@ -415,7 +404,7 @@ public class GlobalStateManagerImplTest { @@ -415,7 +404,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
stateManager.initialize();
stateManager.close(Collections.emptyMap());
stateManager.close(true);
final StateDirectory stateDir = new StateDirectory(streamsConfig, new MockTime(), true);
try {
// should be able to get the lock now as it should've been released in close
@ -438,9 +427,9 @@ public class GlobalStateManagerImplTest { @@ -438,9 +427,9 @@ public class GlobalStateManagerImplTest {
super.close();
}
}, stateRestoreCallback);
stateManager.close(Collections.emptyMap());
stateManager.close(true);
stateManager.close(Collections.emptyMap());
stateManager.close(true);
}
@Test
@ -460,7 +449,7 @@ public class GlobalStateManagerImplTest { @@ -460,7 +449,7 @@ public class GlobalStateManagerImplTest {
stateManager.register(store2, stateRestoreCallback);
try {
stateManager.close(Collections.emptyMap());
stateManager.close(true);
} catch (final ProcessorStateException e) {
// expected
}
@ -539,7 +528,8 @@ public class GlobalStateManagerImplTest { @@ -539,7 +528,8 @@ public class GlobalStateManagerImplTest {
stateManager.initialize();
initializeConsumer(10, 0, t1);
stateManager.register(store1, stateRestoreCallback);
stateManager.close(Collections.emptyMap());
stateManager.checkpoint(Collections.emptyMap());
stateManager.close(true);
final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();
assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 10L)));
@ -551,7 +541,7 @@ public class GlobalStateManagerImplTest { @@ -551,7 +541,7 @@ public class GlobalStateManagerImplTest {
stateManager.initialize();
initializeConsumer(10, 0, t3);
stateManager.register(store3, stateRestoreCallback);
stateManager.close(Collections.emptyMap());
stateManager.close(true);
assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap()));
}

5
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java

@ -204,15 +204,14 @@ public class GlobalStateTaskTest { @@ -204,15 +204,14 @@ public class GlobalStateTaskTest {
@Test
public void shouldCloseStateManagerWithOffsets() throws IOException {
public void shouldFlushStateManagerWithOffsets() throws IOException {
final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
expectedOffsets.put(t1, 52L);
expectedOffsets.put(t2, 100L);
globalStateTask.initialize();
globalStateTask.update(new ConsumerRecord<>(topic1, 1, 51, "foo".getBytes(), "foo".getBytes()));
globalStateTask.close();
globalStateTask.flushState();
assertEquals(expectedOffsets, stateMgr.checkpointed());
assertTrue(stateMgr.closed);
}
@Test

25
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java

@ -130,7 +130,7 @@ public class ProcessorStateManagerTest { @@ -130,7 +130,7 @@ public class ProcessorStateManagerTest {
assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1));
assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
} finally {
stateMgr.close(Collections.emptyMap());
stateMgr.close(true);
}
}
@ -153,7 +153,7 @@ public class ProcessorStateManagerTest { @@ -153,7 +153,7 @@ public class ProcessorStateManagerTest {
assertTrue(persistentStore.keys.contains(intKey));
assertEquals(9, persistentStore.values.get(0).length);
} finally {
stateMgr.close(Collections.emptyMap());
stateMgr.close(true);
}
}
@ -176,7 +176,7 @@ public class ProcessorStateManagerTest { @@ -176,7 +176,7 @@ public class ProcessorStateManagerTest {
assertTrue(persistentStore.keys.contains(intKey));
assertEquals(17, persistentStore.values.get(0).length);
} finally {
stateMgr.close(Collections.emptyMap());
stateMgr.close(true);
}
}
@ -204,7 +204,7 @@ public class ProcessorStateManagerTest { @@ -204,7 +204,7 @@ public class ProcessorStateManagerTest {
stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2)));
} finally {
stateMgr.close(Collections.emptyMap());
stateMgr.close(true);
}
}
@ -231,7 +231,7 @@ public class ProcessorStateManagerTest { @@ -231,7 +231,7 @@ public class ProcessorStateManagerTest {
stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2)));
} finally {
stateMgr.close(Collections.emptyMap());
stateMgr.close(true);
}
}
@ -292,7 +292,7 @@ public class ProcessorStateManagerTest { @@ -292,7 +292,7 @@ public class ProcessorStateManagerTest {
assertEquals(-1L, (long) changeLogOffsets.get(partition3));
} finally {
stateMgr.close(Collections.emptyMap());
stateMgr.close(true);
}
}
@ -315,7 +315,7 @@ public class ProcessorStateManagerTest { @@ -315,7 +315,7 @@ public class ProcessorStateManagerTest {
assertEquals(mockKeyValueStore, stateMgr.getStore(nonPersistentStoreName));
} finally {
stateMgr.close(Collections.emptyMap());
stateMgr.close(true);
}
}
@ -352,7 +352,8 @@ public class ProcessorStateManagerTest { @@ -352,7 +352,8 @@ public class ProcessorStateManagerTest {
} finally {
// close the state manager with the ack'ed offsets
stateMgr.flush();
stateMgr.close(ackedOffsets);
stateMgr.checkpoint(ackedOffsets);
stateMgr.close(true);
}
// make sure all stores are closed, and the checkpoint file is written.
assertTrue(persistentStore.flushed);
@ -398,7 +399,7 @@ public class ProcessorStateManagerTest { @@ -398,7 +399,7 @@ public class ProcessorStateManagerTest {
false,
logContext);
stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
stateMgr.close(null);
stateMgr.close(true);
final Map<TopicPartition, Long> read = checkpoint.read();
assertThat(read, equalTo(offsets));
}
@ -583,7 +584,7 @@ public class ProcessorStateManagerTest { @@ -583,7 +584,7 @@ public class ProcessorStateManagerTest {
stateManager.register(stateStore, stateStore.stateRestoreCallback);
try {
stateManager.close(Collections.emptyMap());
stateManager.close(true);
fail("Should throw ProcessorStateException if store close throws exception");
} catch (final ProcessorStateException e) {
// pass
@ -696,7 +697,7 @@ public class ProcessorStateManagerTest { @@ -696,7 +697,7 @@ public class ProcessorStateManagerTest {
stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
try {
stateManager.close(Collections.emptyMap());
stateManager.close(true);
} catch (final ProcessorStateException expected) { /* ignode */ }
Assert.assertTrue(closedStore.get());
}
@ -721,7 +722,7 @@ public class ProcessorStateManagerTest { @@ -721,7 +722,7 @@ public class ProcessorStateManagerTest {
assertFalse(checkpointFile.exists());
} finally {
if (stateManager != null) {
stateManager.close(null);
stateManager.close(true);
}
}
}

3
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java

@ -366,6 +366,7 @@ public class StandbyTaskTest { @@ -366,6 +366,7 @@ public class StandbyTaskTest {
singletonList(makeWindowedConsumerRecord(changelogName, 10, 1, 0L, 60_000L))
);
task.suspend();
task.closeStateManager(true);
final File taskDir = stateDirectory.directoryForTask(taskId);
@ -592,7 +593,7 @@ public class StandbyTaskTest { @@ -592,7 +593,7 @@ public class StandbyTaskTest {
}
@Override
void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
void closeStateManager(final boolean clean) throws ProcessorStateException {
closedStateManager.set(true);
}
};

2
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java

@ -45,7 +45,7 @@ public class StateManagerStub implements StateManager { @@ -45,7 +45,7 @@ public class StateManagerStub implements StateManager {
public void flush() {}
@Override
public void close(final Map<TopicPartition, Long> offsets) throws IOException {}
public void close(final boolean clean) throws IOException {}
@Override
public StateStore getGlobalStore(final String name) {

2
streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java

@ -66,7 +66,7 @@ public class GlobalStateManagerStub implements GlobalStateManager { @@ -66,7 +66,7 @@ public class GlobalStateManagerStub implements GlobalStateManager {
public void flush() {}
@Override
public void close(final Map<TopicPartition, Long> offsets) throws IOException {
public void close(final boolean clean) throws IOException {
this.offsets.putAll(offsets);
closed = true;
}

Loading…
Cancel
Save