A mocked method is executed unexpectedly when we enable DEBUG
log level, leading to confusing test failures during debugging.
Since the log message itself seems useful, we adapt the test
to take the additional mocked method call into account).
Reviewer: Bruno Cadonna <cadonna@apache.org>
This patch adds the MemberId and the MemberEpoch fields to the OffsetFetchRequest. Those fields will be populated when the new consumer group protocol is used to ensure that the member fetching the offset has the correct member id and epoch. If it does not, UNKNOWN_MEMBER_ID or STALE_MEMBER_EPOCH are returned to the client.
Our initial idea was to implement the same for the old protocol. The field is called GenerationIdOrMemberEpoch in KIP-848 to materialize this. As a second though, I think that we should only do it for the new protocol. The effort to implement it in the old protocol is not worth it in my opinion.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
Implementation of KIP-580 to add exponential back-off to situations in which retry.backoff.ms
is used to delay backoff attempts. This KIP adds exponential backoff behavior with a maximum
controlled by a new config retry.backoff.max.ms, together with a +/- 20% of jitter to spread the
retry attempts of the client fleet.
Reviewers: Mayank Shekhar Narula <mayanks.narula@gmail.com>, Milind Luthra <i.milind.luthra@gmail.com>, Kirk True <kirk@mustardgrain.com>, Jun Rao<junrao@gmail.com>
- Updated the log-start-offset to the correct value while building the replica state in ReplicaFetcherTierStateMachine#buildRemoteLogAuxState
Integration tests added:
1. ReassignReplicaExpandTest
2. ReassignReplicaMoveTest and
3. ReassignReplicaShrinkTest
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
This will allow enabling and disabling transaction verification (KIP-890 part 1) without having to roll the cluster.
Tested that restarting the cluster persists the configuration.
If a verification is disabled/enabled while we have an inflight request, depending on the step of the process, the change may or may not be seen in the inflight request (enabling will typically fail unverified requests, but we may still verify and reject when we first disable) Subsequent requests/retries will behave as expected for verification.
Sequence checks will continue to take place after disabling until the first message is written to the partition (thus clearing the verification entry with the tentative sequence) or the broker restarts/partition is reassigned which will clear the memory. On enabling, we will only track sequences that for requests received after the verification is enabled.
Reviewers: Jason Gustafson <jason@confluent.io>, Satish Duggana <satishd@apache.org>
Added the below integration tests with tiered storage
- PartitionsExpandTest
- DeleteSegmentsByRetentionSizeTest
- DeleteSegmentsByRetentionTimeTest and
- EnableRemoteLogOnTopicTest
- Enabled the test for both ZK and Kraft modes.
These are enabled for both ZK and Kraft modes.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>
A follow-up to https://github.com/apache/kafka/pull/13804.
This follow-up adds the alternative fix approach mentioned in
the PR above - bumping the session timeout used in the test
with 1 second.
Reproducing the flake-out locally has been much harder than
on the CI runs, as neither Gradle with Java 11 or Java 14 nor
IntelliJ with Java 14 could show it, but IntelliJ with Java 11
could occasionally reproduce the failure the first time
immediately after a rebuild. While I was unable to see the
failure with the bumped session timeout, the testing procedure
definitely didn't provide sufficient reassurance for the
fix as even without it often I'd see hundreds of consecutive
successful test runs when the first run didn't fail.
Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
This change is about RLM task handling retriable exception when it tries to copy segments to remote but the RLMM is not yet initialized. On encountering the exception, we log the error and throw the exception back to the caller. We also make sure that the failure metrics are updated since this is a temporary error because RLMM is not yet initialized.
Added unit tests to verify RLM task does not attempt to copy segments to remote on encountering the retriable exception and that failure metrics remain unchanged.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
In SocketServerTest, we create SocketServer and enableRequestProcessing on each test class initialization. That's fine since we shutdown it in @AfterEach. The issue we have is we disabled 2 tests in this test suite. And when running these disabled tests, we will go through class initialization, but without @AfterEach. That causes 2 network thread leaked.
Compared the error message in DynamicBrokerReconfigurationTest#testThreadPoolResize test here:
org.opentest4j.AssertionFailedError: Invalid threads: expected 6, got 8: List(data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0) ==> expected: <true> but was: <false>
The 2 unexpected network threads are leaked from SocketServerTest.
Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Chris Egerton <chrise@aiven.io>
Resets the value of transactionInFlight to false when closing the
StreamsProducer. This ensures we don't try to commit against a
closed producer
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Preliminary fix for KAFKA-15429 which updates StreamThread.completeShutdown to
catch-and-log errors from consumer.unsubscribe. Though this does not prevent
the exception, it does preserve the original exception that caused the stream
thread to exit.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Implementation of the OffsetRequestsManager, responsible for building requests and processing responses for requests related to partition offsets.
In this PR, the manager includes support for ListOffset requests, generated when the user makes any of the following consumer API calls:
beginningOffsets
endOffsets
offsetsForTimes
All previous consumer API calls interact with the OffsetsRequestsManager by generating a ListOffsetsApplicationEvent.
Includes tests to cover the new functionality and to ensure no API level changes are introduced.
This covers KAFKA-14965 and KAFKA-15081.
Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Jun Rao<junrao@gmail.com>
This patch makes the styling consistent inside GroupMetadataManagerTest. Also, it adds JoinResult to simplify the JoinGroup API responses in the tests.
Reviewers: David Arthur <mumrah@gmail.com>, David Jacot <djacot@confluent.io>
This patch refactors the GroupCoordinator.fetchOffsets and GroupCoordinator.fetchAllOffsets methods to take an OffsetFetchRequestGroup and to return an OffsetFetchResponseGroup. It prepares the ground for adding the member id and the member epoch to the OffsetFetchRequest. This change also makes those two methods more aligned with the others in the interface.
Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
When tiered storage is enabled on the topic, and the last-standing-replica is restarted, then the log-start-offset should not reset its offset to first-local-log-segment-base-offset.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>
Implement setting and clearing task timeouts, as well as changing the output on exceptions to make
it similar to the existing code path.
Reviewer: Walker Carlson <wcarlson@apache.org>
Disabled the below tests to fix the thread leak:
1. kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize() and
2. org.apache.kafka.tiered.storage.integration.OffloadAndConsumeFromLeaderTest
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Justine Olshan <jolshan@confluent.io>
Minor fix to avoid creating unnecessary standby tasks, especially when these may be surprising or unexpected as in the case of an application with num.standby.replicas = 0 and warmup replicas disabled.
The "bug" here was introduced during the fix for an issue with cooperative rebalancing and in-memory stores. The fundamental problem is that in-memory stores cannot be unassigned from a consumer for any period, however temporary, without being closed and losing all the accumulated state. This caused some grief when the new HA task assignor would assign an active task to a node based on the readiness of the standby version of that task, but would have to remove the active task from the initial assignment so it could first be revoked from its previous owner, as per the cooperative rebalancing protocol. This temporary gap in any version of that task among the consumer's assignment for that one intermediate rebalance would end up causing the consumer to lose all state for it, in the case of in-memory stores.
To fix this, we simply began to place standby tasks on the intended recipient of an active task awaiting revocation by another consumer. However, the fix was a bit of an overreach, as we assigned these temporary standby tasks in all cases, regardless of whether there had previously been a standby version of that task. We can narrow this down without sacrificing any of the intended functionality by only assigning this kind of standby task where the consumer had previously owned some version of it that would otherwise potentially be lost.
Also breaks up some of the long log lines in the StreamsPartitionAssignor and expands the summary info while moving it all to the front of the line (following reports of missing info due to truncation of long log lines in larger applications)
When running in kraft mode, LogManager.startup is called in a different thread than the main broker (#14239)
startup thread (by BrokerMetadataPublisher when the first metadata update is received.) If a fatal
error happens during broker startup, before LogManager.startup is completed, LogManager.shutdown may
mark log dirs as clean shutdown improperly.
This PR includes following change:
1. During LogManager startup time:
- track hadCleanShutdwon info for each log dir
- track loadLogsCompleted status for each log dir
2. During LogManager shutdown time:
- do not write clean shutdown marker file for log dirs which have hadCleanShutdown==false and loadLogsCompleted==false
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Reading an unknown version of quorum-state-file should trigger an error. Currently the only known version is 0. Reading any other version should cause an error.
Reviewers: Justine Olshan <jolshan@confluent.io>, Luke Chen <showuon@gmail.com>
The purpose of this change is to not allow a broker to start up with Tiered Storage disabled (remote.log.storage.system.enable=false) while there are still topics that have 'remote.storage.enable' set.
Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
Summary
Implemented wakeup() mechanism using a WakeupTrigger class to store the pending wakeup item, and when wakeup() is invoked, it checks whether there's an active task or a wakeup task.
If there's an active task: the task will be completed exceptionally and the atomic reference will be freed up.
If there an wakedup task, which means wakeup() was invoked before a blocking call was issued. Therefore, the current task will be completed exceptionally immediately.
This PR also addressed minor issues such as:
Throwing WakeupException at the right place: As wakeups are thrown by completing an active future exceptionally. The WakeupException is wrapped inside of the ExecutionException.
mockConstruction is a thread-lock mock; therefore, we need to free up the reference before completing the test. Otherwise, other tests will continue using the thread-lock mock.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
assertThrows makes the verification of exceptions clearer and more intuitive, thus improving code readability compared to the annotation approach. It is considered a test smell in the research literature. One possible research is due to developers not keeping up to date with recent versions of testing frameworks.
All such patterns in streams have been refactored.
Reviewers: vamossagar12 <sagarmeansocean@gmail.com>, Justine Olshan <jolshan@confluent.io>
This patch implements the OffsetFetch API in the new group coordinator.
I found out that implementing the `RequireStable` flag is hard (to not say impossible) in the current model. For the context, the flag is here to ensure that an OffsetRequest request does not return stale offsets if there are pending offsets to be committed. In the scala code, we basically check the pending offsets data structure and if they are any pending offsets, we return the `UNSTABLE_OFFSET_COMMIT` error. This tells the consumer to retry.
In our new model, we don't have the pending offsets data structure. Instead, we use a timeline data structure to handle all the pending/uncommitted changes. Because of this we don't know whether offsets are pending for a particular group. Instead of doing this, I propose to not return the `UNSTABLE_OFFSET_COMMIT` error anymore. Instead, when `RequireStable` is set, we use a write operation to ensure that we read the latest offsets. If they are uncommitted offsets, the write operation ensures that the response is only return when they are committed. This gives a similar behaviour in the end.
Reviewers: Justine Olshan <jolshan@confluent.io>
As described in https://issues.apache.org/jira/browse/KAFKA-15353
When the AlterPartitionRequest version is < 3 and its builder.build is called multiple times, both newIsrWithEpochs and newIsr will all be empty. This can happen if the sender retires on errors.
Reviewers: Luke Chen <showuon@gmail.com>
This change does the following:
1. Make RemoteLogManagerConfigs that are implemented public
2. Add tasks to generate html docs for the configs
3. Include config docs in the main site
Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
This PR adds the following changes to the `TopicBasedRemoteLogMetadataManager`
1. Added a guard in RemoteLogMetadataCache so that the incoming request can be served from the cache iff the corresponding user-topic-partition is initalized
2. Improve error handling in ConsumerTask thread so that is not killed when there are errors in reading the internal topic
3. ConsumerTask initialization should handle the case when there are no records to read
and some other minor changes
Added Unit Tests for the changes
Co-authored-by: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Reviewers: Luke Chen <showuon@gmail.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
When starting up a controller for the first time (i.e., with an empty log), it is possible for
MetadataLoader to publish an empty MetadataImage before the activation records of the controller
have been written. While this is not a bug, it could be confusing. This patch closes that gap by
waiting for at least one controller record to be committed before the MetadataLoader starts publishing
images.
Reviewers: Colin P. McCabe <cmccabe@apache.org>