This PR supersedes PR #4654 as it was growing too large. All comments in that PR should be addressed here.
I will attempt to break the PRs for the topology optimization effort into 3 PRs total and will follow this general plan:
1. This PR only adds the graph nodes and graph. The graph nodes will hold the information used to make calls to the InternalTopologyBuilder when using the DSL. Graph nodes are stored in the StreamsTopologyGraph until the final topology needs building then the graph is traversed and optimizations are made at that point. There are no tests in this PR relying on the follow-up PR to use all current streams tests, which should suffice.
2. PR 2 will intercept all DSL calls and build the graph. The InternalStreamsBuilder uses the graph to provide the required info to the InternalTopologyBuilder and build a topology. The condition of satisfaction for this PR is that all current unit, integration and system tests pass using the graph.
3. PR 3 adds some optimizations mainly automatically repartitioning for operations that may modify a key and have child operations that would normally create a separate repartition topic, saving possible unnecessary repartition topics. For example the following topology:
```
KStream<String, String> mappedStreamOther = inputStream.map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
@Override
public KeyValue<? extends String, ? extends String> apply(String key, String value) {
return KeyValue.pair(key.substring(0, 3), value);
}
});
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("count-one-out");
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(10000)).count().toStream().to("count-two-out");
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(15000)).count().toStream().to("count-three-out");
```
would create 3 repartion topics, but after applying an optimization strategy, only one is created.
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This pull request is for JIRA 6657, for KIP-276.
Added unit tests for new getGlobalConsumerConfigs API and make sure existing restore consumer tests are passing.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This PR does the following:
* Remove the StreamsRepeatingIntegerKeyProducerService and the associated Java class
* Add a parameter to VerifiableProducer.java to enable sending keys when specified
* Update the corresponding Python file verifiable_producer.py to support the new parameter.
Reviewers: Matthias J Sax <matthias@confluentio>, Guozhang Wang <wangguoz@gmail.com>
Removed the following: "zookeeper.connect", "key.serde", "value.serde", "timestamp.extractor"
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Jason Gustafson <jason@confluent.io>
* Fixes a bug in which all NamedCache instances in a process shared
one parent metric.
* Also fixes a bug which incorrectly computed the per-cache metric tag
(which was undetected due to the former bug).
* Drop the StreamsMetricsConventions#xLevelSensorName convention
in favor of StreamsMetricsImpl#xLevelSensor to allow StreamsMetricsImpl
to track thread- and cache-level metrics, so that they may be cleanly declared
from anywhere but still unloaded at the appropriate time. This was necessary
right now so that the NamedCache could register a thread-level parent sensor
to be unloaded when the thread, not the cache, is closed.
* The above changes made it mostly unnecessary for the StreamsMetricsImpl to
expose a reference to the underlying Metrics registry, so I did a little extra work
to remove that reference, including removing inconsistently-used and unnecessary
calls to Metrics#close() in the tests.
The existing tests should be sufficient to verify this change.
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Reviewers: Matthias J Sax <matthias@confluentio>, Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This PR implements a Scala wrapper library for Kafka Streams. The library is implemented as a project under streams, namely `:streams:streams-scala`. The PR contains the following:
* the library implementation of the wrapper abstractions
* the test suite
* the changes in `build.gradle` to build the library jar
The library has been tested running the tests as follows:
```
$ ./gradlew -Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdes streams:streams-scala:test
$ ./gradlew -Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro streams:streams-scala:test
$ ./gradlew -Dtest.single=WordCountTest streams:streams-scala:test
```
Author: Debasish Ghosh <ghosh.debasish@gmail.com>
Author: Sean Glover <seglo@randonom.com>
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Ismael Juma <ismael@juma.me.uk>, John Roesler <john@confluent.io>, Damian Guy <damian@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4756 from debasishg/scala-streams
* unify skipped records metering
* log warnings when things get skipped
* tighten up metrics usage a bit
### Testing strategy:
Unit testing of the metrics and the logs should be sufficient.
Author: John Roesler <john@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4812 from vvcephei/kip-274-streams-skip-metrics
There are a couple minor additions in this PR:
1. Add a new test for window store, to range query upon receiving each record.
2. In the non-windowed state store case, add a get call before the put call.
3. Enable caching by default to be consistent with other Join / Aggregate cases, where caching is enabled by default.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
In the AbstractResetIntegrationTest we can have a transient error when setting the time for the test where the new time is less than the original time, for those cases we should catch the exception and re-try setting the time once versus letting the test fail.
For testing, ran the entire streams test suite.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
If users don't create all topics before starting a streams application, they could get unexpected results. For example, sharing a state store between sub-topologies where one input topic is not created ahead time results in log message that that "Partition X is not assigned to any tasks" does not give any clues as to how this could have occurred.
Also, this PR changes the log level from INFO to WARN when metadata does not have partitions for a given topic.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Some anonymous classes of AbstractProcessor didn't initialize their superclass. This will not set up ProcessorContext context at AbstractProcessor.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
- adds Streams upgrade tests for 1.1 release
- introduces metadata version 3
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
General cleanup of Streams code, mostly resolving compiler warnings and re-formatting.
The regular testing suite should be sufficient.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
guozhangwang
While TopologyTestDriver works well with stores created from KTable it does not with stores from GlobalKTable.
Moreover, for my testing purposes but I think it can be useful to others, I need to get access to the MockProducer inside TopologyTestDriver.
I have added 4 new tests to TopologyTestDriverTest, two for stores from KTable and two for stores from GlobalKTable.
While I was changing the TopologyTestDriver I've also make it implement java.io.Closeable.
Author: Valentino Proietti <valentino.proietti@kydea.com>
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4823 from Vale68/KAFKA-6742
minor renaming
SimpleBenchmark:
1.a Do not rely on manual num.records / bytes collection on atomic integers.
1.b Rely on config files for num.threads, bootstrap.servers, etc.
1.c Add parameters for key skewness and value size.
1.d Refactor the tests for loading phase, adding tumbling-windowed count.
1.e For consumer / consumeproduce, collect metrics on consumer instead.
1.f Force stop the test after 3 minutes, this is based on empirical numbers of 10M records.
Other tests: use config for kafka bootstrap servers.
streams_simple_benchmark.py: only use scale 1 for system test, remove yahoo from benchmark tests.
Note that the JMX based metrics is more accurate than the manually collected metrics.
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
As Frederic reported on mailing list under the subject "kafka-streams Invalid transition attempted from state READY to state ABORTING_TRANSACTION", producer#abortTransaction should only be called when transactionInFlight is true.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
After the punctuate() call, we would like to double check on the scheduled flag since the call itself may cancel it.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, John Roesler <john@confluent.io>
KafkaStreams.waitOnState() should check the state to be the given one instead of the hard-coded `NOT_RUNNING`.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
In the stream thread capture of the TaskMigration exception, print the task full information in WARN. In other places only log as INFO, plus additional context information.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This pull request targets https://issues.apache.org/jira/browse/KAFKA-6386
The minor fix to deprecate usage of `StreamsConfig` in favor of `java.util.Properties`.
I created separate public constructors using `Properties` in order to replace the old ones,
and prioritize new functions in the `KafkaStreams.java` file.
Since this is my first time doing open source contribution, I'm very happy to get
any comment or pointer to be more professional and get better next time, thank you Guozhang guozhangwang and Liquan Ishiihara!
testing strategy: existing unit test should be suffice to cover this change.
Author: cs427fa16staff <bchen11@outlook.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
Closes#4354 from abbccdda/starter
github comments
1. Use JmxMixin for SimpleBenchmark (will remove the self reporting in #4744), only when loading phase is false (i.e. we are in fact starting the streams app).
2. Reported the full jmx reported metrics in log files, and in the returned data only return the max values: this is because we want to skip the warming up and cooling down periods that will have lower rate numbers, while max represents the actual rate at full speed.
3. Incorporates two other improves to JMXTool: #1241 and #2950
Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Rohan Desai <desai.p.rohan@gmail.com>
This is a general change and is re-requisite to allow streams benchmark test with different streams tests. For the streams benchmark itself I will have a separate PR for switching configs. Details:
1. Create a "streams.properties" file under PERSISTENT_ROOT before all the streams test. For now it will only contain a single config of state.dir pointing to PERSISTENT_ROOT.
2. For all the system test related code, replace the main function parameter of state.dir with propsFilename, then inside the function load the props from the file and apply overrides if necessary.
3. Minor fixes.
Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
Remove unnecessary null check in StringDeserializer, MockProducerInterceptor and KStreamImpl.
Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Jason Gustafson <jason@confluent.io>
As titled, not starting new transaction since during restoration producer would have not activity and hence may cause txn expiration. Also delay starting new txn in resuming until initializing topology.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bill@confluent.io>
Now that we have augmented WindowSerde with non-arg parameters, extract it out as part of the public APIs so that users who want to I/O windowed streams can use it. This is originally introduced by @vitaly-pushkar
This PR grows out to be a much larger one, as I found a few tech debts and bugs while working on it. Here is a summary of the PR:
Public API changes (I will propose a KIP after a first round of reviews):
Add TimeWindowedSerializer, TimeWindowedDeserializer, SessionWindowedSerializer, SessionWindowedDeserializer into o.a.k.streams.kstream. The serializers would implemented an internal WindowedSerializer interface for the serializeBaseKey function used in 3) below.
Add WindowedSerdes into o.a.k.streams.kstream. The reason to now add them into o.a.k.clients's Serdes is that it then needs dependency of streams.
Add "default.windowed.key.serde.inner" and "default.windowed.value.serde.inner" into StreamsConfig, used when "default.key.serde" is specified to use time or session windowed serde. Note this requires the serde class, not the type class.
Consolidated serde format from multiple classes, including SessionKeySerde.java for session, and WindowStoreUtils for time window, into SessionKeySchema and WindowKeySchema.
Bug fix: WindowedStreamPartitioner needs to consider both time window and session window serdes.
Removed RocksDBWindowBytesStore etc optimization since after KIP-182 all the serde know happens on metered store, hence this optimization is not worth.
Bug fix: for time window, the serdes used for store and the serdes used for piping (source and sink node) are different: the former needs to append sequence number but not for the later.
Other minor cleanups: remove unnecessary throws, etc.
Authors: Guozhang Wang <wangguoz@gmail.com>, Vitaly Pushkar <vitaly.pushkar@gmail.com>
Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bill@confluent.io>, Xi Hu
Author: John Roeler <john@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>
If the result of a fetch from a Window Store results in a null byte array we should return null rather than passing it to the serde to deserialize.
Reviewers: Guozhang Wang <wangguoz@gmail.com>