Moved the shutdown of GlobalStreamThread to after all StreamThread instances have stopped.
There was a race condition: shutdown is called on a StreamThread and then on the GlobalStreamThread, but if the StreamThread is delayed in shutting down, the GlobalStreamThread can shut down first.
If the StreamThread then tries to access a GlobalStateStore before closing, the user can get an exception stating "Store xxx is currently closed".
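As a rough illustration of the ordering fix (not the actual KafkaStreams internals; the plain Thread types below are stand-ins for StreamThread and GlobalStreamThread):
```
import java.util.List;

// Illustrative sketch only: plain Threads stand in for Kafka Streams' internal
// StreamThread and GlobalStreamThread.
final class OrderedShutdownSketch {

    static void shutdown(final List<Thread> streamThreads, final Thread globalStreamThread)
            throws InterruptedException {
        // Signal and join all regular stream threads first, so none of them can
        // touch a global state store after it has been closed...
        for (final Thread t : streamThreads) {
            t.interrupt();              // stand-in for StreamThread#shutdown()
        }
        for (final Thread t : streamThreads) {
            t.join();
        }
        // ...and only then shut down the global thread, which owns the global state stores.
        globalStreamThread.interrupt(); // stand-in for GlobalStreamThread#shutdown()
        globalStreamThread.join();
    }
}
```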
Tested by running all current streams tests.
Reviewers: Ted Yu <yuzhihong@gmail.com>, John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Wake up consumers during shutdown to break them out of any internally blocking calls.
Semantically, it should be fine to treat a WakeupException as "no work to do", which will then continue the threads' polling loops, leading them to discover that they are supposed to shut down, which they will do gracefully.
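For illustration, a minimal sketch of such a poll loop, assuming a shutdown flag and a consumer whose wakeup() is called from the closing thread (not the actual StreamThread code):
```
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

// Illustrative poll loop: a WakeupException from poll() is treated as "no work to do",
// so the loop simply re-checks its shutdown flag and exits cleanly.
final class WakeupAwarePollLoop {

    static <K, V> void run(final Consumer<K, V> consumer, final AtomicBoolean running) {
        while (running.get()) {
            final ConsumerRecords<K, V> records;
            try {
                records = consumer.poll(Duration.ofMillis(100));
            } catch (final WakeupException e) {
                // consumer.wakeup() was called (e.g. during shutdown); continue so the
                // loop condition can discover that it should stop, and stop gracefully.
                continue;
            }
            records.forEach(record -> {
                // process the record...
            });
        }
        consumer.close();
    }
}
```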
The existing tests should be sufficient to verify no regressions.
Author: John Roesler <john@confluent.io>
Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes #4930 from vvcephei/streams-client-wakeup-on-shutdown
minor javadoc updates
While working on this, I also refactored the MockProcessor out of the MockProcessorSupplier to clean up the unit test paths.
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR supersedes PR #4654 as it was growing too large. All comments in that PR should be addressed here.
I will attempt to break the PRs for the topology optimization effort into 3 PRs total and will follow this general plan:
1. This PR only adds the graph nodes and the graph. The graph nodes hold the information used to make calls to the InternalTopologyBuilder when using the DSL. Graph nodes are stored in the StreamsTopologyGraph until the final topology needs to be built; at that point the graph is traversed and optimizations are made. There are no tests in this PR; it relies on the follow-up PR, where all current streams tests should suffice.
2. PR 2 will intercept all DSL calls and build the graph. The InternalStreamsBuilder uses the graph to provide the required info to the InternalTopologyBuilder and build a topology. The condition of satisfaction for this PR is that all current unit, integration and system tests pass using the graph.
3. PR 3 adds some optimizations, mainly automatic repartitioning for operations that may modify a key and have child operations that would each normally create a separate repartition topic, saving possibly unnecessary repartition topics. For example, the following topology:
```
KStream<String, String> mappedStreamOther = inputStream.map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
    @Override
    public KeyValue<? extends String, ? extends String> apply(String key, String value) {
        return KeyValue.pair(key.substring(0, 3), value);
    }
});
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("count-one-out");
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(10000)).count().toStream().to("count-two-out");
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(15000)).count().toStream().to("count-three-out");
```
would create 3 repartition topics, but after applying an optimization strategy, only one is created.
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
The leader must explicitly check whether the requested leader epoch is undefined and return an undefined offset, so that the follower can fall back to truncating to its high watermark. Otherwise, if the leader is also not tracking leader epochs, it may return its LEO, which will cause the follower to truncate to an incorrect offset.
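A hedged sketch of the leader-side check described above; the constants and helper below are illustrative stand-ins, not the actual broker code (which is written in Scala):
```
import java.util.function.IntToLongFunction;

// Illustrative stand-in for the leader-side OffsetsForLeaderEpoch handling.
final class LeaderEpochCheckSketch {

    static final int UNDEFINED_EPOCH = -1;
    static final long UNDEFINED_EPOCH_OFFSET = -1L;

    static long endOffsetFor(final int requestedEpoch, final IntToLongFunction endOffsetForEpoch) {
        if (requestedEpoch == UNDEFINED_EPOCH) {
            // The follower is not tracking leader epochs: return an undefined offset so it
            // falls back to truncating to its high watermark, rather than receiving the
            // leader's LEO and truncating to an incorrect offset.
            return UNDEFINED_EPOCH_OFFSET;
        }
        return endOffsetForEpoch.applyAsLong(requestedEpoch);
    }
}
```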
The log cleaner grows its buffers when result.messagesRead is zero. This field contains the number of filtered messages read from the source, which can be zero when transactions are used because entire batches may be discarded. The log cleaner incorrectly assumes the messages were not read because the buffer was too small and unnecessarily attempts to double the buffer size, failing with an exception if the buffer is already at max.message.bytes. An additional check for discarded batches has been added so that buffers are not grown when batches are discarded.
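A minimal sketch of the guard described above, with hypothetical names (the actual LogCleaner is written in Scala):
```
// Illustrative sketch with hypothetical names, not the actual LogCleaner code.
final class CleanerBufferGrowthSketch {

    // Grow (double) the read buffer only when nothing was read AND nothing was discarded,
    // i.e. the buffer really was too small to hold a single batch.
    static boolean shouldGrowBuffer(final int messagesRead,
                                    final int batchesDiscarded,
                                    final int currentBufferSize,
                                    final int maxMessageBytes) {
        final boolean madeNoProgress = messagesRead == 0 && batchesDiscarded == 0;
        return madeNoProgress && currentBufferSize < maxMessageBytes;
    }
}
```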
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
For the UNKNOWN_TOPIC_OR_PARTITION error, we could change the consumer's behavior to retry after this error. This is a rare case, since the user would not commit offsets for topics unless they had been able to fetch from them, but that does not handle the situation where the broker has not yet received any metadata updates.
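For illustration, a hedged sketch of classifying UNKNOWN_TOPIC_OR_PARTITION as retriable alongside the usual coordinator errors; this is not the actual consumer-coordinator code:
```
import org.apache.kafka.common.protocol.Errors;

// Illustrative classification only; not the actual consumer-coordinator code.
final class CommitErrorClassifierSketch {

    static boolean isRetriableCommitError(final Errors error) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
            case UNKNOWN_TOPIC_OR_PARTITION: // the change discussed above: treat as retriable
                return true;
            default:
                return false;
        }
    }
}
```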
Reviewers: Jason Gustafson <jason@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This pull request is for JIRA KAFKA-6657, covering KIP-276.
Added unit tests for the new getGlobalConsumerConfigs API and made sure the existing restore consumer tests pass.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Enable dynamic updates of the brokers' default unclean leader election config. A new controller event has been added to process unclean leader election when the config is enabled dynamically.
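As a usage illustration (not part of this change), the cluster-wide default can be updated dynamically through the AdminClient; the bootstrap address below is a placeholder:
```
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Usage illustration only: dynamically set the cluster-wide default for
// unclean.leader.election.enable.
public final class UncleanElectionConfigSketch {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // An empty broker id addresses the cluster-wide ("default") broker config.
            final ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
            final Config update = new Config(Collections.singleton(
                    new ConfigEntry("unclean.leader.election.enable", "true")));
            admin.alterConfigs(Collections.singletonMap(clusterDefault, update)).all().get();
        }
    }
}
```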
Reviewers: Dong Lin <lindong28@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
Prevent exceptions thrown by metric reporters from impacting request processing and other reporters.
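A minimal sketch of the isolation described above, assuming a hypothetical helper that notifies reporters (not the actual Metrics registry code):
```
import java.util.List;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative helper: each reporter callback is wrapped so that one misbehaving
// reporter cannot affect the caller or the other reporters.
final class SafeReporterNotifier {

    private static final Logger log = LoggerFactory.getLogger(SafeReporterNotifier.class);

    static void notifyMetricChange(final List<MetricsReporter> reporters, final KafkaMetric metric) {
        for (final MetricsReporter reporter : reporters) {
            try {
                reporter.metricChange(metric);
            } catch (final Exception e) {
                // Log and continue: a broken reporter must not break request processing.
                log.error("Error notifying metrics reporter {}", reporter.getClass().getName(), e);
            }
        }
    }
}
```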
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This PR does the following:
* Remove the StreamsRepeatingIntegerKeyProducerService and the associated Java class
* Add a parameter to VerifiableProducer.java to enable sending keys when specified
* Update the corresponding Python file verifiable_producer.py to support the new parameter.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
If the pip version is not pinned, the following error will happen:
```
Traceback (most recent call last):
  File "/usr/bin/pip", line 9, in <module>
    from pip import main
ImportError: cannot import name main
```
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Removed the following: "zookeeper.connect", "key.serde", "value.serde", "timestamp.extractor"
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Jason Gustafson <jason@confluent.io>
Debian installer packages are no longer available for Java 7.
Also upgrade AMI to latest ubuntu/trusty 14 amd64 as the older
one is no longer available.
Note that this only changes the JDK used to build and run
the system tests. We still have Jenkins jobs that compile
and run the JUnit tests with Java 7 so that we don't use
features that are only available in newer Java versions.
This PR fixes some regressions introduced into the Streams system tests and sets the upgrade tests to ignored until PR #4636, which contains the fixes for the upgrade tests, is merged.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
* Fixes a bug in which all NamedCache instances in a process shared
one parent metric.
* Also fixes a bug which incorrectly computed the per-cache metric tag
(which was undetected due to the former bug).
* Drop the StreamsMetricsConventions#xLevelSensorName convention
in favor of StreamsMetricsImpl#xLevelSensor to allow StreamsMetricsImpl
to track thread- and cache-level metrics, so that they may be cleanly declared
from anywhere but still unloaded at the appropriate time. This was necessary
now so that the NamedCache could register a thread-level parent sensor
to be unloaded when the thread, not the cache, is closed (see the sketch after this list).
* The above changes made it mostly unnecessary for the StreamsMetricsImpl to
expose a reference to the underlying Metrics registry, so I did a little extra work
to remove that reference, including removing inconsistently-used and unnecessary
calls to Metrics#close() in the tests.
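For illustration, a hedged sketch of the thread-level parent / cache-level child sensor relationship using the public Metrics and Sensor API; sensor names, groups, and tags are hypothetical, not the actual StreamsMetricsImpl code:
```
import java.util.Collections;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

// Illustrative sketch of a thread-level parent sensor with a cache-level child sensor.
final class CacheSensorSketch {

    public static void main(final String[] args) {
        final Metrics metrics = new Metrics();

        // Thread-level parent: unloaded when the thread closes, not when a single cache closes.
        final Sensor threadHitRatio = metrics.sensor("thread-1.hit-ratio");
        threadHitRatio.add(
                metrics.metricName("hit-ratio-avg", "stream-record-cache-metrics",
                        "average cache hit ratio over all caches on the thread",
                        Collections.singletonMap("thread-id", "thread-1")),
                new Avg());

        // Cache-level child: records into its own metric and also rolls up into the parent.
        final Sensor cacheHitRatio = metrics.sensor("thread-1.cache-a.hit-ratio", threadHitRatio);
        cacheHitRatio.add(
                metrics.metricName("hit-ratio-max", "stream-record-cache-metrics",
                        "max cache hit ratio for one cache",
                        Collections.singletonMap("cache-id", "cache-a")),
                new Max());

        cacheHitRatio.record(0.8); // recorded on both the cache sensor and its thread-level parent

        metrics.close();
    }
}
```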
The existing tests should be sufficient to verify this change.
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
The current Iterator-based ListConsumerGroups API is synchronous. The API should be asynchronous to fit in with the other AdminClient APIs. Also fix some error handling corner cases.
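For illustration, the asynchronous shape of the API can be used roughly as follows (the bootstrap address is a placeholder):
```
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.KafkaFuture;

// Usage illustration only.
public final class ListGroupsSketch {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // The result wraps KafkaFutures instead of a blocking iterator.
            final KafkaFuture<Collection<ConsumerGroupListing>> groups = admin.listConsumerGroups().all();
            for (final ConsumerGroupListing listing : groups.get()) {
                System.out.println(listing.groupId());
            }
        }
    }
}
```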
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
Added unit tests for ReplicaAlterLogDirsThread. Mostly focused on unit tests for truncating logic.
Fixed ReplicaAlterLogDirsThread.buildLeaderEpochRequest() to use the future replica's latest epoch (not the latest epoch of the replica it is fetching from). This follows the logic that the offset-for-leader-epoch request should be based on the leader epoch of the follower (in this case, the future local replica).
Also fixed the PartitionFetchState constructor that takes an offset and a delay: the code ignored the delay parameter and used 0 for the delay. This constructor is used only by another constructor that passes delay = 0, which happens to work.
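A tiny hedged sketch of the constructor bug pattern described above; the class below is a stand-in, not the actual Kafka code:
```
// Illustrative stand-in: the two-argument constructor used to hard-code the delay
// to 0, silently ignoring the parameter.
final class PartitionFetchStateSketch {

    final long offset;
    final long delayMs;

    PartitionFetchStateSketch(final long offset, final long delayMs) {
        this.offset = offset;
        this.delayMs = delayMs; // previously hard-coded to 0, ignoring the delay parameter
    }

    PartitionFetchStateSketch(final long offset) {
        this(offset, 0L); // the only caller happened to pass a delay of 0, masking the bug
    }
}
```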
Author: Anna Povzner <anna@confluent.io>
Reviewers: Dong Lin <lindong28@gmail.com>
Closes #4918 from apovzner/kafka-6795
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Currently, if the client sends a produce request or a fetch request to a broker that isn't a replica, we return UNKNOWN_TOPIC_OR_PARTITION. This is surprising to see when the topic actually exists. It would be better to return NOT_LEADER to avoid confusion. Clients typically handle both errors by refreshing metadata and retrying, so changing this should not cause any change in behavior on the client. This case can be hit following a partition reassignment, after the leader is moved and the local replica is deleted.
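For illustration, a hedged sketch of the decision described above (error names as plain strings; not the actual broker code):
```
// Illustrative decision sketch only.
final class PartitionErrorSketch {

    static String errorForRequest(final boolean topicExistsInMetadata, final boolean brokerHostsReplica) {
        if (!topicExistsInMetadata) {
            return "UNKNOWN_TOPIC_OR_PARTITION"; // the topic really is unknown
        }
        if (!brokerHostsReplica) {
            // Previously UNKNOWN_TOPIC_OR_PARTITION, which was confusing for an existing topic;
            // clients handle NOT_LEADER the same way: refresh metadata and retry.
            return "NOT_LEADER";
        }
        return "NONE";
    }
}
```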
To validate the current behavior and the fix, I've added integration tests for the fetch and produce APIs.
This PR implements a Scala wrapper library for Kafka Streams. The library is implemented as a project under streams, namely `:streams:streams-scala`. The PR contains the following:
* the library implementation of the wrapper abstractions
* the test suite
* the changes in `build.gradle` to build the library jar
The library has been tested running the tests as follows:
```
$ ./gradlew -Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdes streams:streams-scala:test
$ ./gradlew -Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro streams:streams-scala:test
$ ./gradlew -Dtest.single=WordCountTest streams:streams-scala:test
```
Author: Debasish Ghosh <ghosh.debasish@gmail.com>
Author: Sean Glover <seglo@randonom.com>
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Ismael Juma <ismael@juma.me.uk>, John Roesler <john@confluent.io>, Damian Guy <damian@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes #4756 from debasishg/scala-streams
* unify skipped-records metering
* log warnings when records are skipped
* tighten up metrics usage a bit
### Testing strategy:
Unit testing of the metrics and the logs should be sufficient.
Author: John Roesler <john@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4812 from vvcephei/kip-274-streams-skip-metrics
There are a couple of minor additions in this PR:
1. Add a new test for the window store that performs a range query upon receiving each record (see the sketch after this list).
2. In the non-windowed state store case, add a get call before the put call.
3. Enable caching by default, to be consistent with the other join / aggregate cases where caching is enabled by default.
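For illustration, a hedged sketch of the kind of window-store range query mentioned in item 1; the store, key, and time range are placeholders, not the actual test code:
```
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Illustrative range query over a window store: fetch all windows for a key in a recent range.
final class WindowRangeQuerySketch {

    static long sumRecentCounts(final ReadOnlyWindowStore<String, Long> store,
                                final String key,
                                final Instant now) {
        long sum = 0L;
        try (WindowStoreIterator<Long> iter = store.fetch(key, now.minus(Duration.ofMinutes(5)), now)) {
            while (iter.hasNext()) {
                sum += iter.next().value; // KeyValue<Long /* window start */, Long /* count */>
            }
        }
        return sum;
    }
}
```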
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>