From ef62dd3ef2f88d990bff21e3ff825c992e65fa10 Mon Sep 17 00:00:00 2001
From: "Matthias J. Sax"
Date: Fri, 25 Jan 2019 23:56:39 -0800
Subject: [PATCH] KAFKA-3522: Generalize Segments (#6170)

Reviewers: Bill Bejeck, John Roesler, Guozhang Wang
---
 .../state/internals/AbstractSegments.java     | 229 ++++++++++++++++++
 .../state/internals/CachingWindowStore.java   |  23 +-
 .../state/internals/KeyValueSegment.java      |  68 ++++++
 .../state/internals/KeyValueSegments.java     |  45 ++++
 .../internals/RocksDBSegmentedBytesStore.java |  63 +++--
 .../streams/state/internals/Segment.java      |  49 +---
 .../state/internals/SegmentIterator.java      |  13 +-
 .../state/internals/SegmentedBytesStore.java  |   4 +-
 .../streams/state/internals/Segments.java     | 219 +----------------
 .../state/internals/SessionKeySchema.java     |  40 ++-
 .../state/internals/WindowKeySchema.java      |  59 ++---
 ....java => KeyValueSegmentIteratorTest.java} |  36 +--
 ...ntsTest.java => KeyValueSegmentsTest.java} |  44 ++--
 .../RocksDBSegmentedBytesStoreTest.java       |  18 +-
 .../internals/RocksDBWindowStoreTest.java     |   2 +-
 .../state/internals/WindowKeySchemaTest.java  |   8 +-
 16 files changed, 511 insertions(+), 409 deletions(-)
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
 rename streams/src/test/java/org/apache/kafka/streams/state/internals/{SegmentIteratorTest.java => KeyValueSegmentIteratorTest.java} (86%)
 rename streams/src/test/java/org/apache/kafka/streams/state/internals/{SegmentsTest.java => KeyValueSegmentsTest.java} (86%)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
