diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index e653802c571..ec9d55f89ac 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -44,9 +44,67 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   // Length of the index file
   @volatile
   private var _length: Long = _
-
   protected def entrySize: Int
 
+  /*
+   Kafka mmaps index files into memory, and all the read / write operations of the index go through the OS page cache.
+   This avoids blocked disk I/O in most cases.
+
+   To the best of our knowledge, all modern operating systems use the LRU policy or one of its variants to manage page
+   cache. Kafka always appends to the end of the index file, and almost all the index lookups (typically from in-sync
+   followers or consumers) are very close to the end of the index. So, the LRU cache replacement policy should work
+   very well with Kafka's index access pattern.
+
+   However, when looking up an index, the standard binary search algorithm is not cache friendly, and can cause
+   unnecessary page faults (the thread is blocked waiting for index entries to be read from disk, as those entries are
+   not cached in the page cache).
+
+   For example, in an index with 13 pages, to look up an entry in the last page (page #12), the standard binary search
+   algorithm will read index entries in pages #0, 6, 9, 11, and 12.
+   page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 |
+   steps:       |1| | | | | |3| | |4|  |5 |2/6|
+   Each page holds hundreds of index entries, corresponding to hundreds to thousands of Kafka messages. As the index
+   gradually grows from the 1st entry in page #12 to the last entry in page #12, all the write (append) operations are
+   in page #12, and all the in-sync follower / consumer lookups read pages #0, 6, 9, 11, and 12. As these pages are
+   used in every in-sync lookup, we can assume they are fairly recently used and are very likely to be in the page
+   cache. When the index grows to page #13, the pages needed in an in-sync lookup change to #0, 7, 10, 12, and 13:
+   page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 |
+   steps:       |1| | | | | | |3| | | 4|5 | 6|2/7|
+   Page #7 and page #10 have not been used for a very long time, so they are much less likely to be in the page cache
+   than the other pages. The 1st lookup after the 1st index entry in page #13 is appended will likely have to read
+   page #7 and page #10 from disk (page faults), which can take more than a second. In our test, this caused the
+   at-least-once produce latency to jump from a few ms to about 1 second.
+
+   Here, we use a more cache-friendly lookup algorithm:
+     if (target > indexEntry[end - N]) // if the target is in the last N entries of the index
+       binarySearch(end - N, end)
+     else
+       binarySearch(begin, end - N)
+
+   If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the in-sync
+   lookups should go to the 1st branch. We call the last N entries the "warm" section. As we frequently look up in
+   this relatively small section, the pages containing it are more likely to be in the page cache.
+
+   We set the warm section size to 8192 bytes (i.e. N = _warmEntries = 8192 / entrySize), because
+   1. This number is small enough to guarantee that all the pages of the "warm" section are touched in every
+      warm-section lookup, so the entire warm section is really "warm".
+      When doing a warm-section lookup, the following 3 entries are always touched: indexEntry(end),
+      indexEntry(end - N), and indexEntry((end*2 - N)/2). If the page size is >= 4096, all the warm-section pages
+      (3 or fewer) are touched when we touch those 3 entries. As of 2018, 4096 is the smallest page size for all
+      processors (x86-32, x86-64, MIPS, SPARC, Power, ARM, etc.).
+   2. This number is large enough to guarantee that most of the in-sync lookups are in the warm section. With default
+      Kafka settings, an 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) of log messages.
+
+   We can't make the warm section larger than 8192 bytes, as there is no simple way to guarantee that all the "warm"
+   section pages are really warm (touched in every lookup) on a typical host with 4KB pages.
+
+   In the future, we may use a background thread to periodically touch the entire warm section, so that we can
+   1) support a larger warm section, and
+   2) make sure the warm section of low-QPS topic-partitions is really warm.
+   */
+  protected def _warmEntries: Int = 8192 / entrySize
+
   protected val lock = new ReentrantLock
 
   @volatile
@@ -311,26 +369,35 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
     if(_entries == 0)
       return (-1, -1)
 
+    def binarySearch(begin: Int, end: Int) : (Int, Int) = {
+      // binary search for the entry
+      var lo = begin
+      var hi = end
+      while(lo < hi) {
+        val mid = ceil(hi/2.0 + lo/2.0).toInt
+        val found = parseEntry(idx, mid)
+        val compareResult = compareIndexEntry(found, target, searchEntity)
+        if(compareResult > 0)
+          hi = mid - 1
+        else if(compareResult < 0)
+          lo = mid
+        else
+          return (mid, mid)
+      }
+      (lo, if (lo == _entries - 1) -1 else lo + 1)
+    }
+
+    val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
+    // check if the target offset is in the warm section of the index
+    if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
+      return binarySearch(firstHotEntry, _entries - 1)
+    }
+
     // check if the target offset is smaller than the least offset
     if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
       return (-1, 0)
 
-    // binary search for the entry
-    var lo = 0
-    var hi = _entries - 1
-    while(lo < hi) {
-      val mid = ceil(hi/2.0 + lo/2.0).toInt
-      val found = parseEntry(idx, mid)
-      val compareResult = compareIndexEntry(found, target, searchEntity)
-      if(compareResult > 0)
-        hi = mid - 1
-      else if(compareResult < 0)
-        lo = mid
-      else
-        return (mid, mid)
-    }
-
-    (lo, if (lo == _entries - 1) -1 else lo + 1)
+    return binarySearch(0, firstHotEntry)
   }
 
   private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {
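
For readers who want to try the lookup strategy outside a broker, here is a minimal, self-contained sketch of the same two-phase search over a plain sorted `Array[Long]`. The names (`WarmSectionSearchDemo`, `lookup`, `firstWarmEntry`) are illustrative only and not part of the patch; the real code operates on an mmapped buffer via `parseEntry` / `compareIndexEntry` and returns a (lower, upper) slot pair rather than a single index.

```scala
object WarmSectionSearchDemo {
  // Hypothetical demo constants: 8-byte entries give a 1024-entry warm section.
  val entrySize: Int = 8
  val warmEntries: Int = 8192 / entrySize

  /** Largest i with entries(i) <= target, or -1 if target precedes the index. */
  def lookup(entries: Array[Long], target: Long): Int = {
    if (entries.isEmpty) return -1

    def binarySearch(begin: Int, end: Int): Int = {
      var lo = begin
      var hi = end
      while (lo < hi) {
        // Round the midpoint up so the `lo = mid` branch always makes progress.
        val mid = math.ceil(hi / 2.0 + lo / 2.0).toInt
        if (entries(mid) > target) hi = mid - 1
        else if (entries(mid) < target) lo = mid
        else return mid
      }
      lo
    }

    val firstWarmEntry = math.max(0, entries.length - 1 - warmEntries)
    // Fast path: recent (in-sync style) lookups land in the warm section, so
    // only its few pages are touched, keeping them resident in the page cache.
    if (entries(firstWarmEntry) <= target)
      return binarySearch(firstWarmEntry, entries.length - 1)

    // Target precedes the first entry of the index.
    if (entries(0) > target) return -1

    // Cold path: binary search over the head of the index.
    binarySearch(0, firstWarmEntry)
  }

  def main(args: Array[String]): Unit = {
    val entries = Array.tabulate(10000)(i => i.toLong * 100) // 0, 100, 200, ...
    println(lookup(entries, 999950)) // 9999: served by the warm-section fast path
    println(lookup(entries, 123))    // 1: served by the cold-path search
  }
}
```

As in the patch, the midpoint is rounded up with `ceil` because the invariant keeps `entries(lo) <= target`; rounding down could leave `lo` stuck when `hi = lo + 1`.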
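
The "about 4MB (offset index) or 2.7MB (time index)" figures in the comment can be reproduced from the defaults. The snippet below is a sanity check, assuming the stock entry sizes (8 bytes per offset-index entry, 12 bytes per time-index entry) and the default `log.index.interval.bytes` of 4096, i.e. roughly one index entry per 4096 bytes of log data:

```scala
object WarmSectionSizing extends App {
  val indexIntervalBytes = 4096  // default log.index.interval.bytes
  val offsetIndexEntrySize = 8   // 4B relative offset + 4B position
  val timeIndexEntrySize = 12    // 8B timestamp + 4B relative offset

  val warmOffsetEntries = 8192 / offsetIndexEntrySize // 1024 entries
  val warmTimeEntries = 8192 / timeIndexEntrySize     // 682 entries

  println(warmOffsetEntries * indexIntervalBytes) // 4194304 bytes, ~4MB
  println(warmTimeEntries * indexIntervalBytes)   // 2793472 bytes, ~2.7MB
}
```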