We would like to also export the producer metrics from StreamThread just like consumer metrics, so that we could gain more visibility of stream application. The approach is to pass in the threadProducer into the StreamThread so that we could export its metrics in dynamic.
Note that this is a pure internal change that doesn't require a KIP, and in the future we also want to export admin client metrics. A followup KIP for admin client will be created once this is merged.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch adds a few metrics that are useful for monitoring controller health. See KIP-237 for more detail.
Author: Dong Lin <lindong28@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#4392 from lindong28/KAFKA-3473
1. Remove TopologyBuilder, TopologyBuilderException, KStreamBuilder,
2. Completed the leftover work of https://issues.apache.org/jira/browse/KAFKA-5660, when we remove TopologyBuilderException.
3. Added MockStoreBuilder to replace MockStateStoreSupplier, remove all XXStoreSupplier except StateStoreSupplier as it is still referenced in the logical streams graph.
4. Minor: rename KStreamsFineGrainedAutoResetIntegrationTest.java to FineGrainedAutoResetIntegrationTest.java.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Removed usage of deprecated AdminClient from StreamsResetter
No additional tests are required.
Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Also include a few clean-ups:
* Method/variable/parameter renames to make them consistent with
the class name
* Return `ApiVersion` from `minSupportedFor`
* Use `values` to remove some code duplication
* Reduce duplication in `ApiVersion` by introducing the `shortVersion`
method and building the versions map programatically
* Avoid unnecessary `regex` in `ApiVersion.apply`
* Added scaladoc to a few methods
Some of these were originally discussed in:
https://github.com/apache/kafka/pull/4583#pullrequestreview-98089400
Added a test for `ApiVersion.shortVersion`. Relying on existing tests
for the rest since there is no change in behaviour.
Reviewers: Jason Gustafson <jason@confluent.io>
Also removed the InternalValueTransformerWithKey / Supplier which is used to mock away the deprecated punctuate function.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Implementation of KIP-279 as described here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over
In summary:
- Added leader_epoch to OFFSET_FOR_LEADER_EPOCH_RESPONSE
- Leader replies with the pair( largest epoch less than or equal to the requested epoch, the end offset of this epoch)
- If Follower does not know about the leader epoch that leader replies with, it truncates to the end offset of largest leader epoch less than leader epoch that leader replied with, and sends another OffsetForLeaderEpoch request. That request contains the largest leader epoch less than leader epoch that leader replied with.
Reviewers: Dong Lin <lindong28@gmail.com>, Jun Rao <junrao@gmail.com>
1. Remove the deprecated StateStoreSuppliers, and the corresponding Stores.create() functions and factories: only the base StateStoreSupplier and MockStoreSupplier were still preserved as they are needed by the deprecated TopologyBuilder and KStreamBuilder. Will remove them in a follow-up PR.
2. Add TopologyWrapper.java as the original InternalTopologyBuilderAccessor was removed, but I realized it is still needed as of now.
3. Minor: removed StateStoreTestUtils.java and inline its logic in its callers since now with StoreBuilder it is just a one-liner.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This is continuation of #4978.
From Guozhang:
I think to fix this issue, in init we could consider switching the steps of 1 and 2:
initInternal(context);
underlying.init(context, root);
since
volatile boolean open = false;
it should be sufficient. In this case the check on step 3) will fail if underlying.init is not completed and we will throw InvalidStateStoreException.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
When AdminClient gets a NOT_CONTROLLER error, it should refresh its metadata and retry the request, rather than making the end-user deal with NotControllerException.
Move AdminClient's metadata management outside of NetworkClient and into AdminMetadataManager. This will make it easier to do more sophisticated metadata management in the future, such as implementing a NodeProvider which fetches the leaders for topics.
Rather than manipulating newCalls directly, the AdminClient service thread now drains it directly into pendingCalls. This minimizes the amount of locking we have to do, since pendingCalls is only accessed from the service thread.
Grow buffers in log cleaner to hold one message set after sanity check even if message set is bigger than max.message.bytes.
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
This patch removed a completedFetch from the completedFetches queue upon a failed parse if it contains no records. The following scenario explains why this is needed for an instance of this case – i.e. in TopicAuthorizationException.
0. Let's assume a scenario, in which the consumer is attempting to read from a topic without the necessary read permission.
1. In Fetcher#fetchedRecords(), after peeking the completedFetches, the Fetcher#parseCompletedFetch(CompletedFetch) throws a TopicAuthorizationException (as expected).
2. Fetcher#fetchedRecords() passes the TopicAuthorizationException up without having a chance to poll completedFetches. So, the same completedFetch remains at the completedFetches queue.
3. Upon following calls to Fetcher#fetchedRecords(), peeking the completedFetches will always return the same completedFetch independent of any updates to the ACL that the topic is trying to read from.
4. Hence, despite the creation of an ACL with correct permissions, once the consumer sees the TopicAuthorizationException, it will be unable to recover without a bounce.
Author: Adem Efe Gencer <agencer@linkedin.com>
Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>
Closes#4974 from efeg/fix/parseCompletedFetchRemainsInQueue
Switch from sum operations to subtraction to avoid type casting in checks and type overflow during `FlieLogInputStream` work, especially in cases where property `log.segment.bytes` was set close to the `Integer.MAX_VALUE` and used as a `position` inside `nextBatch()` function.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Fixes a deadlock between the controller's beforeInitializingSession callback which holds the zookeeper client initialization lock while awaiting completion of an asynchronous event which itself depends on the same lock.
Also catch and log callback exceptions to ensure the ZooKeeper reconnection takes place.
Finally, configure KafkaScheduler in ZooKeeperClient to have at least 1 thread.
Added tests that fail or hang without the changes in this PR.
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Serdes are confusing in the Scala wrapper:
* We have wrappers around Serializer, Deserializer and Serde which are not very useful.
* We have Serdes in 2 places org.apache.kafka.common.serialization.Serde and in DefaultSerdes, instead we should be having only one place where to find all the Serdes.
I wanted to do this PR before the release as this is a breaking change.
This shouldn't add more so the current tests should be enough.
Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>
I'm breaking KAFKA-6813 into a couple of "smaller" PRs and this is the first one. It focused on:
Remove deprecated APIs in KStream, KTable, KGroupedStream, KGroupedTable, SessionWindowedKStream, TimeWindowedKStream.
Also found a couple of overlooked bugs while working on them:
2.a) In KTable.filter / mapValues without the additional parameter indicating the materialized stores, originally we will not materialize the store. After KIP-182 we mistakenly diverge the semantics: for KTable.mapValues it is still the case, for KTable.filter we will always materialize.
2.b) In XXStream/Table.reduce/count, we used to try to reuse the serdes since their types are pre-known (for reduce it is the same types for both key / value, for count it is the same types for key, and Long for value). This was somehow lost in the past refactoring.
2.c) We are enforcing to cast a Serde<V> to Serde<VR> for XXStream / Table.aggregate, for which the returned value type is NOT known, such the enforced casting should not be applied and we should require users to provide us the value serde if they believe the default ones are not applicable.
2.d) Whenever we are creating a new MaterializedInternal we are effectively incrementing the suffix index for the store / processor-node names. However in some places this MaterializedInternal is only used for validation, so the resulted processor-node / store suffix is not monotonic.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
Updated RocksDBSegmentedBytesStoreTest class to include time window serdes.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
Reviewer: Matthias J. Sax <matthias@confluent.io>, Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
Updated the upgrade doc as well since we do not have an overloaded function without the deprecated parameter before. Also renamed the 1.2 release version to 2.0.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Several build and documentation updates were required after the merge of KAFKA-6670: Implement a Scala wrapper library for Kafka Streams.
Encode Scala major version into streams-scala artifacts.
To differentiate versions of the kafka-streams-scala artifact across Scala major versions it's required to encode the version into the artifact name before its published to a maven repository. This is accomplished by following a similar release process as kafka core, which encodes the Scala major version and then runs the build for each major version of Scala supported. This is considered standard practice when releasing Scala libraries, but is not handled for us automatically with the basic Scala for Gradle support.
After this change you can generate and install the kafka-streams-scala artifact into the local maven repository:
$ ./gradlew -PscalaVersion=2.11 install
$ ./gradlew -PscalaVersion=2.12 install
Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>
Remove the deprecated KafkaStreams#toString function. Also override toString() for internal classes for debugging purposes.
Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Moved the shutdown of GlobalStreamThread to after all StreamThread instances have stopped.
There can be a race condition where shut down is called on a StreamThread then shut down is called on a GlobalStreamThread, but if StreamThread is delayed in shutting down, the GlobalStreamThread can shutdown first.
If the StreamThread tries to access a GlobalStateStore before closing the user can get an exception stating "..Store xxx is currently closed "
Tested by running all current streams tests.
Reviewers: Ted Yu <yuzhihong@gmail.com>, John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Wakeup consumers during shutdown to break them out of any internally blocking calls.
Semantically, it should be fine to treat a WakeupException as "no work to do", which will then continue the threads' polling loops, leading them to discover that they are supposed to shut down, which they will do gracefully.
The existing tests should be sufficient to verify no regressions.
Author: John Roesler <john@confluent.io>
Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4930 from vvcephei/streams-client-wakeup-on-shutdown
minor javadoc updates
While working on this, I also refactored the MockProcessor out of the MockProcessorSupplier to cleanup the unit test paths.
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR supersedes PR #4654 as it was growing too large. All comments in that PR should be addressed here.
I will attempt to break the PRs for the topology optimization effort into 3 PRs total and will follow this general plan:
1. This PR only adds the graph nodes and graph. The graph nodes will hold the information used to make calls to the InternalTopologyBuilder when using the DSL. Graph nodes are stored in the StreamsTopologyGraph until the final topology needs building then the graph is traversed and optimizations are made at that point. There are no tests in this PR relying on the follow-up PR to use all current streams tests, which should suffice.
2. PR 2 will intercept all DSL calls and build the graph. The InternalStreamsBuilder uses the graph to provide the required info to the InternalTopologyBuilder and build a topology. The condition of satisfaction for this PR is that all current unit, integration and system tests pass using the graph.
3. PR 3 adds some optimizations mainly automatically repartitioning for operations that may modify a key and have child operations that would normally create a separate repartition topic, saving possible unnecessary repartition topics. For example the following topology:
```
KStream<String, String> mappedStreamOther = inputStream.map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
@Override
public KeyValue<? extends String, ? extends String> apply(String key, String value) {
return KeyValue.pair(key.substring(0, 3), value);
}
});
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("count-one-out");
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(10000)).count().toStream().to("count-two-out");
mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(15000)).count().toStream().to("count-three-out");
```
would create 3 repartion topics, but after applying an optimization strategy, only one is created.
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
The leader must explicitly check if requested leader epoch is undefined, and return undefined offset so that the follower can fall back to truncating to high watermark. Otherwise, if the leader also is not tracking leader epochs, it may return its LEO, which will the follower to truncate to the incorrect offset.
Log cleaner grows buffers when result.messagesRead is zero. This contains the number of filtered messages read from source which can be zero when transactions are used because batches may be discarded. Log cleaner incorrectly assumes that messages were not read because the buffer was too small and attempts to double the buffer size unnecessarily, failing with an exception if the buffer is already max.message.bytes. Additional check for discarded batches has been added to avoid growing buffers when batches are discarded.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
For the UNKNOWN_TOPIC_OR_PARTITION error, we could change the consumer's behavior to retry after this error. While this is a rare case since the user would not commit offsets for topics unless they had been able to fetch from them, but this doesn't really handle the situation where the broker hasn't received any metadata updates.
Reviewers: Jason Gustafson <jason@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>