Browse Source

KAFKA-6614: configure internal topics with message.timestamp.type=CreateTime by default (#7889)

Reviewers: Matthias J. Sax <matthias@confluent.io>
pull/7907/head
A. Sophie Blee-Goldman 5 years ago committed by Matthias J. Sax
parent
commit
1513c817d4
  1. 7
      streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
  2. 3
      streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
  3. 3
      streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java
  4. 3
      streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java
  5. 4
      streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
  6. 60
      streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
  7. 8
      streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java

7
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java

@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.HashMap;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.internals.Topic;
import java.util.Map;
@ -32,6 +34,11 @@ public abstract class InternalTopicConfig { @@ -32,6 +34,11 @@ public abstract class InternalTopicConfig {
private Optional<Integer> numberOfPartitions = Optional.empty();
static final Map<String, String> INTERNAL_TOPIC_DEFAULT_OVERRIDES = new HashMap<>();
static {
INTERNAL_TOPIC_DEFAULT_OVERRIDES.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
}
InternalTopicConfig(final String name, final Map<String, String> topicConfigs) {
Objects.requireNonNull(name, "name can't be null");
Topic.validate(name);

3
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java

@ -31,7 +31,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig { @@ -31,7 +31,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
private static final Map<String, String> REPARTITION_TOPIC_DEFAULT_OVERRIDES;
static {
final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
final Map<String, String> tempTopicDefaultOverrides = new HashMap<>(INTERNAL_TOPIC_DEFAULT_OVERRIDES);
tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"); // 50 MB
tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(-1)); // Infinity
@ -49,6 +49,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig { @@ -49,6 +49,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
* @param additionalRetentionMs - added to retention to allow for clock drift etc
* @return Properties to be used when creating the topic
*/
@Override
public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
// internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
final Map<String, String> topicConfig = new HashMap<>(REPARTITION_TOPIC_DEFAULT_OVERRIDES);

3
streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java

@ -30,7 +30,7 @@ import java.util.Objects; @@ -30,7 +30,7 @@ import java.util.Objects;
public class UnwindowedChangelogTopicConfig extends InternalTopicConfig {
private static final Map<String, String> UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES;
static {
final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
final Map<String, String> tempTopicDefaultOverrides = new HashMap<>(INTERNAL_TOPIC_DEFAULT_OVERRIDES);
tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
}
@ -46,6 +46,7 @@ public class UnwindowedChangelogTopicConfig extends InternalTopicConfig { @@ -46,6 +46,7 @@ public class UnwindowedChangelogTopicConfig extends InternalTopicConfig {
* @param additionalRetentionMs - added to retention to allow for clock drift etc
* @return Properties to be used when creating the topic
*/
@Override
public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
// internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
final Map<String, String> topicConfig = new HashMap<>(UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES);

3
streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java

@ -30,7 +30,7 @@ import java.util.Objects; @@ -30,7 +30,7 @@ import java.util.Objects;
public class WindowedChangelogTopicConfig extends InternalTopicConfig {
private static final Map<String, String> WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES;
static {
final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
final Map<String, String> tempTopicDefaultOverrides = new HashMap<>(INTERNAL_TOPIC_DEFAULT_OVERRIDES);
tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE);
WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
}
@ -48,6 +48,7 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig { @@ -48,6 +48,7 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig {
* @param additionalRetentionMs - added to retention to allow for clock drift etc
* @return Properties to be used when creating the topic
*/
@Override
public Map<String, String> getProperties(final Map<String, String> defaultProperties, final long additionalRetentionMs) {
// internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
final Map<String, String> topicConfig = new HashMap<>(WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES);

4
streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java

@ -170,7 +170,7 @@ public class InternalTopicIntegrationTest { @@ -170,7 +170,7 @@ public class InternalTopicIntegrationTest {
final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition");
assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
assertEquals(3, repartitionProps.size());
assertEquals(4, repartitionProps.size());
}
@Test
@ -215,6 +215,6 @@ public class InternalTopicIntegrationTest { @@ -215,6 +215,6 @@ public class InternalTopicIntegrationTest {
final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition");
assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
assertEquals(3, repartitionProps.size());
assertEquals(4, repartitionProps.size());
}
}

60
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java

@ -30,39 +30,81 @@ public class InternalTopicConfigTest { @@ -30,39 +30,81 @@ public class InternalTopicConfigTest {
@Test(expected = NullPointerException.class)
public void shouldThrowIfNameIsNull() {
new RepartitionTopicConfig(null, Collections.<String, String>emptyMap());
new RepartitionTopicConfig(null, Collections.emptyMap());
}
@Test(expected = InvalidTopicException.class)
public void shouldThrowIfNameIsInvalid() {
new RepartitionTopicConfig("foo bar baz", Collections.<String, String>emptyMap());
new RepartitionTopicConfig("foo bar baz", Collections.emptyMap());
}
@Test
public void shouldSetCreateTimeByDefaultForWindowedChangelog() {
final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.emptyMap());
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
}
@Test
public void shouldSetCreateTimeByDefaultForUnwindowedChangelog() {
final UnwindowedChangelogTopicConfig topicConfig = new UnwindowedChangelogTopicConfig("name", Collections.emptyMap());
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
}
@Test
public void shouldSetCreateTimeByDefaultForRepartitionTopic() {
final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", Collections.emptyMap());
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
}
@Test
public void shouldAugmentRetentionMsWithWindowedChangelog() {
final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.<String, String>emptyMap());
final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.emptyMap());
topicConfig.setRetentionMs(10);
assertEquals("30", topicConfig.getProperties(Collections.<String, String>emptyMap(), 20).get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals("30", topicConfig.getProperties(Collections.emptyMap(), 20).get(TopicConfig.RETENTION_MS_CONFIG));
}
@Test
public void shouldUseSuppliedConfigsForWindowedChangelogConfig() {
final Map<String, String> configs = new HashMap<>();
configs.put("message.timestamp.type", "LogAppendTime");
final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", configs);
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
}
@Test
public void shouldUseSuppliedConfigs() {
public void shouldUseSuppliedConfigsForUnwindowedChangelogConfig() {
final Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", "1000");
configs.put("retention.bytes", "10000");
configs.put("message.timestamp.type", "LogAppendTime");
final UnwindowedChangelogTopicConfig topicConfig = new UnwindowedChangelogTopicConfig("name", configs);
final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 0);
assertEquals("1000", properties.get("retention.ms"));
assertEquals("10000", properties.get("retention.bytes"));
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
assertEquals("1000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals("10000", properties.get(TopicConfig.RETENTION_BYTES_CONFIG));
assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
}
@Test
public void shouldUseSuppliedConfigsForRepartitionConfig() {
final Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", "1000");
configs.put("message.timestamp.type", "LogAppendTime");
final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", configs);
assertEquals("1000", topicConfig.getProperties(Collections.<String, String>emptyMap(), 0).get(TopicConfig.RETENTION_MS_CONFIG));
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 0);
assertEquals("1000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG));
}
}

8
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java

@ -620,14 +620,14 @@ public class InternalTopologyBuilderTest { @@ -620,14 +620,14 @@ public class InternalTopologyBuilderTest {
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
final Map<String, String> properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000);
assertEquals(2, properties1.size());
assertEquals(3, properties1.size());
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals("appId-store1-changelog", topicConfig1.name());
assertTrue(topicConfig1 instanceof WindowedChangelogTopicConfig);
final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog");
final Map<String, String> properties2 = topicConfig2.getProperties(Collections.emptyMap(), 10000);
assertEquals(2, properties2.size());
assertEquals(3, properties2.size());
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals("appId-store2-changelog", topicConfig2.name());
@ -644,7 +644,7 @@ public class InternalTopologyBuilderTest { @@ -644,7 +644,7 @@ public class InternalTopologyBuilderTest {
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
assertEquals(1, properties.size());
assertEquals(2, properties.size());
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("appId-store-changelog", topicConfig.name());
assertTrue(topicConfig instanceof UnwindowedChangelogTopicConfig);
@ -658,7 +658,7 @@ public class InternalTopologyBuilderTest { @@ -658,7 +658,7 @@ public class InternalTopologyBuilderTest {
final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
assertEquals(3, properties.size());
assertEquals(4, properties.size());
assertEquals(String.valueOf(-1), properties.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("appId-foo", topicConfig.name());

Loading…
Cancel
Save