Browse Source

MINOR: Move `ControllerPurgatory` to `server-common` (#13555)

This patch renames from `ControllerPurgatory` to `DeferredEventQueue` and moves it from the `metadata` module to `server-common` module.

Reviewers: Alexandre Dupriez <alexandre.dupriez@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
pull/13538/head
David Jacot 2 years ago committed by GitHub
parent
commit
2d0b816150
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      checkstyle/import-control-metadata.xml
  2. 16
      metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  3. 6
      server-common/src/main/java/org/apache/kafka/deferred/DeferredEvent.java
  4. 27
      server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java
  5. 43
      server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java

1
checkstyle/import-control-metadata.xml

@ -86,6 +86,7 @@ @@ -86,6 +86,7 @@
<allow pkg="org.apache.kafka.metadata.authorizer" />
<allow pkg="org.apache.kafka.metadata.migration" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.deferred" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />

16
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@ -88,6 +88,8 @@ import org.apache.kafka.metadata.migration.ZkMigrationState; @@ -88,6 +88,8 @@ import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
@ -667,7 +669,7 @@ public final class QuorumController implements Controller { @@ -667,7 +669,7 @@ public final class QuorumController implements Controller {
// If the operation did not return any records, then it was actually just
// a read after all, and not a read + write. However, this read was done
// from the latest in-memory state, which might contain uncommitted data.
OptionalLong maybeOffset = purgatory.highestPendingOffset();
OptionalLong maybeOffset = deferredEventQueue.highestPendingOffset();
if (!maybeOffset.isPresent()) {
// If the purgatory is empty, there are no pending operations and no
// uncommitted state. We can complete immediately.
@ -726,7 +728,7 @@ public final class QuorumController implements Controller { @@ -726,7 +728,7 @@ public final class QuorumController implements Controller {
// Remember the latest offset and future if it is not already completed
if (!future.isDone()) {
purgatory.add(resultAndOffset.offset(), this);
deferredEventQueue.add(resultAndOffset.offset(), this);
}
}
@ -906,7 +908,7 @@ public final class QuorumController implements Controller { @@ -906,7 +908,7 @@ public final class QuorumController implements Controller {
log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch);
// Complete any events in the purgatory that were waiting for this offset.
purgatory.completeUpTo(offset);
deferredEventQueue.completeUpTo(offset);
// The active controller can delete up to the current committed offset.
snapshotRegistry.deleteSnapshotsUpTo(offset);
@ -1185,7 +1187,7 @@ public final class QuorumController implements Controller { @@ -1185,7 +1187,7 @@ public final class QuorumController implements Controller {
raftClient.resign(curClaimEpoch);
curClaimEpoch = -1;
controllerMetrics.setActive(false);
purgatory.failAll(newNotControllerException());
deferredEventQueue.failAll(newNotControllerException());
if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
throw new RuntimeException("Unable to find last committed offset " +
@ -1483,10 +1485,10 @@ public final class QuorumController implements Controller { @@ -1483,10 +1485,10 @@ public final class QuorumController implements Controller {
private final SnapshotRegistry snapshotRegistry;
/**
* The purgatory which holds deferred operations which are waiting for the metadata
* The deferred event queue which holds deferred operations which are waiting for the metadata
* log's high water mark to advance. This must be accessed only by the event queue thread.
*/
private final ControllerPurgatory purgatory;
private final DeferredEventQueue deferredEventQueue;
/**
* A predicate that returns information about whether a ConfigResource exists.
@ -1684,7 +1686,7 @@ public final class QuorumController implements Controller { @@ -1684,7 +1686,7 @@ public final class QuorumController implements Controller {
this.time = time;
this.controllerMetrics = controllerMetrics;
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.purgatory = new ControllerPurgatory();
this.deferredEventQueue = new DeferredEventQueue();
this.resourceExists = new ConfigResourceExistenceChecker();
this.configurationControl = new ConfigurationControlManager.Builder().
setLogContext(logContext).

6
metadata/src/main/java/org/apache/kafka/controller/DeferredEvent.java → server-common/src/main/java/org/apache/kafka/deferred/DeferredEvent.java

@ -15,12 +15,12 @@ @@ -15,12 +15,12 @@
* limitations under the License.
*/
package org.apache.kafka.controller;
package org.apache.kafka.deferred;
/**
* Represents a deferred event in the controller purgatory.
* Represents a deferred event in the {{@link DeferredEventQueue}}.
*/
interface DeferredEvent {
public interface DeferredEvent {
/**
* Complete the event.
*

27
metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java → server-common/src/main/java/org/apache/kafka/deferred/DeferredEventQueue.java

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kafka.controller;
package org.apache.kafka.deferred;
import java.util.ArrayList;
import java.util.Iterator;
@ -25,11 +25,10 @@ import java.util.OptionalLong; @@ -25,11 +25,10 @@ import java.util.OptionalLong;
import java.util.TreeMap;
/**
* The purgatory which holds events that have been started, but not yet completed.
* We wait for the high water mark of the metadata log to advance before completing
* them.
* The queue which holds deferred events that have been started, but not yet completed.
* We wait for the high watermark of the log to advance before completing them.
*/
class ControllerPurgatory {
public class DeferredEventQueue {
/**
* A map from log offsets to events. Each event will be completed once the log
* advances past its offset.
@ -41,7 +40,7 @@ class ControllerPurgatory { @@ -41,7 +40,7 @@ class ControllerPurgatory {
*
* @param offset The offset which the high water mark has advanced to.
*/
void completeUpTo(long offset) {
public void completeUpTo(long offset) {
Iterator<Entry<Long, List<DeferredEvent>>> iter = pending.entrySet().iterator();
while (iter.hasNext()) {
Entry<Long, List<DeferredEvent>> entry = iter.next();
@ -56,11 +55,11 @@ class ControllerPurgatory { @@ -56,11 +55,11 @@ class ControllerPurgatory {
}
/**
* Fail all the pending purgatory entries.
* Fail all deferred events with the provided exception.
*
* @param exception The exception to fail the entries with.
*/
void failAll(Exception exception) {
public void failAll(Exception exception) {
Iterator<Entry<Long, List<DeferredEvent>>> iter = pending.entrySet().iterator();
while (iter.hasNext()) {
Entry<Long, List<DeferredEvent>> entry = iter.next();
@ -72,18 +71,18 @@ class ControllerPurgatory { @@ -72,18 +71,18 @@ class ControllerPurgatory {
}
/**
* Add a new purgatory event.
* Add a new deferred event to be completed by the provided offset.
*
* @param offset The offset to add the new event at.
* @param event The new event.
*/
void add(long offset, DeferredEvent event) {
public void add(long offset, DeferredEvent event) {
if (!pending.isEmpty()) {
long lastKey = pending.lastKey();
if (offset < lastKey) {
throw new RuntimeException("There is already a purgatory event with " +
"offset " + lastKey + ". We should not add one with an offset of " +
offset + " which " + "is lower than that.");
throw new IllegalArgumentException("There is already a deferred event with " +
"offset " + lastKey + ". We should not add one with an offset of " +
offset + " which is lower than that.");
}
}
List<DeferredEvent> events = pending.get(offset);
@ -98,7 +97,7 @@ class ControllerPurgatory { @@ -98,7 +97,7 @@ class ControllerPurgatory {
* Get the offset of the highest pending event, or empty if there are no pending
* events.
*/
OptionalLong highestPendingOffset() {
public OptionalLong highestPendingOffset() {
if (pending.isEmpty()) {
return OptionalLong.empty();
} else {

43
metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java → server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java

@ -15,11 +15,12 @@ @@ -15,11 +15,12 @@
* limitations under the License.
*/
package org.apache.kafka.controller;
package org.apache.kafka.deferred;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -29,7 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; @@ -29,7 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class ControllerPurgatoryTest {
public class DeferredEventQueueTest {
static class SampleDeferredEvent implements DeferredEvent {
private final CompletableFuture<Void> future = new CompletableFuture<>();
@ -50,48 +51,48 @@ public class ControllerPurgatoryTest { @@ -50,48 +51,48 @@ public class ControllerPurgatoryTest {
@Test
public void testCompleteEvents() {
ControllerPurgatory purgatory = new ControllerPurgatory();
DeferredEventQueue deferredEventQueue = new DeferredEventQueue();
SampleDeferredEvent event1 = new SampleDeferredEvent();
SampleDeferredEvent event2 = new SampleDeferredEvent();
SampleDeferredEvent event3 = new SampleDeferredEvent();
purgatory.add(1, event1);
assertEquals(OptionalLong.of(1L), purgatory.highestPendingOffset());
purgatory.add(1, event2);
assertEquals(OptionalLong.of(1L), purgatory.highestPendingOffset());
purgatory.add(3, event3);
assertEquals(OptionalLong.of(3L), purgatory.highestPendingOffset());
purgatory.completeUpTo(2);
deferredEventQueue.add(1, event1);
assertEquals(OptionalLong.of(1L), deferredEventQueue.highestPendingOffset());
deferredEventQueue.add(1, event2);
assertEquals(OptionalLong.of(1L), deferredEventQueue.highestPendingOffset());
deferredEventQueue.add(3, event3);
assertEquals(OptionalLong.of(3L), deferredEventQueue.highestPendingOffset());
deferredEventQueue.completeUpTo(2);
assertTrue(event1.future.isDone());
assertTrue(event2.future.isDone());
assertFalse(event3.future.isDone());
purgatory.completeUpTo(4);
deferredEventQueue.completeUpTo(4);
assertTrue(event3.future.isDone());
assertEquals(OptionalLong.empty(), purgatory.highestPendingOffset());
assertEquals(OptionalLong.empty(), deferredEventQueue.highestPendingOffset());
}
@Test
public void testFailOnIncorrectOrdering() {
ControllerPurgatory purgatory = new ControllerPurgatory();
DeferredEventQueue deferredEventQueue = new DeferredEventQueue();
SampleDeferredEvent event1 = new SampleDeferredEvent();
SampleDeferredEvent event2 = new SampleDeferredEvent();
purgatory.add(2, event1);
assertThrows(RuntimeException.class, () -> purgatory.add(1, event2));
deferredEventQueue.add(2, event1);
assertThrows(RuntimeException.class, () -> deferredEventQueue.add(1, event2));
}
@Test
public void testFailEvents() {
ControllerPurgatory purgatory = new ControllerPurgatory();
DeferredEventQueue deferredEventQueue = new DeferredEventQueue();
SampleDeferredEvent event1 = new SampleDeferredEvent();
SampleDeferredEvent event2 = new SampleDeferredEvent();
SampleDeferredEvent event3 = new SampleDeferredEvent();
purgatory.add(1, event1);
purgatory.add(3, event2);
purgatory.add(3, event3);
purgatory.completeUpTo(2);
deferredEventQueue.add(1, event1);
deferredEventQueue.add(3, event2);
deferredEventQueue.add(3, event3);
deferredEventQueue.completeUpTo(2);
assertTrue(event1.future.isDone());
assertFalse(event2.future.isDone());
assertFalse(event3.future.isDone());
purgatory.failAll(new RuntimeException("failed"));
deferredEventQueue.failAll(new RuntimeException("failed"));
assertTrue(event2.future.isDone());
assertTrue(event3.future.isDone());
assertEquals(RuntimeException.class, assertThrows(ExecutionException.class,
Loading…
Cancel
Save