diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala index aa482eca7b6..8a3cd96fb2f 100644 --- a/core/src/main/scala/kafka/server/FetchSession.scala +++ b/core/src/main/scala/kafka/server/FetchSession.scala @@ -265,7 +265,7 @@ case class FetchSession(val id: Int, ", privileged=" + privileged + ", partitionMap.size=" + partitionMap.size + ", creationMs=" + creationMs + - ", creationMs=" + lastUsedMs + + ", lastUsedMs=" + lastUsedMs + ", epoch=" + epoch + ")" } } @@ -781,11 +781,7 @@ class FetchManager(private val time: Time, cache.remove(session) new SessionlessFetchContext(fetchData) } else { - if (session.size != session.cachedSize) { - // If the number of partitions in the session changed, update the session's - // position in the cache. - cache.touch(session, session.lastUsedMs) - } + cache.touch(session, time.milliseconds()) session.epoch = JFetchMetadata.nextEpoch(session.epoch) debug(s"Created a new incremental FetchContext for session id ${session.id}, " + s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " + diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala index b66324e169b..ae852fb4b69 100755 --- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala @@ -289,6 +289,200 @@ class FetchSessionTest { assertTrue(resp2.sessionId > 0) } + @Test + def testFetchSessionExpiration(): Unit = { + val time = new MockTime() + // set maximum entries to 2 to allow for eviction later + val cache = new FetchSessionCache(2, 1000) + val fetchManager = new FetchManager(time, cache) + + // Create a new fetch session, session 1 + val session1req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + session1req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + Optional.empty())) + session1req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100, + Optional.empty())) + val session1context1 = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false) + assertEquals(classOf[FullFetchContext], session1context1.getClass) + val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] + respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData( + Errors.NONE, 100, 100, 100, null, null)) + respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData( + Errors.NONE, 10, 10, 10, null, null)) + val session1resp = session1context1.updateAndGenerateResponseData(respData1) + assertEquals(Errors.NONE, session1resp.error()) + assertTrue(session1resp.sessionId() != INVALID_SESSION_ID) + assertEquals(2, session1resp.responseData().size()) + + // check session entered into case + assertTrue(cache.get(session1resp.sessionId()).isDefined) + time.sleep(500) + + // Create a second new fetch session + val session2req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + session2req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + Optional.empty())) + session2req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100, + Optional.empty())) + val session2context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false) + assertEquals(classOf[FullFetchContext], session2context.getClass) + val session2RespData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] + session2RespData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData( + Errors.NONE, 100, 100, 100, null, null)) + session2RespData.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData( + Errors.NONE, 10, 10, 10, null, null)) + val session2resp = session2context.updateAndGenerateResponseData(respData1) + assertEquals(Errors.NONE, session2resp.error()) + assertTrue(session2resp.sessionId() != INVALID_SESSION_ID) + assertEquals(2, session2resp.responseData().size()) + + // both newly created entries are present in cache + assertTrue(cache.get(session1resp.sessionId()).isDefined) + assertTrue(cache.get(session2resp.sessionId()).isDefined) + time.sleep(500) + + // Create an incremental fetch request for session 1 + val context1v2 = fetchManager.newContext( + new JFetchMetadata(session1resp.sessionId(), 1), + new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData], + new util.ArrayList[TopicPartition], false) + assertEquals(classOf[IncrementalFetchContext], context1v2.getClass) + + // total sleep time will now be large enough that fetch session 1 will be evicted if not correctly touched + time.sleep(501) + + // create one final session to test that the least recently used entry is evicted + // the second session should be evicted because the first session was incrementally fetched + // more recently than the second session was created + val session3req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + session3req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + Optional.empty())) + session3req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0, 0, 100, + Optional.empty())) + val session3context = fetchManager.newContext(JFetchMetadata.INITIAL, session3req, EMPTY_PART_LIST, false) + assertEquals(classOf[FullFetchContext], session3context.getClass) + val respData3 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] + respData3.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData( + Errors.NONE, 100, 100, 100, null, null)) + respData3.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData( + Errors.NONE, 10, 10, 10, null, null)) + val session3resp = session3context.updateAndGenerateResponseData(respData3) + assertEquals(Errors.NONE, session3resp.error()) + assertTrue(session3resp.sessionId() != INVALID_SESSION_ID) + assertEquals(2, session3resp.responseData().size()) + + assertTrue(cache.get(session1resp.sessionId()).isDefined) + assertFalse("session 2 should have been evicted by latest session, as session 1 was used more recently", + cache.get(session2resp.sessionId()).isDefined) + assertTrue(cache.get(session3resp.sessionId()).isDefined) + } + + @Test + def testPrivilegedSessionHandling(): Unit = { + val time = new MockTime() + // set maximum entries to 2 to allow for eviction later + val cache = new FetchSessionCache(2, 1000) + val fetchManager = new FetchManager(time, cache) + + // Create a new fetch session, session 1 + val session1req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + session1req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + Optional.empty())) + session1req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100, + Optional.empty())) + val session1context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, true) + assertEquals(classOf[FullFetchContext], session1context.getClass) + val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] + respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData( + Errors.NONE, 100, 100, 100, null, null)) + respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData( + Errors.NONE, 10, 10, 10, null, null)) + val session1resp = session1context.updateAndGenerateResponseData(respData1) + assertEquals(Errors.NONE, session1resp.error()) + assertTrue(session1resp.sessionId() != INVALID_SESSION_ID) + assertEquals(2, session1resp.responseData().size()) + assertEquals(1, cache.size) + + // move time forward to age session 1 a little compared to session 2 + time.sleep(500) + + // Create a second new fetch session, unprivileged + val session2req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + session2req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + Optional.empty())) + session2req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100, + Optional.empty())) + val session2context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false) + assertEquals(classOf[FullFetchContext], session2context.getClass) + val session2RespData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] + session2RespData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData( + Errors.NONE, 100, 100, 100, null, null)) + session2RespData.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData( + Errors.NONE, 10, 10, 10, null, null)) + val session2resp = session2context.updateAndGenerateResponseData(respData1) + assertEquals(Errors.NONE, session2resp.error()) + assertTrue(session2resp.sessionId() != INVALID_SESSION_ID) + assertEquals(2, session2resp.responseData().size()) + + // both newly created entries are present in cache + assertTrue(cache.get(session1resp.sessionId()).isDefined) + assertTrue(cache.get(session2resp.sessionId()).isDefined) + assertEquals(2, cache.size) + time.sleep(500) + + // create a session to test session1 privileges mean that session 1 is retained and session 2 is evicted + val session3req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + session3req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + Optional.empty())) + session3req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0, 0, 100, + Optional.empty())) + val session3context = fetchManager.newContext(JFetchMetadata.INITIAL, session3req, EMPTY_PART_LIST, true) + assertEquals(classOf[FullFetchContext], session3context.getClass) + val respData3 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] + respData3.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData( + Errors.NONE, 100, 100, 100, null, null)) + respData3.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData( + Errors.NONE, 10, 10, 10, null, null)) + val session3resp = session3context.updateAndGenerateResponseData(respData3) + assertEquals(Errors.NONE, session3resp.error()) + assertTrue(session3resp.sessionId() != INVALID_SESSION_ID) + assertEquals(2, session3resp.responseData().size()) + + assertTrue(cache.get(session1resp.sessionId()).isDefined) + // even though session 2 is more recent than session 1, and has not reached expiry time, it is less + // privileged than session 2, and thus session 3 should be entered and session 2 evicted. + assertFalse("session 2 should have been evicted by session 3", + cache.get(session2resp.sessionId()).isDefined) + assertTrue(cache.get(session3resp.sessionId()).isDefined) + assertEquals(2, cache.size) + + time.sleep(501) + + // create a final session to test whether session1 can be evicted due to age even though it is privileged + val session4req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] + session4req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100, + Optional.empty())) + session4req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0, 0, 100, + Optional.empty())) + val session4context = fetchManager.newContext(JFetchMetadata.INITIAL, session4req, EMPTY_PART_LIST, true) + assertEquals(classOf[FullFetchContext], session4context.getClass) + val respData4 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]] + respData4.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData( + Errors.NONE, 100, 100, 100, null, null)) + respData4.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData( + Errors.NONE, 10, 10, 10, null, null)) + val session4resp = session3context.updateAndGenerateResponseData(respData4) + assertEquals(Errors.NONE, session4resp.error()) + assertTrue(session4resp.sessionId() != INVALID_SESSION_ID) + assertEquals(2, session4resp.responseData().size()) + + assertFalse("session 1 should have been evicted by session 4 even though it is privileged as it has hit eviction time", + cache.get(session1resp.sessionId()).isDefined) + assertTrue(cache.get(session3resp.sessionId()).isDefined) + assertTrue(cache.get(session4resp.sessionId()).isDefined) + assertEquals(2, cache.size) + } + @Test def testZeroSizeFetchSession(): Unit = { val time = new MockTime()