From 618226c544f35a02d606342d984bb60c6e1371d7 Mon Sep 17 00:00:00 2001
From: David Jacot
Date: Tue, 7 Jan 2020 23:03:08 +0100
Subject: [PATCH] KAFKA-9065; Fix endless loop when loading group/transaction metadata (#7554)

The current coordinator loading logic causes an infinite loop when there
is a gap between the last record in the log and the log end offset. This
is possible because of compaction if the active segment is empty. The
patch fixes the problem by breaking from the loading loop when a read
from the log returns no additional data.

Reviewers: Jason Gustafson
---
 .../group/GroupMetadataManager.scala          | 18 +++++++----
 .../transaction/TransactionStateManager.scala |  8 ++++-
 .../group/GroupMetadataManagerTest.scala      | 24 ++++++++++++++
 .../TransactionStateManagerTest.scala         | 31 +++++++++++++++++++
 4 files changed, 74 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index fdb15fc2266..3576e8dead4 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,22 +555,28 @@ class GroupMetadataManager(brokerId: Int,
         warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")

       case Some(log) =>
-        var currOffset = log.logStartOffset
+        val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
+        val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
+        val loadedGroups = mutable.Map[String, GroupMetadata]()
+        val removedGroups = mutable.Set[String]()

         // buffer may not be needed if records are read from memory
         var buffer = ByteBuffer.allocate(0)

         // loop breaks if leader changes at any time during the load, since logEndOffset is -1
-        val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
-        val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
-        val loadedGroups = mutable.Map[String, GroupMetadata]()
-        val removedGroups = mutable.Set[String]()
+        var currOffset = log.logStartOffset

-        while (currOffset < logEndOffset && !shuttingDown.get()) {
+        // loop breaks if no records have been read, since the end of the log has been reached
+        var readAtLeastOneRecord = true
+
+        while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) {
           val fetchDataInfo = log.read(currOffset,
             maxLength = config.loadBufferSize,
             isolation = FetchLogEnd,
             minOneMessage = true)
+
+          readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
           val memRecords = fetchDataInfo.records match {
             case records: MemoryRecords => records
             case fileRecords: FileRecords =>
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index f38f261e96e..f71b765fd16 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -312,14 +312,20 @@ class TransactionStateManager(brokerId: Int,
         // loop breaks if leader changes at any time during the load, since logEndOffset is -1
         var currOffset = log.logStartOffset

+        // loop breaks if no records have been read, since the end of the log has been reached
+        var readAtLeastOneRecord = true
+
         try {
-          while (currOffset < logEndOffset && !shuttingDown.get() && inReadLock(stateLock) {
+          while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get() && inReadLock(stateLock) {
             loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
               idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
             val fetchDataInfo = log.read(currOffset,
               maxLength = config.transactionLogLoadBufferSize,
               isolation = FetchLogEnd,
               minOneMessage = true)
+
+            readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
             val memRecords = fetchDataInfo.records match {
               case records: MemoryRecords => records
               case fileRecords: FileRecords =>
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 947e4b3ac99..b0f81c8fe49 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -685,6 +685,30 @@ class GroupMetadataManagerTest {
     }
   }

+  @Test
+  def testLoadGroupAndOffsetsWithCorruptedLog(): Unit = {
+    // Simulate a case where startOffset < endOffset but log is empty. This could theoretically happen
+    // when all the records are expired and the active segment is truncated or when the partition
+    // is accidentally corrupted.
+    val startOffset = 0L
+    val endOffset = 10L
+
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
+    expectGroupMetadataLoad(logMock, startOffset, MemoryRecords.EMPTY)
+    EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(endOffset))
+    EasyMock.replay(logMock)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => ())
+
+    EasyMock.verify(logMock)
+    EasyMock.verify(replicaManager)
+
+    assertFalse(groupMetadataManager.isPartitionLoading(groupTopicPartition.partition()))
+  }
+
   @Test
   def testOffsetWriteAfterGroupRemoved(): Unit = {
     // this test case checks the following scenario:
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 4308623e430..82bcc51aabc 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -495,6 +495,37 @@ class TransactionStateManagerTest {
     assertEquals(1, transactionManager.transactionMetadataCache.get(partitionId).get.coordinatorEpoch)
   }

+  @Test
+  def testLoadTransactionMetadataWithCorruptedLog(): Unit = {
+    // Simulate a case where startOffset < endOffset but log is empty. This could theoretically happen
+    // when all the records are expired and the active segment is truncated or when the partition
+    // is accidentally corrupted.
+    val startOffset = 0L
+    val endOffset = 10L
+
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(topicPartition)).andStubReturn(Some(logMock))
+    EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
+      maxLength = EasyMock.anyInt(),
+      isolation = EasyMock.eq(FetchLogEnd),
+      minOneMessage = EasyMock.eq(true))
+    ).andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), MemoryRecords.EMPTY))
+    EasyMock.expect(replicaManager.getLogEndOffset(topicPartition)).andStubReturn(Some(endOffset))
+
+    EasyMock.replay(logMock)
+    EasyMock.replay(replicaManager)
+
+    transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _, _) => ())
+
+    // let the time advance to trigger the background thread loading
+    scheduler.tick()
+
+    EasyMock.verify(logMock)
+    EasyMock.verify(replicaManager)
+    assertEquals(0, transactionManager.loadingPartitions.size)
+  }
+
   private def verifyMetadataDoesExistAndIsUsable(transactionalId: String): Unit = {
     transactionManager.getTransactionState(transactionalId) match {
       case Left(_) => fail("shouldn't have been any errors")
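
Note (not part of the patch): the core of the fix is the readAtLeastOneRecord guard on both loading loops. As a rough illustration of why it makes the load terminate when the log has a trailing gap, here is a minimal, self-contained Scala sketch; FakeLog, FetchResult and load() are made-up stand-ins for illustration only, not the real kafka.log.Log or FetchDataInfo APIs.

// Illustrative sketch only -- FakeLog, FetchResult and load() are hypothetical
// stand-ins, not the real kafka.log.Log / FetchDataInfo APIs.
object LoadLoopSketch {
  // Size in bytes of the data returned by a read; 0 means nothing was read.
  final case class FetchResult(sizeInBytes: Int)

  // Maps an offset to the size of the batch stored at that offset, if any.
  final case class FakeLog(batches: Map[Long, Int]) {
    def read(offset: Long): FetchResult = FetchResult(batches.getOrElse(offset, 0))
  }

  // Reads from logStartOffset towards logEndOffset, but also stops as soon as a
  // read returns no data -- the readAtLeastOneRecord guard added by the patch.
  def load(log: FakeLog, logStartOffset: Long, logEndOffset: Long): Long = {
    var currOffset = logStartOffset
    var readAtLeastOneRecord = true
    while (currOffset < logEndOffset && readAtLeastOneRecord) {
      val fetch = log.read(currOffset)
      readAtLeastOneRecord = fetch.sizeInBytes > 0
      if (readAtLeastOneRecord)
        currOffset += 1 // the real code advances to the last offset in the batch + 1
    }
    currOffset
  }

  def main(args: Array[String]): Unit = {
    // A gap at the end of the log: data at offsets 0 and 1 only, logEndOffset = 10
    // (e.g. compaction left an empty active segment).
    val log = FakeLog(Map(0L -> 100, 1L -> 100))
    println(load(log, logStartOffset = 0L, logEndOffset = 10L)) // prints 2 and terminates
  }
}

Running the sketch prints 2: once the read at offset 2 returns no bytes, the loop exits. With the pre-patch condition (currOffset < logEndOffset alone), currOffset would never advance past the gap and the loop would keep issuing empty reads at the same offset forever, which is the endless loop described in the commit message.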