@ -17,6 +17,8 @@
@@ -17,6 +17,8 @@
package org.apache.kafka.tiered.storage.utils ;
import org.apache.kafka.common.TopicPartition ;
import org.apache.kafka.common.record.FileLogInputStream ;
import org.apache.kafka.common.record.FileRecords ;
import org.apache.kafka.common.utils.Time ;
import org.apache.kafka.common.utils.Timer ;
import org.apache.kafka.common.utils.Utils ;
@ -67,12 +69,14 @@ public final class BrokerLocalStorage {
@@ -67,12 +69,14 @@ public final class BrokerLocalStorage {
public void waitForEarliestLocalOffset ( TopicPartition topicPartition ,
Long offset ) {
Function < OffsetHolder , Optional < String > > relativePosFunc = offsetHolder - > {
if ( offsetHolder . firstLogFileBaseOffset < offset ) {
return Optional . of ( "smaller than" ) ;
Optional < String > result = Optional . empty ( ) ;
if ( offsetHolder . firstLogFileBaseOffset < offset & &
! isOffsetPresentInFirstLocalSegment ( topicPartition , offsetHolder . firstLogFileBaseOffset , offset ) ) {
result = Optional . of ( "smaller than" ) ;
} else if ( offsetHolder . firstLogFileBaseOffset > offset ) {
return Optional . of ( "ahead of" ) ;
result = Optional . of ( "ahead of" ) ;
}
return Optional . empty ( ) ;
return result ;
} ;
waitForOffset ( topicPartition , offset , relativePosFunc ) ;
}
@ -90,10 +94,12 @@ public final class BrokerLocalStorage {
@@ -90,10 +94,12 @@ public final class BrokerLocalStorage {
public void waitForAtLeastEarliestLocalOffset ( TopicPartition topicPartition ,
Long offset ) {
Function < OffsetHolder , Optional < String > > relativePosFunc = offsetHolder - > {
if ( offsetHolder . firstLogFileBaseOffset < offset ) {
return Optional . of ( "smaller than" ) ;
Optional < String > result = Optional . empty ( ) ;
if ( offsetHolder . firstLogFileBaseOffset < offset & &
! isOffsetPresentInFirstLocalSegment ( topicPartition , offsetHolder . firstLogFileBaseOffset , offset ) ) {
result = Optional . of ( "smaller than" ) ;
}
return Optional . empty ( ) ;
return result ;
} ;
waitForOffset ( topicPartition , offset , relativePosFunc ) ;
}
@ -119,6 +125,37 @@ public final class BrokerLocalStorage {
@@ -119,6 +125,37 @@ public final class BrokerLocalStorage {
}
}
/ * *
* Check if the given offset is present in the first local segment of the given topic - partition .
* @param topicPartition The topic - partition to check .
* @param firstLogFileBaseOffset The base offset of the first local segment .
* @param offsetToSearch The offset to search .
* @return true if the offset is present in the first local segment , false otherwise .
* /
private boolean isOffsetPresentInFirstLocalSegment ( TopicPartition topicPartition ,
Long firstLogFileBaseOffset ,
Long offsetToSearch ) {
if ( offsetToSearch < firstLogFileBaseOffset ) {
return false ;
}
if ( offsetToSearch . equals ( firstLogFileBaseOffset ) ) {
return true ;
}
File partitionDir = new File ( brokerStorageDirectory . getAbsolutePath ( ) , topicPartition . toString ( ) ) ;
File firstSegmentFile = new File ( partitionDir . getAbsolutePath ( ) ,
LogFileUtils . filenamePrefixFromOffset ( firstLogFileBaseOffset ) + LogFileUtils . LOG_FILE_SUFFIX ) ;
try ( FileRecords fileRecords = FileRecords . open ( firstSegmentFile , false ) ) {
for ( FileLogInputStream . FileChannelRecordBatch batch : fileRecords . batches ( ) ) {
if ( batch . baseOffset ( ) < = offsetToSearch & & batch . lastOffset ( ) > = offsetToSearch ) {
return true ;
}
}
} catch ( final IOException ex ) {
return false ;
}
return false ;
}
public void eraseStorage ( ) throws IOException {
for ( File file : Objects . requireNonNull ( brokerStorageDirectory . listFiles ( ) ) ) {
Utils . delete ( file ) ;