Browse Source

KAFKA-4233: StateDirectory fails to create directory if any parent directory does not exist

Change the creation of the directories, in the StateDirectory constructor, to use mkdirs so any parents get created. Throw an exception if the directory doesn't exist and couldn't be created

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Michael G. Noll, Eno Thereska, Guozhang Wang

Closes #1942 from dguy/kafka-4233
pull/1904/merge
Damian Guy 8 years ago committed by Guozhang Wang
parent
commit
5f3746d135
  1. 16
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
  2. 10
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java

16
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,12 +48,14 @@ public class StateDirectory { @@ -47,12 +48,14 @@ public class StateDirectory {
public StateDirectory(final String applicationId, final String stateDirConfig) {
final File baseDir = new File(stateDirConfig);
if (!baseDir.exists()) {
baseDir.mkdir();
if (!baseDir.exists() && !baseDir.mkdirs()) {
throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
stateDirConfig));
}
stateDir = new File(baseDir, applicationId);
if (!stateDir.exists()) {
stateDir.mkdir();
if (!stateDir.exists() && !stateDir.mkdir()) {
throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
stateDir.getPath()));
}
}
@ -64,8 +67,9 @@ public class StateDirectory { @@ -64,8 +67,9 @@ public class StateDirectory {
*/
public File directoryForTask(final TaskId taskId) {
final File taskDir = new File(stateDir, taskId.toString());
if (!taskDir.exists()) {
taskDir.mkdir();
if (!taskDir.exists() && !taskDir.mkdir()) {
throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created",
taskDir.getPath()));
}
return taskDir;
}

10
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java

@ -166,4 +166,14 @@ public class StateDirectoryTest { @@ -166,4 +166,14 @@ public class StateDirectoryTest {
assertTrue(dirs.contains(taskDir2));
}
@Test
public void shouldCreateDirectoriesIfParentDoesntExist() throws Exception {
final File tempDir = TestUtils.tempDirectory();
final File stateDir = new File(new File(tempDir, "foo"), "state-dir");
final StateDirectory stateDirectory = new StateDirectory(applicationId, stateDir.getPath());
final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0));
assertTrue(stateDir.exists());
assertTrue(taskDir.exists());
}
}
Loading…
Cancel
Save