Browse Source

KAFKA-15457: Add support for OffsetFetch version 9 in admin client (#14611)

This patch adds support for OffsetFetch version 9 in the admin client. It mainly allows handling two new error codes `STALE_MEMBER_EPOCH` and `UNKNOWN_MEMBER_ID`  introduced as part of KIP-848.

Reviewers: David Jacot <djacot@confluent.io>
pull/14531/merge
vamossagar12 11 months ago committed by GitHub
parent
commit
1a3aca305e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
  2. 2
      clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
  3. 3
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

2
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java

@ -165,6 +165,8 @@ public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<Coordina @@ -165,6 +165,8 @@ public class ListConsumerGroupOffsetsHandler implements AdminApiHandler<Coordina
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
case UNKNOWN_MEMBER_ID:
case STALE_MEMBER_EPOCH:
log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;

2
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java

@ -54,6 +54,8 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPO @@ -54,6 +54,8 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPO
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#UNKNOWN_MEMBER_ID}
* - {@link Errors#STALE_MEMBER_EPOCH}
*/
public class OffsetFetchResponse extends AbstractResponse {
public static final long INVALID_OFFSET = -1L;

3
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -3383,7 +3383,8 @@ public class KafkaAdminClientTest { @@ -3383,7 +3383,8 @@ public class KafkaAdminClientTest {
public void testListConsumerGroupOffsetsNonRetriableErrors() throws Exception {
// Non-retriable errors throw an exception
final List<Errors> nonRetriableErrors = Arrays.asList(
Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND);
Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND,
Errors.UNKNOWN_MEMBER_ID, Errors.STALE_MEMBER_EPOCH);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

Loading…
Cancel
Save