Browse Source

KAFKA-15466: Add KIP-919 support for some admin APIs (#14399)

Add support for --bootstrap-controller in the following command-line tools:
    - kafka-cluster.sh
    - kafka-configs.sh
    - kafka-features.sh
    - kafka-metadata-quorum.sh

To implement this, the following AdminClient APIs now support the new bootstrap.controllers
configuration:
    - Admin.alterConfigs
    - Admin.describeCluster
    - Admin.describeConfigs
    - Admin.describeFeatures
    - Admin.describeMetadataQuorum
    - Admin.incrementalAlterConfigs
    - Admin.updateFeatures

Command-line tool changes:
    - Add CommandLineUtils.initializeBootstrapProperties to handle parsing --bootstrap-controller
      in addition to --bootstrap-server.
    - Add --bootstrap-controller to ConfigCommand.scala, ClusterTool.java, FeatureCommand.java, and
      MetadataQuorumCommand.java.

KafkaAdminClient changes:
    - Add the AdminBootstrapAddresses class to handle extracting bootstrap.servers or
      bootstrap.controllers from the config map for KafkaAdminClient.
    - In AdminMetadataManager, store the new usingBootstrapControllers boolean. Generalize
      authException to encompass the concept of fatal exceptions in general. (For example, the
      fatal exception where we talked to the wrong node type.) Treat
      MismatchedEndpointTypeException and UnsupportedEndpointTypeException as fatal exceptions.
    - Extend NodeProvider to include information about whether bootstrap.controllers is supported.
    - Modify the APIs described above to support bootstrap.controllers.

Server-side changes:
    - Support DescribeConfigsRequest on kcontrollers.
    - Add KRaftMetadataCache to the kcontroller to simplify implemeting describeConfigs (and
      probably more APIs in the future). It's mainly a wrapper around MetadataImage, so there is
      essentially no extra resource consumption.
    - Split RuntimeLoggerManager out of ConfigAdminManager to handle the incrementalAlterConfigs
      support for BROKER_LOGGER. This is now supported on kcontrollers as well as brokers.
    - Fix bug in AuthHelper.computeDescribeClusterResponse that resulted in us always sending back
      BROKER as the endpoint type, even on the kcontroller.

Miscellaneous:
    - Fix a few places in exceptions and log messages where we wrote "broker" instead of "node".
      For example, an exception in NodeApiVersions.java, and a log message in NetworkClient.java.
    - Fix the slf4j log prefix used by KafkaRequestHandler logging so that request handlers on a
      controller don't look like they're on a broker.
    - Make the FinalizedVersionRange constructor public for the sake of a junit test.
    - Add unit and integration tests for the above.

Reviewers: David Arthur <mumrah@gmail.com>, Doguscan Namal <namal.doguscan@gmail.com>
pull/10629/merge
Colin Patrick McCabe 12 months ago committed by GitHub
parent
commit
fcac880fd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      checkstyle/import-control-core.xml
  2. 2
      checkstyle/import-control-server-common.xml
  3. 2
      clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
  4. 4
      clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
  5. 15
      clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
  6. 2
      clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
  7. 265
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  8. 108
      clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddresses.java
  9. 50
      clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
  10. 2
      clients/src/main/resources/common/message/DescribeConfigsRequest.json
  11. 2
      clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
  12. 50
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  13. 81
      clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddressesTest.java
  14. 12
      clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
  15. 150
      core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java
  16. 39
      core/src/main/scala/kafka/admin/ConfigCommand.scala
  17. 3
      core/src/main/scala/kafka/server/AuthHelper.scala
  18. 75
      core/src/main/scala/kafka/server/ConfigAdminManager.scala
  19. 37
      core/src/main/scala/kafka/server/ConfigHelper.scala
  20. 40
      core/src/main/scala/kafka/server/ControllerApis.scala
  21. 40
      core/src/main/scala/kafka/server/ControllerServer.scala
  22. 28
      core/src/main/scala/kafka/server/KafkaApis.scala
  23. 38
      core/src/main/scala/kafka/server/KafkaRequestHandler.scala
  24. 38
      core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala
  25. 98
      core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java
  26. 5
      core/src/test/java/kafka/test/ClusterInstance.java
  27. 11
      core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
  28. 5
      core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
  29. 242
      core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java
  30. 114
      core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
  31. 4
      core/src/test/java/kafka/testkit/TestKitNodes.java
  32. 32
      core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
  33. 8
      core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
  34. 20
      core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
  35. 96
      core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
  36. 6
      core/src/test/scala/unit/kafka/server/AuthHelperTest.scala
  37. 48
      core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
  38. 28
      core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
  39. 51
      server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
  40. 42
      server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
  41. 19
      tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
  42. 25
      tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
  43. 19
      tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
  44. 10
      tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java

3
checkstyle/import-control-core.xml

@ -95,6 +95,8 @@ @@ -95,6 +95,8 @@
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.server.authorizer"/>
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="kafka.testkit"/>
<allow pkg="kafka.test.annotation"/>
<allow pkg="kafka.test.junit"/>
<allow pkg="kafka.network"/>
@ -108,7 +110,6 @@ @@ -108,7 +110,6 @@
</subpackage>
<subpackage name="junit">
<allow pkg="kafka.test"/>
<allow pkg="kafka.testkit"/>
<allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.metadata" />
</subpackage>

2
checkstyle/import-control-server-common.xml

@ -85,10 +85,12 @@ @@ -85,10 +85,12 @@
<subpackage name="util">
<!-- InterBrokerSendThread uses some clients classes that are not part of the public -->
<!-- API but are still relatively common -->
<allow class="org.apache.kafka.clients.admin.AdminClientConfig" />
<allow class="org.apache.kafka.clients.ClientRequest" />
<allow class="org.apache.kafka.clients.ClientResponse" />
<allow class="org.apache.kafka.clients.KafkaClient" />
<allow class="org.apache.kafka.clients.RequestCompletionHandler" />
<allow class="org.apache.kafka.clients.CommonClientConfigs" />
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.server.util.json" />

2
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

@ -811,7 +811,7 @@ public class NetworkClient implements KafkaClient { @@ -811,7 +811,7 @@ public class NetworkClient implements KafkaClient {
nodeId, disconnectState.remoteAddress());
break;
case NOT_CONNECTED:
log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
log.warn("Connection to node {} ({}) could not be established. Node may not be available.", nodeId, disconnectState.remoteAddress());
break;
default:
break; // Disconnections in other states are logged at debug level in Selector

4
clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java

@ -129,7 +129,7 @@ public class NodeApiVersions { @@ -129,7 +129,7 @@ public class NodeApiVersions {
*/
public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
if (!supportedVersions.containsKey(apiKey))
throw new UnsupportedVersionException("The broker does not support " + apiKey);
throw new UnsupportedVersionException("The node does not support " + apiKey);
ApiVersion supportedVersion = supportedVersions.get(apiKey);
Optional<ApiVersion> intersectVersion = ApiVersionsResponse.intersect(supportedVersion,
new ApiVersion()
@ -140,7 +140,7 @@ public class NodeApiVersions { @@ -140,7 +140,7 @@ public class NodeApiVersions {
if (intersectVersion.isPresent())
return intersectVersion.get().maxVersion();
else
throw new UnsupportedVersionException("The broker does not support " + apiKey +
throw new UnsupportedVersionException("The node does not support " + apiKey +
" with version in range [" + oldestAllowedVersion + "," + latestAllowedVersion + "]. The supported" +
" range is [" + supportedVersion.minVersion() + "," + supportedVersion.maxVersion() + "].");
}

15
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java

@ -47,6 +47,13 @@ public class AdminClientConfig extends AbstractConfig { @@ -47,6 +47,13 @@ public class AdminClientConfig extends AbstractConfig {
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;
/**
* <code>bootstrap.controllers</code>
*/
public static final String BOOTSTRAP_CONTROLLERS_CONFIG = "bootstrap.controllers";
public static final String BOOTSTRAP_CONTROLLERS_DOC = "A list of host/port pairs to use for establishing the initial " +
"connection to the KRaft controller quorum. This list should be in the form <code>host1:port1,host2:port2,...</code>.";
/**
* <code>client.dns.lookup</code>
*/
@ -135,8 +142,14 @@ public class AdminClientConfig extends AbstractConfig { @@ -135,8 +142,14 @@ public class AdminClientConfig extends AbstractConfig {
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
"",
Importance.HIGH,
BOOTSTRAP_SERVERS_DOC)
BOOTSTRAP_SERVERS_DOC).
define(BOOTSTRAP_CONTROLLERS_CONFIG,
Type.LIST,
"",
Importance.HIGH,
BOOTSTRAP_CONTROLLERS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
.define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, SEND_BUFFER_DOC)

2
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java

@ -35,7 +35,7 @@ public class FinalizedVersionRange { @@ -35,7 +35,7 @@ public class FinalizedVersionRange {
*
* @throws IllegalArgumentException Raised when the condition described above is not met.
*/
FinalizedVersionRange(final short minVersionLevel, final short maxVersionLevel) {
public FinalizedVersionRange(final short minVersionLevel, final short maxVersionLevel) {
if (minVersionLevel < 0 || maxVersionLevel < 0 || maxVersionLevel < minVersionLevel) {
throw new IllegalArgumentException(
String.format(

265
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -38,6 +38,7 @@ import org.apache.kafka.clients.admin.internals.AdminApiDriver; @@ -38,6 +38,7 @@ import org.apache.kafka.clients.admin.internals.AdminApiDriver;
import org.apache.kafka.clients.admin.internals.AdminApiHandler;
import org.apache.kafka.clients.admin.internals.AdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminBootstrapAddresses;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.AllBrokersStrategy;
import org.apache.kafka.clients.admin.internals.AlterConsumerGroupOffsetsHandler;
@ -83,12 +84,14 @@ import org.apache.kafka.common.errors.DisconnectException; @@ -83,12 +84,14 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnacceptableCredentialException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
@ -126,6 +129,7 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData; @@ -126,6 +129,7 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData;
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState;
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DescribeClusterRequestData;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
@ -232,7 +236,6 @@ import org.apache.kafka.common.utils.Time; @@ -232,7 +236,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
@ -477,8 +480,11 @@ public class KafkaAdminClient extends AdminClient { @@ -477,8 +480,11 @@ public class KafkaAdminClient extends AdminClient {
return createInternal(config, timeoutProcessorFactory, null);
}
static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory,
HostResolver hostResolver) {
static KafkaAdminClient createInternal(
AdminClientConfig config,
TimeoutProcessorFactory timeoutProcessorFactory,
HostResolver hostResolver
) {
Metrics metrics = null;
NetworkClient networkClient = null;
Time time = Time.SYSTEM;
@ -489,11 +495,12 @@ public class KafkaAdminClient extends AdminClient { @@ -489,11 +495,12 @@ public class KafkaAdminClient extends AdminClient {
try {
// Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
AdminBootstrapAddresses adminAddresses = AdminBootstrapAddresses.fromConfig(config);
AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG),
adminAddresses.usingBootstrapControllers());
metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()), time.milliseconds());
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
@ -655,6 +662,7 @@ public class KafkaAdminClient extends AdminClient { @@ -655,6 +662,7 @@ public class KafkaAdminClient extends AdminClient {
*/
private interface NodeProvider {
Node provide();
boolean supportsUseControllers();
}
private class MetadataUpdateNodeIdProvider implements NodeProvider {
@ -662,13 +670,25 @@ public class KafkaAdminClient extends AdminClient { @@ -662,13 +670,25 @@ public class KafkaAdminClient extends AdminClient {
public Node provide() {
return client.leastLoadedNode(time.milliseconds());
}
@Override
public boolean supportsUseControllers() {
return true;
}
}
private class ConstantNodeIdProvider implements NodeProvider {
private final int nodeId;
private final boolean supportsUseControllers;
ConstantNodeIdProvider(int nodeId, boolean supportsUseControllers) {
this.nodeId = nodeId;
this.supportsUseControllers = supportsUseControllers;
}
ConstantNodeIdProvider(int nodeId) {
this.nodeId = nodeId;
this.supportsUseControllers = false;
}
@Override
@ -684,12 +704,27 @@ public class KafkaAdminClient extends AdminClient { @@ -684,12 +704,27 @@ public class KafkaAdminClient extends AdminClient {
metadataManager.requestUpdate();
return null;
}
@Override
public boolean supportsUseControllers() {
return supportsUseControllers;
}
}
/**
* Provides the controller node.
*/
private class ControllerNodeProvider implements NodeProvider {
private final boolean supportsUseControllers;
ControllerNodeProvider(boolean supportsUseControllers) {
this.supportsUseControllers = supportsUseControllers;
}
ControllerNodeProvider() {
this.supportsUseControllers = false;
}
@Override
public Node provide() {
if (metadataManager.isReady() &&
@ -699,6 +734,11 @@ public class KafkaAdminClient extends AdminClient { @@ -699,6 +734,11 @@ public class KafkaAdminClient extends AdminClient {
metadataManager.requestUpdate();
return null;
}
@Override
public boolean supportsUseControllers() {
return supportsUseControllers;
}
}
/**
@ -715,6 +755,67 @@ public class KafkaAdminClient extends AdminClient { @@ -715,6 +755,67 @@ public class KafkaAdminClient extends AdminClient {
metadataManager.requestUpdate();
return null;
}
@Override
public boolean supportsUseControllers() {
return false;
}
}
/**
* Provides the least loaded broker, or the active kcontroller if we're using
* bootstrap.controllers.
*/
private class ConstantBrokerOrActiveKController implements NodeProvider {
private final int nodeId;
ConstantBrokerOrActiveKController(int nodeId) {
this.nodeId = nodeId;
}
@Override
public Node provide() {
if (metadataManager.isReady()) {
if (metadataManager.usingBootstrapControllers()) {
return metadataManager.controller();
} else if (metadataManager.nodeById(nodeId) != null) {
return metadataManager.nodeById(nodeId);
}
}
metadataManager.requestUpdate();
return null;
}
@Override
public boolean supportsUseControllers() {
return true;
}
}
/**
* Provides the least loaded broker, or the active kcontroller if we're using
* bootstrap.controllers.
*/
private class LeastLoadedBrokerOrActiveKController implements NodeProvider {
@Override
public Node provide() {
if (metadataManager.isReady()) {
if (metadataManager.usingBootstrapControllers()) {
return metadataManager.controller();
} else {
// This may return null if all nodes are busy.
// In that case, we will postpone node assignment.
return client.leastLoadedNode(time.milliseconds());
}
}
metadataManager.requestUpdate();
return null;
}
@Override
public boolean supportsUseControllers() {
return true;
}
}
abstract class Call {
@ -1448,6 +1549,10 @@ public class KafkaAdminClient extends AdminClient { @@ -1448,6 +1549,10 @@ public class KafkaAdminClient extends AdminClient {
log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
call.handleTimeoutFailure(time.milliseconds(),
new TimeoutException("The AdminClient thread is not accepting new calls."));
} else if (metadataManager.usingBootstrapControllers() &&
(!call.nodeProvider.supportsUseControllers())) {
call.fail(now, new UnsupportedEndpointTypeException("This Admin API is not " +
"yet supported when communicating directly with the controller quorum."));
} else {
enqueue(call, now);
}
@ -1457,6 +1562,58 @@ public class KafkaAdminClient extends AdminClient { @@ -1457,6 +1562,58 @@ public class KafkaAdminClient extends AdminClient {
* Create a new metadata call.
*/
private Call makeMetadataCall(long now) {
if (metadataManager.usingBootstrapControllers()) {
return makeControllerMetadataCall(now);
} else {
return makeBrokerMetadataCall(now);
}
}
private Call makeControllerMetadataCall(long now) {
// Use DescribeCluster here, as specified by KIP-919.
return new Call(true, "describeCluster", calcDeadlineMs(now, requestTimeoutMs),
new MetadataUpdateNodeIdProvider()) {
@Override
public DescribeClusterRequest.Builder createRequest(int timeoutMs) {
return new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
.setIncludeClusterAuthorizedOperations(false)
.setEndpointType(EndpointType.CONTROLLER.id()));
}
@Override
public void handleResponse(AbstractResponse abstractResponse) {
DescribeClusterResponse response = (DescribeClusterResponse) abstractResponse;
Cluster cluster;
try {
cluster = parseDescribeClusterResponse(response.data());
} catch (ApiException e) {
handleFailure(e);
return;
}
long now = time.milliseconds();
metadataManager.update(cluster, now);
// Unassign all unsent requests after a metadata refresh to allow for a new
// destination to be selected from the new metadata
unassignUnsentCalls(node -> true);
}
@Override
boolean handleUnsupportedVersionException(final UnsupportedVersionException e) {
metadataManager.updateFailed(e);
return false;
}
@Override
public void handleFailure(Throwable e) {
metadataManager.updateFailed(e);
}
};
}
private Call makeBrokerMetadataCall(long now) {
// We use MetadataRequest here so that we can continue to support brokers that are too
// old to handle DescribeCluster.
return new Call(true, "fetchMetadata", calcDeadlineMs(now, requestTimeoutMs),
new MetadataUpdateNodeIdProvider()) {
@Override
@ -1480,6 +1637,12 @@ public class KafkaAdminClient extends AdminClient { @@ -1480,6 +1637,12 @@ public class KafkaAdminClient extends AdminClient {
unassignUnsentCalls(node -> true);
}
@Override
boolean handleUnsupportedVersionException(final UnsupportedVersionException e) {
metadataManager.updateFailed(e);
return false;
}
@Override
public void handleFailure(Throwable e) {
metadataManager.updateFailed(e);
@ -1488,6 +1651,32 @@ public class KafkaAdminClient extends AdminClient { @@ -1488,6 +1651,32 @@ public class KafkaAdminClient extends AdminClient {
}
}
static Cluster parseDescribeClusterResponse(DescribeClusterResponseData response) {
ApiError apiError = new ApiError(response.errorCode(), response.errorMessage());
if (apiError.isFailure()) {
throw apiError.exception();
}
if (response.endpointType() != EndpointType.CONTROLLER.id()) {
throw new MismatchedEndpointTypeException("Expected response from CONTROLLER " +
"endpoint, but got response from endpoint type " + (int) response.endpointType());
}
List<Node> nodes = new ArrayList<>();
Node controllerNode = null;
for (DescribeClusterResponseData.DescribeClusterBroker node : response.brokers()) {
Node newNode = new Node(node.brokerId(), node.host(), node.port(), node.rack());
nodes.add(newNode);
if (node.brokerId() == response.controllerId()) {
controllerNode = newNode;
}
}
return new Cluster(response.clusterId(),
nodes,
Collections.emptyList(),
Collections.emptySet(),
Collections.emptySet(),
controllerNode);
}
/**
* Returns true if a topic name cannot be represented in an RPC. This function does NOT check
* whether the name is too long, contains invalid characters, etc. It is better to enforce
@ -2082,7 +2271,7 @@ public class KafkaAdminClient extends AdminClient { @@ -2082,7 +2271,7 @@ public class KafkaAdminClient extends AdminClient {
final long now = time.milliseconds();
runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
new LeastLoadedBrokerOrActiveKController()) {
private boolean useMetadataRequest = false;
@ -2090,8 +2279,9 @@ public class KafkaAdminClient extends AdminClient { @@ -2090,8 +2279,9 @@ public class KafkaAdminClient extends AdminClient {
AbstractRequest.Builder createRequest(int timeoutMs) {
if (!useMetadataRequest) {
return new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
.setIncludeClusterAuthorizedOperations(
options.includeAuthorizedOperations()));
.setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations())
.setEndpointType(metadataManager.usingBootstrapControllers() ?
EndpointType.CONTROLLER.id() : EndpointType.BROKER.id()));
} else {
// Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
@ -2148,6 +2338,9 @@ public class KafkaAdminClient extends AdminClient { @@ -2148,6 +2338,9 @@ public class KafkaAdminClient extends AdminClient {
@Override
boolean handleUnsupportedVersionException(final UnsupportedVersionException exception) {
if (metadataManager.usingBootstrapControllers()) {
return false;
}
if (useMetadataRequest) {
return false;
}
@ -2319,11 +2512,11 @@ public class KafkaAdminClient extends AdminClient { @@ -2319,11 +2512,11 @@ public class KafkaAdminClient extends AdminClient {
public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
// Partition the requested config resources based on which broker they must be sent to with the
// null broker being used for config resources which can be obtained from any broker
final Map<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> brokerFutures = new HashMap<>(configResources.size());
final Map<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> nodeFutures = new HashMap<>(configResources.size());
for (ConfigResource resource : configResources) {
Integer broker = nodeFor(resource);
brokerFutures.compute(broker, (key, value) -> {
nodeFutures.compute(broker, (key, value) -> {
if (value == null) {
value = new HashMap<>();
}
@ -2333,12 +2526,12 @@ public class KafkaAdminClient extends AdminClient { @@ -2333,12 +2526,12 @@ public class KafkaAdminClient extends AdminClient {
}
final long now = time.milliseconds();
for (Map.Entry<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> entry : brokerFutures.entrySet()) {
Integer broker = entry.getKey();
for (Map.Entry<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> entry : nodeFutures.entrySet()) {
final Integer node = entry.getKey();
Map<ConfigResource, KafkaFutureImpl<Config>> unified = entry.getValue();
runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
broker != null ? new ConstantNodeIdProvider(broker) : new LeastLoadedNodeProvider()) {
node != null ? new ConstantNodeIdProvider(node, true) : new LeastLoadedBrokerOrActiveKController()) {
@Override
DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
@ -2362,9 +2555,9 @@ public class KafkaAdminClient extends AdminClient { @@ -2362,9 +2555,9 @@ public class KafkaAdminClient extends AdminClient {
DescribeConfigsResponseData.DescribeConfigsResult describeConfigsResult = entry.getValue();
KafkaFutureImpl<Config> future = unified.get(configResource);
if (future == null) {
if (broker != null) {
log.warn("The config {} in the response from broker {} is not in the request",
configResource, broker);
if (node != null) {
log.warn("The config {} in the response from node {} is not in the request",
configResource, node);
} else {
log.warn("The config {} in the response from the least loaded broker is not in the request",
configResource);
@ -2380,7 +2573,7 @@ public class KafkaAdminClient extends AdminClient { @@ -2380,7 +2573,7 @@ public class KafkaAdminClient extends AdminClient {
}
completeUnrealizedFutures(
unified.entrySet().stream(),
configResource -> "The broker response did not contain a result for config resource " + configResource);
configResource -> "The node response did not contain a result for config resource " + configResource);
}
@Override
@ -2390,7 +2583,7 @@ public class KafkaAdminClient extends AdminClient { @@ -2390,7 +2583,7 @@ public class KafkaAdminClient extends AdminClient {
}, now);
}
return new DescribeConfigsResult(new HashMap<>(brokerFutures.entrySet().stream()
return new DescribeConfigsResult(new HashMap<>(nodeFutures.entrySet().stream()
.flatMap(x -> x.getValue().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
}
@ -2441,20 +2634,20 @@ public class KafkaAdminClient extends AdminClient { @@ -2441,20 +2634,20 @@ public class KafkaAdminClient extends AdminClient {
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
// We must make a separate AlterConfigs request for every BROKER resource we want to alter
// and send the request to that specific broker. Other resources are grouped together into
// a single request that may be sent to any broker.
// and send the request to that specific node. Other resources are grouped together into
// a single request that may be sent to any node.
final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
for (ConfigResource resource : configs.keySet()) {
Integer node = nodeFor(resource);
if (node != null) {
NodeProvider nodeProvider = new ConstantNodeIdProvider(node);
NodeProvider nodeProvider = new ConstantBrokerOrActiveKController(node);
allFutures.putAll(alterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
} else
unifiedRequestResources.add(resource);
}
if (!unifiedRequestResources.isEmpty())
allFutures.putAll(alterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));
allFutures.putAll(alterConfigs(configs, options, unifiedRequestResources, new LeastLoadedBrokerOrActiveKController()));
return new AlterConfigsResult(new HashMap<>(allFutures));
}
@ -2506,21 +2699,31 @@ public class KafkaAdminClient extends AdminClient { @@ -2506,21 +2699,31 @@ public class KafkaAdminClient extends AdminClient {
public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
final AlterConfigsOptions options) {
final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
// We must make a separate AlterConfigs request for every BROKER resource we want to alter
// and send the request to that specific broker. Other resources are grouped together into
// a single request that may be sent to any broker.
// BROKER_LOGGER requests always go to a specific, constant broker or controller node.
//
// BROKER resource changes for a specific (non-default) resource go to either that specific
// node (if using bootstrap.servers), or directly to the active controller (if using
// bootstrap.controllers)
//
// All other requests go to the least loaded broker (if using bootstrap.servers) or the
// active controller (if using bootstrap.controllers)
final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
for (ConfigResource resource : configs.keySet()) {
Integer node = nodeFor(resource);
if (metadataManager.usingBootstrapControllers()) {
if (!resource.type().equals(ConfigResource.Type.BROKER_LOGGER)) {
node = null;
}
}
if (node != null) {
NodeProvider nodeProvider = new ConstantNodeIdProvider(node);
NodeProvider nodeProvider = new ConstantNodeIdProvider(node, true);
allFutures.putAll(incrementalAlterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
} else
unifiedRequestResources.add(resource);
}
if (!unifiedRequestResources.isEmpty())
allFutures.putAll(incrementalAlterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));
allFutures.putAll(incrementalAlterConfigs(configs, options, unifiedRequestResources, new LeastLoadedBrokerOrActiveKController()));
return new AlterConfigsResult(new HashMap<>(allFutures));
}
@ -3872,7 +4075,7 @@ public class KafkaAdminClient extends AdminClient { @@ -3872,7 +4075,7 @@ public class KafkaAdminClient extends AdminClient {
final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call(
"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedBrokerOrActiveKController()) {
private FeatureMetadata createFeatureMetadata(final ApiVersionsResponse response) {
final Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>();
@ -3938,7 +4141,7 @@ public class KafkaAdminClient extends AdminClient { @@ -3938,7 +4141,7 @@ public class KafkaAdminClient extends AdminClient {
final long now = time.milliseconds();
final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
new ControllerNodeProvider(true)) {
@Override
UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
@ -4009,7 +4212,7 @@ public class KafkaAdminClient extends AdminClient { @@ -4009,7 +4212,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
NodeProvider provider = new LeastLoadedNodeProvider();
NodeProvider provider = new LeastLoadedBrokerOrActiveKController();
final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();

108
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddresses.java

@ -0,0 +1,108 @@ @@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
final public class AdminBootstrapAddresses {
private final boolean usingBootstrapControllers;
private final List<InetSocketAddress> addresses;
AdminBootstrapAddresses(
boolean usingBootstrapControllers,
List<InetSocketAddress> addresses
) {
this.usingBootstrapControllers = usingBootstrapControllers;
this.addresses = addresses;
}
public boolean usingBootstrapControllers() {
return usingBootstrapControllers;
}
public List<InetSocketAddress> addresses() {
return addresses;
}
public static AdminBootstrapAddresses fromConfig(AbstractConfig config) {
List<String> bootstrapServers = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
if (bootstrapServers == null) {
bootstrapServers = Collections.emptyList();
}
List<String> controllerServers = config.getList(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
if (controllerServers == null) {
controllerServers = Collections.emptyList();
}
String clientDnsLookupConfig = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG);
if (bootstrapServers.isEmpty()) {
if (controllerServers.isEmpty()) {
throw new ConfigException("You must set either " +
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + " or " +
AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
} else {
return new AdminBootstrapAddresses(true,
ClientUtils.parseAndValidateAddresses(controllerServers, clientDnsLookupConfig));
}
} else {
if (controllerServers.isEmpty()) {
return new AdminBootstrapAddresses(false,
ClientUtils.parseAndValidateAddresses(bootstrapServers, clientDnsLookupConfig));
} else {
throw new ConfigException("You cannot set both " +
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + " and " +
AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
}
}
}
@Override
public int hashCode() {
return Objects.hash(usingBootstrapControllers, addresses);
}
@Override
public boolean equals(Object o) {
if (o == null || (!o.getClass().equals(AdminBootstrapAddresses.class))) return false;
AdminBootstrapAddresses other = (AdminBootstrapAddresses) o;
return usingBootstrapControllers == other.usingBootstrapControllers &&
addresses.equals(other.addresses);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("AdminBootstrapAddresses");
bld.append("(usingBoostrapControllers=").append(usingBootstrapControllers);
bld.append(", addresses=[");
String prefix = "";
for (InetSocketAddress address : addresses) {
bld.append(prefix).append(address);
prefix = ", ";
}
bld.append("])");
return bld.toString();
}
}

50
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java

@ -21,7 +21,11 @@ import org.apache.kafka.clients.MetadataUpdater; @@ -21,7 +21,11 @@ import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.LogContext;
@ -52,6 +56,11 @@ public class AdminMetadataManager { @@ -52,6 +56,11 @@ public class AdminMetadataManager {
*/
private final long metadataExpireMs;
/**
* True if we are communicating directly with the controller quorum as specified by KIP-919.
*/
private final boolean usingBootstrapControllers;
/**
* Used to update the NetworkClient metadata.
*/
@ -79,10 +88,9 @@ public class AdminMetadataManager { @@ -79,10 +88,9 @@ public class AdminMetadataManager {
private Cluster cluster = Cluster.empty();
/**
* If we got an authorization exception when we last attempted to fetch
* metadata, this is it; null, otherwise.
* If this is non-null, it is a fatal exception that will terminate all attempts at communication.
*/
private AuthenticationException authException = null;
private ApiException fatalException = null;
public class AdminMetadataUpdater implements MetadataUpdater {
@Override
@ -130,21 +138,31 @@ public class AdminMetadataManager { @@ -130,21 +138,31 @@ public class AdminMetadataManager {
UPDATE_PENDING
}
public AdminMetadataManager(LogContext logContext, long refreshBackoffMs, long metadataExpireMs) {
public AdminMetadataManager(
LogContext logContext,
long refreshBackoffMs,
long metadataExpireMs,
boolean usingBootstrapControllers
) {
this.log = logContext.logger(AdminMetadataManager.class);
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.usingBootstrapControllers = usingBootstrapControllers;
this.updater = new AdminMetadataUpdater();
}
public boolean usingBootstrapControllers() {
return usingBootstrapControllers;
}
public AdminMetadataUpdater updater() {
return updater;
}
public boolean isReady() {
if (authException != null) {
log.debug("Metadata is not usable: failed to get metadata.", authException);
throw authException;
if (fatalException != null) {
log.debug("Metadata is not usable: failed to get metadata.", fatalException);
throw fatalException;
}
if (cluster.nodes().isEmpty()) {
log.trace("Metadata is not ready: bootstrap nodes have not been " +
@ -230,7 +248,21 @@ public class AdminMetadataManager { @@ -230,7 +248,21 @@ public class AdminMetadataManager {
if (exception instanceof AuthenticationException) {
log.warn("Metadata update failed due to authentication error", exception);
this.authException = (AuthenticationException) exception;
this.fatalException = (ApiException) exception;
} else if (exception instanceof MismatchedEndpointTypeException) {
log.warn("Metadata update failed due to mismatched endpoint type error", exception);
this.fatalException = (ApiException) exception;
} else if (exception instanceof UnsupportedEndpointTypeException) {
log.warn("Metadata update failed due to unsupported endpoint type error", exception);
this.fatalException = (ApiException) exception;
} else if (exception instanceof UnsupportedVersionException) {
if (usingBootstrapControllers) {
log.warn("The remote node is not a CONTROLLER that supports the KIP-919 " +
"DESCRIBE_CLUSTER api.", exception);
} else {
log.warn("The remote node is not a BROKER that supports the METADATA api.", exception);
}
this.fatalException = (ApiException) exception;
} else {
log.info("Metadata update failed", exception);
}
@ -249,7 +281,7 @@ public class AdminMetadataManager { @@ -249,7 +281,7 @@ public class AdminMetadataManager {
}
this.state = State.QUIESCENT;
this.authException = null;
this.fatalException = null;
if (!cluster.nodes().isEmpty()) {
this.cluster = cluster;

2
clients/src/main/resources/common/message/DescribeConfigsRequest.json

@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
{
"apiKey": 32,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker", "broker", "controller"],
"name": "DescribeConfigsRequest",
// Version 1 adds IncludeSynonyms.
// Version 2 is the same as version 1.

2
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java

@ -73,7 +73,7 @@ public class AdminClientUnitTestEnv implements AutoCloseable { @@ -73,7 +73,7 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
AdminMetadataManager metadataManager = new AdminMetadataManager(new LogContext(),
adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
this.mockClient = new MockClient(time, new MockClient.MockMetadataUpdater() {
@Override
public List<Node> fetchNodes() {

50
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -55,6 +55,7 @@ import org.apache.kafka.common.errors.GroupSubscribedToTopicException; @@ -55,6 +55,7 @@ import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
@ -283,6 +284,55 @@ public class KafkaAdminClientTest { @@ -283,6 +284,55 @@ public class KafkaAdminClientTest {
assertTrue(exception.getCause() instanceof ConfigException);
}
@Test
public void testParseDescribeClusterResponseWithError() {
assertThrows(MismatchedEndpointTypeException.class,
() -> KafkaAdminClient.parseDescribeClusterResponse(new DescribeClusterResponseData().
setErrorCode(Errors.MISMATCHED_ENDPOINT_TYPE.code()).
setErrorMessage("The request was sent to an endpoint of type BROKER, " +
"but we wanted an endpoint of type CONTROLLER")));
}
@Test
public void testParseDescribeClusterResponseWithUnexpectedEndpointType() {
assertThrows(MismatchedEndpointTypeException.class,
() -> KafkaAdminClient.parseDescribeClusterResponse(new DescribeClusterResponseData().
setEndpointType(EndpointType.BROKER.id())));
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testParseSuccessfulDescribeClusterResponse(boolean includeController) {
Cluster cluster = KafkaAdminClient.parseDescribeClusterResponse(new DescribeClusterResponseData().
setControllerId(includeController ? 0 : -1).
setEndpointType(EndpointType.CONTROLLER.id()).
setClusterId("Ek8tjqq1QBWfnaoyHFZqDg").
setBrokers(new DescribeClusterResponseData.DescribeClusterBrokerCollection(Arrays.asList(
new DescribeClusterBroker().
setBrokerId(0).
setHost("controller0.com").
setPort(9092),
new DescribeClusterBroker().
setBrokerId(1).
setHost("controller1.com").
setPort(9092),
new DescribeClusterBroker().
setBrokerId(2).
setHost("controller2.com").
setPort(9092)).iterator())));
if (includeController) {
assertNotNull(cluster.controller());
assertEquals(0, cluster.controller().id());
} else {
assertNull(cluster.controller());
}
assertEquals("Ek8tjqq1QBWfnaoyHFZqDg", cluster.clusterResource().clusterId());
assertEquals(new HashSet<>(Arrays.asList(
new Node(0, "controller0.com", 9092),
new Node(1, "controller1.com", 9092),
new Node(2, "controller2.com", 9092))), new HashSet<>(cluster.nodes()));
}
@Test
public void testGetOrCreateListValue() {
Map<String, List<String>> map = new HashMap<>();

81
clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminBootstrapAddressesTest.java

@ -0,0 +1,81 @@ @@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class AdminBootstrapAddressesTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testNoBootstrapSet(boolean nullValue) {
Map<String, Object> map = new HashMap<>();
if (nullValue) {
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, null);
map.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, null);
} else {
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "");
map.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "");
}
AdminClientConfig config = new AdminClientConfig(map);
assertEquals("You must set either bootstrap.servers or bootstrap.controllers",
assertThrows(ConfigException.class, () -> AdminBootstrapAddresses.fromConfig(config)).
getMessage());
}
@Test
public void testTwoBootstrapsSet() {
Map<String, Object> map = new HashMap<>();
map.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "localhost:9092");
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClientConfig config = new AdminClientConfig(map);
assertEquals("You cannot set both bootstrap.servers and bootstrap.controllers",
assertThrows(ConfigException.class, () -> AdminBootstrapAddresses.fromConfig(config)).
getMessage());
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testFromConfig(boolean usingBootstrapControllers) {
Map<String, Object> map = new HashMap<>();
String connectString = "localhost:9092,localhost:9093,localhost:9094";
if (usingBootstrapControllers) {
map.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, connectString);
} else {
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, connectString);
}
AdminClientConfig config = new AdminClientConfig(map);
AdminBootstrapAddresses addresses = AdminBootstrapAddresses.fromConfig(config);
assertEquals(usingBootstrapControllers, addresses.usingBootstrapControllers());
assertEquals(Arrays.asList(
new InetSocketAddress("localhost", 9092),
new InetSocketAddress("localhost", 9093),
new InetSocketAddress("localhost", 9094)),
addresses.addresses());
}
}

12
clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java

@ -24,6 +24,8 @@ import org.apache.kafka.common.errors.AuthenticationException; @@ -24,6 +24,8 @@ import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.net.InetSocketAddress;
import java.util.Collections;
@ -40,7 +42,15 @@ public class AdminMetadataManagerTest { @@ -40,7 +42,15 @@ public class AdminMetadataManagerTest {
private final long refreshBackoffMs = 100;
private final long metadataExpireMs = 60000;
private final AdminMetadataManager mgr = new AdminMetadataManager(
logContext, refreshBackoffMs, metadataExpireMs);
logContext, refreshBackoffMs, metadataExpireMs, false);
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testSetUsingBootstrapControllers(boolean usingBootstrapControllers) {
AdminMetadataManager manager = new AdminMetadataManager(
logContext, refreshBackoffMs, metadataExpireMs, usingBootstrapControllers);
assertEquals(usingBootstrapControllers, manager.usingBootstrapControllers());
}
@Test
public void testMetadataReady() {

150
core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java

@ -0,0 +1,150 @@ @@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.logger;
import kafka.utils.Log4jController;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.config.LogLevelConfig;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER_LOGGER;
/**
* Manages runtimes changes to slf4j settings.
*/
public class RuntimeLoggerManager {
static final String VALID_LOG_LEVELS_STRING;
static {
ArrayList<String> logLevels = new ArrayList<>(LogLevelConfig.VALID_LOG_LEVELS);
logLevels.sort(String::compareTo);
VALID_LOG_LEVELS_STRING = String.join(", ", logLevels);
}
private final int nodeId;
private final Logger log;
public RuntimeLoggerManager(int nodeId, Logger log) {
this.nodeId = nodeId;
this.log = log;
}
public void applyChangesForResource(
boolean authorizedForClusterResource,
boolean validateOnly,
AlterConfigsResource resource
) {
if (!authorizedForClusterResource) {
throw new ClusterAuthorizationException(Errors.CLUSTER_AUTHORIZATION_FAILED.message());
}
validateResourceNameIsNodeId(resource.resourceName());
validateLogLevelConfigs(resource.configs());
if (!validateOnly) {
alterLogLevelConfigs(resource.configs());
}
}
void alterLogLevelConfigs(Collection<AlterableConfig> ops) {
ops.forEach(op -> {
String loggerName = op.name();
String logLevel = op.value();
switch (OpType.forId(op.configOperation())) {
case SET:
if (Log4jController.logLevel(loggerName, logLevel)) {
log.warn("Updated the log level of {} to {}", loggerName, logLevel);
} else {
log.error("Failed to update the log level of {} to {}", loggerName, logLevel);
}
break;
case DELETE:
if (Log4jController.unsetLogLevel(loggerName)) {
log.warn("Unset the log level of {}", loggerName);
} else {
log.error("Failed to unset the log level of {}", loggerName);
}
break;
default:
throw new IllegalArgumentException(
"Invalid log4j configOperation: " + op.configOperation());
}
});
}
void validateResourceNameIsNodeId(String resourceName) {
int requestId;
try {
requestId = Integer.parseInt(resourceName);
} catch (NumberFormatException e) {
throw new InvalidRequestException("Node id must be an integer, but it is: " +
resourceName);
}
if (requestId != nodeId) {
throw new InvalidRequestException("Unexpected node id. Expected " + nodeId +
", but received " + nodeId);
}
}
void validateLoggerNameExists(String loggerName) {
if (!Log4jController.loggerExists(loggerName)) {
throw new InvalidConfigurationException("Logger " + loggerName + " does not exist!");
}
}
void validateLogLevelConfigs(Collection<AlterableConfig> ops) {
ops.forEach(op -> {
String loggerName = op.name();
switch (OpType.forId(op.configOperation())) {
case SET:
validateLoggerNameExists(loggerName);
String logLevel = op.value();
if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) {
throw new InvalidConfigurationException("Cannot set the log level of " +
loggerName + " to " + logLevel + " as it is not a supported log level. " +
"Valid log levels are " + VALID_LOG_LEVELS_STRING);
}
break;
case DELETE:
validateLoggerNameExists(loggerName);
if (loggerName.equals(Log4jController.ROOT_LOGGER())) {
throw new InvalidRequestException("Removing the log level of the " +
Log4jController.ROOT_LOGGER() + " logger is not allowed");
}
break;
case APPEND:
throw new InvalidRequestException(OpType.APPEND +
" operation is not allowed for the " + BROKER_LOGGER + " resource");
case SUBTRACT:
throw new InvalidRequestException(OpType.SUBTRACT +
" operation is not allowed for the " + BROKER_LOGGER + " resource");
default:
throw new InvalidRequestException("Unknown operation type " +
(int) op.configOperation() + " is not alowed for the " +
BROKER_LOGGER + " resource");
}
});
}
}

39
core/src/main/scala/kafka/admin/ConfigCommand.scala

@ -27,7 +27,6 @@ import kafka.utils.{Exit, Logging, PasswordEncoder} @@ -27,7 +27,6 @@ import kafka.utils.{Exit, Logging, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
@ -317,7 +316,11 @@ object ConfigCommand extends Logging { @@ -317,7 +316,11 @@ object ConfigCommand extends Logging {
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
CommandLineUtils.initializeBootstrapProperties(opts.parser,
opts.options,
props,
opts.bootstrapServerOpt,
opts.bootstrapControllerOpt)
val adminClient = Admin.create(props)
if (opts.options.has(opts.alterOpt) && opts.entityTypes.size != opts.entityNames.size)
@ -762,11 +765,14 @@ object ConfigCommand extends Logging { @@ -762,11 +765,14 @@ object ConfigCommand extends Logging {
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val bootstrapServerOpt = parser.accepts("bootstrap-server", "The Kafka server to connect to. " +
"This is required for describing and altering broker configs.")
val bootstrapServerOpt = parser.accepts("bootstrap-server", "The Kafka servers to connect to.")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
val bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The Kafka controllers to connect to.")
.withRequiredArg
.describedAs("controller to connect to")
.ofType(classOf[String])
val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " +
"This is used only with --bootstrap-server option for describing and altering broker configs.")
.withRequiredArg
@ -873,14 +879,14 @@ object ConfigCommand extends Logging { @@ -873,14 +879,14 @@ object ConfigCommand extends Logging {
if (entityTypeVals.size != entityTypeVals.distinct.size)
throw new IllegalArgumentException(s"Duplicate entity type(s) specified: ${entityTypeVals.diff(entityTypeVals.distinct).mkString(",")}")
val (allowedEntityTypes, connectOptString) = if (options.has(bootstrapServerOpt))
(BrokerSupportedConfigTypes, "--bootstrap-server")
val (allowedEntityTypes, connectOptString) = if (options.has(bootstrapServerOpt) || options.has(bootstrapControllerOpt))
(BrokerSupportedConfigTypes, "--bootstrap-server or --bootstrap-controller")
else
(ZkSupportedConfigTypes, "--zookeeper")
entityTypeVals.foreach(entityTypeVal =>
if (!allowedEntityTypes.contains(entityTypeVal))
throw new IllegalArgumentException(s"Invalid entity type $entityTypeVal, the entity type must be one of ${allowedEntityTypes.mkString(", ")} with the $connectOptString argument")
throw new IllegalArgumentException(s"Invalid entity type $entityTypeVal, the entity type must be one of ${allowedEntityTypes.mkString(", ")} with a $connectOptString argument")
)
if (entityTypeVals.isEmpty)
throw new IllegalArgumentException("At least one entity type must be specified")
@ -894,21 +900,20 @@ object ConfigCommand extends Logging { @@ -894,21 +900,20 @@ object ConfigCommand extends Logging {
val hasEntityName = entityNames.exists(_.nonEmpty)
val hasEntityDefault = entityNames.exists(_.isEmpty)
if (!options.has(bootstrapServerOpt) && !options.has(zkConnectOpt))
throw new IllegalArgumentException("One of the required --bootstrap-server or --zookeeper arguments must be specified")
else if (options.has(bootstrapServerOpt) && options.has(zkConnectOpt))
throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified")
val numConnectOptions = (if (options.has(bootstrapServerOpt)) 1 else 0) +
(if (options.has(bootstrapControllerOpt)) 1 else 0) +
(if (options.has(zkConnectOpt)) 1 else 0)
if (numConnectOptions == 0)
throw new IllegalArgumentException("One of the required --bootstrap-server, --boostrap-controller, or --zookeeper arguments must be specified")
else if (numConnectOptions > 1)
throw new IllegalArgumentException("Only one of --bootstrap-server, --boostrap-controller, and --zookeeper can be specified")
if (options.has(allOpt) && options.has(zkConnectOpt)) {
throw new IllegalArgumentException(s"--bootstrap-server must be specified for --all")
}
if (options.has(zkTlsConfigFile) && options.has(bootstrapServerOpt)) {
throw new IllegalArgumentException("--bootstrap-server doesn't support --zk-tls-config-file option. " +
"If you intend the command to communicate directly with ZooKeeper, please use the option --zookeeper instead of --bootstrap-server. " +
"Otherwise, remove the --zk-tls-config-file option.")
if (options.has(zkTlsConfigFile) && !options.has(zkConnectOpt)) {
throw new IllegalArgumentException("Only the --zookeeper option can be used with the --zk-tls-config-file option.")
}
if (hasEntityName && (entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) {
Seq(entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId =>
try brokerId.toInt catch {

3
core/src/main/scala/kafka/server/AuthHelper.scala

@ -185,6 +185,7 @@ class AuthHelper(authorizer: Option[Authorizer]) { @@ -185,6 +185,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
setClusterId(clusterId).
setControllerId(effectiveControllerId).
setClusterAuthorizedOperations(clusterAuthorizedOperations).
setBrokers(nodes)
setBrokers(nodes).
setEndpointType(expectedEndpointType.id())
}
}

75
core/src/main/scala/kafka/server/ConfigAdminManager.scala

@ -16,24 +16,23 @@ @@ -16,24 +16,23 @@
*/
package kafka.server
import kafka.server.logger.RuntimeLoggerManager
import java.util
import java.util.Properties
import kafka.server.metadata.ConfigRepository
import kafka.utils.Log4jController
import kafka.utils._
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidConfigurationException, InvalidRequestException}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidRequestException}
import org.apache.kafka.common.message.{AlterConfigsRequestData, AlterConfigsResponseData, IncrementalAlterConfigsRequestData, IncrementalAlterConfigsResponseData}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterableConfig => IAlterableConfig}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, UNKNOWN_SERVER_ERROR}
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.common.resource.{Resource, ResourceType}
@ -87,6 +86,8 @@ class ConfigAdminManager(nodeId: Int, @@ -87,6 +86,8 @@ class ConfigAdminManager(nodeId: Int,
this.logIdent = "[ConfigAdminManager[nodeId=" + nodeId + "]: "
val runtimeLoggerManager = new RuntimeLoggerManager(nodeId, logger.underlying)
/**
* Preprocess an incremental configuration operation on the broker. This step handles
* setting log4j levels, as well as filtering out some invalid resource requests that
@ -135,14 +136,10 @@ class ConfigAdminManager(nodeId: Int, @@ -135,14 +136,10 @@ class ConfigAdminManager(nodeId: Int,
}
resourceType match {
case BROKER_LOGGER =>
if (!authorize(ResourceType.CLUSTER, Resource.CLUSTER_NAME)) {
throw new ClusterAuthorizationException(Errors.CLUSTER_AUTHORIZATION_FAILED.message())
}
validateResourceNameIsCurrentNodeId(resource.resourceName())
validateLogLevelConfigs(resource.configs())
if (!request.validateOnly()) {
alterLogLevelConfigs(resource.configs())
}
runtimeLoggerManager.applyChangesForResource(
authorize(ResourceType.CLUSTER, Resource.CLUSTER_NAME),
request.validateOnly(),
resource)
results.put(resource, ApiError.NONE)
case BROKER =>
// The resource name must be either blank (if setting a cluster config) or
@ -290,56 +287,6 @@ class ConfigAdminManager(nodeId: Int, @@ -290,56 +287,6 @@ class ConfigAdminManager(nodeId: Int,
throw new InvalidRequestException(s"Unexpected broker id, expected ${nodeId}, but received ${name}")
}
}
def validateLogLevelConfigs(ops: util.Collection[IAlterableConfig]): Unit = {
def validateLoggerNameExists(loggerName: String): Unit = {
if (!Log4jController.loggerExists(loggerName)) {
throw new InvalidConfigurationException(s"Logger $loggerName does not exist!")
}
}
ops.forEach { op =>
val loggerName = op.name
OpType.forId(op.configOperation()) match {
case OpType.SET =>
validateLoggerNameExists(loggerName)
val logLevel = op.value()
if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) {
val validLevelsStr = LogLevelConfig.VALID_LOG_LEVELS.asScala.mkString(", ")
throw new InvalidConfigurationException(
s"Cannot set the log level of $loggerName to $logLevel as it is not a supported log level. " +
s"Valid log levels are $validLevelsStr"
)
}
case OpType.DELETE =>
validateLoggerNameExists(loggerName)
if (loggerName == Log4jController.ROOT_LOGGER)
throw new InvalidRequestException(s"Removing the log level of the ${Log4jController.ROOT_LOGGER} logger is not allowed")
case OpType.APPEND => throw new InvalidRequestException(s"${OpType.APPEND} " +
s"operation is not allowed for the ${BROKER_LOGGER} resource")
case OpType.SUBTRACT => throw new InvalidRequestException(s"${OpType.SUBTRACT} " +
s"operation is not allowed for the ${BROKER_LOGGER} resource")
case _ => throw new InvalidRequestException(s"Unknown operation type ${op.configOperation()} " +
s"is not allowed for the ${BROKER_LOGGER} resource")
}
}
}
def alterLogLevelConfigs(ops: util.Collection[IAlterableConfig]): Unit = {
ops.forEach { op =>
val loggerName = op.name()
val logLevel = op.value()
OpType.forId(op.configOperation()) match {
case OpType.SET =>
info(s"Updating the log level of $loggerName to $logLevel")
Log4jController.logLevel(loggerName, logLevel)
case OpType.DELETE =>
info(s"Unset the log level of $loggerName")
Log4jController.unsetLogLevel(loggerName)
case _ => throw new IllegalArgumentException(
s"Invalid log4j configOperation: ${op.configOperation()}")
}
}
}
}
object ConfigAdminManager {

37
core/src/main/scala/kafka/server/ConfigHelper.scala

@ -17,17 +17,22 @@ @@ -17,17 +17,22 @@
package kafka.server
import kafka.network.RequestChannel
import java.util.{Collections, Properties}
import kafka.server.metadata.ConfigRepository
import kafka.utils.{Log4jController, Logging}
import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
import org.apache.kafka.common.message.DescribeConfigsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
import org.apache.kafka.common.requests.{ApiError, DescribeConfigsRequest, DescribeConfigsResponse}
import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.storage.internals.log.LogConfig
@ -41,6 +46,36 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo @@ -41,6 +46,36 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
config.originals.asScala.filter(_._2 != null) ++ config.nonInternalValues.asScala
}
def handleDescribeConfigsRequest(
request: RequestChannel.Request,
authHelper: AuthHelper
): DescribeConfigsResponseData = {
val describeConfigsRequest = request.body[DescribeConfigsRequest]
val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.partition { resource =>
ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
case ConfigResource.Type.TOPIC =>
authHelper.authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName)
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
}
}
val authorizedConfigs = describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynonyms, describeConfigsRequest.data.includeDocumentation)
val unauthorizedConfigs = unauthorizedResources.map { resource =>
val error = ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
}
new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(error.code)
.setErrorMessage(error.message)
.setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
.setResourceName(resource.resourceName)
.setResourceType(resource.resourceType)
}
new DescribeConfigsResponseData().setResults((authorizedConfigs ++ unauthorizedConfigs).asJava)
}
def describeConfigs(resourceToConfigNames: List[DescribeConfigsResource],
includeSynonyms: Boolean,
includeDocumentation: Boolean): List[DescribeConfigsResponseData.DescribeConfigsResult] = {

40
core/src/main/scala/kafka/server/ControllerApis.scala

@ -26,12 +26,14 @@ import java.util.function.Consumer @@ -26,12 +26,14 @@ import java.util.function.Consumer
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.logger.RuntimeLoggerManager
import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{AlterConfigOp, EndpointType}
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, CREATE_TOKENS, DELETE, DESCRIBE, DESCRIBE_CONFIGS}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException}
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException, UnsupportedVersionException}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
@ -73,12 +75,15 @@ class ControllerApis( @@ -73,12 +75,15 @@ class ControllerApis(
val config: KafkaConfig,
val metaProperties: MetaProperties,
val registrationsPublisher: ControllerRegistrationsPublisher,
val apiVersionManager: ApiVersionManager
val apiVersionManager: ApiVersionManager,
val metadataCache: KRaftMetadataCache
) extends ApiRequestHandler with Logging {
this.logIdent = s"[ControllerApis nodeId=${config.nodeId}] "
val authHelper = new AuthHelper(authorizer)
val configHelper = new ConfigHelper(metadataCache, config, metadataCache)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
val runtimeLoggerManager = new RuntimeLoggerManager(config.nodeId, logger.underlying)
private val aclApis = new AclApis(authHelper, authorizer, requestHelper, "controller", config)
def isClosed: Boolean = aclApis.isClosed
@ -115,6 +120,7 @@ class ControllerApis( @@ -115,6 +120,7 @@ class ControllerApis(
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
@ -709,10 +715,26 @@ class ControllerApis( @@ -709,10 +715,26 @@ class ControllerApis(
val duplicateResources = new util.HashSet[ConfigResource]
val configChanges = new util.HashMap[ConfigResource,
util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
val brokerLoggerResponses = new util.ArrayList[AlterConfigsResourceResponse](1)
alterConfigsRequest.data.resources.forEach { resource =>
val configResource = new ConfigResource(
ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
if (configResource.`type`().equals(ConfigResource.Type.BROKER_LOGGER)) {
val apiError = try {
runtimeLoggerManager.applyChangesForResource(
authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME),
alterConfigsRequest.data().validateOnly(),
resource)
ApiError.NONE
} catch {
case t: Throwable => ApiError.fromThrowable(t)
}
brokerLoggerResponses.add(new AlterConfigsResourceResponse().
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()).
setErrorCode(apiError.error().code()).
setErrorMessage(if (apiError.isFailure()) apiError.messageWithFallback() else null))
} else if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(UNSUPPORTED_VERSION.code()).
setErrorMessage("Unknown resource type " + resource.resourceType() + ".").
@ -759,6 +781,7 @@ class ControllerApis( @@ -759,6 +781,7 @@ class ControllerApis(
setErrorMessage(entry.getValue.message()).
setResourceName(entry.getKey.name()).
setResourceType(entry.getKey.`type`().id())))
brokerLoggerResponses.forEach(r => response.responses().add(r))
requestHelper.sendResponseMaybeThrottle(request, throttleMs =>
new IncrementalAlterConfigsResponse(response.setThrottleTimeMs(throttleMs)))
}
@ -791,6 +814,13 @@ class ControllerApis( @@ -791,6 +814,13 @@ class ControllerApis(
}
}
def handleDescribeConfigsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val responseData = configHelper.handleDescribeConfigsRequest(request, authHelper)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeConfigsResponse(responseData.setThrottleTimeMs(requestThrottleMs)))
CompletableFuture.completedFuture[Unit](())
}
def createPartitions(
context: ControllerRequestContext,
request: CreatePartitionsRequestData,
@ -1028,6 +1058,10 @@ class ControllerApis( @@ -1028,6 +1058,10 @@ class ControllerApis(
def handleDescribeCluster(request: RequestChannel.Request): CompletableFuture[Unit] = {
// Unlike on the broker, DESCRIBE_CLUSTER on the controller requires a high level of
// permissions (ALTER on CLUSTER).
if (!apiVersionManager.features.metadataVersion().isControllerRegistrationSupported()) {
throw new UnsupportedVersionException("Direct-to-controller communication is not " +
"supported with the current MetadataVersion.")
}
authHelper.authorizeClusterOperation(request, ALTER)
val response = authHelper.computeDescribeClusterResponse(
request,

40
core/src/main/scala/kafka/server/ControllerServer.scala

@ -26,7 +26,7 @@ import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli @@ -26,7 +26,7 @@ import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli
import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher}
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@ -117,6 +117,8 @@ class ControllerServer( @@ -117,6 +117,8 @@ class ControllerServer(
var migrationSupport: Option[ControllerMigrationSupport] = None
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
val metadataPublishers: util.List[MetadataPublisher] = new util.ArrayList[MetadataPublisher]()
@volatile var metadataCache : KRaftMetadataCache = _
@volatile var metadataCachePublisher: KRaftMetadataCachePublisher = _
@volatile var featuresPublisher: FeaturesPublisher = _
@volatile var registrationsPublisher: ControllerRegistrationsPublisher = _
@volatile var incarnationId: Uuid = _
@ -159,6 +161,10 @@ class ControllerServer( @@ -159,6 +161,10 @@ class ControllerServer(
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
metadataCachePublisher = new KRaftMetadataCachePublisher(metadataCache)
featuresPublisher = new FeaturesPublisher(logContext)
registrationsPublisher = new ControllerRegistrationsPublisher()
@ -313,14 +319,19 @@ class ControllerServer( @@ -313,14 +319,19 @@ class ControllerServer(
config,
sharedServer.metaProps,
registrationsPublisher,
apiVersionManager)
apiVersionManager,
metadataCache)
controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel,
controllerApis,
time,
config.numIoThreads,
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
DataPlaneAcceptor.ThreadPrefix)
DataPlaneAcceptor.ThreadPrefix,
"controller")
// Set up the metadata cache publisher.
metadataPublishers.add(metadataCachePublisher)
// Set up the metadata features publisher.
metadataPublishers.add(featuresPublisher)
@ -453,14 +464,6 @@ class ControllerServer( @@ -453,14 +464,6 @@ class ControllerServer(
// Ensure that we're not the Raft leader prior to shutting down our socket server, for a
// smoother transition.
sharedServer.ensureNotRaftLeader()
if (featuresPublisher != null) {
featuresPublisher.close()
featuresPublisher = null
}
if (registrationsPublisher != null) {
registrationsPublisher.close()
registrationsPublisher = null
}
incarnationId = null
if (registrationManager != null) {
CoreUtils.swallow(registrationManager.close(), this)
@ -472,6 +475,21 @@ class ControllerServer( @@ -472,6 +475,21 @@ class ControllerServer(
}
metadataPublishers.forEach(p => sharedServer.loader.removeAndClosePublisher(p).get())
metadataPublishers.clear()
if (metadataCache != null) {
metadataCache = null
}
if (metadataCachePublisher != null) {
metadataCachePublisher.close()
metadataCachePublisher = null
}
if (featuresPublisher != null) {
featuresPublisher.close()
featuresPublisher = null
}
if (registrationsPublisher != null) {
registrationsPublisher.close()
registrationsPublisher = null
}
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
migrationSupport.foreach(_.shutdown(this))

28
core/src/main/scala/kafka/server/KafkaApis.scala

@ -2926,33 +2926,9 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -2926,33 +2926,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
val describeConfigsRequest = request.body[DescribeConfigsRequest]
val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.partition { resource =>
ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
case ConfigResource.Type.TOPIC =>
authHelper.authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName)
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
}
}
val authorizedConfigs = configHelper.describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynonyms, describeConfigsRequest.data.includeDocumentation)
val unauthorizedConfigs = unauthorizedResources.map { resource =>
val error = ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
}
new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(error.code)
.setErrorMessage(error.message)
.setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
.setResourceName(resource.resourceName)
.setResourceType(resource.resourceType)
}
val responseData = configHelper.handleDescribeConfigsRequest(request, authHelper)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeConfigsResponse(new DescribeConfigsResponseData().setThrottleTimeMs(requestThrottleMs)
.setResults((authorizedConfigs ++ unauthorizedConfigs).asJava)))
new DescribeConfigsResponse(responseData.setThrottleTimeMs(requestThrottleMs)))
}
def handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit = {

38
core/src/main/scala/kafka/server/KafkaRequestHandler.scala

@ -81,14 +81,17 @@ object KafkaRequestHandler { @@ -81,14 +81,17 @@ object KafkaRequestHandler {
/**
* A thread that answers kafka requests.
*/
class KafkaRequestHandler(id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger,
val requestChannel: RequestChannel,
apis: ApiRequestHandler,
time: Time) extends Runnable with Logging {
this.logIdent = s"[Kafka Request Handler $id on Broker $brokerId], "
class KafkaRequestHandler(
id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger,
val requestChannel: RequestChannel,
apis: ApiRequestHandler,
time: Time,
nodeName: String = "broker"
) extends Runnable with Logging {
this.logIdent = s"[Kafka Request Handler $id on ${nodeName.capitalize} $brokerId], "
private val shutdownComplete = new CountDownLatch(1)
private val requestLocal = RequestLocal.withThreadConfinedCaching
@volatile private var stopped = false
@ -184,13 +187,16 @@ class KafkaRequestHandler(id: Int, @@ -184,13 +187,16 @@ class KafkaRequestHandler(id: Int,
}
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: ApiRequestHandler,
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
logAndThreadNamePrefix : String) extends Logging {
class KafkaRequestHandlerPool(
val brokerId: Int,
val requestChannel: RequestChannel,
val apis: ApiRequestHandler,
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
logAndThreadNamePrefix : String,
nodeName: String = "broker"
) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
@ -204,7 +210,7 @@ class KafkaRequestHandlerPool(val brokerId: Int, @@ -204,7 +210,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
}
def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time, nodeName)
KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}

38
core/src/main/scala/kafka/server/metadata/KRaftMetadataCachePublisher.scala

@ -0,0 +1,38 @@ @@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
class KRaftMetadataCachePublisher(
val metadataCache: KRaftMetadataCache
) extends MetadataPublisher {
override def name(): String = "KRaftMetadataCachePublisher"
override def onMetadataUpdate(
delta: MetadataDelta,
newImage: MetadataImage,
manifest: LoaderManifest
): Unit = {
metadataCache.setImage(newImage)
}
}

98
core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java

@ -0,0 +1,98 @@ @@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.logger;
import kafka.utils.Log4jController;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RuntimeLoggerManagerTest {
private final static Logger LOG = LoggerFactory.getLogger(RuntimeLoggerManagerTest.class);
private final static RuntimeLoggerManager MANAGER = new RuntimeLoggerManager(5, LOG);
@Test
public void testValidateSetLogLevelConfig() {
MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(OpType.SET.id()).
setValue("TRACE")));
}
@Test
public void testValidateDeleteLogLevelConfig() {
MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(OpType.DELETE.id()).
setValue("")));
}
@ParameterizedTest
@ValueSource(bytes = {(byte) 2, (byte) 3})
public void testOperationNotAllowed(byte id) {
OpType opType = AlterConfigOp.OpType.forId(id);
assertEquals(opType + " operation is not allowed for the BROKER_LOGGER resource",
Assertions.assertThrows(InvalidRequestException.class,
() -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(id).
setValue("TRACE")))).getMessage());
}
@Test
public void testValidateBogusLogLevelNameNotAllowed() {
assertEquals("Cannot set the log level of " + LOG.getName() + " to BOGUS as it is not " +
"a supported log level. Valid log levels are DEBUG, ERROR, FATAL, INFO, TRACE, WARN",
Assertions.assertThrows(InvalidConfigurationException.class,
() -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(OpType.SET.id()).
setValue("BOGUS")))).getMessage());
}
@Test
public void testValidateSetRootLogLevelConfig() {
MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(Log4jController.ROOT_LOGGER()).
setConfigOperation(OpType.SET.id()).
setValue("TRACE")));
}
@Test
public void testValidateRemoveRootLogLevelConfigNotAllowed() {
assertEquals("Removing the log level of the " + Log4jController.ROOT_LOGGER() +
" logger is not allowed",
Assertions.assertThrows(InvalidRequestException.class,
() -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig().
setName(Log4jController.ROOT_LOGGER()).
setConfigOperation(OpType.DELETE.id()).
setValue("")))).getMessage());
}
}

5
core/src/test/java/kafka/test/ClusterInstance.java

@ -88,6 +88,11 @@ public interface ClusterInstance { @@ -88,6 +88,11 @@ public interface ClusterInstance {
*/
String bootstrapServers();
/**
* The broker connect string which can be used by clients for bootstrapping to the controller quorum.
*/
String bootstrapControllers();
/**
* A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
* acting as the controller (since ZK controllers serve both broker and controller roles).

11
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java

@ -26,7 +26,6 @@ import kafka.test.ClusterInstance; @@ -26,7 +26,6 @@ import kafka.test.ClusterInstance;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
@ -143,7 +142,12 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte @@ -143,7 +142,12 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public String bootstrapServers() {
return clusterReference.get().clientProperties().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
return clusterReference.get().bootstrapServers();
}
@Override
public String bootstrapControllers() {
return clusterReference.get().bootstrapControllers();
}
@Override
@ -237,7 +241,8 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte @@ -237,7 +241,8 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public Admin createAdminClient(Properties configOverrides) {
Admin admin = Admin.create(clusterReference.get().clientProperties(configOverrides));
Admin admin = Admin.create(clusterReference.get().
newClientPropertiesBuilder(configOverrides).build());
admins.add(admin);
return admin;
}

5
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java

@ -198,6 +198,11 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext @@ -198,6 +198,11 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
return TestUtils.bootstrapServers(clusterReference.get().servers(), clusterReference.get().listenerName());
}
@Override
public String bootstrapControllers() {
throw new RuntimeException("Cannot use --bootstrap-controller with ZK-based clusters.");
}
@Override
public Collection<SocketServer> brokerSocketServers() {
return servers()

242
core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java

@ -0,0 +1,242 @@ @@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.test.server;
import kafka.server.ControllerServer;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeFeaturesResult;
import org.apache.kafka.clients.admin.DescribeMetadataQuorumResult;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.FinalizedVersionRange;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesResult;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(120)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class BootstrapControllersIntegrationTest {
private KafkaClusterTestKit cluster;
private String bootstrapControllerString;
@BeforeAll
public void createCluster() throws Exception {
this.cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(3).
setNumControllerNodes(3).build()).build();
this.cluster.format();
this.cluster.startup();
this.cluster.waitForActiveController();
this.cluster.waitForReadyBrokers();
StringBuilder bootstrapControllerStringBuilder = new StringBuilder();
String prefix = "";
for (ControllerServer controller : cluster.controllers().values()) {
bootstrapControllerStringBuilder.append(prefix);
prefix = ",";
int port = controller.socketServerFirstBoundPortFuture().get(1, TimeUnit.MINUTES);
bootstrapControllerStringBuilder.append("localhost:").append(port);
}
bootstrapControllerString = bootstrapControllerStringBuilder.toString();
}
@AfterAll
public void destroyCluster() throws Exception {
cluster.close();
}
private Properties adminProperties(boolean usingBootstrapControllers) {
Properties properties = cluster.clientProperties();
if (usingBootstrapControllers) {
properties.remove(BOOTSTRAP_SERVERS_CONFIG);
properties.setProperty(BOOTSTRAP_CONTROLLERS_CONFIG, bootstrapControllerString);
}
return properties;
}
@Test
public void testPutBrokersInBootstrapControllersConfig() throws Exception {
Properties properties = cluster.clientProperties();
properties.put(BOOTSTRAP_CONTROLLERS_CONFIG, properties.getProperty(BOOTSTRAP_SERVERS_CONFIG));
properties.remove(BOOTSTRAP_SERVERS_CONFIG);
try (Admin admin = Admin.create(properties)) {
ExecutionException exception = assertThrows(ExecutionException.class,
() -> admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES));
assertNotNull(exception.getCause());
assertEquals(MismatchedEndpointTypeException.class, exception.getCause().getClass());
assertEquals("The request was sent to an endpoint of type BROKER, but we wanted " +
"an endpoint of type CONTROLLER", exception.getCause().getMessage());
}
}
@Disabled
@Test
public void testPutControllersInBootstrapBrokersConfig() throws Exception {
Properties properties = cluster.clientProperties();
properties.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapControllerString);
try (Admin admin = Admin.create(properties)) {
ExecutionException exception = assertThrows(ExecutionException.class,
() -> admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES));
assertNotNull(exception.getCause());
assertEquals(MismatchedEndpointTypeException.class, exception.getCause().getClass());
assertEquals("This endpoint does not appear to be a BROKER.",
exception.getCause().getMessage());
}
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testDescribeCluster(boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminProperties(usingBootstrapControllers))) {
DescribeClusterResult result = admin.describeCluster();
assertEquals(cluster.controllers().values().iterator().next().clusterId(),
result.clusterId().get(1, TimeUnit.MINUTES));
if (usingBootstrapControllers) {
assertEquals(((QuorumController) cluster.waitForActiveController()).nodeId(),
result.controller().get().id());
}
}
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testDescribeFeatures(boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminProperties(usingBootstrapControllers))) {
DescribeFeaturesResult result = admin.describeFeatures();
short metadataVersion = cluster.controllers().values().iterator().next().
featuresPublisher().features().metadataVersion().featureLevel();
assertEquals(new FinalizedVersionRange(metadataVersion, metadataVersion),
result.featureMetadata().get(1, TimeUnit.MINUTES).finalizedFeatures().
get(MetadataVersion.FEATURE_NAME));
}
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testUpdateFeatures(boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminProperties(usingBootstrapControllers))) {
UpdateFeaturesResult result = admin.updateFeatures(Collections.singletonMap("foo.bar.feature",
new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions());
ExecutionException exception =
assertThrows(ExecutionException.class,
() -> result.all().get(1, TimeUnit.MINUTES));
assertNotNull(exception.getCause());
assertEquals(InvalidUpdateVersionException.class, exception.getCause().getClass());
assertTrue(exception.getCause().getMessage().endsWith("does not support this feature."),
"expected message to end with 'does not support this feature', but it was: " +
exception.getCause().getMessage());
}
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testDescribeMetadataQuorum(boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminProperties(usingBootstrapControllers))) {
DescribeMetadataQuorumResult result = admin.describeMetadataQuorum();
assertEquals(((QuorumController) cluster.waitForActiveController()).nodeId(),
result.quorumInfo().get(1, TimeUnit.MINUTES).leaderId());
}
}
@Test
public void testUsingBootstrapControllersOnUnsupportedAdminApi() throws Exception {
try (Admin admin = Admin.create(adminProperties(true))) {
ListOffsetsResult result = admin.listOffsets(Collections.singletonMap(
new TopicPartition("foo", 0), OffsetSpec.earliest()));
ExecutionException exception =
assertThrows(ExecutionException.class,
() -> result.all().get(1, TimeUnit.MINUTES));
assertNotNull(exception.getCause());
assertEquals(UnsupportedEndpointTypeException.class, exception.getCause().getClass());
assertEquals("This Admin API is not yet supported when communicating directly with " +
"the controller quorum.", exception.getCause().getMessage());
}
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testIncrementalAlterConfigs(boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminProperties(usingBootstrapControllers))) {
int nodeId = usingBootstrapControllers ?
cluster.controllers().values().iterator().next().config().nodeId() :
cluster.brokers().values().iterator().next().config().nodeId();
ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
ConfigResource defaultResource = new ConfigResource(BROKER, "");
Map<ConfigResource, Collection<AlterConfigOp>> alterations = new HashMap<>();
alterations.put(nodeResource, Arrays.asList(
new AlterConfigOp(new ConfigEntry("my.custom.config", "foo"),
AlterConfigOp.OpType.SET)));
alterations.put(defaultResource, Arrays.asList(
new AlterConfigOp(new ConfigEntry("my.custom.config", "bar"),
AlterConfigOp.OpType.SET)));
admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES);
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
Config config = admin.describeConfigs(Arrays.asList(nodeResource)).
all().get(1, TimeUnit.MINUTES).get(nodeResource);
ConfigEntry entry = config.entries().stream().
filter(e -> e.name().equals("my.custom.config")).
findFirst().get();
assertEquals(DYNAMIC_BROKER_CONFIG, entry.source(),
"Expected entry for my.custom.config to come from DYNAMIC_BROKER_CONFIG. " +
"Instead, the entry was: " + entry);
});
}
}
}

114
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java

@ -29,6 +29,7 @@ import kafka.server.MetaProperties; @@ -29,6 +29,7 @@ import kafka.server.MetaProperties;
import kafka.tools.StorageTool;
import kafka.utils.Logging;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
@ -446,51 +447,96 @@ public class KafkaClusterTestKit implements AutoCloseable { @@ -446,51 +447,96 @@ public class KafkaClusterTestKit implements AutoCloseable {
"Failed to wait for publisher to publish the metadata update to each broker.");
}
public Properties controllerClientProperties() throws ExecutionException, InterruptedException {
Properties properties = new Properties();
if (!controllers.isEmpty()) {
Collection<Node> controllerNodes = RaftConfig.voterConnectionsToNodes(
controllerQuorumVotersFutureManager.future.get());
StringBuilder bld = new StringBuilder();
String prefix = "";
for (Node node : controllerNodes) {
bld.append(prefix).append(node.id()).append('@');
bld.append(node.host()).append(":").append(node.port());
prefix = ",";
public String quorumVotersConfig() throws ExecutionException, InterruptedException {
Collection<Node> controllerNodes = RaftConfig.voterConnectionsToNodes(
controllerQuorumVotersFutureManager.future.get());
StringBuilder bld = new StringBuilder();
String prefix = "";
for (Node node : controllerNodes) {
bld.append(prefix).append(node.id()).append('@');
bld.append(node.host()).append(":").append(node.port());
prefix = ",";
}
return bld.toString();
}
public class ClientPropertiesBuilder {
private Properties properties;
private boolean usingBootstrapControllers = false;
public ClientPropertiesBuilder() {
this.properties = new Properties();
}
public ClientPropertiesBuilder(Properties properties) {
this.properties = properties;
}
public ClientPropertiesBuilder setUsingBootstrapControllers(boolean usingBootstrapControllers) {
this.usingBootstrapControllers = usingBootstrapControllers;
return this;
}
public Properties build() {
if (usingBootstrapControllers) {
properties.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, bootstrapControllers());
properties.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
} else {
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
properties.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
}
properties.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, bld.toString());
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
controllerNodes.stream().map(n -> n.host() + ":" + n.port()).
collect(Collectors.joining(",")));
return properties;
}
return properties;
}
public ClientPropertiesBuilder newClientPropertiesBuilder(Properties properties) {
return new ClientPropertiesBuilder(properties);
}
public ClientPropertiesBuilder newClientPropertiesBuilder() {
return new ClientPropertiesBuilder();
}
public Properties clientProperties() {
return clientProperties(new Properties());
return new ClientPropertiesBuilder().build();
}
public Properties clientProperties(Properties configOverrides) {
if (!brokers.isEmpty()) {
StringBuilder bld = new StringBuilder();
String prefix = "";
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
ListenerName listenerName = nodes.externalListenerName();
int port = broker.boundPort(listenerName);
if (port <= 0) {
throw new RuntimeException("Broker " + brokerId + " does not yet " +
public String bootstrapServers() {
StringBuilder bld = new StringBuilder();
String prefix = "";
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
int brokerId = entry.getKey();
BrokerServer broker = entry.getValue();
ListenerName listenerName = nodes.externalListenerName();
int port = broker.boundPort(listenerName);
if (port <= 0) {
throw new RuntimeException("Broker " + brokerId + " does not yet " +
"have a bound port for " + listenerName + ". Did you start " +
"the cluster yet?");
}
bld.append(prefix).append("localhost:").append(port);
prefix = ",";
}
return bld.toString();
}
public String bootstrapControllers() {
StringBuilder bld = new StringBuilder();
String prefix = "";
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
int id = entry.getKey();
ControllerServer controller = entry.getValue();
ListenerName listenerName = nodes.controllerListenerName();
int port = controller.socketServer().boundPort(listenerName);
if (port <= 0) {
throw new RuntimeException("Controller " + id + " does not yet " +
"have a bound port for " + listenerName + ". Did you start " +
"the cluster yet?");
}
bld.append(prefix).append("localhost:").append(port);
prefix = ",";
}
configOverrides.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bld.toString());
bld.append(prefix).append("localhost:").append(port);
prefix = ",";
}
return configOverrides;
return bld.toString();
}
public Map<Integer, ControllerServer> controllers() {

4
core/src/test/java/kafka/testkit/TestKitNodes.java

@ -185,6 +185,10 @@ public class TestKitNodes { @@ -185,6 +185,10 @@ public class TestKitNodes {
return new ListenerName("EXTERNAL");
}
public ListenerName controllerListenerName() {
return new ListenerName("CONTROLLER");
}
public TestKitNodes copyWithAbsolutePaths(String baseDirectory) {
NavigableMap<Integer, ControllerNode> newControllerNodes = new TreeMap<>();
NavigableMap<Integer, BrokerNode> newBrokerNodes = new TreeMap<>();

32
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala

@ -26,7 +26,7 @@ import org.apache.kafka.clients.admin._ @@ -26,7 +26,7 @@ import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.errors.PolicyViolationException
import org.apache.kafka.common.errors.{PolicyViolationException, UnsupportedVersionException}
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
@ -52,12 +52,11 @@ import org.slf4j.LoggerFactory @@ -52,12 +52,11 @@ import org.slf4j.LoggerFactory
import java.io.File
import java.nio.file.{FileSystems, Path}
import java.{lang, util}
import java.util.concurrent.{CompletableFuture, CompletionStage}
import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Arrays, Collections, Optional, OptionalLong, Properties}
import scala.annotation.nowarn
import scala.collection.mutable
import scala.concurrent.ExecutionException
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@ -1188,6 +1187,33 @@ class KRaftClusterTest { @@ -1188,6 +1187,33 @@ class KRaftClusterTest {
cluster.close()
}
}
@Test
def testDirectToControllerCommunicationFailsOnOlderMetadataVersion(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2).
setNumBrokerNodes(1).
setNumControllerNodes(1).build()).
build()
try {
cluster.format()
cluster.startup()
val admin = Admin.create(cluster.newClientPropertiesBuilder().
setUsingBootstrapControllers(true).
build())
try {
val exception = assertThrows(classOf[ExecutionException],
() => admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES))
assertNotNull(exception.getCause)
assertEquals(classOf[UnsupportedVersionException], exception.getCause.getClass)
} finally {
admin.close()
}
} finally {
cluster.close()
}
}
}
class BadAuthorizer() extends Authorizer {

8
core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala

@ -69,10 +69,8 @@ class KafkaServerKRaftRegistrationTest { @@ -69,10 +69,8 @@ class KafkaServerKRaftRegistrationTest {
val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
// Enable migration configs and restart brokers
val props = kraftCluster.controllerClientProperties()
val voters = props.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
@ -108,10 +106,8 @@ class KafkaServerKRaftRegistrationTest { @@ -108,10 +106,8 @@ class KafkaServerKRaftRegistrationTest {
kraftCluster.startup()
// Enable migration configs and restart brokers
val props = kraftCluster.controllerClientProperties()
val voters = props.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
assertThrows(classOf[IllegalArgumentException], () => zkCluster.rollingBrokerRestart())

20
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala

@ -172,10 +172,8 @@ class ZkMigrationIntegrationTest { @@ -172,10 +172,8 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
val clientProps = kraftCluster.controllerClientProperties()
val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig());
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed
@ -297,10 +295,8 @@ class ZkMigrationIntegrationTest { @@ -297,10 +295,8 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
val clientProps = kraftCluster.controllerClientProperties()
val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
@ -367,10 +363,8 @@ class ZkMigrationIntegrationTest { @@ -367,10 +363,8 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
val clientProps = kraftCluster.controllerClientProperties()
val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
@ -434,10 +428,8 @@ class ZkMigrationIntegrationTest { @@ -434,10 +428,8 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
val clientProps = kraftCluster.controllerClientProperties()
val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()
@ -496,10 +488,8 @@ class ZkMigrationIntegrationTest { @@ -496,10 +488,8 @@ class ZkMigrationIntegrationTest {
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
val clientProps = kraftCluster.controllerClientProperties()
val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
zkCluster.rollingBrokerRestart()

96
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala

@ -44,6 +44,10 @@ class ConfigCommandTest extends Logging { @@ -44,6 +44,10 @@ class ConfigCommandTest extends Logging {
private val zkConnect = "localhost:2181"
private val dummyAdminZkClient = new DummyAdminZkClient(null)
private val zookeeperBootstrap = Array("--zookeeper", zkConnect)
private val brokerBootstrap = Array("--bootstrap-server", "localhost:9092")
private val controllerBootstrap = Array("--bootstrap-controller", "localhost:9093")
@Test
def shouldExitWithNonZeroStatusOnArgError(): Unit = {
assertNonZeroStatusExit(Array("--blah"))
@ -51,32 +55,28 @@ class ConfigCommandTest extends Logging { @@ -51,32 +55,28 @@ class ConfigCommandTest extends Logging {
@Test
def shouldExitWithNonZeroStatusOnZkCommandWithTopicsEntity(): Unit = {
assertNonZeroStatusExit(Array(
"--zookeeper", zkConnect,
assertNonZeroStatusExit(zookeeperBootstrap ++ Array(
"--entity-type", "topics",
"--describe"))
}
@Test
def shouldExitWithNonZeroStatusOnZkCommandWithClientsEntity(): Unit = {
assertNonZeroStatusExit(Array(
"--zookeeper", zkConnect,
assertNonZeroStatusExit(zookeeperBootstrap ++ Array(
"--entity-type", "clients",
"--describe"))
}
@Test
def shouldExitWithNonZeroStatusOnZkCommandWithIpsEntity(): Unit = {
assertNonZeroStatusExit(Array(
"--zookeeper", zkConnect,
assertNonZeroStatusExit(zookeeperBootstrap ++ Array(
"--entity-type", "ips",
"--describe"))
}
@Test
def shouldExitWithNonZeroStatusAlterUserQuotaWithoutEntityName(): Unit = {
assertNonZeroStatusExit(Array(
"--bootstrap-server", "localhost:9092",
assertNonZeroStatusExit(brokerBootstrap ++ Array(
"--entity-type", "users",
"--alter", "--add-config", "consumer_byte_rate=20000"))
}
@ -90,6 +90,12 @@ class ConfigCommandTest extends Logging { @@ -90,6 +90,12 @@ class ConfigCommandTest extends Logging {
"--describe"))
}
@Test
def shouldExitWithNonZeroStatusIfBothBootstrapServerAndBootstrapControllerGiven(): Unit = {
assertNonZeroStatusExit(brokerBootstrap ++ controllerBootstrap ++ Array(
"--describe", "--broker-defaults" ))
}
@Test
def shouldExitWithNonZeroStatusOnBrokerCommandWithZkTlsConfigFile(): Unit = {
assertNonZeroStatusExit(Array(
@ -119,66 +125,96 @@ class ConfigCommandTest extends Logging { @@ -119,66 +125,96 @@ class ConfigCommandTest extends Logging {
@Test
def shouldFailParseArgumentsForClientsEntityTypeUsingZookeeper(): Unit = {
assertThrows(classOf[IllegalArgumentException], () => testArgumentParse("clients", zkConfig = true))
assertThrows(classOf[IllegalArgumentException], () => testArgumentParse(zookeeperBootstrap, "clients"))
}
@Test
def shouldParseArgumentsForClientsEntityType(): Unit = {
testArgumentParse("clients", zkConfig = false)
def shouldParseArgumentsForClientsEntityTypeWithBrokerBootstrap(): Unit = {
testArgumentParse(brokerBootstrap, "clients")
}
@Test
def shouldParseArgumentsForClientsEntityTypeWithControllerBootstrap(): Unit = {
testArgumentParse(controllerBootstrap, "clients")
}
@Test
def shouldParseArgumentsForUsersEntityTypeUsingZookeeper(): Unit = {
testArgumentParse("users", zkConfig = true)
testArgumentParse(zookeeperBootstrap, "users")
}
@Test
def shouldParseArgumentsForUsersEntityType(): Unit = {
testArgumentParse("users", zkConfig = false)
def shouldParseArgumentsForUsersEntityTypeWithBrokerBootstrap(): Unit = {
testArgumentParse(brokerBootstrap, "users")
}
@Test
def shouldParseArgumentsForUsersEntityTypeWithControllerBootstrap(): Unit = {
testArgumentParse(controllerBootstrap, "users")
}
@Test
def shouldFailParseArgumentsForTopicsEntityTypeUsingZookeeper(): Unit = {
assertThrows(classOf[IllegalArgumentException], () => testArgumentParse("topics", zkConfig = true))
assertThrows(classOf[IllegalArgumentException], () => testArgumentParse(zookeeperBootstrap, "topics"))
}
@Test
def shouldParseArgumentsForTopicsEntityTypeWithBrokerBootstrap(): Unit = {
testArgumentParse(brokerBootstrap, "topics")
}
@Test
def shouldParseArgumentsForTopicsEntityType(): Unit = {
testArgumentParse("topics", zkConfig = false)
def shouldParseArgumentsForTopicsEntityTypeWithControllerBootstrap(): Unit = {
testArgumentParse(controllerBootstrap, "topics")
}
@Test
def shouldParseArgumentsForBrokersEntityTypeUsingZookeeper(): Unit = {
testArgumentParse("brokers", zkConfig = true)
testArgumentParse(zookeeperBootstrap, "brokers")
}
@Test
def shouldParseArgumentsForBrokersEntityTypeWithBrokerBootstrap(): Unit = {
testArgumentParse(brokerBootstrap, "brokers")
}
@Test
def shouldParseArgumentsForBrokersEntityTypeWithControllerBootstrap(): Unit = {
testArgumentParse(controllerBootstrap, "brokers")
}
@Test
def shouldParseArgumentsForBrokersEntityType(): Unit = {
testArgumentParse("brokers", zkConfig = false)
def shouldParseArgumentsForBrokerLoggersEntityTypeWithBrokerBootstrap(): Unit = {
testArgumentParse(brokerBootstrap, "broker-loggers")
}
@Test
def shouldParseArgumentsForBrokerLoggersEntityType(): Unit = {
testArgumentParse("broker-loggers", zkConfig = false)
def shouldParseArgumentsForBrokerLoggersEntityTypeWithControllerBootstrap(): Unit = {
testArgumentParse(controllerBootstrap, "broker-loggers")
}
@Test
def shouldFailParseArgumentsForIpEntityTypeUsingZookeeper(): Unit = {
assertThrows(classOf[IllegalArgumentException], () => testArgumentParse("ips", zkConfig = true))
assertThrows(classOf[IllegalArgumentException], () => testArgumentParse(zookeeperBootstrap, "ips"))
}
@Test
def shouldParseArgumentsForIpEntityType(): Unit = {
testArgumentParse("ips", zkConfig = false)
def shouldParseArgumentsForIpEntityTypeWithBrokerBootstrap(): Unit = {
testArgumentParse(brokerBootstrap, "ips")
}
def testArgumentParse(entityType: String, zkConfig: Boolean): Unit = {
@Test
def shouldParseArgumentsForIpEntityTypeWithControllerBootstrap(): Unit = {
testArgumentParse(controllerBootstrap, "ips")
}
def testArgumentParse(
bootstrapArguments: Array[String],
entityType: String
): Unit = {
val shortFlag: String = s"--${entityType.dropRight(1)}"
val connectOpts = if (zkConfig)
("--zookeeper", zkConnect)
else
("--bootstrap-server", "localhost:9092")
val connectOpts = (bootstrapArguments(0), bootstrapArguments(1))
// Should parse correctly
var createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,

6
core/src/test/scala/unit/kafka/server/AuthHelperTest.scala

@ -220,7 +220,8 @@ class AuthHelperTest { @@ -220,7 +220,8 @@ class AuthHelperTest {
assertEquals(new DescribeClusterResponseData().
setClusterId("ltCWoi9wRhmHSQCIgAznEg").
setControllerId(-1).
setClusterAuthorizedOperations(Int.MinValue), responseData)
setClusterAuthorizedOperations(Int.MinValue).
setEndpointType(2.toByte), responseData)
}
@Test
@ -240,6 +241,7 @@ class AuthHelperTest { @@ -240,6 +241,7 @@ class AuthHelperTest {
setClusterId("ltCWoi9wRhmHSQCIgAznEg").
setControllerId(1).
setClusterAuthorizedOperations(Int.MinValue).
setBrokers(nodes), responseData)
setBrokers(nodes).
setEndpointType(2.toByte), responseData)
}
}

48
core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala

@ -21,11 +21,10 @@ import java.util @@ -21,11 +21,10 @@ import java.util
import java.util.Collections
import kafka.server.metadata.MockConfigRepository
import kafka.utils.{Log4jController, TestUtils}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC, UNKNOWN}
import org.apache.kafka.common.config.LogLevelConfig.VALID_LOG_LEVELS
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.message.{AlterConfigsRequestData, AlterConfigsResponseData, IncrementalAlterConfigsRequestData, IncrementalAlterConfigsResponseData}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => LAlterConfigsResourceCollection}
@ -44,8 +43,6 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} @@ -44,8 +43,6 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{Assertions, Test}
import org.slf4j.LoggerFactory
import scala.jdk.CollectionConverters._
class ConfigAdminManagerTest {
val logger = LoggerFactory.getLogger(classOf[ConfigAdminManagerTest])
@ -259,47 +256,6 @@ class ConfigAdminManagerTest { @@ -259,47 +256,6 @@ class ConfigAdminManagerTest {
() => manager.validateResourceNameIsCurrentNodeId("e")).getMessage())
}
@Test
def testValidateLogLevelConfigs(): Unit = {
val manager = newConfigAdminManager(5)
manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
setName(logger.getName).
setConfigOperation(OpType.SET.id()).
setValue("TRACE")))
manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
setName(logger.getName).
setConfigOperation(OpType.DELETE.id()).
setValue("")))
assertEquals("APPEND operation is not allowed for the BROKER_LOGGER resource",
Assertions.assertThrows(classOf[InvalidRequestException],
() => manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
setName(logger.getName).
setConfigOperation(OpType.APPEND.id()).
setValue("TRACE")))).getMessage())
assertEquals(s"Cannot set the log level of ${logger.getName} to BOGUS as it is not " +
s"a supported log level. Valid log levels are ${VALID_LOG_LEVELS.asScala.mkString(", ")}",
Assertions.assertThrows(classOf[InvalidConfigurationException],
() => manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
setName(logger.getName).
setConfigOperation(OpType.SET.id()).
setValue("BOGUS")))).getMessage())
}
@Test
def testValidateRootLogLevelConfigs(): Unit = {
val manager = newConfigAdminManager(5)
manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
setName(Log4jController.ROOT_LOGGER).
setConfigOperation(OpType.SET.id()).
setValue("TRACE")))
assertEquals(s"Removing the log level of the ${Log4jController.ROOT_LOGGER} logger is not allowed",
Assertions.assertThrows(classOf[InvalidRequestException],
() => manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
setName(Log4jController.ROOT_LOGGER).
setConfigOperation(OpType.DELETE.id()).
setValue("")))).getMessage())
}
def brokerLogger1Incremental(): IAlterConfigsResource = new IAlterConfigsResource().
setResourceName("1").
setResourceType(BROKER_LOGGER.id).

28
core/src/test/scala/unit/kafka/server/ControllerApisTest.scala

@ -20,6 +20,7 @@ package kafka.server @@ -20,6 +20,7 @@ package kafka.server
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
import kafka.test.MockController
import kafka.utils.NotNothing
import org.apache.kafka.clients.admin.AlterConfigOp
@ -60,6 +61,7 @@ import org.junit.jupiter.params.provider.ValueSource @@ -60,6 +61,7 @@ import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.slf4j.LoggerFactory
import java.net.InetAddress
import java.util
@ -72,6 +74,8 @@ import scala.jdk.CollectionConverters._ @@ -72,6 +74,8 @@ import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
class ControllerApisTest {
val logger = LoggerFactory.getLogger(classOf[ControllerApisTest])
object MockControllerMutationQuota {
val errorMessage = "quota exceeded in test"
var throttleTimeMs = 1000
@ -117,6 +121,7 @@ class ControllerApisTest { @@ -117,6 +121,7 @@ class ControllerApisTest {
)
private val replicaQuotaManager: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
private val raftManager: RaftManager[ApiMessageAndVersion] = mock(classOf[RaftManager[ApiMessageAndVersion]])
private val metadataCache: KRaftMetadataCache = MetadataCache.kRaftMetadataCache(0)
private val quotasNeverThrottleControllerMutations = QuotaManagers(
clientQuotaManager,
@ -160,7 +165,8 @@ class ControllerApisTest { @@ -160,7 +165,8 @@ class ControllerApisTest {
ListenerType.CONTROLLER,
true,
false,
() => Features.fromKRaftVersion(MetadataVersion.latest()))
() => Features.fromKRaftVersion(MetadataVersion.latest())),
metadataCache
)
}
@ -487,16 +493,17 @@ class ControllerApisTest { @@ -487,16 +493,17 @@ class ControllerApisTest {
response.data().responses().asScala.toSet)
}
@Test
def testInvalidIncrementalAlterConfigsResources(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testInvalidIncrementalAlterConfigsResources(denyAllAuthorizer: Boolean): Unit = {
val requestData = new IncrementalAlterConfigsRequestData().setResources(
new AlterConfigsResourceCollection(util.Arrays.asList(
new AlterConfigsResource().
setResourceName("1").
setResourceType(ConfigResource.Type.BROKER_LOGGER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
setName("kafka.server.KafkaConfig").
setValue("TRACE").
setName("kafka.server.ControllerApisTest").
setValue("DEBUG").
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
new AlterConfigsResource().
setResourceName("3").
@ -521,7 +528,12 @@ class ControllerApisTest { @@ -521,7 +528,12 @@ class ControllerApisTest {
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
).iterator()))
val request = buildRequest(new IncrementalAlterConfigsRequest.Builder(requestData).build(0))
createControllerApis(Some(createDenyAllAuthorizer()),
val authorizer = if (denyAllAuthorizer) {
Some(createDenyAllAuthorizer())
} else {
None
}
createControllerApis(authorizer,
new MockController.Builder().build()).handleIncrementalAlterConfigs(request)
val capturedResponse: ArgumentCaptor[AbstractResponse] =
ArgumentCaptor.forClass(classOf[AbstractResponse])
@ -533,8 +545,8 @@ class ControllerApisTest { @@ -533,8 +545,8 @@ class ControllerApisTest {
val response = capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
assertEquals(Set(
new AlterConfigsResourceResponse().
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Unexpected resource type BROKER_LOGGER.").
setErrorCode(if (denyAllAuthorizer) CLUSTER_AUTHORIZATION_FAILED.code() else NONE.code()).
setErrorMessage(if (denyAllAuthorizer) CLUSTER_AUTHORIZATION_FAILED.message() else null).
setResourceName("1").
setResourceType(ConfigResource.Type.BROKER_LOGGER.id()),
new AlterConfigsResourceResponse().

51
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java

@ -19,6 +19,8 @@ package org.apache.kafka.server.util; @@ -19,6 +19,8 @@ package org.apache.kafka.server.util;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
@ -194,4 +196,53 @@ public class CommandLineUtils { @@ -194,4 +196,53 @@ public class CommandLineUtils {
}
}
}
static class InitializeBootstrapException extends RuntimeException {
private final static long serialVersionUID = 1L;
InitializeBootstrapException(String message) {
super(message);
}
}
public static void initializeBootstrapProperties(
Properties properties,
Optional<String> bootstrapServer,
Optional<String> bootstrapControllers
) {
if (bootstrapServer.isPresent()) {
if (bootstrapControllers.isPresent()) {
throw new InitializeBootstrapException("You cannot specify both " +
"--bootstrap-controller and --bootstrap-server.");
}
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer.get());
properties.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
} else if (bootstrapControllers.isPresent()) {
properties.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
properties.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG,
bootstrapControllers.get());
} else {
throw new InitializeBootstrapException("You must specify either --bootstrap-controller " +
"or --bootstrap-server.");
}
}
public static void initializeBootstrapProperties(
OptionParser parser,
OptionSet options,
Properties properties,
OptionSpec<String> bootstrapServer,
OptionSpec<String> bootstrapControllers
) {
try {
initializeBootstrapProperties(properties,
options.has(bootstrapServer) ?
Optional.of(options.valueOf(bootstrapServer).toString()) : Optional.empty(),
options.has(bootstrapControllers) ?
Optional.of(options.valueOf(bootstrapControllers).toString()) : Optional.empty());
} catch (InitializeBootstrapException e) {
printUsageAndExit(parser, e.getMessage());
}
}
}

42
server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java

@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test; @@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -224,4 +225,45 @@ public class CommandLineUtilsTest { @@ -224,4 +225,45 @@ public class CommandLineUtilsTest {
assertEquals("existing-string-3", props.get("sondkey"));
assertEquals("500", props.get("iondkey"));
}
static private Properties createTestProps() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "this");
props.setProperty("bootstrap.controllers", "that");
return props;
}
@Test
public void testInitializeBootstrapPropertiesWithNoBootstraps() {
assertEquals("You must specify either --bootstrap-controller or --bootstrap-server.",
assertThrows(CommandLineUtils.InitializeBootstrapException.class,
() -> CommandLineUtils.initializeBootstrapProperties(createTestProps(),
Optional.empty(), Optional.empty())).getMessage());
}
@Test
public void testInitializeBootstrapPropertiesWithBrokerBootstrap() {
Properties props = createTestProps();
CommandLineUtils.initializeBootstrapProperties(props,
Optional.of("127.0.0.2:9094"), Optional.empty());
assertEquals("127.0.0.2:9094", props.getProperty("bootstrap.servers"));
assertNull(props.getProperty("bootstrap.controllers"));
}
@Test
public void testInitializeBootstrapPropertiesWithControllerBootstrap() {
Properties props = createTestProps();
CommandLineUtils.initializeBootstrapProperties(props,
Optional.empty(), Optional.of("127.0.0.2:9094"));
assertNull(props.getProperty("bootstrap.servers"));
assertEquals("127.0.0.2:9094", props.getProperty("bootstrap.controllers"));
}
@Test
public void testInitializeBootstrapPropertiesWithBothBootstraps() {
assertEquals("You cannot specify both --bootstrap-controller and --bootstrap-server.",
assertThrows(CommandLineUtils.InitializeBootstrapException.class,
() -> CommandLineUtils.initializeBootstrapProperties(createTestProps(),
Optional.of("127.0.0.2:9094"), Optional.of("127.0.0.3:9095"))).getMessage());
}
}

19
tools/src/main/java/org/apache/kafka/tools/ClusterTool.java

@ -18,6 +18,7 @@ package org.apache.kafka.tools; @@ -18,6 +18,7 @@ package org.apache.kafka.tools;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
@ -25,9 +26,11 @@ import org.apache.kafka.clients.admin.Admin; @@ -25,9 +26,11 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@ -65,9 +68,13 @@ public class ClusterTool { @@ -65,9 +68,13 @@ public class ClusterTool {
Subparser unregisterParser = subparsers.addParser("unregister")
.help("Unregister a broker.");
for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser)) {
subpparser.addArgument("--bootstrap-server", "-b")
MutuallyExclusiveGroup connectionOptions = subpparser.addMutuallyExclusiveGroup().required(true);
connectionOptions.addArgument("--bootstrap-server", "-b")
.action(store())
.help("A list of host/port pairs to use for establishing the connection to the Kafka cluster.");
connectionOptions.addArgument("--bootstrap-controller", "-C")
.action(store())
.help("A list of host/port pairs to use for establishing the connection to the KRaft controllers.");
subpparser.addArgument("--config", "-c")
.action(store())
.help("A property file containing configurations for the Admin client.");
@ -83,13 +90,9 @@ public class ClusterTool { @@ -83,13 +90,9 @@ public class ClusterTool {
String configPath = namespace.getString("config");
Properties properties = (configPath == null) ? new Properties() : Utils.loadProps(configPath);
String bootstrapServer = namespace.getString("bootstrap_server");
if (bootstrapServer != null) {
properties.setProperty("bootstrap.servers", bootstrapServer);
}
if (properties.getProperty("bootstrap.servers") == null) {
throw new TerseException("Please specify --bootstrap-server.");
}
CommandLineUtils.initializeBootstrapProperties(properties,
Optional.ofNullable(namespace.getString("bootstrap_server")),
Optional.ofNullable(namespace.getString("bootstrap_controller")));
switch (command) {
case "cluster-id": {

25
tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java

@ -30,6 +30,7 @@ import net.sourceforge.argparse4j.ArgumentParsers; @@ -30,6 +30,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
@ -43,6 +44,7 @@ import org.apache.kafka.clients.admin.UpdateFeaturesResult; @@ -43,6 +44,7 @@ import org.apache.kafka.clients.admin.UpdateFeaturesResult;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.CommandLineUtils;
import static net.sourceforge.argparse4j.impl.Arguments.append;
import static net.sourceforge.argparse4j.impl.Arguments.store;
@ -73,13 +75,12 @@ public class FeatureCommand { @@ -73,13 +75,12 @@ public class FeatureCommand {
.newArgumentParser("kafka-features")
.defaultHelp(true)
.description("This tool manages feature flags in Kafka.");
parser
.addArgument("--bootstrap-server")
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
.required(true);
parser
.addArgument("--command-config")
MutuallyExclusiveGroup bootstrapGroup = parser.addMutuallyExclusiveGroup().required(true);
bootstrapGroup.addArgument("--bootstrap-server")
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.");
bootstrapGroup.addArgument("--bootstrap-controller")
.help("A comma-separated list of host:port pairs to use for establishing the connection to the KRaft quorum.");
parser.addArgument("--command-config")
.type(Arguments.fileType())
.help("Property file containing configs to be passed to Admin Client.");
Subparsers subparsers = parser.addSubparsers().dest("command");
@ -93,13 +94,9 @@ public class FeatureCommand { @@ -93,13 +94,9 @@ public class FeatureCommand {
String configPath = namespace.getString("command_config");
Properties properties = (configPath == null) ? new Properties() : Utils.loadProps(configPath);
String bootstrapServer = namespace.getString("bootstrap_server");
if (bootstrapServer != null) {
properties.setProperty("bootstrap.servers", bootstrapServer);
}
if (properties.getProperty("bootstrap.servers") == null) {
throw new TerseException("Please specify --bootstrap-server.");
}
CommandLineUtils.initializeBootstrapProperties(properties,
Optional.ofNullable(namespace.getString("bootstrap_server")),
Optional.ofNullable(namespace.getString("bootstrap_controller")));
try (Admin adminClient = Admin.create(properties)) {
switch (command) {

19
tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java

@ -20,15 +20,16 @@ import net.sourceforge.argparse4j.ArgumentParsers; @@ -20,15 +20,16 @@ import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentGroup;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.File;
import java.io.IOException;
@ -75,12 +76,12 @@ public class MetadataQuorumCommand { @@ -75,12 +76,12 @@ public class MetadataQuorumCommand {
.newArgumentParser("kafka-metadata-quorum")
.defaultHelp(true)
.description("This tool describes kraft metadata quorum status.");
parser
.addArgument("--bootstrap-server")
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
.required(true);
parser
.addArgument("--command-config")
MutuallyExclusiveGroup connectionOptions = parser.addMutuallyExclusiveGroup().required(true);
connectionOptions.addArgument("--bootstrap-server")
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.");
connectionOptions.addArgument("--bootstrap-controller")
.help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka controllers.");
parser.addArgument("--command-config")
.type(Arguments.fileType())
.help("Property file containing configs to be passed to Admin Client.");
addDescribeSubParser(parser);
@ -92,7 +93,9 @@ public class MetadataQuorumCommand { @@ -92,7 +93,9 @@ public class MetadataQuorumCommand {
File optionalCommandConfig = namespace.get("command_config");
final Properties props = getProperties(optionalCommandConfig);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"));
CommandLineUtils.initializeBootstrapProperties(props,
Optional.ofNullable(namespace.getString("bootstrap_server")),
Optional.ofNullable(namespace.getString("bootstrap_controller")));
admin = Admin.create(props);
if (command.equals("describe")) {

10
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java

@ -71,6 +71,16 @@ public class FeatureCommandTest { @@ -71,6 +71,16 @@ public class FeatureCommandTest {
"SupportedMaxVersion: 3.7-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
}
@ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV0)
public void testDescribeWithKRaftAndBootstrapControllers() {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-controller", cluster.bootstrapControllers(), "describe"))
);
// Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 3.7-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(commandOutput));
}
@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testUpgradeMetadataVersionWithZk() {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->

Loading…
Cancel
Save