diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java index aa29c6805b3..1343487c9e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java @@ -22,13 +22,12 @@ import org.apache.kafka.common.serialization.Serde; * The {@code Joined} class represents optional params that can be passed to * {@link KStream#join}, {@link KStream#leftJoin}, and {@link KStream#outerJoin} operations. */ -public class Joined { - - private final Serde keySerde; - private final Serde valueSerde; - private final Serde otherValueSerde; - private final String name; +public class Joined implements NamedOperation> { + protected final Serde keySerde; + protected final Serde valueSerde; + protected final Serde otherValueSerde; + protected final String name; private Joined(final Serde keySerde, final Serde valueSerde, @@ -40,6 +39,10 @@ public class Joined { this.name = name; } + protected Joined(final Joined joined) { + this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, joined.name); + } + /** * Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances. * {@code null} values are accepted and will be replaced by the default serdes as defined in config. @@ -135,11 +138,30 @@ public class Joined { * @param value type * @param other value type * @return new {@code Joined} instance configured with the name + * + * @deprecated use {@link #as(String)} instead */ + @Deprecated public static Joined named(final String name) { return new Joined<>(null, null, null, name); } + /** + * Create an instance of {@code Joined} with base name for all components of the join, this may + * include any repartition topics created to complete the join. + * + * @param name the name used as the base for naming components of the join including any + * repartition topics + * @param key type + * @param value type + * @param other value type + * @return new {@code Joined} instance configured with the name + * + */ + public static Joined as(final String name) { + return new Joined<>(null, null, null, name); + } + /** * Set the key {@link Serde} to be used. Null values are accepted and will be replaced by the default @@ -182,6 +204,7 @@ public class Joined { * repartition topics * @return new {@code Joined} instance configured with the {@code name} */ + @Override public Joined withName(final String name) { return new Joined<>(keySerde, valueSerde, otherValueSerde, name); } @@ -198,7 +221,12 @@ public class Joined { return otherValueSerde; } + /** + * @deprecated this method will be removed in a in a future release + */ + @Deprecated public String name() { return name; } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java b/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java new file mode 100644 index 00000000000..9a2c40b168b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +/** + * Default interface which can be used to personalized the named of operations, internal topics or store. + */ +interface NamedOperation> { + + /** + * Sets the name to be used for an operation. + * + * @param name the name to use. + * @return an instance of {@link NamedOperation} + */ + T withName(final String name); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java index 68541012ece..b5d79371747 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import java.time.Duration; -public interface Suppressed { +public interface Suppressed extends NamedOperation> { /** * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly @@ -163,5 +163,6 @@ public interface Suppressed { * @param name The name to be used for the suppression node and changelog topic * @return The same configuration with the addition of the given {@code name}. */ + @Override Suppressed withName(final String name); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java new file mode 100644 index 00000000000..99f7a0fd740 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.Joined; + +public class JoinedInternal extends Joined { + + JoinedInternal(final Joined joined) { + super(joined); + } + + public Serde keySerde() { + return keySerde; + } + + public Serde valueSerde() { + return valueSerde; + } + + public Serde otherValueSerde() { + return otherValueSerde; + } + + @Override // TODO remove annotation when super.name() is removed + @SuppressWarnings("deprecation") // this method should not be removed if super.name() is removed + public String name() { + return name; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 856536cc788..41260c5a016 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -567,13 +567,15 @@ public class KStreamImpl extends AbstractStream implements KStream joinThis = this; KStreamImpl joinOther = (KStreamImpl) other; + final JoinedInternal joinedInternal = new JoinedInternal<>(joined); + final String name = joinedInternal.name(); if (joinThis.repartitionRequired) { - final String leftJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-left" : joinThis.name; + final String leftJoinRepartitionTopicName = name != null ? name + "-left" : joinThis.name; joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde()); } if (joinOther.repartitionRequired) { - final String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name; + final String rightJoinRepartitionTopicName = name != null ? name + "-right" : joinOther.name; joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde()); } @@ -679,9 +681,12 @@ public class KStreamImpl extends AbstractStream implements KStream joinedInternal = new JoinedInternal<>(joined); + final String name = joinedInternal.name(); if (repartitionRequired) { final KStreamImpl thisStreamRepartitioned = repartitionForJoin( - joined.name() != null ? joined.name() : name, + name != null ? name : this.name, joined.keySerde(), joined.valueSerde() ); @@ -703,9 +708,11 @@ public class KStreamImpl extends AbstractStream implements KStream joinedInternal = new JoinedInternal<>(joined); + final String internalName = joinedInternal.name(); if (repartitionRequired) { final KStreamImpl thisStreamRepartitioned = repartitionForJoin( - joined.name() != null ? joined.name() : name, + internalName != null ? internalName : name, joined.keySerde(), joined.valueSerde() );