|
|
|
@ -661,12 +661,12 @@ public class RemoteLogManagerTest {
@@ -661,12 +661,12 @@ public class RemoteLogManagerTest {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private double yammerMetricValue(String name) { |
|
|
|
|
Gauge<Double> guage = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream() |
|
|
|
|
Gauge<Double> gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream() |
|
|
|
|
.filter(e -> e.getKey().getMBeanName().contains(name)) |
|
|
|
|
.findFirst() |
|
|
|
|
.get() |
|
|
|
|
.getValue(); |
|
|
|
|
return guage.value(); |
|
|
|
|
return gauge.value(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@ -885,7 +885,7 @@ public class RemoteLogManagerTest {
@@ -885,7 +885,7 @@ public class RemoteLogManagerTest {
|
|
|
|
|
public RemoteStorageManager createRemoteStorageManager() { |
|
|
|
|
return rsmManager; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
) { |
|
|
|
|
assertEquals(rsmManager, remoteLogManager.storageManager()); |
|
|
|
|
} |
|
|
|
@ -1057,12 +1057,12 @@ public class RemoteLogManagerTest {
@@ -1057,12 +1057,12 @@ public class RemoteLogManagerTest {
|
|
|
|
|
txnIdxFile.createNewFile(); |
|
|
|
|
when(remoteStorageManager.fetchIndex(any(RemoteLogSegmentMetadata.class), any(IndexType.class))) |
|
|
|
|
.thenAnswer(ans -> { |
|
|
|
|
RemoteLogSegmentMetadata metadata = ans.<RemoteLogSegmentMetadata>getArgument(0); |
|
|
|
|
IndexType indexType = ans.<IndexType>getArgument(1); |
|
|
|
|
RemoteLogSegmentMetadata metadata = ans.getArgument(0); |
|
|
|
|
IndexType indexType = ans.getArgument(1); |
|
|
|
|
int maxEntries = (int) (metadata.endOffset() - metadata.startOffset()); |
|
|
|
|
OffsetIndex offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix()), |
|
|
|
|
OffsetIndex offsetIdx = new OffsetIndex(new File(tpDir, metadata.startOffset() + UnifiedLog.IndexFileSuffix()), |
|
|
|
|
metadata.startOffset(), maxEntries * 8); |
|
|
|
|
TimeIndex timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix()), |
|
|
|
|
TimeIndex timeIdx = new TimeIndex(new File(tpDir, metadata.startOffset() + UnifiedLog.TimeIndexFileSuffix()), |
|
|
|
|
metadata.startOffset(), maxEntries * 12); |
|
|
|
|
switch (indexType) { |
|
|
|
|
case OFFSET: |
|
|
|
@ -1241,7 +1241,7 @@ public class RemoteLogManagerTest {
@@ -1241,7 +1241,7 @@ public class RemoteLogManagerTest {
|
|
|
|
|
segmentEpochs5), logEndOffset, leaderEpochToStartOffset)); |
|
|
|
|
|
|
|
|
|
// Test whether any of the epoch's is not with in the leader epoch chain.
|
|
|
|
|
TreeMap<Integer, Long> segmentEpochs6 = new TreeMap<Integer, Long>(); |
|
|
|
|
TreeMap<Integer, Long> segmentEpochs6 = new TreeMap<>(); |
|
|
|
|
segmentEpochs6.put(5, 55L); |
|
|
|
|
segmentEpochs6.put(6, 60L); // epoch 6 exists here but it is missing in leaderEpochToStartOffset
|
|
|
|
|
segmentEpochs6.put(7, 70L); |
|
|
|
@ -1834,7 +1834,7 @@ public class RemoteLogManagerTest {
@@ -1834,7 +1834,7 @@ public class RemoteLogManagerTest {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This is the key scenario that we are testing here
|
|
|
|
|
RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { |
|
|
|
|
RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) { |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
}) { |
|
|
|
@ -1900,7 +1900,7 @@ public class RemoteLogManagerTest {
@@ -1900,7 +1900,7 @@ public class RemoteLogManagerTest {
|
|
|
|
|
return 1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { |
|
|
|
|
RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) { |
|
|
|
|
when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes); |
|
|
|
|
doNothing().when(firstBatch).writeTo(capture.capture()); |
|
|
|
|
return firstBatch; |
|
|
|
|