This patch moves the state change log messages for handling a LeaderAndIsr/StopReplica request inside the replicaStateChangeLock in order to serialize them. This makes it possible to tell apart the per-partition actions of concurrent LeaderAndIsr/StopReplica requests in cases where requests pile up waiting on the lock.
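A minimal sketch of the pattern (illustrative names only, not the actual broker code): the per-partition log lines are emitted while holding the same lock that serializes request handling, so the output of concurrent requests cannot interleave.

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: a handler that logs per-partition actions inside the lock.
class ReplicaStateChangeHandler {
    private final ReentrantLock replicaStateChangeLock = new ReentrantLock();

    void handleLeaderAndIsr(int correlationId, Iterable<String> partitions) {
        replicaStateChangeLock.lock();
        try {
            for (String partition : partitions) {
                // Logging inside the lock keeps this request's lines contiguous.
                System.out.printf("Handling LeaderAndIsr request correlationId %d for partition %s%n",
                        correlationId, partition);
                // ... actual state change handling ...
            }
        } finally {
            replicaStateChangeLock.unlock();
        }
    }
}
```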
Reviewer: Jun Rao <junrao@gmail.com>
QuotaViolationException generates an exception message via String.format in the constructor
even though the message is often not used, e.g. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L258. We now override `toString` instead.
It also generates an unnecessary stack trace, which is now avoided using the same pattern as in ApiException.
I have also avoided the use of QuotaViolationException for control flow in
ReplicationQuotaManager, which is another hotspot that we have seen in practice.
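As a rough sketch of both points, assuming simplified fields and message text (this is not the actual Kafka class): the message is built lazily in toString(), and the stack trace is suppressed by overriding fillInStackTrace(), the same pattern ApiException-style exceptions use.

```java
// Sketch only: lazy message construction plus stack-trace suppression.
class QuotaViolationExceptionSketch extends RuntimeException {
    private final double value;
    private final double bound;

    QuotaViolationExceptionSketch(double value, double bound) {
        this.value = value;
        this.bound = bound;
    }

    @Override
    public String toString() {
        // Only pay the String.format cost when the message is actually rendered.
        return String.format("%s: quota violated, value = %.2f, bound = %.2f",
                getClass().getName(), value, bound);
    }

    @Override
    public synchronized Throwable fillInStackTrace() {
        // Avoid capturing a stack trace for an exception used on a hot path.
        return this;
    }
}
```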
Reviewers: Gwen Shapira <gwen@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Ismael Juma <ismael@juma.me.uk>
There are two cases in the fetch path where a partition is unnecessarily looked up
from the partition Pool when one is already accessible. This will be a fairly minor
improvement on high partition count clusters, but could be worth around 1% based on some
profiles I have seen.
More importantly, the code is cleaner this way.
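A tiny illustration of the idea, with hypothetical names (not the real fetch-path code): keep and reuse the reference from the first lookup instead of querying the pool a second time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class FetchExample {
    static class Partition {
        long highWatermark() { return 42L; }
    }

    private final Map<String, Partition> pool = new ConcurrentHashMap<>();

    long readHighWatermark(String topicPartition) {
        Partition partition = pool.get(topicPartition);   // single lookup
        if (partition == null)
            throw new IllegalStateException("Unknown partition " + topicPartition);
        // Use `partition` directly; a second pool.get(topicPartition) here would
        // be the redundant lookup this patch removes.
        return partition.highWatermark();
    }
}
```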
Reviewers: Ismael Juma <ismael@juma.me.uk>
FetchRequest.PartitionData.equals unnecessarily uses Objects.equals, generating a lot of allocations due to boxing even though primitives are being compared. This is shown in the allocation profile below. Note that the CPU overhead is negligible.

![image](https://user-images.githubusercontent.com/252189/79079019-46686300-7cc1-11ea-9bc9-44fd17bae888.png)
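For illustration, a simplified sketch (the field names are assumptions, not the actual PartitionData class) of why the boxing happens and how a plain primitive comparison avoids it:

```java
import java.util.Objects;

class PartitionDataSketch {
    final long fetchOffset;
    final int maxBytes;

    PartitionDataSketch(long fetchOffset, int maxBytes) {
        this.fetchOffset = fetchOffset;
        this.maxBytes = maxBytes;
    }

    // Before: Objects.equals takes Object parameters, so each call boxes the
    // primitives into Long/Integer.
    boolean equalsWithBoxing(PartitionDataSketch that) {
        return Objects.equals(this.fetchOffset, that.fetchOffset)
                && Objects.equals(this.maxBytes, that.maxBytes);
    }

    // After: plain primitive comparison, no allocations.
    boolean equalsWithoutBoxing(PartitionDataSketch that) {
        return this.fetchOffset == that.fetchOffset
                && this.maxBytes == that.maxBytes;
    }
}
```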
Author: Lucas Bradstreet <lucasbradstreet@gmail.com>
Reviewers: Chia-Ping Tsai, Gwen Shapira
Closes #8473 from lbradstreet/avoid-boxing-partition-data-equals
In KAFKA-9826, a log whose first dirty offset was past the start of the active segment and past the last cleaned point resulted in an endless cycle of picking the segment to clean and discarding it. Though this didn't interfere with cleaning other log segments, it kept the log cleaner thread continuously busy (potentially wasting CPU and impacting other running threads) and filled the logs with lots of extraneous messages.
This was determined to be because the active segment was getting mistakenly picked for cleaning, and because the logSegments code handles (start == end) cases only when (start, end) fall on a segment boundary: the intent is to return an empty list, but if they are not on a segment boundary, the routine returns the segment containing that offset.
This fix has two parts:
1. It changes logSegments to handle start == end by always returning an empty List (see the sketch below).
2. It changes the definition of calculateCleanableBytes to not consider anything past the UncleanableOffset; previously, it would potentially shift the UncleanableOffset to match the firstDirtyOffset even if the firstDirtyOffset was past the firstUncleanableOffset. This has no real effect now in the context of the fix for (1), but it makes the code read more like the model that the code is attempting to follow.
These changes require modifications to a few test cases that covered this particular scenario; they were introduced in the context of KAFKA-8764. Those situations are now handled elsewhere in the code, but the tests themselves allowed a DirtyOffset in the active segment, and expected an active segment to be selected for cleaning.
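As a rough illustration of part (1), here is a minimal sketch with generic names (not the actual Log/LogSegment code) of a segment range lookup that returns an empty list whenever start == end instead of falling through and returning the segment containing that offset:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Sketch only: segments keyed by base offset, queried for the range [from, to).
class SegmentIndexSketch<S> {
    private final ConcurrentSkipListMap<Long, S> segments = new ConcurrentSkipListMap<>();

    void add(long baseOffset, S segment) {
        segments.put(baseOffset, segment);
    }

    List<S> segmentsInRange(long from, long to) {
        // Empty range: return nothing, even when `from` is not a segment boundary.
        if (from == to || segments.isEmpty())
            return Collections.emptyList();
        Long floor = segments.floorKey(from);
        long start = (floor != null) ? floor : segments.firstKey();
        return new ArrayList<>(segments.subMap(start, true, to, false).values());
    }
}
```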
Reviewer: Jun Rao <junrao@gmail.com>
When the LogManager resumes cleaning, it states that compaction has resumed; however, the topic in question is not necessarily a compacted one.
Author: Lucas Bradstreet <lucas@confluent.io>
Reviewers: Gwen Shapira, Chia-Ping Tsai
Closes #8466 from lbradstreet/bad-cleaning-message
This fixes a version pinning issue where a transitive dependency had a
major version upgrade that a dependency did not account for, breaking
the build.
Reviewers: Andrew Egelhofer <aegelhofer@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This is a follow-up to #8077. The bug exposed a testing gap in how we group partitions. This patch adds a test case which reproduces the reported problem.
Reviewers: David Arthur <mumrah@gmail.com>
The previous code did not use the collection produced by `takeWhile()`.
It only used the length of that collection to select the next element.
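A small Java analogue of the cleanup (hypothetical data, not the original Scala code): counting the matching prefix gives the same length without materializing an intermediate collection.

```java
import java.util.List;
import java.util.stream.Collectors;

public class TakeWhileLengthDemo {
    public static void main(String[] args) {
        List<Long> offsets = List.of(1L, 2L, 3L, 10L, 11L);

        // Before: materialize the prefix, then use only its size.
        int prefixLength = offsets.stream()
                .takeWhile(o -> o < 5)
                .collect(Collectors.toList())
                .size();

        // After: count the prefix without building a collection.
        int count = (int) offsets.stream().takeWhile(o -> o < 5).count();

        // Either way, the length selects the next element.
        System.out.println(prefixLength + " == " + count + ", next = " + offsets.get(count));
    }
}
```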
Reviewers: Ismael Juma <ismael@juma.me.uk>
We were hitting an IllegalStateException ("There is already a changelog registered for ...") in trunk-eos due to failing to call TaskManager#cleanup on unrevoked tasks that we end up closing in handleAssignment after failing to batch commit.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Change TimeoutException to BufferExhaustedException when no memory can be allocated for a record within max.block.ms.
Refactor BufferExhaustedException to be a subclass of TimeoutException so existing code that catches TimeoutException keeps working.
Add handling to count these exceptions in the metric "buffer-exhausted-records".
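A standalone sketch of the hierarchy point above (the classes here are local stand-ins, not the actual Kafka types): because the more specific exception extends TimeoutException, existing catch blocks keep working unchanged.

```java
// Stand-in types for illustration only.
class TimeoutException extends RuntimeException {
    TimeoutException(String message) { super(message); }
}

class BufferExhaustedException extends TimeoutException {
    BufferExhaustedException(String message) { super(message); }
}

public class CatchDemo {
    public static void main(String[] args) {
        try {
            throw new BufferExhaustedException("no memory available within max.block.ms");
        } catch (TimeoutException e) {
            // Existing code that handled TimeoutException still catches the new subclass.
            System.out.println("handled: " + e.getMessage());
        }
    }
}
```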
Test Strategy
There were existing test cases to check this behavior, which I refactored.
I then added an extra case to check that the expected exception is actually thrown, which was not covered by the existing tests.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Remove the restriction in the protocol generation code that a structure
field needs to be part of an array.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Currently a `LeaderAndIsr` request with a stale leader epoch for some partition may still result in the starting of the log dir fetcher for that partition (if the future log exists). I am not sure if this causes any correctness problem since we don't use any state from the request to start the fetcher, but it seems unnecessary to rely on this side effect.
Reviewers: Jun Rao <junrao@gmail.com>
In `validateOffsetsAsync` in the consumer, we group the requests by leader node for efficiency. The list of topic-partitions is grouped from `partitionsToValidate` (all partitions) into `node` => `fetchPositions` (partitions by node). However, when actually sending the request with `OffsetsForLeaderEpochClient`, we use `partitionsToValidate`, which is the list of all topic-partitions passed into `validateOffsetsAsync`. This results in extra partitions being included in the request sent to brokers that are potentially not the leader for those partitions.
This PR fixes the issue by using `fetchPositions`, which is the proper list of partitions to send in the request. Additionally, a small typo in the API name in `OffsetsForLeaderEpochClient` is corrected (it originally referenced `LisfOffsets` as the API name).
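A simplified sketch of the grouping logic (the Partition/Node types here are hypothetical stand-ins, not the Kafka classes): group the full partition set by leader, then send each node only its own subset rather than the full list.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByLeaderDemo {
    record Partition(String topic, int id) {}
    record Node(int id) {}

    public static void main(String[] args) {
        Map<Partition, Node> leaderFor = Map.of(
                new Partition("a", 0), new Node(1),
                new Partition("a", 1), new Node(2));

        // Group the full set of partitions by their current leader...
        Map<Node, List<Partition>> byNode = leaderFor.entrySet().stream()
                .collect(Collectors.groupingBy(Map.Entry::getValue,
                        Collectors.mapping(Map.Entry::getKey, Collectors.toList())));

        // ...and send each node only its own per-node subset, not the full
        // partition list, which was the bug described above.
        byNode.forEach((node, partitionsForNode) ->
                System.out.println("request to " + node + " -> " + partitionsForNode));
    }
}
```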
Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
A read from the end of the log interleaved with a concurrent write can result in reading data above the expected read limit. In particular, this would allow a read above the high watermark. The root of the problem is consecutive calls to `sizeInBytes` in `FileRecords.slice` which do not account for an increase in size due to a concurrent write. This patch fixes the problem by using a single call to `sizeInBytes` and caching the result.
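An illustrative sketch of the race (not the actual FileRecords code): a size that grows under concurrent appends must be read once and cached, otherwise the clamp can use a newer, larger value than the one the bounds check saw.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SliceBoundSketch {
    // Grows as concurrent appends complete.
    final AtomicInteger size = new AtomicInteger(100);

    // Racy: each size.get() may observe a different value, so the returned end
    // can exceed the requested limit (e.g. a high-watermark-based bound).
    int endRacy(int position, int requested) {
        int end = position + requested;
        if (end > size.get())        // first read
            end = size.get();        // second read may already be larger
        return end;
    }

    // Fixed: read the size once and use the cached value consistently.
    int endFixed(int position, int requested) {
        int currentSize = size.get();
        int end = position + requested;
        if (end > currentSize)
            end = currentSize;
        return end;
    }
}
```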
Reviewers: Ismael Juma <ismael@juma.me.uk>
Fix the direct cause of the observed issue on the client side: when the heartbeat gets an error and resets the generation, we only need to set the state to UNJOINED if it was not already REBALANCING; otherwise, the join-group handler would throw the retriable UnjoinedGroupException and force the consumer to re-send the JoinGroup unnecessarily.
Fix the root cause of the issue on the broker side: we should still trigger a rebalance when a static member joins in the CompletingRebalance phase; otherwise the member.id would be changed when the assignment is received from the leader, hence causing the new member.id's assignment to be empty.
Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>
Invoke `waitForQuotaUpdate` after the quotas are removed. Also change
the default request quota to `Long.MaxValue`.
Reviewers: Anna Povzner <anna@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Add type bounds to the ProcessorContext to bound the types that can be forwarded to child nodes.
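A minimal sketch of the generics involved (names simplified; this is not the exact Kafka Streams API): parameterizing the context by the forwardable key/value types lets the compiler reject mismatched forwards.

```java
// Sketch only: a context bounded by the key/value types it may forward.
interface ForwardingContext<KForward, VForward> {
    <K extends KForward, V extends VForward> void forward(K key, V value);
}

class UppercaseProcessor {
    private final ForwardingContext<String, String> context;

    UppercaseProcessor(ForwardingContext<String, String> context) {
        this.context = context;
    }

    void process(String key, String value) {
        context.forward(key, value.toUpperCase());
        // context.forward(key, 42);  // would not compile: Integer is outside the bound
    }
}
```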
Reviewers: Matthias J. Sax <matthias@confluent.io>
90bbeedf52 introduced a regression resulting in passing an action per resource
name to the `Authorizer` instead of passing one per unique resource name. Refactor
the signatures of both `filterAuthorized` and `authorize` to make them easier to test
and add a test for each.
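A hypothetical sketch of the intent (not the actual Authorizer API): deduplicate the resource names so each unique name is authorized once, then filter the original list against the authorized set.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterAuthorizedSketch {
    static List<String> filterAuthorized(List<String> resourceNames, Predicate<String> authorize) {
        Set<String> authorized = resourceNames.stream()
                .distinct()                 // one authorization check per unique name
                .filter(authorize)
                .collect(Collectors.toSet());
        return resourceNames.stream()
                .filter(authorized::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = List.of("topic-a", "topic-a", "topic-b");
        // Authorizes "topic-a" and "topic-b" once each; prints [topic-a, topic-a].
        System.out.println(filterAuthorized(names, n -> n.equals("topic-a")));
    }
}
```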
Reviewers: Ismael Juma <ismael@juma.me.uk>
Some tasks get closed inside handleAssignment but are not removed from the task manager's bookkeeping list. The next time around they would be closed again, which is an illegal state.
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
The upper limit offset is displayed incorrectly in the log cleaner summary message. For example:
```
Log cleaner thread 0 cleaned log __consumer_offsets-47 (dirty section = [358800359, 358800359])
```
We should be using the next dirty offset as the upper limit.
Reviewers: David Arthur <mumrah@gmail.com>
On metadata change for assigned topics, we trigger rebalance, revoke partitions and send JoinGroup. If metadata reverts to the original value and JoinGroup fails, we don't resend JoinGroup because we don't set `rejoinNeeded`. This PR sets `rejoinNeeded=true` when rebalance is triggered due to metadata change to ensure that we retry on failure.
Reviewers: Boyang Chen <boyang@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
Instance-level:
* number of alive stream threads
Thread-level:
* avg / max number of records polled from the consumer per runOnce, INFO
* avg / max number of records processed by the task manager (i.e. across all tasks) per runOnce, INFO
Task-level:
* number of current buffered records at the moment (i.e. it is just a dynamic gauge), DEBUG.
Reviewers: Bruno Cadonna <bruno@confluent.io>, John Roesler <john@confluent.io>
As the title suggests, we would like to broaden this check so that we don't fail to close a doomed-to-cleanup task.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
As the title suggests, consumers first issue an OffsetFetch before starting normal processing. It makes sense to add this to the concurrent test suite to verify whether there would be any blocking behavior.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
https://issues.apache.org/jira/browse/KAFKA-8889 attempted to fill in the missing stack trace in the log message when handling errors in FetchSessionHandler#handleError, but that fix is not effective without KAFKA-7016.
The current fix removes the redundant pair of braces {} at the end of the log message. If and when the Throwable passed as an argument to this method has a stack trace, the log message will include it; currently it does not, because the Throwable argument does not have a stack trace.
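An illustrative example with slf4j (the node id and message text are made up, and the exact behavior depends on the slf4j version's placeholder handling): without the trailing {}, the final Throwable argument is treated as the exception, so its stack trace is printed whenever it actually has one.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FetchErrorLoggingDemo {
    private static final Logger log = LoggerFactory.getLogger(FetchErrorLoggingDemo.class);

    public static void main(String[] args) {
        Throwable t = new RuntimeException("disconnected");

        // Before: a trailing {} intended for the Throwable; depending on the slf4j
        // version, the Throwable may be consumed as a formatting argument and its
        // stack trace dropped.
        log.info("Error sending fetch request to node {}: {}.", 1, t);

        // After: no trailing {}; the Throwable is passed as the exception argument
        // and its stack trace is printed when present.
        log.info("Error sending fetch request to node {}.", 1, t);
    }
}
```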
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Guozhang Wang <wangguoz@gmail.com>