
MINOR: cleanup some warning in Kafka Streams examples (#14547)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
Author: Matthias J. Sax (committed via GitHub)
Commit: cd1b7639cb
Changed files (8):

  1. streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java (13 changes)
  2. streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java (4 changes)
  3. streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java (8 changes)
  4. streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java (20 changes)
  5. streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java (4 changes)
  6. streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java (4 changes)
  7. streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java (7 changes)
  8. streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java (9 changes)

streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java (13 changes)

@@ -47,32 +47,31 @@ import java.util.concurrent.CountDownLatch;
  * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
  * in Kafka Streams.
  *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * <p>In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
  * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
  * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
  *
- * Before running this example you must create the input topics and the output topic (e.g. via
+ * <p>Before running this example you must create the input topics and the output topic (e.g. via
  * bin/kafka-topics --create ...), and write some data to the input topics (e.g. via
  * bin/kafka-console-producer). Otherwise you won't see any data arriving in the output topic.
  *
- * The inputs for this example are:
+ * <p>The inputs for this example are:
  * - Topic: streams-pageview-input
  *   Key Format: (String) USER_ID
  *   Value Format: (JSON) {"_t": "pv", "user": (String USER_ID), "page": (String PAGE_ID), "timestamp": (long ms TIMESTAMP)}
- *
+ * <p>
  * - Topic: streams-userprofile-input
  *   Key Format: (String) USER_ID
  *   Value Format: (JSON) {"_t": "up", "region": (String REGION), "timestamp": (long ms TIMESTAMP)}
  *
- * To observe the results, read the output topic (e.g., via bin/kafka-console-consumer)
+ * <p>To observe the results, read the output topic (e.g., via bin/kafka-console-consumer)
  * - Topic: streams-pageviewstats-typed-output
  *   Key Format: (JSON) {"_t": "wpvbr", "windowStart": (long ms WINDOW_TIMESTAMP), "region": (String REGION)}
  *   Value Format: (JSON) {"_t": "rc", "count": (long REGION_COUNT), "region": (String REGION)}
  *
- * Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the
+ * <p>Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the
  * generic {@link JSONSerde}. If you instead specify a specific serde per class, you won't need the extra "_t" field.
  */
-@SuppressWarnings({"WeakerAccess", "unused"})
 public class PageViewTypedDemo {
     /**
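All the javadoc edits in this commit follow the pattern shown above: javadoc does not treat a blank comment line as a paragraph break, so each new paragraph has to be opened explicitly with <p>; presumably the bare blank-line style is what the javadoc/style checker was warning about. A minimal sketch of the convention (the class name is illustrative):

    /**
     * Summary sentence: the first paragraph of the javadoc.
     *
     * <p>Second paragraph, opened explicitly with the HTML paragraph tag;
     * a blank comment line alone does not start a new paragraph in the
     * rendered output.
     */
    public class JavadocParagraphSketch {
    }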

streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java (4 changes)

@@ -45,11 +45,11 @@ import java.util.Properties;
  * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
  * in Kafka Streams.
  *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * <p>In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
  * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
  * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
  *
- * Before running this example you must create the input topics and the output topic (e.g. via
+ * <p>Before running this example you must create the input topics and the output topic (e.g. via
  * bin/kafka-topics.sh --create ...), and write some data to the input topics (e.g. via
  * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
  */

streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java (8 changes)

@@ -29,10 +29,10 @@ import java.util.concurrent.CountDownLatch;
  * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
  * write data to a sink (output) topic.
  *
- * In this example, we implement a simple "pipe" program that reads from a source topic "streams-plaintext-input"
+ * <p>In this example, we implement a simple "pipe" program that reads from a source topic "streams-plaintext-input"
  * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
  *
- * Before running this example you must create the input topic and the output topic (e.g. via
+ * <p>Before running this example you must create the input topic and the output topic (e.g. via
  * bin/kafka-topics.sh --create ...), and write some data to the input topic (e.g. via
  * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
  */
@@ -42,8 +42,8 @@ public class PipeDemo {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
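The serde changes here and in the following files all replace Serdes.String().getClass() with the class literal Serdes.StringSerde.class. Both resolve to the same class, but the old form instantiates a throwaway serde just to ask for its class, and getClass() on a Serde<String> only yields a wildcard Class<? extends Serde>, which is presumably where the warnings came from. A minimal sketch of the two forms side by side (the application id is illustrative):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    public class SerdeConfigSketch {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "serde-config-sketch");

            // Old form: builds a StringSerde instance only to ask for its class;
            // the result is a wildcard Class<? extends Serde>, hence the warning.
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // New form: the class literal names the same serde with no instantiation.
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

            // Streams accepts either a Class or a class-name String for these
            // configs and instantiates the configured serde reflectively at startup.
        }
    }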

streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java (20 changes)

@@ -37,26 +37,26 @@ import java.util.concurrent.CountDownLatch;
  * which ingests temperature value to compute the maximum value in the latest TEMPERATURE_WINDOW_SIZE seconds (which
  * is 5 seconds) and send a new message if it exceeds the TEMPERATURE_THRESHOLD (which is 20)
  *
- * In this example, the input stream reads from a topic named "iot-temperature", where the values of messages
+ * <p>In this example, the input stream reads from a topic named "iot-temperature", where the values of messages
  * represent temperature values; using a TEMPERATURE_WINDOW_SIZE seconds "tumbling" window, the maximum value is processed and
  * sent to a topic named "iot-temperature-max" if it exceeds the TEMPERATURE_THRESHOLD.
  *
- * Before running this example you must create the input topic for temperature values in the following way :
+ * <p>Before running this example you must create the input topic for temperature values in the following way :
  *
- * bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature
+ * <p>bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature
  *
- * and at same time create the output topic for filtered values :
+ * <p>and at same time create the output topic for filtered values :
  *
- * bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature-max
+ * <p>bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature-max
  *
- * After that, a console consumer can be started in order to read filtered values from the "iot-temperature-max" topic :
+ * <p>After that, a console consumer can be started in order to read filtered values from the "iot-temperature-max" topic :
  *
  * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic iot-temperature-max --from-beginning
  *
- * On the other side, a console producer can be used for sending temperature values (which needs to be integers)
+ * <p>On the other side, a console producer can be used for sending temperature values (which needs to be integers)
  * to "iot-temperature" by typing them on the console :
  *
- * bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic iot-temperature
+ * <p>bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic iot-temperature
  * > 10
  * > 15
  * > 22
@@ -73,8 +73,8 @@ public class TemperatureDemo {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);

streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java (4 changes)

@@ -62,8 +62,8 @@ public final class WordCountDemo {
         props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
         props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         // Note: To re-run the demo, you need to use the offset reset tool:

streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java (4 changes)

@@ -109,8 +109,8 @@ public final class WordCountProcessorDemo {
         props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
         props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java (7 changes)

@@ -100,9 +100,6 @@ public final class WordCountTransformerDemo {
                     }
                 }
             }
-
-            @Override
-            public void close() {}
         };
     }
@@ -128,8 +125,8 @@ public final class WordCountTransformerDemo {
         props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer");
         props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
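The first hunk above deletes an empty close() override from the anonymous transformer. The override added nothing: its body was empty, and removing it can only compile if close() is not abstract for this type, so the implemented interface presumably supplies a default no-op close(). A generic sketch of the pattern (the Resource interface is hypothetical, not a Kafka API):

    // Hypothetical interface, for illustration only.
    interface Resource {
        void process(String record);

        // Default no-op hook: implementers override it only when they hold resources.
        default void close() {}
    }

    public class RedundantOverrideSketch {

        public static void main(final String[] args) {
            // Redundant: restates the default with an identical empty body,
            // which is exactly the kind of override the commit deletes.
            final Resource verbose = new Resource() {
                @Override
                public void process(final String record) {
                    System.out.println("processing " + record);
                }

                @Override
                public void close() {}
            };

            // Equivalent and quieter: inherit the default no-op close().
            final Resource concise = record -> System.out.println("processing " + record);

            verbose.process("a");
            concise.process("b");
            verbose.close();
            concise.close();
        }
    }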

streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java (9 changes)

@@ -54,8 +54,8 @@ public class DeveloperGuideTesting {
     private TestOutputTopic<String, Long> outputTopic;
     private KeyValueStore<String, Long> store;
-    private Serde<String> stringSerde = new Serdes.StringSerde();
-    private Serde<Long> longSerde = new Serdes.LongSerde();
+    private final Serde<String> stringSerde = new Serdes.StringSerde();
+    private final Serde<Long> longSerde = new Serdes.LongSerde();

     @BeforeEach
     public void setup() {
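The final modifier is the entire fix in this hunk: the two serde fields are assigned once in their initializers and never reassigned, so final both documents that and lets the compiler enforce it, which is what clears the corresponding inspection warning. A tiny sketch of the idiom (the class name is illustrative):

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;

    public class FinalFieldSketch {
        // Assigned exactly once at construction; reassignment is now a compile error.
        private final Serde<String> stringSerde = new Serdes.StringSerde();

        public Serde<String> stringSerde() {
            return stringSerde;
        }
    }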
@@ -72,8 +72,8 @@ public class DeveloperGuideTesting {
         // setup test driver
         final Properties props = new Properties();
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
         testDriver = new TopologyTestDriver(topology, props);

         // setup test topics
@@ -155,7 +155,6 @@ public class DeveloperGuideTesting {
         ProcessorContext<String, Long> context;
         private KeyValueStore<String, Long> store;

-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
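Dropping @SuppressWarnings("unchecked") works because nothing in init() performs an unchecked cast any more: the ProcessorContext parameter is fully typed, and, assuming the store is obtained through the generic ProcessorContext#getStateStore as in the other examples, the store type is inferred rather than cast. A minimal sketch of the typed pattern (the store name "counts-store" is illustrative):

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class TypedProcessorSketch implements Processor<String, String, String, Long> {

        private ProcessorContext<String, Long> context;
        private KeyValueStore<String, Long> store;

        @Override
        public void init(final ProcessorContext<String, Long> context) {
            this.context = context;
            // getStateStore is generic, so the store type is inferred here;
            // no raw cast remains, hence nothing "unchecked" to suppress.
            this.store = context.getStateStore("counts-store");
        }

        @Override
        public void process(final Record<String, String> record) {
            final Long oldValue = store.get(record.key());
            final long newCount = oldValue == null ? 1L : oldValue + 1L;
            store.put(record.key(), newCount);
            context.forward(record.withValue(newCount));
        }
    }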