new file mode 100644
index 00000000000..4d60b08a5fd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SimpleTimeZone;
+import java.util.TreeMap;
+
+abstract class AbstractSegments<S extends Segment> implements Segments<S> {
+    private static final Logger log = LoggerFactory.getLogger(AbstractSegments.class);
+
+    final TreeMap<Long, S> segments = new TreeMap<>();
+    final String name;
+    private final long retentionPeriod;
+    private final long segmentInterval;
+    private final SimpleDateFormat formatter;
+
+    AbstractSegments(final String name, final long retentionPeriod, final long segmentInterval) {
+        this.name = name;
+        this.segmentInterval = segmentInterval;
+        this.retentionPeriod = retentionPeriod;
+        // Create a date formatter. Formatted timestamps are used as segment name suffixes
+        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
+        this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
+    }
+
+    @Override
+    public long segmentId(final long timestamp) {
+        return timestamp / segmentInterval;
+    }
+
+    @Override
+    public String segmentName(final long segmentId) {
+        // (1) previous format used - as a separator so if this changes in the future
+        // then we should use something different.
+        // (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
+        // so if this changes in the future then we should use something different.
+        return name + "." + segmentId * segmentInterval;
+    }
+
+    @Override
+    public S getSegmentForTimestamp(final long timestamp) {
+        return segments.get(segmentId(timestamp));
+    }
+
+    @Override
+    public S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
+        final long minLiveTimestamp = context.streamTime() - retentionPeriod;
+        final long minLiveSegment = segmentId(minLiveTimestamp);
+
+        final S toReturn;
+        if (segmentId >= minLiveSegment) {
+            // The segment is live. get it, ensure it's open, and return it.
+            toReturn = getOrCreateSegment(segmentId, context);
+        } else {
+            toReturn = null;
+        }
+
+        cleanupEarlierThan(minLiveSegment);
+        return toReturn;
+    }
+
+    @Override
+    public void openExisting(final InternalProcessorContext context) {
+        try {
+            final File dir = new File(context.stateDir(), name);
+            if (dir.exists()) {
+                final String[] list = dir.list();
+                if (list != null) {
+                    final long[] segmentIds = new long[list.length];
+                    for (int i = 0; i < list.length; i++) {
+                        segmentIds[i] = segmentIdFromSegmentName(list[i], dir);
+                    }
+
+                    // open segments in the id order
+                    Arrays.sort(segmentIds);
+                    for (final long segmentId : segmentIds) {
+                        if (segmentId >= 0) {
+                            getOrCreateSegment(segmentId, context);
+                        }
+                    }
+                }
+            } else {
+                if (!dir.mkdir()) {
+                    throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name));
+                }
+            }
+        } catch (final Exception ex) {
+            // ignore
+        }
+
+        final long minLiveSegment = segmentId(context.streamTime() - retentionPeriod);
+        cleanupEarlierThan(minLiveSegment);
+    }
+
+    @Override
+    public List<S> segments(final long timeFrom, final long timeTo) {
+        final List<S> result = new ArrayList<>();
+        final NavigableMap<Long, S> segmentsInRange = segments.subMap(
+            segmentId(timeFrom), true,
+            segmentId(timeTo), true
+        );
+        for (final S segment : segmentsInRange.values()) {
+            if (segment.isOpen()) {
+                result.add(segment);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public List<S> allSegments() {
+        final List<S> result = new ArrayList<>();
+        for (final S segment : segments.values()) {
+            if (segment.isOpen()) {
+                result.add(segment);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void flush() {
+        for (final S segment : segments.values()) {
+            segment.flush();
+        }
+    }
+
+    @Override
+    public void close() {
+        for (final S segment : segments.values()) {
+            segment.close();
+        }
+        segments.clear();
+    }
+
+    private void cleanupEarlierThan(final long minLiveSegment) {
+        final Iterator<Map.Entry<Long, S>> toRemove =
+            segments.headMap(minLiveSegment, false).entrySet().iterator();
+
+        while (toRemove.hasNext()) {
+            final Map.Entry<Long, S> next = toRemove.next();
+            toRemove.remove();
+            final S segment = next.getValue();
+            segment.close();
+            try {
+                segment.destroy();
+            } catch (final IOException e) {
+                log.error("Error destroying {}", segment, e);
+            }
+        }
+    }
+
+    private long segmentIdFromSegmentName(final String segmentName,
+                                          final File parent) {
+        final int segmentSeparatorIndex = name.length();
+        final char segmentSeparator = segmentName.charAt(segmentSeparatorIndex);
+        final String segmentIdString = segmentName.substring(segmentSeparatorIndex + 1);
+        final long segmentId;
+
+        // old style segment name with date
+        if (segmentSeparator == '-') {
+            try {
+                segmentId = formatter.parse(segmentIdString).getTime() / segmentInterval;
+            } catch (final ParseException e) {
+                log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName);
+                return -1L;
+            }
+            renameSegmentFile(parent, segmentName, segmentId);
+        } else {
+            // for both new formats (with : or .) parse segment ID identically
+            try {
+                segmentId = Long.parseLong(segmentIdString) / segmentInterval;
+            } catch (final NumberFormatException e) {
+                throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName);
+            }
+
+            // intermediate segment name with : breaks KafkaStreams on Windows OS -> rename segment file to new name with .
+            if (segmentSeparator == ':') {
+                renameSegmentFile(parent, segmentName, segmentId);
+            }
+        }
+
+        return segmentId;
+
+    }
+
+    private void renameSegmentFile(final File parent,
+                                   final String segmentName,
+                                   final long segmentId) {
+        final File newName = new File(parent, segmentName(segmentId));
+        final File oldName = new File(parent, segmentName);
+        if (!oldName.renameTo(newName)) {
+            throw new ProcessorStateException("Unable to rename old style segment from: "
+                + oldName
+                + " to new name: "
+                + newName);
+        }
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index afe9b343230..6112544fbd8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -30,8 +30,6 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import java.util.List;
-
 class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V>, CachedStateStore<Windowed<K>, V> {
 
     private final WindowStore<Bytes, byte[]> underlying;
@@ -84,18 +82,15 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         name = context.taskId() + "-" + underlying.name();
         cache = this.context.getCache();
 
-        cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> entries) {
-                for (final ThreadCache.DirtyEntry entry : entries) {
-                    final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
-                    final long timestamp = WindowKeySchema.extractStoreTimestamp(binaryWindowKey);
-
-                    final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey, windowSize, serdes.keyDeserializer(), serdes.topic());
-                    final Bytes key = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(binaryWindowKey));
-                    maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
-                    underlying.put(key, entry.newValue(), timestamp);
-                }
+        cache.addDirtyEntryFlushListener(name, entries -> {
+            for (final ThreadCache.DirtyEntry entry : entries) {
+                final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
+                final long timestamp = WindowKeySchema.extractStoreTimestamp(binaryWindowKey);
+
+                final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey, windowSize, serdes.keyDeserializer(), serdes.topic());
+                final Bytes key = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(binaryWindowKey));
+                maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
+                underlying.put(key, entry.newValue(), timestamp);
             }
         });
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
new file mode 100644
index 00000000000..697b67af434
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.io.IOException;
+import java.util.Objects;
+
+class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment>, Segment {
+    public final long id;
+
+    KeyValueSegment(final String segmentName, final String windowName, final long id) {
+        super(segmentName, windowName);
+        this.id = id;
+    }
+
+    @Override
+    public void destroy() throws IOException {
+        Utils.delete(dbDir);
+    }
+
+    @Override
+    public int compareTo(final KeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public void openDB(final ProcessorContext context) {
+        super.openDB(context);
+        // skip the registering step
+        internalProcessorContext = context;
+    }
+
+    @Override
+    public String toString() {
+        return "KeyValueSegment(id=" + id + ", name=" + name() + ")";
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final KeyValueSegment segment = (KeyValueSegment) obj;
+        return id == segment.id;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
new file mode 100644
index 00000000000..0664551bb8d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+
+/**
+ * Manages the {@link KeyValueSegment}s that are used by the {@link RocksDBSegmentedBytesStore}
+ */
+class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
+
+    KeyValueSegments(final String name, final long retentionPeriod, final long segmentInterval) {
+        super(name, retentionPeriod, segmentInterval);
+    }
+
+    @Override
+    public KeyValueSegment getOrCreateSegment(final long segmentId, final InternalProcessorContext context) {
+        if (segments.containsKey(segmentId)) {
+            return segments.get(segmentId);
+        } else {
+            final KeyValueSegment newSegment = new KeyValueSegment(segmentName(segmentId), name, segmentId);
+
+            if (segments.put(segmentId, newSegment) != null) {
+                throw new IllegalStateException("KeyValueSegment already exists. Possible concurrent access.");
+            }
+
+            newSegment.openDB(context);
+            return newSegment;
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 17079b964b5..79e6b95f361 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -41,15 +41,15 @@ import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
 
-class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
+public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     private static final Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
     private final String name;
-    private final Segments segments;
+    private final KeyValueSegments segments;
     private final String metricScope;
    private final KeySchema keySchema;
     private InternalProcessorContext context;
     private volatile boolean open;
-    private Set<Segment> bulkLoadSegments;
+    private Set<KeyValueSegment> bulkLoadSegments;
     private Sensor expiredRecordSensor;
 
     RocksDBSegmentedBytesStore(final String name,
@@ -60,55 +60,54 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
         this.name = name;
         this.metricScope = metricScope;
         this.keySchema = keySchema;
-        this.segments = new Segments(name, retention, segmentInterval);
+        this.segments = new KeyValueSegments(name, retention, segmentInterval);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) {
-        final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
+        final List<KeyValueSegment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
 
         final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
         final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);
 
-        return new SegmentIterator(searchSpace.iterator(),
-                keySchema.hasNextCondition(key, key, from, to),
-                binaryFrom, binaryTo);
+        return new SegmentIterator<>(searchSpace.iterator(),
+                                     keySchema.hasNextCondition(key, key, from, to),
+                                     binaryFrom, binaryTo);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
-        final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
+        final List<KeyValueSegment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
 
         final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
         final Bytes binaryTo = keySchema.upperRange(keyTo, to);
 
-        return new SegmentIterator(searchSpace.iterator(),
-                keySchema.hasNextCondition(keyFrom, keyTo, from, to),
-                binaryFrom, binaryTo);
+        return new SegmentIterator<>(searchSpace.iterator(),
+                                     keySchema.hasNextCondition(keyFrom, keyTo, from, to),
+                                     binaryFrom, binaryTo);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
+        final List<KeyValueSegment> searchSpace = segments.allSegments();
 
-        final List<Segment> searchSpace = segments.allSegments();
-
-        return new SegmentIterator(searchSpace.iterator(),
-                keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
-                null, null);
+        return new SegmentIterator<>(searchSpace.iterator(),
+                                     keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
+                                     null, null);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom, final long timeTo) {
-        final List<Segment> searchSpace = segments.segments(timeFrom, timeTo);
+        final List<KeyValueSegment> searchSpace = segments.segments(timeFrom, timeTo);
 
-        return new SegmentIterator(searchSpace.iterator(),
-                keySchema.hasNextCondition(null, null, timeFrom, timeTo),
-                null, null);
+        return new SegmentIterator<>(searchSpace.iterator(),
+                                     keySchema.hasNextCondition(null, null, timeFrom, timeTo),
+                                     null, null);
     }
 
     @Override
     public void remove(final Bytes key) {
-        final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        final KeyValueSegment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
         if (segment == null) {
             return;
         }
@@ -119,7 +118,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     public void put(final Bytes key, final byte[] value) {
         final long timestamp = keySchema.segmentTimestamp(key);
         final long segmentId = segments.segmentId(timestamp);
-        final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
         if (segment == null) {
             expiredRecordSensor.record();
             LOG.debug("Skipping record for expired segment.");
@@ -130,7 +129,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
 
     @Override
     public byte[] get(final Bytes key) {
-        final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        final KeyValueSegment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
         if (segment == null) {
             return null;
         }
@@ -195,16 +194,16 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     }
 
     // Visible for testing
-    List<Segment> getSegments() {
+    List<KeyValueSegment> getSegments() {
         return segments.allSegments();
     }
 
     // Visible for testing
     void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
         try {
-            final Map<Segment, WriteBatch> writeBatchMap = getWriteBatches(records);
-            for (final Map.Entry<Segment, WriteBatch> entry : writeBatchMap.entrySet()) {
-                final Segment segment = entry.getKey();
+            final Map<KeyValueSegment, WriteBatch> writeBatchMap = getWriteBatches(records);
+            for (final Map.Entry<KeyValueSegment, WriteBatch> entry : writeBatchMap.entrySet()) {
+                final KeyValueSegment segment = entry.getKey();
                 final WriteBatch batch = entry.getValue();
                 segment.write(batch);
             }
@@ -214,11 +213,11 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     }
 
     // Visible for testing
-    Map<Segment, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
-        final Map<Segment, WriteBatch> writeBatchMap = new HashMap<>();
+    Map<KeyValueSegment, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
+        final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
         for (final KeyValue<byte[], byte[]> record : records) {
             final long segmentId = segments.segmentId(keySchema.segmentTimestamp(Bytes.wrap(record.key)));
-            final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+            final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
             if (segment != null) {
                 // This handles the case that state store is moved to a new client and does not
                 // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
@@ -247,7 +246,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     }
 
     private void toggleForBulkLoading(final boolean prepareForBulkload) {
-        for (final Segment segment: segments.allSegments()) {
+        for (final KeyValueSegment segment: segments.allSegments()) {
             segment.toggleDbForBulkLoading(prepareForBulkload);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index f95e3955c10..8687ffc1a45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -16,52 +16,17 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.io.IOException;
-import java.util.Objects;
 
-class Segment extends RocksDBStore implements Comparable<Segment> {
-    public final long id;
+public interface Segment extends StateStore {
 
-    Segment(final String segmentName, final String windowName, final long id) {
-        super(segmentName, windowName);
-        this.id = id;
-    }
+    void destroy() throws IOException;
 
-    void destroy() throws IOException {
-        Utils.delete(dbDir);
-    }
+    KeyValueIterator<Bytes, byte[]> all();
 
-    @Override
-    public int compareTo(final Segment segment) {
-        return Long.compare(id, segment.id);
-    }
-
-    @Override
-    public void openDB(final ProcessorContext context) {
-        super.openDB(context);
-        // skip the registering step
-        internalProcessorContext = context;
-    }
-
-    @Override
-    public String toString() {
-        return "Segment(id=" + id + ", name=" + name() + ")";
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final Segment segment = (Segment) obj;
-        return id == segment.id;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id);
-    }
+    KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 5b0781211e9..0d90bd8caae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -20,25 +20,24 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 /**
- * Iterate over multiple Segments
+ * Iterate over multiple KeyValueSegments
  */
-class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
+class SegmentIterator<S extends Segment> implements KeyValueIterator<Bytes, byte[]> {
 
     private final Bytes from;
     private final Bytes to;
-    protected final Iterator<Segment> segments;
+    protected final Iterator<S> segments;
     protected final HasNextCondition hasNextCondition;
 
-    protected KeyValueStore<Bytes, byte[]> currentSegment;
-    protected KeyValueIterator<Bytes, byte[]> currentIterator;
+    private S currentSegment;
+    KeyValueIterator<Bytes, byte[]> currentIterator;
 
-    SegmentIterator(final Iterator<Segment> segments,
+    SegmentIterator(final Iterator<S> segments,
                     final HasNextCondition hasNextCondition,
                     final Bytes from,
                     final Bytes to) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index ce528ed2f12..068dc5f0dc1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -154,7 +154,7 @@ public interface SegmentedBytesStore extends StateStore {
 
         /**
          * Create an implementation of {@link HasNextCondition} that knows when
-         * to stop iterating over the Segments. Used during {@link SegmentedBytesStore#fetch(Bytes, Bytes, long, long)} operations
+         * to stop iterating over the KeyValueSegments. Used during {@link SegmentedBytesStore#fetch(Bytes, Bytes, long, long)} operations
         * @param binaryKeyFrom the first key in the range
         * @param binaryKeyTo   the last key in the range
         * @param from          starting time range
@@ -171,6 +171,6 @@ public interface SegmentedBytesStore extends StateStore {
         * @param to
         * @return  List of segments to search
         */
-        List<Segment> segmentsToSearch(Segments segments, long from, long to);
+        <S extends Segment> List<S> segmentsToSearch(Segments<S> segments, long from, long to);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index d4aedce3531..71d0c4616c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -16,224 +16,29 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.SimpleTimeZone;
-import java.util.TreeMap;
 
-/**
- * Manages the {@link Segment}s that are used by the {@link RocksDBSegmentedBytesStore}
- */
-class Segments {
-    private static final Logger log = LoggerFactory.getLogger(Segments.class);
-
-    private final TreeMap<Long, Segment> segments = new TreeMap<>();
-    private final String name;
-    private final long retentionPeriod;
-    private final long segmentInterval;
-    private final SimpleDateFormat formatter;
-
-    Segments(final String name, final long retentionPeriod, final long segmentInterval) {
-        this.name = name;
-        this.segmentInterval = segmentInterval;
-        this.retentionPeriod = retentionPeriod;
-        // Create a date formatter. Formatted timestamps are used as segment name suffixes
-        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
-        this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
-    }
-
-    long segmentId(final long timestamp) {
-        return timestamp / segmentInterval;
-    }
-
-    String segmentName(final long segmentId) {
-        // (1) previous format used - as a separator so if this changes in the future
-        // then we should use something different.
-        // (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
-        // so if this changes in the future then we should use something different.
-        return name + "." + segmentId * segmentInterval;
-    }
-
-    Segment getSegmentForTimestamp(final long timestamp) {
-        return segments.get(segmentId(timestamp));
-    }
-
-    Segment getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
-        final long minLiveTimestamp = context.streamTime() - retentionPeriod;
-        final long minLiveSegment = segmentId(minLiveTimestamp);
-
-        final Segment toReturn;
-        if (segmentId >= minLiveSegment) {
-            // The segment is live. get it, ensure it's open, and return it.
-            toReturn = getOrCreateSegment(segmentId, context);
-        } else {
-            toReturn = null;
-        }
-
-        cleanupEarlierThan(minLiveSegment);
-        return toReturn;
-    }
-
-    private Segment getOrCreateSegment(final long segmentId, final InternalProcessorContext context) {
-        if (segments.containsKey(segmentId)) {
-            return segments.get(segmentId);
-        } else {
-            final Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
-            final Segment shouldBeNull = segments.put(segmentId, newSegment);
-
-            if (shouldBeNull != null) {
-                throw new IllegalStateException("Segment already exists. Possible concurrent access.");
-            }
-
-            newSegment.openDB(context);
-            return newSegment;
-        }
-    }
-
-    void openExisting(final InternalProcessorContext context) {
-        try {
-            final File dir = new File(context.stateDir(), name);
-            if (dir.exists()) {
-                final String[] list = dir.list();
-                if (list != null) {
-                    final long[] segmentIds = new long[list.length];
-                    for (int i = 0; i < list.length; i++) {
-                        segmentIds[i] = segmentIdFromSegmentName(list[i], dir);
-                    }
-
-                    // open segments in the id order
-                    Arrays.sort(segmentIds);
-                    for (final long segmentId : segmentIds) {
-                        if (segmentId >= 0) {
-                            getOrCreateSegment(segmentId, context);
-                        }
-                    }
-                }
-            } else {
-                if (!dir.mkdir()) {
-                    throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name));
-                }
-            }
-        } catch (final Exception ex) {
-            // ignore
-        }
-
-        final long minLiveSegment = segmentId(context.streamTime() - retentionPeriod);
-        cleanupEarlierThan(minLiveSegment);
-    }
-
-    List<Segment> segments(final long timeFrom, final long timeTo) {
-        final List<Segment> result = new ArrayList<>();
-        final NavigableMap<Long, Segment> segmentsInRange = segments.subMap(
-            segmentId(timeFrom), true,
-            segmentId(timeTo), true
-        );
-        for (final Segment segment : segmentsInRange.values()) {
-            if (segment.isOpen()) {
-                result.add(segment);
-            }
-        }
-        return result;
-    }
-
-    List<Segment> allSegments() {
-        final List<Segment> result = new ArrayList<>();
-        for (final Segment segment : segments.values()) {
-            if (segment.isOpen()) {
-                result.add(segment);
-            }
-        }
-        return result;
-    }
-
-    void flush() {
-        for (final Segment segment : segments.values()) {
-            segment.flush();
-        }
-    }
+interface Segments<S extends Segment> {
 
-    public void close() {
-        for (final Segment segment : segments.values()) {
-            segment.close();
-        }
-        segments.clear();
-    }
+    long segmentId(final long timestamp);
 
-    private void cleanupEarlierThan(final long minLiveSegment) {
-        final Iterator<Map.Entry<Long, Segment>> toRemove =
-            segments.headMap(minLiveSegment, false).entrySet().iterator();
+    String segmentName(final long segmentId);
 
-        while (toRemove.hasNext()) {
-            final Map.Entry<Long, Segment> next = toRemove.next();
-            toRemove.remove();
-            final Segment segment = next.getValue();
-            segment.close();
-            try {
-                segment.destroy();
-            } catch (final IOException e) {
-                log.error("Error destroying {}", segment, e);
-            }
-        }
-    }
+    S getSegmentForTimestamp(final long timestamp);
 
-    private long segmentIdFromSegmentName(final String segmentName,
-                                          final File parent) {
-        final int segmentSeparatorIndex = name.length();
-        final char segmentSeparator = segmentName.charAt(segmentSeparatorIndex);
-        final String segmentIdString = segmentName.substring(segmentSeparatorIndex + 1);
-        final long segmentId;
+    S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context);
 
-        // old style segment name with date
-        if (segmentSeparator == '-') {
-            try {
-                segmentId = formatter.parse(segmentIdString).getTime() / segmentInterval;
-            } catch (final ParseException e) {
-                log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName);
-                return -1L;
-            }
-            renameSegmentFile(parent, segmentName, segmentId);
-        } else {
+    S getOrCreateSegment(final long segmentId, final InternalProcessorContext context);
 
-            // for both new formats (with : or .) parse segment ID identically
-            try {
-                segmentId = Long.parseLong(segmentIdString) / segmentInterval;
-            } catch (final NumberFormatException e) {
-                throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName);
-            }
+    void openExisting(final InternalProcessorContext context);
 
-            // intermediate segment name with : breaks KafkaStreams on Windows OS -> rename segment file to new name with .
-            if (segmentSeparator == ':') {
-                renameSegmentFile(parent, segmentName, segmentId);
-            }
-        }
+    List<S> segments(final long timeFrom, final long timeTo);
 
-        return segmentId;
+    List<S> allSegments();
 
-    }
+    void flush();
 
-    private void renameSegmentFile(final File parent,
-                                   final String segmentName,
-                                   final long segmentId) {
-        final File newName = new File(parent, segmentName(segmentId));
-        final File oldName = new File(parent, segmentName);
-        if (!oldName.renameTo(newName)) {
-            throw new ProcessorStateException("Unable to rename old style segment from: "
-                + oldName
-                + " to new name: "
-                + newName);
-        }
-    }
+    void close();
-}
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 8ba78cc57d8..6e998601266 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -69,29 +68,26 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
 
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
-        return new HasNextCondition() {
-            @Override
-            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-                while (iterator.hasNext()) {
-                    final Bytes bytes = iterator.peekNextKey();
-                    final Windowed<Bytes> windowedKey = SessionKeySchema.from(bytes);
-                    if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
-                        && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
-                        && windowedKey.window().end() >= from
-                        && windowedKey.window().start() <= to) {
-                        return true;
-                    }
-                    iterator.next();
+        return iterator -> {
+            while (iterator.hasNext()) {
+                final Bytes bytes = iterator.peekNextKey();
+                final Windowed<Bytes> windowedKey = SessionKeySchema.from(bytes);
+                if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
+                    && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
+                    && windowedKey.window().end() >= from
+                    && windowedKey.window().start() <= to) {
+                    return true;
                 }
-                return false;
+                iterator.next();
             }
+            return false;
         };
     }
 
     @Override
-    public List<Segment> segmentsToSearch(final Segments segments,
-                                          final long from,
-                                          final long to) {
+    public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments,
+                                                        final long from,
+                                                        final long to) {
         return segments.segments(from, Long.MAX_VALUE);
     }
 
@@ -101,21 +97,21 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
         return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
     }
 
-    public static byte[] extractKeyBytes(final byte[] binaryKey) {
+    static byte[] extractKeyBytes(final byte[] binaryKey) {
         final byte[] bytes = new byte[binaryKey.length - 2 * TIMESTAMP_SIZE];
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
         return bytes;
     }
 
-    public static long extractEndTimestamp(final byte[] binaryKey) {
+    static long extractEndTimestamp(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
     }
 
-    public static long extractStartTimestamp(final byte[] binaryKey) {
+    static long extractStartTimestamp(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
     }
 
-    public static Window extractWindow(final byte[] binaryKey) {
+    static Window extractWindow(final byte[] binaryKey) {
         final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
         final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
         final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 0b3ba9e7d1d..f960b01a96f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 
 import java.nio.ByteBuffer;
@@ -66,29 +65,31 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
     }
 
     @Override
-    public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
-        return new HasNextCondition() {
-            @Override
-            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-                while (iterator.hasNext()) {
-                    final Bytes bytes = iterator.peekNextKey();
-                    final Bytes keyBytes = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(bytes.get()));
-                    final long time = WindowKeySchema.extractStoreTimestamp(bytes.get());
-                    if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
-                        && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
-                        && time >= from
-                        && time <= to) {
-                        return true;
-                    }
-                    iterator.next();
+    public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom,
+                                             final Bytes binaryKeyTo,
+                                             final long from,
+                                             final long to) {
+        return iterator -> {
+            while (iterator.hasNext()) {
+                final Bytes bytes = iterator.peekNextKey();
+                final Bytes keyBytes = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(bytes.get()));
+                final long time = WindowKeySchema.extractStoreTimestamp(bytes.get());
+                if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
+                    && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
+                    && time >= from
+                    && time <= to) {
+                    return true;
                 }
-                return false;
+                iterator.next();
             }
+            return false;
         };
     }
 
     @Override
-    public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
+    public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments,
+                                                        final long from,
+                                                        final long to) {
         return segments.segments(from, to);
     }
 
@@ -96,8 +97,8 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
      * Safely construct a time window of the given size,
     * taking care of bounding endMs to Long.MAX_VALUE if necessary
     */
-    public static TimeWindow timeWindowForSize(final long startMs,
-                                               final long windowSize) {
+    static TimeWindow timeWindowForSize(final long startMs,
+                                        final long windowSize) {
         final long endMs = startMs + windowSize;
         return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
     }
@@ -175,24 +176,24 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
         return Bytes.wrap(buf.array());
     }
 
-    public static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
+    static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
         final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
         return bytes;
     }
 
-    public static <K> K extractStoreKey(final byte[] binaryKey,
-                                        final StateSerdes<K, ?> serdes) {
+    static <K> K extractStoreKey(final byte[] binaryKey,
+                                 final StateSerdes<K, ?> serdes) {
         final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
         return serdes.keyFrom(bytes);
     }
 
-    public static long extractStoreTimestamp(final byte[] binaryKey) {
+    static long extractStoreTimestamp(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
     }
 
-    public static int extractStoreSequence(final byte[] binaryKey) {
+    static int extractStoreSequence(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
     }
 
@@ -205,15 +206,15 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
         return new Windowed<>(key, window);
     }
 
-    public static Windowed<Bytes> fromStoreBytesKey(final byte[] binaryKey,
-                                                    final long windowSize) {
+    static Windowed<Bytes> fromStoreBytesKey(final byte[] binaryKey,
+                                             final long windowSize) {
         final Bytes key = Bytes.wrap(extractStoreKeyBytes(binaryKey));
         final Window window = extractStoreWindow(binaryKey, windowSize);
         return new Windowed<>(key, window);
     }
 
-    public static Window extractStoreWindow(final byte[] binaryKey,
-                                            final long windowSize) {
+    static Window extractStoreWindow(final byte[] binaryKey,
+                                     final long windowSize) {
         final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
         final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
         return timeWindowForSize(start, windowSize);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentIteratorTest.java
similarity index 86%
rename from streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
rename to streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentIteratorTest.java
index db5f0832729..68bd815f2cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentIteratorTest.java
@@ -38,25 +38,25 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-public class SegmentIteratorTest {
+public class KeyValueSegmentIteratorTest {
 
-    private final Segment segmentOne = new Segment("one", "one", 0);
-    private final Segment segmentTwo = new Segment("two", "window", 1);
+    private final KeyValueSegment segmentOne = new KeyValueSegment("one", "one", 0);
+    private final KeyValueSegment segmentTwo = new KeyValueSegment("two", "window", 1);
     private final HasNextCondition hasNextCondition = Iterator::hasNext;
 
-    private SegmentIterator iterator = null;
+    private SegmentIterator<KeyValueSegment> iterator = null;
 
     @Before
     public void before() {
         final InternalMockProcessorContext context = new InternalMockProcessorContext(
-                TestUtils.tempDirectory(),
-                Serdes.String(),
-                Serdes.String(),
-                new NoOpRecordCollector(),
-                new ThreadCache(
-                    new LogContext("testCache "),
-                    0,
-                    new MockStreamsMetrics(new Metrics())));
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new NoOpRecordCollector(),
+            new ThreadCache(
+                new LogContext("testCache "),
+                0,
+                new MockStreamsMetrics(new Metrics())));
         segmentOne.openDB(context);
         segmentTwo.openDB(context);
         segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
@@ -77,7 +77,7 @@ public class SegmentIteratorTest {
 
     @Test
     public void shouldIterateOverAllSegments() {
-        iterator = new SegmentIterator(
+        iterator = new SegmentIterator<>(
             Arrays.asList(segmentOne, segmentTwo).iterator(),
             hasNextCondition,
             Bytes.wrap("a".getBytes()),
@@ -104,7 +104,7 @@ public class SegmentIteratorTest {
 
     @Test
     public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() {
-        iterator = new SegmentIterator(
+        iterator = new SegmentIterator<>(
             Collections.singletonList(segmentOne).iterator(),
             hasNextCondition,
             Bytes.wrap("a".getBytes()),
@@ -117,7 +117,7 @@ public class SegmentIteratorTest {
 
     @Test
     public void shouldOnlyIterateOverSegmentsInRange() {
-        iterator = new SegmentIterator(
+        iterator = new SegmentIterator<>(
            Arrays.asList(segmentOne, segmentTwo).iterator(),
            hasNextCondition,
            Bytes.wrap("a".getBytes()),
@@ -136,7 +136,7 @@ public class SegmentIteratorTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() {
-        iterator = new SegmentIterator(
+        iterator = new SegmentIterator<>(
            Arrays.asList(segmentOne, segmentTwo).iterator(),
            hasNextCondition,
            Bytes.wrap("f".getBytes()),
@@ -147,7 +147,7 @@ public class SegmentIteratorTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementOnNextIfNoNext() {
-        iterator = new SegmentIterator(
+        iterator = new SegmentIterator<>(
            Arrays.asList(segmentOne, segmentTwo).iterator(),
            hasNextCondition,
            Bytes.wrap("f".getBytes()),
           Bytes.wrap("h".getBytes()));
        iterator.next();
    }
 
     private KeyValue<String, String> toStringKeyValue(final KeyValue<Bytes, byte[]> binaryKv) {
         return KeyValue.pair(new String(binaryKv.key.get()), new String(binaryKv.value));
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
similarity index 86%
rename from streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
rename to streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index efed24f49e9..7c851343fba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -41,13 +41,13 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-public class SegmentsTest {
+public class KeyValueSegmentsTest {
 
     private static final int NUM_SEGMENTS = 5;
     private static final long SEGMENT_INTERVAL = 100L;
     private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
     private InternalMockProcessorContext context;
-    private Segments segments;
+    private KeyValueSegments segments;
     private File stateDirectory;
     private final String storeName = "test";
 
@@ -61,7 +61,7 @@ public class SegmentsTest {
             new NoOpRecordCollector(),
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
         );
-        segments = new Segments(storeName, RETENTION_PERIOD, SEGMENT_INTERVAL);
+        segments = new KeyValueSegments(storeName, RETENTION_PERIOD, SEGMENT_INTERVAL);
     }
 
     @After
@@ -79,7 +79,7 @@ public class SegmentsTest {
 
     @Test
     public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
-        final Segments segments = new Segments("test", 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
+        final KeyValueSegments segments = new KeyValueSegments("test", 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
         assertEquals(0, segments.segmentId(0));
         assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
         assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
@@ -94,9 +94,9 @@ public class SegmentsTest {
 
     @Test
     public void shouldCreateSegments() {
-        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
-        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
-        final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context);
+        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context);
+        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context);
+        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context);
         assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
         assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory());
         assertTrue(new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL).isDirectory());
@@ -114,10 +114,10 @@ public class SegmentsTest {
 
     @Test
     public void shouldCleanupSegmentsThatHaveExpired() {
-        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
-        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
+        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context);
+        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context);
         context.setStreamTime(SEGMENT_INTERVAL * 7);
-        final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context);
+        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(7, context);
         assertFalse(segment1.isOpen());
         assertFalse(segment2.isOpen());
         assertTrue(segment3.isOpen());
@@ -128,22 +128,22 @@ public class SegmentsTest {
 
     @Test
     public void shouldGetSegmentForTimestamp() {
-        final Segment segment = segments.getOrCreateSegmentIfLive(0, context);
+        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context);
         segments.getOrCreateSegmentIfLive(1, context);
         assertEquals(segment, segments.getSegmentForTimestamp(0L));
     }
 
     @Test
     public void shouldGetCorrectSegmentString() {
-        final Segment segment = segments.getOrCreateSegmentIfLive(0, context);
-        assertEquals("Segment(id=0, name=test.0)", segment.toString());
+        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context);
+        assertEquals("KeyValueSegment(id=0, name=test.0)", segment.toString());
     }
 
     @Test
     public void shouldCloseAllOpenSegments() {
-        final Segment first = segments.getOrCreateSegmentIfLive(0, context);
-        final Segment second = segments.getOrCreateSegmentIfLive(1, context);
-        final Segment third = segments.getOrCreateSegmentIfLive(2, context);
+        final KeyValueSegment first = segments.getOrCreateSegmentIfLive(0, context);
+        final KeyValueSegment second = segments.getOrCreateSegmentIfLive(1, context);
+        final KeyValueSegment third = segments.getOrCreateSegmentIfLive(2, context);
 
         segments.close();
         assertFalse(first.isOpen());
@@ -153,7 +153,7 @@ public class SegmentsTest {
 
     @Test
     public void shouldOpenExistingSegments() {
-        segments = new Segments("test", 4, 1);
+        segments = new KeyValueSegments("test", 4, 1);
         segments.getOrCreateSegmentIfLive(0, context);
         segments.getOrCreateSegmentIfLive(1, context);
         segments.getOrCreateSegmentIfLive(2, context);
@@ -162,7 +162,7 @@ public class SegmentsTest {
         // close existing.
         segments.close();
 
-        segments = new Segments("test", 4, 1);
+        segments = new KeyValueSegments("test", 4, 1);
         segments.openExisting(context);
 
         assertTrue(segments.getSegmentForTimestamp(0).isOpen());
@@ -185,7 +185,7 @@ public class SegmentsTest {
         segments.getOrCreateSegmentIfLive(3, context);
         segments.getOrCreateSegmentIfLive(4, context);
 
-        final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
+        final List<KeyValueSegment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
         assertEquals(3, segments.size());
         assertEquals(0, segments.get(0).id);
         assertEquals(1, segments.get(1).id);
@@ -200,7 +200,7 @@ public class SegmentsTest {
         updateStreamTimeAndCreateSegment(1);
         updateStreamTimeAndCreateSegment(3);
 
-        final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
+        final List<KeyValueSegment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
         assertEquals(3, segments.size());
         assertEquals(0, segments.get(0).id);
         assertEquals(1, segments.get(1).id);
@@ -252,7 +252,7 @@ public class SegmentsTest {
     public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
         final long segmentInterval = 60_000L; // the old segment file's naming system maxes out at 1 minute granularity.
 
-        segments = new Segments(storeName, NUM_SEGMENTS * segmentInterval, segmentInterval);
+        segments = new KeyValueSegments(storeName, NUM_SEGMENTS * segmentInterval, segmentInterval);
 
         final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
         final File storeDirectory = new File(storeDirectoryPath);
@@ -306,7 +306,7 @@ public class SegmentsTest {
     }
 
     private void verifyCorrectSegments(final long first, final int numSegments) {
-        final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
+        final List<KeyValueSegment> result = this.segments.segments(0, Long.MAX_VALUE);
         assertEquals(numSegments, result.size());
         for (int i = 0; i < numSegments; i++) {
             assertEquals(i + first, result.get(i).id);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 0b9d66df67b..8097d74670d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -178,7 +178,7 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldRollSegments() {
         // just to validate directories
-        final Segments segments = new Segments(storeName, retention, segmentInterval);
+        final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
@@ -206,7 +206,7 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldGetAllSegments() {
         // just to validate directories
-        final Segments segments = new Segments(storeName, retention, segmentInterval);
+        final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -235,7 +235,7 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldFetchAllSegments() {
         // just to validate directories
-        final Segments segments = new Segments(storeName, retention, segmentInterval);
+        final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -263,7 +263,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
     @Test
     public void shouldLoadSegmentsWithOldStyleDateFormattedName() {
-        final Segments segments = new Segments(storeName, retention, segmentInterval);
+        final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -304,7 +304,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
     @Test
     public void shouldLoadSegmentsWithOldStyleColonFormattedName() {
-        final Segments segments = new Segments(storeName, retention, segmentInterval);
+        final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -355,7 +355,7 @@ public class RocksDBSegmentedBytesStoreTest {
         final Collection<KeyValue<byte[], byte[]>> records = new ArrayList<>();
         records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[0])).get(), serializeValue(50L)));
         records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[3])).get(), serializeValue(100L)));
-        final Map<Segment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records);
+        final Map<KeyValueSegment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records);
         assertEquals(2, writeBatchMap.size());
         for (final WriteBatch batch : writeBatchMap.values()) {
             assertEquals(1, batch.count());
@@ -376,7 +376,7 @@ public class RocksDBSegmentedBytesStoreTest {
         assertEquals(2, bytesStore.getSegments().size());
 
         // Bulk loading is enabled during recovery.
-        for (final Segment segment : bytesStore.getSegments()) {
+        for (final KeyValueSegment segment : bytesStore.getSegments()) {
             Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
         }
 
@@ -400,12 +400,12 @@ public class RocksDBSegmentedBytesStoreTest {
 
         restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
 
-        for (final Segment segment : bytesStore.getSegments()) {
+        for (final KeyValueSegment segment : bytesStore.getSegments()) {
             Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
         }
 
         restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
-        for (final Segment segment : bytesStore.getSegments()) {
+        for (final KeyValueSegment segment : bytesStore.getSegments()) {
             Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4));
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c155a83420b..2666f5fd741 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -74,7 +74,7 @@ public class RocksDBWindowStoreTest {
     private final long segmentInterval = 60_000L;
     private final long retentionPeriod = segmentInterval * (numSegments - 1);
     private final String windowName = "window";
-    private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval);
+    private final KeyValueSegments segments = new KeyValueSegments(windowName, retentionPeriod, segmentInterval);
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
 
     private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index 81120809a0d..8824a9487d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -30,16 +30,16 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.KeyValueIteratorStub;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
 public class WindowKeySchemaTest {
 
     final private String key = "key";