This patch adds reconciliation logic to migrating ZK brokers to deal with pending topic deletions as well as missed StopReplicas.
During the hybrid mode of the ZK migration, the KRaft controller is asynchronously sending UMR and LISR to the ZK brokers to propagate metadata. Since this process is essentially "best effort" it is possible for a broker to miss a StopReplicas. The new logic lets the ZK broker examine its local logs compared with the full set of replicas in a "Full" LISR. Any local logs which are not present in the set of replicas in the request are removed from ReplicaManager and marked as "stray".
To avoid inadvertent data loss with this new behavior, the brokers do not delete the "stray" partitions. They will rename the directories and log warning messages during log recovery. It will be up to the operator to manually delete the stray partitions. We can possibly enhance this in the future to clean up old stray logs.
This patch makes use of the previously unused Type field on LeaderAndIsrRequest. This was added as part of KIP-516 but never implemented. Since its introduction, an implicit 0 was sent in all LISR. The KRaft controller will now send a value of 2 to indicate a full LISR (as specified by the KIP). The presence of this value acts as a trigger for the ZK broker to perform the log reconciliation.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
ConsumerGroupCommand contains code duplications for table row format.
This PR reduces code duplication and make it more clear and easy to understand.
Reviewers: Luke Chen <showuon@gmail.com>, hudeqi <1217150961@qq.com>
RemoteIndexCache has a concurrency bug which leads to IOException while fetching data from remote tier.
The bug could be reproduced as per the following order of events:-
Thread 1 (cache thread): invalidates the entry, removalListener is invoked async, so the files have not been renamed to "deleted" suffix yet.
Thread 2: (fetch thread): tries to find entry in cache, doesn't find it because it has been removed by 1, fetches the entry from S3, writes it to existing file (using replace existing)
Thread 1: async removalListener is invoked, acquires a lock on old entry (which has been removed from cache), it renames the file to "deleted" and starts deleting it
Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file and hence, creates a new file of size 2GB in AbstractIndex constructor. JVM returns an error as it won't allow creation of 2GB random access file.
This commit fixes the bug by using EvictionListener instead of RemovalListener to perform the eviction atomically with the file rename. It handles the manual removal (not handled by EvictionListener) by using computeIfAbsent() and enforcing atomic cache removal & file rename.
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Arpit Goyal
<goyal.arpit.91@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Fix test FetchRequestTest.testLastFetchedEpochValidation for KRaft mode
The test fails due to unexpected error (OFFSET_OUT_OF_RANGE) when enabled with KRaft mode.
The reason it takes longer to set the leader epoch in KRaft mode is because of the way the topic partitions are created differently than Zookeeper. In Zookeeper mode, we create the topic partitions directly with Zookeeper therefore seem to take less time to create the logs and set leader epoch on broker. In KRaft mode, we use Admin client to create topic partitions. Even though the test waits for topic partitions to get created and appear in metadata cache, it doesn’t seem to be sufficient time for leader epoch to get set on the brokers.
Reviewers: Luke Chen <showuon@gmail.com>, dengziming <dengziming1993@gmail.com>
I've added a new class with an incrementing atomic long to represent the verification guard. Upon creation of verification guard, we will increment this value and assign it to the guard.
The expected behavior is the same as the object guard, but with better debuggability with the string value and type safety (I found a type safety issue in the current code when implementing this)
Reviewers: Ismael Juma <ismael@juma.me.uk>, Artem Livshits <alivshits@confluent.io>
In ConsumerGroupCommand, there are two methods: getLogEndOffsets and getLogStartOffsets, the first parameter groupId is not used, so remove it.
Reviewers: Luke Chen <showuon@gmail.com>
Fixed some of the failing tests in FetchRequestTest.
testFetchWithPartitionsWithIdError and testCreateIncrementalFetchWithPartitionsInErrorV12 fail with the following error when enabled with KRaft mode. These tests only fail sometimes when running locally but consistently failed when running in the Jenkins Pipeline.
Tests will call the utility function TestUtils.waitUntilLeaderIsKnown after creating the topic partitions so that they wait for the logs to be created on the leader before sending fetch requests.
Enabled all tests except checkLastFetchedEpochValidation with KRaft mode.
Looking at the build history in Jenkins, all the other tests except these 2 tests and checkLastFetchedEpochValidation were passing when they were enabled with KRaft mode. Therefore enabled them with KRaft mode again but left checkLastFetchedEpochValidation to be investigated further.
Reviewers: Luke Chen <showuon@gmail.com>, dengziming <dengziming1993@gmail.com>
This patch introduces preliminary changes for Eligible Leader Replicas (KIP-966)
* New MetadataVersion 16 (3.7-IV1)
* New record versions for PartitionRecord and PartitionChangeRecord
* New tagged fields on PartitionRecord and PartitionChangeRecord
* New static config "eligible.leader.replicas.enable" to gate the whole feature
Reviewers: Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
The PR includes:
* Added a new class of CleanShutdownFile which helps write and read from a clean shutdown file.
* Updated the BrokerRegistration API.
* Client side handling for the broker epoch.
* Minimum work on the controller side.
Reviewers: Jun Rao <junrao@gmail.com>
Implements the following metrics:
kafka.server:type=group-coordinator-metrics,name=num-partitions,state=loading
kafka.server:type=group-coordinator-metrics,name=num-partitions,state=active
kafka.server:type=group-coordinator-metrics,name=num-partitions,state=failed
kafka.server:type=group-coordinator-metrics,name=event-queue-size
kafka.server:type=group-coordinator-metrics,name=partition-load-time-max
kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg
kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-min
kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg
The PR makes these metrics generic so that in the future the transaction coordinator runtime can implement the same metrics in a similar fashion.
Also, CoordinatorLoaderImpl#load will now return LoadSummary which encapsulates the start time, end time, number of records/bytes.
Co-authored-by: David Jacot <djacot@confluent.io>
Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>
This is now possible since `InterBrokerSend` was moved from `core` to `server-common`.
Also rewrite/move `KafkaNetworkChannelTest`.
The scala version of `KafkaNetworkChannelTest` passed with the changes here (before I
deleted it).
Reviewers: Justine Olshan <jolshan@confluent.io>, José Armando García Sancio <jsancio@users.noreply.github.com>
Do not return fenced brokers from metadataCache.getPartitionReplicaEndpoints, since that could lead to
them getting used as preferred read replicas.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
A few notes:
* Delete a few methods from `UnifiedLog` that were simply invoking the related method in `LogFileUtils`
* Fix `CoreUtils.swallow` to use the passed in `logging`
* Fix `LogCleanerParameterizedIntegrationTest` to close `log` before reopening
* Minor tweaks in `LogSegment` for readability
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
Spotbugs was temporarily disabled as part of KAFKA-15485 to support Kafka build with JDK 21. This PR upgrades the spotbugs version to 4.8.0 which adds support for JDK 21 and enables it's usage on build again.
Reviewers: Divij Vaidya <diviv@amazon.com>
This patch implements the groups and offsets expiration in the new group coordinator.
Reviewers: Ritika Reddy <rreddy@confluent.io>, David Jacot <djacot@confluent.io>
We introduced a bunch of flaky tests in #14295 , which are normal when running locally but will always fail in CI, lets rollback them unless we find the cause before the end of today.
Reviewers: Luke Chen <showuon@gmail.com>, Justine Olshan <jolshan@confluent.io>
A race can happen in the following sequence.
1. Stale Fetch triggers the ISR expansion.
2. The first time we check whether the replica is eligible. Catch up? Yes. broker epoch match? Yes (the metadata cache update has not happened)
3. Metadata cache update happens.
4. During the second time check the eligibility
a. Catch up? Yes
b. A new fetch request comes in. It cancels the replica caught-up and updates the broker epoch
c. broker epoch match? Yes. New fetch epoch = new metadata cache epoch
5. Send an AlterPartition request with the new broker epoch.
----------------
The solution is to make sure that the 4.a) ,4.c) and 5) use the same replica state.
Reviewers: David Mao <47232755+splett2@users.noreply.github.com>, David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
AdminClient will throw IllegalStateException instead of TimeoutException if it receives new calls while closing down. This is more consistent with how Consumer and Producer clients handle new calls after closed down.
Reviewers: Luke Chen <showuon@gmail.com>, Kirk True <kirk@kirktrue.pro>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, vamossagar12 <sagarmeansocean@gmail.com>
est Cases Covered
1. Index Files already exist on disk but not in Cache i.e. RemoteIndexCache should not call remoteStorageManager to fetch it instead cache it from the local index file present.
2. RSM returns CorruptedIndex File i.e. RemoteIndexCache should throw CorruptedIndexException instead of successfull execution.
3. Deleted Suffix Indexes file already present on disk i.e. If cleaner thread is slow , then there is a chance of deleted index files present on the disk while in parallel same index Entry is invalidated. To understand more refer https://issues.apache.org/jira/browse/KAFKA-15169
Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
This patch fixes a problem where we migrate the current producer ID batch to KRaft instead of the next producer ID batch. Since KRaft stores the next batch in the log, we end up serving up a duplicate batch to the first caller of AllocateProducerIds once the KRaft controller has taken over.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
The LogAppendInfo class is a bit bloated in terms of class fields. That's because it is used as an umbrella class for both leader log appends and follower log appends and needs to carry fields for both. This makes the constructor for the class a bit cludgy to use. It also ends up being a bit confusing when fields are important and when they aren't. I noticed there were a few fields that didn't seem necessary.
Below is a description of changes:
firstOffset is a LogOffsetMetadata but there are no readers of the field that use anything but the messageOffset field - simplified to a long.
LogAppendInfo.errorMessage is only set in one context - when calling LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo. When we use this constructor, we pass up the original exception in LogAppendResult anyway, so the field is redundant with the LogAppendResult.exception field. This allows us to simplify the handling in KAFKA-15459: Convert coordinator retriable errors to a known producer response error #14378 since there are no custom error messages we just return whatever is in the exception message.
We only use targetCompressionType when constructing the LogValidator - just inline the call instead of including it in the LogAppendInfo.
offsetsMonotonic is only used when not assigning offsets to throw an exception - just throw the exception instead of setting a field to throw later.
shallowCount is only there to determine whether there are any messages in the append. Instead, we can just check validBytes which is incremented with a non-zero value every time we increment shallowCount.
Reviewers: Justine Olshan <jolshan@confluent.io>
This PR is part of #13247
It contains ReassignPartitionsIntegrationTest rewritten in java.
Goal of PR is reduce changes size in main PR.
Reviewers: Taras Ledkov <tledkov@apache.org>, Justine Olshan <jolshan@confluent.io>
Previous commits left out TxnOffsetCommits which go through the group coordinator (not directly from the producer).
I've wired up the methods to include the transactional id and state partition to do the verification.
I've also updated UnifiedLog to verify on client and coordinator requests that are transactional.
I've not updated any sequence check logic since the sequence is always 0 on group coordinator initiated writes.
Added returned errors to Response files. Both InvalidPidMapping and InvalidTxnState will be returned and be fatal for the transactional OffsetCommit requests.
Reviewers: David Jacot <david.jacot@gmail.com>, Artem Livshits <alivshits@confluent.io>
As a first step, we plan to release a preview of the new consumer group rebalance protocol without the client side assignor. This patch removes all the related fields from the ConsumerGroupHeartbeat API for now. Removing fields is fine here because this API is not released yet and not exposed by default. We will add them back while bumping the version of the request when we release this part in the future.
Reviewers: Justine Olshan <jolshan@confluent.io>
A bug in the RemoteIndexCache leads to a situation where the cache does not replace the corrupted index with a new index instance fetched from remote storage. This commit fixes the bug by adding correct handling for `CorruptIndexException`.
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <duprie@amazon.com>
When a fetch response has no record for a partition, validBytes is 0. We shouldn't set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala since there is no record and it is Optional.empty. We should use currentFetchState.lastFetchedEpoch instead.
Reviewers: Divij Vaidya <diviv@amazon.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
bump snappy-java version to 1.1.10.4, and add more tests to verify the compressed data can be correctly decompressed and read.
For LogCleanerParameterizedIntegrationTest, we increased the message size for snappy decompression since in the new version of snappy, the decompressed size is increasing compared with the previous version. But since the compression algorithm is not kafka's scope, all we need to do is to make sure the compressed data can be successfully decompressed and parsed/read.
Reviewers: Divij Vaidya <diviv@amazon.com>, Ismael Juma <ismael@juma.me.uk>, Josep Prat <josep.prat@aiven.io>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Add support for --bootstrap-controller in the following command-line tools:
- kafka-cluster.sh
- kafka-configs.sh
- kafka-features.sh
- kafka-metadata-quorum.sh
To implement this, the following AdminClient APIs now support the new bootstrap.controllers
configuration:
- Admin.alterConfigs
- Admin.describeCluster
- Admin.describeConfigs
- Admin.describeFeatures
- Admin.describeMetadataQuorum
- Admin.incrementalAlterConfigs
- Admin.updateFeatures
Command-line tool changes:
- Add CommandLineUtils.initializeBootstrapProperties to handle parsing --bootstrap-controller
in addition to --bootstrap-server.
- Add --bootstrap-controller to ConfigCommand.scala, ClusterTool.java, FeatureCommand.java, and
MetadataQuorumCommand.java.
KafkaAdminClient changes:
- Add the AdminBootstrapAddresses class to handle extracting bootstrap.servers or
bootstrap.controllers from the config map for KafkaAdminClient.
- In AdminMetadataManager, store the new usingBootstrapControllers boolean. Generalize
authException to encompass the concept of fatal exceptions in general. (For example, the
fatal exception where we talked to the wrong node type.) Treat
MismatchedEndpointTypeException and UnsupportedEndpointTypeException as fatal exceptions.
- Extend NodeProvider to include information about whether bootstrap.controllers is supported.
- Modify the APIs described above to support bootstrap.controllers.
Server-side changes:
- Support DescribeConfigsRequest on kcontrollers.
- Add KRaftMetadataCache to the kcontroller to simplify implemeting describeConfigs (and
probably more APIs in the future). It's mainly a wrapper around MetadataImage, so there is
essentially no extra resource consumption.
- Split RuntimeLoggerManager out of ConfigAdminManager to handle the incrementalAlterConfigs
support for BROKER_LOGGER. This is now supported on kcontrollers as well as brokers.
- Fix bug in AuthHelper.computeDescribeClusterResponse that resulted in us always sending back
BROKER as the endpoint type, even on the kcontroller.
Miscellaneous:
- Fix a few places in exceptions and log messages where we wrote "broker" instead of "node".
For example, an exception in NodeApiVersions.java, and a log message in NetworkClient.java.
- Fix the slf4j log prefix used by KafkaRequestHandler logging so that request handlers on a
controller don't look like they're on a broker.
- Make the FinalizedVersionRange constructor public for the sake of a junit test.
- Add unit and integration tests for the above.
Reviewers: David Arthur <mumrah@gmail.com>, Doguscan Namal <namal.doguscan@gmail.com>
This patch refactors the ReplicaManager.appendRecords method and the AddPartitionsToTxnManager class in order to move the logic to identify the transaction coordinator based on the transaction id from the former to the latter. While working on KAFKA-14505, I found pretty annoying that we require to pass the transaction state partition to appendRecords because we have to do the same from the group coordinator. It seems preferable to delegate that job to the AddPartitionsToTxnManager.
Reviewers: Justine Olshan <jolshan@confluent.io>
When a remote log segment contains multiple epoch, then it gets considered for multiple times during breach by retention size/time/start-offset. This will affect the deletion by remote log retention size as it deletes the number of segments less than expected. This is a follow-up of KAFKA-15352
Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
It offers a quickfix action for certain errors, includes a number of bug fixes and it
introduces a new warning by default (https://github.com/scala/scala/pull/10462).
In addition to the scala version bump, we also fix the new compiler warnings and
bump the scalafmt version (the previous version failed with the new scala version).
Release notes: https://github.com/scala/scala/releases/tag/v2.13.12
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
This is one of the steps required for kafka to compile with Java 21.
For each case, one of the following fixes were applied:
1. Suppress warning if fixing would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.
In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.
See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
This PR is part of #13247
It contains changes to rewrite single test in java.
Intention is reduce changes in parent PR.
Reviewers: Luke Chen <showuon@gmail.com>, Taras Ledkov <tledkov@apache.org>
Endpoint information provided by KafkaConfig can be incomplete in two ways. One is that endpoints
using ephemeral ports will show up as using port 0. Another is that endpoints binding to 0.0.0.0
will show up with a null or blank hostname. Because we were not accounting for this in controller
registration, it was leading to a null pointer dereference when we tried to register a controller
using an endpoint defined as PLAINTEXT://:9092.
This PR adds a ListenerInfo class which can fix both of the causes of incomplete endpoint
information. It also handles serialization to and from various RPC and record formats.
This allows us to remove a lot of boilerplate code and standardize the handling of listeners
between BrokerServer and ControllerServer.
Reviewers: David Arthur <mumrah@gmail.com>