* KAFKA-14365: Extract common logic from Fetcher
Extract logic from Fetcher into AbstractFetcher.
Also introduce FetchConfig as a more concise way to delineate state from
incoming configuration.
Formalized the defaults in CommonClientConfigs and ConsumerConfig to be
accessible elsewhere.
* Removed overridden methods in favor of synchronizing where needed
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch is the first part of KIP-903. It updates the FetchRequest to include the new tagged ReplicaState field which replaces the now deprecated ReplicaId field. The FetchRequest version is bumped to version 15 and the MetadataVersion to 3.5-IV1.
Reviewers: David Jacot <djacot@confluent.io>
This commit refactors AbstractStickyAssignor without changing any logic to make it easier to add rack-awareness. The class currently consists of a lot of collections that are passed around various methods, with some methods updating some collections. Addition of rack-awareness makes this class with very large methods even more complex and harder to read. The new code moves the two assignment methods into their own classes so that the state can be maintained as instance fields rather than local variables.
Reviewers: David Jacot <djacot@confluent.io>
The Fetcher class is used internally by the KafkaConsumer to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored Fetcher.
This task includes refactoring Fetcher by extracting out the inner classes into top-level (though still in internal) so that those classes can be referenced by forthcoming refactored fetch logic.
Reviewers: Philip Nee <philipnee@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Minor changes to `Sender` and `NetworkClient` so that we can log timeouts during `ProduceRequest` with a more precise error message, denoting a timeout vs. "generic" network error.
Reviewers: Philip Nee <pnee@confluent.io>, Guozhang Wang <guozhang@apache.org>, David Jacot <djacot@confluent.io>
A binary value (array of bytes) can be a BinaryNode or a TextNode. When it is a BinaryNode, the method binaryValue() always returns non-null. When it is a TextNode, the method binaryValue() will return non-null if the value is a base64 string. For all other JSON nodes binaryValue() returns null.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
The goal of this PR is to add more tests to the PrototypeAsyncConsumer to test
* Successful startup and shutdown.
* Commit.
I also added integration tests:
* Test commitAsync()
* Test commitSync()
Note that I still need to implement committed() to test if commitSync() has been successfully committed.
Additional things:
Change KafkaConsumer<K, V> to Consumer<K, V> to use different implementations
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Guozhang Wang <wangguoz@gmail.com>
Part 1 of KIP-890
I've updated the API spec and related classes.
Clients should only be able to send up to version 3 requests and that is enforced by using a client builder.
Requests > 4 only require cluster permissions as they are initiated from other brokers. API version 4 is marked as unstable for now.
I've added tests for the batched requests and for the verifyOnly mode.
Also -- minor change to the KafkaApis method to properly match the request name.
Reviewers: Jason Gustafson <jason@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, David Jacot <djacot@confluent.io>
To avoid mistakes during dynamic broker config updates that could potentially affect clients, we restrict changes that can be performed dynamically without broker restart. For broker keystore updates, we require the DN to be the same for the old and new certificates since this could potentially contain host names used for host name verification by clients. DNs are compared using standard Java implementation of X500Principal.equals() which compares canonical names. If tags of fields change from one with a printable string representation and one without or vice-versa, canonical name check fails even if the actual name is the same since canonical representation converts to hex for some tags only. We can relax the verification to allow dynamic updates in this case by enabling dynamic update if either the canonical name or the RFC2253 string representation of the DN matches.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Kalpesh Patel <kpatel@confluent.io>
This commit adds support to store the SCRAM credentials in a cluster with KRaft quorum servers and
no ZK cluster backing the metadata. This includes creating ScramControlManager in the controller,
and adding support for SCRAM to MetadataImage and MetadataDelta.
Change UserScramCredentialRecord to contain only a single tuple (name, mechanism, salt, pw, iter)
rather than a mapping between name and a list. This will avoid creating an excessively large record
if a single user has many entries. Because record ID 11 (UserScramCredentialRecord) has not been
used before, this is a compatible change. SCRAM will be supported in 3.5-IV0 and later.
This commit does not include KIP-900 SCRAM bootstrapping support, or updating the credential cache
on the controller (as opposed to broker). We will implement these in follow-on commits.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
throwing an exception directly form the foreground thread's callers when the abnormal exit of the heartbeat thread
Reviewers: Luke Chen <showuon@gmail.com>, Philip Nee <philipnee@gmail.com>
Best-effort rack alignment for range assignor when both consumer racks and partition racks are available with the protocol changes introduced in KIP-881. Rack-aware assignment is enabled by configuring client.rack for consumers. Balanced assignment per topic is prioritized over rack-alignment. For topics with equal partitions and the same set of subscribers, co-partitioning is prioritized over rack-alignment.
Reviewers: David Jacot <djacot@confluent.io>
In AbstractCoordinator#joinGroupIfNeeded - joinGroup request will be retried without proper backoff, due to the expired timer. This is an uncommon scenario and possibly only appears during the testing, but I think it makes sense to enforce the client to drive the join group via poll.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This pull request introduces a CommitRequestManager to efficiently manage commit requests from clients and the autocommit state. The manager utilizes a "staged" commit queue to store commit requests made by clients. A background thread regularly polls the CommitRequestManager, which then checks the queue for any outstanding commit requests. When permitted, the CommitRequestManager generates a PollResult which contains a list of UnsentRequests that are subsequently processed by the NetworkClientDelegate.
In addition, a RequestManagerRegistry has been implemented to hold all request managers, including the new CommitRequestManager and the CoordinatorRequestManager. The registry is regularly polled by a background thread in each event loop, ensuring that all request managers are kept up to date and able to handle incoming requests
Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
While refactoring the OffsetFetch handling in KafkaApis, we introduced a NullPointerException (NPE). The NPE arises when the FetchOffset API is called with a client using a version older than version 8 and using null for the topics to signal that all topic-partition offsets must be returned. This means that this bug mainly impacts admin tools. The consumer does not use null.
This NPE is here: 24a86423e9 (diff-0f2f19fd03e2fc5aa9618c607b432ea72e5aaa53866f07444269f38cb537f3feR237).
We missed this during the refactor because we had no tests in place to test this mode.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
After reading data of type BYTES, COMPACT_BYTES, NULLABLE_BYTES or COMPACT_NULLABLE_BYTES returned ByteBuffer might have a capacity that is larger than its limit, thus these data types may access data that lies beyond its size by increasing limit of the returned ByteBuffer. I guess this is not very critical but I think it would be good to restrict increasing limit of the returned ByteBuffer by making its capacity strictly equal to its limit. I think someone might unintentionally mishandle these data types and accidentally mess up data in the ByteBuffer from which they were read.
Reviewers: Luke Chen <showuon@gmail.com>
The `Fetcher` class is used internally by the `KafkaConsumer` to fetch records from the brokers. There is [ongoing work to create a new consumer implementation with a significantly refactored threading model](https://issues.apache.org/jira/browse/KAFKA-14246). The threading refactor work requires a similarly refactored `Fetcher`.
This task covers the work to extract from `Fetcher` the APIs that are related to metadata operations into two new classes: `OffsetFetcher` and `TopicMetadataFetcher`. This will allow the refactoring of `Fetcher` and `MetadataFetcher` for the new consumer.
Reviewers: Philip Nee <pnee@confluent.io>, Guozhang Wang <guozhang@apache.org>, Jason Gustafson <jason@confluent.io>
* assertEquals called on array
* Method is identical to its super method
* Simplifiable assertions
* Unused imports
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
Extend the implementation of ProcessTerminatingFaultHandler to support calling either Exit.halt or Exit.exit. Change the fault handler used by the Controller thread and the KRaft thread to use a halting fault handler.
Those threads cannot call Exit.exit because Runtime.exit joins on the default shutdown hook thread. The shutdown hook thread joins on the controller and kraft thread terminating. This causes a deadlock.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
Warnings about unused configs are most often spurious. This patch changes the current warning to an info message.
Reviewers: Chris Egerton <chrise@aiven.io>, Jason Gustafson <jason@confluent.io>
We currently cache login managers in static maps for both static JAAS config using system property and for JAAS config specified using Kafka config sasl.jaas.config. In addition to the JAAS config, the login manager callback handler is included in the key, but all other configs are ignored. This implementation is based on the assumption clients that require different logins (e.g. username/password) use different JAAS configs, because login properties are included in the JAAS config rather than as separate top-level configs. The OIDC support added in KIP-768 only allows configuration of token endpoint URL as a top-level config. This results in two clients in a JVM configured with different token endpoint URLs to incorrectly share a login.
This PR includes all SASL configs prefixed with sasl. to be included in the key so that logins are only shared if all the sasl configs are identical.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Kirk True <kirk@mustardgrain.com>
## Problem
When consumer is closed, fetch sessions associated with the consumer should notify the server about it's intention to close using a Fetch call with epoch = -1 (identified by `FINAL_EPOCH` in `FetchMetadata.java`). However, we are not sending this final fetch request in the current flow which leads to unnecessary fetch sessions on the server which are closed only after timeout.
## Changes
1. Change `close()` in `Fetcher` to add a logic to send the final Fetch request notifying close to the server.
2. Change `close()` in `Consumer` to respect the timeout duration passed to it. Prior to this change, the timeout parameter was being ignored.
3. Change tests to close with `Duration.zero` to reduce the execution time of the tests. Otherwise the tests will wait for default timeout to exit (close() in the tests is expected to be unsuccessful because there is no server to send the request to).
4. Distinguish between the case of "close existing session and create new session" and "close existing session" by renaming the `nextCloseExisting` function to `nextCloseExistingAttemptNew`.
## Testing
Added unit test which validates that the correct close request is sent to the server.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Kirk True <kirk@mustardgrain.com>, Philip Nee <philipnee@gmail.com>, Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
This patch does a few things:
1) It introduces a new flag to the request spec: `latestVersionUnstable`. It signifies that the last version of the API is considered unstable (or still in development). As such, the last API version is not exposed by the server unless specified otherwise with the new internal `unstable.api.versions.enable`. This allows us to commit new APIs which are still in development.
3) It adds the ConsumerGroupHeartbeat API, part of KIP-848, and marks it as unreleased for now.
4) It adds the new error codes required by the new ConsumerGroupHeartbeat API.
Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
When running junit tests, it is not good to block forever on CompletableFuture objects. When there
are bugs, this can lead to junit tests hanging forever. Jenkins does not deal with this well -- it
often brings down the whole multi-hour test run. Therefore, when running integration tests in
JUnit, set some reasonable time limits on broker and controller startup time.
Reviewers: Jason Gustafson <jason@confluent.io>
Removes logging of the HTTP response directly in all known cases to prevent potentially logging access tokens.
Reviewers: Sushant Mahajan <sushant.mahajan88@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
This patch contains a few cleanups and fixes in the new refactored consumer logic:
- Use `CompletableFuture` instead of `RequestFuture` in `NetworkClientDelegate`. This is a much more extensible API and it avoids tying the new implementation to `ConsumerNetworkClient`.
- Fix call to `isReady` in `NetworkClientDelegate`. We need the call to `ready` to initiate the connection.
- Ensure backoff is enforced even after successful `FindCoordinator` requests. This avoids a tight loop while metadata is converging after a coordinator change.
- `RequestState` was incorrectly using the reconnect backoff as the retry backoff. In fact, we don't currently have a retry backoff max implemented in the consumer, so the use of `ExponentialBackoff` is unnecessary, but I've left it since we may add this with https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients.
- Minor cleanups in test cases to avoid unused classes/fields.
Reviewers: Philip Nee <pnee@confluent.i>, Guozhang Wang <guozhang@apache.org>
The current documentation indicates two positions are tracked, but these positions were removed a few years ago. Now we use a single position to track the last consumed record. Updated the documentation to reflect to the current state.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This test is supposed to be a sanity check that rebalancing with a large number of partitions/consumers won't start to take obscenely long or approach the max.poll.interval.ms -- bumping up the timeout by another 30s still feels very reasonable considering the test is for 1 million partitions
Reviewers: Matthias J. Sax <mjsax@apache.org>