
KAFKA-15526: Simplify the LogAppendInfo class (#14470)

The LogAppendInfo class is a bit bloated in terms of class fields. That's because it is used as an umbrella class for both leader log appends and follower log appends and needs to carry fields for both. This makes the class constructor a bit kludgy to use, and it ends up being confusing which fields are meaningful in which context. I noticed there were a few fields that didn't seem necessary.

Below is a description of the changes:

- firstOffset is a LogOffsetMetadata, but no reader of the field uses anything other than its messageOffset field; it is simplified to a long.
- LogAppendInfo.errorMessage is only set in one context: when calling LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo. When we use that constructor we pass the original exception up in LogAppendResult anyway, so the field is redundant with LogAppendResult.exception. This also simplifies the handling in KAFKA-15459 (Convert coordinator retriable errors to a known producer response error, #14378): since there are no custom error messages, we just return whatever is in the exception message.
- targetCompressionType is only used when constructing the LogValidator; the call is inlined there instead of carrying the value in LogAppendInfo.
- offsetsMonotonic is only consulted to throw an exception when we are not assigning offsets; the exception is now thrown at that point instead of setting a field to throw later.
- shallowCount is only there to determine whether the append contains any messages; we can instead check validBytes, which is incremented by a non-zero amount every time shallowCount is incremented.
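
To make the net effect concrete, here is a minimal, self-contained sketch of the two resulting patterns: firstOffset as a plain long with a -1 "unknown" sentinel plus the validBytes emptiness check, and the error message derived from LogAppendResult.exception rather than stored separately. All names in the sketch (LogAppendSketch, SketchAppendInfo, SketchAppendResult, describe) are invented for illustration; only the field names mirror the patch.

```scala
// Minimal sketch only; these are not the actual Kafka classes.
object LogAppendSketch {
  val UnknownOffset: Long = -1L // mirrors the UnifiedLog.UnknownOffset sentinel used by the patch

  // firstOffset is a plain Long with a sentinel instead of Optional[LogOffsetMetadata]
  final case class SketchAppendInfo(firstOffset: Long, lastOffset: Long, validBytes: Int)

  // the error message is derived from the exception instead of a dedicated errorMessage field
  final case class SketchAppendResult(exception: Option[Throwable], hasCustomErrorMessage: Boolean) {
    def errorMessage: String = exception match {
      case Some(e) if hasCustomErrorMessage => e.getMessage
      case _ => null
    }
  }

  def describe(info: SketchAppendInfo): String =
    if (info.validBytes <= 0) "empty append" // replaces the old shallowCount == 0 check
    else if (info.firstOffset != UnknownOffset) s"first offset ${info.firstOffset}"
    else s"first offset unknown, last offset ${info.lastOffset}"

  def main(args: Array[String]): Unit = {
    println(describe(SketchAppendInfo(UnknownOffset, -1L, 0)))  // empty append
    println(describe(SketchAppendInfo(42L, 44L, 128)))          // first offset 42
    val failed = SketchAppendResult(Some(new IllegalStateException("boom")), hasCustomErrorMessage = true)
    println(failed.errorMessage)                                // boom
  }
}
```

One practical upside of dropping the Optional wrapper is that callers on the append path no longer allocate a wrapper object per append just to read a single offset.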

Reviewers: Justine Olshan <jolshan@confluent.io>
Author: David Mao (committed via GitHub)
Branch: pull/14476/head
Commit: 2c925e9f33
12 changed files (lines changed per file):

   55  core/src/main/scala/kafka/log/UnifiedLog.scala
    6  core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
   92  core/src/main/scala/kafka/server/ReplicaManager.scala
    2  core/src/test/scala/other/kafka/StressTestLog.scala
    2  core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
    6  core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
   10  core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
    2  core/src/test/scala/unit/kafka/log/LogManagerTest.scala
   34  core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
    9  core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
    9  core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
   90  storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java

core/src/main/scala/kafka/log/UnifiedLog.scala (55 lines changed)

@@ -768,10 +768,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     // This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
     maybeFlushMetadataFile()

-    val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
+    val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch)

     // return if we have no valid messages or if this is a duplicate of the last appended entry
-    if (appendInfo.shallowCount == 0) appendInfo
+    if (appendInfo.validBytes <= 0) appendInfo
     else {
       // trim any invalid bytes or partial messages before appending it to the on-disk log
@@ -784,13 +784,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       if (validateAndAssignOffsets) {
         // assign offsets to the message set
         val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
-        appendInfo.setFirstOffset(Optional.of(new LogOffsetMetadata(offset.value)))
+        appendInfo.setFirstOffset(offset.value)
         val validateAndOffsetAssignResult = try {
+          val targetCompression = BrokerCompressionType.forName(config.compressionType).targetCompressionType(appendInfo.sourceCompression)
           val validator = new LogValidator(validRecords,
             topicPartition,
             time,
             appendInfo.sourceCompression,
-            appendInfo.targetCompression,
+            targetCompression,
             config.compact,
             config.recordVersion.value,
             config.messageTimestampType,
@@ -835,18 +836,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         }
       } else {
         // we are taking the offsets we are given
-        if (!appendInfo.offsetsMonotonic)
-          throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
-            records.records.asScala.map(_.offset))
-
         if (appendInfo.firstOrLastOffsetOfFirstBatch < localLog.logEndOffset) {
           // we may still be able to recover if the log is empty
           // one example: fetching from log start offset on the leader which is not batch aligned,
           // which may happen as a result of AdminClient#deleteRecords()
-          val firstOffset = appendInfo.firstOffset.map[Long](x => x.messageOffset)
-            .orElse(records.batches.iterator().next().baseOffset())
-          val firstOrLast = if (appendInfo.firstOffset.isPresent) "First offset" else "Last offset of the first batch"
+          val hasFirstOffset = appendInfo.firstOffset != UnifiedLog.UnknownOffset
+          val firstOffset = if (hasFirstOffset) appendInfo.firstOffset else records.batches.iterator().next().baseOffset()
+          val firstOrLast = if (hasFirstOffset) "First offset" else "Last offset of the first batch"
           throw new UnexpectedAppendOffsetException(
             s"Unexpected offset in append to $topicPartition. $firstOrLast " +
             s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${localLog.logEndOffset}. " +
@@ -892,16 +889,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         maybeDuplicate match {
           case Some(duplicate) =>
-            appendInfo.setFirstOffset(Optional.of(new LogOffsetMetadata(duplicate.firstOffset)))
+            appendInfo.setFirstOffset(duplicate.firstOffset)
             appendInfo.setLastOffset(duplicate.lastOffset)
             appendInfo.setLogAppendTime(duplicate.timestamp)
             appendInfo.setLogStartOffset(logStartOffset)
           case None =>
-            // Before appending update the first offset metadata to include segment information
-            appendInfo.setFirstOffset(appendInfo.firstOffset.map { offsetMetadata =>
-              new LogOffsetMetadata(offsetMetadata.messageOffset, segment.baseOffset, segment.size)
-            })
-
             // Append the records, and increment the local log end offset immediately after the append because a
             // write to the transaction index below may fail and we want to ensure that the offsets
             // of future appends still grow monotonically. The resulting transaction index inconsistency
@@ -1097,7 +1089,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * <ol>
    * <li> each message matches its CRC
    * <li> each message size is valid (if ignoreRecordSize is false)
-   * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other.
+   * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other
+   * <li> that the offsets are monotonically increasing (if requireOffsetsMonotonic is true)
    * </ol>
    *
    * Also compute the following quantities:
@@ -1113,10 +1106,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def analyzeAndValidateRecords(records: MemoryRecords,
                                         origin: AppendOrigin,
                                         ignoreRecordSize: Boolean,
+                                        requireOffsetsMonotonic: Boolean,
                                         leaderEpoch: Int): LogAppendInfo = {
-    var shallowMessageCount = 0
     var validBytesCount = 0
-    var firstOffset: Optional[LogOffsetMetadata] = Optional.empty()
+    var firstOffset = UnifiedLog.UnknownOffset
     var lastOffset = -1L
     var lastLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH
     var sourceCompression = CompressionType.NONE
@@ -1143,7 +1136,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       // Also indicate whether we have the accurate first offset or not
       if (!readFirstMessage) {
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          firstOffset = Optional.of(new LogOffsetMetadata(batch.baseOffset))
+          firstOffset = batch.baseOffset
         lastOffsetOfFirstBatch = batch.lastOffset
         readFirstMessage = true
       }
@@ -1176,7 +1169,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         offsetOfMaxTimestamp = lastOffset
       }

-      shallowMessageCount += 1
       validBytesCount += batchSize

       val batchCompression = CompressionType.forId(batch.compressionType.id)
@@ -1184,17 +1176,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
        sourceCompression = batchCompression
     }

-    // Apply broker-side compression if any
-    val targetCompression = BrokerCompressionType.forName(config.compressionType).targetCompressionType(sourceCompression)
+    if (requireOffsetsMonotonic && !monotonic)
+      throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
+        records.records.asScala.map(_.offset))

     val lastLeaderEpochOpt: OptionalInt = if (lastLeaderEpoch != RecordBatch.NO_PARTITION_LEADER_EPOCH)
       OptionalInt.of(lastLeaderEpoch)
     else
       OptionalInt.empty()

     new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, offsetOfMaxTimestamp,
-      RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, sourceCompression, targetCompression,
-      shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], null,
-      LeaderHwChange.NONE)
+      RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, sourceCompression,
+      validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE)
   }

   /**
@@ -1588,10 +1581,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       Note that this is only required for pre-V2 message formats because these do not store the first message offset
       in the header.
       */
-      val rollOffset = appendInfo
-        .firstOffset
-        .map[Long](_.messageOffset)
-        .orElse(maxOffsetInMessages - Integer.MAX_VALUE)
+      val rollOffset = if (appendInfo.firstOffset == UnifiedLog.UnknownOffset)
+        maxOffsetInMessages - Integer.MAX_VALUE
+      else
+        appendInfo.firstOffset
       roll(Some(rollOffset))
     } else {

core/src/main/scala/kafka/raft/KafkaMetadataLog.scala (6 lines changed)

@@ -96,10 +96,10 @@ final class KafkaMetadataLog private (
   }

   private def handleAndConvertLogAppendInfo(appendInfo: internals.log.LogAppendInfo): LogAppendInfo = {
-    if (appendInfo.firstOffset.isPresent())
-      new LogAppendInfo(appendInfo.firstOffset.get().messageOffset, appendInfo.lastOffset)
+    if (appendInfo.firstOffset != UnifiedLog.UnknownOffset)
+      new LogAppendInfo(appendInfo.firstOffset, appendInfo.lastOffset)
     else
-      throw new KafkaException(s"Append failed unexpectedly: ${appendInfo.errorMessage}")
+      throw new KafkaException(s"Append failed unexpectedly")
   }

   override def lastFetchedEpoch: Int = {

core/src/main/scala/kafka/server/ReplicaManager.scala (92 lines changed)

@@ -74,11 +74,20 @@ import scala.jdk.CollectionConverters._
 /*
  * Result metadata of a log append operation on the log
  */
-case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = None) {
+case class LogAppendResult(info: LogAppendInfo,
+                           exception: Option[Throwable],
+                           hasCustomErrorMessage: Boolean) {
   def error: Errors = exception match {
     case None => Errors.NONE
     case Some(e) => Errors.forException(e)
   }
+
+  def errorMessage: String = {
+    exception match {
+      case Some(e) if hasCustomErrorMessage => e.getMessage
+      case _ => null
+    }
+  }
 }

 case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exception: Option[Throwable] = None) {
@@ -739,67 +748,54 @@ class ReplicaManager(val config: KafkaConfig,
     }

     def appendEntries(allEntries: Map[TopicPartition, MemoryRecords])(unverifiedEntries: Map[TopicPartition, Errors]): Unit = {
-      val verifiedEntries =
+      val verifiedEntries =
         if (unverifiedEntries.isEmpty)
-          allEntries
+          allEntries
         else
           allEntries.filter { case (tp, _) =>
            !unverifiedEntries.contains(tp)
          }

       val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
         origin, verifiedEntries, requiredAcks, requestLocal, verificationGuards.toMap)
       debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

-      def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult],
-                              useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = {
-        appendResult.map { case (topicPartition, result) =>
-          topicPartition -> ProducePartitionStatus(
-            result.info.lastOffset + 1, // required offset
-            new PartitionResponse(
-              result.error,
-              result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
-              result.info.lastOffset,
-              result.info.logAppendTime,
-              result.info.logStartOffset,
-              result.info.recordErrors,
-              if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage
-            )
-          ) // response status
-        }
-      }

-      val unverifiedResults = unverifiedEntries.map {
+      val errorResults = (unverifiedEntries ++ errorsPerPartition).map {
         case (topicPartition, error) =>
-          val finalException =
+          // translate transaction coordinator errors to known producer response errors
+          val customException =
             error match {
-              case Errors.INVALID_TXN_STATE => error.exception("Partition was not added to the transaction")
+              case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
               case Errors.CONCURRENT_TRANSACTIONS |
                    Errors.COORDINATOR_LOAD_IN_PROGRESS |
                    Errors.COORDINATOR_NOT_AVAILABLE |
-                   Errors.NOT_COORDINATOR => new NotEnoughReplicasException(
-                     s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}")
-              case _ => error.exception()
+                   Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
+                     s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
+              case _ => None
             }
           topicPartition -> LogAppendResult(
            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(finalException)
+            Some(customException.getOrElse(error.exception)),
+            hasCustomErrorMessage = customException.isDefined
           )
       }

-      val errorResults = errorsPerPartition.map {
-        case (topicPartition, error) =>
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception())
+      val allResults = localProduceResults ++ errorResults
+      val produceStatus = allResults.map { case (topicPartition, result) =>
+        topicPartition -> ProducePartitionStatus(
+          result.info.lastOffset + 1, // required offset
+          new PartitionResponse(
+            result.error,
+            result.info.firstOffset,
+            result.info.lastOffset,
+            result.info.logAppendTime,
+            result.info.logStartOffset,
+            result.info.recordErrors,
+            result.errorMessage
           )
+        ) // response status
       }

-      val produceStatus = Set((localProduceResults, false), (unverifiedResults, true), (errorResults, false)).flatMap {
-        case (results, useCustomError) => produceStatusResult(results, useCustomError)
-      }.toMap
-      val allResults = localProduceResults ++ unverifiedResults ++ errorResults

       actionQueue.add {
         () => allResults.foreach { case (topicPartition, result) =>
           val requestKey = TopicPartitionOperationKey(topicPartition)
@@ -859,7 +855,7 @@ class ReplicaManager(val config: KafkaConfig,
       val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
         topicPartition -> new PartitionResponse(
           Errors.INVALID_REQUIRED_ACKS,
-          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
           RecordBatch.NO_TIMESTAMP,
           LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
         )
@@ -1182,7 +1178,8 @@ class ReplicaManager(val config: KafkaConfig,
       if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
         (topicPartition, LogAppendResult(
           LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-          Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
+          Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")),
+          hasCustomErrorMessage = false))
       } else {
         try {
           val partition = getPartitionOrException(topicPartition)
@@ -1197,9 +1194,9 @@ class ReplicaManager(val config: KafkaConfig,
           if (traceEnabled)
             trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
-              s"${info.firstOffset.orElse(new LogOffsetMetadata(-1))} and ending at offset ${info.lastOffset}")
+              s"${info.firstOffset} and ending at offset ${info.lastOffset}")

-          (topicPartition, LogAppendResult(info))
+          (topicPartition, LogAppendResult(info, exception = None, hasCustomErrorMessage = false))
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions
           // it is supposed to indicate un-expected failures of a broker in handling a produce request
@@ -1209,15 +1206,16 @@ class ReplicaManager(val config: KafkaConfig,
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
                    _: KafkaStorageException) =>
-            (topicPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e)))
+            (topicPartition, LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), hasCustomErrorMessage = false))
           case rve: RecordValidationException =>
             val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
             val recordErrors = rve.recordErrors
-            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(
-              logStartOffset, recordErrors, rve.invalidException.getMessage), Some(rve.invalidException)))
+            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, recordErrors),
+              Some(rve.invalidException), hasCustomErrorMessage = true))
           case t: Throwable =>
             val logStartOffset = processFailedRecord(topicPartition, t)
-            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
+            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
+              Some(t), hasCustomErrorMessage = false))
         }
       }
     }

core/src/test/scala/other/kafka/StressTestLog.scala (2 lines changed)

@@ -124,7 +124,7 @@ object StressTestLog {
   class WriterThread(val log: UnifiedLog) extends WorkerThread with LogProgress {
     override def work(): Unit = {
       val logAppendInfo = log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 0)
-      require((!logAppendInfo.firstOffset.isPresent || logAppendInfo.firstOffset.get().messageOffset == currentOffset)
+      require((logAppendInfo.firstOffset == -1 || logAppendInfo.firstOffset == currentOffset)
         && logAppendInfo.lastOffset == currentOffset)
       currentOffset += 1
       if (currentOffset % 1000 == 0)

core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala (2 lines changed)

@@ -150,7 +150,7 @@ abstract class AbstractLogCleanerIntegrationTest {
       // move LSO forward to increase compaction bound
       log.updateHighWatermark(log.logEndOffset)
       incCounter()
-      (key, value, appendInfo.firstOffset.get.messageOffset)
+      (key, value, appendInfo.firstOffset)
     }
   }

core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala (6 lines changed)

@@ -70,7 +70,7 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
     // move LSO forward to increase compaction bound
     log.updateHighWatermark(log.logEndOffset)
-    val largeMessageOffset = appendInfo.firstOffset.get.messageOffset
+    val largeMessageOffset = appendInfo.firstOffset

     val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
     val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dups
@@ -172,7 +172,7 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
     // move LSO forward to increase compaction bound
     log.updateHighWatermark(log.logEndOffset)
-    val largeMessageOffset = appendInfo.firstOffset.map[Long](_.messageOffset).get
+    val largeMessageOffset = appendInfo.firstOffset

     // also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
     props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_11_0_IV0.version)
@@ -320,7 +320,7 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
       val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
       // move LSO forward to increase compaction bound
       log.updateHighWatermark(log.logEndOffset)
-      val offsets = appendInfo.firstOffset.get.messageOffset to appendInfo.lastOffset
+      val offsets = appendInfo.firstOffset to appendInfo.lastOffset
       kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
     }

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (10 lines changed)

@@ -295,17 +295,17 @@ class LogCleanerTest {
     // check duplicate append from producer 1
     var logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
-    assertEquals(0L, logAppendInfo.firstOffset.get.messageOffset)
+    assertEquals(0L, logAppendInfo.firstOffset)
     assertEquals(2L, logAppendInfo.lastOffset)

     // check duplicate append from producer 3
     logAppendInfo = appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
-    assertEquals(6L, logAppendInfo.firstOffset.get.messageOffset)
+    assertEquals(6L, logAppendInfo.firstOffset)
     assertEquals(7L, logAppendInfo.lastOffset)

     // check duplicate append from producer 2
     logAppendInfo = appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 1, 4))
-    assertEquals(3L, logAppendInfo.firstOffset.get.messageOffset)
+    assertEquals(3L, logAppendInfo.firstOffset)
     assertEquals(5L, logAppendInfo.lastOffset)

     // do one more append and a round of cleaning to force another deletion from producer 1's batch
@@ -321,7 +321,7 @@ class LogCleanerTest {
     // duplicate append from producer1 should still be fine
     logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
-    assertEquals(0L, logAppendInfo.firstOffset.get.messageOffset)
+    assertEquals(0L, logAppendInfo.firstOffset)
     assertEquals(2L, logAppendInfo.lastOffset)
   }
@@ -2048,7 +2048,7 @@ class LogCleanerTest {
   private def writeToLog(log: UnifiedLog, seq: Iterable[(Int, Int)]): Iterable[Long] = {
     for ((key, value) <- seq)
-      yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset.get.messageOffset
+      yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset
   }

   private def key(id: Long) = ByteBuffer.wrap(id.toString.getBytes)

core/src/test/scala/unit/kafka/log/LogManagerTest.scala (2 lines changed)

@@ -320,7 +320,7 @@ class LogManagerTest {
     for (_ <- 0 until numMessages) {
       val set = TestUtils.singletonRecords("test".getBytes())
       val info = log.appendAsLeader(set, leaderEpoch = 0)
-      offset = info.firstOffset.get.messageOffset
+      offset = info.firstOffset
     }
     log.updateHighWatermark(log.logEndOffset)

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala (34 lines changed)

@@ -160,17 +160,17 @@ class UnifiedLogTest {
     val records = TestUtils.records(simpleRecords)

     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(new LogOffsetMetadata(0, 0, 0), firstAppendInfo.firstOffset.get)
+    assertEquals(0, firstAppendInfo.firstOffset)

     val secondAppendInfo = log.appendAsLeader(
       TestUtils.records(simpleRecords),
       leaderEpoch = 0
     )
-    assertEquals(new LogOffsetMetadata(simpleRecords.size, 0, records.sizeInBytes), secondAppendInfo.firstOffset.get)
+    assertEquals(simpleRecords.size, secondAppendInfo.firstOffset)

     log.roll()
     val afterRollAppendInfo = log.appendAsLeader(TestUtils.records(simpleRecords), leaderEpoch = 0)
-    assertEquals(new LogOffsetMetadata(simpleRecords.size * 2, simpleRecords.size * 2, 0), afterRollAppendInfo.firstOffset.get)
+    assertEquals(simpleRecords.size * 2, afterRollAppendInfo.firstOffset)
   }

   @Test
@@ -1260,7 +1260,7 @@ class UnifiedLogTest {
     ), producerId = pid, producerEpoch = epoch, sequence = seq)
     val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals(
-      multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset.get.messageOffset + 1,
+      multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1,
       3,
       "should have appended 3 entries"
     )
@@ -1268,8 +1268,8 @@ class UnifiedLogTest {
     // Append a Duplicate of the tail, when the entry at the tail has multiple records.
     val dupMultiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals(
-      multiEntryAppendInfo.firstOffset.get.messageOffset,
-      dupMultiEntryAppendInfo.firstOffset.get.messageOffset,
+      multiEntryAppendInfo.firstOffset,
+      dupMultiEntryAppendInfo.firstOffset,
       "Somehow appended a duplicate entry with multiple log records to the tail"
     )
     assertEquals(multiEntryAppendInfo.lastOffset, dupMultiEntryAppendInfo.lastOffset,
@@ -1305,8 +1305,8 @@ class UnifiedLogTest {
     val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0)
     val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0)
     assertEquals(
-      origAppendInfo.firstOffset.get.messageOffset,
-      newAppendInfo.firstOffset.get.messageOffset,
+      origAppendInfo.firstOffset,
+      newAppendInfo.firstOffset,
       "Inserted a duplicate records into the log"
     )
     assertEquals(origAppendInfo.lastOffset, newAppendInfo.lastOffset,
@@ -1786,7 +1786,7 @@ class UnifiedLogTest {
       log.appendAsLeader(
         TestUtils.singletonRecords(value = "hello".getBytes, timestamp = mockTime.milliseconds),
         leaderEpoch = 0
-      ).firstOffset.get.messageOffset,
+      ).firstOffset,
       "Should still be able to append and should get the logEndOffset assigned to the new append")

     // cleanup the log
@@ -2940,7 +2940,7 @@ class UnifiedLogTest {
       new SimpleRecord("baz".getBytes))

     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)

     // add more transactional records
     seq += 3
@@ -2948,13 +2948,13 @@ class UnifiedLogTest {
       new SimpleRecord("blah".getBytes)), leaderEpoch = 0)

     // LSO should not have changed
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)

     // now transaction is committed
     val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds())

     // first unstable offset is not updated until the high watermark is advanced
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
     log.updateHighWatermark(commitAppendInfo.lastOffset + 1)

     // now there should be no first unstable offset
@@ -3347,7 +3347,7 @@ class UnifiedLogTest {
       new SimpleRecord("a".getBytes),
       new SimpleRecord("b".getBytes),
       new SimpleRecord("c".getBytes)), leaderEpoch = 0)
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)

     // mix in some non-transactional data
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
@@ -3362,14 +3362,14 @@ class UnifiedLogTest {
       new SimpleRecord("f".getBytes)), leaderEpoch = 0)

     // LSO should not have changed
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)

     // now first producer's transaction is aborted
     val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT, mockTime.milliseconds())
     log.updateHighWatermark(abortAppendInfo.lastOffset + 1)

     // LSO should now point to one less than the first offset of the second transaction
-    assertEquals(secondAppendInfo.firstOffset.map[Long](_.messageOffset), log.firstUnstableOffset.asJava)
+    assertEquals(Some(secondAppendInfo.firstOffset), log.firstUnstableOffset)

     // commit the second transaction
     val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.COMMIT, mockTime.milliseconds())
@@ -3394,7 +3394,7 @@ class UnifiedLogTest {
     val log = createLog(logDir, logConfig)

     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)

     // this write should spill to the second segment
     seq = 3
@@ -3402,7 +3402,7 @@ class UnifiedLogTest {
       new SimpleRecord("d".getBytes),
       new SimpleRecord("e".getBytes),
       new SimpleRecord("f".getBytes)), leaderEpoch = 0)
-    assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset), log.firstUnstableOffset.asJava)
+    assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
     assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset)

     // now abort the transaction

core/src/test/scala/unit/kafka/server/MockFetcherThread.scala (9 lines changed)

@@ -23,10 +23,10 @@ import org.apache.kafka.common.requests.FetchResponse
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.LogAppendInfo
 import org.junit.jupiter.api.Assertions._

-import java.util.{Optional, OptionalInt}
+import java.util.OptionalInt
 import scala.collection.{Map, Set, mutable}
 import scala.jdk.CollectionConverters._
@@ -102,7 +102,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
     state.logStartOffset = partitionData.logStartOffset
     state.highWatermark = partitionData.highWatermark

-    Some(new LogAppendInfo(Optional.of(new LogOffsetMetadata(fetchOffset)),
+    Some(new LogAppendInfo(fetchOffset,
       lastOffset,
       lastEpoch,
       maxTimestamp,
@@ -111,10 +111,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
       state.logStartOffset,
       RecordConversionStats.EMPTY,
       CompressionType.NONE,
-      CompressionType.NONE,
-      batches.size,
       FetchResponse.recordsSize(partitionData),
-      true,
       batches.headOption.map(_.lastOffset).getOrElse(-1)))
   }

core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala (9 lines changed)

@@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMeta
 import org.apache.kafka.common.utils.{LogContext, SystemTime}
 import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
-import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{LogAppendInfo}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -765,7 +765,7 @@ class ReplicaFetcherThreadTest {
     when(partition.localLogOrException).thenReturn(log)
     when(partition.appendRecordsToFollowerOrFutureReplica(any(), any())).thenReturn(Some(new LogAppendInfo(
-      Optional.empty[LogOffsetMetadata],
+      -1,
       0,
       OptionalInt.empty,
       RecordBatch.NO_TIMESTAMP,
@@ -774,10 +774,7 @@ class ReplicaFetcherThreadTest {
       -1L,
       RecordConversionStats.EMPTY,
       CompressionType.NONE,
-      CompressionType.NONE,
-      -1,
-      0, // No records.
-      false,
+      -1, // No records.
       -1L
     )))

storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java (90 lines changed)

@@ -23,7 +23,6 @@ import org.apache.kafka.common.requests.ProduceResponse.RecordError;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.OptionalInt;

 /**
@@ -31,12 +30,11 @@ import java.util.OptionalInt;
  */
 public class LogAppendInfo {

-    public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(),
+    public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(-1, -1, OptionalInt.empty(),
             RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
-            RecordConversionStats.EMPTY, CompressionType.NONE, CompressionType.NONE, -1, -1,
-            false, -1L);
+            RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L);

-    private Optional<LogOffsetMetadata> firstOffset;
+    private long firstOffset;
     private long lastOffset;
     private long maxTimestamp;
     private long offsetOfMaxTimestamp;
@@ -46,21 +44,16 @@ public class LogAppendInfo {
     private final OptionalInt lastLeaderEpoch;
     private final CompressionType sourceCompression;
-    private final CompressionType targetCompression;
-    private final int shallowCount;
     private final int validBytes;
-    private final boolean offsetsMonotonic;
     private final long lastOffsetOfFirstBatch;
     private final List<RecordError> recordErrors;
-    private final String errorMessage;
     private final LeaderHwChange leaderHwChange;

     /**
      * Creates an instance with the given params.
      *
      * @param firstOffset            The first offset in the message set unless the message format is less than V2 and we are appending
-     *                               to the follower. If the message is a duplicate message the segment base offset and relative position
-     *                               in segment will be unknown.
+     *                               to the follower.
      * @param lastOffset             The last offset in the message set
      * @param lastLeaderEpoch        The partition leader epoch corresponding to the last offset, if available.
      * @param maxTimestamp           The maximum timestamp of the message set.
@@ -69,13 +62,10 @@ public class LogAppendInfo {
      * @param logStartOffset         The start offset of the log at the time of this append.
      * @param recordConversionStats  Statistics collected during record processing, `null` if `assignOffsets` is `false`
      * @param sourceCompression      The source codec used in the message set (send by the producer)
-     * @param targetCompression      The target codec of the message set(after applying the broker compression configuration if any)
-     * @param shallowCount           The number of shallow messages
      * @param validBytes             The number of valid bytes
-     * @param offsetsMonotonic       Are the offsets in this message set monotonically increasing
      * @param lastOffsetOfFirstBatch The last offset of the first batch
      */
-    public LogAppendInfo(Optional<LogOffsetMetadata> firstOffset,
+    public LogAppendInfo(long firstOffset,
                          long lastOffset,
                          OptionalInt lastLeaderEpoch,
                          long maxTimestamp,
@@ -84,22 +74,18 @@ public class LogAppendInfo {
                          long logStartOffset,
                          RecordConversionStats recordConversionStats,
                          CompressionType sourceCompression,
-                         CompressionType targetCompression,
-                         int shallowCount,
                          int validBytes,
-                         boolean offsetsMonotonic,
                          long lastOffsetOfFirstBatch) {
         this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset,
-                recordConversionStats, sourceCompression, targetCompression, shallowCount, validBytes, offsetsMonotonic,
-                lastOffsetOfFirstBatch, Collections.<RecordError>emptyList(), null, LeaderHwChange.NONE);
+                recordConversionStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.<RecordError>emptyList(),
+                LeaderHwChange.NONE);
     }

     /**
      * Creates an instance with the given params.
      *
      * @param firstOffset            The first offset in the message set unless the message format is less than V2 and we are appending
-     *                               to the follower. If the message is a duplicate message the segment base offset and relative position
-     *                               in segment will be unknown.
+     *                               to the follower.
      * @param lastOffset             The last offset in the message set
      * @param lastLeaderEpoch        The partition leader epoch corresponding to the last offset, if available.
      * @param maxTimestamp           The maximum timestamp of the message set.
@@ -108,17 +94,13 @@ public class LogAppendInfo {
      * @param logStartOffset         The start offset of the log at the time of this append.
      * @param recordConversionStats  Statistics collected during record processing, `null` if `assignOffsets` is `false`
      * @param sourceCompression      The source codec used in the message set (send by the producer)
-     * @param targetCompression      The target codec of the message set(after applying the broker compression configuration if any)
-     * @param shallowCount           The number of shallow messages
      * @param validBytes             The number of valid bytes
-     * @param offsetsMonotonic       Are the offsets in this message set monotonically increasing
      * @param lastOffsetOfFirstBatch The last offset of the first batch
-     * @param errorMessage           error message
      * @param recordErrors           List of record errors that caused the respective batch to be dropped
      * @param leaderHwChange         Incremental if the high watermark needs to be increased after appending record
      *                               Same if high watermark is not changed. None is the default value and it means append failed
      */
-    public LogAppendInfo(Optional<LogOffsetMetadata> firstOffset,
+    public LogAppendInfo(long firstOffset,
                          long lastOffset,
                          OptionalInt lastLeaderEpoch,
                          long maxTimestamp,
@@ -127,13 +109,9 @@ public class LogAppendInfo {
                          long logStartOffset,
                          RecordConversionStats recordConversionStats,
                          CompressionType sourceCompression,
-                         CompressionType targetCompression,
-                         int shallowCount,
                          int validBytes,
-                         boolean offsetsMonotonic,
                          long lastOffsetOfFirstBatch,
                          List<RecordError> recordErrors,
-                         String errorMessage,
                          LeaderHwChange leaderHwChange) {
         this.firstOffset = firstOffset;
         this.lastOffset = lastOffset;
@@ -144,21 +122,17 @@ public class LogAppendInfo {
         this.logStartOffset = logStartOffset;
         this.recordConversionStats = recordConversionStats;
         this.sourceCompression = sourceCompression;
-        this.targetCompression = targetCompression;
-        this.shallowCount = shallowCount;
         this.validBytes = validBytes;
-        this.offsetsMonotonic = offsetsMonotonic;
         this.lastOffsetOfFirstBatch = lastOffsetOfFirstBatch;
         this.recordErrors = recordErrors;
-        this.errorMessage = errorMessage;
         this.leaderHwChange = leaderHwChange;
     }

-    public Optional<LogOffsetMetadata> firstOffset() {
+    public long firstOffset() {
         return firstOffset;
     }

-    public void setFirstOffset(Optional<LogOffsetMetadata> firstOffset) {
+    public void setFirstOffset(long firstOffset) {
         this.firstOffset = firstOffset;
     }
@@ -218,30 +192,14 @@ public class LogAppendInfo {
         return sourceCompression;
     }

-    public CompressionType targetCompression() {
-        return targetCompression;
-    }
-
-    public int shallowCount() {
-        return shallowCount;
-    }
-
     public int validBytes() {
         return validBytes;
     }

-    public boolean offsetsMonotonic() {
-        return offsetsMonotonic;
-    }
-
     public List<RecordError> recordErrors() {
         return recordErrors;
     }

-    public String errorMessage() {
-        return errorMessage;
-    }
-
     public LeaderHwChange leaderHwChange() {
         return leaderHwChange;
     }
@@ -253,7 +211,7 @@ public class LogAppendInfo {
      * offset to avoid decompressing the data.
      */
     public long firstOrLastOffsetOfFirstBatch() {
-        return firstOffset.map(x -> x.messageOffset).orElse(lastOffsetOfFirstBatch);
+        return firstOffset >= 0 ? firstOffset : lastOffsetOfFirstBatch;
     }

     /**
@@ -262,8 +220,8 @@ public class LogAppendInfo {
      * @return Maximum possible number of messages described by LogAppendInfo
      */
     public long numMessages() {
-        if (firstOffset.isPresent() && firstOffset.get().messageOffset >= 0 && lastOffset >= 0) {
-            return lastOffset - firstOffset.get().messageOffset + 1;
+        if (firstOffset >= 0 && lastOffset >= 0) {
+            return lastOffset - firstOffset + 1;
         }
         return 0;
     }
@@ -276,24 +234,22 @@ public class LogAppendInfo {
      */
     public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) {
         return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset, recordConversionStats,
-                sourceCompression, targetCompression, shallowCount, validBytes, offsetsMonotonic, lastOffsetOfFirstBatch, recordErrors, errorMessage, newLeaderHwChange);
+                sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange);
     }

     public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long logStartOffset) {
-        return new LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-                RecordConversionStats.EMPTY, CompressionType.NONE, CompressionType.NONE, -1, -1,
-                false, -1L);
+        return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+                RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L);
     }

     /**
      * In ProduceResponse V8+, we add two new fields record_errors and error_message (see KIP-467).
      * For any record failures with InvalidTimestamp or InvalidRecordException, we construct a LogAppendInfo object like the one
-     * in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors and errorMessage
+     * in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors
      */
-    public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long logStartOffset, List<RecordError> recordErrors, String errorMessage) {
-        return new LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-                RecordConversionStats.EMPTY, CompressionType.NONE, CompressionType.NONE, -1, -1,
-                false, -1L, recordErrors, errorMessage, LeaderHwChange.NONE);
+    public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long logStartOffset, List<RecordError> recordErrors) {
+        return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+                RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE);
     }

     @Override
@@ -308,13 +264,9 @@ public class LogAppendInfo {
                 ", logStartOffset=" + logStartOffset +
                 ", recordConversionStats=" + recordConversionStats +
                 ", sourceCompression=" + sourceCompression +
-                ", targetCompression=" + targetCompression +
-                ", shallowCount=" + shallowCount +
                 ", validBytes=" + validBytes +
-                ", offsetsMonotonic=" + offsetsMonotonic +
                 ", lastOffsetOfFirstBatch=" + lastOffsetOfFirstBatch +
                 ", recordErrors=" + recordErrors +
-                ", errorMessage='" + errorMessage + '\'' +
                 ", leaderHwChange=" + leaderHwChange +
                 ')';
     }
