
KAFKA-15479: Remote log segments should be considered once for retention breach (#14407)

When a remote log segment contains multiple epochs, it gets considered multiple times during the retention-size/time/start-offset breach checks. This affects deletion driven by remote log retention size, as fewer segments than expected are deleted. This is a follow-up to KAFKA-15352.
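To make the failure mode concrete, here is a minimal sketch (illustrative only, not the patched RemoteLogManager code) of the shape of the cleanup loop. Retention is evaluated while iterating the remote segments epoch by epoch, so a segment whose leader-epoch map spans, say, two epochs is returned by both epoch iterators; without de-duplication its size is charged against the retention budget more than once and fewer segments than expected end up deleted. The names epochsWithOffsets and isBreachedByRetention are placeholders for this sketch; the real logic lives in cleanupExpiredRemoteLogSegments in the diff below.

// Illustrative sketch only; assumes remoteLogMetadataManager and topicIdPartition are in
// scope (as they are in RemoteLogManager) and that the remote-storage API types
// RemoteLogSegmentMetadata and RemoteLogMetadataManager are on the classpath.
Set<RemoteLogSegmentMetadata> segmentsToDelete = new HashSet<>();
for (Integer epoch : epochsWithOffsets.navigableKeySet()) {   // placeholder epoch -> start-offset map
    Iterator<RemoteLogSegmentMetadata> segments =
            remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
    while (segments.hasNext()) {
        RemoteLogSegmentMetadata metadata = segments.next();
        // A segment spanning multiple epochs is returned once per epoch iterator;
        // skipping already-selected segments keeps it from being counted twice
        // against the retention size/time/start-offset budget.
        if (segmentsToDelete.contains(metadata)) {
            continue;
        }
        if (isBreachedByRetention(metadata)) {   // placeholder for the size/time/start-offset checks
            segmentsToDelete.add(metadata);
        }
    }
}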

Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
Kamal Chandraprakash committed 1 year ago via GitHub (commit a15012078b)
  1. core/src/main/java/kafka/log/remote/RemoteLogManager.java (90 changed lines)
  2. core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java (367 changed lines)

core/src/main/java/kafka/log/remote/RemoteLogManager.java (90 changed lines)

@@ -562,7 +562,6 @@ public class RemoteLogManager implements Closeable {
}
cache.truncateFromEnd(endOffset);
}
return checkpoint;
}
@@ -707,7 +706,8 @@ public class RemoteLogManager implements Closeable {
}
}
private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset)
throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
CustomMetadataSizeLimitExceededException {
File logFile = segment.log().file();
String logFileName = logFile.getName();
@@ -833,13 +833,11 @@ public class RemoteLogManager implements Closeable {
remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
}
private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) {
private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata) {
boolean shouldDeleteSegment = false;
if (!retentionSizeData.isPresent()) {
return false;
return shouldDeleteSegment;
}
boolean shouldDeleteSegment = false;
// Assumption that segments contain size >= 0
if (remainingBreachedSize > 0) {
long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
@@ -848,7 +846,6 @@ public class RemoteLogManager implements Closeable {
shouldDeleteSegment = true;
}
}
if (shouldDeleteSegment) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
@@ -857,12 +854,12 @@ public class RemoteLogManager implements Closeable {
return shouldDeleteSegment;
}
public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) {
public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadata) {
boolean shouldDeleteSegment = false;
if (!retentionTimeData.isPresent()) {
return false;
return shouldDeleteSegment;
}
boolean shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
if (shouldDeleteSegment) {
remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
@@ -874,9 +871,9 @@ public class RemoteLogManager implements Closeable {
return shouldDeleteSegment;
}
private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata,
long logStartOffset,
NavigableMap<Integer, Long> leaderEpochEntries) {
private boolean isSegmentBreachByLogStartOffset(RemoteLogSegmentMetadata metadata,
long logStartOffset,
NavigableMap<Integer, Long> leaderEpochEntries) {
boolean shouldDeleteSegment = false;
if (!leaderEpochEntries.isEmpty()) {
// Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same
@@ -917,10 +914,8 @@ public class RemoteLogManager implements Closeable {
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
// Delete the segment in remote storage.
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
// Publish delete segment finished event.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
@@ -933,7 +928,7 @@ public class RemoteLogManager implements Closeable {
}
private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
if (isCancelled() || !isLeader()) {
logger.info("Returning from remote log segments cleanup as the task state is changed");
return;
@@ -994,13 +989,15 @@ public class RemoteLogManager implements Closeable {
return;
}
RemoteLogSegmentMetadata metadata = segmentsIterator.next();
if (segmentsToDelete.contains(metadata)) {
continue;
}
// When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated
// as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those
// remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether
// the epochs present in the segment lies in the checkpoint file. It will always return false
// since the checkpoint file was already truncated.
boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
boolean shouldDeleteSegment = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset(
metadata, logStartOffset, epochWithOffsets);
boolean isValidSegment = false;
if (!shouldDeleteSegment) {
@@ -1008,8 +1005,8 @@ public class RemoteLogManager implements Closeable {
isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
if (isValidSegment) {
shouldDeleteSegment =
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
remoteLogRetentionHandler.isSegmentBreachedByRetentionTime(metadata) ||
remoteLogRetentionHandler.isSegmentBreachedByRetentionSize(metadata);
}
}
if (shouldDeleteSegment) {
@@ -1019,6 +1016,27 @@ public class RemoteLogManager implements Closeable {
}
}
// Update log start offset with the computed value after retention cleanup is done
remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
// At this point in time we have updated the log start offsets, but not initiated a deletion.
// Either a follower has picked up the changes to the log start offset, or they have not.
// If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
// the deletion.
// However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
// and delete them accordingly.
// If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
// again and delete them with the original deletion reason i.e. size, time or log start offset breach.
List<String> undeletedSegments = new ArrayList<>();
for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
}
}
if (!undeletedSegments.isEmpty()) {
logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
}
// Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known
// to the leader. This will remove the unreferenced segments in the remote storage. This is needed for
// unclean leader election scenarios as the remote storage can have epochs earlier to the current leader's
@@ -1041,27 +1059,6 @@ public class RemoteLogManager implements Closeable {
}
}
}
// Update log start offset with the computed value after retention cleanup is done
remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
// At this point in time we have updated the log start offsets, but not initiated a deletion.
// Either a follower has picked up the changes to the log start offset, or they have not.
// If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
// the deletion.
// However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
// and delete them accordingly.
// If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
// again and delete them with the original deletion reason i.e. size, time or log start offset breach.
List<String> undeletedSegments = new ArrayList<>();
for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
}
}
if (!undeletedSegments.isEmpty()) {
logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
}
}
private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
@@ -1180,7 +1177,12 @@ public class RemoteLogManager implements Closeable {
}
}
// segment end offset should be within the log end offset.
return segmentEndOffset < logEndOffset;
if (segmentEndOffset >= logEndOffset) {
LOGGER.debug("Segment {} end offset {} is more than log end offset {}.",
segmentMetadata.remoteLogSegmentId(), segmentEndOffset, logEndOffset);
return false;
}
return true;
}
/**

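The reworked tests in RemoteLogManagerTest.java below pin this behaviour down: they size retention so that a known number of segments breach and then assert that each breaching segment is deleted exactly once. As a worked example of the retention-size arithmetic used by testDeleteLogSegmentDueToRetentionSizeBreach (values from its "50, 23" CsvSource row; this is plain arithmetic for illustration, not the test code itself):

int segmentCount = 50;             // remote segments created by the test fixture
int deletableSegmentCount = 23;    // segments expected to breach retention
int segmentSize = 1024;            // bytes per remote segment
long localLogSegmentsSize = 512L;  // size reported for the local (non-remote) log

// retention.bytes is chosen so that exactly (segmentCount - deletableSegmentCount)
// remote segments plus the local log fit within the limit:
long retentionSize = (long) (segmentCount - deletableSegmentCount) * segmentSize + localLogSegmentsSize;
// retentionSize == 27 * 1024 + 512 == 28160 bytes, so the 23 oldest remote segments
// breach the size limit and deleteLogSegmentData must be called exactly 23 times,
// once per breaching segment, even when a segment spans more than one leader epoch.

The test's verifyDeleteLogSegment helper then checks the deletion count and that the log start offset has advanced to the end offset of the last deleted segment plus one.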
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java (367 changed lines)

@@ -74,6 +74,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
@@ -105,6 +106,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
@@ -189,6 +191,7 @@ public class RemoteLogManagerTest {
return epochs;
}
};
private final AtomicLong currentLogStartOffset = new AtomicLong(0L);
private final UnifiedLog mockLog = mock(UnifiedLog.class);
@@ -204,7 +207,7 @@ public class RemoteLogManagerTest {
kafka.utils.TestUtils.clearYammerMetrics();
remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> { },
(topicPartition, offset) -> currentLogStartOffset.set(offset),
brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
@@ -1259,14 +1262,15 @@ public class RemoteLogManagerTest {
segmentEpochs7), logEndOffset, leaderEpochToStartOffset));
// Test a remote segment having larger end offset than the log end offset
TreeMap<Integer, Long> segmentEpochs8 = new TreeMap<>();
segmentEpochs8.put(1, 15L);
segmentEpochs8.put(2, 20L);
assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
15,
95, // larger than log end offset
segmentEpochs8), logEndOffset, leaderEpochToStartOffset));
leaderEpochToStartOffset), logEndOffset, leaderEpochToStartOffset));
assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
15,
90, // equal to the log end offset
leaderEpochToStartOffset), logEndOffset, leaderEpochToStartOffset));
// Test whether a segment's first offset is earlier to the respective epoch's start offset
TreeMap<Integer, Long> segmentEpochs9 = new TreeMap<>();
@@ -1521,188 +1525,212 @@ public class RemoteLogManagerTest {
}
}
@Test
public void testDeleteRetentionSizeBreachingSegments() throws RemoteStorageException, IOException {
AtomicLong logStartOffset = new AtomicLong(0);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> logStartOffset.set(offset),
brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
}
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
}) {
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(0);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.logEndOffset()).thenReturn(200L);
List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(remoteLogSegmentMetadatas.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
.thenReturn(remoteLogSegmentMetadatas.iterator())
.thenReturn(remoteLogSegmentMetadatas.iterator())
.thenReturn(remoteLogSegmentMetadatas.iterator());
checkpoint.write(epochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
@ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
@CsvSource(value = {"0, -1", "-1, 0"})
public void testDeletionOnRetentionBreachedSegments(long retentionSize,
long retentionMs)
throws RemoteStorageException, ExecutionException, InterruptedException {
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", retentionSize);
logProps.put("retention.ms", retentionMs);
LogConfig mockLogConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(mockLogConfig);
List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
checkpoint.write(epochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", 0L);
logProps.put("retention.ms", -1L);
LogConfig mockLogConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(mockLogConfig);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.logEndOffset()).thenReturn(200L);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
List<RemoteLogSegmentMetadata> metadataList =
listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
.thenAnswer(ans -> metadataList.iterator());
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenReturn(CompletableFuture.runAsync(() -> { }));
task.run();
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(0);
task.cleanupExpiredRemoteLogSegments();
assertEquals(200L, logStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
}
assertEquals(200L, currentLogStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
}
@Test
public void testDeleteRetentionMsBreachingSegments() throws RemoteStorageException, IOException {
AtomicLong logStartOffset = new AtomicLong(0);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> logStartOffset.set(offset),
brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
}
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
}) {
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(0);
public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, ExecutionException, InterruptedException {
RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
leaderTask.convertToLeader(0);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.logEndOffset()).thenReturn(200L);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.logEndOffset()).thenReturn(200L);
List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
List<RemoteLogSegmentMetadata> metadataList =
listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
.thenAnswer(ans -> metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(remoteLogSegmentMetadatas.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
.thenReturn(remoteLogSegmentMetadatas.iterator())
.thenReturn(remoteLogSegmentMetadatas.iterator());
checkpoint.write(epochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
checkpoint.write(epochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", -1L);
logProps.put("retention.ms", 0L);
LogConfig mockLogConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(mockLogConfig);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenAnswer(answer -> {
// cancel the task so that we don't delete the second segment
leaderTask.cancel();
return CompletableFuture.runAsync(() -> {
});
});
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", -1L);
logProps.put("retention.ms", 0L);
LogConfig mockLogConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(mockLogConfig);
leaderTask.cleanupExpiredRemoteLogSegments();
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
assertEquals(200L, currentLogStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
verify(remoteStorageManager, never()).deleteLogSegmentData(metadataList.get(1));
task.run();
// test that the 2nd log segment will be deleted by the new leader
RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
newLeaderTask.convertToLeader(1);
assertEquals(200L, logStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
}
}
Iterator<RemoteLogSegmentMetadata> firstIterator = metadataList.iterator();
firstIterator.next();
Iterator<RemoteLogSegmentMetadata> secondIterator = metadataList.iterator();
secondIterator.next();
Iterator<RemoteLogSegmentMetadata> thirdIterator = metadataList.iterator();
thirdIterator.next();
@Test
public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, IOException {
AtomicLong logStartOffset = new AtomicLong(0);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> logStartOffset.set(offset),
brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
}
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
}) {
RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
leaderTask.convertToLeader(0);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.logEndOffset()).thenReturn(200L);
List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(remoteLogSegmentMetadatas.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
.thenReturn(remoteLogSegmentMetadatas.iterator())
.thenReturn(remoteLogSegmentMetadatas.iterator());
checkpoint.write(epochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", -1L);
logProps.put("retention.ms", 0L);
LogConfig mockLogConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(mockLogConfig);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenAnswer(answer -> {
// cancel the task so that we don't delete the second segment
leaderTask.cancel();
return CompletableFuture.runAsync(() -> {
});
});
when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition))
.thenReturn(firstIterator);
when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0))
.thenReturn(secondIterator)
.thenReturn(thirdIterator);
leaderTask.run();
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
assertEquals(200L, logStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
verify(remoteStorageManager, never()).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
newLeaderTask.cleanupExpiredRemoteLogSegments();
// test that the 2nd log segment will be deleted by the new leader
RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
newLeaderTask.convertToLeader(1);
assertEquals(200L, currentLogStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
}
Iterator<RemoteLogSegmentMetadata> firstIterator = remoteLogSegmentMetadatas.iterator();
firstIterator.next();
Iterator<RemoteLogSegmentMetadata> secondIterator = remoteLogSegmentMetadatas.iterator();
secondIterator.next();
Iterator<RemoteLogSegmentMetadata> thirdIterator = remoteLogSegmentMetadatas.iterator();
thirdIterator.next();
@ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}")
@CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"})
public void testDeleteLogSegmentDueToRetentionSizeBreach(int segmentCount,
int deletableSegmentCount)
throws RemoteStorageException, ExecutionException, InterruptedException {
int recordsPerSegment = 100;
int segmentSize = 1024;
List<EpochEntry> epochEntries = Arrays.asList(
new EpochEntry(0, 0L),
new EpochEntry(1, 20L),
new EpochEntry(3, 50L),
new EpochEntry(4, 100L)
);
checkpoint.write(epochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch;
long localLogSegmentsSize = 512L;
long retentionSize = ((long) segmentCount - deletableSegmentCount) * segmentSize + localLogSegmentsSize;
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", retentionSize);
logProps.put("retention.ms", -1L);
LogConfig mockLogConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(mockLogConfig);
long localLogStartOffset = (long) segmentCount * recordsPerSegment;
long logEndOffset = ((long) segmentCount * recordsPerSegment) + 1;
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(mockLog.localLogStartOffset()).thenReturn(localLogStartOffset);
when(mockLog.logEndOffset()).thenReturn(logEndOffset);
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);
when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition))
.thenReturn(firstIterator);
when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0))
.thenReturn(secondIterator)
.thenReturn(thirdIterator);
List<RemoteLogSegmentMetadata> segmentMetadataList = listRemoteLogSegmentMetadata(
leaderTopicIdPartition, segmentCount, recordsPerSegment, segmentSize, epochEntries);
verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch);
}
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
@ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionTimeBreach segmentCount={0} deletableSegmentCount={1}")
@CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"})
public void testDeleteLogSegmentDueToRetentionTimeBreach(int segmentCount,
int deletableSegmentCount)
throws RemoteStorageException, ExecutionException, InterruptedException {
int recordsPerSegment = 100;
int segmentSize = 1024;
List<EpochEntry> epochEntries = Arrays.asList(
new EpochEntry(0, 0L),
new EpochEntry(1, 20L),
new EpochEntry(3, 50L),
new EpochEntry(4, 100L)
);
checkpoint.write(epochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch;
long localLogSegmentsSize = 512L;
long retentionSize = -1L;
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", retentionSize);
logProps.put("retention.ms", 1L);
LogConfig mockLogConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(mockLogConfig);
long localLogStartOffset = (long) segmentCount * recordsPerSegment;
long logEndOffset = ((long) segmentCount * recordsPerSegment) + 1;
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(mockLog.localLogStartOffset()).thenReturn(localLogStartOffset);
when(mockLog.logEndOffset()).thenReturn(logEndOffset);
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);
newLeaderTask.run();
List<RemoteLogSegmentMetadata> segmentMetadataList = listRemoteLogSegmentMetadataByTime(
leaderTopicIdPartition, segmentCount, deletableSegmentCount, recordsPerSegment, segmentSize, epochEntries);
verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch);
}
assertEquals(200L, logStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
private void verifyDeleteLogSegment(List<RemoteLogSegmentMetadata> segmentMetadataList,
int deletableSegmentCount,
int currentLeaderEpoch)
throws RemoteStorageException, ExecutionException, InterruptedException {
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(segmentMetadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
.thenAnswer(invocation -> {
int leaderEpoch = invocation.getArgument(1);
return segmentMetadataList.stream()
.filter(segmentMetadata -> segmentMetadata.segmentLeaderEpochs().containsKey(leaderEpoch))
.iterator();
});
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(currentLeaderEpoch);
task.cleanupExpiredRemoteLogSegments();
ArgumentCaptor<RemoteLogSegmentMetadata> deletedMetadataCapture = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
verify(remoteStorageManager, times(deletableSegmentCount)).deleteLogSegmentData(deletedMetadataCapture.capture());
if (deletableSegmentCount > 0) {
List<RemoteLogSegmentMetadata> deletedMetadataList = deletedMetadataCapture.getAllValues();
RemoteLogSegmentMetadata expectedEndMetadata = segmentMetadataList.get(deletableSegmentCount - 1);
assertEquals(segmentMetadataList.get(0), deletedMetadataList.get(0));
assertEquals(expectedEndMetadata, deletedMetadataList.get(deletedMetadataList.size() - 1));
assertEquals(currentLogStartOffset.get(), expectedEndMetadata.endOffset() + 1);
}
}
@@ -1718,9 +1746,22 @@ public class RemoteLogManagerTest {
int recordsPerSegment,
int segmentSize,
List<EpochEntry> epochEntries) {
return listRemoteLogSegmentMetadataByTime(
topicIdPartition, segmentCount, 0, recordsPerSegment, segmentSize, epochEntries);
}
private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadataByTime(TopicIdPartition topicIdPartition,
int segmentCount,
int deletableSegmentCount,
int recordsPerSegment,
int segmentSize,
List<EpochEntry> epochEntries) {
List<RemoteLogSegmentMetadata> segmentMetadataList = new ArrayList<>();
for (int idx = 0; idx < segmentCount; idx++) {
long timestamp = time.milliseconds();
if (idx < deletableSegmentCount) {
timestamp = time.milliseconds() - 1;
}
long startOffset = (long) idx * recordsPerSegment;
long endOffset = startOffset + recordsPerSegment - 1;
List<EpochEntry> localTotalEpochEntries = epochEntries.isEmpty() ? totalEpochEntries : epochEntries;
