FetchResponse should return the partitionData's lastStabeleOffset
Author: lambdaliu <lambdaliu@tencent.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Dhruvil Shah <dhruvil@confluent.io>, Dong Lin <lindong28@gmail.com>
Closes#5835 from lambdaliu/KAFKA-7535
`ControllerIntegrationTest#waitUntilControllerEpoch` sometimes fails with the following error:
```
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at kafka.controller.ControllerIntegrationTest$$anonfun$waitUntilControllerEpoch$1.apply$mcZ$sp(ControllerIntegrationTest.scala:312)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:779)
at kafka.controller.ControllerIntegrationTest.waitUntilControllerEpoch(ControllerIntegrationTest.scala:312)
at kafka.controller.ControllerIntegrationTest.testEmptyCluster(ControllerIntegrationTest.scala:51)
```
We should retry until the value is defined or it times out.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Make sure that the transaction state is properly cleared when the
`transactionalId-expiration` task fails. Operations on that transactional
id would otherwise return a `CONCURRENT_TRANSACTIONS` error
and appear "untouchable" to transaction state changes, preventing
transactional producers from operating until a broker restart or
transaction coordinator change.
Unit tested by verifying that having the `transactionalId-expiration` task
won't leave the transaction metadata in a pending state if the replica
manager returns an error.
Reviewers: Jason Gustafson <jason@confluent.io>
Just a doc change
Author: John Eismeier <john.eismeier@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4573 from jeis2497052/trunk
After KAFKA-6051, we close leaderEndPoint in replica fetcher thread initiateShutdown to try to preempt in-progress fetch request and accelerate repica fetcher thread shutdown. However, leaderEndpoint can throw an Exception when the replica fetcher thread is still actively fetching, which can cause ReplicaManager to fail to shutdown cleanly. This PR catches the exceptions thrown in "leaderEndpoint.close()" instead of letting it throw up in the call stack.
Author: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Dong Lin <lindong28@gmail.com>
Closes#5808 from hzxa21/KAFKA-7464
This patch adds logging of topic config overrides during creation or during the handling of alter config requests. Also did some minor cleanup to avoid redundant validation logic when adding partitions.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#5812 from hachikuji/minor-log-topic-creation-configs
Add CreatePartitionsRequest.PartitionDetails similar to CreateTopicsRequest.TopicDetails to avoid references from `common.requests` package to `clients`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Adds `client.dns.lookup=resolve_canonical_bootstrap_servers_only` option to perform full dns resolution of bootstrap addresses
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Sriharsha Chintalapani <sriharsha@apache.org>, Edoardo Comar <ecomar@uk.ibm.com>, Mickael Maison <mickael.maison@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
Implementation of KIP-302: Based on the new client configuration `client.dns.lookup`, a NetworkClient can use InetAddress.getAllByName to find all IPs and iterate over them when they fail to connect. Only uses either IPv4 or IPv6 addresses similar to the default mode.
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
- SslFactoryTest should use SslFactory to create SSLEngine
- Use Mockito instead of EasyMock in `ConsoleConsumerTest` as one of
the tests mocks a standard library class and the latest released EasyMock
version can't do that when Java 11 is used.
- Avoid mocking `ConcurrentMap` in `SourceTaskOffsetCommitterTest`
for similar reasons. As it happens, mocking is not actually needed here.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
* Fix `ZkUtils.getReassignmentJson` to pass Java map to `Json.encodeAsString`
* Allow new file creation in ReplicationQuotasTestRig test
Reviewers: Ismael Juma <ismael@juma.me.uk>
Ensure that `TestUtils.waitUntilTrue(..)` is blocked on both send completed and a new leader being assigned
Author: Gardner Vickers <gardner@vickers.me>
Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Dong Lin <lindong28@gmail.com>
Closes#5695 from gardnervickers/log-dir-failure-test-fix
The thread no longer dies. When encountering an unexpected error, it marks the partition as "uncleanable" which means it will not try to clean its logs in subsequent runs.
Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jun Rao <junrao@gmail.com>
This patch contains the broker-side support for the fencing improvements from KIP-320. This includes the leader epoch validation in the ListOffsets, OffsetsForLeaderEpoch, and Fetch APIs as well as the changes needed in the fetcher threads to maintain and use the current leader epoch. The client changes from KIP-320 will be left for a follow-up.
One notable change worth mentioning is that we now require the read lock in `Partition` in order to read from the log or to query offsets. This is necessary to ensure the safety of the leader epoch validation. Additionally, we forward all leader epoch changes to the replica fetcher thread and go through the truncation phase. This is needed to ensure the fetcher always has the latest epoch and to guarantee that we cannot miss needed truncation if we missed an epoch change.
Reviewers: Jun Rao <junrao@gmail.com>
This patch adds checks before reading the first record of a control batch. If the batch is empty, it is treated as having already been cleaned. In the case of LogCleaner this means it is safe to discard. In the case of ProducerStateManager it means it shouldn't cause state to be stored because the relevant transaction has already been cleaned. In the case of Fetcher, it just preempts the check for an abort. In the case of GroupMetadataManager, it doesn't process the offset as a commit. The patch also adds isControl to the output of DumpLogSegments. Changes were tested with unit tests, except the DumpLogSegments change which was tested manually.
In `ConsumerBuilder.build`, if `awaitInitialPositions` raises an exception, the consumer will not be closed properly. We should add the consumer instance to the `consumers` collection immediately after construction.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch ensures that the leader epoch cache is updated when a broker becomes leader with the latest epoch and the log end offset as its starting offset. This guarantees that the leader will be able to provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back to back leader elections.
Additionally, we have made the following changes:
1. The leader epoch cache enforces monotonically increase epochs and starting offsets among its entry. Whenever a new entry is appended which violates requirement, we remove the conflicting entries from the cache.
2. Previously we returned an unknown epoch and offset if an epoch is queried which comes before the first entry in the cache. Now we return the smallest . For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine where the correct starting point is for the active log range on the leader.
Reviewers: Jun Rao <junrao@gmail.com>
During the consumer group rebalance, when the joining group phase finishes, the heartbeat delayed operation of the consumer that fails to rejoin the group should be removed from the purgatory. Otherwise, even though the member ID of the consumer has been removed from the group, its heartbeat delayed operation is still registered in the purgatory and the heartbeat delayed operation is going to timeout and then another unnecessary rebalance is triggered because of it.
Author: Lincong Li <lcli@linkedin.com>
Reviewers: Dong Lin <lindong28@gmail.com>
Closes#5556 from Lincong/remove_heartbeat_delayedOperation
As part of KAFKA-6514, the `apiVersion` tag was added to the `RequestsPerSec`
metric. A thread unsafe `HashMap` was used in the implementation even though
it can be accessed by multiple threads. Fix it by replacing it with the thread-safe
`Pool`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Values for `message.format.version` and `log.message.format.version` should be verified before topic creation or config change.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
This commit implements the changes described in KIP-320 for the persistence of leader epoch information in the offset commit protocol.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This fixes a regression caused by KAFKA-4682 (KIP-211) which caused offset commit failures after upgrading from an older version which used the v1 inter-broker format.
* KAFKA-7400: Compacted topic segments that precede the log start offset are not cleaned up
Currently we don't delete any log segments if the cleanup policy doesn't include delete. This patch changes the behavior to delete log segments that fully precede the log start offset even when deletion is not enabled. Tested with unit tests to verify that LogManager.cleanupLogs now cleans logs with cleanup.policy=compact and that Log.deleteOldSegments deletes segments that preced the start offset regardless of the cleanup policy.
Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>
This patch implements KIP-336. It adds a default implementation to the Serializer/Deserializer interface to support the use of headers and it deprecates the ExtendedSerializer and ExtendedDeserializer interfaces for later removal.
Reviewers: Satish Duggana <sduggana@hortonworks.com>, John Roesler <john@confluent.io>, Jason Gustafson <jason@confluent.io>
In order to fix race condition between log cleaner thread and log retention thread when dynamically switching topic cleanup policy, existing log cleaner in-progress map is used to prevent more than one thread from working on the same topic partition.
Author: Xiongqi Wesley Wu <xiongqi.wu@gmail.com>
Reviewers: Dong Lin <lindong28@gmail.com>
Closes#5591 from xiowu0/trunk
This patch fixes the inconsistent handling of out of range errors in the replica fetcher. Previously we would raise a fatal error if the follower's offset is ahead of the leader's and unclean leader election is not enabled. The behavior was inconsistent depending on the message format. With KIP-101/KIP-279 and the new message format, upon becoming a follower, the replica would use leader epoch information to reconcile the end of the log with the leader and simply truncate. Additionally, with the old format, the check is not really bulletproof for detecting data loss since the unclean leader's end offset might have already caught up to the follower's offset at the time of its initial fetch or when it queries for the current log end offset.
With this patch, we simply skip the unclean leader election check and allow the needed truncation to occur. When the truncation offset is below the high watermark, a warning will be logged. This makes the behavior consistent for all message formats and removes a scenario in which an error on one partition can bring the broker down.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
A call to `kafka-consumer-groups --describe --group ...` can result in NullPointerException for two reasons:
1) `Fetcher.fetchOffsetsByTimes()` may return too early, without sending list offsets request for topic partitions that are not in cached metadata.
2) `ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()` assumed that endOffsets()/beginningOffsets() which eventually call Fetcher.fetchOffsetsByTimes(), would return a map with all the topic partitions passed to endOffsets()/beginningOffsets() and that values are not null. Because of (1), null values were possible if some of the topic partitions were already known (in metadata cache) and some not (metadata cache did not have entries for some of the topic partitions). However, even with fixing (1), endOffsets()/beginningOffsets() may return a map with some topic partitions missing, when list offset request returns a non-retriable error. This happens in corner cases such as message format on broker is before 0.10, or maybe in cases of some other errors.
Testing:
-- added unit test to verify fix in Fetcher.fetchOffsetsByTimes()
-- did some manual testing with `kafka-consumer-groups --describe`, causing NPE. Was not able to reproduce any NPE cases with DescribeConsumerGroupTest.scala,
Reviewers: Jason Gustafson <jason@confluent.io>
With KIP-320, the OffsetsForLeaderEpoch API is intended to be used by consumers to detect log truncation. Therefore the new response schema should expose a field for the throttle time like all the other APIs.
Reviewers: Dong Lin <lindong28@gmail.com>
findBugs is abandoned, it doesn't work with Java 9 and the Gradle plugin will be deprecated in
Gradle 5.0: https://github.com/gradle/gradle/pull/6664
spotBugs is actively maintained and it supports Java 8, 9 and 10. Java 11 is not supported yet,
but it's likely to happen soon.
Also fixed a file leak in Connect identified by spotbugs.
Manually tested spotBugsMain, jarAll and importing kafka in IntelliJ and running
a build in the IDE.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Dong Lin <lindong28@gmail.com>
Closes#5625 from ijuma/kafka-5887-spotbugs