This commit reworks the SocketServer so that the acceptor threads are always started after the processor threads and always stopped before them. It ensures that the acceptor shutdown is not blocked waiting for the processors to be fully shut down, by decoupling the shutdown signal from the awaiting. It also ensures that the processor threads drain their queues of new connections to unblock acceptors that may be waiting. Note that the acceptors still bind during startup; only the processing of new connections and requests is further delayed.
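A minimal sketch of the decoupled shutdown pattern described above, using hypothetical classes rather than the real SocketServer code: the shutdown signal is sent to every processor first, and only then does the caller block waiting for each one to finish.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;

class ProcessorSketch implements Runnable {
    private volatile boolean shuttingDown = false;
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);

    // Non-blocking signal: flips a flag and returns immediately.
    void initiateShutdown() {
        shuttingDown = true;
    }

    // Blocking wait for the run loop to exit.
    void awaitShutdown() throws InterruptedException {
        shutdownComplete.await();
    }

    @Override
    public void run() {
        try {
            while (!shuttingDown) {
                // drain queued new connections and process requests ...
            }
        } finally {
            shutdownComplete.countDown();
        }
    }
}

class ShutdownOrderingSketch {
    // Signal every processor before awaiting any of them, so the caller
    // (e.g. an acceptor shutdown path) is never blocked behind a single
    // slow processor.
    static void stopProcessors(List<ProcessorSketch> processors) throws InterruptedException {
        processors.forEach(ProcessorSketch::initiateShutdown);
        for (ProcessorSketch processor : processors)
            processor.awaitShutdown();
    }
}
```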
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
When building a release candidate with release.py, if it is not the first RC, we need to drop the previous RC's artifacts from the staging repository before closing the new ones. This adds a log message to remind the release manager of that step.
The patch adds a new test case for validating concurrent read/write behavior in the `Log` implementation. In the process of verifying this, we found a race condition in `read`. The previous logic checks whether the start offset is equal to the end offset before collecting the high watermark. It is possible for the log to be truncated between these two checks, which can cause the high watermark to be equal to the log end offset. When this happens, `LogSegment.read` fails because it is unable to find the starting position to read from.
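As a rough illustration of the hazard (hypothetical code, not the actual `Log` implementation), the pattern below captures the end offset once and derives both the emptiness check and the read bound from that single snapshot, so a concurrent truncation between two separate reads cannot leave the values inconsistent:

```java
class LogReadSketch {
    private volatile long logEndOffset;
    private volatile long highWatermark;

    // Hypothetical read path: both the "nothing to read" check and the upper
    // read bound use the same snapshot of the end offset, instead of re-reading
    // mutable state around the high watermark lookup.
    byte[] read(long startOffset) {
        long endOffsetSnapshot = logEndOffset;
        if (startOffset == endOffsetSnapshot)
            return new byte[0];
        long maxOffset = Math.min(highWatermark, endOffsetSnapshot);
        return readFromSegments(startOffset, maxOffset);
    }

    private byte[] readFromSegments(long startOffset, long maxOffset) {
        // segment lookup and slicing would happen here
        return new byte[0];
    }
}
```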
Reviewers: Guozhang Wang <wangguoz@gmail.com>
The split method takes up too many resources and might cause an OutOfMemoryError when the bigBatch is huge.
Call closeForRecordAppends() to free up resources such as compression buffers.
Change-Id: Iac6519fcc2e432330b8af2d9f68a8d4d4a07646b
Signed-off-by: Jiamei Xie <jiamei.xie@arm.com>
Author: Jiamei Xie <jiamei.xie@arm.com>
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jiangjie (Becket) Qin <becket.qin@gmail.com>
Closes #8286 from jiameixie/outOfMemory
`KafkaApis#handleOffsetDeleteRequest` does not build the response correctly because `topics.add` is not in the correct loop. Fortunately, due to how the response is processed by the admin client, it still works, but redundant information is sent on the wire.
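For illustration only (hypothetical types, not the actual `KafkaApis` code), the shape of the fix is to add the per-topic entry in the outer loop, once per topic, while the inner loop only fills in per-partition results:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class OffsetDeleteResponseSketch {
    static class TopicEntry {
        final String name;
        final List<Integer> partitions = new ArrayList<>();

        TopicEntry(String name) {
            this.name = name;
        }
    }

    static List<TopicEntry> buildResponse(Map<String, List<Integer>> partitionsByTopic) {
        List<TopicEntry> topics = new ArrayList<>();
        partitionsByTopic.forEach((topic, partitions) -> {
            TopicEntry entry = new TopicEntry(topic);
            // Per-partition results belong in the inner loop ...
            for (int partition : partitions)
                entry.partitions.add(partition);
            // ... but the topic entry itself is added exactly once, in the outer loop.
            topics.add(entry);
        });
        return topics;
    }
}
```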
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
When it comes to actually closing a task, we now treat all states exactly the same and call StateManagerUtil#closeStateManager regardless of whether the task is in CREATED, RESTORING, or RUNNING.
Unfortunately, StateManagerUtil does not check that we actually own the lock for this task's state. During a dirty close with EOS enabled, we wipe the state, but in some cases this means deleting the state out from under another StreamThread that is still in the process of revoking this task.
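A hedged sketch of the kind of guard this implies (hypothetical helper, not the real StateManagerUtil): only wipe the task state if the current thread actually acquires the task's state lock.

```java
import java.util.concurrent.locks.ReentrantLock;

class DirtyCloseSketch {
    private final ReentrantLock taskStateLock = new ReentrantLock();

    // Hypothetical dirty-close path: if another thread still holds the lock
    // (e.g. it is mid-revocation), skip the wipe instead of deleting state
    // out from under it.
    void closeDirtyAndMaybeWipe(boolean eosEnabled) {
        if (!taskStateLock.tryLock())
            return;
        try {
            if (eosEnabled) {
                // delete the local state directory here
            }
        } finally {
            taskStateLock.unlock();
        }
    }
}
```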
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch moves the state change logger logs for handling a LeaderAndIsr/StopReplica request inside the replicaStateChangeLock in order to serialize the logs. This helps to tell apart the per-partition actions of concurrent LeaderAndIsr/StopReplica requests in cases where requests pile up waiting on the lock.
Reviewer: Jun Rao <junrao@gmail.com>
QuotaViolationException generates an exception message via String.format in the constructor
even though the message is often not used, e.g. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L258. We now override `toString` instead.
It also generates an unnecessary stack trace, which is now avoided using the same pattern as in ApiException.
I have also avoided using QuotaViolationException for control flow in
ReplicationQuotaManager, which is another hotspot that we have seen in practice.
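A minimal sketch of the two patterns mentioned above (stand-in class, not the actual Kafka code): the message is formatted lazily in `toString`, and the stack trace is suppressed by overriding `fillInStackTrace`, as ApiException does.

```java
class QuotaViolationExceptionSketch extends RuntimeException {
    private final double value;
    private final double bound;

    QuotaViolationExceptionSketch(double value, double bound) {
        this.value = value;
        this.bound = bound;
    }

    // Skip the expensive stack trace capture; the exception is used as a
    // signal on a hot path and the trace adds no information.
    @Override
    public synchronized Throwable fillInStackTrace() {
        return this;
    }

    // String.format only runs if someone actually renders the exception.
    @Override
    public String toString() {
        return String.format("QuotaViolationExceptionSketch(value=%f, bound=%f)", value, bound);
    }
}
```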
Reviewers: Gwen Shapira <gwen@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Ismael Juma <ismael@juma.me.uk>
There are two cases in the fetch path where a partition is unnecessarily looked up
from the partition pool when one is already accessible. This will be a fairly minor
improvement on high partition count clusters, but could be worth about 1% based on some
profiles I have seen.
More importantly, the code is cleaner this way.
Reviewers: Ismael Juma <ismael@juma.me.uk>
FetchRequest.PartitionData.equals unnecessarily uses Objects.equals, generating a lot of allocations due to boxing even though primitives are being compared. This is shown in the allocation profile below. Note that the CPU overhead is negligible.

![image](https://user-images.githubusercontent.com/252189/79079019-46686300-7cc1-11ea-9bc9-44fd17bae888.png)
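For illustration (hypothetical fields, not the real PartitionData): `Objects.equals` autoboxes primitive arguments on every comparison, whereas comparing the primitives directly allocates nothing.

```java
class PartitionDataSketch {
    final long fetchOffset;
    final int maxBytes;

    PartitionDataSketch(long fetchOffset, int maxBytes) {
        this.fetchOffset = fetchOffset;
        this.maxBytes = maxBytes;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (!(o instanceof PartitionDataSketch))
            return false;
        PartitionDataSketch that = (PartitionDataSketch) o;
        // Direct primitive comparison: no boxing, unlike
        // Objects.equals(fetchOffset, that.fetchOffset).
        return fetchOffset == that.fetchOffset && maxBytes == that.maxBytes;
    }

    @Override
    public int hashCode() {
        return 31 * Long.hashCode(fetchOffset) + maxBytes;
    }
}
```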
Author: Lucas Bradstreet <lucasbradstreet@gmail.com>
Reviewers: Chia-Ping Tsai, Gwen Shapira
Closes #8473 from lbradstreet/avoid-boxing-partition-data-equals
In KAFKA-9826, a log whose first dirty offset was past the start of the active segment and past the last cleaned point resulted in an endless cycle of picking the segment to clean and discarding it. Though this didn't interfere with cleaning other log segments, it kept the log cleaner thread continuously busy (potentially wasting CPU and impacting other running threads) and filled the logs with lots of extraneous messages.
This was determined to be because the active segment was getting mistakenly picked for cleaning, and because the logSegments code handles (start == end) cases correctly only when (start, end) fall on a segment boundary: the intent is to return an empty list, but if they are not on a segment boundary, the routine returns that segment.
This fix has two parts:
1. It changes logSegments to handle start == end by always returning an empty List.
2. It changes the definition of calculateCleanableBytes to not consider anything past the firstUncleanableOffset; previously, it would potentially shift the firstUncleanableOffset to match the firstDirtyOffset even if the firstDirtyOffset was past the firstUncleanableOffset. This has no real effect now in the context of the fix for (1), but it makes the code read more like the model that the code is attempting to follow.
These changes require modifications to a few test cases that exercised this particular situation; those tests were introduced in the context of KAFKA-8764. The situations they covered are now handled elsewhere in the code, but the tests themselves allowed a dirty offset in the active segment and expected the active segment to be selected for cleaning.
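A sketch of part (1) with a hypothetical segment map (the real code is Scala, and the lookup details here are illustrative): when from == to, always return an empty collection rather than only doing so when the offsets fall exactly on a segment boundary.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class LogSegmentsSketch {
    // Maps each segment's base offset to the segment (represented here as Object).
    private final ConcurrentNavigableMap<Long, Object> segments = new ConcurrentSkipListMap<>();

    Collection<Object> logSegments(long from, long to) {
        if (from == to)
            return Collections.emptyList(); // empty range, even off a segment boundary
        Long floor = segments.floorKey(from);
        long lower = floor != null ? floor : from;
        return new ArrayList<>(segments.subMap(lower, true, to, false).values());
    }
}
```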
Reviewer: Jun Rao <junrao@gmail.com>
When the LogManager resumes cleaning, it logs that compaction has resumed; however, the topic in question is not necessarily a compacted one.
Author: Lucas Bradstreet <lucas@confluent.io>
Reviewers: Gwen Shapira, Chia-Ping Tsai
Closes #8466 from lbradstreet/bad-cleaning-message
This fixes a version pinning issue where a transitive dependency had a
major version upgrade that a dependency did not account for, breaking
the build.
Reviewers: Andrew Egelhofer <aegelhofer@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This is a follow-up to #8077. The bug exposed a testing gap in how we group partitions. This patch adds a test case which reproduces the reported problem.
Reviewers: David Arthur <mumrah@gmail.com>
The previous code did not use the collection produced by `takeWhile()`.
It only used the length of that collection to select the next element.
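As a rough Java illustration of the difference (the code in question is Scala, and the names here are made up): the old shape materializes the prefix only to use its size as an index back into the source, while a simpler shape selects the following element directly.

```java
import java.util.List;
import java.util.stream.Collectors;

class TakeWhileSketch {
    // Old shape: build the matching prefix, use only its size to index back
    // into the source list (throws if the whole list matches).
    static int nextAfterPrefixByLength(List<Integer> values, int threshold) {
        List<Integer> prefix = values.stream()
            .takeWhile(v -> v < threshold)
            .collect(Collectors.toList());
        return values.get(prefix.size());
    }

    // Simpler shape: skip the matching prefix and take the next element
    // directly, without materializing an intermediate collection (also
    // throws if nothing remains).
    static int nextAfterPrefix(List<Integer> values, int threshold) {
        return values.stream().dropWhile(v -> v < threshold).findFirst().orElseThrow();
    }
}
```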
Reviewers: Ismael Juma <ismael@juma.me.uk>
We were hitting `IllegalStateException: There is already a changelog registered for ...` in trunk-eos due to failing to call TaskManager#cleanup on unrevoked tasks that we end up closing in handleAssignment after failing to batch commit.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Changed TimeoutException to BufferExhaustedException when no memory can be allocated for a record within max.block.ms.
Refactored BufferExhaustedException to be a subclass of TimeoutException so existing code that catches TimeoutException keeps working.
Added handling to count these exceptions in the metric "buffer-exhausted-records".
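A minimal sketch of the compatibility argument, with stand-in classes rather than the real `org.apache.kafka.common.errors` types: because the buffer-exhausted exception extends the timeout exception, pre-existing catch blocks written against the latter still handle it.

```java
class TimeoutExceptionSketch extends RuntimeException {
    TimeoutExceptionSketch(String message) {
        super(message);
    }
}

class BufferExhaustedExceptionSketch extends TimeoutExceptionSketch {
    BufferExhaustedExceptionSketch(String message) {
        super(message);
    }
}

class CallerSketch {
    static void send() {
        try {
            // The producer path would throw this when no memory can be
            // allocated for a record within max.block.ms.
            throw new BufferExhaustedExceptionSketch("Failed to allocate memory within max.block.ms");
        } catch (TimeoutExceptionSketch e) {
            // Existing handlers that catch the timeout type keep working.
        }
    }
}
```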
Test Strategy
There were existing test cases to check this behavior, which I refactored.
I then added an extra case to check whether the expected exception is actually thrown, which was not covered by the existing tests.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Remove the restriction in the protocol generation code that a structure
field needs to be part of an array.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Currently a `LeaderAndIsr` request with a stale leader epoch for some partition may still result in the starting of the log dir fetcher for that partition (if the future log exists). I am not sure if this causes any correctness problem since we don't use any state from the request to start the fetcher, but it seems unnecessary to rely on this side effect.
Reviewers: Jun Rao <junrao@gmail.com>
In `validateOffsetsAsync` in the consumer, we group the requests by leader node for efficiency. The list of topic-partitions is grouped from `partitionsToValidate` (all partitions) into `node` => `fetchPositions` (partitions by node). However, when actually sending the request with `OffsetsForLeaderEpochClient`, we use `partitionsToValidate`, which is the list of all topic-partitions passed into `validateOffsetsAsync`. This results in extra partitions being included in the request sent to brokers that are potentially not the leader for those partitions.
This PR fixes the issue by using `fetchPositions`, which is the proper list of partitions that we should send in the request. Additionally, a small typo in an API name in `OffsetsForLeaderEpochClient` is corrected (it originally referenced `LisfOffsets` as the API name).
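As an illustration of the grouping-and-sending shape with hypothetical types (not the actual consumer code): each node's request should be built from its own grouped subset, not from the full input map.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetValidationSketch {
    // Group the full set of positions by the leader node of each partition.
    static <N, P, V> Map<N, Map<P, V>> groupByLeader(Map<P, V> partitionsToValidate,
                                                     Map<P, N> leaderOf) {
        Map<N, Map<P, V>> byNode = new HashMap<>();
        partitionsToValidate.forEach((partition, position) ->
            byNode.computeIfAbsent(leaderOf.get(partition), node -> new HashMap<>())
                  .put(partition, position));
        return byNode;
    }

    static <N, P, V> void sendValidationRequests(Map<N, Map<P, V>> byNode) {
        // Build each request from the per-node subset (the "fetchPositions"),
        // so a broker only sees partitions it actually leads.
        byNode.forEach((node, fetchPositions) -> {
            // construct and send the per-node request here
        });
    }
}
```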
Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
A read from the end of the log interleaved with a concurrent write can result in reading data above the expected read limit. In particular, this would allow a read above the high watermark. The root of the problem is consecutive calls to `sizeInBytes` in `FileRecords.slice` which do not account for an increase in size due to a concurrent write. This patch fixes the problem by using a single call to `sizeInBytes` and caching the result.
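A sketch of the single-read pattern with a hypothetical class (not `FileRecords` itself): both the bounds check and the slice length derive from one snapshot of the size, so a concurrent append cannot widen the slice.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SliceSketch {
    private final AtomicInteger size = new AtomicInteger();

    Slice slice(int position, int length) {
        // Read the mutable size exactly once and reuse the cached value.
        int currentSize = size.get();
        int availableBytes = Math.max(currentSize - position, 0);
        int limit = Math.min(length, availableBytes);
        return new Slice(position, limit);
    }

    static class Slice {
        final int position;
        final int length;

        Slice(int position, int length) {
            this.position = position;
            this.length = length;
        }
    }
}
```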
Reviewers: Ismael Juma <ismael@juma.me.uk>
Fix the direct cause of the observed issue on the client side: when the heartbeat gets an error and resets the generation, we only need to set the state to UNJOINED if it was not already REBALANCING; otherwise, the join-group handler would throw the retriable UnjoinedGroupException and force the consumer to re-send the join group unnecessarily.
Fix the root cause of the issue on the broker side: we should still trigger a rebalance when a static member joins in the CompletingRebalance phase; otherwise the member.id would be changed when the assignment is received from the leader, causing the new member.id's assignment to be empty.
Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>
Invoke `waitForQuotaUpdate` after the quotas are removed. Also change
the default request quota to `Long.MaxValue`.
Reviewers: Anna Povzner <anna@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Add type bounds to the ProcessorContext, which restrict the types that can be forwarded to child nodes.
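A rough sketch of what such bounds look like, with hypothetical interfaces rather than the exact Streams API: the context is parameterized by the key and value types it may forward, so a mismatched forward fails at compile time.

```java
interface BoundedContextSketch<KForward, VForward> {
    <K extends KForward, V extends VForward> void forward(K key, V value);
}

class UppercaseProcessorSketch {
    void process(String key, String value, BoundedContextSketch<String, String> context) {
        // Compiles: the forwarded types match the context's declared bounds.
        context.forward(key, value.toUpperCase());
        // Would not compile: Integer is not within the VForward = String bound.
        // context.forward(key, 42);
    }
}
```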
Reviewers: Matthias J. Sax <matthias@confluent.io>