From 0186794104b7106fe426024d65730fec79ad999a Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 15 May 2017 12:58:36 -0700 Subject: [PATCH] KAFKA-5248; Remove unused/unneeded retention time in TxnOffsetCommitRequest Author: Jason Gustafson Reviewers: Guozhang Wang , Apurva Mehta , Ismael Juma Closes #3058 from hachikuji/KAFKA-5248 --- .../producer/internals/TransactionManager.java | 3 +-- .../apache/kafka/common/protocol/Protocol.java | 3 --- .../common/requests/TxnOffsetCommitRequest.java | 17 +++-------------- .../common/requests/RequestResponseTest.java | 2 +- .../unit/kafka/server/RequestQuotaTest.scala | 2 +- 5 files changed, 6 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 7e2f81330d6..f3ed2525454 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -37,7 +37,6 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; -import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.TxnOffsetCommitRequest; import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset; @@ -461,7 +460,7 @@ public class TransactionManager { pendingTxnOffsetCommits.put(entry.getKey(), committedOffset); } TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId, - producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME, + producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, pendingTxnOffsetCommits); return new TxnOffsetCommitHandler(result, builder); } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index fb3c8c9e24d..5e057389d86 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -1521,9 +1521,6 @@ public class Protocol { new Field("producer_epoch", INT16, "Current epoch associated with the producer id."), - new Field("retention_time", - INT64, - "The time in ms to retain the offset."), new Field("topics", new ArrayOf(new Schema( new Field("topic", STRING), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java index 3f3024f1c7d..f5334f2153f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java @@ -30,7 +30,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id"; private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; - private static final String RETENTION_TIME_KEY_NAME = "retention_time"; private static final String TOPIC_PARTITIONS_KEY_NAME = "topics"; private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partitions"; @@ -42,16 +41,14 @@ public class TxnOffsetCommitRequest extends AbstractRequest { private final String consumerGroupId; private final long producerId; private final short producerEpoch; - private final long retentionTimeMs; private final Map offsets; - public Builder(String consumerGroupId, long producerId, short producerEpoch, long retentionTimeMs, + public Builder(String consumerGroupId, long producerId, short producerEpoch, Map offsets) { super(ApiKeys.TXN_OFFSET_COMMIT); this.consumerGroupId = consumerGroupId; this.producerId = producerId; this.producerEpoch = producerEpoch; - this.retentionTimeMs = retentionTimeMs; this.offsets = offsets; } @@ -61,23 +58,21 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @Override public TxnOffsetCommitRequest build(short version) { - return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, retentionTimeMs, offsets); + return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, offsets); } } private final String consumerGroupId; private final long producerId; private final short producerEpoch; - private final long retentionTimeMs; private final Map offsets; public TxnOffsetCommitRequest(short version, String consumerGroupId, long producerId, short producerEpoch, - long retentionTimeMs, Map offsets) { + Map offsets) { super(version); this.consumerGroupId = consumerGroupId; this.producerId = producerId; this.producerEpoch = producerEpoch; - this.retentionTimeMs = retentionTimeMs; this.offsets = offsets; } @@ -86,7 +81,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME); this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME); - this.retentionTimeMs = struct.getLong(RETENTION_TIME_KEY_NAME); Map offsets = new HashMap<>(); Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME); @@ -116,10 +110,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { return producerEpoch; } - public long retentionTimeMs() { - return retentionTimeMs; - } - public Map offsets() { return offsets; } @@ -130,7 +120,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId); struct.set(PRODUCER_ID_KEY_NAME, producerId); struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch); - struct.set(RETENTION_TIME_KEY_NAME, retentionTimeMs); Map> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets); Object[] partitionsArray = new Object[mappedPartitionOffsets.size()]; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index cbfb6a90d17..1cfd6a34078 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -951,7 +951,7 @@ public class RequestResponseTest { final Map offsets = new HashMap<>(); offsets.put(new TopicPartition("topic", 73), new TxnOffsetCommitRequest.CommittedOffset(100, null)); - return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, 73, offsets).build(); + return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, offsets).build(); } private TxnOffsetCommitResponse createTxnOffsetCommitResponse() { diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 425b9f16880..4cf3e7da883 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -270,7 +270,7 @@ class RequestQuotaTest extends BaseRequestTest { new WriteTxnMarkersRequest.Builder(List.empty.asJava) case ApiKeys.TXN_OFFSET_COMMIT => - new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, 3600, Map.empty.asJava) + new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, Map.empty.asJava) case key => throw new IllegalArgumentException("Unsupported API key " + apiKey)