This patch adds integration tests for the OffsetCommit API and the OffsetFetch API. The tests runs against the old and the new group coordinator and with the new and the old consumer rebalance protocol.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Justine Olshan <jolshan@confluent.io>
Avoid busy waiting for processable tasks. We need to be a bit careful here to not have the task executors to sleep when work is available. We have to make sure to signal on the condition variable any time a task becomes "processable". Here are some situations where a task becomes processable:
- Task is unassigned from another TaskExecutor.
- Task state is changed (should only happen inside when a task is locked inside the polling phase).
- When tasks are unlocked.
- When tasks are added.
- New records available.
- A task is resumed.
So in summary, we
- We should probably lock tasks when they are paused and unlock them when they are resumed. We should also wake the task executors after every polling phase. This belongs to the StreamThread integration work (separate PR). We add DefaultTaskManager.signalProcessableTasks for this.
- We need to awake the task executors in DefaultTaskManager.unassignTask, DefaultTaskManager.unlockTasks and DefaultTaskManager.add.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@apache.org>
This PR moves GetOffsetShell from core module to tools module with rewriting from Scala to Java.
Reviewers: Federico Valeri fedevaleri@gmail.com, Ziming Deng dengziming1993@gmail.com, Mickael Maison mimaison@apache.org.
- RSM and RLMM classpath can be empty since it's optional so removed the non-empty string validator
- Fix getting the `localTieredStorage` by brokerId after stopping a broker.
Reviewers: Christo Lolov <lolovc@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
This patch allows broker heartbeat events to be completed while a metadata transaction is in-flight.
More generally, this patch allows any RUNS_IN_PREMIGRATION event to complete while the controller
is in pre-migration mode even if the migration transaction is in-flight.
We had a problem with broker heartbeats timing out because they could not be completed while a large
ZK migration transaction was in-flight. This resulted in the controller fencing all the ZK brokers which
has many undesirable downstream effects.
Reviewers: Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Colin Patrick McCabe <cmccabe@apache.org>
Log at warning level when remote fetch execution is rejected. This can be potentially useful to troubleshoot UNKNOWN_SERVER_ERROR responses for remote fetch requests.
Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
In the Windows OS atomic move are not allowed if the file has another open handle. E.g
__cluster_metadata-0\quorum-state: The process cannot access the file because it is being used by another process
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
at java.base/java.nio.file.Files.move(Files.java:1430)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
This is fixed by first closing the temporary quorum-state file before attempting to move it.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
Co-Authored-By: Renaldo Baur Filho <renaldobf@gmail.com>
This change introduces some basic clean up and refactoring for forthcoming commits related to the revised fetch code for the consumer threading refactor project.
Reviewers: Christo Lolov <lolovc@amazon.com>, Jun Rao <junrao@gmail.com>
Implement KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add
Controller Registration. This KIP adds a new version of DescribeClusterRequest which is supported
by KRaft controllers. It also teaches AdminClient how to use this new DESCRIBE_CLUSTER request to
talk directly with the controller quorum. This is all gated behind a new MetadataVersion,
IBP_3_7_IV0.
In order to share the DESCRIBE_CLUSTER logic between broker and controller, this PR factors it out
into AuthHelper.computeDescribeClusterResponse.
The KIP adds three new errors codes: MISMATCHED_ENDPOINT_TYPE, UNSUPPORTED_ENDPOINT_TYPE, and
UNKNOWN_CONTROLLER_ID. The endpoint type errors can be returned from DescribeClusterRequest
On the controller side, the controllers now try to register themselves with the current active
controller, by sending a CONTROLLER_REGISTRATION request. This, in turn, is converted into a
RegisterControllerRecord by the active controller. ClusterImage, ClusterDelta, and all other
associated classes have been upgraded to propagate the new metadata. In the metadata shell, the
cluster directory now contains both broker and controller subdirectories.
QuorumFeatures previously had a reference to the ApiVersions structure used by the controller's
NetworkClient. Because this PR removes that reference, QuorumFeatures now contains only immutable
data. Specifically, it contains the current node ID, the locally supported features, and the list
of quorum node IDs in the cluster.
Reviewers: David Arthur <mumrah@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
This restores previous behavior for Admin::listOffsets, which was to fail immediately if topic metadata could not be found, and only retry if metadata for one or more specific partitions could not be found.
There is a subtle difference here: prior to https://github.com/apache/kafka/pull/13432, the operation would be retried if any metadata error was reported for any individual topic partition, even if an error was also reported for the entire topic. With this change, the operation always fails if an error is reported for the entire topic, even if an error is also reported for one or more individual topic partitions.
I am not aware of any cases where brokers might return both topic- and topic partition-level errors for a metadata request, and if there are none, then this change should be safe. However, if there are such cases, we may need to refine this PR to remove the discrepancy in behavior.
Reviewers: Justine Olshan <jolshan@confluent.io>
* Added the integration test for DELETE_RECORDS API for tiered storage enabled topic
* Added validation checks before removing remote log segments for log-start-offset breach
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
This patch adds the schemas of the new ConsumerGroupDescribe API (KIP-848) and adds an handler to the KafkaApis.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
On leadership failover, the new leader's start offset may be older than the start offset of old leader. This works fine for local storage scenario because the new leader still contains data associated with stale start offset. But in case of remote storage, although new leader has a stale offset, the data associated with it has been deleted from remote by the old leader. Hence, we end up in a situation where leader has a start offset but no data associated with it.
This commit fixes the situation by ensuring that on every leadership failover, for topics with remote storage, the leader will update it's start offset from the base of first segment in current leader chain present in the remote storage (if any).
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>
A mocked method is executed unexpectedly when we enable DEBUG
log level, leading to confusing test failures during debugging.
Since the log message itself seems useful, we adapt the test
to take the additional mocked method call into account).
Reviewer: Bruno Cadonna <cadonna@apache.org>
This patch adds the MemberId and the MemberEpoch fields to the OffsetFetchRequest. Those fields will be populated when the new consumer group protocol is used to ensure that the member fetching the offset has the correct member id and epoch. If it does not, UNKNOWN_MEMBER_ID or STALE_MEMBER_EPOCH are returned to the client.
Our initial idea was to implement the same for the old protocol. The field is called GenerationIdOrMemberEpoch in KIP-848 to materialize this. As a second though, I think that we should only do it for the new protocol. The effort to implement it in the old protocol is not worth it in my opinion.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
Implementation of KIP-580 to add exponential back-off to situations in which retry.backoff.ms
is used to delay backoff attempts. This KIP adds exponential backoff behavior with a maximum
controlled by a new config retry.backoff.max.ms, together with a +/- 20% of jitter to spread the
retry attempts of the client fleet.
Reviewers: Mayank Shekhar Narula <mayanks.narula@gmail.com>, Milind Luthra <i.milind.luthra@gmail.com>, Kirk True <kirk@mustardgrain.com>, Jun Rao<junrao@gmail.com>
- Updated the log-start-offset to the correct value while building the replica state in ReplicaFetcherTierStateMachine#buildRemoteLogAuxState
Integration tests added:
1. ReassignReplicaExpandTest
2. ReassignReplicaMoveTest and
3. ReassignReplicaShrinkTest
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
This will allow enabling and disabling transaction verification (KIP-890 part 1) without having to roll the cluster.
Tested that restarting the cluster persists the configuration.
If a verification is disabled/enabled while we have an inflight request, depending on the step of the process, the change may or may not be seen in the inflight request (enabling will typically fail unverified requests, but we may still verify and reject when we first disable) Subsequent requests/retries will behave as expected for verification.
Sequence checks will continue to take place after disabling until the first message is written to the partition (thus clearing the verification entry with the tentative sequence) or the broker restarts/partition is reassigned which will clear the memory. On enabling, we will only track sequences that for requests received after the verification is enabled.
Reviewers: Jason Gustafson <jason@confluent.io>, Satish Duggana <satishd@apache.org>
Added the below integration tests with tiered storage
- PartitionsExpandTest
- DeleteSegmentsByRetentionSizeTest
- DeleteSegmentsByRetentionTimeTest and
- EnableRemoteLogOnTopicTest
- Enabled the test for both ZK and Kraft modes.
These are enabled for both ZK and Kraft modes.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>
A follow-up to https://github.com/apache/kafka/pull/13804.
This follow-up adds the alternative fix approach mentioned in
the PR above - bumping the session timeout used in the test
with 1 second.
Reproducing the flake-out locally has been much harder than
on the CI runs, as neither Gradle with Java 11 or Java 14 nor
IntelliJ with Java 14 could show it, but IntelliJ with Java 11
could occasionally reproduce the failure the first time
immediately after a rebuild. While I was unable to see the
failure with the bumped session timeout, the testing procedure
definitely didn't provide sufficient reassurance for the
fix as even without it often I'd see hundreds of consecutive
successful test runs when the first run didn't fail.
Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
This change is about RLM task handling retriable exception when it tries to copy segments to remote but the RLMM is not yet initialized. On encountering the exception, we log the error and throw the exception back to the caller. We also make sure that the failure metrics are updated since this is a temporary error because RLMM is not yet initialized.
Added unit tests to verify RLM task does not attempt to copy segments to remote on encountering the retriable exception and that failure metrics remain unchanged.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
In SocketServerTest, we create SocketServer and enableRequestProcessing on each test class initialization. That's fine since we shutdown it in @AfterEach. The issue we have is we disabled 2 tests in this test suite. And when running these disabled tests, we will go through class initialization, but without @AfterEach. That causes 2 network thread leaked.
Compared the error message in DynamicBrokerReconfigurationTest#testThreadPoolResize test here:
org.opentest4j.AssertionFailedError: Invalid threads: expected 6, got 8: List(data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0) ==> expected: <true> but was: <false>
The 2 unexpected network threads are leaked from SocketServerTest.
Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Chris Egerton <chrise@aiven.io>
Resets the value of transactionInFlight to false when closing the
StreamsProducer. This ensures we don't try to commit against a
closed producer
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Preliminary fix for KAFKA-15429 which updates StreamThread.completeShutdown to
catch-and-log errors from consumer.unsubscribe. Though this does not prevent
the exception, it does preserve the original exception that caused the stream
thread to exit.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Implementation of the OffsetRequestsManager, responsible for building requests and processing responses for requests related to partition offsets.
In this PR, the manager includes support for ListOffset requests, generated when the user makes any of the following consumer API calls:
beginningOffsets
endOffsets
offsetsForTimes
All previous consumer API calls interact with the OffsetsRequestsManager by generating a ListOffsetsApplicationEvent.
Includes tests to cover the new functionality and to ensure no API level changes are introduced.
This covers KAFKA-14965 and KAFKA-15081.
Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Jun Rao<junrao@gmail.com>
This patch makes the styling consistent inside GroupMetadataManagerTest. Also, it adds JoinResult to simplify the JoinGroup API responses in the tests.
Reviewers: David Arthur <mumrah@gmail.com>, David Jacot <djacot@confluent.io>
This patch refactors the GroupCoordinator.fetchOffsets and GroupCoordinator.fetchAllOffsets methods to take an OffsetFetchRequestGroup and to return an OffsetFetchResponseGroup. It prepares the ground for adding the member id and the member epoch to the OffsetFetchRequest. This change also makes those two methods more aligned with the others in the interface.
Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
When tiered storage is enabled on the topic, and the last-standing-replica is restarted, then the log-start-offset should not reset its offset to first-local-log-segment-base-offset.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>