This patch adds the schemas of the new ConsumerGroupDescribe API (KIP-848) and adds an handler to the KafkaApis.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This patch adds the MemberId and the MemberEpoch fields to the OffsetFetchRequest. Those fields will be populated when the new consumer group protocol is used to ensure that the member fetching the offset has the correct member id and epoch. If it does not, UNKNOWN_MEMBER_ID or STALE_MEMBER_EPOCH are returned to the client.
Our initial idea was to implement the same for the old protocol. The field is called GenerationIdOrMemberEpoch in KIP-848 to materialize this. As a second though, I think that we should only do it for the new protocol. The effort to implement it in the old protocol is not worth it in my opinion.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
Implementation of KIP-580 to add exponential back-off to situations in which retry.backoff.ms
is used to delay backoff attempts. This KIP adds exponential backoff behavior with a maximum
controlled by a new config retry.backoff.max.ms, together with a +/- 20% of jitter to spread the
retry attempts of the client fleet.
Reviewers: Mayank Shekhar Narula <mayanks.narula@gmail.com>, Milind Luthra <i.milind.luthra@gmail.com>, Kirk True <kirk@mustardgrain.com>, Jun Rao<junrao@gmail.com>
Implementation of the OffsetRequestsManager, responsible for building requests and processing responses for requests related to partition offsets.
In this PR, the manager includes support for ListOffset requests, generated when the user makes any of the following consumer API calls:
beginningOffsets
endOffsets
offsetsForTimes
All previous consumer API calls interact with the OffsetsRequestsManager by generating a ListOffsetsApplicationEvent.
Includes tests to cover the new functionality and to ensure no API level changes are introduced.
This covers KAFKA-14965 and KAFKA-15081.
Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Jun Rao<junrao@gmail.com>
Summary
Implemented wakeup() mechanism using a WakeupTrigger class to store the pending wakeup item, and when wakeup() is invoked, it checks whether there's an active task or a wakeup task.
If there's an active task: the task will be completed exceptionally and the atomic reference will be freed up.
If there an wakedup task, which means wakeup() was invoked before a blocking call was issued. Therefore, the current task will be completed exceptionally immediately.
This PR also addressed minor issues such as:
Throwing WakeupException at the right place: As wakeups are thrown by completing an active future exceptionally. The WakeupException is wrapped inside of the ExecutionException.
mockConstruction is a thread-lock mock; therefore, we need to free up the reference before completing the test. Otherwise, other tests will continue using the thread-lock mock.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
As described in https://issues.apache.org/jira/browse/KAFKA-15353
When the AlterPartitionRequest version is < 3 and its builder.build is called multiple times, both newIsrWithEpochs and newIsr will all be empty. This can happen if the sender retires on errors.
Reviewers: Luke Chen <showuon@gmail.com>
This implementation introduces two new configurations `log.message.timestamp.before.max.ms` and `log.message.timestamp.after.max.ms` and deprecates `log.message.timestamp.difference.max.ms`.
The default value for all these three configs is maintained to be Long.MAX_VALUE for backward compatibility but with the newly added configurations we can have a finer control when validating message timestamps that are in the past and the future compared to the broker's timestamp.
To maintain backward compatibility if the default value of `log.message.timestamp.before.max.ms` is not changed, we are assuming users are still using the deprecated config `log.message.timestamp.difference.max.ms` and validation is done using its value. This ensures that existing customers who have customized the value of `log.message.timestamp.difference.max.ms` will continue to see no change in behavior.
Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>
A race condition between async flush and segment rename (for deletion purpose) might cause the entire log directory to be marked offline when we delete a topic. This PR fixes the bug by ignoring NoSuchFileException when we flush a directory.
Reviewers: Divij Vaidya <diviv@amazon.com>
"The test RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay relies on the actual system clock, which makes it frequently fail. The fix adds a second constructor that allows for passing a ScheduledExecutorService to manually execute the scheduled tasks before refreshing. The fixed task is much more robust and stable.
Co-authored-by: Fei Xie <feixie@MacBook-Pro.attlocal.net>
Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>
This PR main refactoring relates to :
1. serializers/deserializers used in clients - unified in a Deserializers class
2. logic for configuring ClusterResourceListeners moved to ClientUtils
3. misc refactoring of the new async consumer in preparation for upcoming Request Managers
Reviewers: Jun Rao <junrao@gmail.com>
Reviewers: David Arthur <mumrah@gmail.com>, Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>
In a non-empty log the KRaft leader only notifies the listener of leadership when it has read to the leader's epoch start offset. This guarantees that the leader epoch has been committed and that the listener has read all committed offsets/records.
Unfortunately, the KRaft leader doesn't do this when the log is empty. When the log is empty the listener is notified immediately when it has become leader. This makes the API inconsistent and harder to program against.
This change fixes that by having the KRaft leader wait for the listener's nextOffset to be greater than the leader's epochStartOffset before calling handleLeaderChange.
The RecordsBatchReader implementation is also changed to include control records. This makes it possible for the state machine learn about committed control records. This additional information can be used to compute the committed offset or for counting those bytes when determining when to snapshot the partition.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
As title, we discovered a flaky bug during testing that the commit request manager would seldomly throw a NOT_COORDINATOR exception, which means the request was routed to a non-coordinator node. We discovered that if we don't check the coordinator node in the commitRequestManager, the request manager will pass on an empty node to the NetworkClientDelegate, which implies the request can be sent to any node in the cluster. This behavior is incorrect as the commit requests need to be routed to a coordinator node.
Because the timing coordinator's discovery during integration testing isn't entirely deterministic; therefore, the test became extremely flaky. After fixing this: The coordinator node is mandatory before attempt to enqueue these commit request to the NetworkClient.
Reviewers: Jun Rao <junrao@gmail.com>
Move common code from the client implementations to the ClientUtils
class or (consumer) Utils class, where passible.
There are a number of places in the client code where the same basic
calls are made by more than one client implementation. Minor
refactoring will reduce the amount of boilerplate code necessary for
the client to construct its internal state.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
Discovered the committed() API timeout during the integration test. After investigation, this is because the future was not completed in the ApplicationEventProcessor. Also added toString methods to the event class for debug purposes.
Reviewers: Jun Rao <junrao@gmail.com>
* MINOR: Add test for describe topic with ID
Add a simple test to verify topic description with topic IDs.
Reviewers: Divij Vaidya <diviv@amazon.com>, dengziming <dengziming1993@gmail.com>
in 3.5.0 AbstractStickyAssignor may run useless loop in performReassignments because isBalanced have a trivial mistake, and result in rebalance timeout in some situation.
Co-authored-by: lixy <lixy@tuya.com>
Reviewers: Ritika Reddy <rreddy@confluent.io>, Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Guozhang Wang <wangguoz@gmail.com>
A missing piece from KAFKA-14950. This is to test assign() and assignment() in the integration test.
Also fixed an accidental mistake in the committed API.
Reviewers: Jun Rao <junrao@gmail.com>
This patch introduces the `OffsetMetadataManager` and implements the `OffsetCommit` API for both the old rebalance protocol and the new rebalance protocol. It introduces version 9 of the API but keeps it as unstable for now. The patch adds unit tests to test the API. Integration tests will be done separately.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
We will explicitly send an assignment change event to the background thread to invoke auto-commit if the group.id is configured. After updating the subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent to the background thread to update the metadata.
Co-authored-by: Kirk True <kirk@kirktrue.pro>
Reviewers: Jun Rao <junrao@gmail.com>
This patch does a few things:
1) It introduces version 9 of the OffsetCommit API. This new version has no schema changes but it can return a StaleMemberEpochException if the new consumer group protocol is used. Note the use of `"latestVersionUnstable": true` in the request schema. This means that this new version is not available yet unless activated.
2) It renames the `generationId` field in the request to `GenerationIdOrMemberEpoch`. This is backward compatible change.
3) It introduces the new StaleMemberEpochException error.
4) It does a minor refactoring in OffsetCommitRequest class.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Arthur <mumrah@gmail.com>, Justine Olshan <jolshan@confluent.io>
This patch implements the existing JoinGroup protocol within the new group coordinator.
Some notable differences:
* Methods return a CoordinatorResult to the runtime framework, which includes records to append to the log as well as a future to complete after the append succeeds/fails.
* The coordinator runtime ensures that only a single thread will be processing a group at any given time, therefore there is no more locking on groups.
* Instead of using on purgatories, we rely on the Timer interface to schedule/cancel delayed operations.
Reviewers: David Jacot <djacot@confluent.io>
Introduced extra mapping to track verification state.
When verifying, there is a race condition that the add partitions verification response returns that the partition is in the ongoing transaction, but an abort marker is written before we get to append. Therefore, we track any given transaction we are verifying with an object unique to that transaction.
We check this unique state upon the first append to the log. After that, we can rely on currentTransactionFirstOffset. We remove the verification state on appending to the log with a transactional data record or marker.
We will also clean up lingering verification state entries via the producer state entry expiration mechanism. We do not update the the timestamp on retrying a verification for a transaction, so each entry must be verified before producer.id.expiration.ms.
There were a few other fixes:
- Moved the transaction manager handling for failed batch into the future completed exceptionally block to avoid processing it twice (this caused issues in unit tests)
- handle interrupted exceptions encountered when callback thread encountered them
- change handling to throw error if we try to set verification state and leaderLogIfLocal is None.
Reviewers: David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>
Splitting partitions while setting auto.offset.reset to latest may cause message delivery loss, but users might not be aware about that since currently it isn't documented anywhere.
Reviewers: Luke Chen <showuon@gmail.com>
This is a follow up on the initial OffsetFetcher refactoring to extract reusable logic, needed for the new consumer implementation (initial refactoring merged with PR-13815).
Similar to the initial refactoring, this PR brings no changes to the existing logic, just extracting functions or pieces of logic.
There were no individual tests for the extracted functions, so no tests were migrated.
Reviewers: Jun Rao <junrao@gmail.com>
Use AdminApiDriver class to refresh the metadata and retry the request that failed with retriable errors.
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Mickael Maison <mmaison@redhat.com>, Dimitar Dimitrov <30328539+dimitarndimitrov@users.noreply.github.com>
It's good for us to add support for Java 20 in preparation for Java 21 - the next LTS.
Given that Scala 2.12 support has been deprecated, a Scala 2.12 variant is not included.
Also remove some branch builds that add load to the CI, but have
low value: JDK 8 & Scala 2.13 (JDK 8 support has been deprecated),
JDK 11 & Scala 2.12 (Scala 2.12 support has been deprecated) and
JDK 17 & Scala 2.12 (Scala 2.12 support has been deprecated).
A newer version of Mockito (4.9.0 -> 4.11.0) is required for Java 20 support, but we
only use it with Scala 2.13+ since it causes compilation errors with Scala 2.12. Similarly,
we upgrade easymock when the Java version is 16 or newer as it's incompatible
with powermock (which doesn't support Java 16 or newer).
Filed KAFKA-15117 for a test that fails with Java 20 (SslTransportLayerTest.testValidEndpointIdentificationCN).
Finally, fixed some lossy conversions that were added after #13582 was submitted.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Poison the transaction manager if we detect an illegal transition in the Sender thread. A ThreadLocal in is stored in TransactionManager so that the Sender can inform TransactionManager which thread it's using.
Reviewers: Daniel Urban <durban@cloudera.com>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
Fixed a regression described in KAFKA-15053 that security.protocol only allows uppercase values like PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. With this fix, both lower case and upper case values will be supported (e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, sasl_plaintext, sasl_ssl)
Reviewers: Chris Egerton <chrise@aiven.io>, Divij Vaidya <diviv@amazon.com>
When Kafka fails to perform an atomic file move the error is getting swallowed. Kafka should log these cases at least at WARN level.
Reviewers: Ron Dagostino <rndgstn@gmail.com>, Kirk True <kirk@kirktrue.pro>
Adding the following metrics as per kip-890:
VerificationTimeMs – number of milliseconds from adding partition info to the manager to the time the response is sent. This will include the round trip to the transaction coordinator if it is called. This will also account for verifications that fail before the coordinator is called.
VerificationFailureRate – rate of verifications that returned in failure either from the AddPartitionsToTxn response or through errors in the manager.
AddPartitionsToTxnVerification metrics – separating the verification request metrics from the typical add partitions ones similar to how fetch replication and fetch consumer metrics are separated.
Reviewers: Divij Vaidya <diviv@amazon.com>
RPCProducerIdManager initiates an async request to the controller to grab a block of producer IDs and then blocks waiting for a response from the controller.
This is done in the request handler threads while holding a global lock. This means that if many producers are requesting producer IDs and the controller is slow to respond, many threads can get stuck waiting for the lock.
This patch aims to:
* resolve the deadlock scenario mentioned above by not waiting for a new block and returning an error immediately
* remove synchronization usages in RpcProducerIdManager.generateProducerId()
* handle errors returned from generateProducerId() so that KafkaApis does not log unexpected errors
* confirm producers backoff before retrying
* introduce backoff if manager fails to process AllocateProducerIdsResponse
Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>
An example of the warning:
> warning: [lossy-conversions] implicit cast from long to int in compound assignment is possibly lossy
There should be no change in behavior as part of these changes - runtime logic ensured
we didn't run into issues due to the lossy conversions.
Reviewers: Divij Vaidya <diviv@amazon.com>