Browse Source

KAFKA-9137: Fix incorrect FetchSessionCache eviction logic (#7640)

Fix a bug where the lastUsedMs value in the FetchSessionCache was not getting correctly updated, resulting in spurious evictions.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
pull/7654/head
Lucas Bradstreet 5 years ago committed by Colin Patrick McCabe
parent
commit
a4cbdc6a7b
  1. 8
      core/src/main/scala/kafka/server/FetchSession.scala
  2. 194
      core/src/test/scala/unit/kafka/server/FetchSessionTest.scala

8
core/src/main/scala/kafka/server/FetchSession.scala

@ -265,7 +265,7 @@ case class FetchSession(val id: Int, @@ -265,7 +265,7 @@ case class FetchSession(val id: Int,
", privileged=" + privileged +
", partitionMap.size=" + partitionMap.size +
", creationMs=" + creationMs +
", creationMs=" + lastUsedMs +
", lastUsedMs=" + lastUsedMs +
", epoch=" + epoch + ")"
}
}
@ -781,11 +781,7 @@ class FetchManager(private val time: Time, @@ -781,11 +781,7 @@ class FetchManager(private val time: Time,
cache.remove(session)
new SessionlessFetchContext(fetchData)
} else {
if (session.size != session.cachedSize) {
// If the number of partitions in the session changed, update the session's
// position in the cache.
cache.touch(session, session.lastUsedMs)
}
cache.touch(session, time.milliseconds())
session.epoch = JFetchMetadata.nextEpoch(session.epoch)
debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +

194
core/src/test/scala/unit/kafka/server/FetchSessionTest.scala

@ -289,6 +289,200 @@ class FetchSessionTest { @@ -289,6 +289,200 @@ class FetchSessionTest {
assertTrue(resp2.sessionId > 0)
}
@Test
def testFetchSessionExpiration(): Unit = {
val time = new MockTime()
// set maximum entries to 2 to allow for eviction later
val cache = new FetchSessionCache(2, 1000)
val fetchManager = new FetchManager(time, cache)
// Create a new fetch session, session 1
val session1req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session1req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session1req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100,
Optional.empty()))
val session1context1 = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], session1context1.getClass)
val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session1resp = session1context1.updateAndGenerateResponseData(respData1)
assertEquals(Errors.NONE, session1resp.error())
assertTrue(session1resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session1resp.responseData().size())
// check session entered into case
assertTrue(cache.get(session1resp.sessionId()).isDefined)
time.sleep(500)
// Create a second new fetch session
val session2req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session2req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session2req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100,
Optional.empty()))
val session2context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], session2context.getClass)
val session2RespData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
session2RespData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
session2RespData.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session2resp = session2context.updateAndGenerateResponseData(respData1)
assertEquals(Errors.NONE, session2resp.error())
assertTrue(session2resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session2resp.responseData().size())
// both newly created entries are present in cache
assertTrue(cache.get(session1resp.sessionId()).isDefined)
assertTrue(cache.get(session2resp.sessionId()).isDefined)
time.sleep(500)
// Create an incremental fetch request for session 1
val context1v2 = fetchManager.newContext(
new JFetchMetadata(session1resp.sessionId(), 1),
new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData],
new util.ArrayList[TopicPartition], false)
assertEquals(classOf[IncrementalFetchContext], context1v2.getClass)
// total sleep time will now be large enough that fetch session 1 will be evicted if not correctly touched
time.sleep(501)
// create one final session to test that the least recently used entry is evicted
// the second session should be evicted because the first session was incrementally fetched
// more recently than the second session was created
val session3req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session3req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session3req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
val session3context = fetchManager.newContext(JFetchMetadata.INITIAL, session3req, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], session3context.getClass)
val respData3 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData3.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData3.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session3resp = session3context.updateAndGenerateResponseData(respData3)
assertEquals(Errors.NONE, session3resp.error())
assertTrue(session3resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session3resp.responseData().size())
assertTrue(cache.get(session1resp.sessionId()).isDefined)
assertFalse("session 2 should have been evicted by latest session, as session 1 was used more recently",
cache.get(session2resp.sessionId()).isDefined)
assertTrue(cache.get(session3resp.sessionId()).isDefined)
}
@Test
def testPrivilegedSessionHandling(): Unit = {
val time = new MockTime()
// set maximum entries to 2 to allow for eviction later
val cache = new FetchSessionCache(2, 1000)
val fetchManager = new FetchManager(time, cache)
// Create a new fetch session, session 1
val session1req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session1req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session1req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100,
Optional.empty()))
val session1context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, true)
assertEquals(classOf[FullFetchContext], session1context.getClass)
val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session1resp = session1context.updateAndGenerateResponseData(respData1)
assertEquals(Errors.NONE, session1resp.error())
assertTrue(session1resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session1resp.responseData().size())
assertEquals(1, cache.size)
// move time forward to age session 1 a little compared to session 2
time.sleep(500)
// Create a second new fetch session, unprivileged
val session2req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session2req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session2req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100,
Optional.empty()))
val session2context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], session2context.getClass)
val session2RespData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
session2RespData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
session2RespData.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session2resp = session2context.updateAndGenerateResponseData(respData1)
assertEquals(Errors.NONE, session2resp.error())
assertTrue(session2resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session2resp.responseData().size())
// both newly created entries are present in cache
assertTrue(cache.get(session1resp.sessionId()).isDefined)
assertTrue(cache.get(session2resp.sessionId()).isDefined)
assertEquals(2, cache.size)
time.sleep(500)
// create a session to test session1 privileges mean that session 1 is retained and session 2 is evicted
val session3req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session3req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session3req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
val session3context = fetchManager.newContext(JFetchMetadata.INITIAL, session3req, EMPTY_PART_LIST, true)
assertEquals(classOf[FullFetchContext], session3context.getClass)
val respData3 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData3.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData3.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session3resp = session3context.updateAndGenerateResponseData(respData3)
assertEquals(Errors.NONE, session3resp.error())
assertTrue(session3resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session3resp.responseData().size())
assertTrue(cache.get(session1resp.sessionId()).isDefined)
// even though session 2 is more recent than session 1, and has not reached expiry time, it is less
// privileged than session 2, and thus session 3 should be entered and session 2 evicted.
assertFalse("session 2 should have been evicted by session 3",
cache.get(session2resp.sessionId()).isDefined)
assertTrue(cache.get(session3resp.sessionId()).isDefined)
assertEquals(2, cache.size)
time.sleep(501)
// create a final session to test whether session1 can be evicted due to age even though it is privileged
val session4req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session4req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session4req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
val session4context = fetchManager.newContext(JFetchMetadata.INITIAL, session4req, EMPTY_PART_LIST, true)
assertEquals(classOf[FullFetchContext], session4context.getClass)
val respData4 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData4.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData4.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session4resp = session3context.updateAndGenerateResponseData(respData4)
assertEquals(Errors.NONE, session4resp.error())
assertTrue(session4resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session4resp.responseData().size())
assertFalse("session 1 should have been evicted by session 4 even though it is privileged as it has hit eviction time",
cache.get(session1resp.sessionId()).isDefined)
assertTrue(cache.get(session3resp.sessionId()).isDefined)
assertTrue(cache.get(session4resp.sessionId()).isDefined)
assertEquals(2, cache.size)
}
@Test
def testZeroSizeFetchSession(): Unit = {
val time = new MockTime()

Loading…
Cancel
Save