Browse Source

KAFKA-7403; Use default timestamp if no expire timestamp set in offset commit value (#5690)

This fixes a regression caused by KAFKA-4682 (KIP-211) which caused offset commit failures after upgrading from an older version which used the v1 inter-broker format.
pull/5688/merge
Vahid Hashemian 6 years ago committed by Jason Gustafson
parent
commit
2933f21374
  1. 4
      core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
  2. 3
      core/src/main/scala/kafka/server/KafkaApis.scala

4
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.Errors @@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.types.Type._
import org.apache.kafka.common.protocol.types._
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
import org.apache.kafka.common.requests.{IsolationLevel, OffsetCommitRequest, OffsetFetchResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
@ -1129,7 +1129,7 @@ object GroupMetadataManager { @@ -1129,7 +1129,7 @@ object GroupMetadataManager {
value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
// version 1 has a non empty expireTimestamp field
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get)
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
}
val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)

3
core/src/main/scala/kafka/server/KafkaApis.scala

@ -332,11 +332,10 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -332,11 +332,10 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
// for version 1 and beyond store offsets in offset manager
// commit timestamp is always set to now.
// "default" expiration timestamp is now + retention (and retention may be overridden if v2)
// expire timestamp is computed differently for v1 and v2.
// - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
// - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
// - If v1 and explicit retention time is provided we calculate expiration timestamp based on that
// - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
// - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
val currentTimestamp = time.milliseconds

Loading…
Cancel
Save