Browse Source

KAFKA-6622; Fix performance issue loading consumer offsets (#4661)

`batch.baseOffset` is an expensive operation (even says so in its javadoc), and yet was called for every single record in a batch when loading offsets. This means that for N records in a gzipped batch, the entire batch will be unzipped N times. The fix is to compute and cache the base offset once as we decompress and process the batch.

Reviewers: Dong Lin <lindong28@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
pull/4678/head
Radai Rosenblatt 7 years ago committed by Jason Gustafson
parent
commit
5760da7d0b
  1. 7
      core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

7
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

@ -555,8 +555,11 @@ class GroupMetadataManager(brokerId: Int, @@ -555,8 +555,11 @@ class GroupMetadataManager(brokerId: Int,
}
pendingOffsets.remove(batch.producerId)
} else {
var batchBaseOffset: Option[Long] = None
for (record <- batch.asScala) {
require(record.hasKey, "Group metadata/offset entry key should not be null")
if (batchBaseOffset.isEmpty)
batchBaseOffset = Some(record.offset)
GroupMetadataManager.readMessageKey(record.key) match {
case offsetKey: OffsetKey =>
@ -573,9 +576,9 @@ class GroupMetadataManager(brokerId: Int, @@ -573,9 +576,9 @@ class GroupMetadataManager(brokerId: Int,
} else {
val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
if (isTxnOffsetCommit)
pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
else
loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
}
case groupMetadataKey: GroupMetadataKey =>

Loading…
Cancel
Save