Browse Source

KAFKA-8897: Warn about no guaranteed backwards compatibility in RocksDBConfigSetter (#7483)

Reviewer: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
pull/7511/head
Bruno Cadonna 5 years ago committed by Matthias J. Sax
parent
commit
3e24495c69
  1. 9
      streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
  2. 15
      streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
  3. 30
      streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
  4. 35
      streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java

9
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@ -60,6 +60,7 @@ import org.apache.kafka.streams.state.QueryableStoreType; @@ -60,6 +60,7 @@ import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
@ -774,6 +775,7 @@ public class KafkaStreams implements AutoCloseable { @@ -774,6 +775,7 @@ public class KafkaStreams implements AutoCloseable {
return thread;
});
maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
}
@ -789,6 +791,13 @@ public class KafkaStreams implements AutoCloseable { @@ -789,6 +791,13 @@ public class KafkaStreams implements AutoCloseable {
return null;
}
private static void maybeWarnAboutCodeInRocksDBConfigSetter(final Logger log,
final StreamsConfig config) {
if (config.getClass(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG) != null) {
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.logWarning(log);
}
}
private static HostInfo parseHostInfo(final String endPoint) {
if (endPoint == null || endPoint.trim().isEmpty()) {
return StreamsMetadataState.UNKNOWN_HOST;

15
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java

@ -43,11 +43,11 @@ import org.rocksdb.SstFileManager; @@ -43,11 +43,11 @@ import org.rocksdb.SstFileManager;
import org.rocksdb.Statistics;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WALRecoveryMode;
import org.rocksdb.WriteBufferManager;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
import org.rocksdb.WriteBufferManager;
import org.slf4j.LoggerFactory;
/**
* The generic {@link Options} class allows users to set all configs on one object if only default column family
@ -56,7 +56,7 @@ import org.slf4j.LoggerFactory; @@ -56,7 +56,7 @@ import org.slf4j.LoggerFactory;
*
* This class do the translation between generic {@link Options} into {@link DBOptions} and {@link ColumnFamilyOptions}.
*/
class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options {
public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options {
private final DBOptions dbOptions;
private final ColumnFamilyOptions columnFamilyOptions;
@ -1341,15 +1341,24 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options @@ -1341,15 +1341,24 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options
@Override
public Options setCompactionOptionsFIFO(final CompactionOptionsFIFO compactionOptionsFIFO) {
logWarning(LOG);
columnFamilyOptions.setCompactionOptionsFIFO(compactionOptionsFIFO);
return this;
}
@Override
public CompactionOptionsFIFO compactionOptionsFIFO() {
logWarning(LOG);
return columnFamilyOptions.compactionOptionsFIFO();
}
public static void logWarning(final org.slf4j.Logger log) {
log.warn("RocksDB's version will be bumped to version 6+ via KAFKA-8897 in a future release. "
+ "If you use `org.rocksdb.CompactionOptionsFIFO#setTtl(long)` or `#ttl()` you will need to rewrite "
+ "your code after KAFKA-8897 is resolved and set TTL via `org.rocksdb.Options` "
+ "(or `org.rocksdb.ColumnFamilyOptions`).");
}
@Override
public Options setForceConsistencyChecks(final boolean forceConsistencyChecks) {
columnFamilyOptions.setForceConsistencyChecks(forceConsistencyChecks);

30
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

@ -39,7 +39,9 @@ import org.apache.kafka.streams.processor.internals.StateDirectory; @@ -39,7 +39,9 @@ import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
@ -79,6 +81,8 @@ import static org.easymock.EasyMock.anyInt; @@ -79,6 +81,8 @@ import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@ -91,6 +95,7 @@ public class KafkaStreamsTest { @@ -91,6 +95,7 @@ public class KafkaStreamsTest {
private static final int NUM_THREADS = 2;
private final static String APPLICATION_ID = "appId";
private final static String CLIENT_ID = "test-client";
@Rule
public TestName testName = new TestName();
@ -143,7 +148,7 @@ public class KafkaStreamsTest { @@ -143,7 +148,7 @@ public class KafkaStreamsTest {
props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
props.put(StreamsConfig.CLIENT_ID_CONFIG, "clientId");
props.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2018");
props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@ -745,6 +750,29 @@ public class KafkaStreamsTest { @@ -745,6 +750,29 @@ public class KafkaStreamsTest {
PowerMock.verify(Executors.class, rocksDBMetricsRecordingTriggerThread);
}
public static class TestRocksDbConfigSetter implements RocksDBConfigSetter {
@Override
public void setConfig(final String storeName,
final org.rocksdb.Options options,
final Map<String, Object> configs) {
}
}
@Test
public void shouldWarnAboutRocksDBConfigSetterIsNotGuaranteedToBeBackwardsCompatible() {
props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestRocksDbConfigSetter.class.getName());
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
LogCaptureAppender.unregister(appender);
assertThat(appender.getMessages(), hasItem("stream-client [" + CLIENT_ID + "] "
+ "RocksDB's version will be bumped to version 6+ via KAFKA-8897 in a future release. "
+ "If you use `org.rocksdb.CompactionOptionsFIFO#setTtl(long)` or `#ttl()` you will need to rewrite "
+ "your code after KAFKA-8897 is resolved and set TTL via `org.rocksdb.Options` "
+ "(or `org.rocksdb.ColumnFamilyOptions`)."));
}
@Test
public void shouldCleanupOldStateDirs() throws Exception {
PowerMock.mockStatic(Executors.class);

35
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Test;
@ -26,6 +27,7 @@ import org.rocksdb.AbstractCompactionFilterFactory; @@ -26,6 +27,7 @@ import org.rocksdb.AbstractCompactionFilterFactory;
import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionPriority;
import org.rocksdb.CompactionStyle;
import org.rocksdb.ComparatorOptions;
@ -56,6 +58,7 @@ import java.util.List; @@ -56,6 +58,7 @@ import java.util.List;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.matchesPattern;
@ -109,6 +112,38 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest { @@ -109,6 +112,38 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
}
}
@Test
public void shouldWarnThanMethodCompactionOptionsFIFOSetTtlWillBeRemoved() {
final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter optionsFacadeDbOptions
= new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, new ColumnFamilyOptions());
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
optionsFacadeDbOptions.setCompactionOptionsFIFO(new CompactionOptionsFIFO());
LogCaptureAppender.unregister(appender);
assertThat(appender.getMessages(), hasItem(""
+ "RocksDB's version will be bumped to version 6+ via KAFKA-8897 in a future release. "
+ "If you use `org.rocksdb.CompactionOptionsFIFO#setTtl(long)` or `#ttl()` you will need to rewrite "
+ "your code after KAFKA-8897 is resolved and set TTL via `org.rocksdb.Options` "
+ "(or `org.rocksdb.ColumnFamilyOptions`)."));
}
@Test
public void shouldWarnThanMethodCompactionOptionsFIFOTtlWillBeRemoved() {
final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter optionsFacadeDbOptions
= new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, new ColumnFamilyOptions());
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
optionsFacadeDbOptions.compactionOptionsFIFO();
LogCaptureAppender.unregister(appender);
assertThat(appender.getMessages(), hasItem(""
+ "RocksDB's version will be bumped to version 6+ via KAFKA-8897 in a future release. "
+ "If you use `org.rocksdb.CompactionOptionsFIFO#setTtl(long)` or `#ttl()` you will need to rewrite "
+ "your code after KAFKA-8897 is resolved and set TTL via `org.rocksdb.Options` "
+ "(or `org.rocksdb.ColumnFamilyOptions`)."));
}
private void verifyDBOptionsMethodCall(final Method method) throws Exception {
final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter optionsFacadeDbOptions
= new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, new ColumnFamilyOptions());

Loading…
Cancel
Save