@ -54,7 +55,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
@@ -54,7 +55,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
@ -123,14 +124,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
@@ -123,14 +124,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
@ -158,14 +156,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
@@ -158,14 +156,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
@ -88,25 +92,30 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@@ -88,25 +92,30 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
/* the maximum number of entries this index can hold */
defmaxEntries=mmap.limit/8
/**
*Thelastoffsetwrittentotheindex
*/
privatedefreadLastOffset():Long={
valoffset=
size.getmatch{
case0=>0
cases=>relativeOffset(this.mmap,s-1)
}
baseOffset+offset
inLock(lock){
valoffset=
size.getmatch{
case0=>0
cases=>relativeOffset(this.mmap,s-1)
}
baseOffset+offset
}
}
/**
@ -116,12 +125,14 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@@ -116,12 +125,14 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@ -167,17 +178,19 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@@ -167,17 +178,19 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
*Getthenthoffsetmappingfromtheindex
*/
defentry(n:Int):OffsetPosition={
if(n>=entries)
thrownewIllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n,entries))
require(!isFull,"Attempt to append to a full index (size = "+size+").")
if(size.get==0||offset>lastOffset){
debug("Adding index entry %d => %d to %s.".format(offset,position,file.getName))
@ -186,8 +199,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@@ -186,8 +199,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
this.size.incrementAndGet()
this.lastOffset=offset
require(entries*8==mmap.position,entries+" entries but file position in index is "+mmap.position+".")
}
else{
}else{
thrownewInvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
.format(offset,entries,lastOffset,file.getName))
}
@ -209,7 +221,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@@ -209,7 +221,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@ -233,9 +245,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@@ -233,9 +245,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
*Truncatesindextoaknownnumberofentries.
*/
privatedeftruncateToEntries(entries:Int){
this.size.set(entries)
mmap.position(this.size.get*8)
this.lastOffset=readLastOffset
inLock(lock){
this.size.set(entries)
mmap.position(this.size.get*8)
this.lastOffset=readLastOffset
}
}
/**
@ -243,7 +257,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@@ -243,7 +257,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
*thefile.
*/
deftrimToValidSize(){
thissynchronized{
inLock(lock){
resize(entries*8)
}
}
@ -255,14 +269,18 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@@ -255,14 +269,18 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@ -270,11 +288,23 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@@ -270,11 +288,23 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
caset:Throwable=>warn("Error when freeing index buffer",t)
}
}
/**
*Flushthedataintheindextodisk
*/
defflush(){
thissynchronized{
inLock(lock){
mmap.force()
}
}
@ -300,4 +330,20 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
@@ -300,4 +330,20 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =