Browse Source

KAFKA-10262: Ensure that creating task directory is thread safe (#9010)

Reviewers: A. Sophie Blee-Goldman <sohpie@confluent.io>, John Roesler <john@confluent.io>
pull/8449/merge
Matthias J. Sax 4 years ago committed by GitHub
parent
commit
f2db8d5318
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
  2. 50
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java

18
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java

@ -47,10 +47,10 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHEC @@ -47,10 +47,10 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHEC
public class StateDirectory {
private static final Pattern PATH_NAME = Pattern.compile("\\d+_\\d+");
static final String LOCK_FILE_NAME = ".lock";
private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
static final String LOCK_FILE_NAME = ".lock";
private final Object taskDirCreationLock = new Object();
private final Time time;
private final String appId;
private final File stateDir;
@ -107,9 +107,17 @@ public class StateDirectory { @@ -107,9 +107,17 @@ public class StateDirectory {
*/
public File directoryForTask(final TaskId taskId) {
final File taskDir = new File(stateDir, taskId.toString());
if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) {
throw new ProcessorStateException(
String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
if (hasPersistentStores && !taskDir.exists()) {
synchronized (taskDirCreationLock) {
// to avoid a race condition, we need to check again if the directory does not exist:
// otherwise, two threads might pass the outer `if` (and enter the `then` block),
// one blocks on `synchronized` while the other creates the directory,
// and the blocking one fails when trying to create it after it's unblocked
if (!taskDir.exists() && !taskDir.mkdir()) {
throw new ProcessorStateException(
String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
}
}
}
return taskDir;
}

50
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java

@ -34,6 +34,7 @@ import java.nio.channels.FileChannel; @@ -34,6 +34,7 @@ import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
@ -41,6 +42,7 @@ import java.util.Properties; @@ -41,6 +42,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -49,6 +51,7 @@ import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_F @@ -49,6 +51,7 @@ import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_F
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@ -507,4 +510,51 @@ public class StateDirectoryTest { @@ -507,4 +510,51 @@ public class StateDirectoryTest {
initializeStateDirectory(false);
assertTrue(directory.lockGlobalState());
}
@Test
public void shouldNotFailWhenCreatingTaskDirectoryInParallel() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final AtomicBoolean passed = new AtomicBoolean(true);
final CreateTaskDirRunner runner = new CreateTaskDirRunner(directory, taskId, passed);
final Thread t1 = new Thread(runner);
final Thread t2 = new Thread(runner);
t1.start();
t2.start();
t1.join(Duration.ofMillis(500L).toMillis());
t2.join(Duration.ofMillis(500L).toMillis());
assertNotNull(runner.taskDirectory);
assertTrue(passed.get());
assertTrue(runner.taskDirectory.exists());
assertTrue(runner.taskDirectory.isDirectory());
}
private static class CreateTaskDirRunner implements Runnable {
private final StateDirectory directory;
private final TaskId taskId;
private final AtomicBoolean passed;
private File taskDirectory;
private CreateTaskDirRunner(final StateDirectory directory,
final TaskId taskId,
final AtomicBoolean passed) {
this.directory = directory;
this.taskId = taskId;
this.passed = passed;
}
@Override
public void run() {
try {
taskDirectory = directory.directoryForTask(taskId);
} catch (final ProcessorStateException error) {
passed.set(false);
}
}
}
}
Loading…
Cancel
Save