This is now possible since `InterBrokerSend` was moved from `core` to `server-common`.
Also rewrite/move `KafkaNetworkChannelTest`.
The scala version of `KafkaNetworkChannelTest` passed with the changes here (before I
deleted it).
Reviewers: Justine Olshan <jolshan@confluent.io>, José Armando García Sancio <jsancio@users.noreply.github.com>
Spotbugs was temporarily disabled as part of KAFKA-15485 to support Kafka build with JDK 21. This PR upgrades the spotbugs version to 4.8.0 which adds support for JDK 21 and enables it's usage on build again.
Reviewers: Divij Vaidya <diviv@amazon.com>
This is one of the steps required for kafka to compile with Java 21.
For each case, one of the following fixes were applied:
1. Suppress warning if fixing would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.
In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.
See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
In the Windows OS atomic move are not allowed if the file has another open handle. E.g
__cluster_metadata-0\quorum-state: The process cannot access the file because it is being used by another process
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
at java.base/java.nio.file.Files.move(Files.java:1430)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
This is fixed by first closing the temporary quorum-state file before attempting to move it.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
Co-Authored-By: Renaldo Baur Filho <renaldobf@gmail.com>
Reading an unknown version of quorum-state-file should trigger an error. Currently the only known version is 0. Reading any other version should cause an error.
Reviewers: Justine Olshan <jolshan@confluent.io>, Luke Chen <showuon@gmail.com>
In a non-empty log the KRaft leader only notifies the listener of leadership when it has read to the leader's epoch start offset. This guarantees that the leader epoch has been committed and that the listener has read all committed offsets/records.
Unfortunately, the KRaft leader doesn't do this when the log is empty. When the log is empty the listener is notified immediately when it has become leader. This makes the API inconsistent and harder to program against.
This change fixes that by having the KRaft leader wait for the listener's nextOffset to be greater than the leader's epochStartOffset before calling handleLeaderChange.
The RecordsBatchReader implementation is also changed to include control records. This makes it possible for the state machine learn about committed control records. This additional information can be used to compute the committed offset or for counting those bytes when determining when to snapshot the partition.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
The KRaft client uses an expiration service to complete FETCH requests that have timed out. This expiration service uses a different thread from the KRaft polling thread. This means that it is unsafe for the expiration service thread to call tryCompleteFetchRequest. tryCompleteFetchRequest reads and updates a lot of states that is assumed to be only be read and updated from the polling thread.
The KRaft client now does not call tryCompleteFetchRequest when the FETCH request has expired. It instead will send the FETCH response that was computed when the FETCH request was first handled.
This change also fixes a bug where the KRaft client was not sending the FETCH response immediately, if the response contained a diverging epoch or snapshot id.
Reviewers: Jason Gustafson <jason@confluent.io>
On ext4 file systems we have seen snapshots with zero-length files. This is possible if
the file is closed and moved before forcing the channel to write to disk.
Reviewers: Ron Dagostino <rndgstn@gmail.com>, Alok Thatikunta <athatikunta@confluent.io>
Provide the exact record offset to QuorumController.replay() in all cases. There are several situations
where this is useful, such as logging, implementing metadata transactions, or handling broker
registration records.
In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact
record offset from the batch base offset and the record index.
The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can
choose a batch base offset later than the one we expect, if someone else is also adding records.
While the QC is the only entity submitting data records, control records may be added at any time.
In the current implementation, these are really only used for leadership elections. However, this
could change with the addition of quorum reconfiguration or similar features.
Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it
would have resulted in a batch base offset other than what was expected. This in turn will trigger a
controller failover. In the future, if automatically added control records become more common, we
may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But
for now, this will allow us to rely on the offset as correct.
In order that the active QC can learn what offset to start writing at, the PR also adds a new
RaftClient#endOffset function.
At the Raft level, this PR adds a new exception, UnexpectedBaseOffsetException. This gets thrown
when we request a base offset that doesn't match the one the Raft layer would have given us.
Although this exception should cause a failover, it should not be considered a fault. This
complicated the exception handling a bit and motivated splitting more of it out into the new
EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a
bit better.
Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>
Implement some of the metrics from KIP-938: Add more metrics for
measuring KRaft performance.
Add these metrics to QuorumControllerMetrics:
kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
kafka.controller:type=KafkaController,name=NewActiveControllersCount
Create LoaderMetrics with these new metrics:
kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount
Create SnapshotEmitterMetrics with these new metrics:
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs
Reviewers: Ron Dagostino <rndgstn@gmail.com>
If the follower has an empty log, fetches with offset 0, it is more
efficient for the leader to reply with a snapshot id (redirect to
FETCH_SNAPSHOT) than for the follower to continue fetching from the log
segments.
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
If the KRaft listener is at offset 0, the start of the log, and KRaft has generated a snapshot, it should prefer the latest snapshot instead of having the listener read from the start of the log.
This is implemented by having KafkaRaftClient send a Listener.handleLoadSnapshot event, if the Listener is at offset 0 and the KRaft partition has generated a snapshot.
Reviewers: Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
This change refactors the lastContainedLogTimestamp to the Snapshots class, for re-usability. Introduces IdentitySerde based on ByteBuffer, required for using RecordsSnapshotReader. This change also removes the "recordSerde: RecordSerde[_]" argument from the KafkaMetadataLog constructor.
Reviewers: José Armando García Sancio <jsancio@apache.org>
After this change,
For broker side decompression: JMH benchmark RecordBatchIterationBenchmark demonstrates 20-70% improvement in throughput (see results for RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize).
For consumer side decompression: JMH benchmark RecordBatchIterationBenchmark a mix bag of single digit regression for some compression type to 10-50% improvement for Zstd (see results for RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize).
Reviewers: Luke Chen <showuon@gmail.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Ismael Juma <mail@ismaeljuma.com>
Rename handleSnapshot to handleLoadSnapshot to make it explicit that it is handling snapshot load,
not generation.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
Currently, the current-state KRaft related metric reports follower state for a broker while technically it should be reported as an observer as the kafka-metadata-quorum tool does.
Reviewers: Luke Chen <showuon@gmail.com>, dengziming <dengziming1993@gmail.com>
The SnapshotReader exposes the "last contained log time". This is mainly used during snapshot cleanup. The previous implementation used the append time of the snapshot record. This is not accurate as this is the time when the snapshot was created and not the log append time of the last record included in the snapshot.
The log append time of the last record included in the snapshot is store in the header control record of the snapshot. The header control record is the first record of the snapshot.
To be able to read this record, this change extends the RecordsIterator to decode and expose the control records in the Records type.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
To help debug KRaft's behavior this change increases the log level of
some rare messages to INFO level.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch is the first part of KIP-903. It updates the FetchRequest to include the new tagged ReplicaState field which replaces the now deprecated ReplicaId field. The FetchRequest version is bumped to version 15 and the MetadataVersion to 3.5-IV1.
Reviewers: David Jacot <djacot@confluent.io>
The leader only requires that voters have flushed their log up to the fetch offset before sending a fetch request.
This change only flushes the log when handling the fetch response, if the follower is a voter. This should improve the disk performance on observers (brokers).
Reviewers: Jason Gustafson <jason@confluent.io>
Remove clusterId field from the KRaft controller's quorum-state file $LOG_DIR/__cluster_metadata-0/quorum-state
Reviewers: Luke Chen <showuon@gmail.com>, dengziming <dengziming1993@gmail.com>, Christo Lolov <christololov@gmail.com>
The raft idle ratio is currently computed as the average of all recorded poll durations. This tends to underestimate the actual idle ratio since it treats all measurements equally regardless how much time was spent. For example, say we poll twice with the following durations:
Poll 1: 2s
Poll 2: 0s
Assume that the busy time is negligible, so 2s passes overall.
In the first measurement, 2s is spent waiting, so we compute and record a ratio of 1.0. In the second measurement, no time passes, and we record 0.0. The idle ratio is then computed as the average of these two values (1.0 + 0.0 / 2 = 0.5), which suggests that the process was busy for 1s, which overestimates the true busy time.
In this patch, we create a new `TimeRatio` class which tracks the total duration of a periodic event over a full interval of time measurement.
Reviewers: José Armando García Sancio <jsancio@apache.org>
Make LeaderState's grantingVoters field explicitly immutable. The set of voters that granted their voter to the current leader was already immutable. This change makes that explicit.
Reviewers: Jason Gustafson <jason@confluent.io>, Mathew Hogan <mathewdhogan@@users.noreply.github.com>
The KRaft client expects the offset of the snapshot id to be an end offset. End offsets are
exclusive. The MetadataProvenance type was createing a snapshot id using the last contained offset
which is inclusive. This change fixes that and renames some of the fields to make this difference
more obvious.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Let `RaftClient.createSnapshot` take the snapshotId directly instead of the committed offset/epoch (which may not exist).
Reviewers: José Armando García Sancio <jsancio@apache.org>
While debugging KRaft and the metadata state machines it is helpful to always log the first time the replica discovers the high watermark. All other updates to the high watermark are logged at trace because they are more frequent and less useful.
Reviewers: Luke Chen <showuon@gmail.com>
Extract jointly owned parts of BrokerServer and ControllerServer into SharedServer. Shut down
SharedServer when the last component using it shuts down. But make sure to stop the raft manager
before closing the ControllerServer's sockets.
This PR also fixes a memory leak where ReplicaManager was not removing some topic metric callbacks
during shutdown. Finally, we now release memory from the BatchMemoryPool in KafkaRaftClient#close.
These changes should reduce memory consumption while running junit tests.
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Implement time based snapshot for the controller. The general strategy for this feature is that the controller will use the record-batch's append time to determine if a snapshot should be generated. If the oldest record that has been committed but is not included in the latest snapshot is older than `metadata.log.max.snapshot.interval.ms`, the controller will trigger a snapshot immediately. This is useful in case the controller was offline for more that `metadata.log.max.snapshot.interval.ms` milliseconds.
If the oldest record that has been committed but is not included in the latest snapshot is NOT older than `metadata.log.max.snapshot.interval.ms`, the controller will schedule a `maybeGenerateSnapshot` deferred task.
It is possible that when the controller wants to generate a new snapshot, either because of time or number of bytes, the controller is currently generating a snapshot. In this case the `SnapshotGeneratorManager` was changed so that it checks and potentially triggers another snapshot when the currently in-progress snapshot finishes.
To better support this feature the following additional changes were made:
1. The configuration `metadata.log.max.snapshot.interval.ms` was added to `KafkaConfig` with a default value of one hour.
2. `RaftClient` was extended to return the latest snapshot id. This snapshot id is used to determine if a given record is included in a snapshot.
3. Improve the `SnapshotReason` type to support the inclusion of values in the message.
Reviewers: Jason Gustafson <jason@confluent.io>, Niket Goel <niket-goel@users.noreply.github.com>
`RecordsIteratorTest` takes the longest times in recent builds (even including integration tests). The default of 1000 tries from jqwik is probably overkill and causes the test to take 10 minutes locally. Decreasing to 50 tries reduces that to less than 30s.
Reviewers: David Jacot <djacot@confluent.io>
This commit adds KRaft monitoring related metrics to the Kafka docs (docs/ops.html).
Reviewers: Jason Gustafson <jason@confluent.io>, Luke Chen <showuon@gmail.com>
What changes in this PR:
1. Use addExact to avoid overflow in BatchAccumulator#bytesNeeded. We did use addExact in bytesNeededForRecords method, but forgot that when returning the result.
2. javadoc improvement
Reviewers: Jason Gustafson <jason@confluent.io>
When deserializing KRPC (which is used for RPCs sent to Kafka, Kafka Metadata records, and some
other things), check that we have at least N bytes remaining before allocating an array of size N.
Remove DataInputStreamReadable since it was hard to make this class aware of how many bytes were
remaining. Instead, when reading an individual record in the Raft layer, simply create a
ByteBufferAccessor with a ByteBuffer containing just the bytes we're interested in.
Add SimpleArraysMessageTest and ByteBufferAccessorTest. Also add some additional tests in
RequestResponseTest.
Reviewers: Tom Bentley <tbentley@redhat.com>, Mickael Maison <mickael.maison@gmail.com>, Colin McCabe <colin@cmccabe.xyz>
Co-authored-by: Colin McCabe <colin@cmccabe.xyz>
Co-authored-by: Manikumar Reddy <manikumar.reddy@gmail.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
We should prevent the metadata log from initializing in a known bad state. If the log start offset of the first segment is greater than 0, then must be a snapshot an offset greater than or equal to it order to ensure that the initialized state is complete.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
When a snapshot is taken it is due to either of the following reasons -
Max bytes were applied
Metadata version was changed
Once the snapshot process is started, it will log the reason that initiated the process.
Updated existing tests to include code changes required to log the reason. I was not able to check the logs when running tests - could someone guide me on how to enable logs when running a specific test case.
Reviewers: dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
Because the snapshot writer sets a linger ms of Integer.MAX_VALUE it is
possible for the memory pool to run out of memory if the snapshot is
greater than 5 * 8MB.
This change allows the BatchMemoryPool to always allocate a buffer when
requested. The memory pool frees the extra allocated buffer when released if
the number of pooled buffers is greater than the configured maximum
batches.
Reviewers: Jason Gustafson <jason@confluent.io>
The boostrap.checkpoint files should include a control record batch for
the SnapshotHeaderRecord at the start of the file. It should also
include a control record batch for the SnapshotFooterRecord at the end
of the file.
The snapshot header record is important because it versions the rest of
the bootstrap file.
Reviewers: David Arthur <mumrah@gmail.com>
A few small cleanups in the `DescribeQuorum` API and handling logic:
- Change field types in `QuorumInfo`:
- `leaderId`: `Integer` -> `int`
- `leaderEpoch`: `Integer` -> `long` (to allow for type expansion in the future)
- `highWatermark`: `Long` -> `long`
- Use field names `lastFetchTimestamp` and `lastCaughtUpTimestamp` consistently
- Move construction of `DescribeQuorumResponseData.PartitionData` into `LeaderState`
- Consolidate fetch time/offset update logic into `LeaderState.ReplicaState.updateFollowerState`
Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
Currently the server will return `INVALID_REQUEST` if a `DescribeQuorum` request is sent to a node that is not the current leader. In addition to being inconsistent with all of the other leader APIs in the raft layer, this error is treated as fatal by both the forwarding manager and the admin client. Instead, we should return `NOT_LEADER_OR_FOLLOWER` as we do with the other APIs. This error is retriable and we can rely on the admin client to retry it after seeing this error.
Reviewers: David Jacot <djacot@confluent.io>