Browse Source

KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis (#9393)

In this PR, I have addressed the review comments from @chia7712 in #9001 which were provided after #9001 was merged. The changes are made mainly to KafkaAdminClient:

Improve error message in updateFeatures api when feature name is empty.
Propagate top-level error message in updateFeatures api.
Add an empty-parameter variety for describeFeatures api.
Minor documentation updates to @param and @return to make these resemble other apis.

Reviewers: Chia-Ping Tsai chia7712@gmail.com, Jun Rao junrao@gmail.com
pull/9411/head
Kowshik Prakasam 4 years ago committed by GitHub
parent
commit
de4183485b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
  2. 11
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  3. 19
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

22
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

@ -1306,6 +1306,17 @@ public interface Admin extends AutoCloseable { @@ -1306,6 +1306,17 @@ public interface Admin extends AutoCloseable {
*/
AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
AlterUserScramCredentialsOptions options);
/**
* Describes finalized as well as supported features.
* <p>
* This is a convenience method for {@link #describeFeatures(DescribeFeaturesOptions)} with default options.
* See the overload for more details.
*
* @return the {@link DescribeFeaturesResult} containing the result
*/
default DescribeFeaturesResult describeFeatures() {
return describeFeatures(new DescribeFeaturesOptions());
}
/**
* Describes finalized as well as supported features. By default, the request is issued to any
@ -1320,9 +1331,9 @@ public interface Admin extends AutoCloseable { @@ -1320,9 +1331,9 @@ public interface Admin extends AutoCloseable {
* If the request timed out before the describe operation could finish.</li>
* </ul>
* <p>
* @param options the options to use
*
* @return the {@link DescribeFeaturesResult} containing the result
* @param options the options to use
* @return the {@link DescribeFeaturesResult} containing the result
*/
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
@ -1367,10 +1378,9 @@ public interface Admin extends AutoCloseable { @@ -1367,10 +1378,9 @@ public interface Admin extends AutoCloseable {
* <p>
* This operation is supported by brokers with version 2.7.0 or higher.
* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
*
* @return the {@link UpdateFeaturesResult} containing the result
* @param featureUpdates the map of finalized feature name to {@link FeatureUpdate}
* @param options the options to use
* @return the {@link UpdateFeaturesResult} containing the result
*/
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

11
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -4410,6 +4410,10 @@ public class KafkaAdminClient extends AdminClient { @@ -4410,6 +4410,10 @@ public class KafkaAdminClient extends AdminClient {
final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
for (final Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
final String feature = entry.getKey();
if (feature.trim().isEmpty()) {
throw new IllegalArgumentException("Provided feature can not be empty.");
}
updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
}
@ -4424,10 +4428,6 @@ public class KafkaAdminClient extends AdminClient { @@ -4424,10 +4428,6 @@ public class KafkaAdminClient extends AdminClient {
for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
final String feature = entry.getKey();
final FeatureUpdate update = entry.getValue();
if (feature.trim().isEmpty()) {
throw new IllegalArgumentException("Provided feature can not be null or empty.");
}
final UpdateFeaturesRequestData.FeatureUpdateKey requestItem =
new UpdateFeaturesRequestData.FeatureUpdateKey();
requestItem.setFeature(feature);
@ -4471,7 +4471,8 @@ public class KafkaAdminClient extends AdminClient { @@ -4471,7 +4471,8 @@ public class KafkaAdminClient extends AdminClient {
break;
default:
for (final Map.Entry<String, KafkaFutureImpl<Void>> entry : updateFutures.entrySet()) {
entry.getValue().completeExceptionally(topLevelError.exception());
final String errorMsg = response.data().errorMessage();
entry.getValue().completeExceptionally(topLevelError.exception(errorMsg));
}
break;
}

19
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -4062,19 +4062,12 @@ public class KafkaAdminClientTest { @@ -4062,19 +4062,12 @@ public class KafkaAdminClientTest {
@Test
public void testUpdateFeaturesShouldFailRequestForInvalidFeatureName() {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
final UpdateFeaturesResult result = env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("", new FeatureUpdate((short) 2, false))),
new UpdateFeaturesOptions());
final Map<String, KafkaFuture<Void>> futures = result.values();
for (Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
final Throwable cause = assertThrows(ExecutionException.class, () -> entry.getValue().get());
assertEquals(KafkaException.class, cause.getCause().getClass());
}
final KafkaFuture<Void> future = result.all();
final Throwable cause = assertThrows(ExecutionException.class, () -> future.get());
assertEquals(KafkaException.class, cause.getCause().getClass());
assertThrows(
IllegalArgumentException.class,
() -> env.adminClient().updateFeatures(
Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2, false)),
Utils.mkEntry("", new FeatureUpdate((short) 2, false))),
new UpdateFeaturesOptions()));
}
}

Loading…
Cancel
Save