Browse Source

KAFKA-5979; Use single AtomicCounter to generate internal names

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3979 from mjsax/kafka-5979-kip-120-regression
pull/3979/merge
Matthias J. Sax 7 years ago committed by Damian Guy
parent
commit
e846daa89b
  1. 7
      streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
  2. 2
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
  3. 12
      streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java

7
streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java

@ -37,7 +37,6 @@ import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; @@ -37,7 +37,6 @@ import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
/**
@ -52,8 +51,6 @@ import java.util.regex.Pattern; @@ -52,8 +51,6 @@ import java.util.regex.Pattern;
@Deprecated
public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyBuilder {
private final AtomicInteger index = new AtomicInteger(0);
private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(super.internalTopologyBuilder);
private Topology.AutoOffsetReset translateAutoOffsetReset(final org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset resetPolicy) {
@ -1249,7 +1246,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB @@ -1249,7 +1246,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @return a new unique name
*/
public String newName(final String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
return internalStreamsBuilder.newName(prefix);
}
/**
@ -1261,7 +1258,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB @@ -1261,7 +1258,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @return a new unique name
*/
public String newStoreName(final String prefix) {
return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
return internalStreamsBuilder.newStoreName(prefix);
}
}

2
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java

@ -159,7 +159,7 @@ public class InternalStreamsBuilder { @@ -159,7 +159,7 @@ public class InternalStreamsBuilder {
return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
}
String newName(final String prefix) {
public String newName(final String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
}

12
streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java

@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream; @@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopologyBuilder;
@ -29,25 +29,25 @@ import org.apache.kafka.streams.processor.internals.SourceNode; @@ -29,25 +29,25 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.MockValueJoiner;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("deprecation")
public class KStreamBuilderTest {
@ -303,7 +303,7 @@ public class KStreamBuilderTest { @@ -303,7 +303,7 @@ public class KStreamBuilderTest {
final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000000-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
}
@Test

Loading…
Cancel
Save