diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 0a1e715ccdc..30dd2cab2bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -47,10 +47,10 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHEC
 public class StateDirectory {
 
     private static final Pattern PATH_NAME = Pattern.compile("\\d+_\\d+");
-
-    static final String LOCK_FILE_NAME = ".lock";
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
+    static final String LOCK_FILE_NAME = ".lock";
 
+    private final Object taskDirCreationLock = new Object();
     private final Time time;
     private final String appId;
     private final File stateDir;
@@ -107,9 +107,17 @@ public class StateDirectory {
      */
     public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
-        if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) {
-            throw new ProcessorStateException(
-                String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
+        if (hasPersistentStores && !taskDir.exists()) {
+            synchronized (taskDirCreationLock) {
+                // to avoid a race condition, we need to check again if the directory does not exist:
+                // otherwise, two threads might pass the outer `if` (and enter the `then` block),
+                // one blocks on `synchronized` while the other creates the directory,
+                // and the blocking one fails when trying to create it after it's unblocked
+                if (!taskDir.exists() && !taskDir.mkdir()) {
+                    throw new ProcessorStateException(
+                        String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
+                }
+            }
         }
         return taskDir;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index d1adbe9b125..1645ea82cf6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -34,6 +34,7 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Objects;
@@ -41,6 +42,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -49,6 +51,7 @@ import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_F
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -507,4 +510,58 @@ public class StateDirectoryTest {
         initializeStateDirectory(false);
         assertTrue(directory.lockGlobalState());
     }
+
+    @Test
+    public void shouldNotFailWhenCreatingTaskDirectoryInParallel() throws Exception {
+        final TaskId taskId = new TaskId(0, 0);
+        final AtomicBoolean passed = new AtomicBoolean(true);
+
+        final CreateTaskDirRunner runner = new CreateTaskDirRunner(directory, taskId, passed);
+
+        final Thread t1 = new Thread(runner);
+        final Thread t2 = new Thread(runner);
+
+        t1.start();
+        t2.start();
+
+        // join returns as soon as the thread terminates, so a generous bound costs nothing on the
+        // happy path while keeping slow CI machines from timing out spuriously
+        t1.join(Duration.ofSeconds(30L).toMillis());
+        t2.join(Duration.ofSeconds(30L).toMillis());
+
+        // a timed join returns silently on timeout: fail here rather than asserting on state
+        // that a still-running thread has not written yet
+        assertFalse(t1.isAlive());
+        assertFalse(t2.isAlive());
+
+        assertNotNull(runner.taskDirectory);
+        assertTrue(passed.get());
+        assertTrue(runner.taskDirectory.exists());
+        assertTrue(runner.taskDirectory.isDirectory());
+    }
+
+    private static class CreateTaskDirRunner implements Runnable {
+        private final StateDirectory directory;
+        private final TaskId taskId;
+        private final AtomicBoolean passed;
+
+        private File taskDirectory;
+
+        private CreateTaskDirRunner(final StateDirectory directory,
+                                    final TaskId taskId,
+                                    final AtomicBoolean passed) {
+            this.directory = directory;
+            this.taskId = taskId;
+            this.passed = passed;
+        }
+
+        @Override
+        public void run() {
+            try {
+                taskDirectory = directory.directoryForTask(taskId);
+            } catch (final ProcessorStateException error) {
+                passed.set(false);
+            }
+        }
+    }
 }
\ No newline at end of file