Browse Source

KAFKA-5937: Improve ProcessorStateManager exception handling

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ted Yu <yuzhihong@gmail.com>, Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3913 from mjsax/kafka-5937-exceptions-processor-state-manager
pull/3913/merge
Matthias J. Sax 7 years ago committed by Guozhang Wang
parent
commit
402aa093db
  1. 47
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
  2. 36
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
  3. 102
      streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java

47
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java

@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
@ -67,8 +66,7 @@ public class ProcessorStateManager implements StateManager { @@ -67,8 +66,7 @@ public class ProcessorStateManager implements StateManager {
private OffsetCheckpoint checkpoint;
/**
* @throws LockException if the state directory cannot be locked because another thread holds the lock
* (this might be recoverable by retrying)
* @throws ProcessorStateException if the task directory does not exist and could not be created
* @throws IOException if any severe error happens while creating or locking the state directory
*/
public ProcessorStateManager(final TaskId taskId,
@ -96,15 +94,7 @@ public class ProcessorStateManager implements StateManager { @@ -96,15 +94,7 @@ public class ProcessorStateManager implements StateManager {
restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
this.storeToChangelogTopic = storeToChangelogTopic;
// get a handle on the parent/base directory of the task directory
// note that the parent directory could have been accidentally deleted here,
// so catch that exception if that is the case
try {
baseDir = stateDirectory.directoryForTask(taskId);
} catch (final ProcessorStateException e) {
throw new LockException(String.format("%sFailed to get the directory for task %s. Exception %s",
logPrefix, taskId, e));
}
baseDir = stateDirectory.directoryForTask(taskId);
// load the checkpoint information
checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
@ -256,40 +246,49 @@ public class ProcessorStateManager implements StateManager { @@ -256,40 +246,49 @@ public class ProcessorStateManager implements StateManager {
@Override
public void flush() {
ProcessorStateException firstException = null;
// attempting to flush the stores
if (!stores.isEmpty()) {
log.debug("Flushing all stores registered in the state manager");
for (final StateStore store : stores.values()) {
log.trace("Flushing store {}", store.name());
try {
log.trace("Flushing store={}", store.name());
store.flush();
} catch (final Exception e) {
throw new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e);
if (firstException == null) {
firstException = new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e);
}
log.error("Failed to flush state store {}: ", store.name(), e);
}
}
}
if (firstException != null) {
throw firstException;
}
}
/**
* {@link StateStore#close() Close} all stores (even in case of failure).
* Re-throw the first
* Log all exception and re-throw the first exception that did occur at the end.
* @throws ProcessorStateException if any error happens when closing the state stores
*/
@Override
public void close(final Map<TopicPartition, Long> ackedOffsets) throws ProcessorStateException {
RuntimeException firstException = null;
ProcessorStateException firstException = null;
// attempting to close the stores, just in case they
// are not closed by a ProcessorNode yet
if (!stores.isEmpty()) {
log.debug("Closing its state manager and all the registered state stores");
for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
log.debug("Closing storage engine {}", entry.getKey());
for (final StateStore store : stores.values()) {
log.debug("Closing storage engine {}", store.name());
try {
entry.getValue().close();
store.close();
} catch (final Exception e) {
if (firstException == null) {
firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, entry.getKey()), e);
firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, store.name()), e);
}
log.error("Failed to close state store {}: ", entry.getKey(), e);
log.error("Failed to close state store {}: ", store.name(), e);
}
}
@ -309,11 +308,11 @@ public class ProcessorStateManager implements StateManager { @@ -309,11 +308,11 @@ public class ProcessorStateManager implements StateManager {
public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
log.trace("Writing checkpoint: {}", ackedOffsets);
checkpointedOffsets.putAll(changelogReader.restoredOffsets());
for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
final String storeName = entry.getKey();
for (final StateStore store : stores.values()) {
final String storeName = store.name();
// only checkpoint the offset to the offsets file if
// it is persistent AND changelog enabled
if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {
if (store.persistent() && storeToChangelogTopic.containsKey(storeName)) {
final String changelogTopic = storeToChangelogTopic.get(storeName);
final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
if (ackedOffsets.containsKey(topicPartition)) {

36
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java

@ -62,39 +62,50 @@ public class StateDirectory { @@ -62,39 +62,50 @@ public class StateDirectory {
}
}
/**
* Ensures that the state base directory as well as the application's sub-directory are created.
*
* @throws ProcessorStateException if the base state directory or application state directory does not exist
* and could not be created
*/
public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) {
this.time = time;
final File baseDir = new File(stateDirConfig);
if (!baseDir.exists() && !baseDir.mkdirs()) {
throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
stateDirConfig));
throw new ProcessorStateException(
String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirConfig));
}
stateDir = new File(baseDir, applicationId);
if (!stateDir.exists() && !stateDir.mkdir()) {
throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
stateDir.getPath()));
throw new ProcessorStateException(
String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
}
}
/**
* Get or create the directory for the {@link TaskId}
* @param taskId
* Get or create the directory for the provided {@link TaskId}.
* @return directory for the {@link TaskId}
* @throws ProcessorStateException if the task directory does not exists and could not be created
*/
File directoryForTask(final TaskId taskId) {
final File taskDir = new File(stateDir, taskId.toString());
if (!taskDir.exists() && !taskDir.mkdir()) {
throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created",
taskDir.getPath()));
throw new ProcessorStateException(
String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
}
return taskDir;
}
/**
* Get or create the directory for the global stores.
* @return directory for the global stores
* @throws ProcessorStateException if the global store directory does not exists and could not be created
*/
File globalStateDir() {
final File dir = new File(stateDir, "global");
if (!dir.exists() && !dir.mkdir()) {
throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created",
dir.getPath()));
throw new ProcessorStateException(
String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
}
return dir;
}
@ -102,6 +113,7 @@ public class StateDirectory { @@ -102,6 +113,7 @@ public class StateDirectory {
private String logPrefix() {
return String.format("stream-thread [%s]", Thread.currentThread().getName());
}
/**
* Get the lock for the {@link TaskId}s directory if it is available
* @param taskId
@ -192,9 +204,7 @@ public class StateDirectory { @@ -192,9 +204,7 @@ public class StateDirectory {
}
/**
* Unlock the state directory for the given {@link TaskId}
* @param taskId
* @throws IOException
* Unlock the state directory for the given {@link TaskId}.
*/
synchronized void unlock(final TaskId taskId) throws IOException {
final LockAndOwner lockAndOwner = locks.get(taskId);

102
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java

@ -31,6 +31,7 @@ import org.apache.kafka.test.MockChangelogReader; @@ -31,6 +31,7 @@ import org.apache.kafka.test.MockChangelogReader;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -41,6 +42,7 @@ import java.util.Collections; @@ -41,6 +42,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@ -79,7 +81,6 @@ public class ProcessorStateManagerTest { @@ -79,7 +81,6 @@ public class ProcessorStateManagerTest {
private OffsetCheckpoint checkpoint;
private StateDirectory stateDirectory;
@Before
public void setup() {
baseDir = TestUtils.tempDirectory();
@ -486,6 +487,35 @@ public class ProcessorStateManagerTest { @@ -486,6 +487,35 @@ public class ProcessorStateManagerTest {
}
@Test
public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAnException() throws IOException {
final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
Collections.singleton(changelogTopicPartition),
false,
stateDirectory,
Collections.singletonMap(storeName, changelogTopic),
changelogReader,
false,
logContext);
final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) {
@Override
public void flush() {
throw new RuntimeException("KABOOM!");
}
};
stateManager.register(stateStore, false, stateStore.stateRestoreCallback);
try {
stateManager.flush();
fail("Should throw ProcessorStateException if store flush throws exception");
} catch (final ProcessorStateException e) {
// pass
}
}
@Test
public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws IOException {
@ -515,6 +545,76 @@ public class ProcessorStateManagerTest { @@ -515,6 +545,76 @@ public class ProcessorStateManagerTest {
}
}
@Test
public void shouldFlushAllStoresEvenIfStoreThrowsExcepiton() throws IOException {
final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
Collections.singleton(changelogTopicPartition),
false,
stateDirectory,
Collections.singletonMap(storeName, changelogTopic),
changelogReader,
false,
logContext);
final AtomicBoolean flushedStore = new AtomicBoolean(false);
final MockStateStoreSupplier.MockStateStore stateStore1 = new MockStateStoreSupplier.MockStateStore(storeName, true) {
@Override
public void flush() {
throw new RuntimeException("KABOOM!");
}
};
final MockStateStoreSupplier.MockStateStore stateStore2 = new MockStateStoreSupplier.MockStateStore(storeName + "2", true) {
@Override
public void flush() {
flushedStore.set(true);
}
};
stateManager.register(stateStore1, false, stateStore1.stateRestoreCallback);
stateManager.register(stateStore2, false, stateStore2.stateRestoreCallback);
try {
stateManager.flush();
} catch (final ProcessorStateException expected) { /* ignode */ }
Assert.assertTrue(flushedStore.get());
}
@Test
public void shouldCloseAllStoresEvenIfStoreThrowsExcepiton() throws IOException {
final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
Collections.singleton(changelogTopicPartition),
false,
stateDirectory,
Collections.singletonMap(storeName, changelogTopic),
changelogReader,
false,
logContext);
final AtomicBoolean closedStore = new AtomicBoolean(false);
final MockStateStoreSupplier.MockStateStore stateStore1 = new MockStateStoreSupplier.MockStateStore(storeName, true) {
@Override
public void close() {
throw new RuntimeException("KABOOM!");
}
};
final MockStateStoreSupplier.MockStateStore stateStore2 = new MockStateStoreSupplier.MockStateStore(storeName + "2", true) {
@Override
public void close() {
closedStore.set(true);
}
};
stateManager.register(stateStore1, false, stateStore1.stateRestoreCallback);
stateManager.register(stateStore2, false, stateStore2.stateRestoreCallback);
try {
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
} catch (final ProcessorStateException expected) { /* ignode */ }
Assert.assertTrue(closedStore.get());
}
@Test
public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOException {
checkpoint.write(Collections.<TopicPartition, Long>emptyMap());

Loading…
Cancel
Save