Browse Source

KAFKA-15353: make sure AlterPartitionRequest.build() is idempotent (#14236)

As described in https://issues.apache.org/jira/browse/KAFKA-15353
When the AlterPartitionRequest version is < 3 and its builder.build is called multiple times, both newIsrWithEpochs and newIsr will all be empty. This can happen if the sender retires on errors.

Reviewers: Luke Chen <showuon@gmail.com>
pull/14338/head
Calvin Liu 1 year ago committed by GitHub
parent
commit
b41b2dfcf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
  2. 13
      clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java

16
clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java

@ -85,12 +85,16 @@ public class AlterPartitionRequest extends AbstractRequest { @@ -85,12 +85,16 @@ public class AlterPartitionRequest extends AbstractRequest {
if (version < 3) {
data.topics().forEach(topicData -> {
topicData.partitions().forEach(partitionData -> {
List<Integer> newIsr = new ArrayList<>(partitionData.newIsrWithEpochs().size());
partitionData.newIsrWithEpochs().forEach(brokerState -> {
newIsr.add(brokerState.brokerId());
});
partitionData.setNewIsr(newIsr);
partitionData.setNewIsrWithEpochs(Collections.emptyList());
// The newIsrWithEpochs will be empty after build. Then we can skip the conversion if the build
// is called again.
if (partitionData.newIsrWithEpochs().size() > 0) {
List<Integer> newIsr = new ArrayList<>(partitionData.newIsrWithEpochs().size());
partitionData.newIsrWithEpochs().forEach(brokerState -> {
newIsr.add(brokerState.brokerId());
});
partitionData.setNewIsr(newIsr);
partitionData.setNewIsrWithEpochs(Collections.emptyList());
}
});
});
}

13
clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java

@ -72,5 +72,18 @@ class AlterPartitionRequestTest { @@ -72,5 +72,18 @@ class AlterPartitionRequestTest {
assertEquals(newIsrWithBrokerEpoch, partitionData.newIsrWithEpochs());
assertTrue(partitionData.newIsr().isEmpty());
}
// Build the request again to make sure build() is idempotent.
alterPartitionRequest = builder.build(version);
assertEquals(1, alterPartitionRequest.data().topics().size());
assertEquals(1, alterPartitionRequest.data().topics().get(0).partitions().size());
alterPartitionRequest.data().topics().get(0).partitions().get(0);
if (version < 3) {
assertEquals(Arrays.asList(1, 2, 3), partitionData.newIsr());
assertTrue(partitionData.newIsrWithEpochs().isEmpty());
} else {
assertEquals(newIsrWithBrokerEpoch, partitionData.newIsrWithEpochs());
assertTrue(partitionData.newIsr().isEmpty());
}
}
}

Loading…
Cancel
Save