Browse Source

KAFKA-5248; Remove unused/unneeded retention time in TxnOffsetCommitRequest

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3058 from hachikuji/KAFKA-5248
pull/3042/merge
Jason Gustafson 8 years ago
parent
commit
0186794104
  1. 3
      clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  2. 3
      clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
  3. 17
      clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
  4. 2
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  5. 2
      core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

3
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

@ -37,7 +37,6 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest; @@ -37,7 +37,6 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
@ -461,7 +460,7 @@ public class TransactionManager { @@ -461,7 +460,7 @@ public class TransactionManager {
pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
}
TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId,
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME,
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
pendingTxnOffsetCommits);
return new TxnOffsetCommitHandler(result, builder);
}

3
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java

@ -1521,9 +1521,6 @@ public class Protocol { @@ -1521,9 +1521,6 @@ public class Protocol {
new Field("producer_epoch",
INT16,
"Current epoch associated with the producer id."),
new Field("retention_time",
INT64,
"The time in ms to retain the offset."),
new Field("topics",
new ArrayOf(new Schema(
new Field("topic", STRING),

17
clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java

@ -30,7 +30,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @@ -30,7 +30,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
private static final String RETENTION_TIME_KEY_NAME = "retention_time";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITIONS_KEY_NAME = "partitions";
@ -42,16 +41,14 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @@ -42,16 +41,14 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
private final String consumerGroupId;
private final long producerId;
private final short producerEpoch;
private final long retentionTimeMs;
private final Map<TopicPartition, CommittedOffset> offsets;
public Builder(String consumerGroupId, long producerId, short producerEpoch, long retentionTimeMs,
public Builder(String consumerGroupId, long producerId, short producerEpoch,
Map<TopicPartition, CommittedOffset> offsets) {
super(ApiKeys.TXN_OFFSET_COMMIT);
this.consumerGroupId = consumerGroupId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.retentionTimeMs = retentionTimeMs;
this.offsets = offsets;
}
@ -61,23 +58,21 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @@ -61,23 +58,21 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
@Override
public TxnOffsetCommitRequest build(short version) {
return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, retentionTimeMs, offsets);
return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, offsets);
}
}
private final String consumerGroupId;
private final long producerId;
private final short producerEpoch;
private final long retentionTimeMs;
private final Map<TopicPartition, CommittedOffset> offsets;
public TxnOffsetCommitRequest(short version, String consumerGroupId, long producerId, short producerEpoch,
long retentionTimeMs, Map<TopicPartition, CommittedOffset> offsets) {
Map<TopicPartition, CommittedOffset> offsets) {
super(version);
this.consumerGroupId = consumerGroupId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.retentionTimeMs = retentionTimeMs;
this.offsets = offsets;
}
@ -86,7 +81,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @@ -86,7 +81,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
this.retentionTimeMs = struct.getLong(RETENTION_TIME_KEY_NAME);
Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
@ -116,10 +110,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @@ -116,10 +110,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
return producerEpoch;
}
public long retentionTimeMs() {
return retentionTimeMs;
}
public Map<TopicPartition, CommittedOffset> offsets() {
return offsets;
}
@ -130,7 +120,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @@ -130,7 +120,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
struct.set(PRODUCER_ID_KEY_NAME, producerId);
struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
struct.set(RETENTION_TIME_KEY_NAME, retentionTimeMs);
Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets);
Object[] partitionsArray = new Object[mappedPartitionOffsets.size()];

2
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -951,7 +951,7 @@ public class RequestResponseTest { @@ -951,7 +951,7 @@ public class RequestResponseTest {
final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic", 73),
new TxnOffsetCommitRequest.CommittedOffset(100, null));
return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, 73, offsets).build();
return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, offsets).build();
}
private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {

2
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@ -270,7 +270,7 @@ class RequestQuotaTest extends BaseRequestTest { @@ -270,7 +270,7 @@ class RequestQuotaTest extends BaseRequestTest {
new WriteTxnMarkersRequest.Builder(List.empty.asJava)
case ApiKeys.TXN_OFFSET_COMMIT =>
new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, 3600, Map.empty.asJava)
new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, Map.empty.asJava)
case key =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)

Loading…
Cancel
Save