Sometimes logging leaves us guessing at the cause of an increment to the log start offset. Since this results in deletion of user data, we should provide the reason explicitly.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
We unnecessarily iterate the versions list each time we lookup
lastVersion, including in the hotpath Log.appendAsFollower.
Given that allVersions is a constant, this is unnecessary.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
"console-producer" supports the setting of "client.id", which is a reasonable requirement, and the way "console consumer" and "console producer" handle "client.id" can be unified. "client.id" defaults to "console-producer"
Co-authored-by: xinzhuxiansheng <xinzhuxiansheng@autohome.com.cn>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Prior to KAFKA-8106, we allowed the v0 and v1 message formats to contain non-consecutive inner offsets. Inside `LogValidator`, we would detect this case and rewrite the batch. After KAFKA-8106, we changed the logic to raise an error in the case of the v1 message format (v0 was still expected to be rewritten). This caused an incompatibility for older clients which were depending on the looser validation. This patch reverts the old logic of rewriting the batch to fix the invalid inner offsets.
Note that the v2 message format has always had stricter validation. This patch also adds a test case for this.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Ismael Juma <ismael@juma.me.uk>
* Fix describeConfigs and alterConfigs not to invoke authorizer more
than once
* Add tests to KafkaApisTest to verify the fixes
* Rename `filterAuthorized` to `filterByAuthorized`
* Tweak `filterByAuthorized` to take resources instead of resource
names and improve implementation
* Introduce `partitionMapByAuthorized` and `partitionSeqByAuthorized`
and simplify code by using it
* Replace List with Seq in some AdminManager methods
* Remove stray `println` in `KafkaApisTest`
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
* Use `forEach` instead of `asScala.foreach` for Java Iterables.
* Use `ifPresent` instead of `asScala.foreach` for Java Optionals.
* Use `forEach` instead of `entrySet.forEach` for Java maps.
* Keep `asScala.foreach` for `Properties` as the Scala implementation
has a better interface (keys and values are of type `String`).
* A few clean-ups: unnecessary `()`, `{}`, `new`, etc.
Reviewers: Manikumar Reddy <manikumar@confluent.io>
This patch ensures that both clients and the bounce schedule get shutdown properly in this test. Additionally, it fixes the surprising behavior of using the passed delivery timeout to override the request timeout in `createTransactionalProducer`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Previously we had fallback logic when parsing ACLs to handle older entries which may contain non-escaped characters. This code became dead after 1.1 since it was no longer used in the parsing of ACLs. This patch removes the fallback logic.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
This patch fixes a regression in the `StopReplica` response handling. We should only send the event on receiving the `StopReplica` response if we had requested deletion in the request.
Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jason Gustafson <jason@confluent.io>
In the case described in the JIRA, there was a 50%+ increase in the total fetch request rate in
2.4.0 due to this change.
I included a few additional clean-ups:
* Simplify `findPreferredReadReplica` and avoid unnecessary collection copies.
* Use `LongSupplier` instead of `Supplier<Long>` in `SubscriptionState` to avoid unnecessary boxing.
Added a unit test to ReplicaManagerTest and cleaned up the test class a bit including
consistent usage of Time in MockTimer and other components.
Reviewers: Gwen Shapira <gwen@confluent.io>, David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
Avoid calling into ConfigCommand and TopicCommand from tests that are not related
to these commands. It's better to just invoke the admin APIs.
Change a few cases where we were testing the deprecated --zookeeper flag to testing
the --bootstrap-server flag instead. Unless we're explicitly testing the deprecated code
path, we should be using the non-deprecated flags.
Move testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient from
TopicCommandWithAdminClientTest.scala into TopicCommandWithZKClientTest.scala,
since it makes more sense in the latter.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Comparing all other test cases, the shouldAllowConcurrentAccesses starts an async producer sending records throughout the test other than just synchronously sent and acked a few records before we start the streams application. Right after the streams app is started, we check that at least one record is sent to the output topic (i.e. completed processing). However since only this test starts the producer async and did not wait for it to complete, it is possible that the async producer gets too longer to produce some records and causing it to fail.
To follow what other tests did, I let this test to first send one round of records synchronously before starting the async producing.
Also encountered some new scala warnings that I fixed along with this PR.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Adjust `checkLogAppendTimeNonCompressed` to assert
`shallowOffsetOfMaxTimestamp` correctly for message format 2.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Fetches which hit purgatory are currently counted twice in fetch request rate metrics. This patch moves the metric update into `fetchMessages` so that they are only counted once.
Reviewers: Ismael Juma <ismael@juma.me.uk>
After KIP-219, responses are sent immediately and we rely on a combination
of clients and muting of the channel to throttle. The result of this is that
we need to track `apiThrottleTimeMs` as an explicit value instead of
inferring it. On the other hand, we no longer need
`apiRemoteCompleteTimeNanos`.
Extend `BaseQuotaTest` to verify that throttle time in the request channel
metrics are being set. Given the nature of the throttling numbers, the test
is not particularly precise.
I included a few clean-ups:
* Pass KafkaMetric to QuotaViolationException so that the caller doesn't
have to retrieve it from the metrics registry.
* Inline Supplier in SocketServer (use SAM).
* Reduce redundant `time.milliseconds` and `time.nanoseconds`calls.
* Use monotonic clock in ThrottledChannel and simplify `compareTo` method.
* Simplify `TimerTaskList.compareTo`.
* Consolidate the number of places where we update `apiLocalCompleteTimeNanos`
and `responseCompleteTimeNanos`.
* Added `toString` to ByteBufferSend` and `MultiRecordsSend`.
* Restrict access to methods in `QuotaTestClients` to expose only what we need
to.
Reviewers: Jun Rao <junrao@gmail.com>
If a broker contains 8k replicas, we would previously issue 8k ZK calls to retrieve topic
configs when processing the first LeaderAndIsr request. That should translate to 0 after
these changes.
Credit to @junrao for identifying the problem.
Reviewers: Jun Rao <junrao@gmail.com>
This PR updates the algorithm which limits the number of members within a group (`group.max.size`) to fix the following two issues:
1. As described in KAFKA-9885, we found out that multiple members of a group can be evicted if the leader of the consumer offset partition changes before the group is persisted. This happens because the current eviction logic always evict the first member rejoining the group.
2. We also found out that dynamic members, when required to have a known member id, are not always limited. The caveat is that the current logic only considers unknown members and uses the group size, which does not include the so called pending members, to accept or reject a member. In this case, when they rejoins, they are not unknown member anymore and thus could bypass the limit. See `testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember` for the whole scenario.
This PR changes the logic to address the above two issues and extends the tests coverage to cover all the member types.
Reviewers: Jason Gustafson <jason@confluent.io>
In this commit we made sure that the auto leader election only happens after the newly starter broker is in the isr.
No accompany tests are added due to the fact that:
this is a change to the private method and no public facing change is made
it is hard to create tests for this change without considerable effort
Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Jun Rao <junrao@gmail.com>
A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker would receive the latest broker epoch before the controller: when the broker registers with ZK, there are few more instructions to process before this broker "knows" about its epoch, while the controller may already get notified and send UPDATE_METADATA request (as an example) with the new epoch. This will result in clients getting stale metadata from this broker.
With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch.
Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
Add an option to kafka-configs.sh `--add-config-file` that adds the configs from a properties file.
Testing: Added new tests to ConfigCommandTest.scala
Author: Aneel Nazareth <aneel@confluent.io>
Reviewers: David Jacot <djacot@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#8184 from WanderingStar/KAFKA-9612
A partition is under reassignment if the either the set of adding
replicas or set removing replicas is non-empty.
Fix the test assertion such that it prints stdout on failure.
Reviewers: Ismael Juma <ismael@juma.me.uk>
This patch fixes a race condition in the join group request handling which sometimes results in not enforcing the maximum number of members allowed in a group.
Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>
For join / sync / commit / heartbeat request, we would remember the sent generation in the created handler object, and then upon getting the error code, we could check whether the sent generation still matches the current generation. If not, it means that the member has already reset its generation or has participated in a new rebalance already. This means:
1. For join / sync-group request, we do not need to call reset-generation any more for illegal-generation / unknown-member. But we would still set the error since at a given time only one join/sync round-trip would be in flight, and hence we should not be participating in a new rebalance. Also for fenced instance error we still treat it as fatal since we should not be participating in a new rebalance, so this is still not expected.
2. For commit request, we do not set the corresponding error for illegal-generation / unknown-member / fenced-instance but raise rebalance-in-progress. For commit-sync it would be still thrown to user, while for commit-async it would be logged and swallowed.
3. For heartbeat request, we do not treat illegal-generation / unknown-member / fenced-instance errors and just consider it as succeeded since this should be a stale heartbeat which can be ignored.
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>
* Upgrade to Scala 2.13.2 which introduces the ability to suppress warnings.
* Upgrade to scala-collection-compat 2.1.6 as it introduces the
@nowarn annotation for Scala 2.12.
* While at it, also update scala-java8-compat to 0.9.1.
* Fix compiler warnings and add @nowarn for the unfixed ones.
Scala 2.13.2 highlights (besides @nowarn):
* Rewrite Vector (using "radix-balanced finger tree vectors"),
for performance. Small vectors are now more compactly
represented. Some operations are now drastically faster on
large vectors. A few operations may be a little slower.
* Matching strings makes switches in bytecode.
https://github.com/scala/scala/releases/tag/v2.13.2
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Also:
* Remove deprecated `=` in resolutionStrategy.
* Replace `AES/GCM/PKCS5Padding` with `AES/GCM/NoPadding`
in `PasswordEncoderTest`. The former is invalid and JDK 14 rejects it,
see https://bugs.openjdk.java.net/browse/JDK-8229043.
With these changes, the build works with Java 14 and Scala 2.12. The
same will apply to Scala 2.13 when Scala 2.13.2 is released (should
happen within 1-2 weeks).
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Matthias J. Sax <matthias@confluent.io>
The impact of trace logging is normally small, on the order of 40ns per getEffectiveLevel check, however this adds up with trace is called multiple times per partition in the replica fetch hot path.
This PR removes some trace logs that are not very useful and reduces cases where the level is checked over and over for one fetch request.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Adding a dynamically updatable log config is currently error prone, as it is easy to
set them up as a val not a def and this would result in a dynamically updated
broker default not applying to a LogConfig after broker restart.
This PR adds a guard against introducing these issues by ensuring that all log
configs are exhaustively checked via a test.
For example, if the following line was a val and not a def, there would be a
problem with dynamically updating broker defaults for the config.
4bde9bb3cc/core/src/main/scala/kafka/server/KafkaConfig.scala (L1216)
Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Ismael Juma <ismael@juma.me.uk>
This commit reworks the SocketServer to always start the acceptor threads after the processor threads and to always stop the acceptor threads before the processor threads. It ensures that the acceptor shutdown is not blocked waiting on the processors to be fully shutdown by decoupling the shutdown signal and the awaiting. It also ensure that the processor threads drain its newConnection queue to unblock acceptors that may be waiting. However, the acceptors still bind during the startup, only the processing of new connections and requests is further delayed.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
The patch adds a new test case for validating concurrent read/write behavior in the `Log` implementation. In the process of verifying this, we found a race condition in `read`. The previous logic checks whether the start offset is equal to the end offset before collecting the high watermark. It is possible that the log is truncated in between these two conditions which could cause the high watermark to be equal to the log end offset. When this happens, `LogSegment.read` fails because it is unable to find the starting position to read from.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
`KafkaApis#handleOffsetDeleteRequest` does not build the response correctly because `topics.add` is not in the correct loop. Fortunately, due to how the response is processed by the admin client, it works but sends redundant information on the wire.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
This patch moves the state change logger logs for handling a LeaderAndIsr/StopReplica request inside the replicaStateChangeLock in order to serialize the logs. This helps to tell apart per-partition actions of concurrent LAIR/StopReplica requests in cases where requests pile up waiting on the lock.
Reviewer: Jun Rao <junrao@gmail.com>
QuotaViolationException generates an exception message via String.format in the constructor
even though the message is often not used, e.g. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L258. We now override `toString` instead.
It also generates an unnecessary stack trace, which is now avoided using the same pattern as in ApiException.
I have also avoided use of QuotaViolationException for control flow in
ReplicationQuotaManager which is another hotspot that we have seen in practice.
Reviewers: Gwen Shapira <gwen@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Ismael Juma <ismael@juma.me.uk>
There are two cases in the fetch pass where a partition is unnecessarily looked up
from the partition Pool, when one is already accessible. This will be a fairly minor
improvement on high partition count clusters, but could be worth 1% from some
profiles I have seen.
More importantly, the code is cleaner this way.
Reviewers: Ismael Juma <ismael@juma.me.uk>
FetchRequest.PartitionData.equals unnecessarily uses Object.equals generating a lot of allocations due to boxing, even though primitives are being compared. This is shown in the allocation profile below. Note that the CPU overhead is negligble.

