This patch adds reconciliation logic to migrating ZK brokers to deal with pending topic deletions as well as missed StopReplicas.
During the hybrid mode of the ZK migration, the KRaft controller is asynchronously sending UMR and LISR to the ZK brokers to propagate metadata. Since this process is essentially "best effort" it is possible for a broker to miss a StopReplicas. The new logic lets the ZK broker examine its local logs compared with the full set of replicas in a "Full" LISR. Any local logs which are not present in the set of replicas in the request are removed from ReplicaManager and marked as "stray".
To avoid inadvertent data loss with this new behavior, the brokers do not delete the "stray" partitions. They will rename the directories and log warning messages during log recovery. It will be up to the operator to manually delete the stray partitions. We can possibly enhance this in the future to clean up old stray logs.
This patch makes use of the previously unused Type field on LeaderAndIsrRequest. This was added as part of KIP-516 but never implemented. Since its introduction, an implicit 0 was sent in all LISR. The KRaft controller will now send a value of 2 to indicate a full LISR (as specified by the KIP). The presence of this value acts as a trigger for the ZK broker to perform the log reconciliation.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This PR for KIP-714 - KAFKA-1564 lays out interfaces and classes for capturing client telemetry metrics.
Below image defines interaction of different classes among them interfaces have been included in the PR.
Reviewers: Walker Carlson <wcarlson@apache.org>, Matthias J. Sax <matthias@confluent.io>, Andrew Schofield <andrew_schofield@uk.ibm.com>, Kirk True <ktrue@confluent.io>, Philip Nee <pnee@confluent.io>, Jun Rao <junrao@gmail.com>,
Using `SecureRandom.getInstanceStrong()` results in using `/dev/random` which is known to block in Linux when the OS runs low on entropy. This was noticable when running tests in containerised CI environments.
This commit avoids using a CSPRNG altogether since the tests do not need cryptographically secure random numbers.
Reviewers: Divij Vaidya <diviv@amazon.com>, Igor Soarez <soarez@apple.com>
---------
Co-authored-by: Igor Soarez <soarez@apple.com>
Changes:
1. Introduces FetchRequestManager that implements the RequestManager
API for fetching messages from brokers. Unlike Fetcher, record
decompression and deserialization is performed on the application
thread inside CompletedFetch.
2. Restructured the code so that objects owned by the background thread
are not instantiated until the background thread runs (via Supplier)
to ensure that there are no references available to the
application thread.
3. Ensuring resources are properly using Closeable and using
IdempotentCloser to ensure they're only closed once.
4. Introduces ConsumerTestBuilder to reduce a lot of inconsistency in
the way the objects were built up for tests.
Reviewers: Philip Nee <pnee@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Jun Rao<junrao@gmail.com>
- Introduce a new internal config flag to enable processing threads
- If enabled, create a scheduling task manager inside the normal task manager (renamings will be added on top of this), and use it from the stream thread
- All operations inside the task manager that change task state, lock the corresponding tasks if processing threads are enabled.
- Adds a new abstract class AbstractPartitionGroup. We can modify the underlying implementation depending on the synchronization requirements. PartitionGroup is the unsynchronized subclass that is going to be used by the original code path. The processing thread code path uses a trivially synchronized SynchronizedPartitionGroup that uses object monitors. Further down the road, there is the opportunity to implement a weakly synchronized alternative. The details are complex, but since the implementation is essentially a queue + some other things, it should be feasible to implement this lock-free.
- Refactorings in StreamThreadTest: Make all tests use the thread member variable and add tearDown in order avoid thread leaks and simplify debugging. Make the test parameterized on two internal flags: state updater enabled and processing threads enabled. Use JUnit's assume to disable all tests that do not apply.
Enable some integration tests with processing threads enabled.
Reviewer: Bruno Cadonna <bruno@confluent.io>
This patch adds support for OffsetFetch version 9 in the admin client. It mainly allows handling two new error codes `STALE_MEMBER_EPOCH` and `UNKNOWN_MEMBER_ID` introduced as part of KIP-848.
Reviewers: David Jacot <djacot@confluent.io>
Currently, we aren't able to access the request completion time if the request is completed exceptionally, which results in many system calls. This is not ideal because these system calls can add up. Instead, time is already retrieved on the top of the background thread event loop, which is then propagated into the NetworkClientDelegate.poll.
In this PR - I store the completion time in the handler, so that it becomes accessible in the callbacks.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The PR includes:
* Added a new class of CleanShutdownFile which helps write and read from a clean shutdown file.
* Updated the BrokerRegistration API.
* Client side handling for the broker epoch.
* Minimum work on the controller side.
Reviewers: Jun Rao <junrao@gmail.com>
This patch adds support for OffsetCommit version 9 in the admin client. It mainly allows handling two new error codes `STALE_MEMBER_EPOCH` and `GROUP_ID_NOT_FOUND ` introduced as part of KIP-848.
Reviewers: David Jacot <djacot@confluent.io>
Part of KIP-714.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Philip Nee <pnee@confluent.io>, Kirk True <ktrue@confluent.io>, Walker Carlson <wcarlson@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Part of KIP-714.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Walker Carlson <wcarlson@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This patch includes:
- target assignment changes : accepting only one at a time according to the updated protocol.
- changes for error handling, leaving responsibility in the heartbeatManager and exposing only the functionality for when the state needs to be updated (on successful HB, on fencing, on fatal failure)
- allow transitions for failures when joining
- tests & minor improvements/fixes addressing initial version review
Reviewers: Kirk True <ktrue@confluent.io>, Philip Nee <pnee@confluent.io>, David Jacot <djacot@confluent.io>
A few notes:
* Delete a few methods from `UnifiedLog` that were simply invoking the related method in `LogFileUtils`
* Fix `CoreUtils.swallow` to use the passed in `logging`
* Fix `LogCleanerParameterizedIntegrationTest` to close `log` before reopening
* Minor tweaks in `LogSegment` for readability
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
The current KafkaConsumer offsetsForTimes fails with IllegalArgumentException if negative target timestamps are provided as arguments. This change includes the same validation and tests for the new consumer implementation (and some improved comments for the updateFetchPositions)
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Spotbugs was temporarily disabled as part of KAFKA-15485 to support Kafka build with JDK 21. This PR upgrades the spotbugs version to 4.8.0 which adds support for JDK 21 and enables it's usage on build again.
Reviewers: Divij Vaidya <diviv@amazon.com>
AdminClient will throw IllegalStateException instead of TimeoutException if it receives new calls while closing down. This is more consistent with how Consumer and Producer clients handle new calls after closed down.
Reviewers: Luke Chen <showuon@gmail.com>, Kirk True <kirk@kirktrue.pro>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, vamossagar12 <sagarmeansocean@gmail.com>
HeartbeatRequestManager is responsible for handling the ConsumerGroupHeartbeat request and response. The manager has the following responsibilities:
1. Sending the request to the GroupCoordinator when it is possible and necessary
2. Handling the response and update the `MembershipManagerImpl` based on the error/response it receives.
3. Handles request retries and fatal failures
For Successful heartbeat response:
- Updates the MembershipManager
For Failures handling:
- Retriables Errors: backoff and retries
- Fenced: Transition to a fenced state and reset the epoch, and retry in the next poll
- Fatal: Propagate the error to the user and fail the state machine
Reviewers: Kirk True <ktrue@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, David Jacot <djacot@confluent.io>
When producer-batch is being retried, new-leader is known for the partition Vs the leader used in last attempt, then it is worthwhile to retry immediately to this new leader. A partition-leader is considered to be newer, if the epoch has advanced.
Reviewers: Walker Carlson <wcarlson@apache.org>, Kirk True <kirk@kirktrue.pro>, Andrew Schofield <andrew_schofield@uk.ibm.com
Replacing the use a hardcoded -1 with a constant (`LEAVE_GROUP_EPOCH`) that provides more clarity. Since static members also have a magic number (-2), this will motivate future commits to use constants instead of hardcoded values.
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
This patch implements DeleteGroups and OffsetDelete API in the new group coordinator.
Reviewers: yangy0000, Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
Previous commits left out TxnOffsetCommits which go through the group coordinator (not directly from the producer).
I've wired up the methods to include the transactional id and state partition to do the verification.
I've also updated UnifiedLog to verify on client and coordinator requests that are transactional.
I've not updated any sequence check logic since the sequence is always 0 on group coordinator initiated writes.
Added returned errors to Response files. Both InvalidPidMapping and InvalidTxnState will be returned and be fatal for the transactional OffsetCommit requests.
Reviewers: David Jacot <david.jacot@gmail.com>, Artem Livshits <alivshits@confluent.io>
As a first step, we plan to release a preview of the new consumer group rebalance protocol without the client side assignor. This patch removes all the related fields from the ConsumerGroupHeartbeat API for now. Removing fields is fine here because this API is not released yet and not exposed by default. We will add them back while bumping the version of the request when we release this part in the future.
Reviewers: Justine Olshan <jolshan@confluent.io>
Currently, when consumer startup, there will be a log message said:
[2023-08-11 15:47:17,713] INFO [Consumer clientId=console-consumer, groupId=console-consumer-28605] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
It confused users and make them think there's something wrong in the consumer application. Since we already log need to re-join with the given member-id logs in the joinGroupResponseHandler and already requestRejoined. So, we can skip this confusion logs.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Paolo Patierno <ppatiern@redhat.com>, vamossagar12 <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
This patch fixes the version of the `AuthorizedOperations` field. The schema is not used yet so the bug had no impact.
Reviewers: Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>
Add support for --bootstrap-controller in the following command-line tools:
- kafka-cluster.sh
- kafka-configs.sh
- kafka-features.sh
- kafka-metadata-quorum.sh
To implement this, the following AdminClient APIs now support the new bootstrap.controllers
configuration:
- Admin.alterConfigs
- Admin.describeCluster
- Admin.describeConfigs
- Admin.describeFeatures
- Admin.describeMetadataQuorum
- Admin.incrementalAlterConfigs
- Admin.updateFeatures
Command-line tool changes:
- Add CommandLineUtils.initializeBootstrapProperties to handle parsing --bootstrap-controller
in addition to --bootstrap-server.
- Add --bootstrap-controller to ConfigCommand.scala, ClusterTool.java, FeatureCommand.java, and
MetadataQuorumCommand.java.
KafkaAdminClient changes:
- Add the AdminBootstrapAddresses class to handle extracting bootstrap.servers or
bootstrap.controllers from the config map for KafkaAdminClient.
- In AdminMetadataManager, store the new usingBootstrapControllers boolean. Generalize
authException to encompass the concept of fatal exceptions in general. (For example, the
fatal exception where we talked to the wrong node type.) Treat
MismatchedEndpointTypeException and UnsupportedEndpointTypeException as fatal exceptions.
- Extend NodeProvider to include information about whether bootstrap.controllers is supported.
- Modify the APIs described above to support bootstrap.controllers.
Server-side changes:
- Support DescribeConfigsRequest on kcontrollers.
- Add KRaftMetadataCache to the kcontroller to simplify implemeting describeConfigs (and
probably more APIs in the future). It's mainly a wrapper around MetadataImage, so there is
essentially no extra resource consumption.
- Split RuntimeLoggerManager out of ConfigAdminManager to handle the incrementalAlterConfigs
support for BROKER_LOGGER. This is now supported on kcontrollers as well as brokers.
- Fix bug in AuthHelper.computeDescribeClusterResponse that resulted in us always sending back
BROKER as the endpoint type, even on the kcontroller.
Miscellaneous:
- Fix a few places in exceptions and log messages where we wrote "broker" instead of "node".
For example, an exception in NodeApiVersions.java, and a log message in NetworkClient.java.
- Fix the slf4j log prefix used by KafkaRequestHandler logging so that request handlers on a
controller don't look like they're on a broker.
- Make the FinalizedVersionRange constructor public for the sake of a junit test.
- Add unit and integration tests for the above.
Reviewers: David Arthur <mumrah@gmail.com>, Doguscan Namal <namal.doguscan@gmail.com>
LogCaptureAppender sets the log level in various tests to check if a certain log message is produced. The log level is however never reverted, changing the log level across the board and introducing flakiness due to non-determinism since the log level depends on execution order. Some log messages change the timing inside tests significantly.
Reviewer: Bruno Cadonna <cadonna@apache.org>
We have observed an issue where inter broker SSL listener is not coming up when running with TLSv3/JDK 17 .
SSL debug logs shows that TLSv3 post handshake messages >16K are not getting read and causing SslEngineValidator process to stuck while validating the provided trust/key store.
- Right now, WRAP returns if there is already data in the buffer. But if we need more data to be wrapped for UNWRAP to succeed, we end up looping forever. To fix this, now we always attempt WRAP and only return early on BUFFER_OVERFLOW.
- Update SslEngineValidator to unwrap post-handshake messages from peer when local handshake status is FINISHED.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
We are currently encoding an empty hostNames array to subjectAltName in the keystore. While parsing the certificates in the test this causes the issue - Unparseable SubjectAlternativeName extension due to java.io.IOException: No data available in passed DER encoded value. Up to Java 17, this parsing error was ignored. This PR assigns subjectAltName to null if hostnames are empty.
Co-authored-by: Ismael Juma <ismael@juma.me.uk>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
* Update CI to build with Java 21 instead of Java 20
* Disable spotbugs when building with Java 21 as it doesn't support it yet (filed KAFKA-15492 for
addressing this)
* Disable SslTransportLayerTest.testValidEndpointIdentificationCN with Java 21 (same as Java 20)
Reviewers: Divij Vaidya <diviv@amazon.com>
This is one of the steps required for kafka to compile with Java 21.
For each case, one of the following fixes were applied:
1. Suppress warning if fixing would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.
In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.
See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
TopicMetadataRequestManager is responsible for sending topic metadata requests. The manager manages API requests and build the request accordingly. All topic metadata requests are chained, if requesting the same topic, to avoid sending requests repeatedly.
Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
Co-authored-by: Kirk True <kirk@kirktrue.pro>
Reviewers: Kirk True <kirk@kirktrue.pro>, Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
Support for using committed offsets to update fetch positions.
This PR includes:
* movingrefreshCommittedOffsets function out of the existing ConsumerCoordinator so it can be reused (no code changes)
* using the above refreshCommittedOffsets for updating fetch positions if the consumer has enabled the Kafka-based offsets management by defining a groupId
Reviewers: Jun Rao <junrao@gmail.com>
This continues the work of providing the groundwork for the fetch
refactoring work by introducing some new classes and refactoring the
existing code to use the new classes where applicable.
Changes:
* Minor clean up of the events classes to make data immutable,
private, and implement toString().
* Added IdempotentCloser which prevents a resource from being closed
more than once. It's general enough that it could be used elsewhere
in the project, but it's limited to the consumer internals for now.
* Split core Fetcher code into classes to buffer raw results
(FetchBuffer) and to collect raw results into ConsumerRecords
(FetchCollector). These can be tested and changed in isolation from
the core fetcher logic.
* Added NodeStatusDetector which abstracts methods from
ConsumerNetworkClient so that it and NetworkClientDelegate can be
used in AbstractFetch via the interface instead of using
ConsumerNetworkClient directly.
Reviewers: Jun Rao <junrao@gmail.com>
This contribution extends the TrustManager created by the DefaultSSLEngineFactory class with code that checks for any invalid certificate whether it is just expired but valid otherwise. If this is the case, it extracts the common name and logs it. Apart from that, the trust manager will behave exactly as the default one.
Extensive unit tests are included in this pull request for ensuring that the modified trust manager will behave exactly as the default one, except for logging expired certificates common name. The test code generates several certificate chains with valid, invalid and expired end certificates, root CAs and even intermediate CAs.
This contribution is my original work and I license the work to the project under the project's open source license.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
When using kafka consumer in apache pinot , we did see couple of WARN as we are trying to create kafka consumer class with the same name . We currently have to use a added suffix to create a new mBean as each new kafka consumer in same process creates a mBean . Adding support here to skip creation of mBean if its already existing
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
Implementation of the required functionality for resetting and validating positions in the new async consumer.
This PR includes:
1. New async application events ResetPositionsApplicationEvent and ValidatePositionsApplicationEvent, both handled by the same OfffsetsRequestManager.
2. Integration of the reset/validate functionality in the new async consumer, to update fetch positions using the partitions offsets.
3. Minor refactoring to extract functionality that is reused from both consumer implementations (moving logic without changes from OffsetFetcher into OffsetFetchUtils, and from OffsetsForLeaderEpochClient into OffsetsForLeaderEpochUtils)
Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Jun Rao<junrao@gmail.com>
Initial definition of the core components for maintaining group membership on the client following the new consumer group protocol.
This PR includes:
- Membership management for keeping member state and assignment, based on the heartbeat responses received.
- Assignor selection logic to support server side assignors.
This only includes the basic initial states and transitions, to be extended as we implement the protocol.
This is intended to be used from the heartbeat and assignment requests manager that actually build and process the heartbeat and assignment related requests.
Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>
This patch adds integration tests for the OffsetCommit API and the OffsetFetch API. The tests runs against the old and the new group coordinator and with the new and the old consumer rebalance protocol.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Justine Olshan <jolshan@confluent.io>
This change introduces some basic clean up and refactoring for forthcoming commits related to the revised fetch code for the consumer threading refactor project.
Reviewers: Christo Lolov <lolovc@amazon.com>, Jun Rao <junrao@gmail.com>
Implement KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add
Controller Registration. This KIP adds a new version of DescribeClusterRequest which is supported
by KRaft controllers. It also teaches AdminClient how to use this new DESCRIBE_CLUSTER request to
talk directly with the controller quorum. This is all gated behind a new MetadataVersion,
IBP_3_7_IV0.
In order to share the DESCRIBE_CLUSTER logic between broker and controller, this PR factors it out
into AuthHelper.computeDescribeClusterResponse.
The KIP adds three new errors codes: MISMATCHED_ENDPOINT_TYPE, UNSUPPORTED_ENDPOINT_TYPE, and
UNKNOWN_CONTROLLER_ID. The endpoint type errors can be returned from DescribeClusterRequest
On the controller side, the controllers now try to register themselves with the current active
controller, by sending a CONTROLLER_REGISTRATION request. This, in turn, is converted into a
RegisterControllerRecord by the active controller. ClusterImage, ClusterDelta, and all other
associated classes have been upgraded to propagate the new metadata. In the metadata shell, the
cluster directory now contains both broker and controller subdirectories.
QuorumFeatures previously had a reference to the ApiVersions structure used by the controller's
NetworkClient. Because this PR removes that reference, QuorumFeatures now contains only immutable
data. Specifically, it contains the current node ID, the locally supported features, and the list
of quorum node IDs in the cluster.
Reviewers: David Arthur <mumrah@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
This restores previous behavior for Admin::listOffsets, which was to fail immediately if topic metadata could not be found, and only retry if metadata for one or more specific partitions could not be found.
There is a subtle difference here: prior to https://github.com/apache/kafka/pull/13432, the operation would be retried if any metadata error was reported for any individual topic partition, even if an error was also reported for the entire topic. With this change, the operation always fails if an error is reported for the entire topic, even if an error is also reported for one or more individual topic partitions.
I am not aware of any cases where brokers might return both topic- and topic partition-level errors for a metadata request, and if there are none, then this change should be safe. However, if there are such cases, we may need to refine this PR to remove the discrepancy in behavior.
Reviewers: Justine Olshan <jolshan@confluent.io>