
KAFKA-3522: Generalize Segments (#6170)

Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Matthias J. Sax (committed by GitHub)
commit ef62dd3ef2
Changed files:
  1. streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java (229 lines changed)
  2. streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java (23 lines changed)
  3. streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java (68 lines changed)
  4. streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java (45 lines changed)
  5. streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java (63 lines changed)
  6. streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java (49 lines changed)
  7. streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java (13 lines changed)
  8. streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java (4 lines changed)
  9. streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java (219 lines changed)
  10. streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java (40 lines changed)
  11. streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java (59 lines changed)
  12. streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentIteratorTest.java (36 lines changed)
  13. streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java (44 lines changed)
  14. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java (18 lines changed)
  15. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java (2 lines changed)
  16. streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java (8 lines changed)

streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java (229 lines changed)

@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SimpleTimeZone;
import java.util.TreeMap;
abstract class AbstractSegments<S extends Segment> implements Segments<S> {
private static final Logger log = LoggerFactory.getLogger(AbstractSegments.class);
final TreeMap<Long, S> segments = new TreeMap<>();
final String name;
private final long retentionPeriod;
private final long segmentInterval;
private final SimpleDateFormat formatter;
AbstractSegments(final String name, final long retentionPeriod, final long segmentInterval) {
this.name = name;
this.segmentInterval = segmentInterval;
this.retentionPeriod = retentionPeriod;
// Create a date formatter. Formatted timestamps are used as segment name suffixes
this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
}
@Override
public long segmentId(final long timestamp) {
return timestamp / segmentInterval;
}
@Override
public String segmentName(final long segmentId) {
// (1) previous format used - as a separator so if this changes in the future
// then we should use something different.
// (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
// so if this changes in the future then we should use something different.
return name + "." + segmentId * segmentInterval;
}
@Override
public S getSegmentForTimestamp(final long timestamp) {
return segments.get(segmentId(timestamp));
}
@Override
public S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
final long minLiveTimestamp = context.streamTime() - retentionPeriod;
final long minLiveSegment = segmentId(minLiveTimestamp);
final S toReturn;
if (segmentId >= minLiveSegment) {
// The segment is live. get it, ensure it's open, and return it.
toReturn = getOrCreateSegment(segmentId, context);
} else {
toReturn = null;
}
cleanupEarlierThan(minLiveSegment);
return toReturn;
}
@Override
public void openExisting(final InternalProcessorContext context) {
try {
final File dir = new File(context.stateDir(), name);
if (dir.exists()) {
final String[] list = dir.list();
if (list != null) {
final long[] segmentIds = new long[list.length];
for (int i = 0; i < list.length; i++) {
segmentIds[i] = segmentIdFromSegmentName(list[i], dir);
}
// open segments in the id order
Arrays.sort(segmentIds);
for (final long segmentId : segmentIds) {
if (segmentId >= 0) {
getOrCreateSegment(segmentId, context);
}
}
}
} else {
if (!dir.mkdir()) {
throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name));
}
}
} catch (final Exception ex) {
// ignore
}
final long minLiveSegment = segmentId(context.streamTime() - retentionPeriod);
cleanupEarlierThan(minLiveSegment);
}
@Override
public List<S> segments(final long timeFrom, final long timeTo) {
final List<S> result = new ArrayList<>();
final NavigableMap<Long, S> segmentsInRange = segments.subMap(
segmentId(timeFrom), true,
segmentId(timeTo), true
);
for (final S segment : segmentsInRange.values()) {
if (segment.isOpen()) {
result.add(segment);
}
}
return result;
}
@Override
public List<S> allSegments() {
final List<S> result = new ArrayList<>();
for (final S segment : segments.values()) {
if (segment.isOpen()) {
result.add(segment);
}
}
return result;
}
@Override
public void flush() {
for (final S segment : segments.values()) {
segment.flush();
}
}
@Override
public void close() {
for (final S segment : segments.values()) {
segment.close();
}
segments.clear();
}
private void cleanupEarlierThan(final long minLiveSegment) {
final Iterator<Map.Entry<Long, S>> toRemove =
segments.headMap(minLiveSegment, false).entrySet().iterator();
while (toRemove.hasNext()) {
final Map.Entry<Long, S> next = toRemove.next();
toRemove.remove();
final S segment = next.getValue();
segment.close();
try {
segment.destroy();
} catch (final IOException e) {
log.error("Error destroying {}", segment, e);
}
}
}
private long segmentIdFromSegmentName(final String segmentName,
final File parent) {
final int segmentSeparatorIndex = name.length();
final char segmentSeparator = segmentName.charAt(segmentSeparatorIndex);
final String segmentIdString = segmentName.substring(segmentSeparatorIndex + 1);
final long segmentId;
// old style segment name with date
if (segmentSeparator == '-') {
try {
segmentId = formatter.parse(segmentIdString).getTime() / segmentInterval;
} catch (final ParseException e) {
log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName);
return -1L;
}
renameSegmentFile(parent, segmentName, segmentId);
} else {
// for both new formats (with : or .) parse segment ID identically
try {
segmentId = Long.parseLong(segmentIdString) / segmentInterval;
} catch (final NumberFormatException e) {
throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName);
}
// intermediate segment name with : breaks KafkaStreams on Windows OS -> rename segment file to new name with .
if (segmentSeparator == ':') {
renameSegmentFile(parent, segmentName, segmentId);
}
}
return segmentId;
}
private void renameSegmentFile(final File parent,
final String segmentName,
final long segmentId) {
final File newName = new File(parent, segmentName(segmentId));
final File oldName = new File(parent, segmentName);
if (!oldName.renameTo(newName)) {
throw new ProcessorStateException("Unable to rename old style segment from: "
+ oldName
+ " to new name: "
+ newName);
}
}
}

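To make the segment arithmetic in AbstractSegments concrete, here is a minimal standalone sketch with invented numbers (the store name "window-store" and every value below are hypothetical, not from the patch). A segment id is the record timestamp integer-divided by the segment interval, the segment name appends id * interval to the store name, and a segment stays live while its id is at least segmentId(streamTime - retentionPeriod).

public class SegmentMathSketch {
    public static void main(final String[] args) {
        final long segmentInterval = 60_000L;      // one segment per minute (hypothetical)
        final long retentionPeriod = 5 * 60_000L;  // retain five minutes of data (hypothetical)

        final long timestamp = 185_000L;                    // some record timestamp
        final long segmentId = timestamp / segmentInterval; // 185000 / 60000 = 3
        final String segmentName = "window-store." + segmentId * segmentInterval; // "window-store.180000"

        final long streamTime = 500_000L;                   // observed stream time
        final long minLiveSegment = (streamTime - retentionPeriod) / segmentInterval; // 200000 / 60000 = 3
        final boolean live = segmentId >= minLiveSegment;   // true: segment 3 is still retained
        System.out.println(segmentName + " live=" + live);
    }
}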
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java (23 lines changed)

@@ -30,8 +30,6 @@ import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.List;
class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, V> {
private final WindowStore<Bytes, byte[]> underlying;
@@ -84,18 +82,15 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
name = context.taskId() + "-" + underlying.name();
cache = this.context.getCache();
cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> entries) {
for (final ThreadCache.DirtyEntry entry : entries) {
final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
final long timestamp = WindowKeySchema.extractStoreTimestamp(binaryWindowKey);
final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey, windowSize, serdes.keyDeserializer(), serdes.topic());
final Bytes key = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(binaryWindowKey));
maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
underlying.put(key, entry.newValue(), timestamp);
}
cache.addDirtyEntryFlushListener(name, entries -> {
for (final ThreadCache.DirtyEntry entry : entries) {
final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
final long timestamp = WindowKeySchema.extractStoreTimestamp(binaryWindowKey);
final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey, windowSize, serdes.keyDeserializer(), serdes.topic());
final Bytes key = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(binaryWindowKey));
maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
underlying.put(key, entry.newValue(), timestamp);
}
});
}

streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java (68 lines changed)

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.io.IOException;
import java.util.Objects;
class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment>, Segment {
public final long id;
KeyValueSegment(final String segmentName, final String windowName, final long id) {
super(segmentName, windowName);
this.id = id;
}
@Override
public void destroy() throws IOException {
Utils.delete(dbDir);
}
@Override
public int compareTo(final KeyValueSegment segment) {
return Long.compare(id, segment.id);
}
@Override
public void openDB(final ProcessorContext context) {
super.openDB(context);
// skip the registering step
internalProcessorContext = context;
}
@Override
public String toString() {
return "KeyValueSegment(id=" + id + ", name=" + name() + ")";
}
@Override
public boolean equals(final Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final KeyValueSegment segment = (KeyValueSegment) obj;
return id == segment.id;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}

streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java (45 lines changed)

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
/**
* Manages the {@link KeyValueSegment}s that are used by the {@link RocksDBSegmentedBytesStore}
*/
class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
KeyValueSegments(final String name, final long retentionPeriod, final long segmentInterval) {
super(name, retentionPeriod, segmentInterval);
}
@Override
public KeyValueSegment getOrCreateSegment(final long segmentId, final InternalProcessorContext context) {
if (segments.containsKey(segmentId)) {
return segments.get(segmentId);
} else {
final KeyValueSegment newSegment = new KeyValueSegment(segmentName(segmentId), name, segmentId);
if (segments.put(segmentId, newSegment) != null) {
throw new IllegalStateException("KeyValueSegment already exists. Possible concurrent access.");
}
newSegment.openDB(context);
return newSegment;
}
}
}

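KeyValueSegments is the only concrete AbstractSegments subclass in this commit, but the point of the generalization is that other segment types can reuse the same lifecycle logic. A hypothetical sketch of what a second subclass could look like (TimestampedSegment and TimestampedSegments are invented names for illustration, not part of this patch):

// Hypothetical second implementation; TimestampedSegment would be another
// Segment implementation, analogous to KeyValueSegment.
class TimestampedSegments extends AbstractSegments<TimestampedSegment> {

    TimestampedSegments(final String name, final long retentionPeriod, final long segmentInterval) {
        super(name, retentionPeriod, segmentInterval);
    }

    @Override
    public TimestampedSegment getOrCreateSegment(final long segmentId,
                                                 final InternalProcessorContext context) {
        if (segments.containsKey(segmentId)) {
            return segments.get(segmentId);
        }
        final TimestampedSegment newSegment =
            new TimestampedSegment(segmentName(segmentId), name, segmentId);
        // the same concurrent-access guard AbstractSegments subclasses use
        if (segments.put(segmentId, newSegment) != null) {
            throw new IllegalStateException("TimestampedSegment already exists. Possible concurrent access.");
        }
        newSegment.openDB(context);
        return newSegment;
    }
}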
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java (63 lines changed)

@@ -41,15 +41,15 @@ import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
private final String name;
private final Segments segments;
private final KeyValueSegments segments;
private final String metricScope;
private final KeySchema keySchema;
private InternalProcessorContext context;
private volatile boolean open;
private Set<Segment> bulkLoadSegments;
private Set<KeyValueSegment> bulkLoadSegments;
private Sensor expiredRecordSensor;
RocksDBSegmentedBytesStore(final String name,
@@ -60,55 +60,54 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
this.name = name;
this.metricScope = metricScope;
this.keySchema = keySchema;
this.segments = new Segments(name, retention, segmentInterval);
this.segments = new KeyValueSegments(name, retention, segmentInterval);
}
@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) {
final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
final List<KeyValueSegment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);
return new SegmentIterator(searchSpace.iterator(),
keySchema.hasNextCondition(key, key, from, to),
binaryFrom, binaryTo);
return new SegmentIterator<>(searchSpace.iterator(),
keySchema.hasNextCondition(key, key, from, to),
binaryFrom, binaryTo);
}
@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
final List<KeyValueSegment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
final Bytes binaryTo = keySchema.upperRange(keyTo, to);
return new SegmentIterator(searchSpace.iterator(),
keySchema.hasNextCondition(keyFrom, keyTo, from, to),
binaryFrom, binaryTo);
return new SegmentIterator<>(searchSpace.iterator(),
keySchema.hasNextCondition(keyFrom, keyTo, from, to),
binaryFrom, binaryTo);
}
@Override
public KeyValueIterator<Bytes, byte[]> all() {
final List<Segment> searchSpace = segments.allSegments();
final List<KeyValueSegment> searchSpace = segments.allSegments();
return new SegmentIterator(searchSpace.iterator(),
keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
null, null);
return new SegmentIterator<>(searchSpace.iterator(),
keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
null, null);
}
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom, final long timeTo) {
final List<Segment> searchSpace = segments.segments(timeFrom, timeTo);
final List<KeyValueSegment> searchSpace = segments.segments(timeFrom, timeTo);
return new SegmentIterator(searchSpace.iterator(),
keySchema.hasNextCondition(null, null, timeFrom, timeTo),
null, null);
return new SegmentIterator<>(searchSpace.iterator(),
keySchema.hasNextCondition(null, null, timeFrom, timeTo),
null, null);
}
@Override
public void remove(final Bytes key) {
final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
final KeyValueSegment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
if (segment == null) {
return;
}
@@ -119,7 +118,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
public void put(final Bytes key, final byte[] value) {
final long timestamp = keySchema.segmentTimestamp(key);
final long segmentId = segments.segmentId(timestamp);
final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
if (segment == null) {
expiredRecordSensor.record();
LOG.debug("Skipping record for expired segment.");
@@ -130,7 +129,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
@Override
public byte[] get(final Bytes key) {
final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
final KeyValueSegment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
if (segment == null) {
return null;
}
@@ -195,16 +194,16 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
}
// Visible for testing
List<Segment> getSegments() {
List<KeyValueSegment> getSegments() {
return segments.allSegments();
}
// Visible for testing
void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
try {
final Map<Segment, WriteBatch> writeBatchMap = getWriteBatches(records);
for (final Map.Entry<Segment, WriteBatch> entry : writeBatchMap.entrySet()) {
final Segment segment = entry.getKey();
final Map<KeyValueSegment, WriteBatch> writeBatchMap = getWriteBatches(records);
for (final Map.Entry<KeyValueSegment, WriteBatch> entry : writeBatchMap.entrySet()) {
final KeyValueSegment segment = entry.getKey();
final WriteBatch batch = entry.getValue();
segment.write(batch);
}
@@ -214,11 +213,11 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
}
// Visible for testing
Map<Segment, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
final Map<Segment, WriteBatch> writeBatchMap = new HashMap<>();
Map<KeyValueSegment, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
for (final KeyValue<byte[], byte[]> record : records) {
final long segmentId = segments.segmentId(keySchema.segmentTimestamp(Bytes.wrap(record.key)));
final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
if (segment != null) {
// This handles the case that state store is moved to a new client and does not
// have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
@@ -247,7 +246,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
}
private void toggleForBulkLoading(final boolean prepareForBulkload) {
for (final Segment segment: segments.allSegments()) {
for (final KeyValueSegment segment: segments.allSegments()) {
segment.toggleDbForBulkLoading(prepareForBulkload);
}
}

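The toggleForBulkLoading loop above delegates to each segment's RocksDB handle. As a rough sketch of the mechanism, assuming it comes down to the level0FileNumCompactionTrigger option that the restore tests below assert on (the real RocksDBStore toggling reopens the database and adjusts more than this one option):

import org.rocksdb.Options;

final class BulkLoadSketch {
    // Raising the level-0 file trigger to a huge value effectively disables
    // compaction while a restore streams data in; resetting it to 4 resumes
    // normal compaction. 1 << 30 and 4 are the values the tests below check.
    static void toggleDbForBulkLoading(final Options options, final boolean prepareForBulkload) {
        options.setLevel0FileNumCompactionTrigger(prepareForBulkload ? 1 << 30 : 4);
    }
}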
streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java (49 lines changed)

@@ -16,52 +16,17 @@
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.io.IOException;
import java.util.Objects;
class Segment extends RocksDBStore implements Comparable<Segment> {
public final long id;
Segment(final String segmentName, final String windowName, final long id) {
super(segmentName, windowName);
this.id = id;
}
void destroy() throws IOException {
Utils.delete(dbDir);
}
@Override
public int compareTo(final Segment segment) {
return Long.compare(id, segment.id);
}
@Override
public void openDB(final ProcessorContext context) {
super.openDB(context);
// skip the registering step
internalProcessorContext = context;
}
@Override
public String toString() {
return "Segment(id=" + id + ", name=" + name() + ")";
}
@Override
public boolean equals(final Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final Segment segment = (Segment) obj;
return id == segment.id;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import java.io.IOException;
public interface Segment extends StateStore {
void destroy() throws IOException;
KeyValueIterator<Bytes, byte[]> all();
KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to);
}

streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java (13 lines changed)

@@ -20,25 +20,24 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Iterate over multiple Segments
* Iterate over multiple KeyValueSegments
*/
class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
class SegmentIterator<S extends Segment> implements KeyValueIterator<Bytes, byte[]> {
private final Bytes from;
private final Bytes to;
protected final Iterator<Segment> segments;
protected final Iterator<S> segments;
protected final HasNextCondition hasNextCondition;
protected KeyValueStore<Bytes, byte[]> currentSegment;
protected KeyValueIterator<Bytes, byte[]> currentIterator;
private S currentSegment;
KeyValueIterator<Bytes, byte[]> currentIterator;
SegmentIterator(final Iterator<Segment> segments,
SegmentIterator(final Iterator<S> segments,
final HasNextCondition hasNextCondition,
final Bytes from,
final Bytes to) {

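With the new type parameter, the concrete segment type flows from a Segments<S> implementation straight into SegmentIterator without raw types or casts. A small sketch of a fetchAll-style helper mirroring how RocksDBSegmentedBytesStore uses it (the wrapper class and method name are illustrative; it assumes the same package for access to the package-private types):

package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;

import java.util.List;

final class SegmentIteratorUsageSketch {
    // Mirrors RocksDBSegmentedBytesStore#fetchAll: time bounds only, no key bounds.
    static <S extends Segment> KeyValueIterator<Bytes, byte[]> fetchAll(final Segments<S> segments,
                                                                        final SegmentedBytesStore.KeySchema keySchema,
                                                                        final long timeFrom,
                                                                        final long timeTo) {
        final List<S> searchSpace = segments.segments(timeFrom, timeTo);
        return new SegmentIterator<>(searchSpace.iterator(),
                                     keySchema.hasNextCondition(null, null, timeFrom, timeTo),
                                     null,   // no lower key bound
                                     null);  // no upper key bound
    }
}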
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java (4 lines changed)

@@ -154,7 +154,7 @@ public interface SegmentedBytesStore extends StateStore {
/**
* Create an implementation of {@link HasNextCondition} that knows when
* to stop iterating over the Segments. Used during {@link SegmentedBytesStore#fetch(Bytes, Bytes, long, long)} operations
* to stop iterating over the KeyValueSegments. Used during {@link SegmentedBytesStore#fetch(Bytes, Bytes, long, long)} operations
* @param binaryKeyFrom the first key in the range
* @param binaryKeyTo the last key in the range
* @param from starting time range
@@ -171,6 +171,6 @@ public interface SegmentedBytesStore extends StateStore {
* @param to
* @return List of segments to search
*/
List<Segment> segmentsToSearch(Segments segments, long from, long to);
<S extends Segment> List<S> segmentsToSearch(Segments<S> segments, long from, long to);
}
}

streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java (219 lines changed)

@@ -16,224 +16,29 @@
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SimpleTimeZone;
import java.util.TreeMap;
/**
* Manages the {@link Segment}s that are used by the {@link RocksDBSegmentedBytesStore}
*/
class Segments {
private static final Logger log = LoggerFactory.getLogger(Segments.class);
private final TreeMap<Long, Segment> segments = new TreeMap<>();
private final String name;
private final long retentionPeriod;
private final long segmentInterval;
private final SimpleDateFormat formatter;
Segments(final String name, final long retentionPeriod, final long segmentInterval) {
this.name = name;
this.segmentInterval = segmentInterval;
this.retentionPeriod = retentionPeriod;
// Create a date formatter. Formatted timestamps are used as segment name suffixes
this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
}
long segmentId(final long timestamp) {
return timestamp / segmentInterval;
}
String segmentName(final long segmentId) {
// (1) previous format used - as a separator so if this changes in the future
// then we should use something different.
// (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
// so if this changes in the future then we should use something different.
return name + "." + segmentId * segmentInterval;
}
Segment getSegmentForTimestamp(final long timestamp) {
return segments.get(segmentId(timestamp));
}
Segment getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
final long minLiveTimestamp = context.streamTime() - retentionPeriod;
final long minLiveSegment = segmentId(minLiveTimestamp);
final Segment toReturn;
if (segmentId >= minLiveSegment) {
// The segment is live. get it, ensure it's open, and return it.
toReturn = getOrCreateSegment(segmentId, context);
} else {
toReturn = null;
}
cleanupEarlierThan(minLiveSegment);
return toReturn;
}
private Segment getOrCreateSegment(final long segmentId, final InternalProcessorContext context) {
if (segments.containsKey(segmentId)) {
return segments.get(segmentId);
} else {
final Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
final Segment shouldBeNull = segments.put(segmentId, newSegment);
if (shouldBeNull != null) {
throw new IllegalStateException("Segment already exists. Possible concurrent access.");
}
newSegment.openDB(context);
return newSegment;
}
}
void openExisting(final InternalProcessorContext context) {
try {
final File dir = new File(context.stateDir(), name);
if (dir.exists()) {
final String[] list = dir.list();
if (list != null) {
final long[] segmentIds = new long[list.length];
for (int i = 0; i < list.length; i++) {
segmentIds[i] = segmentIdFromSegmentName(list[i], dir);
}
// open segments in the id order
Arrays.sort(segmentIds);
for (final long segmentId : segmentIds) {
if (segmentId >= 0) {
getOrCreateSegment(segmentId, context);
}
}
}
} else {
if (!dir.mkdir()) {
throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name));
}
}
} catch (final Exception ex) {
// ignore
}
final long minLiveSegment = segmentId(context.streamTime() - retentionPeriod);
cleanupEarlierThan(minLiveSegment);
}
List<Segment> segments(final long timeFrom, final long timeTo) {
final List<Segment> result = new ArrayList<>();
final NavigableMap<Long, Segment> segmentsInRange = segments.subMap(
segmentId(timeFrom), true,
segmentId(timeTo), true
);
for (final Segment segment : segmentsInRange.values()) {
if (segment.isOpen()) {
result.add(segment);
}
}
return result;
}
List<Segment> allSegments() {
final List<Segment> result = new ArrayList<>();
for (final Segment segment : segments.values()) {
if (segment.isOpen()) {
result.add(segment);
}
}
return result;
}
void flush() {
for (final Segment segment : segments.values()) {
segment.flush();
}
}
public void close() {
for (final Segment segment : segments.values()) {
segment.close();
}
segments.clear();
}
private void cleanupEarlierThan(final long minLiveSegment) {
final Iterator<Map.Entry<Long, Segment>> toRemove =
segments.headMap(minLiveSegment, false).entrySet().iterator();
while (toRemove.hasNext()) {
final Map.Entry<Long, Segment> next = toRemove.next();
toRemove.remove();
final Segment segment = next.getValue();
segment.close();
try {
segment.destroy();
} catch (final IOException e) {
log.error("Error destroying {}", segment, e);
}
}
}
private long segmentIdFromSegmentName(final String segmentName,
final File parent) {
final int segmentSeparatorIndex = name.length();
final char segmentSeparator = segmentName.charAt(segmentSeparatorIndex);
final String segmentIdString = segmentName.substring(segmentSeparatorIndex + 1);
final long segmentId;
// old style segment name with date
if (segmentSeparator == '-') {
try {
segmentId = formatter.parse(segmentIdString).getTime() / segmentInterval;
} catch (final ParseException e) {
log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName);
return -1L;
}
renameSegmentFile(parent, segmentName, segmentId);
} else {
// for both new formats (with : or .) parse segment ID identically
try {
segmentId = Long.parseLong(segmentIdString) / segmentInterval;
} catch (final NumberFormatException e) {
throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName);
}
// intermediate segment name with : breaks KafkaStreams on Windows OS -> rename segment file to new name with .
if (segmentSeparator == ':') {
renameSegmentFile(parent, segmentName, segmentId);
}
}
return segmentId;
}
private void renameSegmentFile(final File parent,
final String segmentName,
final long segmentId) {
final File newName = new File(parent, segmentName(segmentId));
final File oldName = new File(parent, segmentName);
if (!oldName.renameTo(newName)) {
throw new ProcessorStateException("Unable to rename old style segment from: "
+ oldName
+ " to new name: "
+ newName);
}
}
}
interface Segments<S extends Segment> {
long segmentId(final long timestamp);
String segmentName(final long segmentId);
S getSegmentForTimestamp(final long timestamp);
S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context);
S getOrCreateSegment(final long segmentId, final InternalProcessorContext context);
void openExisting(final InternalProcessorContext context);
List<S> segments(final long timeFrom, final long timeTo);
List<S> allSegments();
void flush();
void close();
}

streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java (40 lines changed)

@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import java.nio.ByteBuffer;
import java.util.List;
@@ -69,29 +68,26 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
@Override
public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
return new HasNextCondition() {
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
while (iterator.hasNext()) {
final Bytes bytes = iterator.peekNextKey();
final Windowed<Bytes> windowedKey = SessionKeySchema.from(bytes);
if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
&& (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
&& windowedKey.window().end() >= from
&& windowedKey.window().start() <= to) {
return true;
}
iterator.next();
return iterator -> {
while (iterator.hasNext()) {
final Bytes bytes = iterator.peekNextKey();
final Windowed<Bytes> windowedKey = SessionKeySchema.from(bytes);
if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
&& (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
&& windowedKey.window().end() >= from
&& windowedKey.window().start() <= to) {
return true;
}
return false;
iterator.next();
}
return false;
};
}
@Override
public List<Segment> segmentsToSearch(final Segments segments,
final long from,
final long to) {
public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments,
final long from,
final long to) {
return segments.segments(from, Long.MAX_VALUE);
}
@@ -101,21 +97,21 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
}
public static byte[] extractKeyBytes(final byte[] binaryKey) {
static byte[] extractKeyBytes(final byte[] binaryKey) {
final byte[] bytes = new byte[binaryKey.length - 2 * TIMESTAMP_SIZE];
System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
return bytes;
}
public static long extractEndTimestamp(final byte[] binaryKey) {
static long extractEndTimestamp(final byte[] binaryKey) {
return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
}
public static long extractStartTimestamp(final byte[] binaryKey) {
static long extractStartTimestamp(final byte[] binaryKey) {
return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
}
public static Window extractWindow(final byte[] binaryKey) {
static Window extractWindow(final byte[] binaryKey) {
final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);

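The extractors above pin down the session key layout: the serialized key, then the window end timestamp, then the window start timestamp. A standalone sketch that builds such a key by hand and reads it back the same way (it assumes TIMESTAMP_SIZE is 8, as the getLong calls imply; the key bytes are invented):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SessionKeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8; // assumed, implied by getLong

    public static void main(final String[] args) {
        final byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        final long start = 1_000L;
        final long end = 5_000L;

        // layout: [key bytes][window end (8 bytes)][window start (8 bytes)]
        final byte[] binaryKey = ByteBuffer.allocate(key.length + 2 * TIMESTAMP_SIZE)
            .put(key).putLong(end).putLong(start).array();

        // read back exactly as SessionKeySchema's extractors do
        final long extractedEnd = ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
        final long extractedStart = ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
        System.out.println("end=" + extractedEnd + ", start=" + extractedStart); // end=5000, start=1000
    }
}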
streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java (59 lines changed)

@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import java.nio.ByteBuffer;
@@ -66,29 +65,31 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
}
@Override
public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
return new HasNextCondition() {
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
while (iterator.hasNext()) {
final Bytes bytes = iterator.peekNextKey();
final Bytes keyBytes = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(bytes.get()));
final long time = WindowKeySchema.extractStoreTimestamp(bytes.get());
if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
&& (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
&& time >= from
&& time <= to) {
return true;
}
iterator.next();
public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom,
final Bytes binaryKeyTo,
final long from,
final long to) {
return iterator -> {
while (iterator.hasNext()) {
final Bytes bytes = iterator.peekNextKey();
final Bytes keyBytes = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(bytes.get()));
final long time = WindowKeySchema.extractStoreTimestamp(bytes.get());
if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
&& (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
&& time >= from
&& time <= to) {
return true;
}
return false;
iterator.next();
}
return false;
};
}
@Override
public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments,
final long from,
final long to) {
return segments.segments(from, to);
}
@@ -96,8 +97,8 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
* Safely construct a time window of the given size,
* taking care of bounding endMs to Long.MAX_VALUE if necessary
*/
public static TimeWindow timeWindowForSize(final long startMs,
final long windowSize) {
static TimeWindow timeWindowForSize(final long startMs,
final long windowSize) {
final long endMs = startMs + windowSize;
return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
}
@@ -175,24 +176,24 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
return Bytes.wrap(buf.array());
}
public static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
return bytes;
}
public static <K> K extractStoreKey(final byte[] binaryKey,
final StateSerdes<K, ?> serdes) {
static <K> K extractStoreKey(final byte[] binaryKey,
final StateSerdes<K, ?> serdes) {
final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
return serdes.keyFrom(bytes);
}
public static long extractStoreTimestamp(final byte[] binaryKey) {
static long extractStoreTimestamp(final byte[] binaryKey) {
return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
}
public static int extractStoreSequence(final byte[] binaryKey) {
static int extractStoreSequence(final byte[] binaryKey) {
return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
}
@@ -205,15 +206,15 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
return new Windowed<>(key, window);
}
public static Windowed<Bytes> fromStoreBytesKey(final byte[] binaryKey,
final long windowSize) {
static Windowed<Bytes> fromStoreBytesKey(final byte[] binaryKey,
final long windowSize) {
final Bytes key = Bytes.wrap(extractStoreKeyBytes(binaryKey));
final Window window = extractStoreWindow(binaryKey, windowSize);
return new Windowed<>(key, window);
}
public static Window extractStoreWindow(final byte[] binaryKey,
final long windowSize) {
static Window extractStoreWindow(final byte[] binaryKey,
final long windowSize) {
final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
return timeWindowForSize(start, windowSize);

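Analogously, the WindowKeySchema extractors imply the store key layout: the serialized key, then the window start timestamp, then the sequence number (used when the store retains duplicates). A standalone sketch (assuming TIMESTAMP_SIZE is 8 and SEQNUM_SIZE is 4, as the getLong/getInt calls imply; the key bytes are invented):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WindowKeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8; // assumed, implied by getLong
    static final int SEQNUM_SIZE = 4;    // assumed, implied by getInt

    public static void main(final String[] args) {
        final byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);

        // layout: [key bytes][window start timestamp (8 bytes)][sequence number (4 bytes)]
        final byte[] binaryKey = ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
            .put(key).putLong(60_000L).putInt(0).array();

        // read back exactly as WindowKeySchema's extractors do
        final long timestamp = ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
        final int seqnum = ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
        System.out.println("timestamp=" + timestamp + ", seqnum=" + seqnum); // timestamp=60000, seqnum=0
    }
}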
streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java → streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentIteratorTest.java (36 lines changed)

@@ -38,25 +38,25 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SegmentIteratorTest {
public class KeyValueSegmentIteratorTest {
private final Segment segmentOne = new Segment("one", "one", 0);
private final Segment segmentTwo = new Segment("two", "window", 1);
private final KeyValueSegment segmentOne = new KeyValueSegment("one", "one", 0);
private final KeyValueSegment segmentTwo = new KeyValueSegment("two", "window", 1);
private final HasNextCondition hasNextCondition = Iterator::hasNext;
private SegmentIterator iterator = null;
private SegmentIterator<KeyValueSegment> iterator = null;
@Before
public void before() {
final InternalMockProcessorContext context = new InternalMockProcessorContext(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.String(),
new NoOpRecordCollector(),
new ThreadCache(
new LogContext("testCache "),
0,
new MockStreamsMetrics(new Metrics())));
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.String(),
new NoOpRecordCollector(),
new ThreadCache(
new LogContext("testCache "),
0,
new MockStreamsMetrics(new Metrics())));
segmentOne.openDB(context);
segmentTwo.openDB(context);
segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
@@ -77,7 +77,7 @@ public class SegmentIteratorTest {
@Test
public void shouldIterateOverAllSegments() {
iterator = new SegmentIterator(
iterator = new SegmentIterator<>(
Arrays.asList(segmentOne, segmentTwo).iterator(),
hasNextCondition,
Bytes.wrap("a".getBytes()),
@@ -104,7 +104,7 @@ public class SegmentIteratorTest {
@Test
public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() {
iterator = new SegmentIterator(
iterator = new SegmentIterator<>(
Collections.singletonList(segmentOne).iterator(),
hasNextCondition,
Bytes.wrap("a".getBytes()),
@@ -117,7 +117,7 @@ public class SegmentIteratorTest {
@Test
public void shouldOnlyIterateOverSegmentsInRange() {
iterator = new SegmentIterator(
iterator = new SegmentIterator<>(
Arrays.asList(segmentOne, segmentTwo).iterator(),
hasNextCondition,
Bytes.wrap("a".getBytes()),
@@ -136,7 +136,7 @@ public class SegmentIteratorTest {
@Test(expected = NoSuchElementException.class)
public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() {
iterator = new SegmentIterator(
iterator = new SegmentIterator<>(
Arrays.asList(segmentOne, segmentTwo).iterator(),
hasNextCondition,
Bytes.wrap("f".getBytes()),
@@ -147,7 +147,7 @@ public class SegmentIteratorTest {
@Test(expected = NoSuchElementException.class)
public void shouldThrowNoSuchElementOnNextIfNoNext() {
iterator = new SegmentIterator(
iterator = new SegmentIterator<>(
Arrays.asList(segmentOne, segmentTwo).iterator(),
hasNextCondition,
Bytes.wrap("f".getBytes()),
@@ -159,4 +159,4 @@ public class SegmentIteratorTest {
private KeyValue<String, String> toStringKeyValue(final KeyValue<Bytes, byte[]> binaryKv) {
return KeyValue.pair(new String(binaryKv.key.get()), new String(binaryKv.value));
}
}
}

streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java → streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java (44 lines changed)

@@ -41,13 +41,13 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class SegmentsTest {
public class KeyValueSegmentsTest {
private static final int NUM_SEGMENTS = 5;
private static final long SEGMENT_INTERVAL = 100L;
private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
private InternalMockProcessorContext context;
private Segments segments;
private KeyValueSegments segments;
private File stateDirectory;
private final String storeName = "test";
@@ -61,7 +61,7 @@ public class SegmentsTest {
new NoOpRecordCollector(),
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
);
segments = new Segments(storeName, RETENTION_PERIOD, SEGMENT_INTERVAL);
segments = new KeyValueSegments(storeName, RETENTION_PERIOD, SEGMENT_INTERVAL);
}
@After
@@ -79,7 +79,7 @@ public class SegmentsTest {
@Test
public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
final Segments segments = new Segments("test", 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
final KeyValueSegments segments = new KeyValueSegments("test", 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
assertEquals(0, segments.segmentId(0));
assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
@@ -94,9 +94,9 @@ public class SegmentsTest {
@Test
public void shouldCreateSegments() {
final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context);
final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context);
final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context);
final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context);
assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory());
assertTrue(new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL).isDirectory());
@@ -114,10 +114,10 @@ public class SegmentsTest {
@Test
public void shouldCleanupSegmentsThatHaveExpired() {
final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context);
final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context);
context.setStreamTime(SEGMENT_INTERVAL * 7);
final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context);
final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(7, context);
assertFalse(segment1.isOpen());
assertFalse(segment2.isOpen());
assertTrue(segment3.isOpen());
@@ -128,22 +128,22 @@ public class SegmentsTest {
@Test
public void shouldGetSegmentForTimestamp() {
final Segment segment = segments.getOrCreateSegmentIfLive(0, context);
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context);
segments.getOrCreateSegmentIfLive(1, context);
assertEquals(segment, segments.getSegmentForTimestamp(0L));
}
@Test
public void shouldGetCorrectSegmentString() {
final Segment segment = segments.getOrCreateSegmentIfLive(0, context);
assertEquals("Segment(id=0, name=test.0)", segment.toString());
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context);
assertEquals("KeyValueSegment(id=0, name=test.0)", segment.toString());
}
@Test
public void shouldCloseAllOpenSegments() {
final Segment first = segments.getOrCreateSegmentIfLive(0, context);
final Segment second = segments.getOrCreateSegmentIfLive(1, context);
final Segment third = segments.getOrCreateSegmentIfLive(2, context);
final KeyValueSegment first = segments.getOrCreateSegmentIfLive(0, context);
final KeyValueSegment second = segments.getOrCreateSegmentIfLive(1, context);
final KeyValueSegment third = segments.getOrCreateSegmentIfLive(2, context);
segments.close();
assertFalse(first.isOpen());
@@ -153,7 +153,7 @@ public class SegmentsTest {
@Test
public void shouldOpenExistingSegments() {
segments = new Segments("test", 4, 1);
segments = new KeyValueSegments("test", 4, 1);
segments.getOrCreateSegmentIfLive(0, context);
segments.getOrCreateSegmentIfLive(1, context);
segments.getOrCreateSegmentIfLive(2, context);
@@ -162,7 +162,7 @@ public class SegmentsTest {
// close existing.
segments.close();
segments = new Segments("test", 4, 1);
segments = new KeyValueSegments("test", 4, 1);
segments.openExisting(context);
assertTrue(segments.getSegmentForTimestamp(0).isOpen());
@@ -185,7 +185,7 @@ public class SegmentsTest {
segments.getOrCreateSegmentIfLive(3, context);
segments.getOrCreateSegmentIfLive(4, context);
final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
final List<KeyValueSegment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
assertEquals(3, segments.size());
assertEquals(0, segments.get(0).id);
assertEquals(1, segments.get(1).id);
@@ -200,7 +200,7 @@ public class SegmentsTest {
updateStreamTimeAndCreateSegment(1);
updateStreamTimeAndCreateSegment(3);
final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
final List<KeyValueSegment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
assertEquals(3, segments.size());
assertEquals(0, segments.get(0).id);
assertEquals(1, segments.get(1).id);
@@ -252,7 +252,7 @@ public class SegmentsTest {
public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
final long segmentInterval = 60_000L; // the old segment file's naming system maxes out at 1 minute granularity.
segments = new Segments(storeName, NUM_SEGMENTS * segmentInterval, segmentInterval);
segments = new KeyValueSegments(storeName, NUM_SEGMENTS * segmentInterval, segmentInterval);
final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
final File storeDirectory = new File(storeDirectoryPath);
@@ -306,7 +306,7 @@ public class SegmentsTest {
}
private void verifyCorrectSegments(final long first, final int numSegments) {
final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
final List<KeyValueSegment> result = this.segments.segments(0, Long.MAX_VALUE);
assertEquals(numSegments, result.size());
for (int i = 0; i < numSegments; i++) {
assertEquals(i + first, result.get(i).id);

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java (18 lines changed)

@@ -178,7 +178,7 @@ public class RocksDBSegmentedBytesStoreTest {
@Test
public void shouldRollSegments() {
// just to validate directories
final Segments segments = new Segments(storeName, retention, segmentInterval);
final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
final String key = "a";
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
@@ -206,7 +206,7 @@ public class RocksDBSegmentedBytesStoreTest {
@Test
public void shouldGetAllSegments() {
// just to validate directories
final Segments segments = new Segments(storeName, retention, segmentInterval);
final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
final String key = "a";
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -235,7 +235,7 @@ public class RocksDBSegmentedBytesStoreTest {
@Test
public void shouldFetchAllSegments() {
// just to validate directories
final Segments segments = new Segments(storeName, retention, segmentInterval);
final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
final String key = "a";
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -263,7 +263,7 @@ public class RocksDBSegmentedBytesStoreTest {
@Test
public void shouldLoadSegmentsWithOldStyleDateFormattedName() {
final Segments segments = new Segments(storeName, retention, segmentInterval);
final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
final String key = "a";
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -304,7 +304,7 @@ public class RocksDBSegmentedBytesStoreTest {
@Test
public void shouldLoadSegmentsWithOldStyleColonFormattedName() {
final Segments segments = new Segments(storeName, retention, segmentInterval);
final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
final String key = "a";
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -355,7 +355,7 @@ public class RocksDBSegmentedBytesStoreTest {
final Collection<KeyValue<byte[], byte[]>> records = new ArrayList<>();
records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[0])).get(), serializeValue(50L)));
records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[3])).get(), serializeValue(100L)));
final Map<Segment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records);
final Map<KeyValueSegment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records);
assertEquals(2, writeBatchMap.size());
for (final WriteBatch batch : writeBatchMap.values()) {
assertEquals(1, batch.count());
@@ -376,7 +376,7 @@ public class RocksDBSegmentedBytesStoreTest {
assertEquals(2, bytesStore.getSegments().size());
// Bulk loading is enabled during recovery.
for (final Segment segment : bytesStore.getSegments()) {
for (final KeyValueSegment segment : bytesStore.getSegments()) {
Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
}
@@ -400,12 +400,12 @@ public class RocksDBSegmentedBytesStoreTest {
restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
for (final Segment segment : bytesStore.getSegments()) {
for (final KeyValueSegment segment : bytesStore.getSegments()) {
Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
}
restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
for (final Segment segment : bytesStore.getSegments()) {
for (final KeyValueSegment segment : bytesStore.getSegments()) {
Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4));
}
}

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java (2 lines changed)

@@ -74,7 +74,7 @@ public class RocksDBWindowStoreTest {
private final long segmentInterval = 60_000L;
private final long retentionPeriod = segmentInterval * (numSegments - 1);
private final String windowName = "window";
private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval);
private final KeyValueSegments segments = new KeyValueSegments(windowName, retentionPeriod, segmentInterval);
private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();

streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java (8 lines changed)

@@ -30,16 +30,16 @@ import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.KeyValueIteratorStub;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class WindowKeySchemaTest {
final private String key = "key";
