An API call for consumer groups must send a FindCoordinatorRequest to find the consumer group coordinator, and then send a follow-up request to that node. But the coordinator might move after the FindCoordinatorRequest but before the follow-up request is sent. In that case we currently fail.
This change fixes that by detecting this error and then retrying. This fixes listConsumerGroupOffsets, deleteConsumerGroups, and describeConsumerGroups.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Boyang Chen <bchen11@outlook.com>
When consumer coordinator realize the subscription may have changed, today we check again against the joinedSubscription within handleAssignmentMismatch. This checking however is a bit fishy and over-kill as well. It's better just simplifying it to always request re-join.
The joinedSubscription object itself however still need to be maintained for potential augment to avoid extra re-joining the group.
Since testOutdatedCoordinatorAssignment already cover the normal case we also remove the other invalidAssignment test case.
Reviewers: Jason Gustafson <jason@confluent.io>
As title states. We plan to merge this to both trunk and 2.3 if it could fix the stream system tests globally.
Reference implementation: #6673
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>
KIP-345 and KIP-392 introduced a couple breaking changes for old versions of bumped protocols. This patch fixes them.
Reviewers: Colin Patrick McCabe <cmccabe@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Boyang Chen <bchen11@outlook.com>, Guozhang Wang <wangguoz@gmail.com>
Reviewers: Jason Gustafson <jason@confluent.io>, Colin Patrick McCabe <cmccabe@confluent.io>, Andrew Olson <aolson1@cerner.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
Return a copy of the ConfigDef in Client Configs. Related to KIP-458.
Author: Magesh Nandakumar <magesh.n.kumar@gmail.com
Reviewer: Randall Hauch <rhauch@gmail.com>
- Make endpoint validation configurable on SslEngineBuilder when creating an engine
- Disable endpoint validation for engines created for inter-broker SSL validation since it is unsafe to use `localhost`
- Use empty hostname in validation engine to ensure tests fail if validation is re-enabled by mistake
- Add tests to verify inter-broker SSL validation
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This changes the field "generationid" to "generationId" to be consistent with other uses.
Reviewers: Shaobo Liu <lambda.tencent@gmail.com>, Jason Gustafson <jason@confluent.io>
For static members join/rejoin, we encode the current timestamp in the new member.id. The format looks like group.instance.id-timestamp.
During consumer/broker interaction logic (Join, Sync, Heartbeat, Commit), we shall check the whether group.instance.id is known on group. If yes, we shall match the member.id stored on static membership map with the request member.id. If mismatching, this indicates a conflict consumer has used same group.instance.id, and it will receive a fatal exception to shut down.
Right now the only missing part is the system test. Will work on it offline while getting the major logic changes reviewed.
Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This patch includes API changes for follower fetching per [KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica) as well as the consumer implementation. After this patch, consumers will continue to fetch only from the leader, since the broker implementation to select an alternate read replica is not included here.
Adds new `client.rack` consumer configuration property is added which allows the consumer to indicate its rack. This is just an arbitrary string to indicate some relative location, it doesn't have to actually represent a physical rack. We are keeping the naming consistent with the broker property (`broker.rack`).
FetchRequest now includes `rack_id` which can optionally be specified by the consumer. FetchResponse includes an optional `preferred_read_replica` field for each partition in the response. OffsetForLeaderEpochRequest also adds new `replica_id` field which is similar to the same field in FetchRequest.
When the consumer sees a `preferred_read_replica` in a fetch response, it will use the Node with that ID for the next fetch.
Reviewers: Jason Gustafson <jason@confluent.io>
Initiate `unreachable server` scenario before starting admin client to avoid timing issues if node is disconnected from the test thread while admin client network thread is processing a metadata request.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Implementation to enable policy for Connector Client config overrides. This is
implemented per the KIP-458.
Reviewers: Randall Hauch <rhauch@gmail.com>
Added the incremental cooperative rebalancing in Connect to avoid global rebalances on all connectors and tasks with each new/changed/removed connector. This new protocol is backward compatible and will work with heterogeneous clusters that exist during a rolling upgrade, but once the clusters consist of new workers only some affected connectors and tasks will be rebalanced: connectors and tasks on existing nodes still in the cluster and not added/changed/removed will continue running while the affected connectors and tasks are rebalanced.
This commit attempted to minimize the changes to the existing V0 protocol logic, though that was not entirely possible.
This commit adds extensive unit and integration tests for both the old V0 protocol and the new v1 protocol. Soak testing has been performed multiple times to verify behavior while connectors and added, changed, and removed and while workers are added and removed from the cluster.
Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <me@ewencp.org>, Robert Yokota <rayokota@gmail.com>, David Arthur <mumrah@gmail.com>, Ryanne Dolan <ryannedolan@gmail.com>
This fixes a regression caused by KAFKA-8275. The least loaded node selection
should take into account nodes which are currently being connect to. This
includes both the CONNECTING and CHECKING_API_VERSIONS states since
`canSendRequest` would return false in either case.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
SslFactory: split the part of SslFactory that creates SSLEngine instances into SslEngineBuilder. When (re)configuring, we simply create a new SslEngineBuilder. This allows us to make all the builder fields immutable. It also simplifies the logic for reconfiguring. Because we sometimes need to test old SslEngine instances against new ones, being able to use both the old and the new builder at once is useful.
Create an enum named SslClientAuth which encodes the possible values for ssl.client.auth. This will simplify the handling of this configuration.
SslTransportLayer#maybeProcessHandshakeFailure should treat an SSLHandshakeException with a "Received fatal alert" message as a handshake error (and therefore an authentication error.)
SslFactoryTest: add some line breaks for very long lines.
ConfigCommand#main: when terminating the command due to an uncaught exception, log the exception using debug level in slf4j, in addition to printing it to stderr. This makes it easier to debug failing junit tests, where stderr may not be kept, or may be reordered with respect to other slf4j messages. The use of debug level is consistent with how we handle other types of exceptions in ConfigCommand#main.
StateChangeLogMerger#main: spell out the full name of scala.io.Source rather than abbreviating it as io.Source. This makes it clearer that it is part of the Scala standard library. It also avoids compiler errors when other libraries whose groupId starts with "io" are used in the broker.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Enable reconfiguration of SSL keystores and truststores in client-side channel builders used by brokers for controller, transaction coordinator and replica fetchers. This enables brokers using TLS mutual authentication for inter-broker listener to use short-lived certs that may be updated before expiry without restarting brokers.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Updated AbstractConfig to be able to resolve variables in config values when the configuration includes config provider properties.
Author: Tejal Adsul <tejal@confluent.io>
Reviewers: Rajini Sivaram <rajinisivaram@gmail.com>, Randall Hauch <rhauch@gmail.com>
MINOR: update documentation for the log cleaner max compaction lag feature (KIP-354) implemented in KAFKA-7321
Author: Xiongqi Wu <xiowu@linkedin.com>
Reviewer: Joel Koshy <jjkoshy@gmail.com>
The main problem we are trying to solve here is the batching of StopReplica requests and the lack of test coverage for `ControllerChannelManager`. Addressing the first problem was straightforward, but the second problem required quite a bit of work because of the dependence on `KafkaController` for all of the events. It seemed to make sense to separate the events from the processing of events so that we could remove this dependence and improve testability. With the refactoring, I was able to add test cases covering most of the logic in `ControllerChannelManager` including the generation of requests and the expected response handling logic. Note that I have not actually changed any of the event handling logic in `KafkaController`.
While refactoring this logic, I found that the event queue time metric was not being correctly computed. The problem is that many of the controller events were singleton objects which inherited the `enqueueTimeMs` field from the `ControllerEvent` trait. This would never get updated, so queue time would be skewed.
Reviewers: Jun Rao <junrao@gmail.com>
Currently in maybeFailWithError, we always wrap the lastError as a KafkaException. For ProducerFencedException however, we should just throw the exception itself; however we throw a new instance since the previous book-kept one's call trace is not from this call, and hence could be confusing.
Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@conflent.io>
KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)
Records become eligible for compaction after the specified time interval.
Author: Xiongqi Wu <xiowu@linkedin.com>
Reviewer: Joel Koshy <jjkoshy@gmail.com>
This patch adds support to retry all group operations after COORDINATOR_LOAD_IN_PROGRESS and COORDINATOR_NOT_AVAILABLE in AdminClient group operations. Previously we only had logic to retry after FindCoordinator failures.
Reviewers: Yishun Guan <gyishun@gmail.com>, Viktor Somogyi <viktorsomogyi@gmail.com>, Jason Gustafson <jason@confluent.io>
Because of how conversions between Java collections and Scala collections work, ImplicitLinkedHashMultiSet objects were being treated as unordered in some contexts where they shouldn't be. This broke JOIN_GROUP handling.
This patch renames ImplicitLinkedHashMultiSet to ImplicitLinkedHashMultCollection. The order of Collection objects will be preserved when converting to scala. Adding Set and List "views" to the Collection gives us a more elegant way of accessing that functionality when needed.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
* Fixed bug in Struct.equals where we returned prematurely and added tests
* Update RequestResponseTest to check that `equals` and `hashCode` of
the struct is the same after serialization/deserialization only when possible.
* Use `Objects.equals` and `Long.hashCode` to simplify code
* Removed deprecated usages of `JUnitTestSuite`
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Implements KIP-361 to provide a consumer configuration to specify whether subscribing or assigning a non-existent topic would result in it being automatically created or not.
Reviewers: Jason Gustafson <jason@confluent.io>
KAFKA-7903: automatically generate OffsetCommitRequest (#6583) introduced a change that cause consumer breakage when OffsetCommitResponse versions < 3 are parsed, as they do not include a throttle_time_ms field. This PR fixes the parsing by supplying the correct version to the OffsetCommitResponse constructor in AbstractResponse.parseResponse.
I have tested this change against many of the compatibility system tests, and it has fixed all the failures that I have tested thus far.
Author: Lucas Bradstreet <lucas@confluent.io>
Reviewers: Gwen Shapira, Boyang Chen
Closes#6698 from lbradstreet/offset-commit-response-throttle-field
Part of KIP-345 effort. The strategy is to extract user passed in group.instance.id config and pass it in with given thread-id (because consumer is currently per-thread level).
Reviewers: Guozhang Wang <wangguoz@gmail.com>
If a node is currently throttled, we should take it out of the running for `leastLoadedNode`. Additionally, current logic seems to favor connecting to new nodes rather than using existing connections which have one or more in flight requests. The javadoc is slightly vague about whether this is expected, but it seems not.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This patch fixes a bug in the sending of transactional requests. We need to call `KafkaClient.send` with an updated current time. Failing to do so can result in an `IllegalStateExcepton` which leaves the producer effectively dead since the in-flight correlation id has been set, but no request has been sent. To avoid the same problem in the future, we update the in flight correlationId only after sending the request.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
When shutting down the ReplicaFetcher thread, we may fail to unregister sensors in selector.close(). When that happened, we will fail to start up the ReplicaFetcherThread with the same fetch id again because of the IllegalArgumentException in sensor registration. This issue will cause constant URPs in the cluster because the ReplicaFetchterThread is gone.
This patch addresses this issue by introducing a try-finally block in selector.close() so that we will always unregister the sensors in shutting down ReplicaFetcherThreads.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>
`ConfigDef.embeddedValidator` should return an Anonymous Object instead of lambda so that we can have a useful `toString()` for methods such as `toRst`.
Reviewers: Jason Gustafson <jason@confluent.io>
In NetworkClient.leastLoadedNode, we invoke `isReady` to check if an established connection exists for the given node. `isReady` checks whether metadata needs to be updated also which wants to make metadata request first priority. However, if the to-be-sent request is metadata request, then we do not have to check this otherwise the loop in `leastLoadedNode` will do a complete iteration until the final node is selected.
Reviewers: Jason Gustafson <jason@confluent.io>
This is the first diff for the implementation of JoinGroup logic for static membership. The goal of this diff contains:
* Add group.instance.id to be unique identifier for consumer instances, provided by end user;
Modify group coordinator to accept JoinGroupRequest with/without static membership, refactor the logic for readability and code reusability.
* Add client side support for incorporating static membership changes, including new config for group.instance.id, apply stream thread client id by default, and new join group exception handling.
* Increase max session timeout to 30 min for more user flexibility if they are inclined to tolerate partial unavailability than burdening rebalance.
* Unit tests for each module changes, especially on the group coordinator logic. Crossing the possibilities like:
6.1 Dynamic/Static member
6.2 Known/Unknown member id
6.3 Group stable/unstable
6.4 Leader/Follower
The rest of the 345 change will be broken down to 4 separate diffs:
* Avoid kicking out members through rebalance.timeout, only do the kick out through session timeout.
* Changes around LeaveGroup logic, including version bumping, broker logic, client logic, etc.
* Admin client changes to add ability to batch remove static members
* Deprecate group.initial.rebalance.delay
Reviewers: Liquan Pei <liquanpei@gmail.com>, Stanislav Kozlovski <familyguyuser192@windowslive.com>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
In the KafkaProducer#close method we have a debug log statement Kafka producer has been closed then a few lines later a KafkaException can occur.
This could be confusing to users, so this PR simply moves the log statement to after the possible exception to avoid confusing information in the logs.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Streams previously flushed stores in the order of their registration, which is arbitrary. Because stores may forward values upon flush (as in cached state stores), we must flush stores in topological order.
Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>