
KAFKA-14909: check zkMigrationReady tag before migration (#13631)

1. Add ZkMigrationReady in ApiVersionsResponse
2. Check that all controller nodes are ZkMigrationReady before moving to the next migration state

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
Luke Chen, committed by GitHub 2 years ago
commit d796480fe8
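For orientation, the mechanism works like this: every controller advertises a ZkMigrationReady flag in its ApiVersionsResponse, and the migration driver refuses to leave its initial state until every other quorum node has reported that flag. A minimal, self-contained sketch of that gate (illustrative names only, not the actual Kafka types):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class MigrationGateSketch {
    // Stand-in for the per-node state: has the node reported ApiVersions,
    // and did that response carry ZkMigrationReady = true?
    static Optional<String> reasonNotReady(int localId, Set<Integer> quorumIds,
                                           Map<Integer, Boolean> reportedReady) {
        List<Integer> missing = new ArrayList<>();
        List<Integer> notReady = new ArrayList<>();
        for (int id : quorumIds) {
            if (id == localId) continue; // the local node is implicitly ready
            Boolean ready = reportedReady.get(id);
            if (ready == null) missing.add(id);
            else if (!ready) notReady.add(id);
        }
        if (missing.isEmpty() && notReady.isEmpty()) return Optional.empty();
        return Optional.of("not ready: " + notReady + ", missing ApiVersions: " + missing);
    }

    public static void main(String[] args) {
        Map<Integer, Boolean> reported = new HashMap<>();
        reported.put(5, true); // node 5 advertised ZkMigrationReady
        // Node 6 has not sent an ApiVersionsResponse yet, so the gate stays closed.
        System.out.println(reasonNotReady(4, Set.of(4, 5, 6), reported));
    }
}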
Changed files (20):

1. checkstyle/import-control-metadata.xml (3 lines changed)
2. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (7 lines changed)
3. clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java (11 lines changed)
4. clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java (30 lines changed)
5. clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java (10 lines changed)
6. clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java (3 lines changed)
7. clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java (12 lines changed)
8. core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala (2 lines changed)
9. core/src/main/scala/kafka/server/ApiVersionManager.scala (20 lines changed)
10. core/src/main/scala/kafka/server/ControllerServer.scala (6 lines changed)
11. core/src/main/scala/kafka/tools/TestRaftServer.scala (2 lines changed)
12. core/src/test/scala/unit/kafka/network/SocketServerTest.scala (2 lines changed)
13. core/src/test/scala/unit/kafka/server/ControllerApisTest.scala (2 lines changed)
14. core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (2 lines changed)
15. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java (2 lines changed)
16. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java (2 lines changed)
17. metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java (26 lines changed)
18. metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java (42 lines changed)
19. metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java (31 lines changed)
20. metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java (201 lines changed)

checkstyle/import-control-metadata.xml (3 lines changed)

@@ -153,6 +153,9 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.common.internals" />
</subpackage>
<subpackage name="migration">
<allow pkg="org.apache.kafka.controller" />
</subpackage>
<subpackage name="bootstrap">
<allow pkg="org.apache.kafka.snapshot" />
</subpackage>

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (7 lines changed)

@@ -956,12 +956,13 @@ public class NetworkClient implements KafkaClient {
}
NodeApiVersions nodeVersionInfo = new NodeApiVersions(
apiVersionsResponse.data().apiKeys(),
apiVersionsResponse.data().supportedFeatures());
apiVersionsResponse.data().supportedFeatures(),
apiVersionsResponse.data().zkMigrationReady());
apiVersions.update(node, nodeVersionInfo);
this.connectionStates.ready(node);
log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, API versions: {}.",
log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.",
node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(),
apiVersionsResponse.data().supportedFeatures(), nodeVersionInfo);
apiVersionsResponse.data().supportedFeatures(), apiVersionsResponse.data().zkMigrationReady(), nodeVersionInfo);
}
/**

clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java (11 lines changed)

@@ -48,6 +48,8 @@ public class NodeApiVersions {
private final Map<String, SupportedVersionRange> supportedFeatures;
private final boolean zkMigrationEnabled;
/**
* Create a NodeApiVersions object with the current ApiVersions.
*
@@ -76,7 +78,7 @@ public class NodeApiVersions {
}
if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey));
}
return new NodeApiVersions(apiVersions, Collections.emptyList());
return new NodeApiVersions(apiVersions, Collections.emptyList(), false);
}
@@ -95,7 +97,7 @@ public class NodeApiVersions {
.setMaxVersion(maxVersion)));
}
public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures) {
public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures, boolean zkMigrationEnabled) {
for (ApiVersion nodeApiVersion : nodeApiVersions) {
if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
@@ -112,6 +114,7 @@ public class NodeApiVersions {
new SupportedVersionRange(supportedFeature.minVersion(), supportedFeature.maxVersion()));
}
this.supportedFeatures = Collections.unmodifiableMap(supportedFeaturesBuilder);
this.zkMigrationEnabled = zkMigrationEnabled;
}
/**
@@ -236,4 +239,8 @@ public class NodeApiVersions {
public Map<String, SupportedVersionRange> supportedFeatures() {
return supportedFeatures;
}
public boolean zkMigrationEnabled() {
return zkMigrationEnabled;
}
}
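For reference, a minimal usage sketch of the changed NodeApiVersions constructor and the new getter, assuming the kafka-clients artifact built from this commit is on the classpath:

import java.util.Collections;
import org.apache.kafka.clients.NodeApiVersions;

public class ZkMigrationFlagExample {
    public static void main(String[] args) {
        // The three-argument constructor and zkMigrationEnabled() getter are
        // the ones added above; empty API and feature lists keep this minimal.
        NodeApiVersions versions =
            new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true);
        System.out.println(versions.zkMigrationEnabled()); // prints: true
    }
}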

clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java (30 lines changed)

@@ -119,7 +119,8 @@ public class ApiVersionsResponse extends AbstractResponse {
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, true),
Features.emptySupportedFeatures()
Features.emptySupportedFeatures(),
false
);
}
@@ -131,7 +132,8 @@ public class ApiVersionsResponse extends AbstractResponse {
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
Features.emptySupportedFeatures()
Features.emptySupportedFeatures(),
false
);
}
@@ -139,20 +141,22 @@
int throttleTimeMs,
ApiVersionCollection apiVersions
) {
return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures());
return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures(), false);
}
public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures
Features<SupportedVersionRange> latestSupportedFeatures,
boolean zkMigrationEnabled
) {
return createApiVersionsResponse(
throttleTimeMs,
apiVersions,
latestSupportedFeatures,
Collections.emptyMap(),
UNKNOWN_FINALIZED_FEATURES_EPOCH);
UNKNOWN_FINALIZED_FEATURES_EPOCH,
zkMigrationEnabled);
}
public static ApiVersionsResponse createApiVersionsResponse(
@@ -163,7 +167,8 @@ public class ApiVersionsResponse extends AbstractResponse {
long finalizedFeaturesEpoch,
NodeApiVersions controllerApiVersions,
ListenerType listenerType,
boolean enableUnstableLastVersion
boolean enableUnstableLastVersion,
boolean zkMigrationEnabled
) {
ApiVersionCollection apiKeys;
if (controllerApiVersions != null) {
@@ -186,7 +191,8 @@ public class ApiVersionsResponse extends AbstractResponse {
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch
finalizedFeaturesEpoch,
zkMigrationEnabled
);
}
@@ -195,7 +201,8 @@ public class ApiVersionsResponse extends AbstractResponse {
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
long finalizedFeaturesEpoch,
boolean zkMigrationEnabled
) {
return new ApiVersionsResponse(
createApiVersionsResponseData(
@@ -204,7 +211,8 @@ public class ApiVersionsResponse extends AbstractResponse {
apiVersions,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch
finalizedFeaturesEpoch,
zkMigrationEnabled
)
);
}
@@ -294,7 +302,8 @@ public class ApiVersionsResponse extends AbstractResponse {
final ApiVersionCollection apiKeys,
final Features<SupportedVersionRange> latestSupportedFeatures,
final Map<String, Short> finalizedFeatures,
final long finalizedFeaturesEpoch
final long finalizedFeaturesEpoch,
final boolean zkMigrationEnabled
) {
final ApiVersionsResponseData data = new ApiVersionsResponseData();
data.setThrottleTimeMs(throttleTimeMs);
@@ -303,6 +312,7 @@ public class ApiVersionsResponse extends AbstractResponse {
data.setSupportedFeatures(createSupportedFeatureKeys(latestSupportedFeatures));
data.setFinalizedFeatures(createFinalizedFeatureKeys(finalizedFeatures));
data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch);
data.setZkMigrationReady(zkMigrationEnabled);
return data;
}
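A short sketch of how the flag surfaces in the wire-level response object, again assuming this commit's artifacts on the classpath; the empty ApiVersionCollection is only there to keep the example small:

import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import org.apache.kafka.common.requests.ApiVersionsResponse;

public class ApiVersionsResponseFlagExample {
    public static void main(String[] args) {
        // Build a response through the four-argument overload added above and
        // confirm the flag lands in the response data.
        ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
            0,                              // throttleTimeMs
            new ApiVersionCollection(),     // no APIs needed for this sketch
            Features.emptySupportedFeatures(),
            true);                          // zkMigrationEnabled
        System.out.println(response.data().zkMigrationReady()); // prints: true
    }
}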

clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java (10 lines changed)

@@ -39,7 +39,7 @@ public class NodeApiVersionsTest {
@Test
public void testUnsupportedVersionsToString() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
@@ -68,7 +68,7 @@
.setMaxVersion((short) 10001));
} else versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
}
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.values()) {
@@ -125,7 +125,7 @@
@Test
public void testUsableVersionCalculationNoKnownVersions() {
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
assertThrows(UnsupportedVersionException.class,
() -> versions.latestUsableVersion(ApiKeys.FETCH));
}
@@ -147,7 +147,7 @@
.setApiKey((short) 100)
.setMinVersion((short) 0)
.setMaxVersion((short) 1));
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
@@ -157,7 +157,7 @@
@EnumSource(ApiMessageType.ListenerType.class)
public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList());
NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList(), false);
for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey()));

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java (3 lines changed)

@@ -634,7 +634,8 @@ public class KafkaAdminClientTest {
ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER),
convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()),
Collections.singletonMap("test_feature_1", (short) 2),
defaultFeatureMetadata().finalizedFeaturesEpoch().get()
defaultFeatureMetadata().finalizedFeaturesEpoch().get(),
false
);
}
return new ApiVersionsResponse(

clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java (12 lines changed)

@@ -121,7 +121,8 @@ public class ApiVersionsResponseTest {
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
assertEquals(10, response.throttleTimeMs());
@@ -141,7 +142,8 @@
10L,
null,
ListenerType.ZK_BROKER,
true
true,
false
);
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
@@ -169,7 +171,8 @@
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);
assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
@@ -188,7 +191,8 @@
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
true
true,
false
);
// Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them

core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala (2 lines changed)

@@ -158,7 +158,7 @@ object BrokerApiVersionsCommand {
private def getNodeApiVersions(node: Node): NodeApiVersions = {
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
Errors.forCode(response.data.errorCode).maybeThrow()
new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures)
new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures, response.data.zkMigrationReady)
}
/**

core/src/main/scala/kafka/server/ApiVersionManager.scala (20 lines changed)

@@ -49,7 +49,8 @@ object ApiVersionManager {
forwardingManager,
supportedFeatures,
metadataCache,
config.unstableApiVersionsEnabled
config.unstableApiVersionsEnabled,
config.migrationEnabled
)
}
}
@@ -58,25 +59,28 @@ class SimpleApiVersionManager(
val listenerType: ListenerType,
val enabledApis: collection.Set[ApiKeys],
brokerFeatures: Features[SupportedVersionRange],
val enableUnstableLastVersion: Boolean
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean
) extends ApiVersionManager {
def this(
listenerType: ListenerType,
enableUnstableLastVersion: Boolean
enableUnstableLastVersion: Boolean,
zkMigrationEnabled: Boolean
) = {
this(
listenerType,
ApiKeys.apisForListener(listenerType).asScala,
BrokerFeatures.defaultSupportedFeatures(),
enableUnstableLastVersion
enableUnstableLastVersion,
zkMigrationEnabled
)
}
private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures, zkMigrationEnabled)
}
}
@@ -85,7 +89,8 @@ class DefaultApiVersionManager(
forwardingManager: Option[ForwardingManager],
features: BrokerFeatures,
metadataCache: MetadataCache,
val enableUnstableLastVersion: Boolean
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean = false
) extends ApiVersionManager {
val enabledApis = ApiKeys.apisForListener(listenerType).asScala
@@ -103,7 +108,8 @@ class DefaultApiVersionManager(
finalizedFeatures.epoch,
controllerApiVersions.orNull,
listenerType,
enableUnstableLastVersion
enableUnstableLastVersion,
zkMigrationEnabled
)
}
}

core/src/main/scala/kafka/server/ControllerServer.scala (6 lines changed)

@@ -175,7 +175,8 @@ class ControllerServer(
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled
config.unstableApiVersionsEnabled,
config.migrationEnabled
)
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
@@ -270,7 +271,8 @@ class ControllerServer(
"zk migration",
fatal = false,
() => {}
)
),
quorumFeatures
)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))

core/src/main/scala/kafka/tools/TestRaftServer.scala (2 lines changed)

@@ -74,7 +74,7 @@ class TestRaftServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
val metaProperties = MetaProperties(

core/src/test/scala/unit/kafka/network/SocketServerTest.scala (2 lines changed)

@@ -77,7 +77,7 @@ class SocketServerTest {
// Clean-up any metrics left around by previous tests
TestUtils.clearYammerMetrics()
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true)
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false)
val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
val sockets = new ArrayBuffer[Socket]

core/src/test/scala/unit/kafka/server/ControllerApisTest.scala (2 lines changed)

@@ -154,7 +154,7 @@ class ControllerApisTest {
new KafkaConfig(props),
MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
Seq.empty,
new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
)
}

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (2 lines changed)

@@ -184,7 +184,7 @@ class KafkaApisTest {
} else {
ApiKeys.apisForListener(listenerType).asScala.toSet
}
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true)
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false)
new KafkaApis(
requestChannel = requestChannel,

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java (2 lines changed)

@@ -197,7 +197,7 @@ public class KRaftMetadataRequestBenchmark {
setClusterId("clusterId").
setTime(Time.SYSTEM).
setTokenManager(null).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false)).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false, false)).
build();
}

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java (2 lines changed)

@@ -199,7 +199,7 @@ public class MetadataRequestBenchmark {
setClusterId("clusterId").
setTime(Time.SYSTEM).
setTokenManager(null).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false)).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false, false)).
build();
}

metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java (26 lines changed)

@@ -128,4 +128,30 @@ public class QuorumFeatures {
boolean isControllerId(int nodeId) {
return quorumNodeIds.contains(nodeId);
}
// check if all controller nodes are ZK Migration ready
public Optional<String> reasonAllControllersZkMigrationNotReady() {
List<String> missingApiVers = new ArrayList<>();
List<String> zkMigrationNotReady = new ArrayList<>();
for (int id : quorumNodeIds) {
if (nodeId == id) {
continue; // No need to check the local node; the KRaftMigrationDriver is only created when the migration config is set
}
NodeApiVersions nodeVersions = apiVersions.get(Integer.toString(id));
if (nodeVersions == null) {
missingApiVers.add(String.valueOf(id));
} else if (!nodeVersions.zkMigrationEnabled()) {
zkMigrationNotReady.add(String.valueOf(id));
}
}
boolean isReady = missingApiVers.isEmpty() && zkMigrationNotReady.isEmpty();
if (!isReady) {
String zkMigrationNotReadyMsg = zkMigrationNotReady.isEmpty() ? "" : "Nodes don't enable `zookeeper.metadata.migration.enable`: " + zkMigrationNotReady + ".";
String missingApiVersionMsg = missingApiVers.isEmpty() ? "" : " Missing apiVersion from nodes: " + missingApiVers;
return Optional.of(zkMigrationNotReadyMsg + missingApiVersionMsg);
}
return Optional.empty();
}
}
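To see how the new check reports partial readiness, here is a sketch that mirrors the unit test added below (node IDs and hosts are arbitrary): node 5 advertises readiness, node 6 is reachable but leaves the flag unset, so the returned reason names node 6.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.controller.QuorumFeatures;

public class QuorumReadinessExample {
    public static void main(String[] args) {
        List<Node> controllers = Arrays.asList(
            new Node(4, "host4", 0), new Node(5, "host5", 0), new Node(6, "host6", 0));
        ApiVersions apiVersions = new ApiVersions();
        QuorumFeatures features = QuorumFeatures.create(
            4, apiVersions, QuorumFeatures.defaultFeatureMap(), controllers);
        // Node 5 advertises readiness; node 6 responds but leaves the flag unset.
        apiVersions.update("5",
            new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
        apiVersions.update("6", NodeApiVersions.create());
        System.out.println(features.reasonAllControllersZkMigrationNotReady());
        // e.g. Optional[Nodes don't enable `zookeeper.metadata.migration.enable`: [6].]
    }
}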

metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java (42 lines changed)

@@ -22,6 +22,7 @@ import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@@ -47,6 +48,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
@@ -91,6 +93,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private volatile MigrationDriverState migrationState;
private volatile ZkMigrationLeadershipState migrationLeadershipState;
private volatile MetadataImage image;
private volatile QuorumFeatures quorumFeatures;
private volatile boolean firstPublish;
public KRaftMigrationDriver(
@@ -99,15 +102,17 @@ public class KRaftMigrationDriver implements MetadataPublisher {
MigrationClient zkMigrationClient,
LegacyPropagator propagator,
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler
FaultHandler faultHandler,
QuorumFeatures quorumFeatures,
Time time
) {
this.nodeId = nodeId;
this.zkRecordConsumer = zkRecordConsumer;
this.zkMigrationClient = zkMigrationClient;
this.propagator = propagator;
this.time = Time.SYSTEM;
this.time = time;
this.logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
this.log = this.logContext.logger(KRaftMigrationDriver.class);
this.log = logContext.logger(KRaftMigrationDriver.class);
this.migrationState = MigrationDriverState.UNINITIALIZED;
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
@@ -116,8 +121,22 @@
this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
this.initialZkLoadHandler = initialZkLoadHandler;
this.faultHandler = faultHandler;
this.quorumFeatures = quorumFeatures;
}
public KRaftMigrationDriver(
int nodeId,
ZkRecordConsumer zkRecordConsumer,
MigrationClient zkMigrationClient,
LegacyPropagator propagator,
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler,
QuorumFeatures quorumFeatures
) {
this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, Time.SYSTEM);
}
public void start() {
eventQueue.prepend(new PollEvent());
}
@@ -149,6 +168,15 @@
transitionTo(MigrationDriverState.INACTIVE);
}
private boolean isControllerQuorumReadyForMigration() {
Optional<String> notReadyMsg = this.quorumFeatures.reasonAllControllersZkMigrationNotReady();
if (notReadyMsg.isPresent()) {
log.info("Still waiting for all controller nodes ready to begin the migration. due to:" + notReadyMsg.get());
return false;
}
return true;
}
private boolean imageDoesNotContainAllBrokers(MetadataImage image, Set<Integer> brokerIds) {
for (BrokerRegistration broker : image.cluster().brokers().values()) {
if (broker.isMigratingZkBroker()) {
@@ -432,9 +460,11 @@
transitionTo(MigrationDriverState.INACTIVE);
break;
case PRE_MIGRATION:
// Base case when starting the migration
log.debug("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
if (isControllerQuorumReadyForMigration()) {
// Base case when starting the migration
log.debug("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
}
break;
case MIGRATION:
if (!migrationLeadershipState.zkMigrationComplete()) {
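Reduced to its essence, the PRE_MIGRATION change above is a gate that re-polls until the quorum check stops returning a reason. A sketch of that control flow (an illustrative reduction, not the driver's real API):

import java.util.Optional;
import java.util.function.Supplier;

public class PreMigrationGateSketch {
    enum State { PRE_MIGRATION, WAIT_FOR_BROKERS }

    // One poll step: advance only when no not-ready reason remains.
    static State pollOnce(State state, Supplier<Optional<String>> notReadyReason) {
        if (state == State.PRE_MIGRATION && !notReadyReason.get().isPresent()) {
            return State.WAIT_FOR_BROKERS;
        }
        return state; // stay put; the real driver logs the reason and polls again later
    }

    public static void main(String[] args) {
        System.out.println(pollOnce(State.PRE_MIGRATION, () -> Optional.of("node 6 missing"))); // PRE_MIGRATION
        System.out.println(pollOnce(State.PRE_MIGRATION, Optional::empty)); // WAIT_FOR_BROKERS
    }
}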

metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java (31 lines changed)

@@ -89,7 +89,7 @@ public class QuorumFeaturesTest {
setMinVersion(entry.getValue().min()).
setMaxVersion(entry.getValue().max()));
});
return new NodeApiVersions(Collections.emptyList(), features);
return new NodeApiVersions(Collections.emptyList(), features, false);
}
@Test
@@ -100,4 +100,33 @@
assertTrue(quorumFeatures.isControllerId(2));
assertFalse(quorumFeatures.isControllerId(3));
}
@Test
public void testZkMigrationReady() {
ApiVersions apiVersions = new ApiVersions();
QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions, LOCAL, Arrays.asList(0, 1, 2));
// create apiVersion with zkMigrationEnabled flag set for node 0, the other 2 nodes have no apiVersions info
apiVersions.update("0", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent());
assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().get().contains("Missing apiVersion from nodes: [1, 2]"));
// create apiVersion with zkMigrationEnabled flag set for node 1, the remaining node has no apiVersions info
apiVersions.update("1", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent());
assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().get().contains("Missing apiVersion from nodes: [2]"));
// create apiVersion with zkMigrationEnabled flag disabled for node 2, should still be not ready
apiVersions.update("2", NodeApiVersions.create());
assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent());
assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().get().contains("Nodes don't enable `zookeeper.metadata.migration.enable`: [2]"));
// update zkMigrationEnabled flag to enabled for node 2, should be ready now
apiVersions.update("2", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
assertFalse(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent());
// create apiVersion with zkMigrationEnabled flag disabled for a non-controller, and expect it to be filtered out
apiVersions.update("3", NodeApiVersions.create());
assertFalse(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent());
}
}

metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java (201 lines changed)

@@ -16,6 +16,9 @@
*/
package org.apache.kafka.metadata.migration;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.config.ConfigResource;
@@ -23,6 +26,9 @@ import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@@ -37,6 +43,7 @@ import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -55,7 +62,34 @@ import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class KRaftMigrationDriverTest {
List<Node> controllerNodes = Arrays.asList(
new Node(4, "host4", 0),
new Node(5, "host5", 0),
new Node(6, "host6", 0)
);
ApiVersions apiVersions = new ApiVersions();
QuorumFeatures quorumFeatures = QuorumFeatures.create(4,
apiVersions,
QuorumFeatures.defaultFeatureMap(),
controllerNodes);
Time mockTime = new MockTime(1) {
@Override
public long nanoseconds() {
// The driver polls once per second; shift the clock so each poll fires after roughly 10 ms, speeding up the test
return System.nanoTime() - NANOSECONDS.convert(990, MILLISECONDS);
}
};
@BeforeEach
public void setup() {
apiVersions.update("4", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
apiVersions.update("5", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
apiVersions.update("6", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
}
static class NoOpRecordConsumer implements ZkRecordConsumer {
@Override
public void beginMigration() {
@@ -292,65 +326,66 @@
public void testOnlySendNeededRPCsToBrokers() throws Exception {
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)));
KRaftMigrationDriver driver = new KRaftMigrationDriver(
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
new MockFaultHandler("test")
);
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
driver.start();
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
delta.replay(zkBrokerRecord(2));
delta.replay(zkBrokerRecord(3));
MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
image = delta.apply(provenance);
// Publish a delta with this node (3000) as the leader
LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
driver.onControllerChange(newLeader);
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
Assertions.assertEquals(1, metadataPropagator.images);
Assertions.assertEquals(0, metadataPropagator.deltas);
delta = new MetadataDelta(image);
delta.replay(new ConfigRecord()
.setResourceType(ConfigResource.Type.BROKER.id())
.setResourceName("1")
.setName("foo")
.setValue("bar"));
provenance = new MetadataProvenance(120, 1, 2);
image = delta.apply(provenance);
enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES);
Assertions.assertEquals(1, migrationClient.capturedConfigs.size());
Assertions.assertEquals(1, metadataPropagator.images);
Assertions.assertEquals(0, metadataPropagator.deltas);
delta = new MetadataDelta(image);
delta.replay(new BrokerRegistrationChangeRecord()
.setBrokerId(1)
.setBrokerEpoch(0)
.setFenced(BrokerRegistrationFencingChange.NONE.value())
.setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()));
provenance = new MetadataProvenance(130, 1, 3);
image = delta.apply(provenance);
enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES);
Assertions.assertEquals(1, metadataPropagator.images);
Assertions.assertEquals(1, metadataPropagator.deltas);
driver.close();
new MockFaultHandler("test"),
quorumFeatures,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
driver.start();
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
delta.replay(zkBrokerRecord(2));
delta.replay(zkBrokerRecord(3));
MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
image = delta.apply(provenance);
// Publish a delta with this node (3000) as the leader
LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
driver.onControllerChange(newLeader);
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
Assertions.assertEquals(1, metadataPropagator.images);
Assertions.assertEquals(0, metadataPropagator.deltas);
delta = new MetadataDelta(image);
delta.replay(new ConfigRecord()
.setResourceType(ConfigResource.Type.BROKER.id())
.setResourceName("1")
.setName("foo")
.setValue("bar"));
provenance = new MetadataProvenance(120, 1, 2);
image = delta.apply(provenance);
enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES);
Assertions.assertEquals(1, migrationClient.capturedConfigs.size());
Assertions.assertEquals(1, metadataPropagator.images);
Assertions.assertEquals(0, metadataPropagator.deltas);
delta = new MetadataDelta(image);
delta.replay(new BrokerRegistrationChangeRecord()
.setBrokerId(1)
.setBrokerEpoch(0)
.setFenced(BrokerRegistrationFencingChange.NONE.value())
.setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()));
provenance = new MetadataProvenance(130, 1, 3);
image = delta.apply(provenance);
enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES);
Assertions.assertEquals(1, metadataPropagator.images);
Assertions.assertEquals(1, metadataPropagator.deltas);
}
}
@ParameterizedTest
@@ -381,7 +416,9 @@
migrationClient,
metadataPropagator,
metadataPublisher -> { },
faultHandler
faultHandler,
quorumFeatures,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@@ -400,8 +437,8 @@
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
Assertions.assertTrue(claimLeaderAttempts.await(1, TimeUnit.MINUTES));
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.ZK_MIGRATION),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
if (authException) {
Assertions.assertEquals(MigrationClientAuthException.class, faultHandler.firstException().getCause().getClass());
@@ -412,6 +449,52 @@
}
@Test
public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate() throws Exception {
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1)));
apiVersions.remove("6");
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> {
},
new MockFaultHandler("test"),
quorumFeatures,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
driver.start();
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
image = delta.apply(provenance);
// Publish a delta with this node (3000) as the leader
LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
driver.onControllerChange(newLeader);
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
// Current apiVersions are missing the controller node 6, should stay at WAIT_FOR_CONTROLLER_QUORUM state
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
"Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
// Current apiVersions of node 6 have zkMigrationReady unset, so the driver should still stay at WAIT_FOR_CONTROLLER_QUORUM state
apiVersions.update("6", NodeApiVersions.create());
Assertions.assertEquals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM, driver.migrationState().get(1, TimeUnit.MINUTES));
// all controller nodes are zkMigrationReady, should be able to move to next state
apiVersions.update("6", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
}
}
@Test
public void testSkipWaitForBrokersInDualWrite() throws Exception {
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet());
@@ -422,7 +505,9 @@
migrationClient,
metadataPropagator,
metadataPublisher -> { },
faultHandler
faultHandler,
quorumFeatures,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
