The client caches metadata fetched from Metadata requests. Previously, each metadata response overwrote all of the metadata from the previous one, so we could rely on the expectation that the broker only returned the leaderId for a partition if it had connection information available. This behavior changed with KIP-320, since having the leader epoch allows the client to filter out partition metadata which is known to be stale. As a result, we can no longer rely on the request-level guarantee of leader availability. There is no mechanism similar to the leader epoch for tracking the staleness of broker metadata, so we still overwrite all of the broker metadata from each response, which means the partition metadata can get out of sync with the broker metadata in the client's cache. Hence it is no longer safe to validate inside the `Cluster` constructor that each leader has an associated `Node`.
Fixing this issue was unfortunately not straightforward, because the cache was built to maintain references to broker metadata through the `Node` object at the partition level. To keep the state consistent, each `Node` reference would need to be updated based on the new broker metadata. Instead of doing that, this patch restructures the cache to follow the Metadata response schema more closely: broker node information is maintained at the top level in a single collection, and cached partition metadata references only the id of the broker. To accommodate this, we have removed `PartitionInfoAndEpoch` and altered `MetadataResponse.PartitionMetadata` to eliminate its `Node` references.
Note that one of the side benefits of the refactor here is that we virtually eliminate one of the hotspots in Metadata request handling in `MetadataCache.getEndpoints` (which was renamed to `maybeFilterAliveReplicas`). The only reason this was expensive was because we had to build a new collection for the `Node` representations of each of the replica lists. This information was doomed to just get discarded on serialization, so the whole effort was wasteful. Now, we work with the lower level id lists and no copy of the replicas is needed (at least for all versions other than 0).
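As a rough illustration of the new cache shape (a minimal sketch with illustrative names, not the actual client internals), the leader `Node` is now resolved lazily from the cached broker id:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

// Sketch only: partition metadata stores the leader's broker id, while Node
// objects live in a single top-level map keyed by broker id.
class ClientMetadataCacheSketch {
    static class PartitionMetadata {
        final Optional<Integer> leaderId;
        PartitionMetadata(Optional<Integer> leaderId) { this.leaderId = leaderId; }
    }

    private final Map<Integer, Node> nodes = new HashMap<>();
    private final Map<TopicPartition, PartitionMetadata> partitions = new HashMap<>();

    // The leader may legitimately be unresolvable when partition metadata has
    // outlived the broker metadata it referenced, so callers must handle empty.
    Optional<Node> leaderFor(TopicPartition tp) {
        PartitionMetadata pm = partitions.get(tp);
        return pm == null ? Optional.empty() : pm.leaderId.map(nodes::get);
    }
}
```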
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
A few references to KIP-559 in the schema definitions needed to be fixed.
Reviewers: Brajesh Kumar <bristy@users.noreply.github.com>, Ron Dagostino <rdagostino@confluent.io>, Jason Gustafson <jason@confluent.io>
This PR is a collaboration between Guozhang Wang and John Roesler. It is a significant tech-debt cleanup of task management and state management, broken down into the sub-tasks listed below:
Extract the embedded clients (producer and consumer) out of StreamTask into RecordCollector.
guozhangwang#2
guozhangwang#5
Consolidate the standby-updating and active-restoring logic into ChangelogReader and extract it out of StreamThread.
guozhangwang#3
guozhangwang#4
Introduce a Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state (see the sketch after this list).
guozhangwang#6
guozhangwang#7
Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since those were already moved in steps 2) and 3)).
guozhangwang#8
guozhangwang#9
Also simplified the StreamThread logic a bit, as the embedded clients / changelog restoration logic was moved out in steps 1) and 2).
guozhangwang#10
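Here is the sketch referenced in the life-cycle item above (state names come from the description; the transition rules are illustrative, not the exact Streams implementation):

```java
// Sketch of the Task state life cycle; transitions are illustrative.
enum TaskState {
    CREATED, RESTORING, RUNNING, SUSPENDED, CLOSING;

    // Validate a proposed transition against the current state.
    boolean isValidTransition(TaskState next) {
        switch (this) {
            case CREATED:   return next == RESTORING || next == CLOSING;
            case RESTORING: return next == RUNNING || next == SUSPENDED || next == CLOSING;
            case RUNNING:   return next == SUSPENDED || next == CLOSING;
            case SUSPENDED: return next == RESTORING || next == RUNNING || next == CLOSING;
            default:        return false; // CLOSING is terminal
        }
    }
}
```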
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
Fixes an NPE in the broker when processing record errors in produce responses for older versions.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Also add support for flexible versions to both protocol types.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Colin Patrick McCabe <cmccabe@apache.org>
Co-authored-by: Rajini Sivaram <rajinisivaram@googlemail.com>
Co-authored-by: Jason Gustafson <jason@confluent.io>
This PR implements KIP-559: https://cwiki.apache.org/confluence/display/KAFKA/KIP-559%3A+Make+the+Kafka+Protocol+Friendlier+with+L7+Proxies
- it adds the Protocol Type and the Protocol Name fields to the JoinGroup and SyncGroup APIs;
- it validates that the fields are provided by the client when the new version of the API is used and ensures that they are consistent, erroring out otherwise (see the sketch after this list);
- it validates on the client side that the fields are consistent and errors out otherwise;
- it adds many tests related to the API changes and also extends the testing coverage of the requests/responses themselves;
- it standardises the naming in the coordinator: `ProtocolType` and `ProtocolName` are now used across the board instead of a mix of protocol type, protocol name, subprotocol, protocol, etc.
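A minimal sketch of that broker-side consistency check (illustrative only; the actual logic lives in the Scala coordinator, and the method name here is hypothetical):

```java
import org.apache.kafka.common.protocol.Errors;

// Sketch: on the new API version, the protocol type must be present and must
// match what the group already recorded; otherwise the request errors out.
final class ProtocolValidationSketch {
    static Errors validateProtocolType(String requestProtocolType, String groupProtocolType) {
        if (requestProtocolType == null || requestProtocolType.isEmpty())
            return Errors.INCONSISTENT_GROUP_PROTOCOL;
        if (groupProtocolType != null && !groupProtocolType.equals(requestProtocolType))
            return Errors.INCONSISTENT_GROUP_PROTOCOL;
        return Errors.NONE;
    }
}
```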
Reviewers: Jason Gustafson <jason@confluent.io>
This PR implements `default.api.timeout.ms` as documented by KIP-533. This is a rebased version of #6913 with some additional test cases and small cleanups.
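For illustration, configuring the new timeout on an admin client looks roughly like this (a sketch assuming a broker at localhost:9092):

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DefaultApiTimeoutExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Applies to admin operations that do not pass an explicit timeout
        // via their per-call Options object.
        props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120000);
        try (Admin admin = Admin.create(props)) {
            System.out.println(admin.listTopics().names().get());
        }
    }
}
```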
Reviewers: David Arthur <mumrah@gmail.com>
Co-authored-by: huxi <huxi_2b@hotmail.com>
Attempt to load multiple IBM classes, but fall back to loading the Sun class if the IBM one is not found.
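The pattern is roughly the following (a sketch; the class names are parameters here because the real change probes specific IBM JDK security classes):

```java
// Sketch: prefer the IBM class, fall back to the Sun equivalent when absent.
static Class<?> loadWithFallback(String ibmClassName, String sunClassName)
        throws ClassNotFoundException {
    try {
        return Class.forName(ibmClassName);
    } catch (ClassNotFoundException e) {
        // Not running on an IBM JDK: use the Sun/Oracle class instead.
        return Class.forName(sunClassName);
    }
}
```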
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Also: improve the error message, add a test, and make minor code quality fixes.
Verified that the test fails if the broker default for max message bytes is lower or higher than the currently set value.
Reviewers: Andrew Choi <andchoi@linkedin.com>, Viktor Somogyi <viktorsomogyi@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Previously the idempotent producer and the transactional producer used separate logic when initializing the producerId. This patch consolidates the two paths. We also do some cleanup in `TransactionManagerTest` to eliminate brittle expectations on `Sender`.
Reviewers: Bob Barrett <bob.barrett@confluent.io>, Viktor Somogyi <viktorsomogyi@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
This patch adds a new API to the producer to implement transactional offset commit fencing through the group coordinator, as proposed in KIP-447. The changes are mainly on the producer end, keeping the old `sendOffsetsToTxn(offsets, groupId)` path compatible alongside the new `sendOffsetsToTxn(offsets, groupMetadata)`.
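A usage sketch of the new path (the public producer method is `sendOffsetsToTransaction`; the producer is assumed to have been initialized for transactions):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

static void commitConsumed(KafkaProducer<byte[], byte[]> producer,
                           KafkaConsumer<byte[], byte[]> consumer,
                           Map<TopicPartition, OffsetAndMetadata> offsets) {
    producer.beginTransaction();
    // ... produce output records here ...
    // New overload: passing the consumer's group metadata lets the group
    // coordinator fence commits from zombie producers.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}
```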
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
The previous version of MockConsumer did not allow clients to test consecutive calls to poll while consuming from only a partial set of partitions, because it cleared all records after each call. This change makes MockConsumer clear the records only for the partitions that are not paused (i.e., whose records are actually returned by poll); the paused partitions retain their records.
Unit test added accordingly.
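For illustration, a test can now exercise consecutive polls like this (a sketch with hypothetical topic names):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerPauseExample {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp0 = new TopicPartition("topic", 0);
        TopicPartition tp1 = new TopicPartition("topic", 1);
        consumer.assign(Arrays.asList(tp0, tp1));
        Map<TopicPartition, Long> beginning = new HashMap<>();
        beginning.put(tp0, 0L);
        beginning.put(tp1, 0L);
        consumer.updateBeginningOffsets(beginning);
        consumer.addRecord(new ConsumerRecord<>("topic", 0, 0L, "k", "v0"));
        consumer.addRecord(new ConsumerRecord<>("topic", 1, 0L, "k", "v1"));

        consumer.pause(Collections.singleton(tp1));
        consumer.poll(Duration.ZERO); // returns tp0's record; tp1's is retained
        consumer.resume(Collections.singleton(tp1));
        consumer.poll(Duration.ZERO); // now returns tp1's record instead of nothing
    }
}
```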
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
The producer's BufferPool may block allocations when its memory limit has been reached. If the producer is closed while allocations are blocked, the waiters can keep waiting for up to max.block.ms even when the close is forced (immediate), which amounts to indefinite blocking when max.block.ms is particularly high.
This patch fixes the problem by adding a `close()` method to `BufferPool`, which wakes up any waiters that have pending allocations and throws an exception.
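A simplified sketch of the mechanism (the real `BufferPool` keeps per-waiter conditions and free-list accounting and throws a `KafkaException`; this is illustrative only):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimpleBufferPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private boolean closed = false;
    private long available;

    SimpleBufferPool(long totalMemory) { this.available = totalMemory; }

    ByteBuffer allocate(int size, long maxBlockMs) throws InterruptedException {
        lock.lock();
        try {
            while (available < size) {
                if (closed)
                    throw new IllegalStateException("Producer closed while allocation in progress");
                Condition moreMemory = lock.newCondition();
                waiters.addLast(moreMemory);
                try {
                    if (!moreMemory.await(maxBlockMs, TimeUnit.MILLISECONDS))
                        throw new IllegalStateException("Failed to allocate within " + maxBlockMs + " ms");
                } finally {
                    waiters.remove(moreMemory);
                }
            }
            available -= size;
            return ByteBuffer.allocate(size);
        } finally {
            lock.unlock();
        }
    }

    // Wake all blocked allocators; they observe `closed` and throw immediately
    // instead of sitting out the full max.block.ms.
    void close() {
        lock.lock();
        try {
            closed = true;
            for (Condition c : waiters)
                c.signal();
        } finally {
            lock.unlock();
        }
    }
}
```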
Reviewers: Jason Gustafson <jason@confluent.io>
When the scheduled refreshTopicPartitions runs, check existing topics in
both source and target clusters in order to compute topic partitions to
be created on target.
If a temporary failure to create the target topic is encountered (e.g.
insufficient number of brokers), on the next refresh the target topic
creation will be re-attempted.
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@uk.ibm.com>
Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
Removes references to the old Scala Acl classes from kafka.security.auth (Acl, Operation, ResourceType, Resource etc.) and replaces these with the Java API. Only the old SimpleAclAuthorizer, the AuthorizerWrapper used to wrap legacy authorizer instances, and tests using SimpleAclAuthorizer continue to use the old API. Deprecates the old Scala API.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
To correctly fence zombie producer transactional commits, we propose to add (member.id, group.instance.id, generation) to the transaction commit protocol, raising it to the same level of correctness guarantee as consumer commit.
Major changes involve:
1. Upgrade the transaction commit protocol with (member.id, group.instance.id, generation). The client will fail if the broker does not support the new protocol.
2. Refactor the group coordinator logic to handle new txn commit errors such as FENCED_INSTANCE_ID, UNKNOWN_MEMBER_ID and ILLEGAL_GENERATION. We loosen the check on transaction commit when the member.id is set to empty: the member.id check is an add-on safety for producer commit, and we need to preserve backward compatibility for old producer clients that carry no member.id information. If the producer is configured with group.instance.id, however, it must provide a valid (non-empty) member.id, the same as a consumer commit (see the sketch below).
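An illustrative sketch of the loosened check (the `GroupState` interface here is hypothetical; the actual logic lives in the Scala group coordinator):

```java
import java.util.Optional;
import org.apache.kafka.common.protocol.Errors;

final class TxnCommitValidationSketch {
    // Hypothetical view of the group's membership state.
    interface GroupState {
        boolean isStaticMember(String groupInstanceId, String memberId);
        boolean hasMember(String memberId);
        int generationId();
    }

    static Errors validate(String memberId, Optional<String> groupInstanceId,
                           int generationId, GroupState group) {
        // Old producers send no member.id; skip the fencing checks for them.
        if (memberId.isEmpty() && !groupInstanceId.isPresent())
            return Errors.NONE;
        // A static member must present the member.id the coordinator recorded.
        if (groupInstanceId.isPresent() && !group.isStaticMember(groupInstanceId.get(), memberId))
            return Errors.FENCED_INSTANCE_ID;
        if (!group.hasMember(memberId))
            return Errors.UNKNOWN_MEMBER_ID;
        if (generationId != group.generationId())
            return Errors.ILLEGAL_GENERATION;
        return Errors.NONE;
    }
}
```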
Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Let the consumer back off and retry the offset fetch when the relevant offsets topic partition has pending commits.
The major change lies in the broker-side offset fetch logic, where a request with the WaitTransaction flag set to true is required to back off while a pending transactional commit is ongoing. This prevents an ongoing transaction from being modified by a third party, thus guaranteeing correctness with input partition writer shuffling.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Fixes an issue where calling `Consumer.endOffsets` in a loop would throw `TimeoutException: Failed to get offsets by times in 30000ms` after a leader change.
Reviewers: Steven Lu, Guozhang Wang <wangguoz@gmail.com>
Prior to this commit, the broker's Selector used to read all requests available on the socket when the socket was ready for read. These were queued up as staged receives. This can result in excessive memory usage when the socket read buffer size is large. We mute the channel and stop reading any more data until all the staged requests are processed. This behaviour is slightly inconsistent: for the initial read we drain the socket buffer, allowing it to fill up again, but if data arrives slightly after the initial read, we don't read from the socket buffer until the pending requests are processed.
To avoid holding onto requests for longer than required, this commit removes staged receives and reads one request at a time even if more data is available in the socket buffer. This is especially useful for produce requests which may be large and may take long to process. Additional data read from the socket for SSL is limited to the pre-allocated intermediate SSL buffers.
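In spirit, the read path moves from draining the socket to reading a single size-delimited request per iteration, roughly as below (a heavily simplified sketch assuming a blocking channel; the real implementation is the non-blocking Selector/KafkaChannel machinery):

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

final class SingleRequestReader {
    // Read exactly one size-delimited request, then stop, even if more bytes
    // remain in the socket buffer; the channel stays muted until the request
    // has been processed.
    static ByteBuffer readOneRequest(ReadableByteChannel channel) throws IOException {
        ByteBuffer sizeBuf = ByteBuffer.allocate(4);
        readFully(channel, sizeBuf);
        ByteBuffer payload = ByteBuffer.allocate(sizeBuf.getInt(0));
        readFully(channel, payload);
        payload.flip();
        return payload;
    }

    private static void readFully(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining())
            if (channel.read(buf) < 0)
                throw new EOFException("Channel closed mid-request");
    }
}
```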
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Do not wait until updateAssignmentMetadataIfNeeded returns true; instead call it once with a timeout of 0. Also, do not return empty if a rebalance is in progress.
Trim the pre-fetched records after the long poll, since the assignment may have changed.
We also need to update SubscriptionState to retain the state in assignFromSubscribed if it already exists (similar to assignFromUser), so that we do not need the transition from INITIALIZING to FETCHING.
Unit test: this actually took me the most time :)
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Sophie Blee-Goldman <sophie@confluent.io>, Jason Gustafson <jason@confluent.io>, Richard Yu <yohan.richard.yu@gmail.com>, dengziming <dengziming1993@gmail.com>
Existing producer state expiration uses timestamps from data records only and not from transaction markers. This can cause premature producer expiration when the coordinator times out a transaction because we drop the state from existing batches. This in turn can allow the coordinator epoch to revert to a previous value, which can lead to validation failures during log recovery. This patch fixes the problem by also leveraging the timestamp from transaction markers.
We also change the validation logic so that coordinator epoch is verified only for new marker appends. When replicating from the leader and when recovering the log, we only log a warning if we notice that the coordinator epoch has gone backwards. This allows recovery from previous occurrences of this bug.
Finally, this patch fixes one minor issue when loading producer state from the snapshot file. When the only record for a given producer is a control record, the "last offset" field will be set to -1 in the snapshot. We should check for this case when loading to be sure we recover the state consistently.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>
It is possible for the user to call `commitTransaction` before a pending `AddPartitionsToTxn` call has returned. If the `AddPartitionsToTxn` call returns an abortable error, we need to cancel any pending batches in the accumulator and we need to fail the pending commit. The problem in this case is that `Sender.runOnce`, after failing the batches, enters `NetworkClient.poll` before it has a chance to cancel the commit. Since there are no pending requests at this time, this will block unnecessarily and prevent completion of the doomed commit. This patch fixes the problem by returning from `runOnce` if any batches have been aborted, which allows the commit to also fail without the delay.
Note that this was the cause of the delay executing `AuthorizerIntegrationTest.testTransactionalProducerTopicAuthorizationExceptionInCommit` compared to other tests in this class. After fixing the bug, the delay is gone and the test time is similar to other test cases in the class.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Also document the parameters of `ReplicaAssignment` in `ControllerContext`.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
This change updates the CreatePartitions request and response API objects
to use the generated protocol classes.
Reviewers: Jason Gustafson <jason@confluent.io>
The close method calls `Thread.join` to wait for the AdminClient thread to
die, but if close is called from the API completion callback, `join`
waits forever, since the thread calling `join` is the same thread it
is waiting on to die.
This change checks for this condition and skips the join. The thread
will then return to the main loop, where it will check for this condition
and exit.
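The guard is essentially the following (a sketch with illustrative names):

```java
import java.time.Duration;

// Sketch: skip Thread.join when close() is invoked from the AdminClient
// thread itself (e.g. inside an API completion callback).
void close(Thread ioThread, Duration timeout) throws InterruptedException {
    // ... signal the io thread to begin shutting down ...
    if (Thread.currentThread() != ioThread) {
        ioThread.join(timeout.toMillis());
    }
    // Otherwise we are on the AdminClient thread: it will observe the
    // shutdown flag when it returns to the main loop and exit on its own.
}
```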
Reviewers: Jason Gustafson <jason@confluent.io>
The log context is useful when debugging applications which have multiple clients. This patch propagates the context to the channel builders and the SASL authenticator.
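For illustration, a `LogContext` prefixes every line logged through it, so clients sharing a JVM can be told apart (the clientId value below is hypothetical):

```java
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class LogContextExample {
    private static final LogContext CTX = new LogContext("[Producer clientId=producer-1] ");
    private static final Logger log = CTX.logger(LogContextExample.class);

    public static void main(String[] args) {
        // Logs: "[Producer clientId=producer-1] Connection established"
        log.info("Connection established");
    }
}
```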
Reviewers: Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
* Adjust build and documentation.
* Use lambda syntax for SAM types in `core`, `streams-scala` and
`connect-runtime` modules.
* Remove `runnable` and `newThread` from `CoreUtils` as lambda
syntax for SAM types make them unnecessary.
* Remove stale comment in `FunctionsCompatConversions`,
`KGroupedStream`, `KGroupedTable` and `KStream` about Scala 2.11,
since the conversions are needed for Scala 2.12 too.
* Deprecate `org.apache.kafka.streams.scala.kstream.Suppressed`
and use `org.apache.kafka.streams.kstream.Suppressed` instead.
* Use `Admin.create` instead of `AdminClient.create`. Static methods
in Java interfaces can be invoked since Scala 2.12. I noticed that
MirrorMaker 2 uses `AdminClient.create`, but I did not change those
usages as Connectors have restrictions on newer client APIs.
* Improve efficiency in a few `Gauge` implementations by avoiding
unnecessary intermediate collections.
* Remove pointless `Option.apply` in `ZookeeperClient`
`SessionState` metric.
* Fix unused import/variable and other compiler warnings.
* Reduce visibility of some vals/defs.
Reviewers: Manikumar Reddy <manikumar@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Gwen Shapira <gwen@confluent.io>
When the consumer's fetch request is throttled by the KIP-219 mechanism,
it receives an empty fetch response. The consumer should not log this
as an error.
Reviewers: Jason Gustafson <jason@confluent.io>
Allow ConfigCommand to handle more operations without using direct ZooKeeper access, as described by KIP-543. Also allow specifying the entity type and name via a single flag, again as the KIP describes.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Adds support for TLSv1.3 in SslTransportLayer. Note that TLSv1.3 is only enabled from Java 11 onwards, so we test the code only when running with Java 11 and above.
Tests run on this PR:
- SslTransportLayerTest: This covers testing of our SslTransportLayer and all tests are run with TLSv1.3 when running with Java 11. These tests are also run with TLSv1.2 for all Java versions.
- SslFactoryTest: Also run with TLSv1.3 on Java 11 onwards in addition to TLSv1.2 for all Java versions.
- SslEndToEndAuthorizationTest: Run only with TLSv1.3 on Java 11 onwards and only with TLSv1.2 on earlier Java versions. We have other versions of this test which use SSL and continue to run with TLSv1.2 on Java 11, to avoid reducing test coverage for TLSv1.2.
Additional testing done for TLSv1.3:
- Most tests that use SSL use TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS, which is set to TLSv1.2. I have run all clients and core tests with DEFAULT_TLS_PROTOCOL_FOR_TESTS=TLSv1.3 with Java 11.
- Ran a few system tests locally with TLSv1.3.
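For reference, enabling TLSv1.3 on a client boils down to configuration along these lines (a sketch; requires Java 11+ as noted above):

```java
import java.util.Properties;
import org.apache.kafka.common.config.SslConfigs;

final class Tls13ConfigSketch {
    static Properties sslProps() {
        Properties props = new Properties();
        props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3");
        // Keep TLSv1.2 enabled as well so older peers can still negotiate.
        props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.3,TLSv1.2");
        return props;
    }
}
```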
Reviewers: Ismael Juma <ismael@juma.me.uk>, Manikumar Reddy <manikumar.reddy@gmail.com>