diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 8236c1ad010..b672f3dd2fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -84,7 +84,7 @@ public class Stores {
      */
     public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
         Objects.requireNonNull(name, "name cannot be null");
-        return new RocksDbKeyValueBytesStoreSupplier(name);
+        return new RocksDbKeyValueBytesStoreSupplier(name, true);
     }
 
     /**
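Note on the hunk above: the second constructor argument is new in this change. Based on the `RocksDBTimestampedStore` introduced later in this patch, it plausibly tells the supplier which store implementation to create. A hedged sketch of what the supplier's `get()` could look like — the supplier class itself is not part of this diff, and the field name `returnTimestampedStore` is an assumption:

```java
// Sketch only: RocksDbKeyValueBytesStoreSupplier is not shown in this diff.
// The new boolean presumably switches between the plain and the timestamped store.
public KeyValueStore<Bytes, byte[]> get() {
    return returnTimestampedStore          // assumed field backing the new constructor argument
        ? new RocksDBTimestampedStore(name)
        : new RocksDBStore(name);
}
```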
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
new file mode 100644
index 00000000000..c07e43bcc67
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
@@ -0,0 +1,1362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.rocksdb.AbstractComparator;
+import org.rocksdb.AbstractSlice;
+import org.rocksdb.AccessHint;
+import org.rocksdb.BuiltinComparator;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionOptionsFIFO;
+import org.rocksdb.CompactionOptionsUniversal;
+import org.rocksdb.CompactionPriority;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionOptions;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.DbPath;
+import org.rocksdb.Env;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.Logger;
+import org.rocksdb.MemTableConfig;
+import org.rocksdb.MergeOperator;
+import org.rocksdb.Options;
+import org.rocksdb.RateLimiter;
+import org.rocksdb.SstFileManager;
+import org.rocksdb.Statistics;
+import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WALRecoveryMode;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * The generic {@link Options} class allows users to set all configs on one object if only the default column family
+ * is used. Because we use multiple column families, we need to use {@link DBOptions} and {@link ColumnFamilyOptions},
+ * each of which covers a part of all options.
+ *
+ * This class does the translation from the generic {@link Options} to {@link DBOptions} and {@link ColumnFamilyOptions}.
+ */
+class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options {
+    private final DBOptions dbOptions;
+    private final ColumnFamilyOptions columnFamilyOptions;
+
+    RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(final DBOptions dbOptions,
+                                                               final ColumnFamilyOptions columnFamilyOptions) {
+        this.dbOptions = dbOptions;
+        this.columnFamilyOptions = columnFamilyOptions;
+    }
+
+    @Override
+    public Options setIncreaseParallelism(final int totalThreads) {
+        dbOptions.setIncreaseParallelism(totalThreads);
+        return this;
+    }
+
+    @Override
+    public Options setCreateIfMissing(final boolean flag) {
+        dbOptions.setCreateIfMissing(flag);
+        return this;
+    }
+
+    @Override
+    public Options setCreateMissingColumnFamilies(final boolean flag) {
+        dbOptions.setCreateMissingColumnFamilies(flag);
+        return this;
+    }
+
+    @Override
+    public Options setEnv(final Env env) {
+        dbOptions.setEnv(env);
+        return this;
+    }
+
+    @Override
+    public Env getEnv() {
+        return dbOptions.getEnv();
+    }
+
+    @Override
+    public Options prepareForBulkLoad() {
+        /* From https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
+         *
+         * Q: What's the fastest way to load data into RocksDB?
+         *
+         * A: A fast way to direct insert data to the DB:
+         *
+         *  1. using single writer thread and insert in sorted order
+         *  2. batch hundreds of keys into one write batch
+         *  3. use vector memtable
+         *  4. make sure options.max_background_flushes is at least 4
+         *  5. before inserting the data,
+         *       disable automatic compaction,
+         *       set options.level0_file_num_compaction_trigger,
+         *           options.level0_slowdown_writes_trigger
+         *           and options.level0_stop_writes_trigger to very large.
+         *     After inserting all the data, issue a manual compaction.
+         *
+         * 3-5 are done automatically if you call Options::PrepareForBulkLoad() on your options.
+         */
+        // (1) not in our control
+        // (2) is done via bulk-loading API
+        // (3) skipped, because it is not done in the actual PrepareForBulkLoad() code in https://github.com/facebook/rocksdb/blob/master/options/options.cc
+        //columnFamilyOptions.setMemTableConfig(new VectorMemTableConfig());
+        // (4-5) below:
+        dbOptions.setMaxBackgroundFlushes(4);
+        columnFamilyOptions.setDisableAutoCompactions(true);
+        columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
+        columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
+        columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);
+        return this;
+    }
+
+    @Override
+    public boolean createIfMissing() {
+        return dbOptions.createIfMissing();
+    }
+
+    @Override
+    public boolean createMissingColumnFamilies() {
+        return dbOptions.createMissingColumnFamilies();
+    }
+
+    @Override
+    public Options optimizeForSmallDb() {
+        dbOptions.optimizeForSmallDb();
+        columnFamilyOptions.optimizeForSmallDb();
+        return this;
+    }
+
+    @Override
+    public Options optimizeForPointLookup(final long blockCacheSizeMb) {
+        columnFamilyOptions.optimizeForPointLookup(blockCacheSizeMb);
+        return this;
+    }
+
+    @Override
+    public Options optimizeLevelStyleCompaction() {
+        columnFamilyOptions.optimizeLevelStyleCompaction();
+        return this;
+    }
+
+    @Override
+    public Options optimizeLevelStyleCompaction(final long memtableMemoryBudget) {
+        columnFamilyOptions.optimizeLevelStyleCompaction(memtableMemoryBudget);
+        return this;
+    }
+
+    @Override
+    public Options optimizeUniversalStyleCompaction() {
+        columnFamilyOptions.optimizeUniversalStyleCompaction();
+        return this;
+    }
+
+    @Override
+    public Options optimizeUniversalStyleCompaction(final long memtableMemoryBudget) {
+        columnFamilyOptions.optimizeUniversalStyleCompaction(memtableMemoryBudget);
+        return this;
+    }
+
+    @Override
+    public Options setComparator(final BuiltinComparator builtinComparator) {
+        columnFamilyOptions.setComparator(builtinComparator);
+        return this;
+    }
+
+    @Override
+    public Options setComparator(final AbstractComparator<? extends AbstractSlice<?>> comparator) {
+        columnFamilyOptions.setComparator(comparator);
+        return this;
+    }
+
+    @Override
+    public Options setMergeOperatorName(final String name) {
+        columnFamilyOptions.setMergeOperatorName(name);
+        return this;
+    }
+
+    @Override
+    public Options setMergeOperator(final MergeOperator mergeOperator) {
+        columnFamilyOptions.setMergeOperator(mergeOperator);
+        return this;
+    }
+
+    @Override
+    public Options setWriteBufferSize(final long writeBufferSize) {
+        columnFamilyOptions.setWriteBufferSize(writeBufferSize);
+        return this;
+    }
+
+    @Override
+    public long writeBufferSize() {
+        return columnFamilyOptions.writeBufferSize();
+    }
+
+    @Override
+    public Options setMaxWriteBufferNumber(final int maxWriteBufferNumber) {
+        columnFamilyOptions.setMaxWriteBufferNumber(maxWriteBufferNumber);
+        return this;
+    }
+
+    @Override
+    public int maxWriteBufferNumber() {
+        return columnFamilyOptions.maxWriteBufferNumber();
+    }
+
+    @Override
+    public boolean errorIfExists() {
+        return dbOptions.errorIfExists();
+    }
+
+    @Override
+    public Options setErrorIfExists(final boolean errorIfExists) {
+        dbOptions.setErrorIfExists(errorIfExists);
+        return this;
+    }
+
+    @Override
+    public boolean paranoidChecks() {
+        final boolean columnFamilyParanoidFileChecks = columnFamilyOptions.paranoidFileChecks();
+        final boolean dbOptionsParanoidChecks = dbOptions.paranoidChecks();
+
+        if (columnFamilyParanoidFileChecks != dbOptionsParanoidChecks) {
+            throw new IllegalStateException("Config for paranoid checks for RocksDB and ColumnFamilies should be the same.");
+        }
+
+        return dbOptionsParanoidChecks;
+    }
+
+    @Override
+    public Options setParanoidChecks(final boolean paranoidChecks) {
+        columnFamilyOptions.setParanoidFileChecks(paranoidChecks);
+        dbOptions.setParanoidChecks(paranoidChecks);
+        return this;
+    }
+
+    @Override
+    public int maxOpenFiles() {
+        return dbOptions.maxOpenFiles();
+    }
+
+    @Override
+    public Options setMaxFileOpeningThreads(final int maxFileOpeningThreads) {
+        dbOptions.setMaxFileOpeningThreads(maxFileOpeningThreads);
+        return this;
+    }
+
+    @Override
+    public int maxFileOpeningThreads() {
+        return dbOptions.maxFileOpeningThreads();
+    }
+
+    @Override
+    public Options setMaxTotalWalSize(final long maxTotalWalSize) {
+        dbOptions.setMaxTotalWalSize(maxTotalWalSize);
+        return this;
+    }
+
+    @Override
+    public long maxTotalWalSize() {
+        return dbOptions.maxTotalWalSize();
+    }
+
+    @Override
+    public Options setMaxOpenFiles(final int maxOpenFiles) {
+        dbOptions.setMaxOpenFiles(maxOpenFiles);
+        return this;
+    }
+
+    @Override
+    public boolean useFsync() {
+        return dbOptions.useFsync();
+    }
+
+    @Override
+    public Options setUseFsync(final boolean useFsync) {
+        dbOptions.setUseFsync(useFsync);
+        return this;
+    }
+
+    @Override
+    public Options setDbPaths(final Collection<DbPath> dbPaths) {
+        dbOptions.setDbPaths(dbPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> dbPaths() {
+        return dbOptions.dbPaths();
+    }
+
+    @Override
+    public String dbLogDir() {
+        return dbOptions.dbLogDir();
+    }
+
+    @Override
+    public Options setDbLogDir(final String dbLogDir) {
+        dbOptions.setDbLogDir(dbLogDir);
+        return this;
+    }
+
+    @Override
+    public String walDir() {
+        return dbOptions.walDir();
+    }
+
+    @Override
+    public Options
setWalDir(final String walDir) { + dbOptions.setWalDir(walDir); + return this; + } + + @Override + public long deleteObsoleteFilesPeriodMicros() { + return dbOptions.deleteObsoleteFilesPeriodMicros(); + } + + @Override + public Options setDeleteObsoleteFilesPeriodMicros(final long micros) { + dbOptions.setDeleteObsoleteFilesPeriodMicros(micros); + return this; + } + + @Override + public int maxBackgroundCompactions() { + return dbOptions.maxBackgroundCompactions(); + } + + @Override + public Options setStatistics(final Statistics statistics) { + dbOptions.setStatistics(statistics); + return this; + } + + @Override + public Statistics statistics() { + return dbOptions.statistics(); + } + + @Override + public void setBaseBackgroundCompactions(final int baseBackgroundCompactions) { + dbOptions.setBaseBackgroundCompactions(baseBackgroundCompactions); + } + + @Override + public int baseBackgroundCompactions() { + return dbOptions.baseBackgroundCompactions(); + } + + @Override + public Options setMaxBackgroundCompactions(final int maxBackgroundCompactions) { + dbOptions.setMaxBackgroundCompactions(maxBackgroundCompactions); + return this; + } + + @Override + public void setMaxSubcompactions(final int maxSubcompactions) { + dbOptions.setMaxSubcompactions(maxSubcompactions); + } + + @Override + public int maxSubcompactions() { + return dbOptions.maxSubcompactions(); + } + + @Override + public int maxBackgroundFlushes() { + return dbOptions.maxBackgroundFlushes(); + } + + @Override + public Options setMaxBackgroundFlushes(final int maxBackgroundFlushes) { + dbOptions.setMaxBackgroundFlushes(maxBackgroundFlushes); + return this; + } + + @Override + public int maxBackgroundJobs() { + return dbOptions.maxBackgroundJobs(); + } + + @Override + public Options setMaxBackgroundJobs(final int maxBackgroundJobs) { + dbOptions.setMaxBackgroundJobs(maxBackgroundJobs); + return this; + } + + @Override + public long maxLogFileSize() { + return dbOptions.maxLogFileSize(); + } + + @Override + public Options setMaxLogFileSize(final long maxLogFileSize) { + dbOptions.setMaxLogFileSize(maxLogFileSize); + return this; + } + + @Override + public long logFileTimeToRoll() { + return dbOptions.logFileTimeToRoll(); + } + + @Override + public Options setLogFileTimeToRoll(final long logFileTimeToRoll) { + dbOptions.setLogFileTimeToRoll(logFileTimeToRoll); + return this; + } + + @Override + public long keepLogFileNum() { + return dbOptions.keepLogFileNum(); + } + + @Override + public Options setKeepLogFileNum(final long keepLogFileNum) { + dbOptions.setKeepLogFileNum(keepLogFileNum); + return this; + } + + @Override + public Options setRecycleLogFileNum(final long recycleLogFileNum) { + dbOptions.setRecycleLogFileNum(recycleLogFileNum); + return this; + } + + @Override + public long recycleLogFileNum() { + return dbOptions.recycleLogFileNum(); + } + + @Override + public long maxManifestFileSize() { + return dbOptions.maxManifestFileSize(); + } + + @Override + public Options setMaxManifestFileSize(final long maxManifestFileSize) { + dbOptions.setMaxManifestFileSize(maxManifestFileSize); + return this; + } + + @Override + public Options setMaxTableFilesSizeFIFO(final long maxTableFilesSize) { + columnFamilyOptions.setMaxTableFilesSizeFIFO(maxTableFilesSize); + return this; + } + + @Override + public long maxTableFilesSizeFIFO() { + return columnFamilyOptions.maxTableFilesSizeFIFO(); + } + + @Override + public int tableCacheNumshardbits() { + return dbOptions.tableCacheNumshardbits(); + } + + @Override + public Options 
setTableCacheNumshardbits(final int tableCacheNumshardbits) { + dbOptions.setTableCacheNumshardbits(tableCacheNumshardbits); + return this; + } + + @Override + public long walTtlSeconds() { + return dbOptions.walTtlSeconds(); + } + + @Override + public Options setWalTtlSeconds(final long walTtlSeconds) { + dbOptions.setWalTtlSeconds(walTtlSeconds); + return this; + } + + @Override + public long walSizeLimitMB() { + return dbOptions.walSizeLimitMB(); + } + + @Override + public Options setWalSizeLimitMB(final long sizeLimitMB) { + dbOptions.setWalSizeLimitMB(sizeLimitMB); + return this; + } + + @Override + public long manifestPreallocationSize() { + return dbOptions.manifestPreallocationSize(); + } + + @Override + public Options setManifestPreallocationSize(final long size) { + dbOptions.setManifestPreallocationSize(size); + return this; + } + + @Override + public Options setUseDirectReads(final boolean useDirectReads) { + dbOptions.setUseDirectReads(useDirectReads); + return this; + } + + @Override + public boolean useDirectReads() { + return dbOptions.useDirectReads(); + } + + @Override + public Options setUseDirectIoForFlushAndCompaction(final boolean useDirectIoForFlushAndCompaction) { + dbOptions.setUseDirectIoForFlushAndCompaction(useDirectIoForFlushAndCompaction); + return this; + } + + @Override + public boolean useDirectIoForFlushAndCompaction() { + return dbOptions.useDirectIoForFlushAndCompaction(); + } + + @Override + public Options setAllowFAllocate(final boolean allowFAllocate) { + dbOptions.setAllowFAllocate(allowFAllocate); + return this; + } + + @Override + public boolean allowFAllocate() { + return dbOptions.allowFAllocate(); + } + + @Override + public boolean allowMmapReads() { + return dbOptions.allowMmapReads(); + } + + @Override + public Options setAllowMmapReads(final boolean allowMmapReads) { + dbOptions.setAllowMmapReads(allowMmapReads); + return this; + } + + @Override + public boolean allowMmapWrites() { + return dbOptions.allowMmapWrites(); + } + + @Override + public Options setAllowMmapWrites(final boolean allowMmapWrites) { + dbOptions.setAllowMmapWrites(allowMmapWrites); + return this; + } + + @Override + public boolean isFdCloseOnExec() { + return dbOptions.isFdCloseOnExec(); + } + + @Override + public Options setIsFdCloseOnExec(final boolean isFdCloseOnExec) { + dbOptions.setIsFdCloseOnExec(isFdCloseOnExec); + return this; + } + + @Override + public int statsDumpPeriodSec() { + return dbOptions.statsDumpPeriodSec(); + } + + @Override + public Options setStatsDumpPeriodSec(final int statsDumpPeriodSec) { + dbOptions.setStatsDumpPeriodSec(statsDumpPeriodSec); + return this; + } + + @Override + public boolean adviseRandomOnOpen() { + return dbOptions.adviseRandomOnOpen(); + } + + @Override + public Options setAdviseRandomOnOpen(final boolean adviseRandomOnOpen) { + dbOptions.setAdviseRandomOnOpen(adviseRandomOnOpen); + return this; + } + + @Override + public Options setDbWriteBufferSize(final long dbWriteBufferSize) { + dbOptions.setDbWriteBufferSize(dbWriteBufferSize); + return this; + } + + @Override + public long dbWriteBufferSize() { + return dbOptions.dbWriteBufferSize(); + } + + @Override + public Options setAccessHintOnCompactionStart(final AccessHint accessHint) { + dbOptions.setAccessHintOnCompactionStart(accessHint); + return this; + } + + @Override + public AccessHint accessHintOnCompactionStart() { + return dbOptions.accessHintOnCompactionStart(); + } + + @Override + public Options setNewTableReaderForCompactionInputs(final boolean 
newTableReaderForCompactionInputs) { + dbOptions.setNewTableReaderForCompactionInputs(newTableReaderForCompactionInputs); + return this; + } + + @Override + public boolean newTableReaderForCompactionInputs() { + return dbOptions.newTableReaderForCompactionInputs(); + } + + @Override + public Options setCompactionReadaheadSize(final long compactionReadaheadSize) { + dbOptions.setCompactionReadaheadSize(compactionReadaheadSize); + return this; + } + + @Override + public long compactionReadaheadSize() { + return dbOptions.compactionReadaheadSize(); + } + + @Override + public Options setRandomAccessMaxBufferSize(final long randomAccessMaxBufferSize) { + dbOptions.setRandomAccessMaxBufferSize(randomAccessMaxBufferSize); + return this; + } + + @Override + public long randomAccessMaxBufferSize() { + return dbOptions.randomAccessMaxBufferSize(); + } + + @Override + public Options setWritableFileMaxBufferSize(final long writableFileMaxBufferSize) { + dbOptions.setWritableFileMaxBufferSize(writableFileMaxBufferSize); + return this; + } + + @Override + public long writableFileMaxBufferSize() { + return dbOptions.writableFileMaxBufferSize(); + } + + @Override + public boolean useAdaptiveMutex() { + return dbOptions.useAdaptiveMutex(); + } + + @Override + public Options setUseAdaptiveMutex(final boolean useAdaptiveMutex) { + dbOptions.setUseAdaptiveMutex(useAdaptiveMutex); + return this; + } + + @Override + public long bytesPerSync() { + return dbOptions.bytesPerSync(); + } + + @Override + public Options setBytesPerSync(final long bytesPerSync) { + dbOptions.setBytesPerSync(bytesPerSync); + return this; + } + + @Override + public Options setWalBytesPerSync(final long walBytesPerSync) { + dbOptions.setWalBytesPerSync(walBytesPerSync); + return this; + } + + @Override + public long walBytesPerSync() { + return dbOptions.walBytesPerSync(); + } + + @Override + public Options setEnableThreadTracking(final boolean enableThreadTracking) { + dbOptions.setEnableThreadTracking(enableThreadTracking); + return this; + } + + @Override + public boolean enableThreadTracking() { + return dbOptions.enableThreadTracking(); + } + + @Override + public Options setDelayedWriteRate(final long delayedWriteRate) { + dbOptions.setDelayedWriteRate(delayedWriteRate); + return this; + } + + @Override + public long delayedWriteRate() { + return dbOptions.delayedWriteRate(); + } + + @Override + public Options setAllowConcurrentMemtableWrite(final boolean allowConcurrentMemtableWrite) { + dbOptions.setAllowConcurrentMemtableWrite(allowConcurrentMemtableWrite); + return this; + } + + @Override + public boolean allowConcurrentMemtableWrite() { + return dbOptions.allowConcurrentMemtableWrite(); + } + + @Override + public Options setEnableWriteThreadAdaptiveYield(final boolean enableWriteThreadAdaptiveYield) { + dbOptions.setEnableWriteThreadAdaptiveYield(enableWriteThreadAdaptiveYield); + return this; + } + + @Override + public boolean enableWriteThreadAdaptiveYield() { + return dbOptions.enableWriteThreadAdaptiveYield(); + } + + @Override + public Options setWriteThreadMaxYieldUsec(final long writeThreadMaxYieldUsec) { + dbOptions.setWriteThreadMaxYieldUsec(writeThreadMaxYieldUsec); + return this; + } + + @Override + public long writeThreadMaxYieldUsec() { + return dbOptions.writeThreadMaxYieldUsec(); + } + + @Override + public Options setWriteThreadSlowYieldUsec(final long writeThreadSlowYieldUsec) { + dbOptions.setWriteThreadSlowYieldUsec(writeThreadSlowYieldUsec); + return this; + } + + @Override + public long 
writeThreadSlowYieldUsec() { + return dbOptions.writeThreadSlowYieldUsec(); + } + + @Override + public Options setSkipStatsUpdateOnDbOpen(final boolean skipStatsUpdateOnDbOpen) { + dbOptions.setSkipStatsUpdateOnDbOpen(skipStatsUpdateOnDbOpen); + return this; + } + + @Override + public boolean skipStatsUpdateOnDbOpen() { + return dbOptions.skipStatsUpdateOnDbOpen(); + } + + @Override + public Options setWalRecoveryMode(final WALRecoveryMode walRecoveryMode) { + dbOptions.setWalRecoveryMode(walRecoveryMode); + return this; + } + + @Override + public WALRecoveryMode walRecoveryMode() { + return dbOptions.walRecoveryMode(); + } + + @Override + public Options setAllow2pc(final boolean allow2pc) { + dbOptions.setAllow2pc(allow2pc); + return this; + } + + @Override + public boolean allow2pc() { + return dbOptions.allow2pc(); + } + + @Override + public Options setRowCache(final Cache rowCache) { + dbOptions.setRowCache(rowCache); + return this; + } + + @Override + public Cache rowCache() { + return dbOptions.rowCache(); + } + + @Override + public Options setFailIfOptionsFileError(final boolean failIfOptionsFileError) { + dbOptions.setFailIfOptionsFileError(failIfOptionsFileError); + return this; + } + + @Override + public boolean failIfOptionsFileError() { + return dbOptions.failIfOptionsFileError(); + } + + @Override + public Options setDumpMallocStats(final boolean dumpMallocStats) { + dbOptions.setDumpMallocStats(dumpMallocStats); + return this; + } + + @Override + public boolean dumpMallocStats() { + return dbOptions.dumpMallocStats(); + } + + @Override + public Options setAvoidFlushDuringRecovery(final boolean avoidFlushDuringRecovery) { + dbOptions.setAvoidFlushDuringRecovery(avoidFlushDuringRecovery); + return this; + } + + @Override + public boolean avoidFlushDuringRecovery() { + return dbOptions.avoidFlushDuringRecovery(); + } + + @Override + public Options setAvoidFlushDuringShutdown(final boolean avoidFlushDuringShutdown) { + dbOptions.setAvoidFlushDuringShutdown(avoidFlushDuringShutdown); + return this; + } + + @Override + public boolean avoidFlushDuringShutdown() { + return dbOptions.avoidFlushDuringShutdown(); + } + + @Override + public MemTableConfig memTableConfig() { + return columnFamilyOptions.memTableConfig(); + } + + @Override + public Options setMemTableConfig(final MemTableConfig config) { + columnFamilyOptions.setMemTableConfig(config); + return this; + } + + @Override + public Options setRateLimiter(final RateLimiter rateLimiter) { + dbOptions.setRateLimiter(rateLimiter); + return this; + } + + @Override + public Options setSstFileManager(final SstFileManager sstFileManager) { + dbOptions.setSstFileManager(sstFileManager); + return this; + } + + @Override + public Options setLogger(final Logger logger) { + dbOptions.setLogger(logger); + return this; + } + + @Override + public Options setInfoLogLevel(final InfoLogLevel infoLogLevel) { + dbOptions.setInfoLogLevel(infoLogLevel); + return this; + } + + @Override + public InfoLogLevel infoLogLevel() { + return dbOptions.infoLogLevel(); + } + + @Override + public String memTableFactoryName() { + return columnFamilyOptions.memTableFactoryName(); + } + + @Override + public TableFormatConfig tableFormatConfig() { + return columnFamilyOptions.tableFormatConfig(); + } + + @Override + public Options setTableFormatConfig(final TableFormatConfig config) { + columnFamilyOptions.setTableFormatConfig(config); + return this; + } + + @Override + public String tableFactoryName() { + return columnFamilyOptions.tableFactoryName(); + } + + 
@Override
+    public Options useFixedLengthPrefixExtractor(final int n) {
+        columnFamilyOptions.useFixedLengthPrefixExtractor(n);
+        return this;
+    }
+
+    @Override
+    public Options useCappedPrefixExtractor(final int n) {
+        columnFamilyOptions.useCappedPrefixExtractor(n);
+        return this;
+    }
+
+    @Override
+    public CompressionType compressionType() {
+        return columnFamilyOptions.compressionType();
+    }
+
+    @Override
+    public Options setCompressionPerLevel(final List<CompressionType> compressionLevels) {
+        columnFamilyOptions.setCompressionPerLevel(compressionLevels);
+        return this;
+    }
+
+    @Override
+    public List<CompressionType> compressionPerLevel() {
+        return columnFamilyOptions.compressionPerLevel();
+    }
+
+    @Override
+    public Options setCompressionType(final CompressionType compressionType) {
+        columnFamilyOptions.setCompressionType(compressionType);
+        return this;
+    }
+
+    @Override
+    public Options setBottommostCompressionType(final CompressionType bottommostCompressionType) {
+        columnFamilyOptions.setBottommostCompressionType(bottommostCompressionType);
+        return this;
+    }
+
+    @Override
+    public CompressionType bottommostCompressionType() {
+        return columnFamilyOptions.bottommostCompressionType();
+    }
+
+    @Override
+    public Options setCompressionOptions(final CompressionOptions compressionOptions) {
+        columnFamilyOptions.setCompressionOptions(compressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions compressionOptions() {
+        return columnFamilyOptions.compressionOptions();
+    }
+
+    @Override
+    public CompactionStyle compactionStyle() {
+        return columnFamilyOptions.compactionStyle();
+    }
+
+    @Override
+    public Options setCompactionStyle(final CompactionStyle compactionStyle) {
+        columnFamilyOptions.setCompactionStyle(compactionStyle);
+        return this;
+    }
+
+    @Override
+    public int numLevels() {
+        return columnFamilyOptions.numLevels();
+    }
+
+    @Override
+    public Options setNumLevels(final int numLevels) {
+        columnFamilyOptions.setNumLevels(numLevels);
+        return this;
+    }
+
+    @Override
+    public int levelZeroFileNumCompactionTrigger() {
+        return columnFamilyOptions.levelZeroFileNumCompactionTrigger();
+    }
+
+    @Override
+    public Options setLevelZeroFileNumCompactionTrigger(final int numFiles) {
+        columnFamilyOptions.setLevelZeroFileNumCompactionTrigger(numFiles);
+        return this;
+    }
+
+    @Override
+    public int levelZeroSlowdownWritesTrigger() {
+        return columnFamilyOptions.levelZeroSlowdownWritesTrigger();
+    }
+
+    @Override
+    public Options setLevelZeroSlowdownWritesTrigger(final int numFiles) {
+        columnFamilyOptions.setLevelZeroSlowdownWritesTrigger(numFiles);
+        return this;
+    }
+
+    @Override
+    public int levelZeroStopWritesTrigger() {
+        return columnFamilyOptions.levelZeroStopWritesTrigger();
+    }
+
+    @Override
+    public Options setLevelZeroStopWritesTrigger(final int numFiles) {
+        columnFamilyOptions.setLevelZeroStopWritesTrigger(numFiles);
+        return this;
+    }
+
+    @Override
+    public long targetFileSizeBase() {
+        return columnFamilyOptions.targetFileSizeBase();
+    }
+
+    @Override
+    public Options setTargetFileSizeBase(final long targetFileSizeBase) {
+        columnFamilyOptions.setTargetFileSizeBase(targetFileSizeBase);
+        return this;
+    }
+
+    @Override
+    public int targetFileSizeMultiplier() {
+        return columnFamilyOptions.targetFileSizeMultiplier();
+    }
+
+    @Override
+    public Options setTargetFileSizeMultiplier(final int multiplier) {
+        columnFamilyOptions.setTargetFileSizeMultiplier(multiplier);
+        return this;
+    }
+
+    @Override
+    public Options setMaxBytesForLevelBase(final long maxBytesForLevelBase) {
columnFamilyOptions.setMaxBytesForLevelBase(maxBytesForLevelBase); + return this; + } + + @Override + public long maxBytesForLevelBase() { + return columnFamilyOptions.maxBytesForLevelBase(); + } + + @Override + public Options setLevelCompactionDynamicLevelBytes(final boolean enableLevelCompactionDynamicLevelBytes) { + columnFamilyOptions.setLevelCompactionDynamicLevelBytes(enableLevelCompactionDynamicLevelBytes); + return this; + } + + @Override + public boolean levelCompactionDynamicLevelBytes() { + return columnFamilyOptions.levelCompactionDynamicLevelBytes(); + } + + @Override + public double maxBytesForLevelMultiplier() { + return columnFamilyOptions.maxBytesForLevelMultiplier(); + } + + @Override + public Options setMaxBytesForLevelMultiplier(final double multiplier) { + columnFamilyOptions.setMaxBytesForLevelMultiplier(multiplier); + return this; + } + + @Override + public long maxCompactionBytes() { + return columnFamilyOptions.maxCompactionBytes(); + } + + @Override + public Options setMaxCompactionBytes(final long maxCompactionBytes) { + columnFamilyOptions.setMaxCompactionBytes(maxCompactionBytes); + return this; + } + + @Override + public long arenaBlockSize() { + return columnFamilyOptions.arenaBlockSize(); + } + + @Override + public Options setArenaBlockSize(final long arenaBlockSize) { + columnFamilyOptions.setArenaBlockSize(arenaBlockSize); + return this; + } + + @Override + public boolean disableAutoCompactions() { + return columnFamilyOptions.disableAutoCompactions(); + } + + @Override + public Options setDisableAutoCompactions(final boolean disableAutoCompactions) { + columnFamilyOptions.setDisableAutoCompactions(disableAutoCompactions); + return this; + } + + @Override + public long maxSequentialSkipInIterations() { + return columnFamilyOptions.maxSequentialSkipInIterations(); + } + + @Override + public Options setMaxSequentialSkipInIterations(final long maxSequentialSkipInIterations) { + columnFamilyOptions.setMaxSequentialSkipInIterations(maxSequentialSkipInIterations); + return this; + } + + @Override + public boolean inplaceUpdateSupport() { + return columnFamilyOptions.inplaceUpdateSupport(); + } + + @Override + public Options setInplaceUpdateSupport(final boolean inplaceUpdateSupport) { + columnFamilyOptions.setInplaceUpdateSupport(inplaceUpdateSupport); + return this; + } + + @Override + public long inplaceUpdateNumLocks() { + return columnFamilyOptions.inplaceUpdateNumLocks(); + } + + @Override + public Options setInplaceUpdateNumLocks(final long inplaceUpdateNumLocks) { + columnFamilyOptions.setInplaceUpdateNumLocks(inplaceUpdateNumLocks); + return this; + } + + @Override + public double memtablePrefixBloomSizeRatio() { + return columnFamilyOptions.memtablePrefixBloomSizeRatio(); + } + + @Override + public Options setMemtablePrefixBloomSizeRatio(final double memtablePrefixBloomSizeRatio) { + columnFamilyOptions.setMemtablePrefixBloomSizeRatio(memtablePrefixBloomSizeRatio); + return this; + } + + @Override + public int bloomLocality() { + return columnFamilyOptions.bloomLocality(); + } + + @Override + public Options setBloomLocality(final int bloomLocality) { + columnFamilyOptions.setBloomLocality(bloomLocality); + return this; + } + + @Override + public long maxSuccessiveMerges() { + return columnFamilyOptions.maxSuccessiveMerges(); + } + + @Override + public Options setMaxSuccessiveMerges(final long maxSuccessiveMerges) { + columnFamilyOptions.setMaxSuccessiveMerges(maxSuccessiveMerges); + return this; + } + + @Override + public int 
minWriteBufferNumberToMerge() { + return columnFamilyOptions.minWriteBufferNumberToMerge(); + } + + @Override + public Options setMinWriteBufferNumberToMerge(final int minWriteBufferNumberToMerge) { + columnFamilyOptions.setMinWriteBufferNumberToMerge(minWriteBufferNumberToMerge); + return this; + } + + @Override + public Options setOptimizeFiltersForHits(final boolean optimizeFiltersForHits) { + columnFamilyOptions.setOptimizeFiltersForHits(optimizeFiltersForHits); + return this; + } + + @Override + public boolean optimizeFiltersForHits() { + return columnFamilyOptions.optimizeFiltersForHits(); + } + + @Override + public Options setMemtableHugePageSize(final long memtableHugePageSize) { + columnFamilyOptions.setMemtableHugePageSize(memtableHugePageSize); + return this; + } + + @Override + public long memtableHugePageSize() { + return columnFamilyOptions.memtableHugePageSize(); + } + + @Override + public Options setSoftPendingCompactionBytesLimit(final long softPendingCompactionBytesLimit) { + columnFamilyOptions.setSoftPendingCompactionBytesLimit(softPendingCompactionBytesLimit); + return this; + } + + @Override + public long softPendingCompactionBytesLimit() { + return columnFamilyOptions.softPendingCompactionBytesLimit(); + } + + @Override + public Options setHardPendingCompactionBytesLimit(final long hardPendingCompactionBytesLimit) { + columnFamilyOptions.setHardPendingCompactionBytesLimit(hardPendingCompactionBytesLimit); + return this; + } + + @Override + public long hardPendingCompactionBytesLimit() { + return columnFamilyOptions.hardPendingCompactionBytesLimit(); + } + + @Override + public Options setLevel0FileNumCompactionTrigger(final int level0FileNumCompactionTrigger) { + columnFamilyOptions.setLevel0FileNumCompactionTrigger(level0FileNumCompactionTrigger); + return this; + } + + @Override + public int level0FileNumCompactionTrigger() { + return columnFamilyOptions.level0FileNumCompactionTrigger(); + } + + @Override + public Options setLevel0SlowdownWritesTrigger(final int level0SlowdownWritesTrigger) { + columnFamilyOptions.setLevel0SlowdownWritesTrigger(level0SlowdownWritesTrigger); + return this; + } + + @Override + public int level0SlowdownWritesTrigger() { + return columnFamilyOptions.level0SlowdownWritesTrigger(); + } + + @Override + public Options setLevel0StopWritesTrigger(final int level0StopWritesTrigger) { + columnFamilyOptions.setLevel0StopWritesTrigger(level0StopWritesTrigger); + return this; + } + + @Override + public int level0StopWritesTrigger() { + return columnFamilyOptions.level0StopWritesTrigger(); + } + + @Override + public Options setMaxBytesForLevelMultiplierAdditional(final int[] maxBytesForLevelMultiplierAdditional) { + columnFamilyOptions.setMaxBytesForLevelMultiplierAdditional(maxBytesForLevelMultiplierAdditional); + return this; + } + + @Override + public int[] maxBytesForLevelMultiplierAdditional() { + return columnFamilyOptions.maxBytesForLevelMultiplierAdditional(); + } + + @Override + public Options setParanoidFileChecks(final boolean paranoidFileChecks) { + columnFamilyOptions.setParanoidFileChecks(paranoidFileChecks); + return this; + } + + @Override + public boolean paranoidFileChecks() { + return columnFamilyOptions.paranoidFileChecks(); + } + + @Override + public Options setMaxWriteBufferNumberToMaintain(final int maxWriteBufferNumberToMaintain) { + columnFamilyOptions.setMaxWriteBufferNumberToMaintain(maxWriteBufferNumberToMaintain); + return this; + } + + @Override + public int maxWriteBufferNumberToMaintain() { + return 
columnFamilyOptions.maxWriteBufferNumberToMaintain();
+    }
+
+    @Override
+    public Options setCompactionPriority(final CompactionPriority compactionPriority) {
+        columnFamilyOptions.setCompactionPriority(compactionPriority);
+        return this;
+    }
+
+    @Override
+    public CompactionPriority compactionPriority() {
+        return columnFamilyOptions.compactionPriority();
+    }
+
+    @Override
+    public Options setReportBgIoStats(final boolean reportBgIoStats) {
+        columnFamilyOptions.setReportBgIoStats(reportBgIoStats);
+        return this;
+    }
+
+    @Override
+    public boolean reportBgIoStats() {
+        return columnFamilyOptions.reportBgIoStats();
+    }
+
+    @Override
+    public Options setCompactionOptionsUniversal(final CompactionOptionsUniversal compactionOptionsUniversal) {
+        columnFamilyOptions.setCompactionOptionsUniversal(compactionOptionsUniversal);
+        return this;
+    }
+
+    @Override
+    public CompactionOptionsUniversal compactionOptionsUniversal() {
+        return columnFamilyOptions.compactionOptionsUniversal();
+    }
+
+    @Override
+    public Options setCompactionOptionsFIFO(final CompactionOptionsFIFO compactionOptionsFIFO) {
+        columnFamilyOptions.setCompactionOptionsFIFO(compactionOptionsFIFO);
+        return this;
+    }
+
+    @Override
+    public CompactionOptionsFIFO compactionOptionsFIFO() {
+        return columnFamilyOptions.compactionOptionsFIFO();
+    }
+
+    @Override
+    public Options setForceConsistencyChecks(final boolean forceConsistencyChecks) {
+        columnFamilyOptions.setForceConsistencyChecks(forceConsistencyChecks);
+        return this;
+    }
+
+    @Override
+    public boolean forceConsistencyChecks() {
+        return columnFamilyOptions.forceConsistencyChecks();
+    }
+
+    @Override
+    public void close() {
+        columnFamilyOptions.close();
+        dbOptions.close();
+    }
+}
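To see why the adapter keeps extending `Options`: user code configures RocksDB through `RocksDBConfigSetter`, which receives a single `Options` handle. With the adapter in place, each setter call above is forwarded to the matching `DBOptions` or `ColumnFamilyOptions` instance. A minimal sketch of such a user-side setter — the class name is hypothetical; `setConfig` is the callback signature of `org.apache.kafka.streams.state.RocksDBConfigSetter` at this point in Kafka's history:

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class ExampleConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Through the adapter, this call lands on the internal DBOptions ...
        options.setMaxBackgroundCompactions(4);
        // ... while this one lands on the internal ColumnFamilyOptions.
        options.setWriteBufferSize(16 * 1024 * 1024L);
    }
}
```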
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
new file mode 100644
index 00000000000..b1cf24dcdd1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.util.Comparator;
+import java.util.Set;
+
+class RocksDBRangeIterator extends RocksDbIterator {
+    // RocksDB's JNI interface does not expose getters/setters that allow the
+    // comparator to be pluggable, and the default is lexicographic, so it's
+    // safe to just force lexicographic comparator here for now.
+    private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+    private final byte[] rawToKey;
+
+    RocksDBRangeIterator(final String storeName,
+                         final RocksIterator iter,
+                         final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
+                         final Bytes from,
+                         final Bytes to) {
+        super(storeName, iter, openIterators);
+        iter.seek(from.get());
+        rawToKey = to.get();
+        if (rawToKey == null) {
+            throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
+        }
+    }
+
+    @Override
+    public KeyValue<Bytes, byte[]> makeNext() {
+        final KeyValue<Bytes, byte[]> next = super.makeNext();
+
+        if (next == null) {
+            return allDone();
+        } else {
+            if (comparator.compare(next.key.get(), rawToKey) <= 0) {
+                return next;
+            } else {
+                return allDone();
+            }
+        }
+    }
+}
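Note on the iterator semantics above: `makeNext()` keeps returning entries while `comparator.compare(next.key.get(), rawToKey) <= 0`, so the upper bound is inclusive and keys arrive in lexicographic byte order. A usage sketch — store setup elided; `store` is assumed to be a `KeyValueStore<Bytes, byte[]>`:

```java
final Bytes from = Bytes.wrap(new byte[] {0x01});
final Bytes to = Bytes.wrap(new byte[] {0x05});
// Both bounds are inclusive: every returned key k satisfies from <= k <= to.
try (final KeyValueIterator<Bytes, byte[]> iter = store.range(from, to)) {
    while (iter.hasNext()) {
        final KeyValue<Bytes, byte[]> entry = iter.next();
        // process entry.key / entry.value
    }
}
```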
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 232d8f23840..0528fcaf856 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -33,8 +32,12 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
 import org.rocksdb.FlushOptions;
 import org.rocksdb.InfoLogLevel;
 import org.rocksdb.Options;
@@ -49,26 +52,20 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Pattern;
 
 /**
  * A persistent key-value store based on RocksDB.
- *
- * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
- * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
- * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
  */
 public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
-    private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);
 
     private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");
@@ -81,17 +78,18 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
     private static final int MAX_WRITE_BUFFERS = 3;
     private static final String DB_FILE_DIR = "rocksdb";
 
-    private final String name;
+    final String name;
     private final String parentDir;
-    private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
 
     File dbDir;
-    private RocksDB db;
+    RocksDB db;
+    RocksDBAccessor dbAccessor;
 
     // the following option objects will be created in the constructor and closed in the close() method
-    private Options options;
-    private WriteOptions wOptions;
-    private FlushOptions fOptions;
+    private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
+    WriteOptions wOptions;
+    FlushOptions fOptions;
 
     private volatile boolean prepareForBulkload = false;
     ProcessorContext internalProcessorContext;
@@ -112,25 +110,27 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
 
     @SuppressWarnings("unchecked")
-    public void openDB(final ProcessorContext context) {
+    void openDB(final ProcessorContext context) {
         // initialize the default rocksdb options
-        options = new Options();
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
+        userSpecifiedOptions = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, columnFamilyOptions);
 
         final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
         tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
         tableConfig.setBlockSize(BLOCK_SIZE);
         tableConfig.setFilter(new BloomFilter());
 
-        options.optimizeFiltersForHits();
-        options.setTableFormatConfig(tableConfig);
-        options.setWriteBufferSize(WRITE_BUFFER_SIZE);
-        options.setCompressionType(COMPRESSION_TYPE);
-        options.setCompactionStyle(COMPACTION_STYLE);
-        options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
-        options.setCreateIfMissing(true);
-        options.setErrorIfExists(false);
-        options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
+        userSpecifiedOptions.optimizeFiltersForHits();
+        userSpecifiedOptions.setTableFormatConfig(tableConfig);
+        userSpecifiedOptions.setWriteBufferSize(WRITE_BUFFER_SIZE);
+        userSpecifiedOptions.setCompressionType(COMPRESSION_TYPE);
+        userSpecifiedOptions.setCompactionStyle(COMPACTION_STYLE);
+        userSpecifiedOptions.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+        userSpecifiedOptions.setCreateIfMissing(true);
+        userSpecifiedOptions.setErrorIfExists(false);
+        userSpecifiedOptions.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
         // this is the recommended way to increase parallelism in RocksDb
         // note that the current implementation of setIncreaseParallelism affects the number
         // of compaction threads but not flush threads (the latter remains one). Also
@@ -138,7 +138,7 @@
         // https://github.com/facebook/rocksdb/blob/62ad0a9b19f0be4cefa70b6b32876e764b7f3c11/util/options.cc#L580
         // subtracts one from the value passed to determine the number of compaction threads
         // (this could be a bug in the RocksDB code and their devs have been contacted).
-        options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));
+        userSpecifiedOptions.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));
 
         wOptions = new WriteOptions();
         wOptions.setDisableWAL(true);
@@ -148,34 +148,44 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
 
         final Map<String, Object> configs = context.appConfigs();
         final Class<RocksDBConfigSetter> configSetterClass =
-            (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
+                (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
 
         if (configSetterClass != null) {
             final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
-            configSetter.setConfig(name, options, configs);
+            configSetter.setConfig(name, userSpecifiedOptions, configs);
         }
 
         if (prepareForBulkload) {
-            options.prepareForBulkLoad();
+            userSpecifiedOptions.prepareForBulkLoad();
        }
 
         dbDir = new File(new File(context.stateDir(), parentDir), name);
 
         try {
-            try {
-                Files.createDirectories(dbDir.getParentFile().toPath());
-                Files.createDirectories(dbDir.getAbsoluteFile().toPath());
-                db = RocksDB.open(options, dbDir.getAbsolutePath());
-            } catch (final RocksDBException e) {
-                throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
-            }
-        } catch (final IOException e) {
-            throw new ProcessorStateException(e);
+            Files.createDirectories(dbDir.getParentFile().toPath());
+            Files.createDirectories(dbDir.getAbsoluteFile().toPath());
+        } catch (final IOException fatal) {
+            throw new ProcessorStateException(fatal);
        }
 
+        openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
     }
 
+    void openRocksDB(final DBOptions dbOptions,
+                     final ColumnFamilyOptions columnFamilyOptions) {
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors
+            = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+
+        try {
+            db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies);
+            dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
+        }
+    }
+
     public void init(final ProcessorContext context,
                      final StateStore root) {
         // open the DB dir
@@ -208,54 +218,19 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         return open;
     }
 
-    @Override
-    public synchronized byte[] get(final Bytes key) {
-        validateStoreOpen();
-        return getInternal(key.get());
-    }
-
     private void validateStoreOpen() {
         if (!open) {
             throw new InvalidStateStoreException("Store " + name + " is currently closed");
         }
     }
 
-    private byte[] getInternal(final byte[] rawKey) {
-        try {
-            return db.get(rawKey);
-        } catch (final RocksDBException e) {
-            // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
-            throw new ProcessorStateException("Error while getting value for key %s from store " + name, e);
-        }
-    }
-
-    void toggleDbForBulkLoading(final boolean prepareForBulkload) {
-        if (prepareForBulkload) {
-            // if the store is not empty, we need to compact to get around the num.levels check
-            // for bulk loading
-            final String[] sstFileNames = dbDir.list((dir, name) -> SST_FILE_EXTENSION.matcher(name).matches());
-
-            if (sstFileNames != null && sstFileNames.length > 0) {
-                try {
-                    db.compactRange(true, 1, 0);
-                } catch (final RocksDBException e) {
-                    throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
-                }
-            }
-        }
-
-        close();
-        this.prepareForBulkload = prepareForBulkload;
-        openDB(internalProcessorContext);
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     public synchronized void put(final Bytes key,
                                  final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
-        putInternal(key.get(), value);
+        dbAccessor.put(key.get(), value);
     }
 
     @Override
@@ -269,68 +244,39 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         return originalValue;
     }
 
-    private void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         try (final WriteBatch batch = new WriteBatch()) {
-            for (final KeyValue<byte[], byte[]> record : records) {
-                if (record.value == null) {
-                    batch.delete(record.key);
-                } else {
-                    batch.put(record.key, record.value);
-                }
-            }
+            dbAccessor.prepareBatch(entries, batch);
             write(batch);
         } catch (final RocksDBException e) {
-            throw new ProcessorStateException("Error restoring batch to store " + name, e);
-        }
-    }
-
-    private void putInternal(final byte[] rawKey,
-                             final byte[] rawValue) {
-        if (rawValue == null) {
-            try {
-                db.delete(wOptions, rawKey);
-            } catch (final RocksDBException e) {
-                // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
-                throw new ProcessorStateException("Error while removing key %s from store " + name, e);
-            }
-        } else {
-            try {
-                db.put(wOptions, rawKey, rawValue);
-            } catch (final RocksDBException e) {
-                // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
-                throw new ProcessorStateException("Error while putting key %s value %s into store " + name, e);
-            }
+            throw new ProcessorStateException("Error while batch writing to store " + name, e);
         }
     }
 
-    void write(final WriteBatch batch) throws RocksDBException {
-        db.write(wOptions, batch);
-    }
-
     @Override
-    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
-        try (final WriteBatch batch = new WriteBatch()) {
-            for (final KeyValue<Bytes, byte[]> entry : entries) {
-                Objects.requireNonNull(entry.key, "key cannot be null");
-                if (entry.value == null) {
-                    batch.delete(entry.key.get());
-                } else {
-                    batch.put(entry.key.get(), entry.value);
-                }
-            }
-            write(batch);
+    public synchronized byte[] get(final Bytes key) {
+        validateStoreOpen();
+        try {
+            return dbAccessor.get(key.get());
         } catch (final RocksDBException e) {
-            throw new ProcessorStateException("Error while batch writing to store " + name, e);
+            // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+            throw new ProcessorStateException("Error while getting value for key from store " + name, e);
         }
-    }
     }
 
     @Override
     public synchronized byte[] delete(final Bytes key) {
         Objects.requireNonNull(key, "key cannot be null");
-        final byte[] value = get(key);
+        final byte[] oldValue;
+        try {
+            oldValue = dbAccessor.getOnly(key.get());
+        } catch (final RocksDBException e) {
+            // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+            throw new ProcessorStateException("Error while getting value for key from store " + name, e);
+        }
         put(key, null);
-        return value;
+        return oldValue;
     }
 
     @Override
@@ -340,8 +286,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         Objects.requireNonNull(to, "to cannot be null");
         validateStoreOpen();
 
-        // query rocksdb
-        final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), from, to);
+        final KeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = dbAccessor.range(from, to);
         openIterators.add(rocksDBRangeIterator);
 
         return rocksDBRangeIterator;
@@ -350,10 +295,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
     @Override
     public synchronized KeyValueIterator<Bytes, byte[]> all() {
         validateStoreOpen();
-        // query rocksdb
-        final RocksIterator innerIter = db.newIterator();
-        innerIter.seekToFirst();
-        final RocksDbIterator rocksDbIterator = new RocksDbIterator(name, innerIter);
+        final KeyValueIterator<Bytes, byte[]> rocksDbIterator = dbAccessor.all();
         openIterators.add(rocksDbIterator);
         return rocksDbIterator;
     }
@@ -372,16 +314,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
     @Override
     public long approximateNumEntries() {
         validateStoreOpen();
-        final long value;
+        final long numEntries;
         try {
-            value = db.getLongProperty("rocksdb.estimate-num-keys");
+            numEntries = dbAccessor.approximateNumEntries();
         } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error fetching property from store " + name, e);
         }
-        if (isOverflowing(value)) {
+        if (isOverflowing(numEntries)) {
             return Long.MAX_VALUE;
         }
-        return value;
+        return numEntries;
     }
 
     private boolean isOverflowing(final long value) {
@@ -395,20 +337,32 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         if (db == null) {
             return;
         }
-        // flush RocksDB
-        flushInternal();
-    }
 
-    /**
-     * @throws ProcessorStateException if flushing failed because of any internal store exceptions
-     */
-    private void flushInternal() {
         try {
-            db.flush(fOptions);
+            dbAccessor.flush();
         } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error while executing flush from store " + name, e);
         }
     }
 
+    void toggleDbForBulkLoading(final boolean prepareForBulkload) {
+        if (prepareForBulkload) {
+            // if the store is not empty, we need to compact to get around the num.levels check for bulk loading
+            final String[] sstFileNames = dbDir.list((dir, name) -> SST_FILE_EXTENSION.matcher(name).matches());
+
+            if (sstFileNames != null && sstFileNames.length > 0) {
+                dbAccessor.toggleDbForBulkLoading();
+            }
+        }
+
+        close();
+        this.prepareForBulkload = prepareForBulkload;
+        openDB(internalProcessorContext);
+    }
+
+    void write(final WriteBatch batch) throws RocksDBException {
+        db.write(wOptions, batch);
+    }
+
     @Override
     public synchronized void close() {
         if (!open) {
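For orientation: `toggleDbForBulkLoading()` is intended to be driven around changelog restoration — the batching restore callback further below writes restored records through `prepareBatchForRestore()`. The on/off wiring is not visible in this excerpt; the sketch below only illustrates the expected call pattern, using Kafka's `StateRestoreListener` signatures:

```java
// Illustrative only -- the actual wiring lives in the restore callback, which
// this excerpt does not show; rocksDBStore is an assumed field.
public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
                           final long startingOffset, final long endingOffset) {
    rocksDBStore.toggleDbForBulkLoading(true);   // close, then reopen with prepareForBulkLoad()
}

public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
                         final long totalRestored) {
    rocksDBStore.toggleDbForBulkLoading(false);  // close, then reopen with the regular options
}
```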
@@ -417,129 +371,170 @@
         open = false;
         closeOpenIterators();
-        options.close();
+        dbAccessor.close();
+        userSpecifiedOptions.close();
         wOptions.close();
         fOptions.close();
         db.close();
 
-        options = null;
+        userSpecifiedOptions = null;
         wOptions = null;
         fOptions = null;
         db = null;
     }
 
     private void closeOpenIterators() {
-        final HashSet<KeyValueIterator> iterators;
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
         synchronized (openIterators) {
             iterators = new HashSet<>(openIterators);
         }
         if (iterators.size() != 0) {
             log.warn("Closing {} open iterators for store {}", iterators.size(), name);
-            for (final KeyValueIterator iterator : iterators) {
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
                 iterator.close();
             }
         }
     }
 
-    private class RocksDbIterator
-        extends AbstractIterator<KeyValue<Bytes, byte[]>>
-        implements KeyValueIterator<Bytes, byte[]> {
-        private final String storeName;
-        private final RocksIterator iter;
-        private volatile boolean open = true;
+    interface RocksDBAccessor {
+
+        void put(final byte[] key,
+                 final byte[] value);
+
+        void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
+                          final WriteBatch batch) throws RocksDBException;
+
+        byte[] get(final byte[] key) throws RocksDBException;
+
+        /**
+         * In contrast to get(), we don't migrate the key to new CF.
+         * <p>
+ * Use for get() within delete() -- no need to migrate, as it's deleted anyway + */ + byte[] getOnly(final byte[] key) throws RocksDBException; + + KeyValueIterator range(final Bytes from, + final Bytes to); + + KeyValueIterator all(); + + long approximateNumEntries() throws RocksDBException; + + void flush() throws RocksDBException; + + void prepareBatchForRestore(final Collection> records, + final WriteBatch batch) throws RocksDBException; + + void close(); + + void toggleDbForBulkLoading(); + } - private KeyValue next; + class SingleColumnFamilyAccessor implements RocksDBAccessor { + private final ColumnFamilyHandle columnFamily; - RocksDbIterator(final String storeName, - final RocksIterator iter) { - this.iter = iter; - this.storeName = storeName; + SingleColumnFamilyAccessor(final ColumnFamilyHandle columnFamily) { + this.columnFamily = columnFamily; } @Override - public synchronized boolean hasNext() { - if (!open) { - throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName)); + public void put(final byte[] key, + final byte[] value) { + if (value == null) { + try { + db.delete(columnFamily, wOptions, key); + } catch (final RocksDBException e) { + // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. + throw new ProcessorStateException("Error while removing key from store " + name, e); + } + } else { + try { + db.put(columnFamily, wOptions, key, value); + } catch (final RocksDBException e) { + // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. + throw new ProcessorStateException("Error while putting key/value into store " + name, e); + } } - return super.hasNext(); } @Override - public synchronized KeyValue next() { - return super.next(); + public void prepareBatch(final List> entries, + final WriteBatch batch) throws RocksDBException { + for (final KeyValue entry : entries) { + Objects.requireNonNull(entry.key, "key cannot be null"); + if (entry.value == null) { + batch.delete(columnFamily, entry.key.get()); + } else { + batch.put(columnFamily, entry.key.get(), entry.value); + } + } } @Override - public KeyValue makeNext() { - if (!iter.isValid()) { - return allDone(); - } else { - next = getKeyValue(); - iter.next(); - return next; - } + public byte[] get(final byte[] key) throws RocksDBException { + return db.get(columnFamily, key); } - private KeyValue getKeyValue() { - return new KeyValue<>(new Bytes(iter.key()), iter.value()); + @Override + public byte[] getOnly(final byte[] key) throws RocksDBException { + return db.get(columnFamily, key); } @Override - public void remove() { - throw new UnsupportedOperationException("RocksDB iterator does not support remove()"); + public KeyValueIterator range(final Bytes from, + final Bytes to) { + return new RocksDBRangeIterator( + name, + db.newIterator(columnFamily), + openIterators, + from, + to); } @Override - public synchronized void close() { - openIterators.remove(this); - iter.close(); - open = false; + public KeyValueIterator all() { + final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily); + innerIterWithTimestamp.seekToFirst(); + return new RocksDbIterator(name, innerIterWithTimestamp, openIterators); } @Override - public Bytes peekNextKey() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - return next.key; + public long approximateNumEntries() throws RocksDBException { + return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys"); } - } 
-    private class RocksDBRangeIterator extends RocksDbIterator {
-        // RocksDB's JNI interface does not expose getters/setters that allow the
-        // comparator to be pluggable, and the default is lexicographic, so it's
-        // safe to just force lexicographic comparator here for now.
-        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-        private final byte[] rawToKey;
-
-        RocksDBRangeIterator(final String storeName,
-                             final RocksIterator iter,
-                             final Bytes from,
-                             final Bytes to) {
-            super(storeName, iter);
-            iter.seek(from.get());
-            rawToKey = to.get();
-            if (rawToKey == null) {
-                throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
-            }
+        @Override
+        public void flush() throws RocksDBException {
+            db.flush(fOptions, columnFamily);
         }
 
         @Override
-        public KeyValue<Bytes, byte[]> makeNext() {
-            final KeyValue<Bytes, byte[]> next = super.makeNext();
-
-            if (next == null) {
-                return allDone();
-            } else {
-                if (comparator.compare(next.key.get(), rawToKey) <= 0) {
-                    return next;
+        public void prepareBatchForRestore(final Collection<KeyValue<byte[], byte[]>> records,
+                                           final WriteBatch batch) throws RocksDBException {
+            for (final KeyValue<byte[], byte[]> record : records) {
+                if (record.value == null) {
+                    batch.delete(columnFamily, record.key);
                 } else {
-                    return allDone();
+                    batch.put(columnFamily, record.key, record.value);
                 }
             }
         }
+
+        @Override
+        public void close() {
+            columnFamily.close();
+        }
+
+        @Override
+        public void toggleDbForBulkLoading() {
+            try {
+                db.compactRange(columnFamily, true, 1, 0);
+            } catch (final RocksDBException e) {
+                throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
+            }
+        }
     }
 
     // not private for testing
@@ -553,7 +548,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
 
         @Override
         public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
-            rocksDBStore.restoreAllInternal(records);
+            try (final WriteBatch batch = new WriteBatch()) {
+                rocksDBStore.dbAccessor.prepareBatchForRestore(records, batch);
+                rocksDBStore.write(batch);
+            } catch (final RocksDBException e) {
+                throw new ProcessorStateException("Error restoring batch to store " + rocksDBStore.name, e);
+            }
         }
 
         @Override
@@ -574,6 +574,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
 
     // for testing
     public Options getOptions() {
-        return options;
+        return userSpecifiedOptions;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
new file mode 100644
index 00000000000..6d477bdfd52
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.state.internals.StoreProxyUtils.getValueWithUnknownTimestamp;
+
+/**
+ * A persistent key-(value-timestamp) store based on RocksDB.
+ */
+public class RocksDBTimestampedStore extends RocksDBStore {
+    private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStore.class);
+
+    RocksDBTimestampedStore(final String name) {
+        super(name);
+    }
+
+    @Override
+    void openRocksDB(final DBOptions dbOptions,
+                     final ColumnFamilyOptions columnFamilyOptions) {
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+
+        try {
+            db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies);
+
+            final ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0);
+
+            final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
+            noTimestampsIter.seekToFirst();
+            if (noTimestampsIter.isValid()) {
+                log.info("Opening store {} in upgrade mode", name);
+                dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, columnFamilies.get(1));
+            } else {
+                log.info("Opening store {} in regular mode", name);
+                dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(1));
+            }
+            noTimestampsIter.close();
+        } catch (final RocksDBException e) {
+            if ("Column family not found: : keyValueWithTimestamp".equals(e.getMessage())) {
+                try {
+                    db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors.subList(0, 1), columnFamilies);
+                    columnFamilies.add(db.createColumnFamily(columnFamilyDescriptors.get(1)));
+                } catch (final RocksDBException fatal) {
+                    throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), fatal);
+                }
+                log.info("Opening store {} in upgrade mode", name);
+                dbAccessor = new DualColumnFamilyAccessor(columnFamilies.get(0), columnFamilies.get(1));
+            } else {
+                throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
+            }
+        }
+    }
+
+
+
+    private class DualColumnFamilyAccessor implements RocksDBAccessor {
+        private final ColumnFamilyHandle oldColumnFamily;
+        private final ColumnFamilyHandle newColumnFamily;
+
+        private DualColumnFamilyAccessor(final ColumnFamilyHandle oldColumnFamily,
+                                         final ColumnFamilyHandle newColumnFamily) {
+            this.oldColumnFamily = oldColumnFamily;
+            this.newColumnFamily = newColumnFamily;
+        }
+
+        @Override
+        public void put(final byte[] key,
+                        final byte[] valueWithTimestamp) {
+            if (valueWithTimestamp == null) {
+                try {
+                    db.delete(oldColumnFamily, wOptions, key);
+                } catch (final RocksDBException e) {
+                    // String formatting happens in the wrapping stores, so the formatted message is thrown from there.
+                    throw new ProcessorStateException("Error while removing key from store " + name, e);
+                }
+                try {
+                    db.delete(newColumnFamily, wOptions, key);
+                } catch (final RocksDBException e) {
+                    // String formatting happens in the wrapping stores, so the formatted message is thrown from there.
+                    throw new ProcessorStateException("Error while removing key from store " + name, e);
+                }
+            } else {
+                try {
+                    db.delete(oldColumnFamily, wOptions, key);
+                } catch (final RocksDBException e) {
+                    // String formatting happens in the wrapping stores, so the formatted message is thrown from there.
+                    throw new ProcessorStateException("Error while removing key from store " + name, e);
+                }
+                try {
+                    db.put(newColumnFamily, wOptions, key, valueWithTimestamp);
+                } catch (final RocksDBException e) {
+                    // String formatting happens in the wrapping stores, so the formatted message is thrown from there.
+                    throw new ProcessorStateException("Error while putting key/value into store " + name, e);
+                }
+            }
+        }
+
+        @Override
+        public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
+                                 final WriteBatch batch) throws RocksDBException {
+            for (final KeyValue<Bytes, byte[]> entry : entries) {
+                Objects.requireNonNull(entry.key, "key cannot be null");
+                if (entry.value == null) {
+                    batch.delete(oldColumnFamily, entry.key.get());
+                    batch.delete(newColumnFamily, entry.key.get());
+                } else {
+                    batch.delete(oldColumnFamily, entry.key.get());
+                    batch.put(newColumnFamily, entry.key.get(), entry.value);
+                }
+            }
+        }
+
+        @Override
+        public byte[] get(final byte[] key) throws RocksDBException {
+            final byte[] valueWithTimestamp = db.get(newColumnFamily, key);
+            if (valueWithTimestamp != null) {
+                return valueWithTimestamp;
+            }
+
+            final byte[] plainValue = db.get(oldColumnFamily, key);
+            if (plainValue != null) {
+                final byte[] valueWithUnknownTimestamp = getValueWithUnknownTimestamp(plainValue);
+                // this shortcut only works because the changelog topic already contains correct data;
+                // for other format changes, we cannot take it and can only migrate data
+                // from the old to the new store on put()
+                put(key, valueWithUnknownTimestamp);
+                return valueWithUnknownTimestamp;
+            }
+
+            return null;
+        }
+
+        @Override
+        public byte[] getOnly(final byte[] key) throws RocksDBException {
+            final byte[] valueWithTimestamp = db.get(newColumnFamily, key);
+            if (valueWithTimestamp != null) {
+                return valueWithTimestamp;
+            }
+
+            final byte[] plainValue = db.get(oldColumnFamily, key);
+            if (plainValue != null) {
+                return getValueWithUnknownTimestamp(plainValue);
+            }
+
+            return null;
+        }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                     final Bytes to) {
+            return new RocksDBDualCFRangeIterator(
+                name,
+                db.newIterator(newColumnFamily),
+                db.newIterator(oldColumnFamily),
+                from,
+                to);
+        }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> all() {
+            final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily);
+            innerIterWithTimestamp.seekToFirst();
+            final RocksIterator innerIterNoTimestamp = db.newIterator(oldColumnFamily);
+            innerIterNoTimestamp.seekToFirst();
+            return new RocksDBDualCFIterator(name, innerIterWithTimestamp, innerIterNoTimestamp);
+        }
+
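The lazy migration in get() above relies on a fixed value layout: the old column family holds the plain value, while the new one prepends an 8-byte timestamp, with -1 (i.e. `ConsumerRecord.NO_TIMESTAMP`) as the surrogate for migrated entries whose timestamp is unknown. A minimal, self-contained sketch of that layout, mirroring the `StoreProxyUtils` helper added later in this patch (the class name here is illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ValueLayoutSketch {
    public static void main(final String[] args) {
        final byte[] plain = "v".getBytes(StandardCharsets.UTF_8);  // old-CF row: value only
        final byte[] upgraded = ByteBuffer.allocate(8 + plain.length)
            .putLong(-1L)                                           // surrogate timestamp
            .put(plain)
            .array();                                               // new-CF row: ts + value
        System.out.println(ByteBuffer.wrap(upgraded).getLong());    // prints -1 (unknown)
    }
}
```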
+        @Override
+        public long approximateNumEntries() throws RocksDBException {
+            return db.getLongProperty(oldColumnFamily, "rocksdb.estimate-num-keys")
+                + db.getLongProperty(newColumnFamily, "rocksdb.estimate-num-keys");
+        }
+
+        @Override
+        public void flush() throws RocksDBException {
+            db.flush(fOptions, oldColumnFamily);
+            db.flush(fOptions, newColumnFamily);
+        }
+
+        @Override
+        public void prepareBatchForRestore(final Collection<KeyValue<byte[], byte[]>> records,
+                                           final WriteBatch batch) throws RocksDBException {
+            for (final KeyValue<byte[], byte[]> record : records) {
+                if (record.value == null) {
+                    batch.delete(oldColumnFamily, record.key);
+                    batch.delete(newColumnFamily, record.key);
+                } else {
+                    batch.delete(oldColumnFamily, record.key);
+                    batch.put(newColumnFamily, record.key, record.value);
+                }
+            }
+        }
+
+        @Override
+        public void close() {
+            oldColumnFamily.close();
+            newColumnFamily.close();
+        }
+
+        @Override
+        public void toggleDbForBulkLoading() {
+            try {
+                db.compactRange(oldColumnFamily, true, 1, 0);
+            } catch (final RocksDBException e) {
+                throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
+            }
+            try {
+                db.compactRange(newColumnFamily, true, 1, 0);
+            } catch (final RocksDBException e) {
+                throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
+            }
+        }
+    }
+
+    private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
+        implements KeyValueIterator<Bytes, byte[]> {
+
+        // RocksDB's JNI interface does not expose getters/setters that allow the
+        // comparator to be pluggable, and the default is lexicographic, so it's
+        // safe to just force lexicographic comparator here for now.
+        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+
+        private final String storeName;
+        private final RocksIterator iterWithTimestamp;
+        private final RocksIterator iterNoTimestamp;
+
+        private volatile boolean open = true;
+
+        private byte[] nextWithTimestamp;
+        private byte[] nextNoTimestamp;
+        private KeyValue<Bytes, byte[]> next;
+
+        RocksDBDualCFIterator(final String storeName,
+                              final RocksIterator iterWithTimestamp,
+                              final RocksIterator iterNoTimestamp) {
+            this.iterWithTimestamp = iterWithTimestamp;
+            this.iterNoTimestamp = iterNoTimestamp;
+            this.storeName = storeName;
+        }
+
+        @Override
+        public synchronized boolean hasNext() {
+            if (!open) {
+                throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
+            }
+            return super.hasNext();
+        }
+
+        @Override
+        public synchronized KeyValue<Bytes, byte[]> next() {
+            return super.next();
+        }
+
+        @Override
+        public KeyValue<Bytes, byte[]> makeNext() {
+            if (nextNoTimestamp == null && iterNoTimestamp.isValid()) {
+                nextNoTimestamp = iterNoTimestamp.key();
+            }
+
+            if (nextWithTimestamp == null && iterWithTimestamp.isValid()) {
+                nextWithTimestamp = iterWithTimestamp.key();
+            }
+
+            if (nextNoTimestamp == null && !iterNoTimestamp.isValid()) {
+                if (nextWithTimestamp == null && !iterWithTimestamp.isValid()) {
+                    return allDone();
+                } else {
+                    next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
+                    nextWithTimestamp = null;
+                    iterWithTimestamp.next();
+                }
+            } else {
+                if (nextWithTimestamp == null) {
+                    next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
+                    nextNoTimestamp = null;
+                    iterNoTimestamp.next();
+                } else {
+                    if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) {
+                        next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
+                        nextNoTimestamp = null;
+                        iterNoTimestamp.next();
+                    } else {
+                        next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
+                        nextWithTimestamp = null;
+                        iterWithTimestamp.next();
+                    }
+                }
+            }
+
+            return next;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("RocksDB iterator does not support remove()");
+        }
+
+        @Override
+        public synchronized void close() {
+            openIterators.remove(this);
+            iterNoTimestamp.close();
+            iterWithTimestamp.close();
+            open = false;
+        }
+
+        @Override
+        public Bytes peekNextKey() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return next.key;
+        }
+    }
+
+    private class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
+        // RocksDB's JNI interface does not expose getters/setters that allow the
+        // comparator to be pluggable, and the default is lexicographic, so it's
+        // safe to just force lexicographic comparator here for now.
+        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+        private final byte[] upperBoundKey;
+
+        RocksDBDualCFRangeIterator(final String storeName,
+                                   final RocksIterator iterWithTimestamp,
+                                   final RocksIterator iterNoTimestamp,
+                                   final Bytes from,
+                                   final Bytes to) {
+            super(storeName, iterWithTimestamp, iterNoTimestamp);
+            iterWithTimestamp.seek(from.get());
+            iterNoTimestamp.seek(from.get());
+            upperBoundKey = to.get();
+            if (upperBoundKey == null) {
+                throw new NullPointerException("RocksDBDualCFRangeIterator: upperBoundKey is null for key " + to);
+            }
+        }
+
+        @Override
+        public KeyValue<Bytes, byte[]> makeNext() {
+            final KeyValue<Bytes, byte[]> next = super.makeNext();
+
+            if (next == null) {
+                return allDone();
+            } else {
+                if (comparator.compare(next.key.get(), upperBoundKey) <= 0) {
+                    return next;
+                } else {
+                    return allDone();
+                }
+            }
+        }
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
new file mode 100644
index 00000000000..b26e7af251c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements KeyValueIterator<Bytes, byte[]> {
+
+    private final String storeName;
+    private final RocksIterator iter;
+    private final Set<KeyValueIterator<Bytes, byte[]>> openIterators;
+
+    private volatile boolean open = true;
+
+    private KeyValue<Bytes, byte[]> next;
+
+    RocksDbIterator(final String storeName,
+                    final RocksIterator iter,
+                    final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+        this.storeName = storeName;
+        this.iter = iter;
+        this.openIterators = openIterators;
+    }
+
+    @Override
+    public synchronized boolean hasNext() {
+        if (!open) {
+            throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
+        }
+        return super.hasNext();
+    }
+
+    @Override
+    public KeyValue<Bytes, byte[]> makeNext() {
+        if (!iter.isValid()) {
+            return allDone();
+        } else {
+            next = getKeyValue();
+            iter.next();
+            return next;
+        }
+    }
+
+    private KeyValue<Bytes, byte[]> getKeyValue() {
+        return new KeyValue<>(new Bytes(iter.key()), iter.value());
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("RocksDB iterator does not support remove()");
+    }
+
+    @Override
+    public synchronized void close() {
+        openIterators.remove(this);
+        iter.close();
+        open = false;
+    }
+
+    @Override
+    public Bytes peekNextKey() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        return next.key;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
index b0ad6197230..1836c27ec5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
@@ -23,9 +23,12 @@ import org.apache.kafka.streams.state.KeyValueStore;
 public class RocksDbKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupplier {
 
     private final String name;
+    private final boolean returnTimestampedStore;
 
-    public RocksDbKeyValueBytesStoreSupplier(final String name) {
+    public RocksDbKeyValueBytesStoreSupplier(final String name,
+                                             final boolean returnTimestampedStore) {
         this.name = name;
+        this.returnTimestampedStore = returnTimestampedStore;
     }
 
     @Override
@@ -35,7 +38,7 @@ public class RocksDbKeyValueBytesStoreSupp
 
     @Override
     public KeyValueStore<Bytes, byte[]> get() {
-        return new RocksDBStore(name);
+        return returnTimestampedStore ? new RocksDBTimestampedStore(name) : new RocksDBStore(name);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java
new file mode 100644
index 00000000000..e78b3826949
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; + +import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP; + +class StoreProxyUtils { + + static byte[] getValueWithUnknownTimestamp(final byte[] rawValue) { + if (rawValue == null) { + return null; + } + return ByteBuffer + .allocate(8 + rawValue.length) + .putLong(NO_TIMESTAMP) + .put(rawValue) + .array(); + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java new file mode 100644 index 00000000000..897f94f5e4e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.rocksdb.AccessHint; +import org.rocksdb.BuiltinComparator; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionPriority; +import org.rocksdb.CompactionStyle; +import org.rocksdb.ComparatorOptions; +import org.rocksdb.CompressionType; +import org.rocksdb.DBOptions; +import org.rocksdb.Env; +import org.rocksdb.InfoLogLevel; +import org.rocksdb.LRUCache; +import org.rocksdb.Logger; +import org.rocksdb.Options; +import org.rocksdb.PlainTableConfig; +import org.rocksdb.RateLimiter; +import org.rocksdb.RocksDB; +import org.rocksdb.SstFileManager; +import org.rocksdb.StringAppendOperator; +import org.rocksdb.VectorMemTableConfig; +import org.rocksdb.WALRecoveryMode; +import org.rocksdb.util.BytewiseComparator; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.verify; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.matchesPattern; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * The purpose of this test is, to catch interface changes if we upgrade {@link RocksDB}. + * Using reflections, we make sure the {@link RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter} maps all + * methods from {@link DBOptions} and {@link ColumnFamilyOptions} to/from {@link Options} correctly. + */ +@RunWith(EasyMockRunner.class) +public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest { + private final List ignoreMethods = new LinkedList() { + { + add("isOwningHandle"); + add("dispose"); + add("wait"); + add("equals"); + add("getClass"); + add("hashCode"); + add("notify"); + add("notifyAll"); + add("toString"); + } + }; + + @Mock + private DBOptions dbOptions; + @Mock + private ColumnFamilyOptions columnFamilyOptions; + + @Test + public void shouldOverwriteAllOptionsMethods() throws Exception { + for (final Method method : Options.class.getMethods()) { + if (!ignoreMethods.contains(method.getName())) { + RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class + .getDeclaredMethod(method.getName(), method.getParameterTypes()); + } + } + } + + @Test + public void shouldForwardAllDbOptionsCalls() throws Exception { + for (final Method method : Options.class.getMethods()) { + if (!ignoreMethods.contains(method.getName())) { + try { + DBOptions.class.getMethod(method.getName(), method.getParameterTypes()); + verifyDBOptionsMethodCall(method); + } catch (final NoSuchMethodException expectedAndSwallow) { } + } + } + } + + private void verifyDBOptionsMethodCall(final Method method) throws Exception { + final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter optionsFacadeDbOptions + = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, new ColumnFamilyOptions()); + + final Object[] parameters = getDBOptionsParameters(method.getParameterTypes()); + + try { + reset(dbOptions); + replay(dbOptions); + method.invoke(optionsFacadeDbOptions, parameters); + verify(); + fail("Should have called DBOptions." 
+ method.getName() + "()"); + } catch (final InvocationTargetException undeclaredMockMethodCall) { + assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class)); + assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(), + matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):")); + } + } + + private Object[] getDBOptionsParameters(final Class[] parameterTypes) throws Exception { + final Object[] parameters = new Object[parameterTypes.length]; + + for (int i = 0; i < parameterTypes.length; ++i) { + switch (parameterTypes[i].getName()) { + case "boolean": + parameters[i] = true; + break; + case "int": + parameters[i] = 0; + break; + case "long": + parameters[i] = 0L; + break; + case "java.util.Collection": + parameters[i] = new ArrayList<>(); + break; + case "org.rocksdb.AccessHint": + parameters[i] = AccessHint.NONE; + break; + case "org.rocksdb.Cache": + parameters[i] = new LRUCache(1L); + break; + case "org.rocksdb.Env": + parameters[i] = Env.getDefault(); + break; + case "org.rocksdb.InfoLogLevel": + parameters[i] = InfoLogLevel.FATAL_LEVEL; + break; + case "org.rocksdb.Logger": + parameters[i] = new Logger(new Options()) { + @Override + protected void log(final InfoLogLevel infoLogLevel, final String logMsg) {} + }; + break; + case "org.rocksdb.RateLimiter": + parameters[i] = new RateLimiter(1L); + break; + case "org.rocksdb.SstFileManager": + parameters[i] = new SstFileManager(Env.getDefault()); + break; + case "org.rocksdb.WALRecoveryMode": + parameters[i] = WALRecoveryMode.AbsoluteConsistency; + break; + default: + parameters[i] = parameterTypes[i].newInstance(); + } + } + + return parameters; + } + + @Test + public void shouldForwardAllColumnFamilyCalls() throws Exception { + for (final Method method : Options.class.getMethods()) { + if (!ignoreMethods.contains(method.getName())) { + try { + ColumnFamilyOptions.class.getMethod(method.getName(), method.getParameterTypes()); + verifyColumnFamilyOptionsMethodCall(method); + } catch (final NoSuchMethodException expectedAndSwallow) { } + } + } + } + + private void verifyColumnFamilyOptionsMethodCall(final Method method) throws Exception { + final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter optionsFacadeColumnFamilyOptions + = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), columnFamilyOptions); + + final Object[] parameters = getColumnFamilyOptionsParameters(method.getParameterTypes()); + + try { + reset(columnFamilyOptions); + replay(columnFamilyOptions); + method.invoke(optionsFacadeColumnFamilyOptions, parameters); + verify(); + fail("Should have called ColumnFamilyOptions." + method.getName() + "()"); + } catch (final InvocationTargetException undeclaredMockMethodCall) { + assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class)); + assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(), + matchesPattern("Unexpected method call ColumnFamilyOptions\\." 
+ method.getName() + "(.*)")); + } + } + + private Object[] getColumnFamilyOptionsParameters(final Class[] parameterTypes) throws Exception { + final Object[] parameters = new Object[parameterTypes.length]; + + for (int i = 0; i < parameterTypes.length; ++i) { + switch (parameterTypes[i].getName()) { + case "boolean": + parameters[i] = true; + break; + case "double": + parameters[i] = 0.0d; + break; + case "int": + parameters[i] = 0; + break; + case "long": + parameters[i] = 0L; + break; + case "[I": + parameters[i] = new int[0]; + break; + case "java.util.List": + parameters[i] = new ArrayList<>(); + break; + case "org.rocksdb.AbstractComparator": + parameters[i] = new BytewiseComparator(new ComparatorOptions()); + break; + case "org.rocksdb.BuiltinComparator": + parameters[i] = BuiltinComparator.BYTEWISE_COMPARATOR; + break; + case "org.rocksdb.CompactionPriority": + parameters[i] = CompactionPriority.ByCompensatedSize; + break; + case "org.rocksdb.CompactionStyle": + parameters[i] = CompactionStyle.UNIVERSAL; + break; + case "org.rocksdb.CompressionType": + parameters[i] = CompressionType.NO_COMPRESSION; + break; + case "org.rocksdb.MemTableConfig": + parameters[i] = new VectorMemTableConfig(); + break; + case "org.rocksdb.MergeOperator": + parameters[i] = new StringAppendOperator(); + break; + case "org.rocksdb.TableFormatConfig": + parameters[i] = new PlainTableConfig(); + break; + default: + parameters[i] = parameterTypes[i].newInstance(); + } + } + + return parameters; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index f13ac0a2aff..802df8d6c00 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -280,7 +280,7 @@ public class RocksDBSessionStoreTest { return results; } - static List valuesToList(final KeyValueIterator, V> iterator) { + private static List valuesToList(final KeyValueIterator, V> iterator) { final List results = new ArrayList<>(); while (iterator.hasNext()) { results.add(iterator.next().value); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index b98f72f58e8..4785673a150 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -58,18 +58,21 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class RocksDBStoreTest { + private static boolean enableBloomFilters = false; + final static String DB_NAME = "db-name"; + + private File dir; private final Serializer stringSerializer = new StringSerializer(); private final Deserializer stringDeserializer = new StringDeserializer(); - private RocksDBStore rocksDBStore; - private InternalMockProcessorContext context; - private File dir; - private static boolean enableBloomFilters = false; + + InternalMockProcessorContext context; + RocksDBStore rocksDBStore; @Before public void setUp() { final Properties props = StreamsTestUtils.getStreamsConfig(); props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); - rocksDBStore = new RocksDBStore("test"); + rocksDBStore = getRocksDBStore(); 
dir = TestUtils.tempDirectory(); context = new InternalMockProcessorContext(dir, Serdes.String(), @@ -77,6 +80,10 @@ public class RocksDBStoreTest { new StreamsConfig(props)); } + RocksDBStore getRocksDBStore() { + return new RocksDBStore(DB_NAME); + } + @After public void tearDown() { rocksDBStore.close(); @@ -98,7 +105,8 @@ public class RocksDBStoreTest { assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(10)); assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), equalTo(20)); - assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(36)); } + assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(36)); + } @Test public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() { @@ -117,7 +125,7 @@ public class RocksDBStoreTest { final byte[] restoredValue = "restoredValue".getBytes(UTF_8); restoreBytes.add(KeyValue.pair(restoredKey, restoredValue)); - context.restore("test", restoreBytes); + context.restore(DB_NAME, restoreBytes); assertThat( stringDeserializer.deserialize( @@ -168,20 +176,20 @@ public class RocksDBStoreTest { rocksDBStore.flush(); assertEquals( + "a", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))), - "a"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1"))))); assertEquals( + "b", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))), - "b"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2"))))); assertEquals( + "c", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))), - "c"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3"))))); } @Test @@ -222,20 +230,20 @@ public class RocksDBStoreTest { context.restore(rocksDBStore.name(), entries); assertEquals( + "a", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))), - "a"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1"))))); assertEquals( + "b", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))), - "b"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2"))))); assertEquals( + "c", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))), - "c"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3"))))); } @Test @@ -249,7 +257,7 @@ public class RocksDBStoreTest { rocksDBStore.putIfAbsent(keyBytes, valueBytesUpdate); final String retrievedValue = stringDeserializer.deserialize(null, rocksDBStore.get(keyBytes)); - assertEquals(retrievedValue, "A"); + assertEquals("A", retrievedValue); } @Test @@ -294,20 +302,20 @@ public class RocksDBStoreTest { assertThat(keys, equalTo(Utils.mkSet("1", "2", "3"))); assertEquals( + "restored", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))), - "restored"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1"))))); assertEquals( + "b", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))), - "b"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2"))))); assertEquals( + "c", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))), - "c"); + rocksDBStore.get(new 
Bytes(stringSerializer.serialize(null, "3"))))); } @Test @@ -319,20 +327,20 @@ public class RocksDBStoreTest { context.restore(rocksDBStore.name(), entries); assertEquals( + "a", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))), - "a"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1"))))); assertEquals( + "b", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))), - "b"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2"))))); assertEquals( + "c", stringDeserializer.deserialize( null, - rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))), - "c"); + rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3"))))); entries.clear(); @@ -352,8 +360,6 @@ public class RocksDBStoreTest { assertThat(keys, equalTo(Utils.mkSet("2", "3"))); } - - @Test public void shouldThrowNullPointerExceptionOnNullPut() { rocksDBStore.init(context, rocksDBStore); @@ -424,13 +430,14 @@ public class RocksDBStoreTest { final Properties props = StreamsTestUtils.getStreamsConfig(); props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestingBloomFilterRocksDBConfigSetter.class); - rocksDBStore = new RocksDBStore("test"); + rocksDBStore = getRocksDBStore(); dir = TestUtils.tempDirectory(); context = new InternalMockProcessorContext(dir, Serdes.String(), Serdes.String(), new StreamsConfig(props)); + enableBloomFilters = false; rocksDBStore.init(context, rocksDBStore); final List expectedValues = new ArrayList<>(); @@ -451,11 +458,11 @@ public class RocksDBStoreTest { assertFalse(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet); rocksDBStore.close(); - enableBloomFilters = true; expectedIndex = 0; // reopen with Bloom Filters enabled // should open fine without errors + enableBloomFilters = true; rocksDBStore.init(context, rocksDBStore); for (final KeyValue keyValue : keyValues) { @@ -479,7 +486,7 @@ public class RocksDBStoreTest { public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter { - static boolean bloomFiltersSet = false; + static boolean bloomFiltersSet; @Override public void setConfig(final String storeName, final Options options, final Map configs) { @@ -493,10 +500,10 @@ public class RocksDBStoreTest { bloomFiltersSet = true; } else { options.setOptimizeFiltersForHits(false); + bloomFiltersSet = false; } options.setTableFormatConfig(tableConfig); - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java new file mode 100644 index 00000000000..3347d02795f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.hamcrest.core.IsNull; +import org.junit.Test; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Arrays.asList; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { + + RocksDBStore getRocksDBStore() { + return new RocksDBTimestampedStore(DB_NAME); + } + + @Test + public void shouldMigrateDataFromDefaultToTimestampColumnFamily() throws Exception { + prepareOldStore(); + + LogCaptureAppender.setClassLoggerToDebug(RocksDBTimestampedStore.class); + + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + rocksDBStore.init(context, rocksDBStore); + assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode")); + LogCaptureAppender.unregister(appender); + + // approx: 7 entries on old CF, 0 in new CF + assertThat(rocksDBStore.approximateNumEntries(), is(7L)); + + // get() + + // should be no-op on both CF + assertThat(rocksDBStore.get(new Bytes("unknown".getBytes())), new IsNull<>()); + // approx: 7 entries on old CF, 0 in new CF + assertThat(rocksDBStore.approximateNumEntries(), is(7L)); + + // should migrate key1 from old to new CF + // must return timestamp plus value, ie, it's not 1 byte but 9 bytes + assertThat(rocksDBStore.get(new Bytes("key1".getBytes())).length, is(8 + 1)); + // one delete on old CF, one put on new CF + // approx: 6 entries on old CF, 1 in new CF + assertThat(rocksDBStore.approximateNumEntries(), is(7L)); + + // put() + + // should migrate key2 from old to new CF with new value + rocksDBStore.put(new Bytes("key2".getBytes()), "timestamp+22".getBytes()); + // one delete on old CF, one put on new CF + // approx: 5 entries on old CF, 2 in new CF + assertThat(rocksDBStore.approximateNumEntries(), is(7L)); + + // should delete key3 from old and new CF + rocksDBStore.put(new Bytes("key3".getBytes()), null); + // count is off by one, due to two delete operations (even if one does not delete anything) + // approx: 4 entries on old CF, 1 in new CF + assertThat(rocksDBStore.approximateNumEntries(), is(5L)); + + // should add new key8 to new CF + rocksDBStore.put(new Bytes("key8".getBytes()), "timestamp+88888888".getBytes()); + // one delete on old CF, one put on new CF + // approx: 3 entries on old CF, 2 in new CF + assertThat(rocksDBStore.approximateNumEntries(), is(5L)); + + // putIfAbsent() + + 
+        // should migrate key4 from old to new CF with old value
+        assertThat(rocksDBStore.putIfAbsent(new Bytes("key4".getBytes()), "timestamp+4444".getBytes()).length, is(8 + 4));
+        // one delete on old CF, one put on new CF
+        // approx: 2 entries on old CF, 3 in new CF
+        assertThat(rocksDBStore.approximateNumEntries(), is(5L));
+
+        // should add new key11 to new CF
+        assertThat(rocksDBStore.putIfAbsent(new Bytes("key11".getBytes()), "timestamp+11111111111".getBytes()), new IsNull<>());
+        // one delete on old CF, one put on new CF
+        // approx: 1 entry on old CF, 4 in new CF
+        assertThat(rocksDBStore.approximateNumEntries(), is(5L));
+
+        // should not delete key5 but migrate to new CF
+        assertThat(rocksDBStore.putIfAbsent(new Bytes("key5".getBytes()), null).length, is(8 + 5));
+        // one delete on old CF, one put on new CF
+        // approx: 0 entries on old CF, 5 in new CF
+        assertThat(rocksDBStore.approximateNumEntries(), is(5L));
+
+        // should be no-op on both CF
+        assertThat(rocksDBStore.putIfAbsent(new Bytes("key12".getBytes()), null), new IsNull<>());
+        // two delete operations, however, only one is counted because old CF count was zero before already
+        // approx: 0 entries on old CF, 4 in new CF
+        assertThat(rocksDBStore.approximateNumEntries(), is(4L));
+
+        // delete()
+
+        // should delete key6 from old and new CF
+        assertThat(rocksDBStore.delete(new Bytes("key6".getBytes())).length, is(8 + 6));
+        // two delete operations, however, only one is counted because old CF count was zero before already
+        // approx: 0 entries on old CF, 3 in new CF
+        assertThat(rocksDBStore.approximateNumEntries(), is(3L));
+
+
+        iteratorsShouldNotMigrateData();
+        assertThat(rocksDBStore.approximateNumEntries(), is(3L));
+
+        rocksDBStore.close();
+
+        verifyOldAndNewColumnFamily();
+    }
+
+    private void iteratorsShouldNotMigrateData() {
+        // iterating should not migrate any data, but returns all keys over both CFs (plus surrogate timestamps for old CF)
+        final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all();
+        {
+            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+            assertArrayEquals("key1".getBytes(), keyValue.key.get());
+            // unknown timestamp == -1 plus value == 1
+            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '1'}, keyValue.value);
+        }
+        {
+            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+            assertArrayEquals("key11".getBytes(), keyValue.key.get());
+            assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, keyValue.value);
+        }
+        {
+            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+            assertArrayEquals("key2".getBytes(), keyValue.key.get());
+            assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value);
+        }
+        {
+            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+            assertArrayEquals("key4".getBytes(), keyValue.key.get());
+            // unknown timestamp == -1 plus value == 4444
+            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', '4', '4', '4'}, keyValue.value);
+        }
+        {
+            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+            assertArrayEquals("key5".getBytes(), keyValue.key.get());
+            // unknown timestamp == -1 plus value == 55555
+            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', '5', '5', '5', '5'}, keyValue.value);
+        }
+        {
+            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+            assertArrayEquals("key7".getBytes(), keyValue.key.get());
+            // unknown timestamp == -1 plus value == 7777777
+            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '7', '7', '7', '7', '7', '7', '7'}, keyValue.value);
+        }
+        {
+            final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+            assertArrayEquals("key8".getBytes(), keyValue.key.get());
+            assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value);
+        }
+        assertFalse(itAll.hasNext());
+        itAll.close();
+
+        final KeyValueIterator<Bytes, byte[]> it =
+            rocksDBStore.range(new Bytes("key2".getBytes()), new Bytes("key5".getBytes()));
+        {
+            final KeyValue<Bytes, byte[]> keyValue = it.next();
+            assertArrayEquals("key2".getBytes(), keyValue.key.get());
+            assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value);
+        }
+        {
+            final KeyValue<Bytes, byte[]> keyValue = it.next();
+            assertArrayEquals("key4".getBytes(), keyValue.key.get());
+            // unknown timestamp == -1 plus value == 4444
+            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', '4', '4', '4'}, keyValue.value);
+        }
+        {
+            final KeyValue<Bytes, byte[]> keyValue = it.next();
+            assertArrayEquals("key5".getBytes(), keyValue.key.get());
+            // unknown timestamp == -1 plus value == 55555
+            assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', '5', '5', '5', '5'}, keyValue.value);
+        }
+        assertFalse(it.hasNext());
+        it.close();
+    }
+
+    private void verifyOldAndNewColumnFamily() throws Exception {
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+
+        RocksDB db = RocksDB.open(
+            dbOptions,
+            new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
+            columnFamilyDescriptors,
+            columnFamilies);
+
+        ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0);
+        final ColumnFamilyHandle withTimestampColumnFamily = columnFamilies.get(1);
+
+        assertThat(db.get(noTimestampColumnFamily, "unknown".getBytes()), new IsNull<>());
+        assertThat(db.get(noTimestampColumnFamily, "key1".getBytes()), new IsNull<>());
+        assertThat(db.get(noTimestampColumnFamily, "key2".getBytes()), new IsNull<>());
+        assertThat(db.get(noTimestampColumnFamily, "key3".getBytes()), new IsNull<>());
+        assertThat(db.get(noTimestampColumnFamily, "key4".getBytes()), new IsNull<>());
+        assertThat(db.get(noTimestampColumnFamily, "key5".getBytes()), new IsNull<>());
+        assertThat(db.get(noTimestampColumnFamily, "key6".getBytes()), new IsNull<>());
+        assertThat(db.get(noTimestampColumnFamily, "key7".getBytes()).length, is(7));
+        assertThat(db.get(noTimestampColumnFamily, "key8".getBytes()), new IsNull<>());
+        assertThat(db.get(noTimestampColumnFamily, "key11".getBytes()), new IsNull<>());
+        assertThat(db.get(noTimestampColumnFamily, "key12".getBytes()), new IsNull<>());
+
+        assertThat(db.get(withTimestampColumnFamily, "unknown".getBytes()), new IsNull<>());
+        assertThat(db.get(withTimestampColumnFamily, "key1".getBytes()).length, is(8 + 1));
+        assertThat(db.get(withTimestampColumnFamily, "key2".getBytes()).length, is(12));
+        assertThat(db.get(withTimestampColumnFamily, "key3".getBytes()), new IsNull<>());
+        assertThat(db.get(withTimestampColumnFamily, "key4".getBytes()).length, is(8 + 4));
+        assertThat(db.get(withTimestampColumnFamily, "key5".getBytes()).length, is(8 + 5));
+        assertThat(db.get(withTimestampColumnFamily, "key6".getBytes()), new IsNull<>());
+        assertThat(db.get(withTimestampColumnFamily,
"key7".getBytes()), new IsNull<>()); + assertThat(db.get(withTimestampColumnFamily, "key8".getBytes()).length, is(18)); + assertThat(db.get(withTimestampColumnFamily, "key11".getBytes()).length, is(21)); + assertThat(db.get(withTimestampColumnFamily, "key12".getBytes()), new IsNull<>()); + + db.close(); + + // check that still in upgrade mode + LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + rocksDBStore.init(context, rocksDBStore); + assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode")); + LogCaptureAppender.unregister(appender); + rocksDBStore.close(); + + // clear old CF + columnFamilies.clear(); + db = RocksDB.open( + dbOptions, + new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(), + columnFamilyDescriptors, + columnFamilies); + + noTimestampColumnFamily = columnFamilies.get(0); + db.delete(noTimestampColumnFamily, "key7".getBytes()); + db.close(); + + // check that still in regular mode + appender = LogCaptureAppender.createAndRegister(); + rocksDBStore.init(context, rocksDBStore); + assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode")); + LogCaptureAppender.unregister(appender); + } + + private void prepareOldStore() { + final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME); + keyValueStore.init(context, keyValueStore); + + keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes()); + keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes()); + keyValueStore.put(new Bytes("key3".getBytes()), "333".getBytes()); + keyValueStore.put(new Bytes("key4".getBytes()), "4444".getBytes()); + keyValueStore.put(new Bytes("key5".getBytes()), "55555".getBytes()); + keyValueStore.put(new Bytes("key6".getBytes()), "666666".getBytes()); + keyValueStore.put(new Bytes("key7".getBytes()), "7777777".getBytes()); + + keyValueStore.close(); + } + +}