From 5f3746d135697f364aaacf877ce288267d00b9a2 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 30 Sep 2016 11:55:32 -0700 Subject: [PATCH] KAFKA-4233: StateDirectory fails to create directory if any parent directory does not exist Change the creation of the directories, in the StateDirectory constructor, to use mkdirs so any parents get created. Throw an exception if the directory doesn't exist and couldn't be created Author: Damian Guy Reviewers: Michael G. Noll, Eno Thereska, Guozhang Wang Closes #1942 from dguy/kafka-4233 --- .../processor/internals/StateDirectory.java | 16 ++++++++++------ .../processor/internals/StateDirectoryTest.java | 10 ++++++++++ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 02abdeb4235..3048fba355a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,12 +48,14 @@ public class StateDirectory { public StateDirectory(final String applicationId, final String stateDirConfig) { final File baseDir = new File(stateDirConfig); - if (!baseDir.exists()) { - baseDir.mkdir(); + if (!baseDir.exists() && !baseDir.mkdirs()) { + throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", + stateDirConfig)); } stateDir = new File(baseDir, applicationId); - if (!stateDir.exists()) { - stateDir.mkdir(); + if (!stateDir.exists() && !stateDir.mkdir()) { + throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", + stateDir.getPath())); } } @@ -64,8 +67,9 @@ public class StateDirectory { */ public File directoryForTask(final TaskId taskId) { final File taskDir = new File(stateDir, taskId.toString()); - if (!taskDir.exists()) { - taskDir.mkdir(); + if (!taskDir.exists() && !taskDir.mkdir()) { + throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", + taskDir.getPath())); } return taskDir; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index c17e7bc6f75..6fc855cbb06 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -166,4 +166,14 @@ public class StateDirectoryTest { assertTrue(dirs.contains(taskDir2)); } + @Test + public void shouldCreateDirectoriesIfParentDoesntExist() throws Exception { + final File tempDir = TestUtils.tempDirectory(); + final File stateDir = new File(new File(tempDir, "foo"), "state-dir"); + final StateDirectory stateDirectory = new StateDirectory(applicationId, stateDir.getPath()); + final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0)); + assertTrue(stateDir.exists()); + assertTrue(taskDir.exists()); + } + } \ No newline at end of file