diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index fdb15fc2266..3576e8dead4 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,22 +555,28 @@ class GroupMetadataManager(brokerId: Int,
         warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
 
       case Some(log) =>
-        var currOffset = log.logStartOffset
+        val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
+        val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
+        val loadedGroups = mutable.Map[String, GroupMetadata]()
+        val removedGroups = mutable.Set[String]()
 
         // buffer may not be needed if records are read from memory
         var buffer = ByteBuffer.allocate(0)
 
         // loop breaks if leader changes at any time during the load, since logEndOffset is -1
-        val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
-        val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
-        val loadedGroups = mutable.Map[String, GroupMetadata]()
-        val removedGroups = mutable.Set[String]()
+        var currOffset = log.logStartOffset
 
-        while (currOffset < logEndOffset && !shuttingDown.get()) {
+        // loop breaks if no records have been read, since the end of the log has been reached
+        var readAtLeastOneRecord = true
+
+        while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) {
           val fetchDataInfo = log.read(currOffset,
             maxLength = config.loadBufferSize,
             isolation = FetchLogEnd,
             minOneMessage = true)
+
+          readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
           val memRecords = fetchDataInfo.records match {
             case records: MemoryRecords => records
             case fileRecords: FileRecords =>
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index f38f261e96e..f71b765fd16 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -312,14 +312,20 @@ class TransactionStateManager(brokerId: Int,
         // loop breaks if leader changes at any time during the load, since logEndOffset is -1
         var currOffset = log.logStartOffset
 
+        // loop breaks if no records have been read, since the end of the log has been reached
+        var readAtLeastOneRecord = true
+
         try {
-          while (currOffset < logEndOffset && !shuttingDown.get() && inReadLock(stateLock) {
+          while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get() && inReadLock(stateLock) {
             loadingPartitions.exists { idAndEpoch: TransactionPartitionAndLeaderEpoch =>
               idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
             val fetchDataInfo = log.read(currOffset,
               maxLength = config.transactionLogLoadBufferSize,
               isolation = FetchLogEnd,
               minOneMessage = true)
+
+            readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
             val memRecords = fetchDataInfo.records match {
               case records: MemoryRecords => records
               case fileRecords: FileRecords =>
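The fix is the same in both coordinators: the load loop previously assumed that `currOffset` would eventually reach `logEndOffset`, but if a read below `logEndOffset` returns no records (the patch's comments suggest expired records with a truncated active segment, or a corrupted partition), `currOffset` never advances and the loop spins forever. The new `readAtLeastOneRecord` flag breaks the loop on the first empty read. Below is a minimal standalone sketch of that guard, not Kafka code: `Chunk` and `readChunk` are hypothetical stand-ins for `FetchDataInfo` and `Log.read`.

```scala
// Sketch only: Chunk/readChunk are made-up stand-ins for FetchDataInfo and Log.read.
object ReadLoopSketch {
  final case class Chunk(records: Seq[String], nextOffset: Long) {
    def sizeInBytes: Int = records.iterator.map(_.length).sum
  }

  def load(logStartOffset: Long, logEndOffset: Long, readChunk: Long => Chunk): Seq[String] = {
    val loaded = scala.collection.mutable.Buffer.empty[String]
    var currOffset = logStartOffset
    // Without this guard, an empty read below logEndOffset never advances
    // currOffset, so the while condition stays true forever.
    var readAtLeastOneRecord = true
    while (currOffset < logEndOffset && readAtLeastOneRecord) {
      val chunk = readChunk(currOffset)
      readAtLeastOneRecord = chunk.sizeInBytes > 0
      if (readAtLeastOneRecord) {
        loaded ++= chunk.records
        currOffset = chunk.nextOffset
      }
    }
    loaded.toSeq
  }

  def main(args: Array[String]): Unit = {
    // A "corrupted" log: end offset claims 10, but every read comes back empty.
    val records = load(0L, 10L, offset => Chunk(Seq.empty, offset))
    assert(records.isEmpty) // terminates instead of looping forever
  }
}
```

Note the design choice: rather than throwing on an empty read, both managers simply stop loading, so a damaged internal topic degrades to a partial load instead of wedging the coordinator thread. The tests below exercise exactly this path.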
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 947e4b3ac99..b0f81c8fe49 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -685,6 +685,30 @@ class GroupMetadataManagerTest {
     }
   }
 
+  @Test
+  def testLoadGroupAndOffsetsWithCorruptedLog(): Unit = {
+    // Simulate a case where startOffset < endOffset but log is empty. This could theoretically happen
+    // when all the records are expired and the active segment is truncated or when the partition
+    // is accidentally corrupted.
+    val startOffset = 0L
+    val endOffset = 10L
+
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
+    expectGroupMetadataLoad(logMock, startOffset, MemoryRecords.EMPTY)
+    EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(endOffset))
+    EasyMock.replay(logMock)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => ())
+
+    EasyMock.verify(logMock)
+    EasyMock.verify(replicaManager)
+
+    assertFalse(groupMetadataManager.isPartitionLoading(groupTopicPartition.partition()))
+  }
+
   @Test
   def testOffsetWriteAfterGroupRemoved(): Unit = {
     // this test case checks the following scenario:
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 4308623e430..82bcc51aabc 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -495,6 +495,37 @@ class TransactionStateManagerTest {
     assertEquals(1, transactionManager.transactionMetadataCache.get(partitionId).get.coordinatorEpoch)
   }
 
+  @Test
+  def testLoadTransactionMetadataWithCorruptedLog(): Unit = {
+    // Simulate a case where startOffset < endOffset but log is empty. This could theoretically happen
+    // when all the records are expired and the active segment is truncated or when the partition
+    // is accidentally corrupted.
+    val startOffset = 0L
+    val endOffset = 10L
+
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(topicPartition)).andStubReturn(Some(logMock))
+    EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
+      maxLength = EasyMock.anyInt(),
+      isolation = EasyMock.eq(FetchLogEnd),
+      minOneMessage = EasyMock.eq(true))
+    ).andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), MemoryRecords.EMPTY))
+    EasyMock.expect(replicaManager.getLogEndOffset(topicPartition)).andStubReturn(Some(endOffset))
+
+    EasyMock.replay(logMock)
+    EasyMock.replay(replicaManager)
+
+    transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _, _) => ())
+
+    // let the time advance to trigger the background thread loading
+    scheduler.tick()
+
+    EasyMock.verify(logMock)
+    EasyMock.verify(replicaManager)
+    assertEquals(0, transactionManager.loadingPartitions.size)
+  }
+
   private def verifyMetadataDoesExistAndIsUsable(transactionalId: String): Unit = {
     transactionManager.getTransactionState(transactionalId) match {
       case Left(_) => fail("shouldn't have been any errors")
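Both new tests use the same EasyMock pattern: mock the `Log`, stub `logStartOffset` and `read` so every fetch returns `MemoryRecords.EMPTY` while `getLogEndOffset` still reports 10, replay, drive the load, then verify. The final assertions (`isPartitionLoading` is false; `loadingPartitions` is empty) are what prove the loop terminated, since before the fix the load would hang and never clear those flags. A minimal standalone illustration of that record/replay/verify lifecycle follows; `OffsetReader` is a made-up trait, not a Kafka type.

```scala
import org.easymock.EasyMock

// Hypothetical interface to mock; not part of Kafka.
trait OffsetReader {
  def endOffset(partition: Int): Option[Long]
}

object EasyMockLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val reader: OffsetReader = EasyMock.mock(classOf[OffsetReader])

    // Record phase: a stubbed expectation may be invoked any number of times.
    EasyMock.expect(reader.endOffset(0)).andStubReturn(Some(10L))

    // Replay phase: the mock now answers calls according to the recording.
    EasyMock.replay(reader)
    assert(reader.endOffset(0).contains(10L))

    // Verify phase: fails if any non-stub expectation went unmet.
    EasyMock.verify(reader)
  }
}
```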