Previously, checkpointed offsets for a log were only updated if the log was chosen for cleaning once the cleaning job completes. This caused issues in cases where logs with invalid checkpointed offsets would repeatedly emit warnings if the log with an invalid cleaning checkpoint wasn't chosen for cleaning.
Proposed fix is to update the checkpointed offset for logs with invalid checkpoints regardless of whether it gets chosen for cleaning.
Reviewers: Anna Povzner <anna@confluent.io>, Jun Rao <junrao@gmail.com>
This change is the client-side part of KIP-360. It identifies cases where it is safe to abort a transaction, bump the producer epoch, and allow the application to continue without closing the producer. In these cases, when KafkaProducer.abortTransaction() is called, the producer sends an InitProducerId following the transaction abort, which causes the producer epoch to be bumped. The application can then start a new transaction and continue processing.
For recoverable errors in the idempotent producer, the epoch is bumped locally. In-flight requests for partitions with an error are rewritten to reflect the new epoch, and in-flights of all other partitions are allowed to complete using the old epoch.
Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>
Test cases in `ConsumerPerformanceTest` were failing and causing a system exit rather than throwing the expected exception following #8023. We didn't catch this because the tests were marked as skipped and not failed.
Reviewers: Guozhang Wang <guozhang@confluent.io>
This change updates ConsoleProducer, ConsumerPerformance, VerifiableProducer, and VerifiableConsumer classes to add and prefer the --bootstrap-server flag for defining the connection point of the Kafka cluster. This change is part of KIP-499: https://cwiki.apache.org/confluence/display/KAFKA/KIP-499+-+Unify+connection+name+flag+for+command+line+tool.
Reviewers: Ron Dagostino <rdagostino@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
This patch adds logic to the test case to ensure that consumer groups are in a valid state prior to attempting offset reset.
Reviewers: Jason Gustafson <jason@confluent.io>
This change mainly have 2 components:
1. extend the existing transactions_test.py to also try out new sendTxnOffsets(groupMetadata) API to make sure we are not introducing any regression or compatibility issue
a. We shrink the time window to 10 seconds for the txn timeout scheduler on broker so that we could trigger expiration earlier than later
2. create a completely new system test class called group_mode_transactions_test which is more complicated than the existing system test, as we are taking rebalance into consideration and using multiple partitions instead of one. For further breakdown:
a. The message count was done on partition level, instead of global as we need to visualize
the per partition order throughout the test. For this sake, we extend ConsoleConsumer to print out the data partition as well to help message copier interpret the per partition data.
b. The progress count includes the time for completing the pending txn offset expiration
c. More visibility and feature improvements on TransactionMessageCopier to better work under either standalone or group mode.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This PR speeds up the deletion process by doing the following:
- Batch whenever possible to minimize the number of requests sent out to other brokers;
- Refactor `onPartitionDeletion` to remove the usage of `allLiveReplicas`.
Reviewers: Jason Gustafson <jason@confluent.io>
The recent increase in the flakiness of one of the offset reset tests (KAFKA-9538) traces back to https://github.com/apache/kafka/pull/7941. After investigation, we found that following this patch, the consumer was sending an additional metadata request prior to performing the group assignment. This slight timing difference was enough to trigger the test failures. The problem turned out to be due to a bug in `SubscriptionState.groupSubscribe`, which no longer counted the local subscription when determining if there were new topics to fetch metadata for. Hence the extra metadata update. This patch restores the old logic.
Without the fix, we saw 30-50% test failures locally. With it, I could no longer reproduce the failure. However, #6561 is probably still needed to improve the resilience of this test.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
The client caches metadata fetched from Metadata requests. Previously, each metadata response overwrote all of the metadata from the previous one, so we could rely on the expectation that the broker only returned the leaderId for a partition if it had connection information available. This behavior changed with KIP-320 since having the leader epoch allows the client to filter out partition metadata which is known to be stale. However, because of this, we can no longer rely on the request-level guarantee of leader availability. There is no mechanism similar to the leader epoch to track the staleness of broker metadata, so we still overwrite all of the broker metadata from each response, which means that the partition metadata can get out of sync with the broker metadata in the client's cache. Hence it is no longer safe to validate inside the `Cluster` constructor that each leader has an associated `Node`
Fixing this issue was unfortunately not straightforward because the cache was built to maintain references to broker metadata through the `Node` object at the partition level. In order to keep the state consistent, each `Node` reference would need to be updated based on the new broker metadata. Instead of doing that, this patch changes the cache so that it is structured more closely with the Metadata response schema. Broker node information is maintained at the top level in a single collection and cached partition metadata only references the id of the broker. To accommodate this, we have removed `PartitionInfoAndEpoch` and we have altered `MetadataResponse.PartitionMetadata` to eliminate its `Node` references.
Note that one of the side benefits of the refactor here is that we virtually eliminate one of the hotspots in Metadata request handling in `MetadataCache.getEndpoints` (which was renamed to `maybeFilterAliveReplicas`). The only reason this was expensive was because we had to build a new collection for the `Node` representations of each of the replica lists. This information was doomed to just get discarded on serialization, so the whole effort was wasteful. Now, we work with the lower level id lists and no copy of the replicas is needed (at least for all versions other than 0).
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
When a follower's fetch offset is behind the leader's log start offset, the
follower will do a full log truncation. When it does so, it must update both
its log start offset and high watermark. The previous code did the former,
but not the latter. Failure to update the high watermark in this case can lead
to out of range errors if the follower becomes leader before getting the latest
high watermark from the previous leader. The out of range errors occur when
we attempt to resolve the log position of the high watermark in DelayedFetch
in order to determine if a fetch is satisfied.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Also add support for flexible versions to both protocol types.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Colin Patrick McCabe <cmccabe@apache.org>
Co-authored-by: Rajini Sivaram <rajinisivaram@googlemail.com>
Co-authored-by: Jason Gustafson <jason@confluent.io>
Refactors CheckpointFile such that buffers can be read in lieu of files. This
is a relatively simple refactoring as we already create a buffered reader over
the checkpoint file.
#6742, which improves the performance of the checkpointing code, requires
a similar refactoring of code, although it does go further than this.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
This PR implements the KIP-559: https://cwiki.apache.org/confluence/display/KAFKA/KIP-559%3A+Make+the+Kafka+Protocol+Friendlier+with+L7+Proxies
- it adds the Protocol Type and the Protocol Name fields in JoinGroup and SyncGroup API;
- it validates that the fields are provided by the client when the new version of the API is used and ensure that they are consistent. it errors out otherwise;
- it validates that the fields are consistent in the client and errors out otherwise;
- it adds many tests related to the API changes but also extends the testing coverage of the requests/responses themselves.
- it standardises the naming in the coordinator. now, `ProtocolType` and `ProtocolName` are used across the board in the coordinator instead of having a mix of protocol type, protocol name, subprotocol, protocol, etc.
Reviewers: Jason Gustafson <jason@confluent.io>
As the feature freeze approaches, we should support `2.5` as the inter.broker.protocol.version value. There are no new APIs so far, so `2.5` is effectively equivalent to `2.4`.
This PR implements `default.api.timeout.ms` as documented by KIP-533. This is a rebased version of #6913 with some additional test cases and small cleanups.
Reviewers: David Arthur <mumrah@gmail.com>
Co-authored-by: huxi <huxi_2b@hotmail.com>
Also: Improve error message, Add test, Minor code quality fixes
Verified that the test fails if the broker default for max message bytes is lower or higher than the currently set value.
Reviewers: Andrew Choi <andchoi@linkedin.com>, Viktor Somogyi <viktorsomogyi@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Currently, when a dynamic change is made to the broker-level default log configuration, existing log configs will be recreated with an empty overridden configs. In such case, when updating dynamic broker configs a second round, the topic-level configs are lost. This can cause unexpected data loss, for example, if the cleanup policy changes from "compact" to "delete."
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>
This patch adds a new API to the producer to implement transactional offset commit fencing through the group coordinator as proposed in KIP-447. This PR mainly changes on the Producer end for compatible paths to old `sendOffsetsToTxn(offsets, groupId)` vs new `sendOffsetsToTxn(offsets, groupMetadata)`.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
Since the leader epoch was not maintained in the fetch session cache, no validation would be done except for the initial (full) fetch request. This patch adds the leader epoch to the session cache and addresses the testing gaps.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Colin Patrick McCabe <cmccabe@apache.org>
The `KafkaController::replicasAreValid` method currently returns a
boolean indicating if replicas are valid or not. But the failure
condition loses any context on why replicas are not valid. This change
updates the metod to return the error conition if validation fails. This
allows caller to report the error to the client.
The change also renames the `replicasAreValid` method to
`validateReplicas` to reflect updated semantics.
Reviewers: Sean Li <seanli-rallyhealth@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
Fixed an intermittent failure in the test that was caused by a mocked instance not handling expandIsr. Also updated the test to use the offset from the batch to trigger expandIsr more often. With this change, the test may try to acquire write lock while appends are blocked on read lock, so separated out the test using write lock to make it safer.
Reviewers: Jason Gustafson <jason@confluent.io>
Removes references to the old scala Acl classes from kafka.security.auth (Acl, Operation, ResourceType, Resource etc.) and replaces these with the Java API. Only the old SimpleAclAuthorizer, AuthorizerWrapper used to wrap legacy authorizer instances and tests using SimpleAclAuthorizer continue to use the old API. Deprecates the old scala API.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
During a reassignment, it can happen that the current leader of a partition is demoted and removed from the replica set at the same time. In this case, we rely on the StopReplica request in order to stop replica fetchers and to clear the group coordinator cache. This patch adds similar logic to ensure that the transaction coordinator state cache also gets cleared.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
To be able to correctly fence zombie producer txn commit, we propose to add (member.id, group.instance.id, generation) into the transaction commit protocol to raise the same level of correctness guarantee as consumer commit.
Major changes involve:
1. Upgrade transaction commit protocol with (member.id, group.instance.id, generation). The client will fail if the broker is not supporting the new protocol.
2. Refactor group coordinator logic to handle new txn commit errors such as FENCED_INSTANCE_ID, UNKNOWN_MEMBER_ID and ILLEGAL_GENERATION. We loose the check on transaction commit when the member.id is set to empty. This is because the member.id check is an add-on safety for producer commit, and we also need to consider backward compatibility for old producer clients without member.id information. And if producer equips with group.instance.id, then it must provide a valid member.id (not empty definitely), the same as a consumer commit.
Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This fix makes the LogCleaner tolerant of gaps in the offset sequence. Previously, this could lead to endless loops of cleaning which required manual intervention.
Reviewers: Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
Check for ISR updates using ISR read lock and acquire ISR write lock only if ISR needs to be updated. This avoids lock contention between request handler threads processing log appends on the leader holding the ISR read lock and request handler threads processing replica fetch requests that check/update ISR.
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Let consumer back-off and retry offset fetch when the specific offset topic has pending commits.
The major change lies in the broker side offset fetch logic, where a request configured with flag WaitTransaction to true will be required to back-off when some pending transactional commit is ongoing. This prevents any ongoing transaction being modified by third party, thus guaranteeing the correctness with input partition writer shuffling.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Prior to this commit, broker's Selector used to read all requests available on the socket when the socket is ready for read. These are queued up as staged receives. This can result in excessive memory usage when socket read buffer size is large. We mute the channel and stop reading any more data until all the staged requests are processed. This behaviour is slightly inconsistent since for the initial read we drain the socket buffer, allowing it to get filled up again, but if data arrives slighly after the initial read, then we dont read from the socket buffer until pending requests are processed.
To avoid holding onto requests for longer than required, this commit removes staged receives and reads one request at a time even if more data is available in the socket buffer. This is especially useful for produce requests which may be large and may take long to process. Additional data read from the socket for SSL is limited to the pre-allocated intermediate SSL buffers.
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>
When `kafka-topics.sh` is used with the --zookeeper option, it prints a confirmation message. This patch adds the same message when --bootstrap-server is used.
Reviewers: Jason Gustafson <jason@confluent.io>
Changed `CreatePartitionMetadata` to not include partition assignment
information since this is not needed to generate a
`CreatePartitionResponse` message.
Reviewers: Vikas Singh <vikas@confluent.io>, Jason Gustafson <jason@confluent.io>
The method updatePartitionReplicaAssignment allows the user to directly
modify the replicas assigned to a partition without also modifying the
addingReplicas and removingReplicas fields. This is not a safe operation
for all inputs. Clients should instead use updatePartitionFullReplicaAssignment.
Reviewers: Vikas Singh <vikas@confluent.io>, Jason Gustafson <jason@confluent.io>
The test blocks requests threads while sending the ACL update requests and occasionally hits request timeout. Updated the test to tolerate timeouts and retry the request for that case. Added an additional check to verify that the requests threads are unblocked when the semaphore is released, ensuring that the timeout is not due to blocked threads.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Not wait until updateAssignmentMetadataIfNeeded returns true, but only call it once with 0 timeout. Also do not return empty if in rebalance.
Trim the pre-fetched records after long polling since assignment may have been changed.
Also need to update SubscriptionState to retain the state in assignFromSubscribed if it already exists (similar to assignFromUser), so that we do not need the transition of INITIALIZING to FETCHING.
Unit test: this actually took me the most time :)
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Sophie Blee-Goldman <sophie@confluent.io>, Jason Gustafson <jason@confluent.io>, Richard Yu <yohan.richard.yu@gmail.com>, dengziming <dengziming1993@gmail.com>
Existing producer state expiration uses timestamps from data records only and not from transaction markers. This can cause premature producer expiration when the coordinator times out a transaction because we drop the state from existing batches. This in turn can allow the coordinator epoch to revert to a previous value, which can lead to validation failures during log recovery. This patch fixes the problem by also leveraging the timestamp from transaction markers.
We also change the validation logic so that coordinator epoch is verified only for new marker appends. When replicating from the leader and when recovering the log, we only log a warning if we notice that the coordinator epoch has gone backwards. This allows recovery from previous occurrences of this bug.
Finally, this patch fixes one minor issue when loading producer state from the snapshot file. When the only record for a given producer is a control record, the "last offset" field will be set to -1 in the snapshot. We should check for this case when loading to be sure we recover the state consistently.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>