diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java index bfe6a73f75e..90a748f5796 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.HashMap; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.internals.Topic; import java.util.Map; @@ -32,6 +34,11 @@ public abstract class InternalTopicConfig { private Optional numberOfPartitions = Optional.empty(); + static final Map INTERNAL_TOPIC_DEFAULT_OVERRIDES = new HashMap<>(); + static { + INTERNAL_TOPIC_DEFAULT_OVERRIDES.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"); + } + InternalTopicConfig(final String name, final Map topicConfigs) { Objects.requireNonNull(name, "name can't be null"); Topic.validate(name); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java index 7161a3fc880..78dcf64ccdf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java @@ -31,7 +31,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig { private static final Map REPARTITION_TOPIC_DEFAULT_OVERRIDES; static { - final Map tempTopicDefaultOverrides = new HashMap<>(); + final Map tempTopicDefaultOverrides = new HashMap<>(INTERNAL_TOPIC_DEFAULT_OVERRIDES); tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"); // 50 MB tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(-1)); // Infinity @@ -49,6 +49,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig { * @param additionalRetentionMs - added to retention to allow for clock drift etc * @return Properties to be used when creating the topic */ + @Override public Map getProperties(final Map defaultProperties, final long additionalRetentionMs) { // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides final Map topicConfig = new HashMap<>(REPARTITION_TOPIC_DEFAULT_OVERRIDES); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java index acca8377843..fa455a92a16 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java @@ -30,7 +30,7 @@ import java.util.Objects; public class UnwindowedChangelogTopicConfig extends InternalTopicConfig { private static final Map UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES; static { - final Map tempTopicDefaultOverrides = new HashMap<>(); + final Map tempTopicDefaultOverrides = new HashMap<>(INTERNAL_TOPIC_DEFAULT_OVERRIDES); tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides); } @@ -46,6 +46,7 @@ public class UnwindowedChangelogTopicConfig extends InternalTopicConfig { * @param additionalRetentionMs - added to retention to allow for clock drift etc * @return Properties to be used when creating the topic */ + @Override public Map getProperties(final Map defaultProperties, final long additionalRetentionMs) { // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides final Map topicConfig = new HashMap<>(UNWINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java index 55d548c9795..3f65272dc70 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WindowedChangelogTopicConfig.java @@ -30,7 +30,7 @@ import java.util.Objects; public class WindowedChangelogTopicConfig extends InternalTopicConfig { private static final Map WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES; static { - final Map tempTopicDefaultOverrides = new HashMap<>(); + final Map tempTopicDefaultOverrides = new HashMap<>(INTERNAL_TOPIC_DEFAULT_OVERRIDES); tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE); WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides); } @@ -48,6 +48,7 @@ public class WindowedChangelogTopicConfig extends InternalTopicConfig { * @param additionalRetentionMs - added to retention to allow for clock drift etc * @return Properties to be used when creating the topic */ + @Override public Map getProperties(final Map defaultProperties, final long additionalRetentionMs) { // internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides final Map topicConfig = new HashMap<>(WINDOWED_STORE_CHANGELOG_TOPIC_DEFAULT_OVERRIDES); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index cdb369c8bfe..2eafabdc1a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -170,7 +170,7 @@ public class InternalTopicIntegrationTest { final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition"); assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp())); - assertEquals(3, repartitionProps.size()); + assertEquals(4, repartitionProps.size()); } @Test @@ -215,6 +215,6 @@ public class InternalTopicIntegrationTest { final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition"); assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp())); - assertEquals(3, repartitionProps.size()); + assertEquals(4, repartitionProps.size()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java index ea7a926c1f1..9af422c816d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java @@ -30,39 +30,81 @@ public class InternalTopicConfigTest { @Test(expected = NullPointerException.class) public void shouldThrowIfNameIsNull() { - new RepartitionTopicConfig(null, Collections.emptyMap()); + new RepartitionTopicConfig(null, Collections.emptyMap()); } @Test(expected = InvalidTopicException.class) public void shouldThrowIfNameIsInvalid() { - new RepartitionTopicConfig("foo bar baz", Collections.emptyMap()); + new RepartitionTopicConfig("foo bar baz", Collections.emptyMap()); + } + + @Test + public void shouldSetCreateTimeByDefaultForWindowedChangelog() { + final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.emptyMap()); + + final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); + } + + @Test + public void shouldSetCreateTimeByDefaultForUnwindowedChangelog() { + final UnwindowedChangelogTopicConfig topicConfig = new UnwindowedChangelogTopicConfig("name", Collections.emptyMap()); + + final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); + } + + @Test + public void shouldSetCreateTimeByDefaultForRepartitionTopic() { + final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", Collections.emptyMap()); + + final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + assertEquals("CreateTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } @Test public void shouldAugmentRetentionMsWithWindowedChangelog() { - final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.emptyMap()); + final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", Collections.emptyMap()); topicConfig.setRetentionMs(10); - assertEquals("30", topicConfig.getProperties(Collections.emptyMap(), 20).get(TopicConfig.RETENTION_MS_CONFIG)); + assertEquals("30", topicConfig.getProperties(Collections.emptyMap(), 20).get(TopicConfig.RETENTION_MS_CONFIG)); + } + + @Test + public void shouldUseSuppliedConfigsForWindowedChangelogConfig() { + final Map configs = new HashMap<>(); + configs.put("message.timestamp.type", "LogAppendTime"); + + final WindowedChangelogTopicConfig topicConfig = new WindowedChangelogTopicConfig("name", configs); + + final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } @Test - public void shouldUseSuppliedConfigs() { + public void shouldUseSuppliedConfigsForUnwindowedChangelogConfig() { final Map configs = new HashMap<>(); configs.put("retention.ms", "1000"); configs.put("retention.bytes", "10000"); + configs.put("message.timestamp.type", "LogAppendTime"); final UnwindowedChangelogTopicConfig topicConfig = new UnwindowedChangelogTopicConfig("name", configs); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); - assertEquals("1000", properties.get("retention.ms")); - assertEquals("10000", properties.get("retention.bytes")); + final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + assertEquals("1000", properties.get(TopicConfig.RETENTION_MS_CONFIG)); + assertEquals("10000", properties.get(TopicConfig.RETENTION_BYTES_CONFIG)); + assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } @Test public void shouldUseSuppliedConfigsForRepartitionConfig() { final Map configs = new HashMap<>(); configs.put("retention.ms", "1000"); + configs.put("message.timestamp.type", "LogAppendTime"); + final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", configs); - assertEquals("1000", topicConfig.getProperties(Collections.emptyMap(), 0).get(TopicConfig.RETENTION_MS_CONFIG)); + + final Map properties = topicConfig.getProperties(Collections.emptyMap(), 0); + assertEquals("1000", properties.get(TopicConfig.RETENTION_MS_CONFIG)); + assertEquals("LogAppendTime", properties.get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 185cd908a70..766b7dc64cc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -620,14 +620,14 @@ public class InternalTopologyBuilderTest { final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog"); final Map properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000); - assertEquals(2, properties1.size()); + assertEquals(3, properties1.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals("appId-store1-changelog", topicConfig1.name()); assertTrue(topicConfig1 instanceof WindowedChangelogTopicConfig); final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog"); final Map properties2 = topicConfig2.getProperties(Collections.emptyMap(), 10000); - assertEquals(2, properties2.size()); + assertEquals(3, properties2.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals("appId-store2-changelog", topicConfig2.name()); @@ -644,7 +644,7 @@ public class InternalTopologyBuilderTest { final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); final Map properties = topicConfig.getProperties(Collections.emptyMap(), 10000); - assertEquals(1, properties.size()); + assertEquals(2, properties.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("appId-store-changelog", topicConfig.name()); assertTrue(topicConfig instanceof UnwindowedChangelogTopicConfig); @@ -658,7 +658,7 @@ public class InternalTopologyBuilderTest { final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo"); final Map properties = topicConfig.getProperties(Collections.emptyMap(), 10000); - assertEquals(3, properties.size()); + assertEquals(4, properties.size()); assertEquals(String.valueOf(-1), properties.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("appId-foo", topicConfig.name());