The reason for KAFKA-13959 is a little complex, the two keys to this problem are:
KafkaRaftClient.MAX_FETCH_WAIT_MS==MetadataMaxIdleIntervalMs == 500ms. We rely on fetchPurgatory to complete a FetchRequest, in details, if FetchRequest.fetchOffset >= log.endOffset, we will wait for 500ms to send a FetchResponse. The follower needs to send one more FetchRequest to get the HW.
Here are the event sequences:
1. When starting the leader(active controller) LEO=m+1(m is the offset of the last record), leader HW=m(because we need more than half of the voters to reach m+1)
2. Follower (standby controller) and observer (broker) send FetchRequest(fetchOffset=m)
2.1. leader receives FetchRequest, set leader HW=m and waits 500ms before send FetchResponse
2.2. leader send FetchResponse(HW=m)
3.3 broker receive FetchResponse(HW=m), set metadataOffset=m.
3. Leader append NoOpRecord, LEO=m+2. leader HW=m
4. Looping 1-4
If we change MAX_FETCH_WAIT_MS=200 (less than half of MetadataMaxIdleIntervalMs), this problem can be solved temporarily.
We plan to improve this problem in 2 ways, firstly, in this PR, we change the controller to unfence a broker when the broker's high-watermark has reached the broker registration record for that broker. Secondly, we will propagate the HWM to the replicas as quickly as possible in KAFKA-14145.
Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
This commit adds a check to ensure the RecordBatch CRC is valid when
iterating over a Batch of Records using the RecordsIterator. The
RecordsIterator is used by both Snapshot reads and Log Records reads in
Kraft. The check can be turned off by a class parameter and is on by default.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
Fixes two issues in the implementation of `LocalLogManager`:
- As per the interface contract for `RaftClient.scheduleAtomicAppend()`, it should throw a `NotLeaderException` exception when the provided current leader epoch does not match the current epoch. However, the current `LocalLogManager`'s implementation of the API returns a LONG_MAX instead of throwing an exception. This change fixes the behaviour and makes it consistent with the interface contract.
- As per the interface contract for `RaftClient.resign(epoch)`if the parameter epoch does not match the current epoch, this call will be ignored. But in the current `LocalLogManager` implementation the leader epoch might change when the thread is waiting to acquire a lock on `shared.tryAppend()` (note that tryAppend() is a synchronized method). In such a case, if a NotALeaderException is thrown (as per code change in above), then resign should be ignored.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Tom Bentley <tbentley@redhat.com>, Jason Gustafson <jason@confluent.io>
The kafka-dump-log command should accept files with a suffix of ".checkpoint". It should also decode and print using JSON the snapshot header and footer control records.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
Implement NoOpRecord as described in KIP-835. This is controlled by the new
metadata.max.idle.interval.ms configuration.
The KRaft controller schedules an event to write NoOpRecord to the metadata log if the metadata
version supports this feature. This event is scheduled at the interval defined in
metadata.max.idle.interval.ms. Brokers and controllers were improved to ignore the NoOpRecord when
replaying the metadata log.
This PR also addsffour new metrics to the KafkaController metric group, as described KIP-835.
Finally, there are some small fixes to leader recovery. This PR fixes a bug where metadata version
3.3-IV1 was not marked as changing the metadata. It also changes the ReplicaControlManager to
accept a metadata version supplier to determine if the leader recovery state is supported.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Since the StandardAuthorizer relies on the metadata log to store its ACLs, we need to be sure that
we have the latest metadata before allowing the authorizer to be used. However, if the authorizer
is not usable for controllers in the cluster, the latest metadata cannot be fetched, because
inter-node communication cannot occur. In the initial commit which introduced StandardAuthorizer,
we punted on the loading issue by allowing the authorizer to be used immediately. This commit fixes
that by implementing early.start.listeners as specified in KIP-801. This will allow in superusers
immediately, but throw the new AuthorizerNotReadyException if non-superusers try to use the
authorizer before StandardAuthorizer#completeInitialLoad is called.
For the broker, we call StandardAuthorizer#completeInitialLoad immediately after metadata catch-up
is complete, right before unfencing. For the controller, we call
StandardAuthorizer#completeInitialLoad when the node has caught up to the high water mark of the
cluster metadata partition.
This PR refactors the SocketServer so that it creates the configured acceptors and processors in
its constructor, rather than requiring a call to SocketServer#startup A new function,
SocketServer#enableRequestProcessing, then starts the threads and begins listening on the
configured ports. enableRequestProcessing uses an async model: we will start the acceptor and
processors associated with an endpoint as soon as that endpoint's authorizer future is completed.
Also fix a bug where the controller and listener were sharing an Authorizer when in co-located
mode, which was not intended.
Reviewers: Jason Gustafson <jason@confluent.io>
This PR includes the changes to feature flags that were outlined in KIP-778. Specifically, it
changes UpdateFeatures and FeatureLevelRecord to remove the maximum version level. It also adds
dry-run to the RPC so the controller can actually attempt the upgrade (rather than the client). It
introduces an upgrade type enum, which supersedes the allowDowngrade boolean. Because
FeatureLevelRecord was unused previously, we do not need to introduce a new version.
The kafka-features.sh tool was overhauled in KIP-778 and now includes the describe, upgrade,
downgrade, and disable sub-commands. Refer to
[KIP-778](https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades) for more
details on the new command structure.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
Within a LogSegment, the TimeIndex and OffsetIndex are lazy indices that don't get created on disk until they are accessed for the first time. However, Log recovery logic expects the presence of an offset index file on disk for each segment, otherwise, the segment is considered corrupted.
This PR introduces a forceFlushActiveSegment boolean for the log.flush function to allow the shutdown process to flush the empty active segment, which makes sure the offset index file exists.
Co-Author: Kowshik Prakasam kowshik@gmail.com
Reviewers: Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>
Make sure that the compression type is passed along to the `RecordsSnapshotWriter` constructor when creating the snapshot writer using the static `createWithHeader` method.
Reviewers: Jason Gustafson <jason@confluent.io>
Change the snapshot API so that SnapshotWriter and SnapshotReader are interfaces. Change the existing types SnapshotWriter and SnapshotReader to use a different name and to implement the interfaces introduced by this commit.
Co-authored-by: loboxu <loboxu@tencent.com>
Reviews: José Armando García Sancio <jsancio@users.noreply.github.com>
This patch adds additional test cases covering the validations done when snapshots are created by the state machine.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This PR aims to remove tombstones that persist indefinitely due to low throughput. Previously, deleteHorizon was calculated from the segment's last modified time.
In this PR, the deleteHorizon will now be tracked in the baseTimestamp of RecordBatches. After the first cleaning pass that finds a record batch with tombstones, the record batch is recopied with deleteHorizon flag and a new baseTimestamp that is the deleteHorizonMs. The records in the batch are rebuilt with relative timestamps based on the deleteHorizonMs that is recorded. Later cleaning passes will be able to remove tombstones more accurately on their deleteHorizon due to the individual time tracking on record batches.
KIP 534: https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds
Co-authored-by: Ted Yu <yuzhihong@gmail.com>
Co-authored-by: Richard Yu <yohan.richard.yu@gmail.com>
Java 17 is at release candidate stage and it will be a LTS release once
it's out (previous LTS release was Java 11).
Details:
* Replace Java 16 with Java 17 in Jenkins and Readme.
* Replace `--illegal-access=permit` (which was removed from Java 17)
with `--add-opens` for the packages we require internal access to.
Filed KAFKA-13275 for updating the tests not to require `--add-opens`
(where possible).
* Update `release.py` to use JDK8. and JDK 17 (instead of JDK 8 and JDK 15).
* Removed all but one Streams test from `testsToExclude`. The
Connect test exclusion list remains the same.
* Add notable change to upgrade.html
* Upgrade to Gradle 7.2 as it's required for proper Java 17 support.
* Upgrade mockito to 3.12.4 for better Java 17 support.
* Adjusted `KafkaRaftClientTest` and `QuorumStateTest` not to require
private access to `jdk.internal.util.random`.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch improves the return type for `scheduleAppend` and `scheduleAtomicAppend`. Previously we were using a `Long` value and using both `null` and `Long.MaxValue` to distinguish between different error cases. In this PR, we change the return type to `long` and only return a value if the append was accepted. For the error cases, we instead throw an exception. For this purpose, the patch introduces a couple new exception types: `BufferAllocationException` and `NotLeaderException`.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
Instead of waiting for a high-watermark of 20 after the partition, the
test should wait for the high-watermark to reach an offset greater than
the largest log end offset at the time of the partition. Only that offset
is guarantee to be reached as the high-watermark by the new majority.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch adds support for unregistering listeners to `RaftClient`.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
When the active controller encounters an event exception it attempts to renounce leadership.
Unfortunately, this doesn't tell the RaftClient that it should attempt to give up leadership. This
will result in inconsistent state with the RaftClient as leader but with the controller as
inactive. This PR changes the implementation so that the active controller asks the RaftClient
to resign.
Reviewers: Jose Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
The leader assumes that there is always an in-memory snapshot at the last
committed offset. This means that the controller needs to generate an in-memory
snapshot when getting promoted from inactive to active. This PR adds that
code. This fixes a bug where sometimes we would try to look for that in-memory
snapshot and not find it.
The controller always starts inactive, and there is no requirement that there
exists an in-memory snapshot at the last committed offset when the controller
is inactive. Therefore we can remove the initial snapshot at offset -1.
We should also optimize when a snapshot is cancelled or completes, by deleting
all in-memory snapshots less that the last committed offset.
SnapshotRegistry's createSnapshot should allow the creating of a snapshot if
the last snapshot's offset is the given offset. This allows for simpler client
code. Finally, this PR renames createSnapshot to getOrCreateSnapshot.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Check and verify generated snapshots for the controllers and the
brokers. Assert reader state when reading last log append time.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Fix a simulation test failure by:
1. Relaxing the valiation of the snapshot id against the log start
offset when the state machine attempts to create new snapshot. It
is safe to just ignore the request instead of throwing an exception
when the snapshot id is less that the log start offset.
2. Fixing the MockLog implementation so that it uses startOffset both
externally and internally.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Updated FetchRequest and FetchResponse to use topic IDs rather than topic names.
Some of the complicated code is found in FetchSession and FetchSessionHandler.
We need to be able to store topic IDs and maintain a cache on the broker for IDs that may not have been resolved. On incremental fetch requests, we will try to resolve them or remove them if in toForget.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
This PR includes changes to KafkaRaftClient and KafkaMetadataLog to support periodic
cleaning of old log segments and snapshots.
Four new public config keys are introduced: metadata.log.segment.bytes,
metadata.log.segment.ms, metadata.max.retention.bytes, and
metadata.max.retention.ms.
These are used to configure the log layer as well as the snapshot cleaning logic. Snapshot
and log cleaning is performed based on two criteria: total metadata log + snapshot size
(metadata.max.retention.bytes), and max age of a snapshot (metadata.max.retention.ms).
Since we have a requirement that the log start offset must always align with a snapshot,
we perform the cleaning on snapshots first and then clean what logs we can.
The cleaning algorithm follows:
1. Delete the oldest snapshot.
2. Advance the log start offset to the new oldest snapshot.
3. Request that the log layer clean any segments prior to the new log start offset
4. Repeat this until the retention size or time is no longer violated, or only a single
snapshot remains.
The cleaning process is triggered every 60 seconds from the KafkaRaftClient polling
thread.
Reviewers: José Armando García Sancio <jsancio@gmail.com>, dengziming <dengziming1993@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Track handleSnapshot calls and make sure it is never triggered on the leader node.
Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, Boyang Chen <bchen11@outlook.com>
Add the record append time to Batch. Change SnapshotReader to set this time to the
time of the last log in the last batch. Fix the QuorumController to remember the last
committed batch append time and to store it in the generated snapshot.
Reviewers: David Arthur <mumrah@gmail.com>, Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Add the ability for KRaft controllers to generate snapshots based on the number of new record bytes that have
been applied since the last snapshot. Add a new configuration key to control this parameter. For now, it
defaults to being off, although we will change that in a follow-on PR. Also, fix LocalLogManager so that
snapshot loading is only triggered when the listener is not the leader.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Add header and footer records for raft snapshots. This helps identify when the snapshot
starts and ends. The header also contains a time. The time field is currently set to 0.
KAFKA-12997 will add in the necessary wiring to use the correct timestamp.
Reviewers: Jose Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This patch adds an implementation of the `resign()` API which allows the controller to proactively resign leadership in case it encounters an unrecoverable situation. There was not a lot to do here because we already supported a `Resigned` state to facilitate graceful shutdown.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, David Arthur <mumrah@gmail.com>
We should process the entire batch in `BrokerMetadataListener` and make sure that `hasNext` is called before calling `next` on the iterator. The previous code worked because the raft client kept track of the position in the iterator, but it caused NoSuchElementException to be raised when the reader was empty (as might be the case with control records).
Reviewers: Jason Gustafson <jason@confluent.io>
This patch fixes a few minor javadoc issues in the `RaftClient` interface.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, David Jacot <djacot@confluent.io>
Directly use `RaftClient.Listener`, `SnapshotWriter` and `SnapshotReader` in the quorum controller.
1. Allow `RaftClient` users to create snapshots by specifying the last committed offset and last committed epoch. These values are validated against the log and leader epoch cache.
2. Remove duplicate classes in the metadata module for writing and reading snapshots.
3. Changed the logic for comparing snapshots. The old logic was assuming a certain batch grouping. This didn't match the implementation of the snapshot writer. The snapshot writer is free to merge batches before writing them.
4. Improve `LocalLogManager` to keep track of multiple snapshots.
5. Improve the documentation and API for the snapshot classes to highlight the distinction between the offset of batches in the snapshot vs the offset of batches in the log. These two offsets are independent of one another. `SnapshotWriter` and `SnapshotReader` expose a method called `lastOffsetFromLog` which represents the last inclusive offset from the log that is represented in the snapshot.
Reviewers: dengziming <swzmdeng@163.com>, Jason Gustafson <jason@confluent.io>
The raft module may not be fully consistent on this but in general in that module we have decided to not throw the checked IOException. We have been avoiding checked IOException exceptions by wrapping them in RuntimeException. The raft module should instead wrap IOException in UncheckedIOException.
Reviewers: Luke Chen <showuon@gmail.com>, David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
The command `./gradlew raft:integrationTest` can't run any integration test since `org.junit.jupiter.api.Tag` does not work for jqwik engine (see https://github.com/jlink/jqwik/issues/36#issuecomment-436535760).
Reviewers: Ismael Juma <ismael@juma.me.uk>
This patch removes the temporary shim layer we added to bridge the interface
differences between MetaLogManager and RaftClient. Instead, we now use the
RaftClient directly from the metadata module. This also means that the
metadata gradle module now depends on raft, rather than the other way around.
Finally, this PR also consolidates the handleResign and handleNewLeader APIs
into a single handleLeaderChange API.
Co-authored-by: Jason Gustafson <jason@confluent.io>