diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java index aa6ba5a793a..83722f83d02 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java @@ -68,7 +68,7 @@ public class NodeApiVersions { */ public static NodeApiVersions create(Collection overrides) { List apiVersions = new LinkedList<>(overrides); - for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) { + for (ApiKeys apiKey : ApiKeys.clientApis()) { boolean exists = false; for (ApiVersion apiVersion : apiVersions) { if (apiVersion.apiKey() == apiKey.id) { diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java index 1aa420b36f0..b9fc6e57ec3 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java @@ -63,7 +63,7 @@ public class ApiKeysTest { Set authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE); // Newer protocol apis include throttle time ms even for cluster actions Set clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_PARTITION, ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES); - for (ApiKeys apiKey: ApiKeys.zkBrokerApis()) { + for (ApiKeys apiKey: ApiKeys.clientApis()) { Schema responseSchema = apiKey.messageType.responseSchemas()[apiKey.latestVersion()]; BoundField throttleTimeField = responseSchema.get("throttle_time_ms"); if ((apiKey.clusterAction && !clusterActionsWithThrottleTimeMs.contains(apiKey)) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index 1e5f8493f60..19d3c468186 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -162,8 +162,9 @@ public class ApiVersionsResponseTest { assertEquals(10, response.data().finalizedFeaturesEpoch()); } - @Test - public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() { + @ParameterizedTest + @EnumSource(names = {"ZK_BROKER", "BROKER"}) + public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle(ListenerType listenerType) { ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse( AbstractResponse.DEFAULT_THROTTLE_TIME, RecordVersion.current(), @@ -171,11 +172,11 @@ public class ApiVersionsResponseTest { Collections.emptyMap(), ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, null, - ListenerType.ZK_BROKER, + listenerType, true, false ); - assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response)); + assertEquals(new HashSet<>(ApiKeys.apisForListener(listenerType)), apiKeysInResponse(response)); assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); assertTrue(response.data().supportedFeatures().isEmpty()); assertTrue(response.data().finalizedFeatures().isEmpty());