Reviewers: Guozhang Wang <guozhang@confluent.io>, Ismael Juma <ismael@confluent.io>, Jorge Quilcate Otoya <quilcate.jorge@gmail.com>, John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>
See also KIP-183.
This implements the following algorithm:
AdminClient sends ElectPreferredLeadersRequest.
KafakApis receives ElectPreferredLeadersRequest and delegates to
ReplicaManager.electPreferredLeaders()
ReplicaManager delegates to KafkaController.electPreferredLeaders()
KafkaController adds a PreferredReplicaLeaderElection to the EventManager,
ReplicaManager.electPreferredLeaders()'s callback uses the
delayedElectPreferredReplicasPurgatory to wait for the results of the
election to appear in the metadata cache. If there are no results
because of errors, or because the preferred leaders are already leading
the partitions then a response is returned immediately.
In the EventManager work thread the preferred leader is elected as follows:
The EventManager runs PreferredReplicaLeaderElection.process()
process() calls KafkaController.onPreferredReplicaElectionWithResults()
KafkaController.onPreferredReplicaElectionWithResults()
calls the PartitionStateMachine.handleStateChangesWithResults() to
perform the election (asynchronously the PSM will send LeaderAndIsrRequest
to the new and old leaders and UpdateMetadataRequest to all brokers)
then invokes the callback.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jun Rao <junrao@gmail.com>
This patch fixes a few overflow issues with wrapping sequence numbers in the broker's producer state tracking.
Reviewers: Jason Gustafson <jason@confluent.io>
KIP-291 Implementation : Added code to separate controller connections and requests from the data plane.
Tested with local deployment that the controller request are handled by the control plane and other requests are handled by the data plane.
Also added unit tests in order to test the functionality.
Author: Lucas Wang <luwang@linkedin.com>,
Author: Mayuresh Gharat <gharatmayuresh15@gmail.com>
Reviewers: Joel Koshy <jjkoshy@gmail.com>, Jun Rao <junrao@gmail.com>
Using AdminClient#alterConfigs, topic `retention.ms` property can be assigned to a value lesser than -1. This leads to inconsistency while describing the topic configuration. We should not allow values lesser than -1.
Author: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>,Matthias J. Sax <matthias@confluent.io>
Closes#6082 from kamalcph/KAFKA-7781
* KAFKA-6627: Prevent config default values overriding ones specified through --producer-property on command line.
In Console{Producer,Consumer}, extraProducerProps (options specified in
--producer-property) is applied first, then overriden unconditionally,
even if the value is not specified explicitly (and default value is
used). This patch fixes it so that it doesn't override the existing
value set by --producer-property if it is not explicitly specified.
The contribution is my original work and I license the work to the
project under the project's open source license.
Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>
There is a race condition in ReplicaFetcherThread, where we can update PartitionFetchState with the new leader epoch (same leader) before handling the OffsetsForLeaderEpoch response with FENCED_LEADER_EPOCH error which causes removing partition from partitionStates, which in turn causes no fetching until the next LeaderAndIsr.
This patch adds logic to ensure that the leader epoch doesn't change while an OffsetsForLeaderEpoch request is in flight (which could happen with back-to-back leader elections). If it has changed, we ignore the response.
Also added toString() implementation to PartitionData, because some log messages did not show useful info which I found while investigating the above system test failure.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
This patch changes the behavior of KafkaProducer.waitOnMetadata to wait up to max.block.ms when the partition specified in the produce request is out of the range of partitions present in the metadata. This improves the user experience in the case when partitions are added to a topic and a client attempts to produce to one of the new partitions before the metadata has propagated to the brokers. Tested with unit tests.
Reviewers: Arjun Satish <arjun@confluent.io>, Jason Gustafson <jason@confluent.io>
- This commits sets ACL on /kafka-acl-extended
- Extended ZkAuthorizationTest to check ACL on /kafka-acl-extended
- Using zookeeper-security-migration.sh tool on a Kerberized test cluster, I verified the changes: secured and unsecured Kafka znodes and examined ACL on /kafka-acl-extended with zookeeper client
Author: Attila Sasvari <asasvari@apache.org>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Andras Katona <41361962+akatona84@users.noreply.github.com>
Closes#6072 from asasvari/KAFKA-7752
* Shard purgatory to reduce lock contention
* put constant into Object, use foldLeft instead of for loop
* watchersForKey -> watchersByKey
* Incorporate Jun's comments: use named arguments instead of _, and remove
an unnecessary lock
Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>, Jun Rao <junrao@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
**User Interface Improvement :** If topic doesn't exist then Kafka describe command should throw topic doesn't exist exception, like alter and delete commands
Author: Manohar Vanam <manohar.crazy09@gmail.com>
Reviewers: Vahid Hashemian <vahid.hashemian@gmail.com>, Jason Gustafson <jason@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#5211 from ManoharVanam/KAFKA-7054
After a recent leader election, the leaders high-water mark might lag behind the offset at the beginning of the new epoch (as well as the previous leader's HW). This can lead to offsets going backwards from a client perspective, which is confusing and leads to strange behavior in some clients.
This change causes Partition#fetchOffsetForTimestamp to throw an exception to indicate the offsets are not yet available from the leader. For new clients, a new OFFSET_NOT_AVAILABLE error is added. For existing clients, a LEADER_NOT_AVAILABLE is thrown.
This is an implementation of [KIP-207](https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change).
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Dhruvil Shah <dhruvil@confluent.io>, Jason Gustafson <jason@confluent.io>
When a consumer first joins a group, it doesn't have an assigned memberId. If the rebalance is delayed for some reason, the client may disconnect after a request timeout and retry. Since the client had not received its memberId, then we do not have a way to detect the retry and expire the previously generated member id. This can lead to unbounded growth in the size of the group until the rebalance has completed.
This patch fixes the problem by proactively completing all JoinGroup requests for new members after a timeout of 5 minutes. If the client is still around, we expect it to retry.
Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Boyang Chen <bchen11@outlook.com>, Guozhang Wang <wangguoz@gmail.com>
Older versions of the Produce API should return an error if zstd is used. This validation existed, but it was done during request parsing, which means that instead of returning an error code, the broker disconnected. This patch fixes the issue by moving the validation outside of the parsing logic. It also fixes several other record validations which had the same problem.
Reviewers: Jason Gustafson <jason@confluent.io>
This test sometimes fails with
```
kafka.tools.MirrorMaker$NoRecordsException
at kafka.tools.MirrorMaker$ConsumerWrapper.receive(MirrorMaker.scala:483)
at kafka.tools.MirrorMakerIntegrationTest$$anonfun$testCommaSeparatedRegex$1.apply$mcZ$sp(MirrorMakerIntegrationTest.scala:92)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:738)
```
The test should catch `NoRecordsException` instead of `TimeoutException`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
On the follower side, for the empty `LogAppendInfo` retrieved from the leader, fetcherLagStats set the wrong lag for fetcherLagStats due to `nextOffset` is zero.
There were several reported incidents where the log is rolled to a new segment with the same base offset as an active segment, causing KafkaException: Trying to roll a new log segment for topic partition X-N with start offset M while it already exists. In the cases we have seen, this happens to an empty log segment where there is long idle time before the next append and somehow we get to a state where offsetIndex.isFull() returns true due to _maxEntries == 0. This PR recovers from this state by deleting and recreating the segment and all of its associated index files.
Reviewers: Jason Gustafson <jason@confluent.io>
Delayed fetch operations acquire leaderIsrUpdate read lock of one or more Partitions from the fetch request when attempting to complete the fetch operation. While appending new records, complete fetch requests after releasing leaderIsrUpdate of the Partition to which records were appended to avoid deadlocks in request handler threads.
Reviewers: Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>
EndToEndLatency tool produces a dummy record in case the topic does not exist. This behavior was introduced in this PR https://github.com/apache/kafka/pull/5319 as part of updating the tool to use latest consumer API. However, if we run the tool with producer acks == 1, the high watermark may not be updated before we reset consumer offsets to latest. In rare cases when this happens, the tool will throw an exception in the for loop where the consumer will unexpectedly consume the dummy record. As a result, we occasionally see Benchmark.test_end_to_end_latency system test failures.
This PR checks if topic exists, and creates the topic using AdminClient if it does not exist.
Author: Anna Povzner <anna@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5950 from apovzner/fix-EndToEndLatency
- Remove ZKUtils usage from various tests
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>, Ismael Juma <ismael@juma.me.uk>, Satish Duggana <satishd@apache.org>, Jun Rao <junrao@gmail.com>, Ryanne Dolan <ryannedolan@gmail.com>
Closes#5480 from omkreddy/zkutils
Some deserializer needs the topic name to be able to correctly deserialize the payload of the message.
Console consumer works great with Deserializer<String> however it calls deserializer with topic set as null.
This breaks the API and the topic information is available in the ConsumerRecord.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Gardner Vickers <gardner@vickers.me>, Jun Rao <junrao@gmail.com>
Changes made as part of this [KIP-374](https://cwiki.apache.org/confluence/x/FgSQBQ) and [KAFKA-7418](https://issues.apache.org/jira/browse/KAFKA-7418)
- Checking for empty args or help option in command file to print Usage
- Added new class to enforce help option to all commands
- Refactored few lines (ex `PreferredReplicaLeaderElectionCommand`) to
make use of `CommandDefaultOptions` attributes.
- Made the changes in help text wordings
Run the unit tests in local(Windows) few Linux friendly tests are failing but
not any functionality, verified `--help` and no option response by running
Scala classes, since those all are having `main` method.
Author: Srinivas Reddy <srinivas96alluri@gmail.com>
Author: Srinivas Reddy <mrsrinivas@users.noreply.github.com>
Author: Srinivas <srinivas96alluri@gmail.com>
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Manikumar Reddy <manikumar.reddy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
Closes#5910 from mrsrinivas/KIP-374
While metrics like Min, Avg and Max make sense to respective use Double.MAX_VALUE, 0.0 and Double.MIN_VALUE as default values to ease computation logic, exposing those values makes reading them a bit misleading. For instance, how would you differentiate whether your -avg metric has a value of 0 because it was given samples of 0 or no samples were fed to it?
It makes sense to standardize on the output of these metrics with something that clearly denotes that no values have been recorded.
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Improve the default group id behavior by:
* changing the default consumer group to null, where no offset commit or fetch, or group management operations are allowed
* deprecating the use of empty (`""`) consumer group on the client
Reviewers: Jason Gustafson <jason@confluent.io>
ReplicaFetcherThread.shutdown attempts to close the fetcher's Selector while the thread is running. This in unsafe and can result in `Selector.close()` failing with an exception. The exception is caught and logged at debug level, but this can lead to socket leak if the shutdown is due to dynamic config update rather than broker shutdown. This PR changes the shutdown logic to close Selector after the replica fetcher thread is shutdown, with a wakeup() and flag to terminate blocking sends first.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
We are seeing some timeouts in tests which depend on the awaitCommitCallback (e.g.
SaslMultiMechanismConsumerTest.testCoordinatorFailover). After some investigation,
we found that it is caused by a disconnect when attempting the async commit.
To fix the problem, we have added simple retry logic to the test utility.
Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Ismael Juma <ismael@juma.me.uk>
- Use Xlint:all with 3 exclusions (filed KAFKA-7613 to remove the exclusions)
- Use the same javac options when compiling tests (seems accidental that
we didn't do this before)
- Replaced several deprecated method calls with non-deprecated ones:
- `KafkaConsumer.poll(long)` and `KafkaConsumer.close(long)`
- `Class.newInstance` and `new Integer/Long` (deprecated since Java 9)
- `scala.Console` (deprecated in Scala 2.11)
- `PartitionData` taking a timestamp (one of them seemingly a bug)
- `JsonMappingException` single parameter constructor
- Fix unnecessary usage of raw types in several places.
- Add @SuppressWarnings for deprecations, unchecked and switch fallthrough in
several places.
- Scala clean-ups (var -> val, ETA expansion warnings, avoid reflective calls)
- Use lambdas to simplify code in a few places
- Add @SafeVarargs, fix varargs usage and remove unnecessary `Utils.mkList` method
Reviewers: Matthias J. Sax <mjsax@apache.org>, Manikumar Reddy <manikumar.reddy@gmail.com>, Randall Hauch <rhauch@gmail.com>, Bill Bejeck <bill@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>