Cache-level metrics are refactor according to KIP-444:
tag client-id changed to thread-id
name hitRatio changed to hit-ratio
made backward compatible by using streams config built.in.metrics.version
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Boyang Chen <boyang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
* log lock acquistion failures on the state store
* Document required uniqueness of state.dir path
* Move bunch of log calls around task state changes to DEBUG
* More readable log messages during partition assignment
Reviewers: Matthias J. Sax <mjsax@apache.org>, A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Given that the tests do not create clusters larger than 3, we do not gain much by creating 50 partitions for that topic. Reducing it should slightly increase test startup and shutdown speed.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
The streams config built.in.metrics.version is needed to add metrics in
a backward-compatible way. However, not in every location where metrics are
added a streams config is available to check built.in.metrics.version. Thus,
the config value needs to be exposed through the StreamsMetricsImpl object.
Reviewers: John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
Replace the `<table>` elements by `<ul>` so the full page width can be used for the configuration descriptions instead of only a very narrow column. I moved the other fields (Type, Default Value, etc) below each entry.
Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>
Key changes include:
1. Moves general offset limit updates down to StandbyTask.
2. Updates offsets for StandbyTask at most once per commit and only when we need and updated offset limit to make progress.
3. Avoids writing an 0 checkpoint when StandbyTask.update is called but we cannot apply any of the records.
4. Avoids going into a restoring state in the case that the last checkpoint is greater or equal to the offset limit (consumer committed offset). This needs special attention please. Code is in
StoreChangelogReader.
5. Does update offset limits initially for StreamTask because it provides a way to prevent playing to many records from the changelog (also the input topic with optimized topology).
NOTE: this PR depends on KAFKA-8816, which is under review separately. Fortunately the changes involved are few. You can focus just on the KAFKA-8755 commit if you prefer.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
The previous approach to testing KAFKA-8412 was to look at the logs and
determine if an error occurred during close. There was no direct way to
detect than an exception occurred because the exception was eaten in
AssignedTasks.close. In the PR for that ticket (#7207) it was
acknowledged that this was a brittle way to test for the exception. We
now see occasional failures because an unrelated ERROR level log entry
is made while closing the task.
This change eliminates the brittle log checking by rethrowing any time
an exception occurs in close, even when a subsequent unclean close
succeeds. This has the potential benefit of uncovering other supressed
exceptions down the road.
I've verified that even with us rethrowing on closeUnclean that all
tests pass.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>
* Leader instance uses dictionary encoding on the wire to send topic partitions
* Topic names (most expensive component) are mapped to an integer using the dictionary
* Follower instances receive the dictionary, decode topic names back
* Purely an on-the-wire optimization, no in-memory structures changed
* Test case added for version 5 AssignmentInfo
Reviewers: Guozhang Wang <wangguoz@gmail.com>
In case of version probing we would skip the logic for setting cluster / assigned tasks; since these values are initialized as null they are vulnerable to NPE when code changes.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>
The tag key for store level metrics specified in StreamsMetricsImpl
is unified with the tag keys on thread and task level.
Reviewers: Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
Reviewers: cpettitt-confluent <53191309+cpettitt-confluent@users.noreply.github.com>, A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Currently on commit streams will attempt to delete offsets from repartition topics. However, if a topology does not have any repartition topics, then the recordsToDelete map will be empty.
This PR adds a check that the recordsToDelete is not empty before executing the AdminClient#deleteRecords() method.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Splits the existing StickyAssignor logic into an AbstractStickyAssignor class, which is extended by the existing (eager) StickyAssignor and by the new CooperativeStickyAssignor which supports incremental cooperative rebalancing.
There is no actual change to the logic -- most methods from StickyAssignor were moved to AbstractStickyAssignor to be shared with CooperativeStickyAssignor, and the abstract MemberData memberData(Subscription) method converts the Subscription to the embedded list of owned partitions for each assignor.
The "generation" logic is left in, however this is always Optional.empty() for the CooperativeStickyAssignor as onPartitionsLost should always be called when a generation is missed.
Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
RocksDB metrics are added to the Kafka metrics. For each segmented state store only
one set of metrics is exposed rather than one set of metrics for each segment.
The metrics are not computed yet.
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Make offsets immutable to users of RecordCollector.offsets. Fix up an
existing case where offsets could be modified in this way. Add a simple
test to verify offsets cannot be changed externally.
Reviewers: Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Prior to this change an NPE is raised when calling AssignedTasks.close
under the following conditions:
1. EOS is enabled
2. The task was in a suspended state
The cause for the NPE is that when a clean close is requested for a
StreamTask the StreamTask tries to commit. However, in the suspended
state there is no producer so ultimately an NPE is thrown for the
contained RecordCollector in flush.
The fix put forth in this commit is to have AssignedTasks call
closeSuspended when it knows the underlying StreamTask is suspended.
Note also that this test is quite involved. I could have just tested
that AssignedTasks calls closeSuspended when appropriate, but that is
testing, IMO, a detail of the implementation and doesn't actually verify
we reproduced the original problem as it was described. I feel much more
confident that we are reproducing the behavior - and we can test exactly
the conditions that lead to it - when testing across AssignedTasks and
StreamTask. I believe this is an additional support for the argument of
eventually consolidating the state split across classes.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Move the error code resetting logic from the onPartitionsRevoked callback into the streamthread directly after we've decided to rejoin the group, since onPartitionsRevoked are not guaranteed to be triggered.
Ran system tests on the originally failed StreamsUpgradeTest 10 times and passed.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Jun Rao <junrao@gmail.com>
In a KTable context, we should not pass null into a user-supplied serde.
Testing: I verified that the change to the test results in test failures without the patch.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>,
Reverts the TreeMap -> ConcurrentSkipListMap change that caused a performance regression in 2.3, and fixes the ConcurrentModificationException by copying (just) the key set to iterate over
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>, Richard Yu <yohan.richard.yu@gmail.com>, Guozhang Wang <guozhang@confluent.io>
Iterator#remove has a default implementation that throws UnsupportedOperatorException so there's no need to override it with the same thing.
Should be cherry-picked back to whenever we switched to Java 8
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>
InMemoryKeyValueStore uses ConcurrentSkipListMap#size which takes linear time as it iterates over the entire map. We should just track size ourselves for approximateNumEntries
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>
When calling readLogToEnd(), the KafkaBasedLog worker thread should catch TimeoutException and log a warning, which can occur if brokers are unavailable, otherwise the worker thread terminates.
Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix.
Author: Paul Whalen <pgwhalen@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <arjun@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>
1. Add onPartitionsLost into the RebalanceListener, which will be triggered when the consumer found that the generation is reset due to fatal errors in response handling.
2. Semantical behavior change: with COOPERATIVE protocol, if the revoked / lost partitions are empty, do not trigger the corresponding callback at all. For added partitions though, even if it is empty we would still trigger the callback as a way to notify the rebalance event; with EAGER protocol, revoked / assigned callbacks are always triggered.
The ordering of the callback would be the following:
a. Callback onPartitionsRevoked / onPartitionsLost triggered.
b. Update the assignment (both revoked and added).
c. Callback onPartitionsAssigned triggered.
In this way we are assured that users can still access the partitions being revoked, whereas they can also access the partitions being added.
3. Semantical behavior change (KAFKA-4600): if the rebalance listener throws an exception, pass it along all the way to the consumer.poll caller, but still completes the rest of the actions. Also, the newly assigned partitions list does not gets affected with exception thrown since it is just for notifying the users.
4. Semantical behavior change: the ConsumerCoordinator would not try to modify assignor's returned assignments, instead it will validate that assignments and set the error code accordingly: if there are overlaps between added / revoked partitions, it is a fatal error and would be communicated to all members to stop; if revoked is not empty, it is an error indicate re-join; otherwise, it is normal.
5. Minor: with the error code removed from the Assignment, ConsumerCoordinator will request re-join if the revoked partitions list is not empty.
6. Updated ConsumerCoordinatorTest accordingly. Also found a minor bug in MetadataUpdate that removed topic would still be retained with null value of num.partitions.
6. Updated a few other flaky tests that are exposed due to this change.
Reviewers: John Roesler <vvcephei@users.noreply.github.com>, A. Sophie Blee-Goldman <sophie@confluent.io>, Jason Gustafson <jason@confluent.io>
Renames method names in StreamsMetricsImpl to make them consistent.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Reviews: A. Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>, Guozhang Wang <guozhang@confluent.io>
* Adds RocksDBMetrics class that provides methods to get sensors from the Kafka metrics registry and to setup the sensors to record RocksDB metrics
* Extends StreamsMetricsImpl with functionality to add the required metrics to the sensors.
Reviewers: Boyang Chen <boyang@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
Reviewers: Boyang Chen <boyang@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>
Follow up to new PartitionAssignor interface merged in 7108 is merged
Adds a PartitionAssignorAdapter class to maintain backwards compatibility
Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This patch is part of KIP-345. We are aiming to support batch leave group request issued from admin client. This diff is the first effort to bump leave group request version.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
Main changes of this PR:
* Deprecate old consumer.internal.PartitionAssignor and add public consumer.ConsumerPartitionAssignor with all OOTB assignors migrated to new interface
* Refactor assignor's assignment/subscription related classes for easier to evolve API
* Removed version number from classes as it is only needed for serialization/deserialization
* Other previously-discussed cleanup included in this PR:
* Remove Assignment.error added in pt 1
* Remove ConsumerCoordinator#adjustAssignment added in pt 2
Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
* Clean up one redundant and one misplaced metric
* Clarify the relationship among these metrics to avoid future confusion
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>