diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 8cf99fcc438..dba8b4e1810 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.types.Type._ import org.apache.kafka.common.protocol.types._ import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse} +import org.apache.kafka.common.requests.{IsolationLevel, OffsetCommitRequest, OffsetFetchResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} @@ -1129,7 +1129,7 @@ object GroupMetadataManager { value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata) value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp) // version 1 has a non empty expireTimestamp field - value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get) + value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)) } val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index afbe5b88204..10119c8d632 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -332,11 +332,10 @@ class KafkaApis(val requestChannel: RequestChannel, } else { // for version 1 and beyond store offsets in offset manager - // commit timestamp is always set to now. // "default" expiration timestamp is now + retention (and retention may be overridden if v2) // expire timestamp is computed differently for v1 and v2. // - If v1 and no explicit commit timestamp is provided we treat it the same as v5. - // - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp + // - If v1 and explicit retention time is provided we calculate expiration timestamp based on that // - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5. // - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect val currentTimestamp = time.milliseconds