Browse Source

MINOR: Code cleanup (#4229)

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Damian Guy <damian@confluent.io>, Paolo Patierno <ppatierno@live.com>, Ismael Juma <ismael@juma.me.uk>
pull/4565/head
Kamal C 7 years ago committed by Guozhang Wang
parent
commit
83b058a0d1
  1. 10
      clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
  2. 9
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
  3. 26
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java

10
clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java

@ -79,8 +79,8 @@ public final class Sensor { @@ -79,8 +79,8 @@ public final class Sensor {
public static RecordingLevel forId(int id) {
if (id < MIN_RECORDING_LEVEL_KEY || id > MAX_RECORDING_LEVEL_KEY)
throw new IllegalArgumentException(String.format("Unexpected RecordLevel id `%s`, it should be between `%s` " +
"and `%s` (inclusive)", id, MIN_RECORDING_LEVEL_KEY, MAX_RECORDING_LEVEL_KEY));
throw new IllegalArgumentException(String.format("Unexpected RecordLevel id `%d`, it should be between `%d` " +
"and `%d` (inclusive)", id, MIN_RECORDING_LEVEL_KEY, MAX_RECORDING_LEVEL_KEY));
return ID_TO_TYPE[id];
}
@ -90,11 +90,7 @@ public final class Sensor { @@ -90,11 +90,7 @@ public final class Sensor {
}
public boolean shouldRecord(final int configId) {
if (configId == DEBUG.id) {
return true;
} else {
return configId == this.id;
}
return configId == DEBUG.id || configId == this.id;
}
}

9
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.Topology;
@ -42,10 +43,18 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> { @@ -42,10 +43,18 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
return keySerde;
}
public Deserializer<K> keyDeserializer() {
return keySerde == null ? null : keySerde.deserializer();
}
public Serde<V> valueSerde() {
return valueSerde;
}
public Deserializer<V> valueDeserializer() {
return valueSerde == null ? null : valueSerde.deserializer();
}
public TimestampExtractor timestampExtractor() {
return timestampExtractor;
}

26
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java

@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
@ -49,8 +48,8 @@ public class InternalStreamsBuilder implements InternalNameProvider { @@ -49,8 +48,8 @@ public class InternalStreamsBuilder implements InternalNameProvider {
internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
name,
consumed.timestampExtractor(),
consumed.keySerde() == null ? null : consumed.keySerde().deserializer(),
consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer(),
consumed.keyDeserializer(),
consumed.valueDeserializer(),
topics.toArray(new String[topics.size()]));
return new KStreamImpl<>(this, name, Collections.singleton(name), false);
@ -62,8 +61,8 @@ public class InternalStreamsBuilder implements InternalNameProvider { @@ -62,8 +61,8 @@ public class InternalStreamsBuilder implements InternalNameProvider {
internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
name,
consumed.timestampExtractor(),
consumed.keySerde() == null ? null : consumed.keySerde().deserializer(),
consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer(),
consumed.keyDeserializer(),
consumed.valueDeserializer(),
topicPattern);
return new KStreamImpl<>(this, name, Collections.singleton(name), false);
@ -124,8 +123,8 @@ public class InternalStreamsBuilder implements InternalNameProvider { @@ -124,8 +123,8 @@ public class InternalStreamsBuilder implements InternalNameProvider {
internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
source,
consumed.timestampExtractor(),
consumed.keySerde() == null ? null : consumed.keySerde().deserializer(),
consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer(),
consumed.keyDeserializer(),
consumed.valueDeserializer(),
topic);
internalTopologyBuilder.addProcessor(name, processorSupplier, source);
@ -147,14 +146,11 @@ public class InternalStreamsBuilder implements InternalNameProvider { @@ -147,14 +146,11 @@ public class InternalStreamsBuilder implements InternalNameProvider {
final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());
final Deserializer<K> keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer();
final Deserializer<V> valueDeserializer = consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer();
internalTopologyBuilder.addGlobalStore(storeBuilder,
sourceName,
consumed.timestampExtractor(),
keyDeserializer,
valueDeserializer,
consumed.keyDeserializer(),
consumed.valueDeserializer(),
topic,
processorName,
tableSource);
@ -183,13 +179,11 @@ public class InternalStreamsBuilder implements InternalNameProvider { @@ -183,13 +179,11 @@ public class InternalStreamsBuilder implements InternalNameProvider {
final ProcessorSupplier stateUpdateSupplier) {
// explicitly disable logging for global stores
storeBuilder.withLoggingDisabled();
final Deserializer keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer();
final Deserializer valueDeserializer = consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer();
internalTopologyBuilder.addGlobalStore(storeBuilder,
sourceName,
consumed.timestampExtractor(),
keyDeserializer,
valueDeserializer,
consumed.keyDeserializer(),
consumed.valueDeserializer(),
topic,
processorName,
stateUpdateSupplier);

Loading…
Cancel
Save