This change sets the groundwork for migrating other modules incrementally.
Main changes:
- Replace `junit` 4.13 with `junit-jupiter` and `junit-vintage` 5.7.0-RC1.
- All modules except for `tools` depend on `junit-vintage`.
- `tools` depends on `junit-jupiter`.
- Convert `tools` tests to JUnit 5.
- Update `PushHttpMetricsReporterTest` to use `mockito` instead of `powermock` and `easymock`
(powermock doesn't seem to work well with JUnit 5 and we don't need it since mockito can mock
static methods).
- Update `mockito` to 3.5.7.
- Update `TestUtils` to use JUnit 5 assertions since `tools` depends on it.
Unrelated clean-ups:
- Remove `unit` from package names in a few `core` tests.
- Replace `try/catch/fail` with `assertThrows` in a number of places.
- Tag `CoordinatorTest` as integration test.
- Remove unnecessary type parameters when invoking methods and constructors.
Tested with IntelliJ and gradle. Verified that the following commands work as expected:
* ./gradlew tools:unitTest
* ./gradlew tools:integrationTest
* ./gradlew tools:test
* ./gradlew core:unitTest
* ./gradlew core:integrationTest
* ./gradlew clients:test
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
1. Split the consumer coordinator's REBALANCING state into PREPARING_REBALANCE and COMPLETING_REBALANCE. The first is when the join group request is sent, and the second is after the join group response is received. During the first state we should still not send hb since it shares the same socket with the join group request and the group coordinator has disabled timeout, however when we transit to the second state we should start sending hb in case leader's assign takes long time. This is also for fixing KAFKA-10122.
2. When deciding coordinator#timeToNextPoll, do not count in timeToNextHeartbeat if the state is in UNJOINED or PREPARING_REBALANCE since we would disable hb and hence its timer would not be updated.
3. On the broker side, allow hb received during PREPARING_REBALANCE, return NONE error code instead of REBALANCE_IN_PROGRESS. However on client side, we still need to ignore REBALANCE_IN_PROGRESS if state is COMPLETING_REBALANCE in case it is talking to an old versioned broker.
4. Piggy-backing a log4j improvement on the broker coordinator for triggering rebalance reason, as I found it a bit blurred during the investigation. Also subsumed #9038 with log4j improvements.
The tricky part for allowing hb during COMPLETING_REBALANCE is in two parts: 1) before the sync-group response is received, a hb response may have reset the generation; also after the sync-group response but before the callback is triggered, a hb response can still reset the generation, we need to handle both cases by checking the generation / state. 2) with the hb thread enabled, the sync-group request may be sent by the hb thread even if the caller thread did not call poll yet.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Boyang Chen <boyang@confluent.io>, John Roesler <john@confluent.io>
The main changes of this PR are shown below.
1. replace tryLock by lock for DelayedOperation#maybeTryComplete
2. complete the delayed requests without holding group lock
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
The leader epoch cache is incorrectly recovered for epoch 0 as the
assignment is skipped when epoch == 0. This check was likely intended to
prevent negative epochs from being applied or there was an assumption
that epochs started at 1.
A test has been added to LogSegmentTest to show the LogSegment
recovery path works for the epoch cache. This was a test gap as none of the
recover calls supply a leader epoch cache to recover.
Reviewers: Jason Gustafson <jason@confluent.io>
Implement the KIP-554 API to create, describe, and alter SCRAM user configurations via the AdminClient. Add ducktape tests, and modify JUnit tests to test and use the new API where appropriate.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Rajini Sivaram <rajinisivaram@googlemail.com>
Make sure that we set the isNew field in LeaderAndIsrRequest correctly for brokers
that gets added to the replica set on reassignment.
This is tested by creating a variant of ControllerIntergationTest.testPartitionReassignment()
that makes one of the log directories on the target broker offline before initiating the
reassignment. Without the change to the way isNew is set, this fails after a timeout. With
the change, it succeeds.
To facilitate calling causeLogDirFailure() both from ControllerIntegrationTest and
LogDirFailureTest, the method was moved to TestUtils along with the other helper
methods that deals with interacting with KafkaServer instances for test cases.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
MetadataCache#getPartitionMetadata returns an error when the topic's leader Id
is present at MetadataCache but listener endpoint is not present for this leader.
For older versions, LEADER_NOT_AVAILABLE is returned while LISTENER_NOT_FOUND is
returned for new metadata versions.
The problem is that getPartitionMetadata was looking up MetadataCache's host brokerId rather
than the topic's leader id while determining what error to return. This
could result in the call returning LISTENER_NOT_FOUND when it should
have returned LEADER_NOT_AVAILABLE. This commit corrects this behavior.
Unit tests were already present to test out the error codes returned
under different situations but they were giving out a false positive.
The test was using same broker id for both the MetadataCache's host as
well as for the topic's leader. Error manifests when the MetadataCache's
host id is changed. Improved the test.
This commit also consolidated couple of related tests to reduce code
duplication.
Reviewers: Jason Gustafson <jason@confluent.io>
Implements the part of KIP-612 that adds broker configurations for broker-wide and per-listener connection creation rate limits and enforces these limits.
Reviewers: David Jacot <djacot@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
Add a separate error code as PRODUCER_FENCED to differentiate INVALID_PRODUCER_EPOCH. On broker side, replace INVALID_PRODUCER_EPOCH with PRODUCER_FENCED when the request version is the latest, while still returning INVALID_PRODUCER_EPOCH to older clients. On client side, simply handling INVALID_PRODUCER_EPOCH the same as PRODUCER_FENCED if from txn coordinator APIs.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
The patch https://github.com/apache/kafka/pull/8672 introduced a bug leading to crashing the replica fetcher threads. The issue is that https://github.com/apache/kafka/pull/8672 deletes the Partitions prior to stopping the replica fetchers. As the replica fetchers relies access the Partition in the ReplicaManager, they crash with a NotLeaderOrFollowerException that is not handled.
This PR reverts the code to the original ordering to avoid this issue.
The regression was caught and validated by our system test: `kafkatest.tests.core.reassign_partitions_test`.
Reviewers: Vikas Singh <vikas@confluent.io>, Jason Gustafson <jason@confluent.io>
This PR improves the logging for segment deletion to ensure that a reason is logged for segment deletions via all code paths. It also updates the logging so that we log a reason for an entire batch of deletions instead of logging one message per segment in cases when segment-level details are not significant.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Kowshik Prakasam <kprakasam@confluent.io>, Jason Gustafson <jason@confluent.io>
JIRA: https://issues.apache.org/jira/browse/KAFKA-10193
* add `preempt(): Unit` method for all `ControllerEvent` so that all events (and future events) must implement it
* for events that have callbacks, move the preemption from individual methods to `preempt()`
* add preemption for `ApiPartitionReassignment` and `ListPartitionReassignments`
* add integration tests:
1. test whether `preempt()` is called when controller shuts down
2. test whether the events with callbacks have the correct error response (`NOT_CONTROLLER`)
* explicit typing for `ControllerEvent` methods
Author: jeff kim <jeff.kim@confluent.io>
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>,Stanislav Kozlovski <stanislav@confluent.io>, David Arthur <mumrah@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#9050 from jeffkbkim/KAFKA-10193-controller-events-add-preemption
Based on the discussion in #9072, I have put together an alternative way. This one does the following:
Instead of changing the implementation of the Rate to behave like a Token Bucket, it actually use two different metrics: the regular Rate and a new Token Bucket. The latter is used to enforce the quota.
The Token Bucket algorithm uses the rate of the quota as the refill rate for the credits and compute the burst based on the number of samples and their length (# samples * sample length * quota).
The Token Bucket algorithm used can go under zero in order to handle unlimited burst (e.g. create topic with a number of partitions higher than the burst). Throttling kicks in when the number of credits is under zero.
The throttle time is computed as credits under zero / refill rate (or quota).
Only the controller mutation uses it for now.
The remaining number of credits in the bucket is exposed with the tokens metrics per user/clientId.
Reviewers: Anna Povzner <anna@confluent.io>, Jun Rao <junrao@gmail.com>
Currently, we remove the Log metrics when asynchronous deletion of the log is triggered. However, we attempt to register the metrics immediately upon log creation. If a Log object is re-created for a partition that is pending deletion (because a topic was quickly re-created or because a partition was moved off and back onto a broker), the registration of the new metrics can happen before the asyncrhonous deletion. In this case, the metrics are removed after the second registration, leading to missing Log metrics.
To fix this, this patch changes the log deletion behavior to remove the metrics when the log is first marked for deletion, rather than when the files are deleted. This removes the window in which metrics registration can occur before metrics removal. This is justifiable because the log should be logically deleted when a delete request or partition movement finishes, rather than when the files are actually removed. Tested with unit tests.
Author: Bob Barrett <bob.barrett@confluent.io>
Reviewers: David Jacot, Dhruvil Shah, Vikas Singh, Gwen Shapira
Closes#9054 from bob-barrett/KAFKA-10282
Refactored FetchRequest and FetchResponse to use the generated message classes for serialization and deserialization. This allows us to bypass unnecessary Struct conversion in a few places. A new "records" type was added to the message protocol which uses BaseRecords as the field type. When sending, we can set a FileRecords instance on the message, and when receiving the message class will use MemoryRecords.
Also included a few JMH benchmarks which indicate a small performance improvement for requests with high partition counts or small record sizes.
Reviewers: Jason Gustafson <jason@confluent.io>, Boyang Chen <boyang@confluent.io>, David Jacot <djacot@confluent.io>, Lucas Bradstreet <lucas@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Colin P. McCabe <cmccabe@apache.org>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, David Jacot <djacot@confluent.io>, Lee Dongjin <dongjin@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
Modified KafkaProducer.sendOffsetsToTransaction() to be affected with max.block.ms, and added timeout test for blocking methods
Reviewers: Boyang Chen <boyang@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Xi Hu <huxi_2b@hotmail.com>
Add a broker to controller channel manager for use cases such as redirection and AlterIsr.
Reviewers: David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
Co-authored-by: Viktor Somogyi <viktorsomogyi@gmail.com>
Co-authored-by: Boyang Chen <boyang@confluent.io>
Set `replica.fetch.max.bytes` to `1` and produce multiple record batches to allow
for throttling to take place. This helps avoid a race condition where the
reassignment would complete more quickly than expected causing an
assertion to fail some times.
Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jason Gustafson <jason@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
https://issues.apache.org/jira/browse/KAFKA-10305
When `kafka-consumer-perf-test.sh` is executed without required options or no options at all, only the error message is displayed. It's better off showing the usage as well like what we did for kafka-console-producer.sh.
We would previously update the map by adding the new replicas to the map and then removing the old ones.
During a recent refactoring, we changed the logic to first clear the map and then add all the replicas to it.
While this is done in a write lock, not all callers that access the map structure use a lock. It is safer to revert to
the previous behavior of showing the intermediate state of the map with extra replicas, rather than an
intermediate state of the map with no replicas.
Reviewers: Ismael Juma <ismael@juma.me.uk>
* KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
https://issues.apache.org/jira/browse/KAFKA-10268
Currently, ConfigCommand --delete-config API does not restore the config to default value, no matter at broker-level or broker-default level. Besides, Admin.incrementalAlterConfigs API also runs into this problem. This patch fixes it by removing the corresponding config from the newConfig properties when reconfiguring dynamic broker config.
This flaky test exists for a long time, and it happened more frequently recently. (also happened in my PR testing!! :( ) In KAFKA-8264 and KAFKA-8460, it described the issue for this test is that
> Timed out before consuming expected 2700 records. The number consumed was xxxx
I did some investigation. This test is to test:
> we consume all partitions if fetch max bytes and max.partition.fetch.bytes are low.
And what it did, is to create 3 topics and 30 partitions for each. And then, iterate through all 90 partitions to send 30 records for each. Finally, verify the we can consume all the records successfully.
What the error message saying is that it cannot consume all the records in time (might be the busy system) So, we can actually decrease the record size to avoid it. I checked all the error messages we collected in KAFKA-8264 and KAFKA-8460, the failed cases can always consume at least 1440 up (total is 2700). So, I set the records half size of the original setting, it'll become 1350 records in total. It should make this test more stable.
Author: Luke Chen <showuon@gmail.com>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#8885 from showuon/KAFKA-8264
This PR implements the broker side changes of KIP-599, except the changes of the Rate implementation which will be addressed separately. The PR changes/introduces the following:
- It introduces the protocol changes.
- It introduces a new quota manager ControllerMutationQuotaManager which is another specialization of the ClientQuotaManager.
- It enforces the quota in the KafkaApis and in the AdminManager. This part handles new and old clients as described in the KIP.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
- part of KIP-572
- deprecates producer config `retries` (still in use)
- deprecates admin config `retries` (still in use)
- deprecates Kafka Streams config `retries` (will be ignored)
- adds new Kafka Streams config `task.timeout.ms` (follow up PRs will leverage this new config)
Reviewers: John Roesler <john@confluent.io>, Jason Gustafson <jason@confluent.io>, Randall Hauch <randall@confluent.io>
- After #8312, older brokers are returning empty configs, with latest `adminClient.describeConfigs`. Old brokers are receiving empty configNames in `AdminManageer.describeConfigs()` method. Older brokers does not handle empty configKeys. Due to this old brokers are filtering all the configs.
- Update ClientCompatibilityTest to verify describe configs
- Add test case to test describe configs with empty configuration Keys
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Closes#9046 from omkreddy/KAFKA-9432
add a timeout for event queue time histogram;
reset eventQueueTimeHist when the controller event queue is empty;
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Igor Soarez <i@soarez.me>, Jun Rao <junrao@gmail.com>
Brokers currently return NOT_LEADER_FOR_PARTITION to producers and REPLICA_NOT_AVAILABLE to consumers if a replica is not available on the broker during reassignments. Non-Java clients treat REPLICA_NOT_AVAILABLE as a non-retriable exception, Java consumers handle this error by explicitly matching the error code even though it is not an InvalidMetadataException. This PR renames NOT_LEADER_FOR_PARTITION to NOT_LEADER_OR_FOLLOWER and uses the same error for producers and consumers. This is compatible with both Java and non-Java clients since all clients handle this error code (6) as retriable exception. The PR also makes ReplicaNotAvailableException a subclass of InvalidMetadataException.
- ALTER_REPLICA_LOG_DIRS continues to return REPLICA_NOT_AVAILABLE. Retained this for compatibility since this request never returned NOT_LEADER_FOR_PARTITION earlier.
- MetadataRequest version 0 also returns REPLICA_NOT_AVAILABLE as topic-level error code for compatibility. Newer versions filter these out and return Errors.NONE, so didn't change this.
- Partition responses in MetadataRequest return REPLICA_NOT_AVAILABLE to indicate that one of the replicas is not available. Did not change this since NOT_LEADER_FOR_PARTITION is not suitable in this case.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Bob Barrett <bob.barrett@confluent.io>
When fencing producers, we currently blindly bump the epoch by 1 and write an abort marker to the transaction log. If the log is unavailable (for example, because the number of in-sync replicas is less than min.in.sync.replicas), we will roll back the attempted write of the abort marker, but still increment the epoch in the transaction metadata cache. During periods of prolonged log unavailability, producer retires of InitProducerId calls can cause the epoch to be increased to the point of exhaustion, at which point further InitProducerId calls fail because the producer can no longer be fenced. With this patch, we track whenever we have failed to write the bumped epoch, and when that has happened, we don't bump the epoch any further when attempting to fence. This is safe because the in-memory epoch is still causes old producers to be fenced.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>
Update checkpoint files once for all deleted partitions instead of updating them for each deleted partitions. With this, a stop replica requests with 2000 partitions to be deleted takes ~2 secs instead of ~40 secs previously.
Refactor the checkpointing methods to not compute the logsByDir all the time. It is now reused as much as possible.
Refactor the exception handling. Some checkpointing methods were handling IOException but the underlying write process already catches them and throws KafkaStorageException instead.
Reduce the logging in the log cleaner manager. It does not log anymore when a partition is deleted as it is not a useful information.
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Author: Tom Bentley <tbentley@redhat.com>
Reviewers: David Jacot <djacot@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#8808 from tombentley/KAFKA-10109-AclComment-multiple-AdminClients
Add null check for configurationKey to avoid NPE, and add test for it.
Author: Luke Chen <showuon@gmail.com>
Reviewers: Tom Bentley <tbentley@redhat.com>, huxi <huxi_2b@hotmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#8966 from showuon/KAFKA-10220
Call KafkaStreams#cleanUp to reset local state before starting application up the second run.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Boyang Chen <boyang@confluent.io>, John Roesler <john@confluent.io>