Browse Source

KAFKA-14618; Fix off by one error in snapshot id (#13108)

The KRaft client expects the offset of the snapshot id to be an end offset. End offsets are
exclusive. The MetadataProvenance type was createing a snapshot id using the last contained offset
which is inclusive. This change fixes that and renames some of the fields to make this difference
more obvious.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
pull/13059/head
José Armando García Sancio 2 years ago committed by GitHub
parent
commit
058d8d530b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
  2. 4
      core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
  3. 4
      metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
  4. 39
      metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java
  5. 18
      metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
  6. 7
      metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
  7. 3
      metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
  8. 20
      metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
  9. 4
      metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
  10. 2
      raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java

4
core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala

@ -93,7 +93,7 @@ final class BrokerServerMetrics private ( @@ -93,7 +93,7 @@ final class BrokerServerMetrics private (
)
addMetric(metrics, lastAppliedRecordOffsetName) { _ =>
lastAppliedImageProvenance.get.offset()
lastAppliedImageProvenance.get.lastContainedOffset()
}
addMetric(metrics, lastAppliedRecordTimestampName) { _ =>
@ -132,7 +132,7 @@ final class BrokerServerMetrics private ( @@ -132,7 +132,7 @@ final class BrokerServerMetrics private (
override def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
lastAppliedImageProvenance.set(provenance)
override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().offset()
override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()
def lastAppliedTimestamp(): Long = lastAppliedImageProvenance.get().lastContainedLogTimeMs()
}

4
core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala

@ -67,7 +67,7 @@ final class BrokerServerMetricsTest { @@ -67,7 +67,7 @@ final class BrokerServerMetricsTest {
val expectedValue = 1000
brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
expectedValue,
brokerMetrics.lastAppliedImageProvenance.get().epoch(),
brokerMetrics.lastAppliedImageProvenance.get().lastContainedEpoch(),
brokerMetrics.lastAppliedTimestamp()));
assertEquals(expectedValue, offsetMetric.metricValue.asInstanceOf[Long])
}
@ -90,7 +90,7 @@ final class BrokerServerMetricsTest { @@ -90,7 +90,7 @@ final class BrokerServerMetricsTest {
brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
brokerMetrics.lastAppliedOffset(),
brokerMetrics.lastAppliedImageProvenance.get().epoch(),
brokerMetrics.lastAppliedImageProvenance.get().lastContainedEpoch(),
timestamp))
assertEquals(timestamp, timestampMetric.metricValue.asInstanceOf[Long])
assertEquals(time.milliseconds - timestamp, lagMetric.metricValue.asInstanceOf[Long])

4
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java

@ -91,11 +91,11 @@ public final class MetadataImage { @@ -91,11 +91,11 @@ public final class MetadataImage {
}
public OffsetAndEpoch highestOffsetAndEpoch() {
return new OffsetAndEpoch(provenance.offset(), provenance.epoch());
return new OffsetAndEpoch(provenance.lastContainedOffset(), provenance.lastContainedEpoch());
}
public long offset() {
return provenance.offset();
return provenance.lastContainedOffset();
}
public FeaturesImage features() {

39
metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java

@ -18,6 +18,7 @@ @@ -18,6 +18,7 @@
package org.apache.kafka.image;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.snapshot.Snapshots;
import java.util.Objects;
@ -28,30 +29,30 @@ import java.util.Objects; @@ -28,30 +29,30 @@ import java.util.Objects;
public final class MetadataProvenance {
public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, -1, -1L);
private final long offset;
private final int epoch;
private final long lastContainedOffset;
private final int lastContainedEpoch;
private final long lastContainedLogTimeMs;
public MetadataProvenance(
long offset,
int epoch,
long lastContainedOffset,
int lastContainedEpoch,
long lastContainedLogTimeMs
) {
this.offset = offset;
this.epoch = epoch;
this.lastContainedOffset = lastContainedOffset;
this.lastContainedEpoch = lastContainedEpoch;
this.lastContainedLogTimeMs = lastContainedLogTimeMs;
}
public OffsetAndEpoch offsetAndEpoch() {
return new OffsetAndEpoch(offset, epoch);
public OffsetAndEpoch snapshotId() {
return new OffsetAndEpoch(lastContainedOffset + 1, lastContainedEpoch);
}
public long offset() {
return offset;
public long lastContainedOffset() {
return lastContainedOffset;
}
public int epoch() {
return epoch;
public int lastContainedEpoch() {
return lastContainedEpoch;
}
public long lastContainedLogTimeMs() {
@ -62,30 +63,30 @@ public final class MetadataProvenance { @@ -62,30 +63,30 @@ public final class MetadataProvenance {
* Returns the name that a snapshot with this provenance would have.
*/
public String snapshotName() {
return String.format("snapshot %020d-%010d", offset, epoch);
return String.format("snapshot %s", Snapshots.filenameFromSnapshotId(snapshotId()));
}
@Override
public boolean equals(Object o) {
if (o == null || !o.getClass().equals(this.getClass())) return false;
MetadataProvenance other = (MetadataProvenance) o;
return offset == other.offset &&
epoch == other.epoch &&
return lastContainedOffset == other.lastContainedOffset &&
lastContainedEpoch == other.lastContainedEpoch &&
lastContainedLogTimeMs == other.lastContainedLogTimeMs;
}
@Override
public int hashCode() {
return Objects.hash(offset,
epoch,
return Objects.hash(lastContainedOffset,
lastContainedEpoch,
lastContainedLogTimeMs);
}
@Override
public String toString() {
return "MetadataProvenance(" +
"offset=" + offset +
", epoch=" + epoch +
"lastContainedOffset=" + lastContainedOffset +
", lastContainedEpoch=" + lastContainedEpoch +
", lastContainedLogTimeMs=" + lastContainedLogTimeMs +
")";
}

18
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java

@ -79,7 +79,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> @@ -79,7 +79,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
@Override
public void updateLastAppliedImageProvenance(MetadataProvenance provenance) {
this.lastAppliedOffset = provenance.offset();
this.lastAppliedOffset = provenance.lastContainedOffset();
}
@Override
@ -278,7 +278,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> @@ -278,7 +278,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
LogDeltaManifest manifest = loadLogDelta(delta, reader);
if (log.isDebugEnabled()) {
log.debug("Generated a metadata delta between {} and {} from {} batch(es) " +
"in {} us.", image.offset(), manifest.provenance().offset(),
"in {} us.", image.offset(), manifest.provenance().lastContainedOffset(),
manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()));
}
try {
@ -286,10 +286,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> @@ -286,10 +286,10 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
} catch (Throwable e) {
faultHandler.handleFault("Error generating new metadata image from " +
"metadata delta between offset " + image.offset() +
" and " + manifest.provenance().offset(), e);
" and " + manifest.provenance().lastContainedOffset(), e);
return;
}
if (stillNeedToCatchUp(manifest.provenance().offset())) {
if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
return;
}
log.debug("Publishing new image with provenance {}.", image.provenance());
@ -298,7 +298,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> @@ -298,7 +298,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
publisher.publishLogDelta(delta, image, manifest);
} catch (Throwable e) {
faultHandler.handleFault("Unhandled error publishing the new metadata " +
"image ending at " + manifest.provenance().offset() +
"image ending at " + manifest.provenance().lastContainedOffset() +
" with publisher " + publisher.name(), e);
}
}
@ -332,8 +332,8 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> @@ -332,8 +332,8 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
long startNs = time.nanoseconds();
int numBatches = 0;
long numBytes = 0L;
long lastOffset = image.provenance().offset();
int lastEpoch = image.provenance().epoch();
long lastOffset = image.provenance().lastContainedOffset();
int lastEpoch = image.provenance().lastContainedEpoch();
long lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs();
while (reader.hasNext()) {
@ -376,7 +376,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> @@ -376,7 +376,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
SnapshotManifest manifest = loadSnapshot(delta, reader);
if (log.isDebugEnabled()) {
log.debug("Generated a metadata delta from a snapshot at offset {} " +
"in {} us.", manifest.provenance().offset(),
"in {} us.", manifest.provenance().lastContainedOffset(),
NANOSECONDS.toMicros(manifest.elapsedNs()));
}
try {
@ -386,7 +386,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> @@ -386,7 +386,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
"snapshot at offset " + reader.lastContainedLogOffset(), e);
return;
}
if (stillNeedToCatchUp(manifest.provenance().offset())) {
if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
return;
}
log.debug("Publishing new snapshot image with provenance {}.", image.provenance());

7
metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java

@ -98,9 +98,10 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter { @@ -98,9 +98,10 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter {
@Override
public void maybeEmit(MetadataImage image) {
MetadataProvenance provenance = image.provenance();
Optional<SnapshotWriter<ApiMessageAndVersion>> snapshotWriter =
raftClient.createSnapshot(provenance.offsetAndEpoch(),
provenance.lastContainedLogTimeMs());
Optional<SnapshotWriter<ApiMessageAndVersion>> snapshotWriter = raftClient.createSnapshot(
provenance.snapshotId(),
provenance.lastContainedLogTimeMs()
);
if (!snapshotWriter.isPresent()) {
log.error("Not generating {} because it already exists.", provenance.snapshotName());
return;

3
metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java

@ -205,8 +205,7 @@ public class SnapshotGenerator implements MetadataPublisher { @@ -205,8 +205,7 @@ public class SnapshotGenerator implements MetadataPublisher {
MetadataImage newImage,
SnapshotManifest manifest
) {
log.debug("Resetting the snapshot counters because we just read a snapshot at offset {}.",
newImage.provenance().offset());
log.debug("Resetting the snapshot counters because we just read {}.", newImage.provenance().snapshotName());
resetSnapshotCounters();
}

20
metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java

@ -39,12 +39,12 @@ import org.junit.jupiter.api.Timeout; @@ -39,12 +39,12 @@ import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
@ -154,13 +154,11 @@ public class MetadataLoaderTest { @@ -154,13 +154,11 @@ public class MetadataLoaderTest {
MetadataProvenance provenance,
List<List<ApiMessageAndVersion>> lists
) {
List<Batch<ApiMessageAndVersion>> batches = new ArrayList<>();
lists.forEach(records -> batches.add(Batch.data(
provenance.offset(),
provenance.epoch(),
provenance.lastContainedLogTimeMs(),
0,
records)));
List<Batch<ApiMessageAndVersion>> batches = lists
.stream()
.map(records -> Batch.data(0, 0, 0, 0, records))
.collect(Collectors.toList());
return new MockSnapshotReader(provenance, batches);
}
@ -179,17 +177,17 @@ public class MetadataLoaderTest { @@ -179,17 +177,17 @@ public class MetadataLoaderTest {
@Override
public OffsetAndEpoch snapshotId() {
return provenance.offsetAndEpoch();
return provenance.snapshotId();
}
@Override
public long lastContainedLogOffset() {
return provenance.offset();
return provenance.lastContainedOffset();
}
@Override
public int lastContainedLogEpoch() {
return provenance.epoch();
return provenance.lastContainedEpoch();
}
@Override

4
metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java

@ -142,7 +142,7 @@ public class SnapshotEmitterTest { @@ -142,7 +142,7 @@ public class SnapshotEmitterTest {
@Override
public long lastContainedLogOffset() {
return snapshotId.offset();
return snapshotId.offset() - 1;
}
@Override
@ -190,7 +190,7 @@ public class SnapshotEmitterTest { @@ -190,7 +190,7 @@ public class SnapshotEmitterTest {
build();
emitter.maybeEmit(MetadataImageTest.IMAGE1);
MockSnapshotWriter writer = mockRaftClient.writers.get(
MetadataImageTest.IMAGE1.highestOffsetAndEpoch());
MetadataImageTest.IMAGE1.provenance().snapshotId());
assertNotNull(writer);
assertEquals(MetadataImageTest.IMAGE1.highestOffsetAndEpoch().offset(),
writer.lastContainedLogOffset());

2
raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java

@ -52,7 +52,7 @@ public final class Snapshots { @@ -52,7 +52,7 @@ public final class Snapshots {
return logDir;
}
static String filenameFromSnapshotId(OffsetAndEpoch snapshotId) {
public static String filenameFromSnapshotId(OffsetAndEpoch snapshotId) {
return String.format("%s-%s", OFFSET_FORMATTER.format(snapshotId.offset()), EPOCH_FORMATTER.format(snapshotId.epoch()));
}

Loading…
Cancel
Save