@ -479,7 +479,7 @@ public class RemoteIndexCache implements Closeable {
@@ -479,7 +479,7 @@ public class RemoteIndexCache implements Closeable {
// underlying files of the index) isn't performed while a read is in-progress for the entry. This is required in
// addition to using the thread safe cache because, while the thread safety of the cache ensures that we can read
// entries concurrently, it does not ensure that we won't mutate underlying files belonging to an entry.
private final ReentrantReadWriteLock l ock = new ReentrantReadWriteLock ( ) ;
private final ReentrantReadWriteLock entryL ock = new ReentrantReadWriteLock ( ) ;
private boolean cleanStarted = false ;
@ -524,41 +524,41 @@ public class RemoteIndexCache implements Closeable {
@@ -524,41 +524,41 @@ public class RemoteIndexCache implements Closeable {
}
private long estimatedEntrySize ( ) {
l ock. readLock ( ) . lock ( ) ;
entryL ock. readLock ( ) . lock ( ) ;
try {
return offsetIndex . sizeInBytes ( ) + timeIndex . sizeInBytes ( ) + Files . size ( txnIndex . file ( ) . toPath ( ) ) ;
} catch ( IOException e ) {
log . warn ( "Error occurred when estimating remote index cache entry bytes size, just set 0 firstly." , e ) ;
return 0L ;
} finally {
l ock. readLock ( ) . unlock ( ) ;
entryL ock. readLock ( ) . unlock ( ) ;
}
}
public OffsetPosition lookupOffset ( long targetOffset ) {
l ock. readLock ( ) . lock ( ) ;
entryL ock. readLock ( ) . lock ( ) ;
try {
if ( markedForCleanup ) throw new IllegalStateException ( "This entry is marked for cleanup" ) ;
else return offsetIndex . lookup ( targetOffset ) ;
} finally {
l ock. readLock ( ) . unlock ( ) ;
entryL ock. readLock ( ) . unlock ( ) ;
}
}
public OffsetPosition lookupTimestamp ( long timestamp , long startingOffset ) throws IOException {
l ock. readLock ( ) . lock ( ) ;
entryL ock. readLock ( ) . lock ( ) ;
try {
if ( markedForCleanup ) throw new IllegalStateException ( "This entry is marked for cleanup" ) ;
TimestampOffset timestampOffset = timeIndex . lookup ( timestamp ) ;
return offsetIndex . lookup ( Math . max ( startingOffset , timestampOffset . offset ) ) ;
} finally {
l ock. readLock ( ) . unlock ( ) ;
entryL ock. readLock ( ) . unlock ( ) ;
}
}
public void markForCleanup ( ) throws IOException {
l ock. writeLock ( ) . lock ( ) ;
entryL ock. writeLock ( ) . lock ( ) ;
try {
if ( ! markedForCleanup ) {
markedForCleanup = true ;
@ -567,12 +567,12 @@ public class RemoteIndexCache implements Closeable {
@@ -567,12 +567,12 @@ public class RemoteIndexCache implements Closeable {
txnIndex . renameTo ( new File ( Utils . replaceSuffix ( txnIndex . file ( ) . getPath ( ) , "" , LogFileUtils . DELETED_FILE_SUFFIX ) ) ) ;
}
} finally {
l ock. writeLock ( ) . unlock ( ) ;
entryL ock. writeLock ( ) . unlock ( ) ;
}
}
public void cleanup ( ) throws IOException {
l ock. writeLock ( ) . lock ( ) ;
entryL ock. writeLock ( ) . lock ( ) ;
try {
markForCleanup ( ) ;
// no-op if clean is done already
@ -593,19 +593,19 @@ public class RemoteIndexCache implements Closeable {
@@ -593,19 +593,19 @@ public class RemoteIndexCache implements Closeable {
tryAll ( actions ) ;
}
} finally {
l ock. writeLock ( ) . unlock ( ) ;
entryL ock. writeLock ( ) . unlock ( ) ;
}
}
@Override
public void close ( ) {
l ock. writeLock ( ) . lock ( ) ;
entryL ock. writeLock ( ) . lock ( ) ;
try {
Utils . closeQuietly ( offsetIndex , "OffsetIndex" ) ;
Utils . closeQuietly ( timeIndex , "TimeIndex" ) ;
Utils . closeQuietly ( txnIndex , "TransactionIndex" ) ;
} finally {
l ock. writeLock ( ) . unlock ( ) ;
entryL ock. writeLock ( ) . unlock ( ) ;
}
}