![image](https://user-images.githubusercontent.com/252189/79079019-46686300-7cc1-11ea-9bc9-44fd17bae888.png)
Author: Lucas Bradstreet <lucasbradstreet@gmail.com>
Reviewers: Chia-Ping Tsai, Gwen Shapira
Closes#8473 from lbradstreet/avoid-boxing-partition-data-equals
In KAFKA-9826, a log whose first dirty offset was past the start of the active segment and past the last cleaned point resulted in an endless cycle of picking the segment to clean and discarding it. Though this didn't interfere with cleaning other log segments, it kept the log cleaner thread continuously busy (potentially wasting CPU and impacting other running threads) and filled the logs with lots of extraneous messages.
This was determined to be because the active segment was getting mistakenly picked for cleaning, and because the logSegments code handles (start == end) cases only for (start, end) on a segment boundary: the intent is to return a null list, but if they're not on a segment boundary, the routine returns that segment.
This fix has two parts:
It changes logSegments to handle start==end by returning an empty List always.
It changes the definition of calculateCleanableBytes to not consider anything past the UncleanableOffset; previously, it would potentially shift the UncleanableOffset to match the firstDirtyOffset even if the firstDirtyOffset was past the firstUncleanableOffset. This has no real effect now in the context of the fix for (1) but it makes the code read more like the model that the code is attempting to follow.
These changes require modifications to a few test cases that handled this particular test case; they were introduced in the context of KAFKA-8764. Those situations are now handled elsewhere in code, but the tests themselves allowed a DirtyOffset in the active segment, and expected an active segment to be selected for cleaning.
Reviewer: Jun Rao <junrao@gmail.com>