Browse Source

KAFKA-15287: Change NodeApiVersions.create() to support both zk and kraft (#14185)

Reviewers: dengziming <dengziming1993@gmail.com>
pull/13361/merge
vveicc 1 year ago committed by GitHub
parent
commit
594156e01b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
  2. 2
      clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
  3. 9
      clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java

2
clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java

@ -68,7 +68,7 @@ public class NodeApiVersions { @@ -68,7 +68,7 @@ public class NodeApiVersions {
*/
public static NodeApiVersions create(Collection<ApiVersion> overrides) {
List<ApiVersion> apiVersions = new LinkedList<>(overrides);
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
for (ApiKeys apiKey : ApiKeys.clientApis()) {
boolean exists = false;
for (ApiVersion apiVersion : apiVersions) {
if (apiVersion.apiKey() == apiKey.id) {

2
clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java

@ -63,7 +63,7 @@ public class ApiKeysTest { @@ -63,7 +63,7 @@ public class ApiKeysTest {
Set<ApiKeys> authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
// Newer protocol apis include throttle time ms even for cluster actions
Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_PARTITION, ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES);
for (ApiKeys apiKey: ApiKeys.zkBrokerApis()) {
for (ApiKeys apiKey: ApiKeys.clientApis()) {
Schema responseSchema = apiKey.messageType.responseSchemas()[apiKey.latestVersion()];
BoundField throttleTimeField = responseSchema.get("throttle_time_ms");
if ((apiKey.clusterAction && !clusterActionsWithThrottleTimeMs.contains(apiKey))

9
clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java

@ -162,8 +162,9 @@ public class ApiVersionsResponseTest { @@ -162,8 +162,9 @@ public class ApiVersionsResponseTest {
assertEquals(10, response.data().finalizedFeaturesEpoch());
}
@Test
public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
@ParameterizedTest
@EnumSource(names = {"ZK_BROKER", "BROKER"})
public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle(ListenerType listenerType) {
ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME,
RecordVersion.current(),
@ -171,11 +172,11 @@ public class ApiVersionsResponseTest { @@ -171,11 +172,11 @@ public class ApiVersionsResponseTest {
Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
listenerType,
true,
false
);
assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
assertEquals(new HashSet<>(ApiKeys.apisForListener(listenerType)), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
assertTrue(response.data().supportedFeatures().isEmpty());
assertTrue(response.data().finalizedFeatures().isEmpty());

Loading…
Cancel
Save