|
|
|
@ -16,15 +16,6 @@
@@ -16,15 +16,6 @@
|
|
|
|
|
*/ |
|
|
|
|
package org.apache.kafka.streams.state.internals; |
|
|
|
|
|
|
|
|
|
import java.util.Iterator; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Map.Entry; |
|
|
|
|
import java.util.NoSuchElementException; |
|
|
|
|
import java.util.Objects; |
|
|
|
|
import java.util.Set; |
|
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
import java.util.concurrent.ConcurrentNavigableMap; |
|
|
|
|
import java.util.concurrent.ConcurrentSkipListMap; |
|
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord; |
|
|
|
|
import org.apache.kafka.common.metrics.Sensor; |
|
|
|
|
import org.apache.kafka.common.utils.Bytes; |
|
|
|
@ -41,6 +32,16 @@ import org.apache.kafka.streams.state.SessionStore;
@@ -41,6 +32,16 @@ import org.apache.kafka.streams.state.SessionStore;
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
|
|
import java.util.Iterator; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Map.Entry; |
|
|
|
|
import java.util.NoSuchElementException; |
|
|
|
|
import java.util.Objects; |
|
|
|
|
import java.util.Set; |
|
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
import java.util.concurrent.ConcurrentNavigableMap; |
|
|
|
|
import java.util.concurrent.ConcurrentSkipListMap; |
|
|
|
|
|
|
|
|
|
public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { |
|
|
|
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class); |
|
|
|
@ -155,7 +156,6 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@@ -155,7 +156,6 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
|
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Deprecated |
|
|
|
|
@Override |
|
|
|
|
public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, |
|
|
|
|
final long earliestSessionEndTime, |
|
|
|
@ -170,7 +170,6 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@@ -170,7 +170,6 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
|
|
|
|
|
endTimeMap.tailMap(earliestSessionEndTime, true).entrySet().iterator()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Deprecated |
|
|
|
|
@Override |
|
|
|
|
public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFrom, |
|
|
|
|
final Bytes keyTo, |
|
|
|
@ -259,7 +258,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@@ -259,7 +258,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
|
|
|
|
|
final Bytes keyTo, |
|
|
|
|
final long latestSessionStartTime, |
|
|
|
|
final Iterator<Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator) { |
|
|
|
|
final InMemorySessionStoreIterator iterator = new InMemorySessionStoreIterator(keyFrom, keyTo, latestSessionStartTime, endTimeIterator, it -> openIterators.remove(it)); |
|
|
|
|
final InMemorySessionStoreIterator iterator = new InMemorySessionStoreIterator(keyFrom, keyTo, latestSessionStartTime, endTimeIterator, openIterators::remove); |
|
|
|
|
openIterators.add(iterator); |
|
|
|
|
return iterator; |
|
|
|
|
} |
|
|
|
|