
KAFKA-14538 Metadata transactions in MetadataLoader (#14208)

This PR contains three main changes:

- Support for transactions in MetadataLoader
- Abort in-progress transaction during controller failover
- Utilize transactions for ZK to KRaft migration

A new MetadataBatchLoader class is added to decouple the loading of record batches from the
publishing of metadata in MetadataLoader. Since a transaction can span across multiple batches (or
multiple transactions could exist within one batch), some buffering of metadata updates was needed
before publishing out to the MetadataPublishers. MetadataBatchLoader accumulates changes into a
MetadataDelta, and uses a callback to publish to the publishers when needed.
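
As a minimal sketch of how the new pieces fit together (not code from this PR; the faultHandler, publishers, reader, and currentLeaderAndEpoch names are assumed to be MetadataLoader's existing fields), the wiring looks roughly like this, based on the constructor and methods added in the diff below:

    // Construct the batch loader with a callback that publishes each accumulated delta.
    MetadataBatchLoader batchLoader = new MetadataBatchLoader(
        new LogContext(),
        Time.SYSTEM,
        faultHandler,
        (delta, image, manifest) -> {
            for (MetadataPublisher publisher : publishers.values()) {
                publisher.onMetadataUpdate(delta, image, manifest);
            }
        });
    batchLoader.resetToImage(MetadataImage.EMPTY);

    // In handleCommit: feed every committed batch to the loader...
    while (reader.hasNext()) {
        batchLoader.loadBatch(reader.next(), currentLeaderAndEpoch);
    }
    // ...then flush, which publishes unless a transaction is still open.
    batchLoader.maybeFlushBatches(currentLeaderAndEpoch);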

One small oddity with this approach is that, since we can "split" batches in some cases, the number of
bytes reported in the LogDeltaManifest has new semantics. The byte count for a batch is now only
included in the last metadata update that is published as a result of that batch.
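
For example (an illustrative trace with assumed batch contents, not output from this PR):

    // One committed batch of 500 bytes: [RecordA, BeginTransactionRecord, RecordB, EndTransactionRecord]
    //
    //   published update #1: delta = {RecordA}  -> manifest.numBytes() == 0
    //   published update #2: delta = {RecordB}  -> manifest.numBytes() == 500, manifest.numBatches() == 1
    //
    // Summing numBytes across all published LogDeltaManifests still equals the bytes read from the log.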

Reviewers: Colin P. McCabe <cmccabe@apache.org>
David Arthur committed via GitHub
commit 418b8a6e59
  1. 8
      core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
  2. 233
      metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
  3. 192
      metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  4. 2
      metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifestType.java
  5. 59
      metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java
  6. 262
      metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
  7. 176
      metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
  8. 23
      metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
  9. 2
      metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java
  10. 361
      metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java
  11. 6
      metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java
  12. 3
      metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
  13. 164
      metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
  14. 16
      metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
  15. 7
      metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java
  16. 444
      metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java
  17. 155
      metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
  18. 32
      metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
  19. 27
      metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
  20. 46
      metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
  21. 12
      server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
  22. 2
      server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java
  23. 12
      tests/kafkatest/tests/core/zookeeper_migration_test.py

8
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala

@@ -301,7 +301,13 @@ class BrokerMetadataPublisherTest {
.build()
metadataPublisher.onMetadataUpdate(delta, image,
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 42));
LogDeltaManifest.newBuilder()
.provenance(MetadataProvenance.EMPTY)
.leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
.numBatches(1)
.elapsedNs(100)
.numBytes(42)
.build());
verify(groupCoordinator).onNewMetadataImage(image, delta)
}

233
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java

@@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
public class ActivationRecordsGenerator {
static ControllerResult<Void> recordsForEmptyLog(
Consumer<String> activationMessageConsumer,
long transactionStartOffset,
boolean zkMigrationEnabled,
BootstrapMetadata bootstrapMetadata,
MetadataVersion metadataVersion
) {
StringBuilder logMessageBuilder = new StringBuilder("Performing controller activation. ");
List<ApiMessageAndVersion> records = new ArrayList<>();
if (transactionStartOffset != -1L) {
// In-flight bootstrap transaction
if (!metadataVersion.isMetadataTransactionSupported()) {
throw new RuntimeException("Detected partial bootstrap records transaction at " +
transactionStartOffset + ", but the metadata.version " + metadataVersion +
" does not support transactions. Cannot continue.");
} else {
logMessageBuilder
.append("Aborting partial bootstrap records transaction at offset ")
.append(transactionStartOffset)
.append(". Re-appending ")
.append(bootstrapMetadata.records().size())
.append(" bootstrap record(s) in new metadata transaction at metadata.version ")
.append(metadataVersion)
.append(" from bootstrap source '")
.append(bootstrapMetadata.source())
.append("'. ");
records.add(new ApiMessageAndVersion(
new AbortTransactionRecord().setReason("Controller failover"), (short) 0));
records.add(new ApiMessageAndVersion(
new BeginTransactionRecord().setName("Bootstrap records"), (short) 0));
}
} else {
// No in-flight transaction
logMessageBuilder
.append("The metadata log appears to be empty. ")
.append("Appending ")
.append(bootstrapMetadata.records().size())
.append(" bootstrap record(s) ");
if (metadataVersion.isMetadataTransactionSupported()) {
records.add(new ApiMessageAndVersion(
new BeginTransactionRecord().setName("Bootstrap records"), (short) 0));
logMessageBuilder.append("in metadata transaction ");
}
logMessageBuilder
.append("at metadata.version ")
.append(metadataVersion)
.append(" from bootstrap source '")
.append(bootstrapMetadata.source())
.append("'. ");
}
// If no records have been replayed, we need to write out the bootstrap records.
// This will include the new metadata.version, as well as things like SCRAM
// initialization, etc.
records.addAll(bootstrapMetadata.records());
if (metadataVersion.isMigrationSupported()) {
if (zkMigrationEnabled) {
logMessageBuilder.append("Putting the controller into pre-migration mode. No metadata updates " +
"will be allowed until the ZK metadata has been migrated. ");
records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
} else {
logMessageBuilder.append("Setting the ZK migration state to NONE since this is a de-novo " +
"KRaft cluster. ");
records.add(ZkMigrationState.NONE.toRecord());
}
} else {
if (zkMigrationEnabled) {
throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() +
" does not support ZK migrations. Cannot continue with ZK migrations enabled.");
}
}
activationMessageConsumer.accept(logMessageBuilder.toString().trim());
if (metadataVersion.isMetadataTransactionSupported()) {
records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
return ControllerResult.of(records, null);
} else {
return ControllerResult.atomicOf(records, null);
}
}
static ControllerResult<Void> recordsForNonEmptyLog(
Consumer<String> activationMessageConsumer,
long transactionStartOffset,
boolean zkMigrationEnabled,
FeatureControlManager featureControl,
MetadataVersion metadataVersion
) {
StringBuilder logMessageBuilder = new StringBuilder("Performing controller activation. ");
// Logs have been replayed. We need to initialize some things here if upgrading from older KRaft versions
List<ApiMessageAndVersion> records = new ArrayList<>();
// Check for in-flight transaction
if (transactionStartOffset != -1L) {
if (!metadataVersion.isMetadataTransactionSupported()) {
throw new RuntimeException("Detected in-progress transaction at offset " + transactionStartOffset +
", but the metadata.version " + metadataVersion +
" does not support transactions. Cannot continue.");
} else {
logMessageBuilder
.append("Aborting in-progress metadata transaction at offset ")
.append(transactionStartOffset)
.append(". ");
records.add(new ApiMessageAndVersion(
new AbortTransactionRecord().setReason("Controller failover"), (short) 0));
}
}
if (metadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) {
logMessageBuilder.append("No metadata.version feature level record was found in the log. ")
.append("Treating the log as version ")
.append(MetadataVersion.MINIMUM_KRAFT_VERSION)
.append(". ");
}
if (zkMigrationEnabled && !metadataVersion.isMigrationSupported()) {
throw new RuntimeException("Should not have ZK migrations enabled on a cluster running " +
"metadata.version " + featureControl.metadataVersion());
} else if (metadataVersion.isMigrationSupported()) {
logMessageBuilder
.append("Loaded ZK migration state of ")
.append(featureControl.zkMigrationState())
.append(". ");
switch (featureControl.zkMigrationState()) {
case NONE:
// Since this is the default state there may or may not be an actual NONE in the log. Regardless,
// it will eventually be persisted in a snapshot, so we don't need to explicitly write it here.
if (zkMigrationEnabled) {
throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was " +
"created in KRaft mode.");
}
break;
case PRE_MIGRATION:
if (!metadataVersion.isMetadataTransactionSupported()) {
logMessageBuilder
.append("Activating pre-migration controller without empty log. ")
.append("There may be a partial migration. ");
}
break;
case MIGRATION:
if (!zkMigrationEnabled) {
// This can happen if controller leadership transfers to a controller with migrations enabled
// after another controller had finalized the migration. For example, during a rolling restart
// of the controller quorum during which the migration config is being set to false.
logMessageBuilder
.append("Completing the ZK migration since this controller was configured with ")
.append("'zookeeper.metadata.migration.enable' set to 'false'. ");
records.add(ZkMigrationState.POST_MIGRATION.toRecord());
} else {
logMessageBuilder
.append("Staying in ZK migration mode since 'zookeeper.metadata.migration.enable' ")
.append("is still 'true'. ");
}
break;
case POST_MIGRATION:
if (zkMigrationEnabled) {
logMessageBuilder
.append("Ignoring 'zookeeper.metadata.migration.enable' value of 'true' since ")
.append("the ZK migration has been completed. ");
}
break;
default:
throw new IllegalStateException("Unsupported ZkMigrationState " + featureControl.zkMigrationState());
}
}
activationMessageConsumer.accept(logMessageBuilder.toString().trim());
return ControllerResult.atomicOf(records, null);
}
/**
* Generate the set of activation records.
* </p>
* If the log is empty, write the bootstrap records. If the log is not empty, do some validation and
* possibly write some records to put the log into a valid state. For bootstrap records, if KIP-868
* metadata transactions are supported, use them. Otherwise, write the bootstrap records as an
* atomic batch. The single atomic batch can be problematic if the bootstrap records are too large
* (e.g., lots of SCRAM credentials). If ZK migrations are enabled, the activation records will
* include a ZkMigrationState record regardless of whether the log was empty or not.
*/
static ControllerResult<Void> generate(
Consumer<String> activationMessageConsumer,
boolean isEmpty,
long transactionStartOffset,
boolean zkMigrationEnabled,
BootstrapMetadata bootstrapMetadata,
FeatureControlManager featureControl
) {
if (isEmpty) {
return recordsForEmptyLog(activationMessageConsumer, transactionStartOffset, zkMigrationEnabled,
bootstrapMetadata, bootstrapMetadata.metadataVersion());
} else {
return recordsForNonEmptyLog(activationMessageConsumer, transactionStartOffset, zkMigrationEnabled,
featureControl, featureControl.metadataVersion());
}
}
}

192
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@@ -147,6 +147,7 @@ import java.util.function.Supplier;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.COMPLETES_IN_TRANSACTION;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.RUNS_IN_PREMIGRATION;
@@ -612,6 +613,11 @@ public final class QuorumController implements Controller {
}
}
// Visible for testing
OffsetControlManager offsetControl() {
return offsetControl;
}
// Visible for testing
ReplicationControlManager replicationControl() {
return replicationControl;
@@ -667,7 +673,16 @@ public final class QuorumController implements Controller {
* even though the cluster really does have metadata. Very few operations should
* use this flag.
*/
RUNS_IN_PREMIGRATION
RUNS_IN_PREMIGRATION,
/**
* This flag signifies that an event will be completed even if it is part of an unfinished transaction.
* This is needed for metadata transactions so that external callers can add records to a transaction
* and still use the returned future. One example usage of this flag is the batches of migration records.
* The migration driver needs to wait on each submitted batch to avoid overwhelming the controller queue
* with events, so it needs events to be completed based on the committed (i.e., not stable) offset.
*/
COMPLETES_IN_TRANSACTION
}
interface ControllerWriteOperation<T> {
@@ -810,7 +825,11 @@ public final class QuorumController implements Controller {
// Remember the latest offset and future if it is not already completed
if (!future.isDone()) {
deferredEventQueue.add(resultAndOffset.offset(), this);
if (flags.contains(COMPLETES_IN_TRANSACTION)) {
deferredUnstableEventQueue.add(resultAndOffset.offset(), this);
} else {
deferredEventQueue.add(resultAndOffset.offset(), this);
}
}
}
@@ -926,6 +945,10 @@ public final class QuorumController implements Controller {
}
class MigrationRecordConsumer implements ZkRecordConsumer {
private final EnumSet<ControllerOperationFlag> eventFlags = EnumSet.of(
RUNS_IN_PREMIGRATION, COMPLETES_IN_TRANSACTION
);
private volatile OffsetAndEpoch highestMigrationRecordOffset;
class MigrationWriteOperation implements ControllerWriteOperation<Void> {
@@ -936,7 +959,7 @@ public final class QuorumController implements Controller {
}
@Override
public ControllerResult<Void> generateRecordsAndResult() {
return ControllerResult.atomicOf(batch, null);
return ControllerResult.of(batch, null);
}
public void processBatchEndOffset(long offset) {
@@ -944,40 +967,54 @@ public final class QuorumController implements Controller {
}
}
@Override
public void beginMigration() {
log.info("Starting ZK Migration");
// TODO use KIP-868 transaction
public CompletableFuture<?> beginMigration() {
if (featureControl.metadataVersion().isMetadataTransactionSupported()) {
log.info("Starting migration of ZooKeeper metadata to KRaft.");
ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>(
"Begin ZK Migration Transaction",
new MigrationWriteOperation(Collections.singletonList(
new ApiMessageAndVersion(
new BeginTransactionRecord().setName("ZK Migration"), (short) 0))
), eventFlags);
queue.append(batchEvent);
return batchEvent.future;
} else {
log.warn("Starting ZK Migration without metadata transactions enabled. This is not safe since " +
"a controller failover or processing error may lead to partially migrated metadata.");
return CompletableFuture.completedFuture(null);
}
}
@Override
public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
if (queue.size() > 100) { // TODO configure this
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. Controller queue is too large"));
return future;
}
ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>("ZK Migration Batch",
new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION));
ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>(
"ZK Migration Batch",
new MigrationWriteOperation(recordBatch), eventFlags);
queue.append(batchEvent);
return batchEvent.future;
}
@Override
public CompletableFuture<OffsetAndEpoch> completeMigration() {
log.info("Completing ZK Migration");
// TODO use KIP-868 transaction
ControllerWriteEvent<Void> event = new ControllerWriteEvent<>("Complete ZK Migration",
new MigrationWriteOperation(
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
EnumSet.of(RUNS_IN_PREMIGRATION));
log.info("Completing migration of ZooKeeper metadata to KRaft.");
List<ApiMessageAndVersion> records = new ArrayList<>(2);
records.add(ZkMigrationState.MIGRATION.toRecord());
if (featureControl.metadataVersion().isMetadataTransactionSupported()) {
records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
}
ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(
"Complete ZK Migration",
new MigrationWriteOperation(records),
eventFlags);
queue.append(event);
return event.future.thenApply(__ -> highestMigrationRecordOffset);
}
@Override
public void abortMigration() {
// If something goes wrong during the migration, cause the controller to crash and let the
// next controller abort the migration transaction (if in use).
fatalFaultHandler.handleFault("Aborting the ZK migration");
// TODO use KIP-868 transaction
}
}
@@ -998,11 +1035,14 @@ public final class QuorumController implements Controller {
// so we don't need to do it here.
log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch);
// Complete any events in the purgatory that were waiting for this offset.
deferredEventQueue.completeUpTo(offset);
// Advance the committed and stable offsets then complete any pending purgatory
// items that were waiting for these offsets.
offsetControl.handleCommitBatch(batch);
deferredEventQueue.completeUpTo(offsetControl.lastStableOffset());
deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset());
// The active controller can delete up to the current committed offset.
snapshotRegistry.deleteSnapshotsUpTo(offset);
snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset());
} else {
// If the controller is a standby, replay the records that were
// created by the active controller.
@@ -1025,8 +1065,8 @@ public final class QuorumController implements Controller {
}
recordIndex++;
}
offsetControl.handleCommitBatch(batch);
}
offsetControl.handleCommitBatch(batch);
}
} finally {
reader.close();
@@ -1161,103 +1201,17 @@ public final class QuorumController implements Controller {
}
}
/**
* Generate the set of activation records. Until KIP-868 transactions are supported, these records
* are committed to the log as an atomic batch. The records will include the bootstrap metadata records
* (including the bootstrap "metadata.version") and may include a ZK migration record.
*/
public static List<ApiMessageAndVersion> generateActivationRecords(
Logger log,
boolean isLogEmpty,
boolean zkMigrationEnabled,
BootstrapMetadata bootstrapMetadata,
FeatureControlManager featureControl
) {
List<ApiMessageAndVersion> records = new ArrayList<>();
if (isLogEmpty) {
// If no records have been replayed, we need to write out the bootstrap records.
// This will include the new metadata.version, as well as things like SCRAM
// initialization, etc.
log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " +
"at metadata.version {} from {}.", bootstrapMetadata.records().size(),
bootstrapMetadata.metadataVersion(), bootstrapMetadata.source());
records.addAll(bootstrapMetadata.records());
if (bootstrapMetadata.metadataVersion().isMigrationSupported()) {
if (zkMigrationEnabled) {
log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed until " +
"the ZK metadata has been migrated");
records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
} else {
log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.");
records.add(ZkMigrationState.NONE.toRecord());
}
} else {
if (zkMigrationEnabled) {
throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() +
" does not support ZK migrations. Cannot continue with ZK migrations enabled.");
}
}
} else {
// Logs have been replayed. We need to initialize some things here if upgrading from older KRaft versions
if (featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) {
log.info("No metadata.version feature level record was found in the log. " +
"Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION);
}
if (featureControl.metadataVersion().isMigrationSupported()) {
log.info("Loaded ZK migration state of {}", featureControl.zkMigrationState());
switch (featureControl.zkMigrationState()) {
case NONE:
// Since this is the default state there may or may not be an actual NONE in the log. Regardless,
// it will eventually be persisted in a snapshot, so we don't need to explicitly write it here.
if (zkMigrationEnabled) {
throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was created in KRaft mode.");
}
break;
case PRE_MIGRATION:
log.warn("Activating pre-migration controller without empty log. There may be a partial migration");
break;
case MIGRATION:
if (!zkMigrationEnabled) {
// This can happen if controller leadership transfers to a controller with migrations enabled
// after another controller had finalized the migration. For example, during a rolling restart
// of the controller quorum during which the migration config is being set to false.
log.warn("Completing the ZK migration since this controller was configured with " +
"'zookeeper.metadata.migration.enable' set to 'false'.");
records.add(ZkMigrationState.POST_MIGRATION.toRecord());
} else {
log.info("Staying in the ZK migration since 'zookeeper.metadata.migration.enable' is still 'true'.");
}
break;
case POST_MIGRATION:
if (zkMigrationEnabled) {
log.info("Ignoring 'zookeeper.metadata.migration.enable' value of 'true' since the ZK migration" +
"has been completed.");
}
break;
default:
throw new IllegalStateException("Unsupported ZkMigrationState " + featureControl.zkMigrationState());
}
} else {
if (zkMigrationEnabled) {
throw new RuntimeException("Should not have ZK migrations enabled on a cluster running metadata.version " + featureControl.metadataVersion());
}
}
}
return records;
}
class CompleteActivationEvent implements ControllerWriteOperation<Void> {
@Override
public ControllerResult<Void> generateRecordsAndResult() {
try {
List<ApiMessageAndVersion> records = generateActivationRecords(log,
return ActivationRecordsGenerator.generate(
log::warn,
logReplayTracker.empty(),
offsetControl.transactionStartOffset(),
zkMigrationEnabled,
bootstrapMetadata,
featureControl);
return ControllerResult.atomicOf(records, null);
} catch (Throwable t) {
throw fatalFaultHandler.handleFault("exception while completing controller " +
"activation", t);
@@ -1286,6 +1240,8 @@ public final class QuorumController implements Controller {
curClaimEpoch = -1;
deferredEventQueue.failAll(ControllerExceptions.
newWrongControllerException(OptionalInt.empty()));
deferredUnstableEventQueue.failAll(ControllerExceptions.
newWrongControllerException(OptionalInt.empty()));
offsetControl.deactivate();
clusterControl.deactivate();
cancelMaybeFenceReplicas();
@@ -1629,10 +1585,17 @@ public final class QuorumController implements Controller {
/**
* The deferred event queue which holds deferred operations which are waiting for the metadata
* log's high water mark to advance. This must be accessed only by the event queue thread.
* log's stable offset to advance. This must be accessed only by the event queue thread.
*/
private final DeferredEventQueue deferredEventQueue;
/**
* The deferred event queue which holds deferred operations which are waiting for the metadata
* log's committed offset to advance. This must be accessed only by the event queue thread and
* can contain records which are part of an incomplete transaction.
*/
private final DeferredEventQueue deferredUnstableEventQueue;
/**
* Manages read and write offsets, and in-memory snapshots.
*/
@@ -1811,6 +1774,7 @@ public final class QuorumController implements Controller {
this.controllerMetrics = controllerMetrics;
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.deferredEventQueue = new DeferredEventQueue(logContext);
this.deferredUnstableEventQueue = new DeferredEventQueue(logContext);
this.offsetControl = new OffsetControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).

2
metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifestType.java

@@ -19,7 +19,7 @@ package org.apache.kafka.image.loader;
/**
* Contains information about the type of a loader manifest.
* Contains information about the type of loader manifest.
*/
public enum LoaderManifestType {
LOG_DELTA,

59
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java

@@ -27,6 +27,59 @@ import java.util.Objects;
* Contains information about a set of changes that were loaded from the metadata log.
*/
public class LogDeltaManifest implements LoaderManifest {
public static class Builder {
private MetadataProvenance provenance;
private LeaderAndEpoch leaderAndEpoch;
private int numBatches = -1;
private long elapsedNs = -1L;
private long numBytes = -1L;
public Builder provenance(MetadataProvenance provenance) {
this.provenance = provenance;
return this;
}
public Builder leaderAndEpoch(LeaderAndEpoch leaderAndEpoch) {
this.leaderAndEpoch = leaderAndEpoch;
return this;
}
public Builder numBatches(int numBatches) {
this.numBatches = numBatches;
return this;
}
public Builder elapsedNs(long elapsedNs) {
this.elapsedNs = elapsedNs;
return this;
}
public Builder numBytes(long numBytes) {
this.numBytes = numBytes;
return this;
}
public LogDeltaManifest build() {
if (provenance == null) {
throw new RuntimeException("provenance must not be null");
}
if (leaderAndEpoch == null) {
throw new RuntimeException("leaderAndEpoch must not be null");
}
if (numBatches == -1) {
throw new RuntimeException("numBatches must not be null");
}
if (elapsedNs == -1L) {
throw new RuntimeException("elapsedNs must not be null");
}
if (numBytes == -1L) {
throw new RuntimeException("numBytes must not be null");
}
return new LogDeltaManifest(provenance, leaderAndEpoch, numBatches, elapsedNs, numBytes);
}
}
/**
* The highest offset and epoch included in this delta, inclusive.
*/
@@ -52,7 +105,7 @@ public class LogDeltaManifest implements LoaderManifest {
*/
private final long numBytes;
public LogDeltaManifest(
LogDeltaManifest(
MetadataProvenance provenance,
LeaderAndEpoch leaderAndEpoch,
int numBatches,
@@ -66,6 +119,10 @@ public class LogDeltaManifest implements LoaderManifest {
this.numBytes = numBytes;
}
public static Builder newBuilder() {
return new Builder();
}
@Override
public LoaderManifestType type() {
return LoaderManifestType.LOG_DELTA;

262
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java

@@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.image.loader;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* Loads batches of metadata updates from Raft commits into MetadataDelta-s. Multiple batches from a commit
* are buffered into a MetadataDelta to achieve batching of records and reduce the number of times
* MetadataPublishers must be updated. This class also supports metadata transactions (KIP-868).
*
*
*/
public class MetadataBatchLoader {
enum TransactionState {
NO_TRANSACTION,
STARTED_TRANSACTION,
CONTINUED_TRANSACTION,
ENDED_TRANSACTION,
ABORTED_TRANSACTION;
}
@FunctionalInterface
public interface MetadataUpdater {
void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest manifest);
}
private final Logger log;
private final Time time;
private final FaultHandler faultHandler;
private final MetadataUpdater callback;
private MetadataImage image;
private MetadataDelta delta;
private long lastOffset;
private int lastEpoch;
private long lastContainedLogTimeMs;
private long numBytes;
private int numBatches;
private long totalBatchElapsedNs;
private TransactionState transactionState;
public MetadataBatchLoader(
LogContext logContext,
Time time,
FaultHandler faultHandler,
MetadataUpdater callback
) {
this.log = logContext.logger(MetadataBatchLoader.class);
this.time = time;
this.faultHandler = faultHandler;
this.callback = callback;
}
/**
* Reset the state of this batch loader to the given image. Any un-flushed state will be
* discarded.
*
* @param image Metadata image to reset this batch loader's state to.
*/
public void resetToImage(MetadataImage image) {
this.image = image;
this.delta = new MetadataDelta.Builder().setImage(image).build();
this.transactionState = TransactionState.NO_TRANSACTION;
this.lastOffset = image.provenance().lastContainedOffset();
this.lastEpoch = image.provenance().lastContainedEpoch();
this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs();
this.numBytes = 0;
this.numBatches = 0;
this.totalBatchElapsedNs = 0;
}
/**
* Load a batch of records from the log. We have to do some bookkeeping here to
* translate between batch offsets and record offsets, and track the number of bytes we
* have read. Additionally, there is the chance that one of the records is a metadata
* version change which needs to be handled differently.
* </p>
* If this batch starts a transaction, any records preceding the transaction in this
* batch will be implicitly added to the transaction.
*
* @param batch The reader which yields the batches.
* @return The time in nanoseconds that elapsed while loading this batch
*/
public long loadBatch(Batch<ApiMessageAndVersion> batch, LeaderAndEpoch leaderAndEpoch) {
long startNs = time.nanoseconds();
int indexWithinBatch = 0;
lastContainedLogTimeMs = batch.appendTimestamp();
lastEpoch = batch.epoch();
for (ApiMessageAndVersion record : batch.records()) {
try {
replay(record);
} catch (Throwable e) {
faultHandler.handleFault("Error loading metadata log record from offset " +
batch.baseOffset() + indexWithinBatch, e);
}
// Emit the accumulated delta if a new transaction has been started and one of the following is true
// 1) this is not the first record in this batch
// 2) this is not the first batch since last emitting a delta
if (transactionState == TransactionState.STARTED_TRANSACTION && (indexWithinBatch > 0 || numBatches > 0)) {
MetadataProvenance provenance = new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs);
LogDeltaManifest manifest = LogDeltaManifest.newBuilder()
.provenance(provenance)
.leaderAndEpoch(leaderAndEpoch)
.numBatches(numBatches) // This will be zero if we have not yet read a batch
.elapsedNs(totalBatchElapsedNs)
.numBytes(numBytes) // This will be zero if we have not yet read a batch
.build();
if (log.isDebugEnabled()) {
log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us.",
image.offset(), manifest.provenance().lastContainedOffset(),
manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()));
}
applyDeltaAndUpdate(delta, manifest);
transactionState = TransactionState.STARTED_TRANSACTION;
}
lastOffset = batch.baseOffset() + indexWithinBatch;
indexWithinBatch++;
}
long elapsedNs = time.nanoseconds() - startNs;
// Update state for the manifest. The actual byte count will only be included in the last delta emitted for
// a given batch or transaction.
lastOffset = batch.lastOffset();
numBytes += batch.sizeInBytes();
numBatches += 1;
totalBatchElapsedNs += elapsedNs;
return totalBatchElapsedNs;
}
/**
* Flush the metadata accumulated in this batch loader if not in the middle of a transaction. The
* flushed metadata will be passed to the {@link MetadataUpdater} configured for this class.
*/
public void maybeFlushBatches(LeaderAndEpoch leaderAndEpoch) {
MetadataProvenance provenance = new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs);
LogDeltaManifest manifest = LogDeltaManifest.newBuilder()
.provenance(provenance)
.leaderAndEpoch(leaderAndEpoch)
.numBatches(numBatches)
.elapsedNs(totalBatchElapsedNs)
.numBytes(numBytes)
.build();
switch (transactionState) {
case STARTED_TRANSACTION:
case CONTINUED_TRANSACTION:
log.debug("handleCommit: not publishing since a transaction starting at {} is still in progress. " +
"{} batch(es) processed so far.", image.offset(), numBatches);
break;
case ABORTED_TRANSACTION:
log.debug("handleCommit: publishing empty delta between {} and {} from {} batch(es) " +
"since a transaction was aborted", image.offset(), manifest.provenance().lastContainedOffset(),
manifest.numBatches());
applyDeltaAndUpdate(new MetadataDelta.Builder().setImage(image).build(), manifest);
break;
case ENDED_TRANSACTION:
case NO_TRANSACTION:
if (log.isDebugEnabled()) {
log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us.",
image.offset(), manifest.provenance().lastContainedOffset(),
manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()));
}
applyDeltaAndUpdate(delta, manifest);
break;
}
}
private void replay(ApiMessageAndVersion record) {
MetadataRecordType type = MetadataRecordType.fromId(record.message().apiKey());
switch (type) {
case BEGIN_TRANSACTION_RECORD:
if (transactionState == TransactionState.STARTED_TRANSACTION ||
transactionState == TransactionState.CONTINUED_TRANSACTION) {
throw new RuntimeException("Encountered BeginTransactionRecord while already in a transaction");
} else {
transactionState = TransactionState.STARTED_TRANSACTION;
}
break;
case END_TRANSACTION_RECORD:
if (transactionState == TransactionState.CONTINUED_TRANSACTION ||
transactionState == TransactionState.STARTED_TRANSACTION) {
transactionState = TransactionState.ENDED_TRANSACTION;
} else {
throw new RuntimeException("Encountered EndTransactionRecord without having seen a BeginTransactionRecord");
}
break;
case ABORT_TRANSACTION_RECORD:
if (transactionState == TransactionState.CONTINUED_TRANSACTION ||
transactionState == TransactionState.STARTED_TRANSACTION) {
transactionState = TransactionState.ABORTED_TRANSACTION;
} else {
throw new RuntimeException("Encountered AbortTransactionRecord without having seen a BeginTransactionRecord");
}
break;
default:
switch (transactionState) {
case STARTED_TRANSACTION:
// If we see a non-transaction record after starting a transaction, transition to CONTINUED_TRANSACTION
transactionState = TransactionState.CONTINUED_TRANSACTION;
break;
case ENDED_TRANSACTION:
case ABORTED_TRANSACTION:
// If we see a non-transaction record after ending a transaction, transition back to NO_TRANSACTION
transactionState = TransactionState.NO_TRANSACTION;
break;
case CONTINUED_TRANSACTION:
case NO_TRANSACTION:
default:
break;
}
delta.replay(record.message());
}
}
private void applyDeltaAndUpdate(MetadataDelta delta, LogDeltaManifest manifest) {
try {
image = delta.apply(manifest.provenance());
} catch (Throwable e) {
faultHandler.handleFault("Error generating new metadata image from " +
"metadata delta between offset " + image.offset() +
" and " + manifest.provenance().lastContainedOffset(), e);
}
// Whether we can apply the delta or not, we need to make sure the batch loader gets reset
// to the image known to MetadataLoader
callback.update(delta, image, manifest);
resetToImage(image);
}
}

176
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java

@@ -183,6 +183,8 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
*/
private MetadataImage image;
private final MetadataBatchLoader batchLoader;
/**
* The event queue which runs this loader.
*/
@@ -205,9 +207,17 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
this.uninitializedPublishers = new LinkedHashMap<>();
this.publishers = new LinkedHashMap<>();
this.image = MetadataImage.EMPTY;
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
threadNamePrefix + "metadata-loader-",
new ShutdownEvent());
this.batchLoader = new MetadataBatchLoader(
logContext,
time,
faultHandler,
this::maybePublishMetadata);
this.batchLoader.resetToImage(this.image);
this.eventQueue = new KafkaEventQueue(
Time.SYSTEM,
logContext,
threadNamePrefix + "metadata-loader-",
new ShutdownEvent());
}
// VisibleForTesting
@@ -306,49 +316,49 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
return String.join(", ", uninitializedPublishers.keySet());
}
/**
* Callback used by MetadataBatchLoader and handleLoadSnapshot to update the active metadata publishers.
*/
private void maybePublishMetadata(MetadataDelta delta, MetadataImage image, LoaderManifest manifest) {
this.image = image;
if (stillNeedToCatchUp(
"maybePublishMetadata(" + manifest.type().toString() + ")",
manifest.provenance().lastContainedOffset())
) {
return;
}
if (log.isDebugEnabled()) {
log.debug("handleCommit: publishing new image with provenance {}.", image.provenance());
}
for (MetadataPublisher publisher : publishers.values()) {
try {
publisher.onMetadataUpdate(delta, image, manifest);
} catch (Throwable e) {
faultHandler.handleFault("Unhandled error publishing the new metadata " +
"image ending at " + manifest.provenance().lastContainedOffset() +
" with publisher " + publisher.name(), e);
}
}
metrics.updateLastAppliedImageProvenance(image.provenance());
metrics.setCurrentMetadataVersion(image.features().metadataVersion());
if (uninitializedPublishers.isEmpty()) {
scheduleInitializeNewPublishers(0);
}
}
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
eventQueue.append(() -> {
try {
MetadataDelta delta = new MetadataDelta.Builder().
setImage(image).
build();
LogDeltaManifest manifest = loadLogDelta(delta, reader);
if (log.isDebugEnabled()) {
log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) " +
"in {} us.", image.offset(), manifest.provenance().lastContainedOffset(),
manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()));
}
try {
image = delta.apply(manifest.provenance());
} catch (Throwable e) {
faultHandler.handleFault("Error generating new metadata image from " +
"metadata delta between offset " + image.offset() +
" and " + manifest.provenance().lastContainedOffset(), e);
return;
}
if (stillNeedToCatchUp("handleCommit", manifest.provenance().lastContainedOffset())) {
return;
}
if (log.isDebugEnabled()) {
log.debug("handleCommit: publishing new image with provenance {}.", image.provenance());
}
for (MetadataPublisher publisher : publishers.values()) {
try {
publisher.onMetadataUpdate(delta, image, manifest);
} catch (Throwable e) {
faultHandler.handleFault("Unhandled error publishing the new metadata " +
"image ending at " + manifest.provenance().lastContainedOffset() +
" with publisher " + publisher.name(), e);
}
}
metrics.updateLastAppliedImageProvenance(image.provenance());
if (delta.featuresDelta() != null) {
metrics.setCurrentMetadataVersion(image.features().metadataVersion());
}
if (uninitializedPublishers.isEmpty()) {
scheduleInitializeNewPublishers(0);
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
long elapsedNs = batchLoader.loadBatch(batch, currentLeaderAndEpoch);
metrics.updateBatchSize(batch.records().size());
metrics.updateBatchProcessingTimeNs(elapsedNs);
}
batchLoader.maybeFlushBatches(currentLeaderAndEpoch);
} catch (Throwable e) {
// This is a general catch-all block where we don't expect to end up;
// failure-prone operations should have individual try/catch blocks around them.
@@ -360,57 +370,6 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
});
}
/**
* Load some batches of records from the log. We have to do some bookkeeping here to
* translate between batch offsets and record offsets, and track the number of bytes we
* have read. Additionally, there is the chance that one of the records is a metadata
* version change which needs to be handled differently.
*
* @param delta The metadata delta we are preparing.
* @param reader The reader which yields the batches.
* @return A manifest of what was loaded.
*/
LogDeltaManifest loadLogDelta(
MetadataDelta delta,
BatchReader<ApiMessageAndVersion> reader
) {
long startNs = time.nanoseconds();
int numBatches = 0;
long numBytes = 0L;
long lastOffset = image.provenance().lastContainedOffset();
int lastEpoch = image.provenance().lastContainedEpoch();
long lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs();
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
int indexWithinBatch = 0;
for (ApiMessageAndVersion record : batch.records()) {
try {
delta.replay(record.message());
} catch (Throwable e) {
faultHandler.handleFault("Error loading metadata log record from offset " +
batch.baseOffset() + indexWithinBatch, e);
}
indexWithinBatch++;
}
metrics.updateBatchSize(batch.records().size());
lastOffset = batch.lastOffset();
lastEpoch = batch.epoch();
lastContainedLogTimeMs = batch.appendTimestamp();
numBytes += batch.sizeInBytes();
numBatches++;
}
MetadataProvenance provenance =
new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs);
long elapsedNs = time.nanoseconds() - startNs;
metrics.updateBatchProcessingTimeNs(elapsedNs);
return new LogDeltaManifest(provenance,
currentLeaderAndEpoch,
numBatches,
elapsedNs,
numBytes);
}
@Override
public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
eventQueue.append(() -> {
@@ -420,39 +379,16 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.",
snapshotName, numLoaded);
MetadataDelta delta = new MetadataDelta.Builder().
setImage(image).
build();
setImage(image).
build();
SnapshotManifest manifest = loadSnapshot(delta, reader);
log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} " +
"and this snapshot in {} us.", snapshotName,
image.provenance().lastContainedOffset(),
NANOSECONDS.toMicros(manifest.elapsedNs()));
try {
image = delta.apply(manifest.provenance());
} catch (Throwable e) {
faultHandler.handleFault("Error generating new metadata image from " +
"snapshot " + snapshotName, e);
return;
}
if (stillNeedToCatchUp("handleLoadSnapshot", manifest.provenance().lastContainedOffset())) {
return;
}
log.info("handleLoadSnapshot({}): publishing new snapshot image to {} publisher(s).",
snapshotName, publishers.size());
for (MetadataPublisher publisher : publishers.values()) {
try {
publisher.onMetadataUpdate(delta, image, manifest);
} catch (Throwable e) {
faultHandler.handleFault("Unhandled error publishing the new metadata " +
"image from snapshot at offset " + reader.lastContainedLogOffset() +
" with publisher " + publisher.name(), e);
}
}
metrics.updateLastAppliedImageProvenance(image.provenance());
metrics.setCurrentMetadataVersion(image.features().metadataVersion());
if (uninitializedPublishers.isEmpty()) {
scheduleInitializeNewPublishers(0);
}
MetadataImage image = delta.apply(manifest.provenance());
maybePublishMetadata(delta, image, manifest);
batchLoader.resetToImage(image);
} catch (Throwable e) {
// This is a general catch-all block where we don't expect to end up;
// failure-prone operations should have individual try/catch blocks around them.

23
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java

@@ -465,6 +465,15 @@ public class KRaftMigrationDriver implements MetadataPublisher {
@Override
public void run() throws Exception {
if (!firstPublish && image.isEmpty()) {
// KAFKA-15389 When first loading from an empty log, MetadataLoader can publish an empty image
log.debug("Encountered an empty MetadataImage while waiting for the first image to be published. " +
"Ignoring this image since it either does not include bootstrap records or it is a valid " +
"image for an older unsupported metadata version.");
completionHandler.accept(null);
return;
}
KRaftMigrationDriver.this.firstPublish = true;
MetadataImage prevImage = KRaftMigrationDriver.this.image;
KRaftMigrationDriver.this.image = image;
@@ -627,7 +636,15 @@ public class KRaftMigrationDriver implements MetadataPublisher {
Set<Integer> brokersInMetadata = new HashSet<>();
log.info("Starting ZK migration");
MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
zkRecordConsumer.beginMigration();
try {
FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
"the metadata layer to begin the migration transaction",
zkRecordConsumer.beginMigration(),
Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
} catch (Throwable t) {
log.error("Could not start the migration", t);
super.handleException(t);
}
try {
zkMigrationClient.readAllMetadata(batch -> {
try {
@@ -669,9 +686,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
// exercising the snapshot handling code in KRaftMigrationZkWriter.
transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
} catch (Throwable t) {
zkRecordConsumer.abortMigration();
MigrationManifest partialManifest = manifestBuilder.build();
log.error("Aborted metadata migration from ZooKeeper to KRaft. {}.", partialManifest);
log.error("Aborting the metadata migration from ZooKeeper to KRaft. {}.", partialManifest);
zkRecordConsumer.abortMigration(); // This terminates the controller via fatal fault handler
super.handleException(t);
}
}

2
metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java

@@ -23,7 +23,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
public interface ZkRecordConsumer {
void beginMigration();
CompletableFuture<?> beginMigration();
CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch);
CompletableFuture<OffsetAndEpoch> completeMigration();
void abortMigration();

361
metadata/src/test/java/org/apache/kafka/controller/ActivationRecordsGeneratorTest.java

@@ -0,0 +1,361 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* This class is for testing the log message or exception produced by ActivationRecordsGenerator. For tests that
* verify the semantics of the returned records, see QuorumControllerTest.
*/
public class ActivationRecordsGeneratorTest {
@Test
public void testActivationMessageForEmptyLog() {
ControllerResult<Void> result;
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) at metadata.version 3.0-IV1 from bootstrap source 'test'.", logMsg),
-1L,
false,
BootstrapMetadata.fromVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, "test"),
MetadataVersion.MINIMUM_KRAFT_VERSION
);
assertTrue(result.isAtomic());
assertEquals(1, result.records().size());
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) at metadata.version 3.4-IV0 from bootstrap " +
"source 'test'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.", logMsg),
-1L,
false,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "test"),
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(2, result.records().size());
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) at metadata.version 3.4-IV0 from bootstrap " +
"source 'test'. Putting the controller into pre-migration mode. No metadata updates will be allowed " +
"until the ZK metadata has been migrated.", logMsg),
-1L,
true,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "test"),
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(2, result.records().size());
assertEquals(
"The bootstrap metadata.version 3.3-IV2 does not support ZK migrations. Cannot continue with ZK migrations enabled.",
assertThrows(RuntimeException.class, () ->
ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> fail(),
-1L,
true,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV2, "test"),
MetadataVersion.IBP_3_3_IV2
)).getMessage()
);
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.6-IV1 from bootstrap " +
"source 'test'. Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.", logMsg),
-1L,
false,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
MetadataVersion.IBP_3_6_IV1
);
assertFalse(result.isAtomic());
assertEquals(4, result.records().size());
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. The metadata log appears to be empty. " +
"Appending 1 bootstrap record(s) in metadata transaction at metadata.version 3.6-IV1 from bootstrap " +
"source 'test'. Putting the controller into pre-migration mode. No metadata updates will be allowed " +
"until the ZK metadata has been migrated.", logMsg),
-1L,
true,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
MetadataVersion.IBP_3_6_IV1
);
assertFalse(result.isAtomic());
assertEquals(4, result.records().size());
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting partial bootstrap records " +
"transaction at offset 0. Re-appending 1 bootstrap record(s) in new metadata transaction at " +
"metadata.version 3.6-IV1 from bootstrap source 'test'. Setting the ZK migration state to NONE " +
"since this is a de-novo KRaft cluster.", logMsg),
0L,
false,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
MetadataVersion.IBP_3_6_IV1
);
assertFalse(result.isAtomic());
assertEquals(5, result.records().size());
result = ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting partial bootstrap records " +
"transaction at offset 0. Re-appending 1 bootstrap record(s) in new metadata transaction at " +
"metadata.version 3.6-IV1 from bootstrap source 'test'. Putting the controller into pre-migration " +
"mode. No metadata updates will be allowed until the ZK metadata has been migrated.", logMsg),
0L,
true,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
MetadataVersion.IBP_3_6_IV1
);
assertFalse(result.isAtomic());
assertEquals(5, result.records().size());
assertEquals(
"Detected partial bootstrap records transaction at 0, but the metadata.version 3.6-IV0 does not " +
"support transactions. Cannot continue.",
assertThrows(RuntimeException.class, () ->
ActivationRecordsGenerator.recordsForEmptyLog(
logMsg -> assertEquals("", logMsg),
0L,
true,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV0, "test"),
MetadataVersion.IBP_3_6_IV0
)).getMessage()
);
}
FeatureControlManager buildFeatureControl(
MetadataVersion metadataVersion,
Optional<ZkMigrationState> zkMigrationState
) {
FeatureControlManager featureControl = new FeatureControlManager.Builder()
.setMetadataVersion(metadataVersion).build();
zkMigrationState.ifPresent(migrationState ->
featureControl.replay((ZkMigrationStateRecord) migrationState.toRecord().message()));
return featureControl;
}
@Test
public void testActivationMessageForNonEmptyLogNoMigrations() {
ControllerResult<Void> result;
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. No metadata.version feature level " +
"record was found in the log. Treating the log as version 3.0-IV1.", logMsg),
-1L,
false,
buildFeatureControl(MetadataVersion.MINIMUM_KRAFT_VERSION, Optional.empty()),
MetadataVersion.MINIMUM_KRAFT_VERSION
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation.", logMsg),
-1L,
false,
buildFeatureControl(MetadataVersion.IBP_3_3_IV0, Optional.empty()),
MetadataVersion.IBP_3_3_IV0
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of NONE.", logMsg),
-1L,
false,
buildFeatureControl(MetadataVersion.IBP_3_4_IV0, Optional.empty()),
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting in-progress metadata " +
"transaction at offset 42. Loaded ZK migration state of NONE.", logMsg),
42L,
false,
buildFeatureControl(MetadataVersion.IBP_3_6_IV1, Optional.empty()),
MetadataVersion.IBP_3_6_IV1
);
assertTrue(result.isAtomic());
assertEquals(1, result.records().size());
assertEquals(
"Detected in-progress transaction at offset 42, but the metadata.version 3.6-IV0 does not support " +
"transactions. Cannot continue.",
assertThrows(RuntimeException.class, () ->
ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> fail(),
42L,
false,
buildFeatureControl(MetadataVersion.IBP_3_6_IV0, Optional.empty()),
MetadataVersion.IBP_3_6_IV0
)).getMessage()
);
}
@Test
public void testActivationMessageForNonEmptyLogWithMigrations() {
ControllerResult<Void> result;
assertEquals(
"Should not have ZK migrations enabled on a cluster running metadata.version 3.3-IV0",
assertThrows(RuntimeException.class, () ->
ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> fail(),
-1L,
true,
buildFeatureControl(MetadataVersion.IBP_3_3_IV0, Optional.empty()),
MetadataVersion.IBP_3_3_IV0
)).getMessage()
);
assertEquals(
"Should not have ZK migrations enabled on a cluster that was created in KRaft mode.",
assertThrows(RuntimeException.class, () -> {
ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> fail(),
-1L,
true,
buildFeatureControl(MetadataVersion.IBP_3_4_IV0, Optional.empty()),
MetadataVersion.IBP_3_4_IV0
);
}).getMessage()
);
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of " +
"PRE_MIGRATION. Activating pre-migration controller without empty log. There may be a partial " +
"migration.", logMsg),
-1L,
true,
buildFeatureControl(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.PRE_MIGRATION)),
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of " +
"PRE_MIGRATION.", logMsg),
-1L,
true,
buildFeatureControl(MetadataVersion.IBP_3_6_IV1, Optional.of(ZkMigrationState.PRE_MIGRATION)),
MetadataVersion.IBP_3_6_IV1
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of MIGRATION. " +
"Staying in ZK migration mode since 'zookeeper.metadata.migration.enable' is still 'true'.", logMsg),
-1L,
true,
buildFeatureControl(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.MIGRATION)),
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of MIGRATION. " +
"Completing the ZK migration since this controller was configured with " +
"'zookeeper.metadata.migration.enable' set to 'false'.", logMsg),
-1L,
false,
buildFeatureControl(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.MIGRATION)),
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(1, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting in-progress metadata " +
"transaction at offset 42. Loaded ZK migration state of MIGRATION. Completing the ZK migration " +
"since this controller was configured with 'zookeeper.metadata.migration.enable' set to 'false'.", logMsg),
42L,
false,
buildFeatureControl(MetadataVersion.IBP_3_6_IV1, Optional.of(ZkMigrationState.MIGRATION)),
MetadataVersion.IBP_3_6_IV1
);
assertTrue(result.isAtomic());
assertEquals(2, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of " +
"POST_MIGRATION.", logMsg),
-1L,
false,
buildFeatureControl(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.POST_MIGRATION)),
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting in-progress metadata " +
"transaction at offset 42. Loaded ZK migration state of POST_MIGRATION.", logMsg),
42L,
false,
buildFeatureControl(MetadataVersion.IBP_3_6_IV1, Optional.of(ZkMigrationState.POST_MIGRATION)),
MetadataVersion.IBP_3_6_IV1
);
assertTrue(result.isAtomic());
assertEquals(1, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Loaded ZK migration state of " +
"POST_MIGRATION. Ignoring 'zookeeper.metadata.migration.enable' value of 'true' since the " +
"ZK migration has been completed.", logMsg),
-1L,
true,
buildFeatureControl(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.POST_MIGRATION)),
MetadataVersion.IBP_3_4_IV0
);
assertTrue(result.isAtomic());
assertEquals(0, result.records().size());
result = ActivationRecordsGenerator.recordsForNonEmptyLog(
logMsg -> assertEquals("Performing controller activation. Aborting in-progress metadata " +
"transaction at offset 42. Loaded ZK migration state of POST_MIGRATION. Ignoring " +
"'zookeeper.metadata.migration.enable' value of 'true' since the ZK migration has been completed.", logMsg),
42L,
true,
buildFeatureControl(MetadataVersion.IBP_3_6_IV1, Optional.of(ZkMigrationState.POST_MIGRATION)),
MetadataVersion.IBP_3_6_IV1
);
assertTrue(result.isAtomic());
assertEquals(1, result.records().size());
}
}

6
metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java

@@ -50,9 +50,9 @@ public class OffsetControlManagerTest {
assertNull(offsetControl.currentSnapshotName());
assertEquals(-1L, offsetControl.lastCommittedOffset());
assertEquals(-1, offsetControl.lastCommittedEpoch());
assertEquals(-1, offsetControl.lastStableOffset());
assertEquals(-1, offsetControl.transactionStartOffset());
assertEquals(-1, offsetControl.nextWriteOffset());
assertEquals(-1L, offsetControl.lastStableOffset());
assertEquals(-1L, offsetControl.transactionStartOffset());
assertEquals(-1L, offsetControl.nextWriteOffset());
assertFalse(offsetControl.active());
assertEquals(Arrays.asList(-1L), offsetControl.snapshotRegistry().epochsList());
}

3
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java

@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
@@ -211,6 +212,6 @@ public class QuorumControllerIntegrationTestUtils {
controller.renounce();
future.complete(null);
});
future.get();
future.get(30, TimeUnit.SECONDS);
}
}

164
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java

@@ -45,8 +45,11 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.requests.AlterPartitionRequest;
@@ -96,9 +99,11 @@ import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.metalog.LocalLogManager;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -111,6 +116,8 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,6 +130,7 @@ import static org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA
import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.forceRenounce;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeat;
@@ -219,7 +227,7 @@ public class QuorumControllerTest {
new ResultOrError<>(Collections.emptyMap())),
controller.describeConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
BROKER0, Collections.emptyList())).get());
logEnv.logManagers().forEach(m -> m.setMaxReadOffset(4L));
logEnv.logManagers().forEach(m -> m.setMaxReadOffset(6L));
assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
}
@@ -538,7 +546,7 @@
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV1)).
setListeners(listeners));
assertEquals(3L, reply.get().epoch());
assertEquals(5L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
@@ -554,7 +562,7 @@
get().topics().find("foo").errorMessage());
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(3L).setBrokerId(0).
setWantFence(false).setBrokerEpoch(5L).setBrokerId(0).
setCurrentMetadataOffset(100000L)).get());
assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
createTopicsRequestData, Collections.singleton("foo")).
@@ -1286,13 +1294,14 @@
stateInLog.ifPresent(zkMigrationState ->
featureControlManager.replay((ZkMigrationStateRecord) zkMigrationState.toRecord().message()));
List<ApiMessageAndVersion> records = QuorumController.generateActivationRecords(
log,
ControllerResult<Void> result = ActivationRecordsGenerator.generate(
msg -> { },
!stateInLog.isPresent(),
-1L,
zkMigrationEnabled,
BootstrapMetadata.fromVersion(metadataVersion, "test"),
featureControlManager);
RecordTestUtils.replayAll(featureControlManager, records);
RecordTestUtils.replayAll(featureControlManager, result.records());
return featureControlManager;
}
@@ -1363,13 +1372,36 @@
@Test
public void testActivationRecordsNonEmptyLog() {
FeatureControlManager featureControl;
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), true);
FeatureControlManager featureControl = getActivationRecords(
MetadataVersion.IBP_3_4_IV0, Optional.empty(), true);
assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testActivationRecordsPartialBootstrap(boolean zkMigrationEnabled) {
FeatureControlManager featureControlManager = new FeatureControlManager.Builder()
.setSnapshotRegistry(new SnapshotRegistry(new LogContext()))
.setMetadataVersion(MetadataVersion.IBP_3_6_IV1)
.build();
ControllerResult<Void> result = ActivationRecordsGenerator.generate(
logMsg -> { },
true,
0L,
zkMigrationEnabled,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
featureControlManager);
assertFalse(result.isAtomic());
assertTrue(RecordTestUtils.recordAtIndexAs(
AbortTransactionRecord.class, result.records(), 0).isPresent());
assertTrue(RecordTestUtils.recordAtIndexAs(
BeginTransactionRecord.class, result.records(), 1).isPresent());
assertTrue(RecordTestUtils.recordAtIndexAs(
EndTransactionRecord.class, result.records(), result.records().size() - 1).isPresent());
}
@Test
public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception {
try (
@@ -1388,4 +1420,118 @@
assertThrows(FaultHandlerException.class, controlEnv::close);
}
}
@Test
public void testActivationRecordsPartialTransaction() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControlManager = new FeatureControlManager.Builder()
.setSnapshotRegistry(snapshotRegistry)
.setMetadataVersion(MetadataVersion.IBP_3_6_IV1)
.build();
OffsetControlManager offsetControlManager = new OffsetControlManager.Builder().build();
offsetControlManager.replay(new BeginTransactionRecord(), 10);
offsetControlManager.handleCommitBatch(Batch.data(20, 1, 1L, 0,
Collections.singletonList(new ApiMessageAndVersion(new BeginTransactionRecord(), (short) 0))));
ControllerResult<Void> result = ActivationRecordsGenerator.generate(
logMsg -> { },
false,
offsetControlManager.transactionStartOffset(),
false,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
featureControlManager);
assertTrue(result.isAtomic());
offsetControlManager.replay(
RecordTestUtils.recordAtIndexAs(AbortTransactionRecord.class, result.records(), 0).get(),
21
);
assertEquals(-1L, offsetControlManager.transactionStartOffset());
}
@Test
public void testActivationRecordsPartialTransactionNoSupport() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager featureControlManager = new FeatureControlManager.Builder()
.setSnapshotRegistry(snapshotRegistry)
.setMetadataVersion(MetadataVersion.IBP_3_6_IV0)
.build();
OffsetControlManager offsetControlManager = new OffsetControlManager.Builder().build();
offsetControlManager.replay(new BeginTransactionRecord(), 10);
offsetControlManager.handleCommitBatch(Batch.data(20, 1, 1L, 0,
Collections.singletonList(new ApiMessageAndVersion(new BeginTransactionRecord(), (short) 0))));
assertThrows(RuntimeException.class, () ->
ActivationRecordsGenerator.generate(
msg -> { },
false,
offsetControlManager.transactionStartOffset(),
false,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV0, "test"),
featureControlManager)
);
}
private final static List<ApiMessageAndVersion> ZK_MIGRATION_RECORDS =
Collections.unmodifiableList(Arrays.asList(
new ApiMessageAndVersion(new TopicRecord().
setName("spam").
setTopicId(Uuid.fromString("qvRJLpDYRHmgEi8_TPBYTQ")),
(short) 0),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
setTopicId(Uuid.fromString("qvRJLpDYRHmgEi8_TPBYTQ")).setReplicas(Arrays.asList(0, 1, 2)).
setIsr(Arrays.asList(0, 1, 2)).setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 0),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(1).
setTopicId(Uuid.fromString("qvRJLpDYRHmgEi8_TPBYTQ")).setReplicas(Arrays.asList(1, 2, 0)).
setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 0)
));
@Test
public void testFailoverDuringMigrationTransaction() throws Exception {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
) {
QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setZkMigrationEnabled(true)).
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"));
QuorumControllerTestEnv controlEnv = controlEnvBuilder.build();
QuorumController active = controlEnv.activeController(true);
ZkRecordConsumer migrationConsumer = active.zkRecordConsumer();
migrationConsumer.beginMigration().get(30, TimeUnit.SECONDS);
migrationConsumer.acceptBatch(ZK_MIGRATION_RECORDS).get(30, TimeUnit.SECONDS);
forceRenounce(active);
// Ensure next controller doesn't see the topic from partial migration
QuorumController newActive = controlEnv.activeController(true);
CompletableFuture<Map<String, ResultOrError<Uuid>>> results =
newActive.findTopicIds(ANONYMOUS_CONTEXT, Collections.singleton("spam"));
assertEquals(
Errors.UNKNOWN_TOPIC_OR_PARTITION,
results.get(30, TimeUnit.SECONDS).get("spam").error().error());
assertEquals(
ZkMigrationState.PRE_MIGRATION,
newActive.appendReadEvent("read migration state", OptionalLong.empty(),
() -> newActive.featureControl().zkMigrationState()
).get(30, TimeUnit.SECONDS)
);
// Ensure the migration can happen on new active controller
migrationConsumer = newActive.zkRecordConsumer();
migrationConsumer.beginMigration().get(30, TimeUnit.SECONDS);
migrationConsumer.acceptBatch(ZK_MIGRATION_RECORDS).get(30, TimeUnit.SECONDS);
migrationConsumer.completeMigration().get(30, TimeUnit.SECONDS);
results = newActive.findTopicIds(ANONYMOUS_CONTEXT, Collections.singleton("spam"));
assertTrue(results.get(30, TimeUnit.SECONDS).get("spam").isResult());
assertEquals(ZkMigrationState.MIGRATION, newActive.appendReadEvent("read migration state", OptionalLong.empty(),
() -> newActive.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS));
}
}
}

16
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java

@@ -125,6 +125,10 @@ public class QuorumControllerTestEnv implements AutoCloseable {
}
QuorumController activeController() throws InterruptedException {
return activeController(false);
}
QuorumController activeController(boolean waitForActivation) throws InterruptedException {
AtomicReference<QuorumController> value = new AtomicReference<>(null);
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
LeaderAndEpoch leader = logEnv.leaderAndEpoch();
@@ -141,6 +145,18 @@
}
});
if (waitForActivation) {
try {
// ControllerActivation happens after curClaimEpoch is set, so we need to put something on
// the end of the queue and wait for it to complete before returning the active controller.
value.get()
.appendReadEvent("wait for activation", OptionalLong.empty(), () -> null)
.get(20000, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
throw new RuntimeException("Failed while waiting for controller activation", t);
}
}
return value.get();
}

7
metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisherTest.java

@@ -130,7 +130,12 @@ public class ControllerMetadataMetricsPublisherTest {
if (isSnapshot) {
return new SnapshotManifest(MetadataProvenance.EMPTY, 0);
} else {
return new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 0, 0, 0);
return LogDeltaManifest.newBuilder()
.provenance(MetadataProvenance.EMPTY)
.leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
.numBatches(0)
.elapsedNs(0)
.numBytes(0).build();
}
}
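For reference, the LogDeltaManifest construction used in this test (and again in SnapshotGeneratorTest and KRaftMigrationDriverTest below) moves from a positional constructor to a builder. A minimal before/after sketch, inferred from the call sites in this diff; the argument order provenance, leaderAndEpoch, numBatches, elapsedNs, numBytes and the literal values shown here are illustrative only:

// (Uses the same classes imported by the surrounding tests: LogDeltaManifest,
// MetadataProvenance, LeaderAndEpoch.)

// Old style: five positional arguments that are easy to transpose.
LogDeltaManifest oldStyle = new LogDeltaManifest(
    MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 42);

// New style: each field is named at the call site.
LogDeltaManifest newStyle = LogDeltaManifest.newBuilder()
    .provenance(MetadataProvenance.EMPTY)
    .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
    .numBatches(1)
    .elapsedNs(100)
    .numBytes(42)
    .build();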

444
metadata/src/test/java/org/apache/kafka/image/loader/MetadataBatchLoaderTest.java

@@ -0,0 +1,444 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.image.loader;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.NoOpRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public class MetadataBatchLoaderTest {
static final Uuid TOPIC_FOO = Uuid.fromString("c6uHMgPkRp2Urjlh-RxMNQ");
static final Uuid TOPIC_BAR = Uuid.fromString("tUWOOPvzQhmZZ_eXmTCcig");
static final List<ApiMessageAndVersion> TOPIC_TXN_BATCH_1;
static final List<ApiMessageAndVersion> TOPIC_TXN_BATCH_2;
static final List<ApiMessageAndVersion> TOPIC_NO_TXN_BATCH;
static final List<ApiMessageAndVersion> TXN_BEGIN_SINGLETON;
static final List<ApiMessageAndVersion> TXN_END_SINGLETON;
static final List<ApiMessageAndVersion> TXN_ABORT_SINGLETON;
static final LeaderAndEpoch LEADER_AND_EPOCH = new LeaderAndEpoch(OptionalInt.of(1), 42);
static {
{
TOPIC_TXN_BATCH_1 = Arrays.asList(
new ApiMessageAndVersion(new BeginTransactionRecord().setName("txn-1"), (short) 0),
new ApiMessageAndVersion(new TopicRecord()
.setName("foo")
.setTopicId(TOPIC_FOO), (short) 0),
new ApiMessageAndVersion(new PartitionRecord()
.setPartitionId(0)
.setTopicId(TOPIC_FOO), (short) 0)
);
TOPIC_TXN_BATCH_2 = Arrays.asList(
new ApiMessageAndVersion(new PartitionRecord()
.setPartitionId(1)
.setTopicId(TOPIC_FOO), (short) 0),
new ApiMessageAndVersion(new PartitionRecord()
.setPartitionId(2)
.setTopicId(TOPIC_FOO), (short) 0),
new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)
);
TOPIC_NO_TXN_BATCH = Arrays.asList(
new ApiMessageAndVersion(new TopicRecord()
.setName("bar")
.setTopicId(TOPIC_BAR), (short) 0),
new ApiMessageAndVersion(new PartitionRecord()
.setPartitionId(0)
.setTopicId(TOPIC_BAR), (short) 0),
new ApiMessageAndVersion(new PartitionRecord()
.setPartitionId(1)
.setTopicId(TOPIC_BAR), (short) 0)
);
TXN_BEGIN_SINGLETON = Collections.singletonList(
new ApiMessageAndVersion(new BeginTransactionRecord().setName("txn-1"), (short) 0));
TXN_END_SINGLETON = Collections.singletonList(
new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
TXN_ABORT_SINGLETON = Collections.singletonList(
new ApiMessageAndVersion(new AbortTransactionRecord(), (short) 0));
}
}
static List<ApiMessageAndVersion> noOpRecords(int n) {
return IntStream.range(0, n)
.mapToObj(__ -> new ApiMessageAndVersion(new NoOpRecord(), (short) 0))
.collect(Collectors.toList());
}
static class MockMetadataUpdater implements MetadataBatchLoader.MetadataUpdater {
MetadataImage latestImage = null;
MetadataDelta latestDelta = null;
LogDeltaManifest latestManifest = null;
int updates = 0;
@Override
public void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest manifest) {
latestDelta = delta;
latestImage = image;
latestManifest = manifest;
updates++;
}
public void reset() {
latestImage = null;
latestDelta = null;
latestManifest = null;
updates = 0;
}
}
@Test
public void testAlignedTransactionBatches() {
Batch<ApiMessageAndVersion> batch1 = Batch.data(
10, 1, 0, 10, TOPIC_TXN_BATCH_1);
Batch<ApiMessageAndVersion> batch2 = Batch.data(
13, 2, 0, 10, noOpRecords(3));
Batch<ApiMessageAndVersion> batch3 = Batch.data(
16, 2, 0, 30, TOPIC_TXN_BATCH_2);
MockMetadataUpdater updater = new MockMetadataUpdater();
MetadataBatchLoader batchLoader = new MetadataBatchLoader(
new LogContext(),
new MockTime(),
new MockFaultHandler("testAlignedTransactionBatches"),
updater
);
batchLoader.resetToImage(MetadataImage.EMPTY);
batchLoader.loadBatch(batch1, LEADER_AND_EPOCH);
assertEquals(0, updater.updates);
batchLoader.loadBatch(batch2, LEADER_AND_EPOCH);
assertEquals(0, updater.updates);
batchLoader.loadBatch(batch3, LEADER_AND_EPOCH);
assertEquals(0, updater.updates);
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertEquals(1, updater.updates);
assertNotNull(updater.latestImage.topics().getTopic("foo"));
assertEquals(18, updater.latestImage.provenance().lastContainedOffset());
assertEquals(2, updater.latestImage.provenance().lastContainedEpoch());
}
@Test
public void testSingletonBeginAndEnd() {
Batch<ApiMessageAndVersion> batch1 = Batch.data(
13, 1, 0, 30, noOpRecords(3));
Batch<ApiMessageAndVersion> batch2 = Batch.data(
16, 2, 0, 30, TXN_BEGIN_SINGLETON);
Batch<ApiMessageAndVersion> batch3 = Batch.data(
17, 3, 0, 10, TOPIC_NO_TXN_BATCH);
Batch<ApiMessageAndVersion> batch4 = Batch.data(
20, 4, 0, 10, TXN_END_SINGLETON);
MockMetadataUpdater updater = new MockMetadataUpdater();
MetadataBatchLoader batchLoader = new MetadataBatchLoader(
new LogContext(),
new MockTime(),
new MockFaultHandler("testSingletonBeginAndEnd"),
updater
);
// All in one commit
batchLoader.resetToImage(MetadataImage.EMPTY);
batchLoader.loadBatch(batch1, LEADER_AND_EPOCH);
assertEquals(0, updater.updates);
batchLoader.loadBatch(batch2, LEADER_AND_EPOCH);
assertEquals(1, updater.updates);
assertNull(updater.latestImage.topics().getTopic("bar"));
batchLoader.loadBatch(batch3, LEADER_AND_EPOCH);
assertEquals(1, updater.updates);
batchLoader.loadBatch(batch4, LEADER_AND_EPOCH);
assertEquals(1, updater.updates);
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertNotNull(updater.latestImage.topics().getTopic("bar"));
assertEquals(20, updater.latestImage.provenance().lastContainedOffset());
assertEquals(4, updater.latestImage.provenance().lastContainedEpoch());
// Each batch in a separate commit
updater.reset();
batchLoader.resetToImage(MetadataImage.EMPTY);
batchLoader.loadBatch(batch1, LEADER_AND_EPOCH);
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertEquals(1, updater.updates);
batchLoader.loadBatch(batch2, LEADER_AND_EPOCH);
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertEquals(1, updater.updates);
batchLoader.loadBatch(batch3, LEADER_AND_EPOCH);
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertEquals(1, updater.updates);
batchLoader.loadBatch(batch4, LEADER_AND_EPOCH);
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertEquals(2, updater.updates);
}
@Test
public void testUnexpectedBeginTransaction() {
MockMetadataUpdater updater = new MockMetadataUpdater();
MockFaultHandler faultHandler = new MockFaultHandler("testUnexpectedBeginTransaction");
MetadataBatchLoader batchLoader = new MetadataBatchLoader(
new LogContext(),
new MockTime(),
faultHandler,
updater
);
Batch<ApiMessageAndVersion> batch1 = Batch.data(
10, 2, 0, 30, TOPIC_TXN_BATCH_1);
Batch<ApiMessageAndVersion> batch2 = Batch.data(
13, 2, 0, 30, TXN_BEGIN_SINGLETON);
batchLoader.resetToImage(MetadataImage.EMPTY);
batchLoader.loadBatch(batch1, LEADER_AND_EPOCH);
assertNull(faultHandler.firstException());
batchLoader.loadBatch(batch2, LEADER_AND_EPOCH);
assertEquals(RuntimeException.class, faultHandler.firstException().getCause().getClass());
assertEquals(
"Encountered BeginTransactionRecord while already in a transaction",
faultHandler.firstException().getCause().getMessage()
);
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertEquals(0, updater.updates);
}
@Test
public void testUnexpectedEndTransaction() {
MockMetadataUpdater updater = new MockMetadataUpdater();
MockFaultHandler faultHandler = new MockFaultHandler("testUnexpectedAbortTransaction");
MetadataBatchLoader batchLoader = new MetadataBatchLoader(
new LogContext(),
new MockTime(),
faultHandler,
updater
);
// First batch gets loaded fine
Batch<ApiMessageAndVersion> batch1 = Batch.data(
10, 2, 0, 30, TOPIC_NO_TXN_BATCH);
// Second batch throws an error, but shouldn't interfere with prior batches
Batch<ApiMessageAndVersion> batch2 = Batch.data(
13, 2, 0, 30, TXN_END_SINGLETON);
batchLoader.resetToImage(MetadataImage.EMPTY);
batchLoader.loadBatch(batch1, LEADER_AND_EPOCH);
assertNull(faultHandler.firstException());
batchLoader.loadBatch(batch2, LEADER_AND_EPOCH);
assertEquals(RuntimeException.class, faultHandler.firstException().getCause().getClass());
assertEquals(
"Encountered EndTransactionRecord without having seen a BeginTransactionRecord",
faultHandler.firstException().getCause().getMessage()
);
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertEquals(1, updater.updates);
assertNotNull(updater.latestImage.topics().getTopic("bar"));
}
@Test
public void testUnexpectedAbortTransaction() {
MockMetadataUpdater updater = new MockMetadataUpdater();
MockFaultHandler faultHandler = new MockFaultHandler("testUnexpectedAbortTransaction");
MetadataBatchLoader batchLoader = new MetadataBatchLoader(
new LogContext(),
new MockTime(),
faultHandler,
updater
);
// First batch gets loaded fine
Batch<ApiMessageAndVersion> batch1 = Batch.data(
10, 2, 0, 30, TOPIC_NO_TXN_BATCH);
// Second batch throws an error, but shouldn't interfere with prior batches
Batch<ApiMessageAndVersion> batch2 = Batch.data(
13, 2, 0, 30, TXN_ABORT_SINGLETON);
batchLoader.resetToImage(MetadataImage.EMPTY);
batchLoader.loadBatch(batch1, LEADER_AND_EPOCH);
assertNull(faultHandler.firstException());
batchLoader.loadBatch(batch2, LEADER_AND_EPOCH);
assertEquals(RuntimeException.class, faultHandler.firstException().getCause().getClass());
assertEquals(
"Encountered AbortTransactionRecord without having seen a BeginTransactionRecord",
faultHandler.firstException().getCause().getMessage()
);
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertEquals(1, updater.updates);
assertNotNull(updater.latestImage.topics().getTopic("bar"));
}
private MetadataBatchLoader loadSingleBatch(
MockMetadataUpdater updater,
MockFaultHandler faultHandler,
List<ApiMessageAndVersion> batchRecords
) {
Batch<ApiMessageAndVersion> batch = Batch.data(
10, 42, 0, 100, batchRecords);
MetadataBatchLoader batchLoader = new MetadataBatchLoader(
new LogContext(),
new MockTime(),
faultHandler,
updater
);
batchLoader.resetToImage(MetadataImage.EMPTY);
batchLoader.loadBatch(batch, LEADER_AND_EPOCH);
return batchLoader;
}
@Test
public void testMultipleTransactionsInOneBatch() {
List<ApiMessageAndVersion> batchRecords = new ArrayList<>();
batchRecords.addAll(TOPIC_TXN_BATCH_1);
batchRecords.addAll(TOPIC_TXN_BATCH_2);
batchRecords.addAll(TXN_BEGIN_SINGLETON);
batchRecords.addAll(TOPIC_NO_TXN_BATCH);
batchRecords.addAll(TXN_END_SINGLETON);
MockMetadataUpdater updater = new MockMetadataUpdater();
MockFaultHandler faultHandler = new MockFaultHandler("testMultipleTransactionsInOneBatch");
MetadataBatchLoader batchLoader = loadSingleBatch(updater, faultHandler, batchRecords);
assertEquals(1, updater.updates);
assertEquals(0, updater.latestManifest.numBytes());
assertEquals(15, updater.latestImage.provenance().lastContainedOffset());
assertEquals(42, updater.latestImage.provenance().lastContainedEpoch());
assertNotNull(updater.latestImage.topics().getTopic("foo"));
assertNull(updater.latestImage.topics().getTopic("bar"));
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertEquals(2, updater.updates);
assertEquals(100, updater.latestManifest.numBytes());
assertEquals(20, updater.latestImage.provenance().lastContainedOffset());
assertEquals(42, updater.latestImage.provenance().lastContainedEpoch());
assertNotNull(updater.latestImage.topics().getTopic("foo"));
assertNotNull(updater.latestImage.topics().getTopic("bar"));
}
@Test
public void testMultipleTransactionsInOneBatchesWithNoOp() {
List<ApiMessageAndVersion> batchRecords = new ArrayList<>();
batchRecords.addAll(noOpRecords(1));
batchRecords.addAll(TOPIC_TXN_BATCH_1);
batchRecords.addAll(noOpRecords(1));
batchRecords.addAll(TOPIC_TXN_BATCH_2);
// A batch with non-transactional records between two transactions causes a delta to get published
batchRecords.addAll(noOpRecords(1));
batchRecords.addAll(TXN_BEGIN_SINGLETON);
batchRecords.addAll(noOpRecords(1));
batchRecords.addAll(TOPIC_NO_TXN_BATCH);
batchRecords.addAll(noOpRecords(1));
batchRecords.addAll(TXN_END_SINGLETON);
batchRecords.addAll(noOpRecords(1));
MockMetadataUpdater updater = new MockMetadataUpdater();
MockFaultHandler faultHandler = new MockFaultHandler("testMultipleTransactionsInOneBatches");
MetadataBatchLoader batchLoader = loadSingleBatch(updater, faultHandler, batchRecords);
assertEquals(2, updater.updates);
assertEquals(0, updater.latestManifest.numBytes());
assertEquals(18, updater.latestImage.provenance().lastContainedOffset());
assertEquals(42, updater.latestImage.provenance().lastContainedEpoch());
assertNotNull(updater.latestImage.topics().getTopic("foo"));
assertNull(updater.latestImage.topics().getTopic("bar"));
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
assertEquals(3, updater.updates);
assertEquals(100, updater.latestManifest.numBytes());
assertEquals(26, updater.latestImage.provenance().lastContainedOffset());
assertEquals(42, updater.latestImage.provenance().lastContainedEpoch());
assertNotNull(updater.latestImage.topics().getTopic("foo"));
assertNotNull(updater.latestImage.topics().getTopic("bar"));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testOneTransactionInMultipleBatches(boolean abortTxn) {
MockMetadataUpdater updater = new MockMetadataUpdater();
MetadataBatchLoader batchLoader = new MetadataBatchLoader(
new LogContext(),
new MockTime(),
new MockFaultHandler("testOneTransactionInMultipleBatches"),
updater
);
batchLoader.resetToImage(MetadataImage.EMPTY);
batchLoader.loadBatch(Batch.data(
16, 2, 0, 10, TXN_BEGIN_SINGLETON), LEADER_AND_EPOCH);
assertEquals(0, updater.updates);
batchLoader.loadBatch(Batch.data(
17, 3, 0, 30, TOPIC_NO_TXN_BATCH), LEADER_AND_EPOCH);
assertEquals(0, updater.updates);
if (abortTxn) {
batchLoader.loadBatch(Batch.data(
20, 4, 0, 10, TXN_ABORT_SINGLETON), LEADER_AND_EPOCH);
} else {
batchLoader.loadBatch(Batch.data(
20, 4, 0, 10, TXN_END_SINGLETON), LEADER_AND_EPOCH);
}
assertEquals(0, updater.updates);
batchLoader.maybeFlushBatches(LEADER_AND_EPOCH);
// Regardless of end/abort, we should publish an updated MetadataProvenance and manifest
assertEquals(50, updater.latestManifest.numBytes());
assertEquals(3, updater.latestManifest.numBatches());
assertEquals(20, updater.latestImage.provenance().lastContainedOffset());
assertEquals(4, updater.latestImage.provenance().lastContainedEpoch());
if (abortTxn) {
assertNull(updater.latestImage.topics().getTopic("bar"));
} else {
assertNotNull(updater.latestImage.topics().getTopic("bar"));
}
}
}
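Taken together, the tests above drive MetadataBatchLoader through one small lifecycle: reset to a starting image, load committed batches, then flush whatever is not held back by an open transaction. A minimal sketch of that call sequence, using only the constructor, the MockMetadataUpdater, and the helpers defined in this test class; the offsets, epoch, and byte count are placeholder values:

MockMetadataUpdater updater = new MockMetadataUpdater();
MetadataBatchLoader loader = new MetadataBatchLoader(
    new LogContext(),
    new MockTime(),
    new MockFaultHandler("example"),
    updater);
loader.resetToImage(MetadataImage.EMPTY);
// Non-transactional records become visible when the loader flushes (or when a
// later BeginTransactionRecord forces it to publish what it has buffered).
loader.loadBatch(Batch.data(10, 1, 0, 100, noOpRecords(3)), LEADER_AND_EPOCH);
loader.maybeFlushBatches(LEADER_AND_EPOCH);
// updater.latestImage should now cover the batch's last offset (12 here), and
// updater.latestManifest.numBytes() should report the batch's 100 bytes.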

155
metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java

@@ -19,7 +19,11 @@ package org.apache.kafka.image.loader;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.record.ControlRecordType;
@@ -42,7 +46,10 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
@@ -59,6 +66,7 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_5_IV0;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -376,6 +384,15 @@ public class MetadataLoaderTest {
private boolean closed = false;
private MockTime time = null;
static MockBatchReader newSingleBatchReader(
long batchBaseOffset,
int epoch,
List<ApiMessageAndVersion> records
) {
return new MockBatchReader(batchBaseOffset,
Collections.singletonList(newBatch(batchBaseOffset, epoch, records)));
}
static Batch<ApiMessageAndVersion> newBatch(
long batchBaseOffset,
int epoch,
@@ -459,8 +476,14 @@
assertEquals(300L, loader.lastAppliedOffset());
}
assertTrue(publishers.get(0).closed);
assertEquals(new LogDeltaManifest(new MetadataProvenance(300, 100, 4000), LeaderAndEpoch.UNKNOWN, 1,
3000000L, 10),
assertEquals(
LogDeltaManifest.newBuilder()
.provenance(new MetadataProvenance(300, 100, 4000))
.leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
.numBatches(1)
.elapsedNs(0L)
.numBytes(10)
.build(),
publishers.get(0).latestLogDeltaManifest);
assertEquals(MetadataVersion.IBP_3_3_IV1,
publishers.get(0).latestImage.features().metadataVersion());
@@ -622,4 +645,132 @@
}
faultHandler.maybeRethrowFirstException();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testPublishTransaction(boolean abortTxn) throws Exception {
MockFaultHandler faultHandler = new MockFaultHandler("testTransactions");
MockPublisher publisher = new MockPublisher("testTransactions");
List<MockPublisher> publishers = Collections.singletonList(publisher);
try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setHighWaterMarkAccessor(() -> OptionalLong.of(0)).
build()) {
loader.installPublishers(publishers).get();
loader.waitForAllEventsToBeHandled();
loader.handleCommit(
MockBatchReader.newSingleBatchReader(500, 100, Arrays.asList(
new ApiMessageAndVersion(new BeginTransactionRecord()
.setName("testTransactions"), (short) 0),
new ApiMessageAndVersion(new TopicRecord()
.setName("foo")
.setTopicId(Uuid.fromString("dMCqhcK4T5miGH5wEX7NsQ")), (short) 0)
)));
loader.waitForAllEventsToBeHandled();
assertNull(publisher.latestImage.topics().getTopic("foo"),
"Topic should not be visible since we started transaction");
loader.handleCommit(
MockBatchReader.newSingleBatchReader(500, 100, Arrays.asList(
new ApiMessageAndVersion(new PartitionRecord()
.setTopicId(Uuid.fromString("dMCqhcK4T5miGH5wEX7NsQ"))
.setPartitionId(0), (short) 0),
new ApiMessageAndVersion(new PartitionRecord()
.setTopicId(Uuid.fromString("dMCqhcK4T5miGH5wEX7NsQ"))
.setPartitionId(1), (short) 0)
)));
loader.waitForAllEventsToBeHandled();
assertNull(publisher.latestImage.topics().getTopic("foo"),
"Topic should not be visible after subsequent batch");
if (abortTxn) {
loader.handleCommit(
MockBatchReader.newSingleBatchReader(500, 100, Arrays.asList(
new ApiMessageAndVersion(new AbortTransactionRecord(), (short) 0)
)));
loader.waitForAllEventsToBeHandled();
assertNull(publisher.latestImage.topics().getTopic("foo"),
"Topic should not be visible since the transaction was aborted");
} else {
loader.handleCommit(
MockBatchReader.newSingleBatchReader(500, 100, Arrays.asList(
new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)
)));
loader.waitForAllEventsToBeHandled();
assertNotNull(publisher.latestImage.topics().getTopic("foo"),
"Topic should be visible now that transaction has ended");
}
}
faultHandler.maybeRethrowFirstException();
}
@Test
public void testPublishTransactionWithinBatch() throws Exception {
MockFaultHandler faultHandler = new MockFaultHandler("testPublishTransactionWithinBatch");
MockPublisher publisher = new MockPublisher("testPublishTransactionWithinBatch");
List<MockPublisher> publishers = Collections.singletonList(publisher);
try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setHighWaterMarkAccessor(() -> OptionalLong.of(0)).
build()) {
loader.installPublishers(publishers).get();
loader.waitForAllEventsToBeHandled();
loader.handleCommit(
MockBatchReader.newSingleBatchReader(500, 100, Arrays.asList(
new ApiMessageAndVersion(new BeginTransactionRecord()
.setName("txn-1"), (short) 0),
new ApiMessageAndVersion(new TopicRecord()
.setName("foo")
.setTopicId(Uuid.fromString("HQSM3ccPQISrHqYK_C8GpA")), (short) 0),
new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)
)));
loader.waitForAllEventsToBeHandled();
// After MetadataLoader is fixed to handle arbitrary transactions, we would expect "foo"
// to be visible at this point.
assertNotNull(publisher.latestImage.topics().getTopic("foo"));
}
faultHandler.maybeRethrowFirstException();
}
@Test
public void testSnapshotDuringTransaction() throws Exception {
MockFaultHandler faultHandler = new MockFaultHandler("testSnapshotDuringTransaction");
MockPublisher publisher = new MockPublisher("testSnapshotDuringTransaction");
List<MockPublisher> publishers = Collections.singletonList(publisher);
try (MetadataLoader loader = new MetadataLoader.Builder().
setFaultHandler(faultHandler).
setHighWaterMarkAccessor(() -> OptionalLong.of(0)).
build()) {
loader.installPublishers(publishers).get();
loader.waitForAllEventsToBeHandled();
loader.handleCommit(
MockBatchReader.newSingleBatchReader(500, 100, Arrays.asList(
new ApiMessageAndVersion(new BeginTransactionRecord()
.setName("txn-1"), (short) 0),
new ApiMessageAndVersion(new TopicRecord()
.setName("foo")
.setTopicId(Uuid.fromString("HQSM3ccPQISrHqYK_C8GpA")), (short) 0)
)));
loader.waitForAllEventsToBeHandled();
assertNull(publisher.latestImage.topics().getTopic("foo"));
// loading a snapshot discards any in-flight transaction
loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
new MetadataProvenance(600, 101, 4000), asList(
asList(new ApiMessageAndVersion(new TopicRecord().
setName("foo").
setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))
)));
loader.waitForAllEventsToBeHandled();
assertEquals("Uum7sfhHQP-obSvfywmNUA",
publisher.latestImage.topics().getTopic("foo").id().toString());
}
faultHandler.maybeRethrowFirstException();
}
}

32
metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java

@@ -79,6 +79,15 @@ public class SnapshotGeneratorTest {
}
}
static LogDeltaManifest.Builder logDeltaManifestBuilder() {
return LogDeltaManifest.newBuilder()
.provenance(MetadataProvenance.EMPTY)
.leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
.numBatches(1)
.elapsedNs(100)
.numBytes(100);
}
private final static MetadataDelta TEST_DELTA;
static {
@@ -100,15 +109,12 @@
setMaxTimeSinceLastSnapshotNs(TimeUnit.DAYS.toNanos(10)).
build()) {
// Publish a log delta batch. This one will not trigger a snapshot yet.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 100));
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().build());
// Publish a log delta batch. This will trigger a snapshot.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 100));
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().build());
// Publish a log delta batch. This one will be ignored because there are other images
// queued for writing.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 2000));
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().numBytes(2000).build());
assertEquals(Collections.emptyList(), emitter.images());
emitter.setReady();
}
@@ -129,8 +135,7 @@
build()) {
disabledReason.compareAndSet(null, "we are testing disable()");
// No snapshots are generated because snapshots are disabled.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 100));
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().build());
}
assertEquals(Collections.emptyList(), emitter.images());
faultHandler.maybeRethrowFirstException();
@@ -148,18 +153,15 @@
setMaxTimeSinceLastSnapshotNs(TimeUnit.MINUTES.toNanos(30)).
build()) {
// This image isn't published yet.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 50));
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().numBytes(50).build());
assertEquals(Collections.emptyList(), emitter.images());
mockTime.sleep(TimeUnit.MINUTES.toNanos(40));
// Next image is published because of the time delay.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 50));
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().numBytes(50).build());
TestUtils.waitForCondition(() -> emitter.images().size() == 1, "images.size == 1");
// bytesSinceLastSnapshot was reset to 0 by the previous snapshot,
// so this does not trigger a new snapshot.
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 150));
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().numBytes(150).build());
}
assertEquals(Arrays.asList(TEST_IMAGE), emitter.images());
faultHandler.maybeRethrowFirstException();
@@ -175,7 +177,7 @@
build()) {
for (int i = 0; i < 2; i++) {
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 10000, 50000));
logDeltaManifestBuilder().elapsedNs(10000).numBytes(50000).build());
}
}
assertEquals(Collections.emptyList(), emitter.images());

27
metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java

@@ -39,6 +39,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -90,6 +91,32 @@ public class RecordTestUtils {
replayAll(target, Collections.singletonList(recordAndVersion));
}
public static <T extends ApiMessage> Optional<T> recordAtIndexAs(
Class<T> recordClazz,
List<ApiMessageAndVersion> recordsAndVersions,
int recordIndex
) {
if (recordIndex > recordsAndVersions.size() - 1) {
return Optional.empty();
} else {
if (recordIndex == -1) {
return recordsAndVersions.stream().map(ApiMessageAndVersion::message)
.filter(record -> record.getClass().isAssignableFrom(recordClazz))
.map(recordClazz::cast)
.findFirst();
} else {
ApiMessageAndVersion messageAndVersion = recordsAndVersions.get(recordIndex);
ApiMessage record = messageAndVersion.message();
if (record.getClass().isAssignableFrom(recordClazz)) {
return Optional.of(recordClazz.cast(record));
} else {
return Optional.empty();
}
}
}
}
public static class TestThroughAllIntermediateImagesLeadingToFinalImageHelper<D, I> {
private final Supplier<I> emptyImageSupplier;
private final Function<I, D> deltaUponImageCreator;

46
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java

@@ -141,8 +141,8 @@ public class KRaftMigrationDriverTest {
static class NoOpRecordConsumer implements ZkRecordConsumer {
@Override
public void beginMigration() {
public CompletableFuture<?> beginMigration() {
return CompletableFuture.completedFuture(null);
}
@Override
@@ -201,6 +201,15 @@
}
}
static LogDeltaManifest.Builder logDeltaManifestBuilder(MetadataProvenance provenance, LeaderAndEpoch newLeader) {
return LogDeltaManifest.newBuilder()
.provenance(provenance)
.leaderAndEpoch(newLeader)
.numBatches(1)
.elapsedNs(100)
.numBytes(42);
}
RegisterBrokerRecord zkBrokerRecord(int id) {
RegisterBrokerRecord record = new RegisterBrokerRecord();
record.setBrokerId(id);
@@ -264,7 +273,7 @@
// Publish a delta with this node (3000) as the leader
LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
driver.onControllerChange(newLeader);
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
@@ -346,8 +355,8 @@
// Notify the driver that it is the leader
driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
// Publish metadata of all the ZK brokers being ready
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
assertTrue(claimLeaderAttempts.await(1, TimeUnit.MINUTES));
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
@@ -382,7 +391,7 @@
// Publish a delta with this node (3000) as the leader
LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
driver.onControllerChange(newLeader);
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());
// Current apiVersions are missing the controller node 6, should stay at WAIT_FOR_CONTROLLER_QUORUM state
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
@@ -430,8 +439,8 @@
image = delta.apply(provenance);
driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
@@ -511,7 +520,7 @@
// Publish a delta with this node (3000) as the leader
LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
driver.onControllerChange(newLeader);
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());
// Wait for migration
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
@@ -564,7 +573,7 @@ public class KRaftMigrationDriverTest {
// Publish a delta with this node (3000) as the leader
LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
driver.onControllerChange(newLeader);
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());
// Wait for migration
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
@@ -575,7 +584,7 @@ public class KRaftMigrationDriverTest {
delta = new MetadataDelta(image);
RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
image = delta.apply(provenance);
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());
driver.migrationState().get(1, TimeUnit.MINUTES);
assertEquals(1, topicClient.deletedTopics.size());
@@ -617,7 +626,7 @@ public class KRaftMigrationDriverTest {
// Publish a delta making a different node the leader
LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3001), 1);
driver.onControllerChange(newLeader);
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());
// Fake a complete migration
migrationClient.setMigrationRecoveryState(
@@ -630,7 +639,7 @@ public class KRaftMigrationDriverTest {
image = delta.apply(provenance);
// Standby driver does not do anything with this delta besides remember the image
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());
// Standby becomes leader
newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
@@ -651,8 +660,9 @@ public class KRaftMigrationDriverTest {
AtomicInteger migrationBeginCalls = new AtomicInteger(0);
NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() {
@Override
public void beginMigration() {
public CompletableFuture<?> beginMigration() {
migrationBeginCalls.incrementAndGet();
return CompletableFuture.completedFuture(null);
}
};
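Because beginMigration() now returns a CompletableFuture, a caller can observe when the initial migration batch has actually been committed rather than assuming the call completed synchronously. A minimal, hypothetical caller sketch (not the real KRaftMigrationDriver wiring; the error handling shown is illustrative only):
recordConsumer.beginMigration().whenComplete((ignored, error) -> {
    if (error != null) {
        // Illustrative only: a real caller would move the driver into a failure/retry path.
        System.err.println("beginMigration failed: " + error);
    }
});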
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
@@ -680,10 +690,10 @@ public class KRaftMigrationDriverTest {
// Call onMetadataUpdate twice. The first call will trigger the migration to begin (due to presence of brokers)
// Both calls will "wakeup" the driver and cause a PollEvent to be run. Calling these back-to-back effectively
// causes two MigrateMetadataEvents to be enqueued. Ensure only one is actually run.
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");

12
server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java

@@ -162,6 +162,12 @@ public class SnapshotRegistry {
return snapshots.containsKey(epoch);
}
private String epochsToString() {
return epochsList()
.stream()
.map(Object::toString)
.collect(Collectors.joining(", "));
}
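As a quick illustration of the new helper (a sketch, assuming a registry with snapshots created at epochs 100 and 123), the joined string feeds directly into the error messages below:
// registry.getOrCreateSnapshot(100);
// registry.getOrCreateSnapshot(123);
// epochsToString() then returns "100, 123", so a failed lookup reads:
//   "No in-memory snapshot for epoch 456. Snapshot epochs are: 100, 123"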
/**
* Gets the snapshot for a specific epoch.
*/
@@ -169,8 +175,7 @@ public class SnapshotRegistry {
Snapshot snapshot = snapshots.get(epoch);
if (snapshot == null) {
throw new RuntimeException("No in-memory snapshot for epoch " + epoch + ". Snapshot " +
"epochs are: " + epochsList().stream().map(e -> e.toString()).
collect(Collectors.joining(", ")));
"epochs are: " + epochsToString());
}
return snapshot;
}
@@ -187,7 +192,8 @@ public class SnapshotRegistry {
Snapshot last = head.prev();
if (last.epoch() > epoch) {
throw new RuntimeException("Can't create a new in-memory snapshot at epoch " + epoch +
" because there is already a snapshot with epoch " + last.epoch());
" because there is already a snapshot with epoch " + last.epoch() + ". Snapshot epochs are " +
epochsToString());
} else if (last.epoch() == epoch) {
return last;
}

2
server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java

@@ -60,7 +60,7 @@ public class SnapshotRegistryTest {
assertThrows(RuntimeException.class, () -> registry.getSnapshot(456));
assertIteratorContains(registry.iterator(), snapshot123);
assertEquals("Can't create a new in-memory snapshot at epoch 1 because there is already " +
"a snapshot with epoch 123", assertThrows(RuntimeException.class,
"a snapshot with epoch 123. Snapshot epochs are 123", assertThrows(RuntimeException.class,
() -> registry.getOrCreateSnapshot(1)).getMessage());
Snapshot snapshot456 = registry.getOrCreateSnapshot(456);
assertIteratorContains(registry.iterator(), snapshot123, snapshot456);

12
tests/kafkatest/tests/core/zookeeper_migration_test.py

@@ -28,7 +28,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import DEV_BRANCH, V_3_4_0
from kafkatest.version import DEV_BRANCH, LATEST_3_4
class TestMigration(ProduceConsumeValidateTest):
@@ -140,14 +140,14 @@ class TestMigration(ProduceConsumeValidateTest):
This test ensures that even if we enable migrations after the upgrade to 3.5, no migration
is able to take place.
"""
self.zk = ZookeeperService(self.test_context, num_nodes=1, version=V_3_4_0)
self.zk = ZookeeperService(self.test_context, num_nodes=1, version=LATEST_3_4)
self.zk.start()
self.kafka = KafkaService(self.test_context,
num_nodes=3,
zk=self.zk,
allow_zk_with_kraft=True,
version=V_3_4_0,
version=LATEST_3_4,
server_prop_overrides=[["zookeeper.metadata.migration.enable", "false"]],
topics={self.topic: {"partitions": self.partitions,
"replication-factor": self.replication_factor,
@@ -202,17 +202,17 @@ class TestMigration(ProduceConsumeValidateTest):
the correct migration state in the log.
"""
zk_quorum = partial(ServiceQuorumInfo, zk)
self.zk = ZookeeperService(self.test_context, num_nodes=1, version=V_3_4_0)
self.zk = ZookeeperService(self.test_context, num_nodes=1, version=LATEST_3_4)
self.kafka = KafkaService(self.test_context,
num_nodes=3,
zk=self.zk,
version=V_3_4_0,
version=LATEST_3_4,
quorum_info_provider=zk_quorum,
allow_zk_with_kraft=True,
server_prop_overrides=[["zookeeper.metadata.migration.enable", "true"]])
remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=V_3_4_0,
controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=LATEST_3_4,
allow_zk_with_kraft=True,
isolated_kafka=self.kafka,
server_prop_overrides=[["zookeeper.connect", self.zk.connect_setting()],
