diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 02abdeb4235..3048fba355a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,12 +48,14 @@ public class StateDirectory { public StateDirectory(final String applicationId, final String stateDirConfig) { final File baseDir = new File(stateDirConfig); - if (!baseDir.exists()) { - baseDir.mkdir(); + if (!baseDir.exists() && !baseDir.mkdirs()) { + throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", + stateDirConfig)); } stateDir = new File(baseDir, applicationId); - if (!stateDir.exists()) { - stateDir.mkdir(); + if (!stateDir.exists() && !stateDir.mkdir()) { + throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", + stateDir.getPath())); } } @@ -64,8 +67,9 @@ public class StateDirectory { */ public File directoryForTask(final TaskId taskId) { final File taskDir = new File(stateDir, taskId.toString()); - if (!taskDir.exists()) { - taskDir.mkdir(); + if (!taskDir.exists() && !taskDir.mkdir()) { + throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", + taskDir.getPath())); } return taskDir; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index c17e7bc6f75..6fc855cbb06 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -166,4 +166,14 @@ public class StateDirectoryTest { assertTrue(dirs.contains(taskDir2)); } + @Test + public void shouldCreateDirectoriesIfParentDoesntExist() throws Exception { + final File tempDir = TestUtils.tempDirectory(); + final File stateDir = new File(new File(tempDir, "foo"), "state-dir"); + final StateDirectory stateDirectory = new StateDirectory(applicationId, stateDir.getPath()); + final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0)); + assertTrue(stateDir.exists()); + assertTrue(taskDir.exists()); + } + } \ No newline at end of file