diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml
index ade866d6a07..f803edaf3b6 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -86,6 +86,7 @@
+
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index f648278c4c6..780cfa8925f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -88,6 +88,8 @@ import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
+import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
@@ -667,7 +669,7 @@ public final class QuorumController implements Controller {
// If the operation did not return any records, then it was actually just
// a read after all, and not a read + write. However, this read was done
// from the latest in-memory state, which might contain uncommitted data.
- OptionalLong maybeOffset = purgatory.highestPendingOffset();
+ OptionalLong maybeOffset = deferredEventQueue.highestPendingOffset();
if (!maybeOffset.isPresent()) {
// If the purgatory is empty, there are no pending operations and no
// uncommitted state. We can complete immediately.
@@ -726,7 +728,7 @@ public final class QuorumController implements Controller {
// Remember the latest offset and future if it is not already completed
if (!future.isDone()) {
- purgatory.add(resultAndOffset.offset(), this);
+ deferredEventQueue.add(resultAndOffset.offset(), this);
}
}
@@ -906,7 +908,7 @@ public final class QuorumController implements Controller {
log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch);
// Complete any events in the purgatory that were waiting for this offset.
- purgatory.completeUpTo(offset);
+ deferredEventQueue.completeUpTo(offset);
// The active controller can delete up to the current committed offset.
snapshotRegistry.deleteSnapshotsUpTo(offset);
@@ -1185,7 +1187,7 @@ public final class QuorumController implements Controller {
raftClient.resign(curClaimEpoch);
curClaimEpoch = -1;
controllerMetrics.setActive(false);
- purgatory.failAll(newNotControllerException());
+ deferredEventQueue.failAll(newNotControllerException());
if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
throw new RuntimeException("Unable to find last committed offset " +
@@ -1483,10 +1485,10 @@ public final class QuorumController implements Controller {
private final SnapshotRegistry snapshotRegistry;
/**
- * The purgatory which holds deferred operations which are waiting for the metadata
+ * The deferred event queue which holds deferred operations which are waiting for the metadata
* log's high water mark to advance. This must be accessed only by the event queue thread.
*/
- private final ControllerPurgatory purgatory;
+ private final DeferredEventQueue deferredEventQueue;
/**
* A predicate that returns information about whether a ConfigResource exists.
@@ -1684,7 +1686,7 @@ public final class QuorumController implements Controller {
this.time = time;
this.controllerMetrics = controllerMetrics;
this.snapshotRegistry = new SnapshotRegistry(logContext);
- this.purgatory = new ControllerPurgatory();
+ this.deferredEventQueue = new DeferredEventQueue();
this.resourceExists = new ConfigResourceExistenceChecker();
this.configurationControl = new ConfigurationControlManager.Builder().
setLogContext(logContext).
diff --git a/metadata/src/main/java/org/apache/kafka/controller/DeferredEvent.java b/server-common/src/main/java/org/apache/kafka/deferred/DeferredEvent.java
similarity index 88%
rename from metadata/src/main/java/org/apache/kafka/controller/DeferredEvent.java
rename to server-common/src/main/java/org/apache/kafka/deferred/DeferredEvent.java
index e1606f35320..8a7e3a01c0d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/DeferredEvent.java
+++ b/server-common/src/main/java/org/apache/kafka/deferred/DeferredEvent.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.kafka.controller;
+package org.apache.kafka.deferred;
/**
- * Represents a deferred event in the controller purgatory.
+ * Represents a deferred event in the {{@link DeferredEventQueue}}.
*/
-interface DeferredEvent {
+public interface DeferredEvent {
/**
* Complete the event.
*
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java b/server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java
similarity index 78%
rename from metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java
rename to server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java
index 1add1083d32..b66d0a3db8e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java
+++ b/server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.controller;
+package org.apache.kafka.deferred;
import java.util.ArrayList;
import java.util.Iterator;
@@ -25,11 +25,10 @@ import java.util.OptionalLong;
import java.util.TreeMap;
/**
- * The purgatory which holds events that have been started, but not yet completed.
- * We wait for the high water mark of the metadata log to advance before completing
- * them.
+ * The queue which holds deferred events that have been started, but not yet completed.
+ * We wait for the high watermark of the log to advance before completing them.
*/
-class ControllerPurgatory {
+public class DeferredEventQueue {
/**
* A map from log offsets to events. Each event will be completed once the log
* advances past its offset.
@@ -41,7 +40,7 @@ class ControllerPurgatory {
*
* @param offset The offset which the high water mark has advanced to.
*/
- void completeUpTo(long offset) {
+ public void completeUpTo(long offset) {
Iterator>> iter = pending.entrySet().iterator();
while (iter.hasNext()) {
Entry> entry = iter.next();
@@ -56,11 +55,11 @@ class ControllerPurgatory {
}
/**
- * Fail all the pending purgatory entries.
+ * Fail all deferred events with the provided exception.
*
* @param exception The exception to fail the entries with.
*/
- void failAll(Exception exception) {
+ public void failAll(Exception exception) {
Iterator>> iter = pending.entrySet().iterator();
while (iter.hasNext()) {
Entry> entry = iter.next();
@@ -72,18 +71,18 @@ class ControllerPurgatory {
}
/**
- * Add a new purgatory event.
+ * Add a new deferred event to be completed by the provided offset.
*
* @param offset The offset to add the new event at.
* @param event The new event.
*/
- void add(long offset, DeferredEvent event) {
+ public void add(long offset, DeferredEvent event) {
if (!pending.isEmpty()) {
long lastKey = pending.lastKey();
if (offset < lastKey) {
- throw new RuntimeException("There is already a purgatory event with " +
- "offset " + lastKey + ". We should not add one with an offset of " +
- offset + " which " + "is lower than that.");
+ throw new IllegalArgumentException("There is already a deferred event with " +
+ "offset " + lastKey + ". We should not add one with an offset of " +
+ offset + " which is lower than that.");
}
}
List events = pending.get(offset);
@@ -98,7 +97,7 @@ class ControllerPurgatory {
* Get the offset of the highest pending event, or empty if there are no pending
* events.
*/
- OptionalLong highestPendingOffset() {
+ public OptionalLong highestPendingOffset() {
if (pending.isEmpty()) {
return OptionalLong.empty();
} else {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java
similarity index 71%
rename from metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java
rename to server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java
index 6eaf182a9a0..09ec0994694 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java
+++ b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.kafka.controller;
+package org.apache.kafka.deferred;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -29,7 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
-public class ControllerPurgatoryTest {
+public class DeferredEventQueueTest {
static class SampleDeferredEvent implements DeferredEvent {
private final CompletableFuture future = new CompletableFuture<>();
@@ -50,48 +51,48 @@ public class ControllerPurgatoryTest {
@Test
public void testCompleteEvents() {
- ControllerPurgatory purgatory = new ControllerPurgatory();
+ DeferredEventQueue deferredEventQueue = new DeferredEventQueue();
SampleDeferredEvent event1 = new SampleDeferredEvent();
SampleDeferredEvent event2 = new SampleDeferredEvent();
SampleDeferredEvent event3 = new SampleDeferredEvent();
- purgatory.add(1, event1);
- assertEquals(OptionalLong.of(1L), purgatory.highestPendingOffset());
- purgatory.add(1, event2);
- assertEquals(OptionalLong.of(1L), purgatory.highestPendingOffset());
- purgatory.add(3, event3);
- assertEquals(OptionalLong.of(3L), purgatory.highestPendingOffset());
- purgatory.completeUpTo(2);
+ deferredEventQueue.add(1, event1);
+ assertEquals(OptionalLong.of(1L), deferredEventQueue.highestPendingOffset());
+ deferredEventQueue.add(1, event2);
+ assertEquals(OptionalLong.of(1L), deferredEventQueue.highestPendingOffset());
+ deferredEventQueue.add(3, event3);
+ assertEquals(OptionalLong.of(3L), deferredEventQueue.highestPendingOffset());
+ deferredEventQueue.completeUpTo(2);
assertTrue(event1.future.isDone());
assertTrue(event2.future.isDone());
assertFalse(event3.future.isDone());
- purgatory.completeUpTo(4);
+ deferredEventQueue.completeUpTo(4);
assertTrue(event3.future.isDone());
- assertEquals(OptionalLong.empty(), purgatory.highestPendingOffset());
+ assertEquals(OptionalLong.empty(), deferredEventQueue.highestPendingOffset());
}
@Test
public void testFailOnIncorrectOrdering() {
- ControllerPurgatory purgatory = new ControllerPurgatory();
+ DeferredEventQueue deferredEventQueue = new DeferredEventQueue();
SampleDeferredEvent event1 = new SampleDeferredEvent();
SampleDeferredEvent event2 = new SampleDeferredEvent();
- purgatory.add(2, event1);
- assertThrows(RuntimeException.class, () -> purgatory.add(1, event2));
+ deferredEventQueue.add(2, event1);
+ assertThrows(RuntimeException.class, () -> deferredEventQueue.add(1, event2));
}
@Test
public void testFailEvents() {
- ControllerPurgatory purgatory = new ControllerPurgatory();
+ DeferredEventQueue deferredEventQueue = new DeferredEventQueue();
SampleDeferredEvent event1 = new SampleDeferredEvent();
SampleDeferredEvent event2 = new SampleDeferredEvent();
SampleDeferredEvent event3 = new SampleDeferredEvent();
- purgatory.add(1, event1);
- purgatory.add(3, event2);
- purgatory.add(3, event3);
- purgatory.completeUpTo(2);
+ deferredEventQueue.add(1, event1);
+ deferredEventQueue.add(3, event2);
+ deferredEventQueue.add(3, event3);
+ deferredEventQueue.completeUpTo(2);
assertTrue(event1.future.isDone());
assertFalse(event2.future.isDone());
assertFalse(event3.future.isDone());
- purgatory.failAll(new RuntimeException("failed"));
+ deferredEventQueue.failAll(new RuntimeException("failed"));
assertTrue(event2.future.isDone());
assertTrue(event3.future.isDone());
assertEquals(RuntimeException.class, assertThrows(ExecutionException.class,