Browse Source

KAFKA-6958: Add new NamedOperation interface to enforce consistency in naming operations (#6409)

Sub-task required to allow to define custom processor names with KStreams DSL(KIP-307) :
  - add new public interface NamedOperation
  - deprecate methods Joined.as() and Joined.name()
  - update Suppredded interface to extend NamedOperation

Reviewers: Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
pull/6475/head
Florian Hussonnois 6 years ago committed by Bill Bejeck
parent
commit
fa57eb065d
  1. 40
      streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
  2. 32
      streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java
  3. 3
      streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
  4. 45
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java
  5. 15
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

40
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java

@ -22,13 +22,12 @@ import org.apache.kafka.common.serialization.Serde; @@ -22,13 +22,12 @@ import org.apache.kafka.common.serialization.Serde;
* The {@code Joined} class represents optional params that can be passed to
* {@link KStream#join}, {@link KStream#leftJoin}, and {@link KStream#outerJoin} operations.
*/
public class Joined<K, V, VO> {
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final Serde<VO> otherValueSerde;
private final String name;
public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
protected final Serde<K> keySerde;
protected final Serde<V> valueSerde;
protected final Serde<VO> otherValueSerde;
protected final String name;
private Joined(final Serde<K> keySerde,
final Serde<V> valueSerde,
@ -40,6 +39,10 @@ public class Joined<K, V, VO> { @@ -40,6 +39,10 @@ public class Joined<K, V, VO> {
this.name = name;
}
protected Joined(final Joined<K, V, VO> joined) {
this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, joined.name);
}
/**
* Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances.
* {@code null} values are accepted and will be replaced by the default serdes as defined in config.
@ -135,11 +138,30 @@ public class Joined<K, V, VO> { @@ -135,11 +138,30 @@ public class Joined<K, V, VO> {
* @param <V> value type
* @param <VO> other value type
* @return new {@code Joined} instance configured with the name
*
* @deprecated use {@link #as(String)} instead
*/
@Deprecated
public static <K, V, VO> Joined<K, V, VO> named(final String name) {
return new Joined<>(null, null, null, name);
}
/**
* Create an instance of {@code Joined} with base name for all components of the join, this may
* include any repartition topics created to complete the join.
*
* @param name the name used as the base for naming components of the join including any
* repartition topics
* @param <K> key type
* @param <V> value type
* @param <VO> other value type
* @return new {@code Joined} instance configured with the name
*
*/
public static <K, V, VO> Joined<K, V, VO> as(final String name) {
return new Joined<>(null, null, null, name);
}
/**
* Set the key {@link Serde} to be used. Null values are accepted and will be replaced by the default
@ -182,6 +204,7 @@ public class Joined<K, V, VO> { @@ -182,6 +204,7 @@ public class Joined<K, V, VO> {
* repartition topics
* @return new {@code Joined} instance configured with the {@code name}
*/
@Override
public Joined<K, V, VO> withName(final String name) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
}
@ -198,7 +221,12 @@ public class Joined<K, V, VO> { @@ -198,7 +221,12 @@ public class Joined<K, V, VO> {
return otherValueSerde;
}
/**
* @deprecated this method will be removed in a in a future release
*/
@Deprecated
public String name() {
return name;
}
}

32
streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java

@ -0,0 +1,32 @@ @@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
/**
* Default interface which can be used to personalized the named of operations, internal topics or store.
*/
interface NamedOperation<T extends NamedOperation<T>> {
/**
* Sets the name to be used for an operation.
*
* @param name the name to use.
* @return an instance of {@link NamedOperation}
*/
T withName(final String name);
}

3
streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java

@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import java.time.Duration;
public interface Suppressed<K> {
public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
/**
* Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
@ -163,5 +163,6 @@ public interface Suppressed<K> { @@ -163,5 +163,6 @@ public interface Suppressed<K> {
* @param name The name to be used for the suppression node and changelog topic
* @return The same configuration with the addition of the given {@code name}.
*/
@Override
Suppressed<K> withName(final String name);
}

45
streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java

@ -0,0 +1,45 @@ @@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Joined;
public class JoinedInternal<K, V, VO> extends Joined<K, V, VO> {
JoinedInternal(final Joined<K, V, VO> joined) {
super(joined);
}
public Serde<K> keySerde() {
return keySerde;
}
public Serde<V> valueSerde() {
return valueSerde;
}
public Serde<VO> otherValueSerde() {
return otherValueSerde;
}
@Override // TODO remove annotation when super.name() is removed
@SuppressWarnings("deprecation") // this method should not be removed if super.name() is removed
public String name() {
return name;
}
}

15
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@ -567,13 +567,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -567,13 +567,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
KStreamImpl<K, V> joinThis = this;
KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) other;
final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
final String name = joinedInternal.name();
if (joinThis.repartitionRequired) {
final String leftJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-left" : joinThis.name;
final String leftJoinRepartitionTopicName = name != null ? name + "-left" : joinThis.name;
joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde());
}
if (joinOther.repartitionRequired) {
final String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name;
final String rightJoinRepartitionTopicName = name != null ? name + "-right" : joinOther.name;
joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde());
}
@ -679,9 +681,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -679,9 +681,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
final String name = joinedInternal.name();
if (repartitionRequired) {
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
joined.name() != null ? joined.name() : name,
name != null ? name : this.name,
joined.keySerde(),
joined.valueSerde()
);
@ -703,9 +708,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -703,9 +708,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
final String internalName = joinedInternal.name();
if (repartitionRequired) {
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
joined.name() != null ? joined.name() : name,
internalName != null ? internalName : name,
joined.keySerde(),
joined.valueSerde()
);

Loading…
Cancel
Save