Browse Source

KAFKA-15454: Add support for OffsetCommit version 9 in admin client (#14571)

This patch adds support for OffsetCommit version 9 in the admin client. It mainly allows handling two new error codes `STALE_MEMBER_EPOCH` and `GROUP_ID_NOT_FOUND ` introduced as part of KIP-848.

Reviewers: David Jacot <djacot@confluent.io>
pull/13909/merge
vamossagar12 1 year ago committed by GitHub
parent
commit
8f3731e2bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
  2. 2
      clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
  3. 2
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

2
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java

@ -179,8 +179,10 @@ public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Co @@ -179,8 +179,10 @@ public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<Co
case INVALID_GROUP_ID:
case INVALID_COMMIT_OFFSET_SIZE:
case GROUP_AUTHORIZATION_FAILED:
case GROUP_ID_NOT_FOUND:
// Member level errors.
case UNKNOWN_MEMBER_ID:
case STALE_MEMBER_EPOCH:
log.debug("OffsetCommit request for group id {} failed due to error {}.",
groupId.idValue, error);
partitionResults.put(topicPartition, error);

2
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java

@ -48,6 +48,8 @@ import java.util.function.Function; @@ -48,6 +48,8 @@ import java.util.function.Function;
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
* - {@link Errors#INVALID_TXN_STATE}
* - {@link Errors#GROUP_ID_NOT_FOUND}
* - {@link Errors#STALE_MEMBER_EPOCH}
*/
public class OffsetCommitResponse extends AbstractResponse {

2
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -4757,7 +4757,7 @@ public class KafkaAdminClientTest { @@ -4757,7 +4757,7 @@ public class KafkaAdminClientTest {
final TopicPartition tp1 = new TopicPartition("foo", 0);
final List<Errors> nonRetriableErrors = Arrays.asList(
Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND);
Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND, Errors.STALE_MEMBER_EPOCH);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

Loading…
Cancel
Save