Since the admin client allows us to use a flexible offset spec, we can always use read-uncommitted isolation regardless of the EOS config.
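For illustration, a hedged sketch (topic, partition, and bootstrap address are made up) of asking the admin client for an end offset with an explicit read-uncommitted isolation level:
```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetLookup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("input-topic", 0);
            // Pass any OffsetSpec and request read-uncommitted isolation,
            // independent of whether the application itself runs with EOS.
            long endOffset = admin
                .listOffsets(Map.of(tp, OffsetSpec.latest()),
                             new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
                .partitionResult(tp)
                .get()
                .offset();
            System.out.println("End offset (read-uncommitted): " + endOffset);
        }
    }
}
```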
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
## Background
When a partition subscription is initialized it has a `null` position and is in the INITIALIZING state. Depending on the consumer, it will then transition to one of the other states. Typically a consumer will either reset the offset to earliest/latest, or it will provide an offset (with or without offset metadata). For the reset case, we still have no position to act on so fetches should not occur.
Recently we made changes for KAFKA-9724 (#8376) to prevent clients from entering the AWAIT_VALIDATION state when targeting older brokers. New logic to bypass offset validation as part of this change exposed this new issue.
## Bug and Fix
In the partition subscriptions, the AWAIT_RESET state was incorrectly reporting that it had a position. In some cases a position might actually exist (e.g., if we were resetting offsets during a fetch after a truncation), but in the initialization case no position had been set. We saw this issue in system tests where there is a race between the offset reset completing and the first fetch request being issued.
Since AWAIT_RESET#hasPosition was incorrectly returning `true`, the new logic to bypass offset validation was transitioning the subscription to FETCHING (even though no position existed).
The fix is simply to have AWAIT_RESET#hasPosition return `false`, which should have been the case from the start. Additionally, this fix includes some guards against NPEs when reading the position from the subscription.
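As an illustration of the state-machine fix (this enum is a sketch, not the actual `SubscriptionState` internals), each state answers whether a valid fetch position exists, and AWAIT_RESET must answer "no" until the reset actually completes:
```java
// Illustrative only: AWAIT_RESET previously answered "yes" here, which is what
// allowed fetches to start before the reset had produced a position.
enum FetchState {
    INITIALIZING     { @Override boolean hasPosition() { return false; } },
    AWAIT_RESET      { @Override boolean hasPosition() { return false; } }, // the fix: no position yet
    AWAIT_VALIDATION { @Override boolean hasPosition() { return true; } },
    FETCHING         { @Override boolean hasPosition() { return true; } };

    abstract boolean hasPosition();
}
```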
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
Upgrade jetty to 9.4.27.v20200227 and jersey to 2.31
Also remove the workaround used in previous versions from Connect's SSLUtils.
(Reverts KAFKA-9771 - commit ee832d7d)
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chris Egerton <chrise@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>
Reduce test data set from 1000 records to 500.
Some recent test failures indicate that the Jenkins runners aren't
able to process all 1000 records in two minutes.
Also add a sanity check that all the test data is readable from the
input topic.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>
* Remove problematic Percentiles measurements until the implementation is fixed
* Fix leaking e2e metrics when task is closed
* Fix leaking metrics when tasks are recycled
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>
* KAFKA-10150: always transition to SUSPENDED during suspend, no matter the current state; only call prepareCommit before closing if task.commitNeeded is true
* Don't commit any consumed offsets during handleAssignment -- revoked active tasks (and any others that need committing) will be committed during handleRevocation so we only need to worry about cleaning them up in handleAssignment
* KAFKA-10152: when recycling a task we should always commit consumed offsets (if any), but don't need to write the checkpoint (since changelog offsets are preserved across task transitions)
* Make sure we close all tasks during shutdown, even if an exception is thrown during commit
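A hypothetical sketch of the last two points (the `Task` interface and method names here are illustrative, not the actual Streams `TaskManager` API): commit only when needed, always attempt to close every task even if a commit throws, and rethrow the first error at the end.
```java
import java.util.List;

final class ShutdownSketch {
    interface Task {
        boolean commitNeeded();
        void prepareCommit();
        void closeClean();
    }

    static void commitAndCloseAll(List<Task> tasks) {
        RuntimeException firstError = null;
        for (Task task : tasks) {
            try {
                if (task.commitNeeded()) {
                    task.prepareCommit();   // only commit if there is something to commit
                }
            } catch (RuntimeException e) {
                if (firstError == null) firstError = e;
            } finally {
                try {
                    task.closeClean();      // always attempt to close, even after a failed commit
                } catch (RuntimeException e) {
                    if (firstError == null) firstError = e;
                }
            }
        }
        if (firstError != null) throw firstError;
    }
}
```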
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
The tests failed at `assertThat(listener.startOffset, is(equalTo(0L)));`. It looks like a restore happened before the assert. However, we should expect that a restore can sometimes happen when the failed tasks resume by themselves, and that should not cause the test to fail.
On the other hand, the original test added `assertThat(listener.startOffset, is(equalTo(0L)));` because at the end of the test we also check the startOffset value. In the newer version of the test, however, we don't really care about the startOffset or totalNumRestored values. All we want to verify in this test is:
Assert that the current value in the store reflects all messages being processed
So removing the assert avoids the flaky failure while still verifying what the test case is meant to verify.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
* Add documentation for using transformation predicates.
* Add `PredicateDoc` for generating predicate config docs, following the style of `TransformationDoc`.
* Fix the header depth mismatch.
* Avoid generating HTML ids based purely on the config name, since these are very likely to conflict (e.g. #name). Instead allow passing a function which can be used to generate an id from a config key.
The docs have been generated and tested locally.
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
The warning appears because the skipped offsets are not removed from the checkpoint. However, there is nothing to warn about, because the offsets found there correspond to state stores that were skipped.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Commit 9a4f00f78b changed the constructor of DescribeConfigsResponse.
The build failed with the following error:
```
/home/chia7712/kafka/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java:582: error: incompatible types: int cannot be converted to Struct
return new DescribeConfigsResponse(1000, configs);
^
Note: Some messages have been simplified; recompile with -Xdiags:verbose to get full output
1 error
```
Reviewers: Boyang Chen <boyang@confluent.io>, Tom Bentley <tbentley@redhat.com>, Konstantine Karantasis <konstantine@confluent.io>
We need to make sure that corrupted standby tasks are actually cleaned up upon a TaskCorruptedException. However due to the commit prior to invoking handleCorruption, it's possible to throw a TaskMigratedException before actually cleaning up any of the corrupted tasks.
This is fine for active tasks since handleLostAll will finish up the job, but it does nothing with standby tasks. We should make sure that standby tasks are handled before attempting to commit (which we can do, since we don't need to commit anything for the corrupted standbys).
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This PR fixes the cause of the failing tests mentioned in the JIRA:
kafka.network.DynamicConnectionQuotaTest
kafka.api.CustomQuotaCallbackTest
kafka.server.DynamicBrokerReconfigurationTest
Issue:
The call to ChangeNotificationProcessorThread.queue.take() could throw an InterruptedException. While the queue is empty and the thread is blocking on taking an item from the queue, a concurrent call to FinalizedFeatureChangeListener.close() could interrupt the thread and cause an InterruptedException to be raised from queue.take(). In such a case, it is safe to ignore the exception since the thread is being shut down.
Ignoring the InterruptedException for the above reason was indeed the intent of the code that used the `ignoring` clause. Unfortunately, the `ignoring` clause does not actually ignore InterruptedException, so that approach does not work for us. To confirm this theory, see the following code in scala.util.control.Exception.scala: https://github.com/scala/scala/blob/v2.12.0/src/library/scala/util/control/Exception.scala#L167-L176.
Fix:
The fix in this PR is simply not to use the `ignoring` clause. We rely on the existing mechanism in ShutdownableThread that ignores the exception during shutdown.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Boyang Chen <boyang@confluent.io>, Anna Povzner <anna@confluent.io>, Jun Rao <junrao@gmail.com>
Bug Details:
The value serde was mistakenly set to the key serde for an internal wrapped serde in the foreign-key join (FKJ) workflow.
Testing:
Modified the existing test to reproduce the issue, then verified that the test passes.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, John Roesler <vvcephei@apache.org>
In this PR, I have implemented various classes and integration for the read path of the feature versioning system (KIP-584). The ultimate plan is that the cluster-wide finalized feature information will be stored in ZK under the node /feature. The read path implemented in this PR is centered around reading this finalized feature information from ZK and processing it inside the broker.
Here is a summary of what's in this PR (a lot of it is new classes):
A facility is provided in the broker to declare its supported features, and advertise its supported features via its own BrokerIdZNode under a features key.
A facility is provided in the broker to listen to and propagate cluster-wide finalized feature changes from ZK.
When new finalized features are read from ZK, feature incompatibilities are detected by comparing against the broker's own supported features (see the sketch after this list).
ApiVersionsResponse is now served containing supported and finalized feature information (using the newly added tagged fields).
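A hedged sketch of that incompatibility check (class, method names, and the version-range representation are illustrative, not the actual broker code):
```java
import java.util.Map;

// Illustrative only: a finalized feature read from ZK is incompatible if this
// broker does not know the feature at all, or if the finalized version range
// falls outside the broker's own supported range.
final class FeatureCompatibility {
    static boolean isIncompatible(Map<String, int[]> supportedRanges,  // name -> {minVersion, maxVersion}
                                  String feature, int finalizedMin, int finalizedMax) {
        int[] supported = supportedRanges.get(feature);
        if (supported == null) {
            return true;                                   // feature unknown to this broker
        }
        return finalizedMin < supported[0] || finalizedMax > supported[1];
    }
}
```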
Reviewers: Boyang Chen <boyang@confluent.io>, Jun Rao <junrao@gmail.com>
Split out the optimized source changelogs and fetch the committed offsets rather than the end offset for task lag computation
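A hedged illustration of the distinction (group id, topic, and helper names are made up): for an optimized source changelog, the relevant bound for restoration lag is the application's committed offset on the source topic rather than the log-end offset.
```java
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class SourceChangelogLag {
    // Committed offset of the application's consumer group on the source topic.
    static long committedOffset(Admin admin, String appId, TopicPartition tp) throws Exception {
        Map<TopicPartition, OffsetAndMetadata> committed =
            admin.listConsumerGroupOffsets(appId).partitionsToOffsetAndMetadata().get();
        OffsetAndMetadata om = committed.get(tp);
        return om == null ? 0L : om.offset();
    }

    // Log-end offset, which over-estimates the lag for optimized source changelogs.
    static long logEndOffset(Admin admin, TopicPartition tp) throws Exception {
        return admin.listOffsets(Map.of(tp, OffsetSpec.latest()))
                    .partitionResult(tp).get().offset();
    }
}
```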
Reviewers: John Roesler <vvcephei@apache.org>
Add an integration test for the task assignor.
* ensure we see proper scale-out behavior with warmups
* ensure in-memory stores are properly recycled and not restored through the scale-out process
Fix two bugs revealed by the test:
Bug 1: we can't remove active tasks in the cooperative algorithm, because this causes their state to get discarded (definitely for in-memory stores, and maybe for persistent ones, depending on the state cleaner). Instead, we convert them to standbys so they can keep warm.
Bug 2: tasks with only in-memory stores weren't reporting their offset positions
Reviewers: Matthias J. Sax <matthias@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>
These changes allow herders to continue to function even when a connector they are running hangs in its start, stop, initialize, validate, and/or config methods.
The main idea is to make these connector interactions asynchronous and accept a callback that can be invoked upon the completion (successful or otherwise) of these interactions. The distributed herder handles any follow-up logic by adding a new herder request to its queue in that callback, which helps preserve some synchronization and ordering guarantees provided by the current tick model.
If any connector refuses to shut down within a graceful timeout period, the framework will abandon it and potentially start a new connector in its place (in cases such as connector restart or reconfiguration).
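A hypothetical sketch of the callback pattern described above; names like `startConnectorAsync` and the request queue are illustrative, not the actual `DistributedHerder`/`Worker` API.
```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class AsyncStartSketch {
    interface Callback<V> { void onCompletion(Throwable error, V result); }

    private final BlockingQueue<Runnable> herderRequests = new LinkedBlockingQueue<>();

    /** Kicks off the potentially blocking start on another thread and reports back via the callback. */
    void startConnectorAsync(String connector, Callback<Void> cb) {
        new Thread(() -> {
            try {
                // ... potentially blocking Connector#start() would happen here ...
                cb.onCompletion(null, null);
            } catch (Throwable t) {
                cb.onCompletion(t, null);
            }
        }, "start-" + connector).start();
    }

    /** Follow-up logic is enqueued as a regular herder request, preserving tick-loop ordering. */
    void startAndThenReconfigure(String connector) {
        startConnectorAsync(connector, (error, ignored) ->
            herderRequests.add(() -> {
                if (error != null) {
                    System.err.println("Failed to start " + connector + ": " + error);
                } else {
                    System.out.println("Started " + connector + "; scheduling task reconfiguration");
                }
            }));
    }
}
```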
Existing unit tests for the distributed herder and worker have been modified to reflect these changes, and a new integration test named `BlockingConnectorTest` has been added to ensure that they work in practice.
Reviewers: Greg Harris <gregh@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
Make sure that the Errant Record Reporter recently added in KIP-610 adheres to the `errors.tolerance` policy.
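For reference, a typical use of the KIP-610 reporter from a sink task (the task class here is a made-up example); with this change, whether a reported record is skipped or fails the task follows the connector's `errors.tolerance` setting.
```java
import java.util.Collection;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class ReportingSinkTask extends SinkTask {
    @Override
    public void put(Collection<SinkRecord> records) {
        ErrantRecordReporter reporter = context.errantRecordReporter(); // null on pre-2.6 workers
        for (SinkRecord record : records) {
            try {
                process(record);
            } catch (Exception e) {
                if (reporter != null) {
                    reporter.report(record, e); // errors.tolerance decides skip vs. fail
                } else {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    protected abstract void process(SinkRecord record);
}
```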
Author: Aakash Shah <ashah@confluent.io>
Reviewers: Arjun Satish <arjunconfluent.io>, Randall Hauch <rhauch@gmail.com>
This change adds a check to the KafkaConfigBackingStore, KafkaOffsetBackingStore, and KafkaStatusBackingStore to use the admin client to verify that the internal topics are compacted and do not use the `delete` cleanup policy.
Connect will already create the internal topics with `cleanup.policy=compact` if the topics do not yet exist when the Connect workers are started; the new topics are always created as compacted, overriding any user-specified `cleanup.policy`. However, if the topics already exist, the worker did not previously verify that the internal topics were compacted, such as when a user manually creates the internal topics before starting Connect or manually changes the topic settings after the fact.
The current change helps guard against users running Connect with topics that have delete cleanup policy enabled, which will remove all connector configurations, source offsets, and connector & task statuses that are older than the retention time. This means that, for example, the configuration for a long-running connector could be deleted by the broker, and this will cause restart issues upon a subsequent rebalance or restarting of Connect worker(s).
Connect requires that its internal topics are compacted and not deleted after some retention time. This additional check simply enforces the existing expectations and therefore does not need a KIP.
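A hedged sketch of the kind of verification described above (class, method, and topic names are illustrative, not the actual backing-store code):
```java
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

final class InternalTopicCheck {
    // Read the topic's cleanup.policy via the admin client and reject anything
    // that is not purely "compact".
    static void verifyCompact(Admin admin, String topic) throws Exception {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Config config = admin.describeConfigs(Collections.singleton(resource))
                             .all().get().get(resource);
        String policy = config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
        if (!TopicConfig.CLEANUP_POLICY_COMPACT.equals(policy)) {
            throw new ConfigException(
                "Topic '" + topic + "' must use cleanup.policy=compact but uses: " + policy);
        }
    }
}
```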
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Chris Egerton <chrise@confluent.io>
With Incremental Cooperative Rebalancing, if a worker returns after it's been out of the group for some time (essentially as a zombie worker) and hasn't voluntarily revoked its own connectors and tasks in the meantime, there's the possibility that these assignments have been distributed to other workers and redundant connectors and tasks might be running now in the Connect cluster.
This PR complements previous fixes such as KAFKA-9184, KAFKA-9849 and KAFKA-9851 providing a last line of defense against zombie tasks: if at any rebalance round the leader worker detects that there are duplicate assignments in the group, it revokes them completely and resolves duplication with a correct assignment in the rebalancing round that will follow task revocation.
Author: Wang <ywang50@ebay.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
This fix excludes `ConnectorClientConfigRequest` and its inner class from class loading isolation in a similar way that KAFKA-8415 excluded `ConnectorClientConfigOverridePolicy`.
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
The latest commit #8254 on this test deleted all topics after each test, but the topic was previously shared among tests. After that change, we rely on the less reliable auto topic creation to get the topic, which makes the test flaky.
I'm now using different topics for different tests, and also setting the app.id differently for each test.
Reviewers: Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Ensure all channels get closed in `Selector.close`, even if some of them raise errors.
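A generic sketch of the pattern (illustrative helper, not the actual `Selector` code): attempt to close every channel and only surface the first error after all of them have been tried.
```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

final class CloseAll {
    static void closeAll(List<? extends Closeable> channels) throws IOException {
        IOException firstException = null;
        for (Closeable channel : channels) {
            try {
                channel.close();
            } catch (IOException e) {
                if (firstException == null) firstException = e; // keep going, remember the first error
            }
        }
        if (firstException != null) throw firstException;
    }
}
```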
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Uses a similar (but slightly different) algorithm as in KAFKA-9987 to produce a maximally sticky -- and perfectly balanced -- assignment of tasks to threads within a single client. This is important for in-memory stores which get wiped out when transferred between threads.
Reviewers: John Roesler <vvcephei@apache.org>
For better visibility into group rebalances, we now log the evicted members inside the group coordinator when a rebalance completes.
Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Struct value validation in Kafka Connect can be optimized
to avoid creating an Iterator when the expectedClasses list is of
size 1. This is a meaningful enhancement for high throughput
connectors.
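A hedged sketch of the optimization (class and method names are illustrative, not the actual Connect validation code): the single-element case is checked directly so no Iterator is allocated on the hot path.
```java
import java.util.List;

final class ValueTypeCheck {
    static boolean matchesExpectedClass(List<Class<?>> expectedClasses, Object value) {
        if (expectedClasses.size() == 1) {
            return expectedClasses.get(0).isInstance(value); // hot path: no Iterator created
        }
        for (Class<?> expected : expectedClasses) {          // rare multi-class path
            if (expected.isInstance(value)) return true;
        }
        return false;
    }
}
```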
Reviewers: Konstantine Karantasis <konstantine@confluent.io>
When Incremental Cooperative Rebalancing is enabled and a worker fails to read to the end of the config topic, it needs to voluntarily revoke its locally running tasks in time, before these tasks get assigned to another worker, which would create a situation where redundant tasks are running in the Connect cluster.
Additionally, instead of using the delay `worker.unsync.backoff.ms` that was defined for the eager rebalancing protocol and has a long default value (which coincidentally is equal to the default value of the rebalance delay of the incremental cooperative protocol), the worker should quickly attempt to re-read the config topic and back off for a fraction of the rebalance delay. After this fix, the worker will retry a maximum of 5 times before it revokes its running assignment, with a cumulative delay of less than the configured `scheduled.rebalance.max.delay.ms`.
Unit tests are added to cover the backoff logic with incremental cooperative rebalancing.
Reviewers: Randall Hauch <rhauch@gmail.com>
There is some confusion over the compression rate metrics, as the meaning of the value isn't clearly stated in the metric description. In this case, it was assumed that a higher compression rate value meant better compression. This PR clarifies the meaning of the value, to prevent misunderstandings.
Reviewers: Jason Gustafson <jason@confluent.io>
In the first version of the incremental cooperative protocol, in the presence of a failed sync request by the leader, the assignor was designed to treat the unapplied assignments as lost and trigger a rebalance delay.
This commit applies optimizations in these cases to avoid unnecessary activation of the rebalance delay. First, if the worker that loses the sync group request or response is the leader, it detects this failure by checking whether the generation is the expected one when it performs task assignments. If it is not, the leader resets its view of the previous assignment, because that assignment was not successfully applied and does not represent a correct state. Furthermore, if the worker that missed the assignment sync is an ordinary worker, the leader is able to detect that there are lost assignments, and instead of triggering a rebalance delay among the same members of the group, it treats the lost tasks as new tasks and reassigns them immediately. If the lost assignment included revocations that were not applied, the leader reapplies these revocations.
Existing unit tests and integration tests are adapted to test the proposed optimizations.
Reviewers: Randall Hauch <rhauch@gmail.com>
The current "about" string incorrectly describes the session epoch as the partition epoch. Rename to `SessionEpoch` to make usage clearer. Also rename `MaxWait` to `MaxWaitMs` to make the time unit clear and `FetchableTopic` to `FetchTopic` for consistency with `FetchPartition`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Some `LogCleaner` metrics have an unsafe call to `max` over the collection of cleaner threads, which could be empty after shutdown. This leads to an `UnsupportedOperationException`. This patch fixes the problem by changing the computation to use `foldLeft`.
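A Java analog of the fix, under the assumption that the metric is a maximum over per-thread values (the actual code is Scala and uses `foldLeft`; the metric and names here are illustrative): folding from an initial value makes an empty collection yield a default instead of throwing.
```java
import java.util.Collection;

final class CleanerMetricSketch {
    // Start from 0 so an empty collection of cleaner threads (e.g. after
    // shutdown) yields 0 instead of throwing like max() on an empty collection.
    static double maxOrZero(Collection<Double> perThreadValues) {
        double max = 0.0d;
        for (double value : perThreadValues) {
            max = Math.max(max, value);
        }
        return max;
    }
}
```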
Reviewers: Jason Gustafson <jason@confluent.io>