Browse Source

KAFKA-698 Avoid advancing the log end offset until the append has actually happened since reads may be happening in the meantime.

0.8.0-beta1-candidate1
Jay Kreps 12 years ago
parent
commit
de1a4d7276
  1. 11
      core/src/main/scala/kafka/log/Log.scala

11
core/src/main/scala/kafka/log/Log.scala

@ -269,14 +269,14 @@ private[kafka] class Log(val dir: File, @@ -269,14 +269,14 @@ private[kafka] class Log(val dir: File,
// assign offsets to the messageset
val offsets =
if(assignOffsets) {
val firstOffset = nextOffset.get
validMessages = validMessages.assignOffsets(nextOffset, messageSetInfo.codec)
val lastOffset = nextOffset.get - 1
val offsetCounter = new AtomicLong(nextOffset.get)
val firstOffset = offsetCounter.get
validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
val lastOffset = offsetCounter.get - 1
(firstOffset, lastOffset)
} else {
if(!messageSetInfo.offsetsMonotonic)
throw new IllegalArgumentException("Out of order offsets found in " + messages)
nextOffset.set(messageSetInfo.lastOffset + 1)
(messageSetInfo.firstOffset, messageSetInfo.lastOffset)
}
@ -285,6 +285,9 @@ private[kafka] class Log(val dir: File, @@ -285,6 +285,9 @@ private[kafka] class Log(val dir: File,
.format(this.name, offsets._1, nextOffset.get(), validMessages))
segment.append(offsets._1, validMessages)
// advance the log end offset
nextOffset.set(offsets._2 + 1)
// return the offset at which the messages were appended
offsets
}

Loading…
Cancel
Save