
KAFKA-9065; Fix endless loop when loading group/transaction metadata (#7554)

The current coordinator loading logic gets stuck in an infinite loop when there is a gap between the last record in the log and the log end offset. Such a gap can arise from compaction when the active segment is empty. The patch fixes the problem by breaking out of the loading loop as soon as a read from the log returns no additional data.
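
Conceptually, the fix adds a guard to the loading loop: stop as soon as a read returns no bytes, instead of relying only on currOffset reaching logEndOffset. Below is a minimal, self-contained sketch of that pattern; the Batch type, the read stub, and the offsets are illustrative stand-ins, not the actual Kafka Log/FetchDataInfo API.

// Sketch of the loading-loop guard. Illustrative only: Batch and read()
// are stand-ins, not the Kafka internals touched by this patch.
object LoadLoopSketch {
  final case class Batch(records: Seq[String], nextOffset: Long, sizeInBytes: Int)

  // Simulates a log whose last record ends before logEndOffset
  // (e.g. after compaction leaves the active segment empty).
  def read(offset: Long, lastRecordOffset: Long): Batch =
    if (offset <= lastRecordOffset) Batch(Seq(s"record-$offset"), offset + 1, sizeInBytes = 1)
    else Batch(Seq.empty, offset, sizeInBytes = 0) // nothing left to return

  def load(logStartOffset: Long, logEndOffset: Long, lastRecordOffset: Long): Long = {
    var currOffset = logStartOffset
    // The guard: once a read returns zero bytes we have reached the end of the
    // data, so a gap below logEndOffset can no longer make the loop spin forever.
    var readAtLeastOneRecord = true
    while (currOffset < logEndOffset && readAtLeastOneRecord) {
      val batch = read(currOffset, lastRecordOffset)
      readAtLeastOneRecord = batch.sizeInBytes > 0
      // ... process batch.records here ...
      currOffset = batch.nextOffset
    }
    currOffset
  }

  def main(args: Array[String]): Unit = {
    // Last record at offset 4 but logEndOffset is 10: without the guard this would never terminate.
    println(load(logStartOffset = 0L, logEndOffset = 10L, lastRecordOffset = 4L)) // prints 5
  }
}

The group coordinator and transaction coordinator loops changed below both apply this same readAtLeastOneRecord check.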

Reviewers: Jason Gustafson <jason@confluent.io>
David Jacot authored 5 years ago, committed by Jason Gustafson
commit 618226c544 (pull/7907/head)
  1. core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala (18 changed lines)
  2. core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala (8 changed lines)
  3. core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala (24 changed lines)
  4. core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala (31 changed lines)

core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala (18 changed lines)

@@ -555,22 +555,28 @@ class GroupMetadataManager(brokerId: Int,
warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
case Some(log) =>
var currOffset = log.logStartOffset
val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
val loadedGroups = mutable.Map[String, GroupMetadata]()
val removedGroups = mutable.Set[String]()
// buffer may not be needed if records are read from memory
var buffer = ByteBuffer.allocate(0)
// loop breaks if leader changes at any time during the load, since logEndOffset is -1
val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
val loadedGroups = mutable.Map[String, GroupMetadata]()
val removedGroups = mutable.Set[String]()
var currOffset = log.logStartOffset
while (currOffset < logEndOffset && !shuttingDown.get()) {
// loop breaks if no records have been read, since the end of the log has been reached
var readAtLeastOneRecord = true
while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) {
val fetchDataInfo = log.read(currOffset,
maxLength = config.loadBufferSize,
isolation = FetchLogEnd,
minOneMessage = true)
readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
val memRecords = fetchDataInfo.records match {
case records: MemoryRecords => records
case fileRecords: FileRecords =>

core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala (8 changed lines)

@@ -312,14 +312,20 @@ class TransactionStateManager(brokerId: Int,
       // loop breaks if leader changes at any time during the load, since logEndOffset is -1
       var currOffset = log.logStartOffset

+      // loop breaks if no records have been read, since the end of the log has been reached
+      var readAtLeastOneRecord = true
+
       try {
-        while (currOffset < logEndOffset && !shuttingDown.get() && inReadLock(stateLock) {
+        while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get() && inReadLock(stateLock) {
           loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
             idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
           val fetchDataInfo = log.read(currOffset,
             maxLength = config.transactionLogLoadBufferSize,
             isolation = FetchLogEnd,
             minOneMessage = true)
+
+          readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
           val memRecords = fetchDataInfo.records match {
             case records: MemoryRecords => records
             case fileRecords: FileRecords =>

core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala (24 changed lines)

@@ -685,6 +685,30 @@ class GroupMetadataManagerTest {
     }
   }

+  @Test
+  def testLoadGroupAndOffsetsWithCorruptedLog(): Unit = {
+    // Simulate a case where startOffset < endOffset but log is empty. This could theoretically happen
+    // when all the records are expired and the active segment is truncated or when the partition
+    // is accidentally corrupted.
+    val startOffset = 0L
+    val endOffset = 10L
+
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
+    expectGroupMetadataLoad(logMock, startOffset, MemoryRecords.EMPTY)
+    EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(endOffset))
+    EasyMock.replay(logMock)
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => ())
+
+    EasyMock.verify(logMock)
+    EasyMock.verify(replicaManager)
+
+    assertFalse(groupMetadataManager.isPartitionLoading(groupTopicPartition.partition()))
+  }
+
   @Test
   def testOffsetWriteAfterGroupRemoved(): Unit = {
     // this test case checks the following scenario:

core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala (31 changed lines)

@@ -495,6 +495,37 @@ class TransactionStateManagerTest {
     assertEquals(1, transactionManager.transactionMetadataCache.get(partitionId).get.coordinatorEpoch)
   }

+  @Test
+  def testLoadTransactionMetadataWithCorruptedLog(): Unit = {
+    // Simulate a case where startOffset < endOffset but log is empty. This could theoretically happen
+    // when all the records are expired and the active segment is truncated or when the partition
+    // is accidentally corrupted.
+    val startOffset = 0L
+    val endOffset = 10L
+
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(topicPartition)).andStubReturn(Some(logMock))
+    EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
+      maxLength = EasyMock.anyInt(),
+      isolation = EasyMock.eq(FetchLogEnd),
+      minOneMessage = EasyMock.eq(true))
+    ).andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), MemoryRecords.EMPTY))
+    EasyMock.expect(replicaManager.getLogEndOffset(topicPartition)).andStubReturn(Some(endOffset))
+    EasyMock.replay(logMock)
+    EasyMock.replay(replicaManager)
+
+    transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _, _) => ())
+
+    // let the time advance to trigger the background thread loading
+    scheduler.tick()
+
+    EasyMock.verify(logMock)
+    EasyMock.verify(replicaManager)
+
+    assertEquals(0, transactionManager.loadingPartitions.size)
+  }
+
   private def verifyMetadataDoesExistAndIsUsable(transactionalId: String): Unit = {
     transactionManager.getTransactionState(transactionalId) match {
       case Left(_) => fail("shouldn't have been any errors")
