Commit 9a4f00f78b changed the constructor of DescribeConfigsResponse.
The build failed with the following error:
```
/home/chia7712/kafka/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java:582: error: incompatible types: int cannot be converted to Struct
return new DescribeConfigsResponse(1000, configs);
^
Note: Some messages have been simplified; recompile with -Xdiags:verbose to get full output
1 error
```
Reviewers: Boyang Chen <boyang@confluent.io>, Tom Bentley <tbentley@redhat.com>, Konstantine Karantasis <konstantine@confluent.io>
We need to make sure that corrupted standby tasks are actually cleaned up upon a TaskCorruptedException. However, because a commit is attempted prior to invoking handleCorruption, it's possible to throw a TaskMigratedException before any of the corrupted tasks are actually cleaned up.
This is fine for active tasks, since handleLostAll will finish the job, but it does nothing for standby tasks. We should make sure that standby tasks are handled before attempting to commit (which we can do, since we don't need to commit anything for the corrupted standbys).
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This PR fixes the cause of the failing tests mentioned in the JIRA:
kafka.network.DynamicConnectionQuotaTest
kafka.api.CustomQuotaCallbackTest
kafka.server.DynamicBrokerReconfigurationTest
Issue:
The call to ChangeNotificationProcessorThread.queue.take() could throw an InterruptedException. While the queue is empty and the thread is blocking on taking an item from the queue, a concurrent call to FinalizedFeatureChangeListener.close() could interrupt the thread and cause an InterruptedException to be raised from queue.take(). In such a case, it is safe to ignore the exception since the thread is being shutdown.
Ignoring the InterruptedException for the above reason was the intent of the code that used the `ignoring` clause. Unfortunately, it turns out the `ignoring` clause does not actually ignore InterruptedException, so that approach doesn't work for us. To confirm this theory, I found the following code in scala.util.control.Exception.scala: https://github.com/scala/scala/blob/v2.12.0/src/library/scala/util/control/Exception.scala#L167-L176.
Fix:
The fix in this PR is simply to not use the `ignoring` clause. Instead, we rely on the existing mechanism in ShutdownableThread that ignores the exception during shutdown.
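As an illustration of the pattern the fix relies on, here is a minimal Java sketch (Kafka's actual ShutdownableThread and the listener thread are Scala; all names below are illustrative, not the real code):
```
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only: a shutdown-aware thread that treats an
// InterruptedException from queue.take() as benign during shutdown.
class ChangeNotificationSketch extends Thread {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private volatile boolean shuttingDown = false;

    @Override
    public void run() {
        try {
            while (!shuttingDown) {
                queue.take().run();   // blocks; may throw InterruptedException
            }
        } catch (InterruptedException e) {
            // Safe to ignore when shutting down: close() interrupts this
            // thread on purpose to unblock queue.take().
            if (!shuttingDown)
                throw new RuntimeException("Unexpected interruption", e);
        }
    }

    void close() {
        shuttingDown = true;
        interrupt();
    }
}
```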
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Boyang Chen <boyang@confluent.io>, Anna Povzner <anna@confluent.io>, Jun Rao <junrao@gmail.com>
Bug Details:
Mistakenly setting the value serde to the key serde for an internal wrapped serde in the FKJ workflow.
Testing:
Modified the existing test to reproduce the issue, then verified that the test passes.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, John Roesler <vvcephei@apache.org>
In this PR, I have implemented various classes and integrations for the read path of the feature versioning system (KIP-584). The ultimate plan is for the cluster-wide finalized feature information to be stored in ZK under the node /feature. The read path implemented in this PR is centered around reading this finalized feature information from ZK and processing it inside the broker.
Here is a summary of what's in this PR (a lot of it is new classes):
A facility is provided in the broker to declare its supported features, and advertise its supported features via its own BrokerIdZNode under a features key.
A facility is provided in the broker to listen to and propagate cluster-wide finalized feature changes from ZK.
When new finalized features are read from ZK, feature incompatibilities are detected by comparing them against the broker's own supported features (see the sketch after this list).
ApiVersionsResponse is now served containing supported and finalized feature information (using the newly added tagged fields).
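As a sketch of that incompatibility check, assuming simple [min, max] version ranges (the class and method names below are mine; Kafka's actual version-range classes differ in detail):
```
import java.util.Map;

// Illustrative sketch of finalized-vs-supported feature compatibility.
final class FeatureCompat {
    static final class VersionRange {
        final short min, max;
        VersionRange(short min, short max) { this.min = min; this.max = max; }
    }

    // A finalized feature is incompatible if the broker does not support the
    // feature at all, or the finalized range is not contained in the broker's
    // supported range.
    static boolean isIncompatible(Map<String, VersionRange> supported,
                                  String feature, VersionRange finalized) {
        VersionRange s = supported.get(feature);
        return s == null || finalized.min < s.min || finalized.max > s.max;
    }
}
```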
Reviewers: Boyang Chen <boyang@confluent.io>, Jun Rao <junrao@gmail.com>
Split out the optimized source changelogs and fetch the committed offsets rather than the end offset for task lag computation.
Reviewers: John Roesler <vvcephei@apache.org>
Add an integration test for the task assignor.
* ensure we see proper scale-out behavior with warmups
* ensure in-memory stores are properly recycled and not restored through the scale-out process
Fix two bugs revealed by the test:
Bug 1: we can't remove active tasks in the cooperative algorithm, because this causes their state to get discarded (definitely for in-memory stores, and maybe for persistent ones, depending on the state cleaner). Instead, we convert them to standbys so they can keep warm.
Bug 2: tasks with only in-memory stores weren't reporting their offset positions.
Reviewers: Matthias J. Sax <matthias@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>
These changes allow herders to continue to function even when a connector they are running hangs in its start, stop, initialize, validate, and/or config methods.
The main idea is to make these connector interactions asynchronous and accept a callback that can be invoked upon the completion (successful or otherwise) of these interactions. The distributed herder handles any follow-up logic by adding a new herder request to its queue in that callback, which helps preserve some synchronization and ordering guarantees provided by the current tick model.
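For a rough sense of the approach, a hedged sketch follows (names and signatures are hypothetical, not the actual Connect API):
```
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical callback type for reporting completion of an interaction.
interface Callback<T> {
    void onCompletion(Throwable error, T result);
}

class AsyncConnectorSketch {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    // Run the (possibly hanging) connector operation off the herder's tick
    // thread and report the outcome through the callback; in the distributed
    // herder, the callback would enqueue a follow-up herder request.
    void startConnector(Runnable start, Callback<Void> callback) {
        executor.submit(() -> {
            try {
                start.run();
                callback.onCompletion(null, null);
            } catch (Throwable t) {
                callback.onCompletion(t, null);
            }
        });
    }
}
```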
If any connector refuses to shut down within a graceful timeout period, the framework will abandon it and potentially start a new connector in its place (in cases such as connector restart or reconfiguration).
Existing unit tests for the distributed herder and worker have been modified to reflect these changes, and a new integration test named `BlockingConnectorTest` has been added to ensure that they work in practice.
Reviewers: Greg Harris <gregh@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
Make sure that the Errant Record Reporter recently added in KIP-610 adheres to the `errors.tolerance` policy.
Author: Aakash Shah <ashah@confluent.io>
Reviewers: Arjun Satish <arjun@confluent.io>, Randall Hauch <rhauch@gmail.com>
This change adds a check to KafkaConfigBackingStore, KafkaOffsetBackingStore, and KafkaStatusBackingStore that uses the admin client to verify that the internal topics are compacted and do not use the `delete` cleanup policy.
Connect will already create the internal topics with `cleanup.policy=compact` if the topics do not yet exist when the Connect workers are started; new topics are always created as compacted, overwriting any user-specified `cleanup.policy`. However, if the topics already exist, the worker did not previously verify that they were compacted, such as when a user manually creates the internal topics before starting Connect or manually changes the topic settings after the fact.
The current change helps guard against users running Connect with topics that have the `delete` cleanup policy enabled, which would remove all connector configurations, source offsets, and connector & task statuses older than the retention time. This means that, for example, the configuration of a long-running connector could be deleted by the broker, causing restart issues upon a subsequent rebalance or restart of the Connect worker(s).
Connect behavior requires that its internal topics are compacted and not deleted after some retention time. This additional check simply enforces existing expectations and therefore does not need a KIP.
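A hedged sketch of the kind of verification described above (the actual backing-store code differs in structure and error handling):
```
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

// Illustrative sketch: fetch an internal topic's cleanup.policy via the admin
// client and fail fast if it is not compacted.
final class CompactionCheck {
    static void verifyCompacted(Admin admin, String topic) throws Exception {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Map<ConfigResource, Config> configs =
                admin.describeConfigs(Collections.singleton(resource)).all().get();
        String policy = configs.get(resource).get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
        // The internal topics must be compacted; a 'delete' policy would age out
        // configs, offsets, and statuses after the retention time.
        if (!TopicConfig.CLEANUP_POLICY_COMPACT.equals(policy)) {
            throw new ConfigException("Topic '" + topic + "' must have cleanup.policy="
                    + TopicConfig.CLEANUP_POLICY_COMPACT + ", but found: " + policy);
        }
    }
}
```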
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>, Chris Egerton <chrise@confluent.io>
With Incremental Cooperative Rebalancing, if a worker returns after it's been out of the group for some time (essentially as a zombie worker) and hasn't voluntarily revoked its own connectors and tasks in the meantime, these assignments may have been redistributed to other workers, and redundant connectors and tasks might now be running in the Connect cluster.
This PR complements previous fixes such as KAFKA-9184, KAFKA-9849 and KAFKA-9851, providing a last line of defense against zombie tasks: if at any rebalance round the leader worker detects duplicate assignments in the group, it revokes them completely and resolves the duplication with a correct assignment in the rebalancing round that follows the revocation.
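A hedged sketch of the duplicate detection (types simplified to strings; the real assignor works with connector and task IDs):
```
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: find tasks that appear in more than one worker's
// assignment and revoke them from every owner, so the follow-up rebalance
// round can reassign each of them exactly once.
final class DuplicateAssignmentSketch {
    static Map<String, Set<String>> duplicateRevocations(Map<String, Set<String>> assignments) {
        // Count how many workers claim each task.
        Map<String, Integer> owners = new HashMap<>();
        for (Set<String> tasks : assignments.values())
            for (String task : tasks)
                owners.merge(task, 1, Integer::sum);

        // Revoke every task that has more than one owner, from all owners.
        Map<String, Set<String>> revocations = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : assignments.entrySet())
            for (String task : entry.getValue())
                if (owners.get(task) > 1)
                    revocations.computeIfAbsent(entry.getKey(), k -> new HashSet<>()).add(task);
        return revocations;
    }
}
```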
Author: Wang <ywang50@ebay.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
This fix excludes `ConnectorClientConfigRequest` and its inner class from class loading isolation in a similar way that KAFKA-8415 excluded `ConnectorClientConfigOverridePolicy`.
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
The recent commit #8254 changed this test to delete all topics after each test, but the topic was previously shared among tests. After that change we rely on the less-reliable automatic topic creation to get the topic, which makes the test flaky.
I'm now using a different topic for each test, and also setting the app.id differently per test.
Reviewers: Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Ensure all channels get closed in `Selector.close`, even if some of them raise errors.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Uses an algorithm similar to (but slightly different from) the one in KAFKA-9987 to produce a maximally sticky -- and perfectly balanced -- assignment of tasks to threads within a single client. This is important for in-memory stores, which get wiped out when transferred between threads.
Reviewers: John Roesler <vvcephei@apache.org>
For better visibility into group rebalances, we now print out the members evicted from the group coordinator when a rebalance completes.
Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Struct value validation in Kafka Connect can be optimized to avoid creating an Iterator when the expectedClasses list is of size 1. This is a meaningful enhancement for high-throughput connectors.
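A sketch of the fast path, assuming a validator that checks a value against a list of permitted Java classes (names here are illustrative):
```
import java.util.List;

// Illustrative sketch of the optimization described above.
final class StructValidationSketch {
    static boolean isInstanceOfAny(Object value, List<Class<?>> expectedClasses) {
        // Most schema types map to exactly one Java class, so avoid allocating
        // an Iterator for the common single-element case.
        if (expectedClasses.size() == 1)
            return expectedClasses.get(0).isInstance(value);
        for (Class<?> expected : expectedClasses) {
            if (expected.isInstance(value))
                return true;
        }
        return false;
    }
}
```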
Reviewers: Konstantine Karantasis <konstantine@confluent.io>
When Incremental Cooperative Rebalancing is enabled and a worker fails to read to the end of the config topic, it needs to voluntarily revoke its locally running tasks in time, before these tasks get assigned to another worker, which would create a situation where redundant tasks are running in the Connect cluster.
Additionally, instead of using the delay `worker.unsync.backoff.ms`, which was defined for the eager rebalancing protocol and has a long default value (coincidentally equal to the default rebalance delay of the incremental cooperative protocol), the worker should quickly attempt to re-read the config topic and back off for a fraction of the rebalance delay. After this fix, the worker will retry up to 5 times before it revokes its running assignment, with a cumulative delay less than the configured `scheduled.rebalance.max.delay.ms`.
Unit tests are added to cover the backoff logic with incremental cooperative rebalancing.
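A hedged sketch of the retry/backoff shape described above (constants and method names are hypothetical, not the actual Connect implementation):
```
// Illustrative sketch only.
abstract class ConfigReadRetrySketch {
    private static final int MAX_RETRIES = 5;

    // Hypothetical helper: returns true once the worker has read to the end
    // of the config topic.
    abstract boolean readToEndOfConfigTopic();

    boolean readWithRetries(long scheduledRebalanceMaxDelayMs) throws InterruptedException {
        // Back off for a fraction of the rebalance delay so all retries finish
        // with a cumulative delay below scheduled.rebalance.max.delay.ms.
        long backoffMs = scheduledRebalanceMaxDelayMs / (MAX_RETRIES + 1);
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            if (readToEndOfConfigTopic())
                return true;
            Thread.sleep(backoffMs);
        }
        return false;  // caller proactively revokes its running assignment
    }
}
```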
Reviewers: Randall Hauch <rhauch@gmail.com>
There is some confusion over the compression rate metrics, as the meaning of the value isn't clearly stated in the metric description. It was assumed that a higher compression rate value meant better compression, whereas the metric actually reports the ratio of compressed size to uncompressed size, so lower values mean better compression. This PR clarifies the meaning of the value in the metric description to prevent such misunderstandings.
Reviewers: Jason Gustafson <jason@confluent.io>
In the first version of the incremental cooperative protocol, in the presence of a failed sync request by the leader, the assignor was designed to treat the unapplied assignments as lost and trigger a rebalance delay.
This commit applies optimizations to avoid unnecessarily activating the rebalance delay in these cases. First, if the worker that loses the sync group request or response is the leader, it detects the failure by checking whether the generation is the expected one when it performs task assignments. If it's not, the leader resets its view of the previous assignment, because that assignment wasn't successfully applied and doesn't represent a correct state. Furthermore, if the worker that missed the assignment sync is an ordinary worker, the leader is able to detect that there are lost assignments, and instead of triggering a rebalance delay among the same members of the group, it treats the lost tasks as new tasks and reassigns them immediately. If the lost assignment included revocations that were not applied, the leader reapplies those revocations.
Existing unit tests and integration tests are adapted to test the proposed optimizations.
Reviewers: Randall Hauch <rhauch@gmail.com>
The current "about" string incorrectly describes the session epoch as the partition epoch. Rename to `SessionEpoch` to make usage clearer. Also rename `MaxWait` to `MaxWaitMs` to make the time unit clear and `FetchableTopic` to `FetchTopic` for consistency with `FetchPartition`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Some `LogCleaner` metrics have an unsafe call to `max` over the collection of cleaner threads, which could be empty after shutdown. This leads to an `UnsupportedOperationException`. This patch fixes the problem by changing the computation to use `foldLeft`.
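The LogCleaner is Scala; a rough Java analogue of the same idea, using a reduction with an identity value so the empty case yields a default instead of throwing:
```
import java.util.List;

// Rough Java analogue of the Scala fix: calling max on an empty collection
// throws, whereas folding from an identity value returns 0 instead.
final class CleanerMetricsSketch {
    static int maxBufferUtilization(List<Integer> utilizations) {
        return utilizations.stream().reduce(0, Integer::max);
    }
}
```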
Reviewers: Jason Gustafson <jason@confluent.io>
_unknownTaggedFields contains tagged fields that we don't understand under the current schema. However, we still want to keep the data around for various purposes. For example, if we are printing out a JSON form of the message we received, we want to include a section containing the tagged fields that couldn't be parsed; leaving these out would give an incorrect impression of what was sent over the wire. Since the unknown tagged fields represent real data, they should be included in the fields checked by equals().
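A sketch of the idea for a generated message class (the class and its known field are illustrative, not copied from the generated code):
```
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.protocol.types.RawTaggedField;

// Illustrative sketch: unknown tagged fields participate in equality.
class ExampleMessageData {
    private int knownField;
    private List<RawTaggedField> _unknownTaggedFields;

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof ExampleMessageData)) return false;
        ExampleMessageData other = (ExampleMessageData) obj;
        // Unknown tagged fields are real wire data, so two messages that differ
        // only in unparsed tagged fields must not compare equal.
        return knownField == other.knownField
                && Objects.equals(_unknownTaggedFields, other._unknownTaggedFields);
    }

    @Override
    public int hashCode() {
        return Objects.hash(knownField, _unknownTaggedFields);
    }
}
```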
Reviewers: Ismael Juma <ismael@juma.me.uk>, Boyang Chen <boyang@confluent.io>
`SelectorMetrics` creates per-connection metrics, which means the number of `MetricName` objects and the strings associated with them (such as group name and description) grows with the number of connections in the client. This overhead of duplicate string objects is amplified when there are multiple instances of kafka clients within the same JVM. This patch addresses some of the memory overhead by making `metricGrpName` a constant field and introducing a new field `perConnectionMetricGrpName`.
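A hedged sketch of the change (the field names match the description above; the construction shown is illustrative):
```
// Illustrative sketch: build the metric group names once per Selector instead
// of rebuilding equivalent strings for every connection.
class SelectorMetricsSketch {
    private final String metricGrpName;              // shared by all general metrics
    private final String perConnectionMetricGrpName; // shared by all per-connection metrics

    SelectorMetricsSketch(String metricGrpPrefix) {
        this.metricGrpName = metricGrpPrefix + "-metrics";
        this.perConnectionMetricGrpName = metricGrpPrefix + "-node-metrics";
    }
}
```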
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Currently, Kafka Connect creates its config backing topic with a fire-and-forget approach.
This is fine unless someone has already manually created that topic with the wrong partition count.
In such a case Kafka Connect may run for some time, especially in standalone mode, but once switched to distributed mode it will almost certainly fail.
This commit adds a check that runs when the KafkaConfigBackingStore is starting.
The check throws a ConfigException if the backing store topic has more than one partition.
This exception is then caught upstream and logged by either:
- DistributedHerder#run
- ConnectStandalone#main
A unit test was added in KafkaConfigBackingStoreTest to verify the behaviour.
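A hedged sketch of the startup check (the real code in KafkaConfigBackingStore differs in structure, and the message text here is illustrative):
```
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigException;

// Illustrative sketch: verify the config topic has exactly one partition.
final class PartitionCountCheck {
    static void verifySinglePartition(Admin admin, String configTopic) throws Exception {
        TopicDescription description = admin.describeTopics(Collections.singleton(configTopic))
                .all().get().get(configTopic);
        int partitions = description.partitions().size();
        if (partitions > 1) {
            throw new ConfigException("Config topic '" + configTopic
                    + "' is required to have 1 partition but found " + partitions);
        }
    }
}
```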
Author: Evelyn Bayes <evelyn@confluent.io>
Co-authored-by: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Until recently, revocation of connectors and tasks was the result of a rebalance that contained a new assignment. Therefore, the view of the running assignment was kept consistent outside the call to `RebalanceListener#onRevoke`. However, after KAFKA-9184, the worker also needs to revoke tasks voluntarily and proactively without having received a new assignment.
This commit allows the worker to restart tasks that were stopped as a result of voluntary revocation, once a subsequent rebalance reassigns these tasks to the worker.
The fix is tested by extending an existing integration test.
Reviewers: Randall Hauch <rhauch@gmail.com>
This PR provides two fixes:
1. Skip offset validation if the current leader epoch cannot be reliably determined.
2. Raise an out of range error if the leader returns an undefined offset in response to the OffsetsForLeaderEpoch request.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
Define SSL configs in all worker config classes, not just the distributed one.
Author: Chris Egerton <chrise@confluent.io>
Reviewers: Nigel Liang <nigel@nigelliang.com>, Randall Hauch <rhauch@gmail.com>
Connector projects may have their own mock or testing implementations of `SinkTaskContext`, and this newly added method should be a default method to prevent breaking those projects. Changing it to a default method that returns null also makes sense with respect to the method's semantics, since the method is already defined to return null if the reporter has not been configured.
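A sketch of the resulting default method (the interface name is altered to make clear this is not the exact source; the null-returning body follows the documented semantics):
```
import org.apache.kafka.connect.sink.ErrantRecordReporter;

// Illustrative sketch: making the KIP-610 accessor a default method keeps
// existing mock and custom SinkTaskContext implementations compiling unchanged.
public interface SinkTaskContextSketch {
    // ... existing context methods elided ...

    // Returns the reporter for problematic records, or null if none was configured.
    default ErrantRecordReporter errantRecordReporter() {
        return null;
    }
}
```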
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>