Browse Source

KAFKA-6535: Set default retention ms for Streams repartition topics to Long.MAX_VALUE (#4730)

Implements KIP-284

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/4947/head
khairy 7 years ago committed by Matthias J. Sax
parent
commit
6655a4d75f
  1. 6
      docs/streams/upgrade-guide.html
  2. 3
      docs/upgrade.html
  3. 1
      streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
  4. 4
      streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
  5. 3
      streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
  6. 8
      streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
  7. 3
      streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java

6
docs/streams/upgrade-guide.html

@ -100,6 +100,7 @@
</ul> </ul>
<!-- TODO: verify release verion and update `id` and `href` attributes (also at other places that link to this headline) --> <!-- TODO: verify release verion and update `id` and `href` attributes (also at other places that link to this headline) -->
<h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams API changes in 1.2.0</a></h3> <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams API changes in 1.2.0</a></h3>
<p> <p>
We have removed the <code>skippedDueToDeserializationError-rate</code> and <code>skippedDueToDeserializationError-total</code> metrics. We have removed the <code>skippedDueToDeserializationError-rate</code> and <code>skippedDueToDeserializationError-total</code> metrics.
@ -156,7 +157,10 @@
The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record. The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
Forwarding based on child index is not supported in the new API any longer. Forwarding based on child index is not supported in the new API any longer.
</p> </p>
<p>
<a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.
Instead of relying on data retention Kafka Streams uses the new purge data API to delete consumed data from those topics and to keep used storage small now.
</p>
<p> <p>
Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala. It wraps core Kafka Streams DSL types to make it easier to call when Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala. It wraps core Kafka Streams DSL types to make it easier to call when
interoperating with Scala code. For example, it includes higher order functions as parameters for transformations avoiding the need anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11, automatic conversion between Java and Scala collection types, a way interoperating with Scala code. For example, it includes higher order functions as parameters for transformations avoiding the need anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11, automatic conversion between Java and Scala collection types, a way

3
docs/upgrade.html

@ -75,6 +75,7 @@
updated to aggregate across different versions. updated to aggregate across different versions.
</li> </li>
<li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li> <li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li>
<li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.</li>
</ul> </ul>
<h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New Protocol Versions</a></h5> <h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New Protocol Versions</a></h5>
@ -87,7 +88,6 @@
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_120">Streams API changes in 1.2.0</a> for more details. </li> <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_120">Streams API changes in 1.2.0</a> for more details. </li>
</ul> </ul>
<h4><a id="upgrade_1_1_0" href="#upgrade_1_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x or 1.0.x to 1.1.x</a></h4> <h4><a id="upgrade_1_1_0" href="#upgrade_1_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x or 1.0.x to 1.1.x</a></h4>
<p>Kafka 1.1.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, <p>Kafka 1.1.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_110_notable">notable changes in 1.1.0</a> before upgrading. you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_110_notable">notable changes in 1.1.0</a> before upgrading.
@ -132,6 +132,7 @@
Hot-swaping the jar-file only might not work.</li> Hot-swaping the jar-file only might not work.</li>
</ol> </ol>
<!-- TODO add if 1.1.1 gets release <!-- TODO add if 1.1.1 gets release
<h5><a id="upgrade_111_notable" href="#upgrade_111_notable">Notable changes in 1.1.1</a></h5> <h5><a id="upgrade_111_notable" href="#upgrade_111_notable">Notable changes in 1.1.1</a></h5>
<ul> <ul>

1
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java

@ -36,6 +36,7 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800"); // 50 MB tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800"); // 50 MB
tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"); // 50 MB tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"); // 50 MB
tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000"); // 10 min tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000"); // 10 min
tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE)); // Infinity
REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides); REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
} }

4
streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java

@ -164,7 +164,7 @@ public class InternalTopicIntegrationTest {
final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition"); final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition");
assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp())); assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
assertEquals(4, repartitionProps.size()); assertEquals(5, repartitionProps.size());
} }
@Test @Test
@ -213,6 +213,6 @@ public class InternalTopicIntegrationTest {
final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition"); final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition");
assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp())); assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
assertEquals(4, repartitionProps.size()); assertEquals(5, repartitionProps.size());
} }
} }

3
streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java

@ -588,7 +588,8 @@ public class TopologyBuilderTest {
final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next(); final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo"); final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000); final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
assertEquals(4, properties.size()); assertEquals(5, properties.size());
assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals("appId-foo", topicConfig.name()); assertEquals("appId-foo", topicConfig.name());
} }

8
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java

@ -57,4 +57,12 @@ public class InternalTopicConfigTest {
assertEquals("1000", properties.get("retention.ms")); assertEquals("1000", properties.get("retention.ms"));
assertEquals("10000", properties.get("retention.bytes")); assertEquals("10000", properties.get("retention.bytes"));
} }
@Test
public void shouldUseSuppliedConfigsForRepartitionConfig() {
final Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", "1000");
final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", configs);
assertEquals("1000", topicConfig.getProperties(Collections.<String, String>emptyMap(), 0).get(TopicConfig.RETENTION_MS_CONFIG));
}
} }

3
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java

@ -554,7 +554,8 @@ public class InternalTopologyBuilderTest {
final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next(); final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo"); final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000); final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
assertEquals(4, properties.size()); assertEquals(5, properties.size());
assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("appId-foo", topicConfig.name()); assertEquals("appId-foo", topicConfig.name());
assertTrue(topicConfig instanceof RepartitionTopicConfig); assertTrue(topicConfig instanceof RepartitionTopicConfig);

Loading…
Cancel
Save