
KAFKA-14538: Implement KRaft metadata transactions in QuorumController

Implement the QuorumController side of KRaft metadata transactions.

As specified in KIP-868, this PR creates a new metadata version, IBP_3_6_IV1, which introduces
three new records: AbortTransactionRecord, BeginTransactionRecord, and EndTransactionRecord.

In order to make offset management unit-testable, this PR moves it out of QuorumController.java and
into OffsetControlManager.java. The general approach is to track the "last stable offset," which is
calculated from the latest committed offset and the start offset of the in-progress transaction (if
any). When a transaction is aborted, we revert to this last stable offset. We also revert to it when
the controller transitions from active to inactive.
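
The rule itself is small. Here is a minimal sketch of the calculation, mirroring the logic of
maybeAdvanceLastStableOffset in the OffsetControlManager.java diff below (the method name is real;
this standalone helper is illustrative):

    // With no open transaction, everything committed is stable. Otherwise,
    // records at or after the transaction's start offset are not yet stable.
    static long computeLastStableOffset(long lastCommittedOffset, long transactionStartOffset) {
        if (transactionStartOffset == -1L) {
            return lastCommittedOffset;
        }
        return Math.min(transactionStartOffset - 1, lastCommittedOffset);
    }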

In a follow-up PR, we will add support for the transaction records in MetadataLoader. We will also
add support for automatically aborting pending transactions after a controller failover.

Reviewers: David Arthur <mumrah@gmail.com>
pull/14209/head
Colin Patrick McCabe committed adc16d0f31
Files changed (changed line count in parentheses):

  1. core/src/test/java/kafka/test/ClusterTestExtensionsTest.java (2)
  2. core/src/test/java/kafka/test/annotation/ClusterTest.java (2)
  3. metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java (421)
  4. metadata/src/main/java/org/apache/kafka/controller/QuorumController.java (219)
  5. metadata/src/main/resources/common/metadata/AbortTransactionRecord.json (27)
  6. metadata/src/main/resources/common/metadata/BeginTransactionRecord.json (27)
  7. metadata/src/main/resources/common/metadata/EndTransactionRecord.json (24)
  8. metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java (270)
  9. metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java (2)
  10. metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java (2)
  11. metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java (6)
  12. server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java (9)
  13. server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java (1)
  14. server-common/src/test/java/org/apache/kafka/timeline/TrackingSnapshotRegistry.java (56)
  15. tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java (4)

core/src/test/java/kafka/test/ClusterTestExtensionsTest.java (2 lines changed)

@@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest {
@ClusterTest
public void testDefaults(ClusterConfig config) {
Assertions.assertEquals(MetadataVersion.IBP_3_6_IV0, config.metadataVersion());
Assertions.assertEquals(MetadataVersion.IBP_3_6_IV1, config.metadataVersion());
}
}

core/src/test/java/kafka/test/annotation/ClusterTest.java (2 lines changed)

@@ -41,6 +41,6 @@ public @interface ClusterTest {
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_6_IV0;
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_6_IV1;
ClusterConfigProperty[] serverProperties() default {};
}

metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java (new file, 421 lines)

@@ -0,0 +1,421 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
import java.util.Optional;
/**
* Manages read and write offsets, and in-memory snapshots.
*
* Also manages the following metrics:
* kafka.controller:type=KafkaController,name=ActiveControllerCount
* kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs
* kafka.controller:type=KafkaController,name=LastAppliedRecordOffset
* kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp
* kafka.controller:type=KafkaController,name=LastCommittedRecordOffset
*/
class OffsetControlManager {
public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private QuorumControllerMetrics metrics = null;
private Time time = Time.SYSTEM;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
Builder setMetrics(QuorumControllerMetrics metrics) {
this.metrics = metrics;
return this;
}
Builder setTime(Time time) {
this.time = time;
return this;
}
public OffsetControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (metrics == null) {
metrics = new QuorumControllerMetrics(Optional.empty(), time, false);
}
return new OffsetControlManager(logContext,
snapshotRegistry,
metrics,
time);
}
}
/**
* The slf4j logger.
*/
private final Logger log;
/**
* The snapshot registry.
*/
private final SnapshotRegistry snapshotRegistry;
/**
* The quorum controller metrics.
*/
private final QuorumControllerMetrics metrics;
/**
* The clock.
*/
private final Time time;
/**
* The ID of the snapshot that we're currently replaying, or null if there is none.
*/
private OffsetAndEpoch currentSnapshotId;
/**
* The name of the snapshot that we're currently replaying, or null if there is none.
*/
private String currentSnapshotName;
/**
* The latest committed offset.
*/
private long lastCommittedOffset;
/**
* The latest committed epoch.
*/
private int lastCommittedEpoch;
/**
* The latest offset that it is safe to read from.
*/
private long lastStableOffset;
/**
* The offset of the transaction we're in, or -1 if we are not in one.
*/
private long transactionStartOffset;
/**
* The next offset we should write to (an exclusive offset, one past the last scheduled
* record), or -1 if the controller is not active.
*/
private long nextWriteOffset;
private OffsetControlManager(
LogContext logContext,
SnapshotRegistry snapshotRegistry,
QuorumControllerMetrics metrics,
Time time
) {
this.log = logContext.logger(OffsetControlManager.class);
this.snapshotRegistry = snapshotRegistry;
this.metrics = metrics;
this.time = time;
this.currentSnapshotId = null;
this.currentSnapshotName = null;
this.lastCommittedOffset = -1L;
this.lastCommittedEpoch = -1;
this.lastStableOffset = -1L;
this.transactionStartOffset = -1L;
this.nextWriteOffset = -1L;
snapshotRegistry.getOrCreateSnapshot(-1L);
metrics.setActive(false);
metrics.setLastCommittedRecordOffset(-1L);
metrics.setLastAppliedRecordOffset(-1L);
metrics.setLastAppliedRecordTimestamp(-1L);
}
/**
* @return The SnapshotRegistry used by this offset control manager.
*/
SnapshotRegistry snapshotRegistry() {
return snapshotRegistry;
}
/**
* @return QuorumControllerMetrics managed by this offset control manager.
*/
QuorumControllerMetrics metrics() {
return metrics;
}
/**
* @return the ID of the current snapshot.
*/
OffsetAndEpoch currentSnapshotId() {
return currentSnapshotId;
}
/**
* @return the name of the current snapshot.
*/
String currentSnapshotName() {
return currentSnapshotName;
}
/**
* @return the last committed offset.
*/
long lastCommittedOffset() {
return lastCommittedOffset;
}
/**
* @return the last committed epoch.
*/
int lastCommittedEpoch() {
return lastCommittedEpoch;
}
/**
* @return the latest offset that it is safe to read from.
*/
long lastStableOffset() {
return lastStableOffset;
}
/**
* @return the transaction start offset, or -1 if there is no transaction.
*/
long transactionStartOffset() {
return transactionStartOffset;
}
/**
* @return the next offset that the active controller should write to.
*/
long nextWriteOffset() {
return nextWriteOffset;
}
/**
* @return true only if the manager is active.
*/
boolean active() {
return nextWriteOffset != -1L;
}
/**
* Called when the QuorumController becomes active.
*
* @param newNextWriteOffset The new next write offset to use. Must be non-negative.
*/
void activate(long newNextWriteOffset) {
if (active()) {
throw new RuntimeException("Can't activate already active OffsetControlManager.");
}
if (newNextWriteOffset < 0) {
throw new RuntimeException("Invalid negative newNextWriteOffset " +
newNextWriteOffset + ".");
}
// Before switching to active, create an in-memory snapshot at the last committed
// offset. This is required because the active controller assumes that there is always
// an in-memory snapshot at the last committed offset.
snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
this.nextWriteOffset = newNextWriteOffset;
metrics.setActive(true);
}
/**
* Called when the QuorumController becomes inactive.
*/
void deactivate() {
if (!active()) {
throw new RuntimeException("Can't deactivate inactive OffsetControlManager.");
}
metrics.setActive(false);
metrics.setLastAppliedRecordOffset(lastStableOffset);
this.nextWriteOffset = -1L;
if (!snapshotRegistry.hasSnapshot(lastStableOffset)) {
throw new RuntimeException("Unable to reset to last stable offset " + lastStableOffset +
". No in-memory snapshot found for this offset.");
}
snapshotRegistry.revertToSnapshot(lastStableOffset);
}
/**
* Handle the callback from the Raft layer indicating that a batch was committed.
*
* @param batch The batch that has been committed.
*/
void handleCommitBatch(Batch<ApiMessageAndVersion> batch) {
this.lastCommittedOffset = batch.lastOffset();
this.lastCommittedEpoch = batch.epoch();
maybeAdvanceLastStableOffset();
metrics.setLastCommittedRecordOffset(batch.lastOffset());
if (!active()) {
// On standby controllers, the last applied record offset is equal to the last
// committed offset.
metrics.setLastAppliedRecordOffset(batch.lastOffset());
metrics.setLastAppliedRecordTimestamp(batch.appendTimestamp());
}
}
/**
* Called by the active controller after it has invoked scheduleAtomicAppend to schedule some
* records to be written.
*
* @param endOffset The offset of the last record that was written.
*/
void handleScheduleAtomicAppend(long endOffset) {
this.nextWriteOffset = endOffset + 1;
snapshotRegistry.getOrCreateSnapshot(endOffset);
metrics.setLastAppliedRecordOffset(endOffset);
// This is not truly the append timestamp. The KRaft client doesn't expose the append
// time when scheduling a write. This is good enough because this is called right after
// the records were given to the KRaft client for appending, and the default append
// linger for KRaft is 25ms.
metrics.setLastAppliedRecordTimestamp(time.milliseconds());
}
/**
* Advance the last stable offset if needed.
*/
void maybeAdvanceLastStableOffset() {
long newLastStableOffset;
if (transactionStartOffset == -1L) {
newLastStableOffset = lastCommittedOffset;
} else {
newLastStableOffset = Math.min(transactionStartOffset - 1, lastCommittedOffset);
}
if (lastStableOffset < newLastStableOffset) {
lastStableOffset = newLastStableOffset;
snapshotRegistry.deleteSnapshotsUpTo(lastStableOffset);
if (!active()) {
snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
}
}
}
/**
* Called before we load a Raft snapshot.
*
* @param snapshotId The Raft snapshot offset and epoch.
*/
void beginLoadSnapshot(OffsetAndEpoch snapshotId) {
if (currentSnapshotId != null) {
throw new RuntimeException("Can't begin reading snapshot for " + snapshotId +
", because we are already reading " + currentSnapshotId);
}
this.currentSnapshotId = snapshotId;
this.currentSnapshotName = Snapshots.filenameFromSnapshotId(snapshotId);
log.info("Starting to load snapshot {}. Previous lastCommittedOffset was {}. Previous " +
"transactionStartOffset was {}.", currentSnapshotName, lastCommittedOffset,
transactionStartOffset);
this.snapshotRegistry.reset();
this.lastCommittedOffset = -1L;
this.lastCommittedEpoch = -1;
this.lastStableOffset = -1L;
this.transactionStartOffset = -1L;
this.nextWriteOffset = -1L;
}
/**
* Called after we have finished loading a Raft snapshot.
*
* @param timestamp The timestamp of the snapshot.
*/
void endLoadSnapshot(long timestamp) {
if (currentSnapshotId == null) {
throw new RuntimeException("Can't end loading snapshot, because there is no " +
"current snapshot.");
}
log.info("Successfully loaded snapshot {}.", currentSnapshotName);
this.snapshotRegistry.getOrCreateSnapshot(currentSnapshotId.offset());
this.lastCommittedOffset = currentSnapshotId.offset();
this.lastCommittedEpoch = currentSnapshotId.epoch();
this.lastStableOffset = currentSnapshotId.offset();
this.transactionStartOffset = -1L;
this.nextWriteOffset = -1L;
metrics.setLastCommittedRecordOffset(currentSnapshotId.offset());
metrics.setLastAppliedRecordOffset(currentSnapshotId.offset());
metrics.setLastAppliedRecordTimestamp(timestamp);
this.currentSnapshotId = null;
this.currentSnapshotName = null;
}
public void replay(BeginTransactionRecord message, long offset) {
if (currentSnapshotId != null) {
throw new RuntimeException("BeginTransactionRecord cannot appear within a snapshot.");
}
if (transactionStartOffset != -1L) {
throw new RuntimeException("Can't replay a BeginTransactionRecord at " + offset +
" because the transaction at " + transactionStartOffset + " was never closed.");
}
snapshotRegistry.getOrCreateSnapshot(offset - 1);
transactionStartOffset = offset;
log.info("Replayed {} at offset {}.", message, offset);
}
public void replay(EndTransactionRecord message, long offset) {
if (currentSnapshotId != null) {
throw new RuntimeException("EndTransactionRecord cannot appear within a snapshot.");
}
if (transactionStartOffset == -1L) {
throw new RuntimeException("Can't replay an EndTransactionRecord at " + offset +
" because there is no open transaction.");
}
transactionStartOffset = -1L;
log.info("Replayed {} at offset {}.", message, offset);
}
public void replay(AbortTransactionRecord message, long offset) {
if (currentSnapshotId != null) {
throw new RuntimeException("AbortTransactionRecord cannot appear within a snapshot.");
}
if (transactionStartOffset == -1L) {
throw new RuntimeException("Can't replay an AbortTransactionRecord at " + offset +
" because there is no open transaction.");
}
long preTransactionOffset = transactionStartOffset - 1;
snapshotRegistry.revertToSnapshot(preTransactionOffset);
transactionStartOffset = -1L;
log.info("Replayed {} at offset {}. Reverted to offset {}.",
message, offset, preTransactionOffset);
}
// VisibleForTesting
void setNextWriteOffset(long newNextWriteOffset) {
this.nextWriteOffset = newNextWriteOffset;
}
}
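
For orientation, here is a sketch of how this manager is driven over an active controller's
lifetime, using only the methods shown above, as the unit tests below also do (the offsets are
illustrative):

    OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();

    // Claiming leadership: snapshots in-memory state at the last stable offset.
    offsetControl.activate(1000L);

    // After scheduleAtomicAppend places records ending at offset 1004:
    offsetControl.handleScheduleAtomicAppend(1004L);  // nextWriteOffset becomes 1005

    // When Raft later commits the batch, handleCommitBatch(batch) advances
    // lastCommittedOffset and, with no open transaction, lastStableOffset.

    // Losing leadership: reverts in-memory state to the last stable offset.
    offsetControl.deactivate();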

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java (219 lines changed)

@@ -49,10 +49,13 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
@@ -104,6 +107,7 @@ import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
@@ -426,22 +430,6 @@ public final class QuorumController implements Controller {
return raftClient.leaderAndEpoch().leaderId();
}
/**
* @return The offset that we should perform read operations at.
*/
private long currentReadOffset() {
if (isActiveController()) {
// The active controller keeps an in-memory snapshot at the last committed offset,
// which we want to read from when performing read operations. This will avoid
// reading uncommitted data.
return lastCommittedOffset;
} else {
// Standby controllers never have uncommitted data in memory. Therefore, we always
// read the latest from every data structure.
return SnapshotRegistry.LATEST_EPOCH;
}
}
private void handleEventEnd(String name, long startProcessingTimeNs) {
long endProcessingTime = time.nanoseconds();
long deltaNs = endProcessingTime - startProcessingTimeNs;
@@ -467,10 +455,10 @@ public final class QuorumController implements Controller {
fromInternal(exception, () -> latestController());
int epoch = curClaimEpoch;
if (epoch == -1) {
epoch = lastCommittedEpoch;
epoch = offsetControl.lastCommittedEpoch();
}
String failureMessage = info.failureMessage(epoch, deltaUs,
isActiveController(), lastCommittedOffset);
isActiveController(), offsetControl.lastCommittedOffset());
if (info.isTimeoutException() && (!deltaUs.isPresent())) {
controllerMetrics.incrementOperationsTimedOut();
}
@@ -712,7 +700,7 @@ public final class QuorumController implements Controller {
}
ControllerResult<T> result = op.generateRecordsAndResult();
if (result.records().isEmpty()) {
op.processBatchEndOffset(writeOffset);
op.processBatchEndOffset(offsetControl.nextWriteOffset() - 1);
// If the operation did not return any records, then it was actually just
// a read after all, and not a read + write. However, this read was done
// from the latest in-memory state, which might contain uncommitted data.
@@ -736,16 +724,15 @@ public final class QuorumController implements Controller {
// them to the log.
long offset = appendRecords(log, result, maxRecordsPerBatch,
new Function<List<ApiMessageAndVersion>, Long>() {
private long prevEndOffset = writeOffset;
@Override
public Long apply(List<ApiMessageAndVersion> records) {
// Start by trying to apply the record to our in-memory state. This should always
// succeed; if it does not, that's a fatal error. It is important to do this before
// scheduling the record for Raft replication.
int recordIndex = 0;
long nextWriteOffset = offsetControl.nextWriteOffset();
for (ApiMessageAndVersion message : records) {
long recordOffset = prevEndOffset + 1 + recordIndex;
long recordOffset = nextWriteOffset + recordIndex;
try {
replay(message.message(), Optional.empty(), recordOffset);
} catch (Throwable e) {
@@ -753,22 +740,20 @@ public final class QuorumController implements Controller {
"record at offset %d on active controller, from the " +
"batch with baseOffset %d",
message.message().getClass().getSimpleName(),
recordOffset, prevEndOffset + 1);
recordOffset, nextWriteOffset);
throw fatalFaultHandler.handleFault(failureMessage, e);
}
recordIndex++;
}
long nextEndOffset = prevEndOffset + recordIndex;
long nextEndOffset = nextWriteOffset - 1 + recordIndex;
raftClient.scheduleAtomicAppend(controllerEpoch,
OptionalLong.of(prevEndOffset + 1),
OptionalLong.of(nextWriteOffset),
records);
snapshotRegistry.getOrCreateSnapshot(nextEndOffset);
prevEndOffset = nextEndOffset;
offsetControl.handleScheduleAtomicAppend(nextEndOffset);
return nextEndOffset;
}
});
op.processBatchEndOffset(offset);
updateWriteOffset(offset);
resultAndOffset = ControllerResultAndOffset.of(offset, result);
log.debug("Read-write operation {} will be completed when the log " +
@@ -997,12 +982,7 @@ public final class QuorumController implements Controller {
recordIndex++;
}
}
updateLastCommittedState(
offset,
epoch,
batch.appendTimestamp()
);
offsetControl.handleCommitBatch(batch);
}
} finally {
reader.close();
@@ -1017,43 +997,39 @@ public final class QuorumController implements Controller {
String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
if (isActiveController()) {
throw fatalFaultHandler.handleFault("Asked to load snapshot " + snapshotName +
", but we are the active controller at epoch " + curClaimEpoch);
", but we are the active controller at epoch " + curClaimEpoch);
}
log.info("Starting to replay snapshot {}, from last commit offset {} and epoch {}",
snapshotName, lastCommittedOffset, lastCommittedEpoch);
resetToEmptyState();
offsetControl.beginLoadSnapshot(reader.snapshotId());
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
long offset = batch.lastOffset();
List<ApiMessageAndVersion> messages = batch.records();
log.debug("Replaying snapshot {} batch with last offset of {}",
snapshotName, offset);
snapshotName, offset);
int i = 1;
for (ApiMessageAndVersion message : messages) {
try {
replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
replay(message.message(), Optional.of(reader.snapshotId()),
reader.lastContainedLogOffset());
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record " +
"from snapshot %s on standby controller, which was %d of " +
"%d record(s) in the batch with baseOffset %d.",
message.message().getClass().getSimpleName(), reader.snapshotId(),
i, messages.size(), batch.baseOffset());
"from snapshot %s on standby controller, which was %d of " +
"%d record(s) in the batch with baseOffset %d.",
message.message().getClass().getSimpleName(), reader.snapshotId(),
i, messages.size(), batch.baseOffset());
throw fatalFaultHandler.handleFault(failureMessage, e);
}
i++;
}
}
log.info("Finished replaying snapshot {}", snapshotName);
updateLastCommittedState(
reader.lastContainedLogOffset(),
reader.lastContainedLogEpoch(),
reader.lastContainedLogTimestamp());
snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
offsetControl.endLoadSnapshot(reader.lastContainedLogTimestamp());
} catch (FaultHandlerException e) {
throw e;
} catch (Throwable e) {
throw fatalFaultHandler.handleFault("Error while loading snapshot " +
reader.snapshotId(), e);
} finally {
reader.close();
}
@@ -1076,15 +1052,16 @@ public final class QuorumController implements Controller {
} else {
log.warn("Renouncing the leadership due to a metadata log event. " +
"We were the leader at epoch {}, but in the new epoch {}, " +
"the leader is {}. Reverting to last committed offset {}.",
curClaimEpoch, newLeader.epoch(), newLeaderName, lastCommittedOffset);
"the leader is {}. Reverting to last stable offset {}.",
curClaimEpoch, newLeader.epoch(), newLeaderName,
offsetControl.lastStableOffset());
renounce();
}
} else if (newLeader.isLeader(nodeId)) {
long newLastWriteOffset = raftClient.logEndOffset() - 1;
log.info("Becoming the active controller at epoch {}, last write offset {}.",
newLeader.epoch(), newLastWriteOffset);
claim(newLeader.epoch(), newLastWriteOffset);
long newNextWriteOffset = raftClient.logEndOffset();
log.info("Becoming the active controller at epoch {}, next write offset {}.",
newLeader.epoch(), newNextWriteOffset);
claim(newLeader.epoch(), newNextWriteOffset);
} else {
log.info("In the new epoch {}, the leader is {}.",
newLeader.epoch(), newLeaderName);
@@ -1116,38 +1093,16 @@ public final class QuorumController implements Controller {
return claimEpoch != -1;
}
private void updateWriteOffset(long offset) {
writeOffset = offset;
if (isActiveController()) {
controllerMetrics.setLastAppliedRecordOffset(writeOffset);
// This is not truly the append timestamp. The KRaft client doesn't expose the append time when scheduling a write.
// This is good enough because this is called right after the records were given to the KRAft client for appending and
// the default append linger for KRaft is 25ms.
controllerMetrics.setLastAppliedRecordTimestamp(time.milliseconds());
} else {
// Change the last applied record metrics back to the last committed state. Inactive controllers report the last committed
// state while active controllers report the latest state which may include uncommitted data.
controllerMetrics.setLastAppliedRecordOffset(lastCommittedOffset);
controllerMetrics.setLastAppliedRecordTimestamp(lastCommittedTimestamp);
}
}
private void claim(int epoch, long newLastWriteOffset) {
private void claim(int epoch, long newNextWriteOffset) {
try {
if (curClaimEpoch != -1) {
throw new RuntimeException("Cannot claim leadership because we are already the " +
"active controller.");
}
curClaimEpoch = epoch;
controllerMetrics.setActive(true);
updateWriteOffset(newLastWriteOffset);
offsetControl.activate(newNextWriteOffset);
clusterControl.activate();
// Before switching to active, create an in-memory snapshot at the last committed
// offset. This is required because the active controller assumes that there is always
// an in-memory snapshot at the last committed offset.
snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
// Prepend the activate event. It is important that this event go at the beginning
// of the queue rather than the end (hence prepend rather than append). It's also
// important not to use prepend for anything else, to preserve the ordering here.
@@ -1276,22 +1231,6 @@ public final class QuorumController implements Controller {
}
}
private void updateLastCommittedState(
long offset,
int epoch,
long timestamp
) {
lastCommittedOffset = offset;
lastCommittedEpoch = epoch;
lastCommittedTimestamp = timestamp;
controllerMetrics.setLastCommittedRecordOffset(offset);
if (!isActiveController()) {
controllerMetrics.setLastAppliedRecordOffset(offset);
controllerMetrics.setLastAppliedRecordTimestamp(timestamp);
}
}
void renounce() {
try {
if (curClaimEpoch == -1) {
@@ -1300,16 +1239,9 @@ public final class QuorumController implements Controller {
}
raftClient.resign(curClaimEpoch);
curClaimEpoch = -1;
controllerMetrics.setActive(false);
deferredEventQueue.failAll(ControllerExceptions.
newWrongControllerException(OptionalInt.empty()));
if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
throw new RuntimeException("Unable to find last committed offset " +
lastCommittedEpoch + " in snapshot registry.");
}
snapshotRegistry.revertToSnapshot(lastCommittedOffset);
updateWriteOffset(-1);
offsetControl.deactivate();
clusterControl.deactivate();
cancelMaybeFenceReplicas();
cancelMaybeBalancePartitionLeaders();
@@ -1555,20 +1487,20 @@ public final class QuorumController implements Controller {
case ZK_MIGRATION_STATE_RECORD:
featureControl.replay((ZkMigrationStateRecord) message);
break;
case BEGIN_TRANSACTION_RECORD:
offsetControl.replay((BeginTransactionRecord) message, offset);
break;
case END_TRANSACTION_RECORD:
offsetControl.replay((EndTransactionRecord) message, offset);
break;
case ABORT_TRANSACTION_RECORD:
offsetControl.replay((AbortTransactionRecord) message, offset);
break;
default:
throw new RuntimeException("Unhandled record type " + type);
}
}
/**
* Clear all data structures and reset all KRaft state.
*/
private void resetToEmptyState() {
snapshotRegistry.reset();
updateLastCommittedState(-1, -1, -1);
}
/**
* Handles faults that cause a controller failover, but which don't abort the process.
*/
@@ -1621,6 +1553,11 @@ */
*/
private final DeferredEventQueue deferredEventQueue;
/**
* Manages read and write offsets, and in-memory snapshots.
*/
private final OffsetControlManager offsetControl;
/**
* A predicate that returns information about whether a ConfigResource exists.
*/
@@ -1699,26 +1636,6 @@ public final class QuorumController implements Controller {
*/
private volatile int curClaimEpoch;
/**
* The last offset we have committed, or -1 if we have not committed any offsets.
*/
private long lastCommittedOffset = -1;
/**
* The epoch of the last offset we have committed, or -1 if we have not committed any offsets.
*/
private int lastCommittedEpoch = -1;
/**
* The timestamp in milliseconds of the last batch we have committed, or -1 if we have not committed any offset.
*/
private long lastCommittedTimestamp = -1;
/**
* If we have called scheduleWrite, this is the last offset we got back from it.
*/
private long writeOffset;
/**
* How long to delay partition leader balancing operations.
*/
@@ -1803,6 +1720,12 @@
this.controllerMetrics = controllerMetrics;
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.deferredEventQueue = new DeferredEventQueue(logContext);
this.offsetControl = new OffsetControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
setMetrics(controllerMetrics).
setTime(time).
build();
this.resourceExists = new ConfigResourceExistenceChecker();
this.configurationControl = new ConfigurationControlManager.Builder().
setLogContext(logContext).
@@ -1876,9 +1799,6 @@
this.zkRecordConsumer = new MigrationRecordConsumer();
this.zkMigrationEnabled = zkMigrationEnabled;
this.recordRedactor = new RecordRedactor(configSchema);
updateWriteOffset(-1);
resetToEmptyState();
log.info("Creating new QuorumController with clusterId {}.{}",
clusterId, zkMigrationEnabled ? " ZK migration mode is enabled." : "");
@@ -1939,7 +1859,7 @@
if (names.isEmpty())
return CompletableFuture.completedFuture(Collections.emptyMap());
return appendReadEvent("findTopicIds", context.deadlineNs(),
() -> replicationControl.findTopicIds(currentReadOffset(), names));
() -> replicationControl.findTopicIds(offsetControl.lastStableOffset(), names));
}
@Override
@@ -1947,7 +1867,7 @@
ControllerRequestContext context
) {
return appendReadEvent("findAllTopicIds", context.deadlineNs(),
() -> replicationControl.findAllTopicIds(currentReadOffset()));
() -> replicationControl.findAllTopicIds(offsetControl.lastStableOffset()));
}
@Override
@@ -1958,7 +1878,7 @@
if (ids.isEmpty())
return CompletableFuture.completedFuture(Collections.emptyMap());
return appendReadEvent("findTopicNames", context.deadlineNs(),
() -> replicationControl.findTopicNames(currentReadOffset(), ids));
() -> replicationControl.findTopicNames(offsetControl.lastStableOffset(), ids));
}
@Override
@@ -1978,7 +1898,7 @@
Map<ConfigResource, Collection<String>> resources
) {
return appendReadEvent("describeConfigs", context.deadlineNs(),
() -> configurationControl.describeConfigs(currentReadOffset(), resources));
() -> configurationControl.describeConfigs(offsetControl.lastStableOffset(), resources));
}
@Override
@@ -2000,7 +1920,7 @@
ControllerRequestContext context
) {
return appendReadEvent("getFinalizedFeatures", context.deadlineNs(),
() -> featureControl.finalizedFeatures(currentReadOffset()));
() -> featureControl.finalizedFeatures(offsetControl.lastStableOffset()));
}
@Override
@@ -2045,7 +1965,8 @@
new ListPartitionReassignmentsResponseData().setErrorMessage(null));
}
return appendReadEvent("listPartitionReassignments", context.deadlineNs(),
() -> replicationControl.listPartitionReassignments(request.topics(), currentReadOffset()));
() -> replicationControl.listPartitionReassignments(request.topics(),
offsetControl.lastStableOffset()));
}
@Override
@@ -2120,7 +2041,7 @@
return appendWriteEvent("registerBroker", context.deadlineNs(),
() -> {
ControllerResult<BrokerRegistrationReply> result = clusterControl.
registerBroker(request, writeOffset + 1, featureControl.
registerBroker(request, offsetControl.nextWriteOffset(), featureControl.
finalizedFeatures(Long.MAX_VALUE));
rescheduleMaybeFenceStaleBrokers();
return result;
@@ -2272,9 +2193,9 @@
}
// VisibleForTesting
void setWriteOffset(long newWriteOffset) {
appendControlEvent("setWriteOffset", () -> {
this.writeOffset = newWriteOffset;
void setNewNextWriteOffset(long newNextWriteOffset) {
appendControlEvent("setNewNextWriteOffset", () -> {
offsetControl.setNextWriteOffset(newNextWriteOffset);
});
}
}

metadata/src/main/resources/common/metadata/AbortTransactionRecord.json (new file, 27 lines)

@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 25,
"type": "metadata",
"name": "AbortTransactionRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Reason", "type": "string", "default": "null",
"versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
"about": "An optional textual description of why the transaction was aborted." }
]
}

metadata/src/main/resources/common/metadata/BeginTransactionRecord.json (new file, 27 lines)

@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 23,
"type": "metadata",
"name": "BeginTransactionRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Name", "type": "string", "default": "null",
"versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
"about": "An optional textual description of this transaction." }
]
}

metadata/src/main/resources/common/metadata/EndTransactionRecord.json (new file, 24 lines)

@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 24,
"type": "metadata",
"name": "EndTransactionRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
]
}
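
Together these three schemas delimit a transaction in the metadata log. As a sketch, the generated
record classes are wrapped in ApiMessageAndVersion like any other metadata record (the setters
follow the project's generated-message pattern; the name and reason strings are illustrative):

    // Open, then cleanly close, a metadata transaction (record version 0).
    ApiMessageAndVersion begin = new ApiMessageAndVersion(
        new BeginTransactionRecord().setName("bootstrap"), (short) 0);
    ApiMessageAndVersion end = new ApiMessageAndVersion(
        new EndTransactionRecord(), (short) 0);

    // Or abandon it, optionally recording why.
    ApiMessageAndVersion abort = new ApiMessageAndVersion(
        new AbortTransactionRecord().setReason("controller failover"), (short) 0);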

metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java (new file, 270 lines)

@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.NoOpRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.TrackingSnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class OffsetControlManagerTest {
@Test
public void testInitialValues() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
assertNull(offsetControl.currentSnapshotId());
assertNull(offsetControl.currentSnapshotName());
assertEquals(-1L, offsetControl.lastCommittedOffset());
assertEquals(-1, offsetControl.lastCommittedEpoch());
assertEquals(-1, offsetControl.lastStableOffset());
assertEquals(-1, offsetControl.transactionStartOffset());
assertEquals(-1, offsetControl.nextWriteOffset());
assertFalse(offsetControl.active());
assertEquals(Arrays.asList(-1L), offsetControl.snapshotRegistry().epochsList());
}
@Test
public void testActivate() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
offsetControl.activate(1000L);
assertEquals(1000L, offsetControl.nextWriteOffset());
assertTrue(offsetControl.active());
assertTrue(offsetControl.metrics().active());
assertEquals(Arrays.asList(-1L), offsetControl.snapshotRegistry().epochsList());
}
@Test
public void testActivateFailsIfAlreadyActive() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
offsetControl.activate(1000L);
assertEquals("Can't activate already active OffsetControlManager.",
assertThrows(RuntimeException.class,
() -> offsetControl.activate(2000L)).
getMessage());
}
@Test
public void testActivateFailsIfNewNextWriteOffsetIsNegative() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
assertEquals("Invalid negative newNextWriteOffset -2.",
assertThrows(RuntimeException.class,
() -> offsetControl.activate(-2)).
getMessage());
}
@Test
public void testActivateAndDeactivate() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
offsetControl.activate(1000L);
assertEquals(1000L, offsetControl.nextWriteOffset());
offsetControl.deactivate();
assertEquals(-1L, offsetControl.nextWriteOffset());
}
@Test
public void testDeactivateFailsIfNotActive() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
assertEquals("Can't deactivate inactive OffsetControlManager.",
assertThrows(RuntimeException.class,
() -> offsetControl.deactivate()).
getMessage());
}
private static Batch<ApiMessageAndVersion> newFakeBatch(
long lastOffset,
int epoch,
long appendTimestamp
) {
return Batch.data(
lastOffset,
epoch,
appendTimestamp,
100,
Collections.singletonList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)));
}
@Test
public void testHandleCommitBatch() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
offsetControl.handleCommitBatch(newFakeBatch(1000L, 200, 3000L));
assertEquals(Arrays.asList(1000L), offsetControl.snapshotRegistry().epochsList());
assertEquals(1000L, offsetControl.lastCommittedOffset());
assertEquals(200, offsetControl.lastCommittedEpoch());
assertEquals(1000L, offsetControl.lastStableOffset());
assertEquals(-1L, offsetControl.transactionStartOffset());
assertEquals(-1L, offsetControl.nextWriteOffset());
assertFalse(offsetControl.active());
assertFalse(offsetControl.metrics().active());
assertEquals(1000L, offsetControl.metrics().lastAppliedRecordOffset());
assertEquals(1000L, offsetControl.metrics().lastCommittedRecordOffset());
assertEquals(3000L, offsetControl.metrics().lastAppliedRecordTimestamp());
}
@Test
public void testHandleScheduleAtomicAppend() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
offsetControl.handleScheduleAtomicAppend(2000L);
assertEquals(2001L, offsetControl.nextWriteOffset());
assertEquals(2000L, offsetControl.metrics().lastAppliedRecordOffset());
assertEquals(-1L, offsetControl.lastStableOffset());
assertEquals(-1L, offsetControl.lastCommittedOffset());
assertEquals(Arrays.asList(-1L, 2000L), offsetControl.snapshotRegistry().epochsList());
offsetControl.handleCommitBatch(newFakeBatch(2000L, 200, 3000L));
assertEquals(2000L, offsetControl.lastStableOffset());
assertEquals(2000L, offsetControl.lastCommittedOffset());
assertEquals(Arrays.asList(2000L), offsetControl.snapshotRegistry().epochsList());
}
@Test
public void testHandleLoadSnapshot() {
TrackingSnapshotRegistry snapshotRegistry = new TrackingSnapshotRegistry(new LogContext());
OffsetControlManager offsetControl = new OffsetControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
build();
offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300));
assertEquals(Arrays.asList("snapshot[-1]", "reset"), snapshotRegistry.operations());
assertEquals(new OffsetAndEpoch(4000L, 300), offsetControl.currentSnapshotId());
assertEquals("00000000000000004000-0000000300", offsetControl.currentSnapshotName());
assertEquals(Arrays.asList(), offsetControl.snapshotRegistry().epochsList());
offsetControl.endLoadSnapshot(3456L);
assertEquals(Arrays.asList("snapshot[-1]", "reset", "snapshot[4000]"),
snapshotRegistry.operations());
assertNull(offsetControl.currentSnapshotId());
assertNull(offsetControl.currentSnapshotName());
assertEquals(Arrays.asList(4000L), offsetControl.snapshotRegistry().epochsList());
assertEquals(4000L, offsetControl.lastCommittedOffset());
assertEquals(300, offsetControl.lastCommittedEpoch());
assertEquals(4000L, offsetControl.lastStableOffset());
assertEquals(-1L, offsetControl.transactionStartOffset());
assertEquals(-1L, offsetControl.nextWriteOffset());
assertEquals(4000L, offsetControl.metrics().lastCommittedRecordOffset());
assertEquals(4000L, offsetControl.metrics().lastAppliedRecordOffset());
assertEquals(3456L, offsetControl.metrics().lastAppliedRecordTimestamp());
}
@Test
public void testBeginTransactionRecordNotAllowedInSnapshot() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300));
assertEquals("BeginTransactionRecord cannot appear within a snapshot.",
assertThrows(RuntimeException.class,
() -> offsetControl.replay(new BeginTransactionRecord(), 1000L)).
getMessage());
}
@Test
public void testEndTransactionRecordNotAllowedInSnapshot() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300));
assertEquals("EndTransactionRecord cannot appear within a snapshot.",
assertThrows(RuntimeException.class,
() -> offsetControl.replay(new EndTransactionRecord(), 1000L)).
getMessage());
}
@Test
public void testAbortTransactionRecordNotAllowedInSnapshot() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300));
assertEquals("AbortTransactionRecord cannot appear within a snapshot.",
assertThrows(RuntimeException.class,
() -> offsetControl.replay(new AbortTransactionRecord(), 1000L)).
getMessage());
}
@Test
public void testEndLoadSnapshotFailsWhenNotInSnapshot() {
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
assertEquals("Can't end loading snapshot, because there is no current snapshot.",
assertThrows(RuntimeException.class,
() -> offsetControl.endLoadSnapshot(1000L)).
getMessage());
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testReplayTransaction(boolean aborted) {
TrackingSnapshotRegistry snapshotRegistry = new TrackingSnapshotRegistry(new LogContext());
OffsetControlManager offsetControl = new OffsetControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
build();
offsetControl.replay(new BeginTransactionRecord(), 1500L);
assertEquals(1500L, offsetControl.transactionStartOffset());
assertEquals(Arrays.asList(-1L, 1499L), offsetControl.snapshotRegistry().epochsList());
offsetControl.handleCommitBatch(newFakeBatch(1550L, 100, 2000L));
assertEquals(1550L, offsetControl.lastCommittedOffset());
assertEquals(100, offsetControl.lastCommittedEpoch());
assertEquals(1499L, offsetControl.lastStableOffset());
assertEquals(Arrays.asList(1499L), offsetControl.snapshotRegistry().epochsList());
if (aborted) {
offsetControl.replay(new AbortTransactionRecord(), 1600L);
assertEquals(Arrays.asList("snapshot[-1]", "snapshot[1499]", "revert[1499]"),
snapshotRegistry.operations());
} else {
offsetControl.replay(new EndTransactionRecord(), 1600L);
assertEquals(Arrays.asList("snapshot[-1]", "snapshot[1499]"),
snapshotRegistry.operations());
}
assertEquals(-1L, offsetControl.transactionStartOffset());
assertEquals(1499L, offsetControl.lastStableOffset());
offsetControl.handleCommitBatch(newFakeBatch(1650, 100, 2100L));
assertEquals(1650, offsetControl.lastStableOffset());
assertEquals(Arrays.asList(1650L), offsetControl.snapshotRegistry().epochsList());
}
@Test
public void testLoadSnapshotClearsTransactionalState() {
TrackingSnapshotRegistry snapshotRegistry = new TrackingSnapshotRegistry(new LogContext());
OffsetControlManager offsetControl = new OffsetControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
build();
offsetControl.replay(new BeginTransactionRecord(), 1500L);
offsetControl.beginLoadSnapshot(new OffsetAndEpoch(4000L, 300));
assertEquals(-1L, offsetControl.transactionStartOffset());
assertEquals(Arrays.asList("snapshot[-1]", "snapshot[1499]", "reset"),
snapshotRegistry.operations());
}
}

metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java (2 lines changed)

@@ -92,7 +92,7 @@ public class QuorumControllerIntegrationTestUtils {
.setBrokerId(brokerId)
.setRack(null)
.setClusterId(controller.clusterId())
.setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0))
.setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latest()))
.setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
.setListeners(new ListenerCollection(
Arrays.asList(

metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java (2 lines changed)

@@ -110,7 +110,7 @@ public class QuorumControllerMetricsIntegrationTest {
}
});
if (forceFailoverUsingLogLayer) {
controlEnv.activeController().setWriteOffset(123L);
controlEnv.activeController().setNewNextWriteOffset(123L);
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
createTopics(controlEnv.activeController(), "test_", 1, 1);

metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java (6 lines changed)

@@ -158,7 +158,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV1)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController());
@@ -199,7 +199,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV1)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
@@ -536,7 +536,7 @@ public class QuorumControllerTest {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV1)).
setListeners(listeners));
assertEquals(3L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =

server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java (9 lines changed)

@@ -177,7 +177,10 @@ public enum MetadataVersion {
IBP_3_5_IV2(11, "3.5", "IV2", true),
// Remove leader epoch bump when KRaft controller shrinks the ISR (KAFKA-15021)
IBP_3_6_IV0(12, "3.6", "IV0", false);
IBP_3_6_IV0(12, "3.6", "IV0", false),
// Add metadata transactions
IBP_3_6_IV1(13, "3.6", "IV1", true);
// NOTE: update the default version in @ClusterTest annotation to point to the latest version
public static final String FEATURE_NAME = "metadata.version";
@@ -267,6 +270,10 @@ public enum MetadataVersion {
return !this.isAtLeast(IBP_3_6_IV0);
}
public boolean isMetadataTransactionSupported() {
return this.isAtLeast(IBP_3_6_IV1);
}
public boolean isKRaftSupported() {
return this.featureLevel > 0;
}
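
Callers are expected to gate transaction usage on this check; a hedged sketch, assuming a
FeatureControlManager-style metadataVersion() accessor (featureControl here is illustrative):

    // Only emit Begin/End/AbortTransactionRecords once the cluster has been
    // upgraded to a metadata version that can decode them.
    if (featureControl.metadataVersion().isMetadataTransactionSupported()) {
        // safe to wrap multi-batch writes in a metadata transaction
    }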

server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java (1 line changed)

@@ -204,6 +204,7 @@ public class SnapshotRegistry {
* @param targetEpoch The epoch of the snapshot to revert to.
*/
public void revertToSnapshot(long targetEpoch) {
log.debug("Reverting to in-memory snapshot {}", targetEpoch);
Snapshot target = getSnapshot(targetEpoch);
Iterator<Snapshot> iterator = iterator(target);
iterator.next();
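
The snapshot/revert cycle that OffsetControlManager leans on looks roughly like this (a
self-contained sketch; the epoch value is illustrative):

    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    TimelineHashMap<String, String> map = new TimelineHashMap<>(registry, 4);

    map.put("a", "1");
    registry.getOrCreateSnapshot(100L);  // capture state as of epoch 100
    map.put("a", "2");                   // a change that may need to be undone
    registry.revertToSnapshot(100L);     // map.get("a") is "1" again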

server-common/src/test/java/org/apache/kafka/timeline/TrackingSnapshotRegistry.java (new file, 56 lines)

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.timeline;
import org.apache.kafka.common.utils.LogContext;
import java.util.ArrayList;
import java.util.List;
public class TrackingSnapshotRegistry extends SnapshotRegistry {
private final List<String> operations = new ArrayList<>();
public TrackingSnapshotRegistry(LogContext logContext) {
super(logContext);
}
public List<String> operations() {
return new ArrayList<>(operations);
}
@Override
public void revertToSnapshot(long targetEpoch) {
operations.add("revert[" + targetEpoch + "]");
super.revertToSnapshot(targetEpoch);
}
@Override
public void reset() {
operations.add("reset");
super.reset();
}
@Override
public Snapshot getOrCreateSnapshot(long epoch) {
if (!hasSnapshot(epoch)) {
operations.add("snapshot[" + epoch + "]");
}
return super.getOrCreateSnapshot(epoch);
}
}

tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java (4 lines changed)

@@ -67,7 +67,7 @@ public class FeatureCommandTest {
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
);
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 3.6-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
"SupportedMaxVersion: 3.6-IV1\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
}
@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
@@ -125,7 +125,7 @@ public class FeatureCommandTest {
"disable", "--feature", "metadata.version"))
);
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
"metadata.version. Local controller 3000 only supports versions 1-12", commandOutput);
"metadata.version. Local controller 3000 only supports versions 1-13", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
