Kafka networking layer doesn't close FileRecords and assumes that they are already open when sending them over a channel. To support this pattern this commit changes the ownership model for FileRawSnapshotReader so that they are owned by KafkaMetadataLog.
Reviewers: dengziming <swzmdeng@163.com>, David Arthur <mumrah@gmail.com>, Jun Rao <junrao@gmail.com>
Added server-common module to have server side common classes. Moved ApiMessageAndVersion, RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to server-common module.
Reivewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage. This topic will receive events of RemoteLogSegmentMetadata, RemoteLogSegmentUpdate, and RemotePartitionDeleteMetadata. These events are serialized into Kafka protocol message format.
Added tests for all the event types for that topic.
This is part of the tiered storaqe implementation KIP-405.
Reivewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
Implement Raft Snapshot loading API.
1. Adds a new method `handleSnapshot` to `raft.Listener` which is called whenever the `RaftClient` determines that the `Listener` needs to load a new snapshot before reading the log. This happens when the `Listener`'s next offset is less than the log start offset also known as the earliest snapshot.
2. Adds a new type `SnapshotReader<T>` which provides a `Iterator<Batch<T>>` interface and de-serializes records in the `RawSnapshotReader` into `T`s
3. Adds a new type `RecordsIterator<T>` that implements an `Iterator<Batch<T>>` by scanning a `Records` object and deserializes the batches and records into `Batch<T>`. This type is used by both `SnapshotReader<T>` and `RecordsBatchReader<T>` internally to implement the `Iterator` interface that they expose.
4. Changes the `MockLog` implementation to read one or two batches at a time. The previous implementation always read from the given offset to the high-watermark. This made it impossible to test interesting snapshot loading scenarios.
5. Removed `throws IOException` from some methods. Some of types were inconsistently throwing `IOException` in some cases and throwing `RuntimeException(..., new IOException(...))` in others. This PR improves the consistent by wrapping `IOException` in `RuntimeException` in a few more places and replacing `Closeable` with `AutoCloseable`.
6. Updated the Kafka Raft simulation test to take into account snapshot. `ReplicatedCounter` was updated to generate snapshot after 10 records get committed. This means that the `ConsistentCommittedData` validation was extended to take snapshots into account. Also added a new invariant to ensure that the log start offset is consistently set with the earliest snapshot.
Reviewers: dengziming <swzmdeng@163.com>, David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
The KafkaRaftClient has a field for the BatchAccumulator that is only used and set when it is the leader. In other cases, leader specific information was stored in LeaderState. In a recent change EpochState, which LeaderState implements, was changed to be a Closable. QuorumState makes sure to always close the previous state before transitioning to the next state. This redesign was used to move the BatchAccumulator to the LeaderState and simplify some of the handling in KafkaRaftClient.
Reviewers: José Armando García Sancio <jsancio@gmail.com>, Jason Gustafson <jason@confluent.io>
Adding a property to the `raft/config/kraft.properties` for running the raft
test server in development.
For testing I ran `./bin/test-kraft-server-start.sh --config config/kraft.properties`
and validated the test server started running with a throughput test.
Reviewers: Ismael Juma <ismael@juma.me.uk>
KIP-595 describes an extra condition on commitment here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Fetch. In order to ensure that a newly elected leader's committed entries cannot get lost, it must commit one record from its own epoch. This guarantees that its latest entry is larger (in terms of epoch/offset) than any previously written record which ensures that any future leader must also include it. This is the purpose of the `LeaderChange` record which is written to the log as soon as the leader gets elected.
Although we had this check implemented, it was off by one. We only ensured that replication reached the epoch start offset, which does not reflect the appended `LeaderChange` record. This patch fixes the check and clarifies the point of the check. The rest of the patch is just fixing up test cases.
Reviewers: dengziming <swzmdeng@163.com>, Guozhang Wang <wangguoz@gmail.com>
KIP-516 introduces topic IDs to topics, but there is a small issue with how the KIP-500 metadata topic will interact with topic IDs.
For example, https://github.com/apache/kafka/pull/9944 aims to replace topic names in the Fetch request with topic IDs. In order to get these IDs, brokers must fetch from the metadata topic. This leads to a sort of "chicken and the egg" problem concerning how we find out the metadata topic's topic ID.
This PR adds the a special sentinel topic ID for the metadata topic, which gets around this problem.
More information can be found in the [JIRA](https://issues.apache.org/jira/browse/KAFKA-12457) and in [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers).
Reviewers: Jason Gustafson <jason@confluent.io>
1. Add `canGrantVote` to `EpochState`
2. Move the if-else in `KafkaRaftCllient.handleVoteRequest` to `EpochState`
3. Add unit tests for `canGrantVote`
Reviewers: Jason Gustafson <jason@confluent.io>
Kafka does not call fsync() on the directory when a new log segment is created and flushed to disk.
The problem is that following sequence of calls doesn't guarantee file durability:
fd = open("log", O_RDWR | O_CREATE); // suppose open creates "log"
write(fd);
fsync(fd);
If the system crashes after fsync() but before the parent directory has been flushed to disk, the log file can disappear.
This PR is to flush the directory when flush() is called for the first time.
Reviewers: Jun Rao <junrao@gmail.com>
When a `@Property` tests fail, jqwik helpfully reports the initial seed that resulted in the failure. For example, if we are executing a test scenario 100 times and it fails on the 51st run, then we will get the initial seed that generated . But if you specify the seed in the `@Property` annotation as the previous comment suggested, then the test still needs to run 50 times before we get to the 51st case, which makes debugging very difficult given the complex nature of the simulation tests. Jqwik also gives us the specific argument list that failed, but that is not very helpful at the moment since `Random` does not have a useful `toString` which indicates the initial seed.
To address these problems, I've changed the `@Property` methods to take the random seed as an argument directly so that it is displayed clearly in the output of a failure. I've also updated the documentation to clarify how to reproduce failures.
Reviewers: David Jacot <djacot@confluent.io>
`Self-managed` is also used in the context of Cloud vs on-prem and it can
be confusing.
`KRaft` is a cute combination of `Kafka Raft` and it's pronounced like `craft`
(as in `craftsmanship`).
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jose Sancio <jsancio@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Ron Dagostino <rdagostino@confluent.io>
Currently the Raft leader raises an exception if there is a non-monotonic update to the fetch offset of a replica. In a situation where the replica had lost it disk state, this would prevent the replica from being able to recover. In this patch, we relax the validation to address this problem. It is worth pointing out that this validation could not be relied on to protect from data loss after a voter has lost committed state.
Reviewers: José Armando García Sancio <jsancio@gmail.com>, Boyang Chen <boyang@confluent.io>
Introduce "testkit" package which includes KafkaClusterTestKit class for enabling integration tests of self-managed clusters. Also make use of this new integration test harness in the ClusterTestExtentions JUnit extension.
Adds RaftClusterTest for basic self-managed integration test.
Reviewers: Jason Gustafson <jason@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
Co-authored-by: Colin P. McCabe <cmccabe@apache.org>
Previously we implemented ClusterId validation for the Fetch API in the Raft implementation. This patch adds ClusterId validation to the remaining Raft RPCs.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
Improves test coverage of `validateOffsetAndEpoch`.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
Replace the use of the method `last` and `first` in `ConcurrentSkipListSet` with the descending and ascending iterator respectively. The methods `last` and `first` throw an exception when the set is empty this causes poor `KafkaRaftClient` performance when there aren't any snapshots.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
This patch changes the raft simulation tests to use jqwik, which is a property testing library. This provides two main benefits:
- It simplifies the randomization of test parameters. Currently the tests use a fixed set of `Random` seeds, which means that most builds are doing redundant work. We get a bigger benefit from allowing each build to test different parameterizations.
- It makes it easier to reproduce failures. Whenever a test fails, jqwik will report the random seed that failed. A developer can then modify the `@Property` annotation to use that specific seed in order to reproduce the failure.
This patch also includes an optimization for `MockLog.earliestSnapshotId` which reduces the time to run the simulation tests dramatically.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>, José Armando García Sancio <jsancio@gmail.com>, David Jacot <djacot@confluent.io>
Initially we want to be strict about the loss of committed data for the `@metadata` topic. This patch ensures that truncation below the high watermark is not allowed. Note that `MockLog` already had the logic to do so, so the patch adds a similar check to `KafkaMetadataLog`.
Reviewers: David Jacot <djacot@confluent.io>, Boyang Chen <boyang@confluent.io>
This patch adds logic to delete old snapshots. There are three cases we handle:
1. Remove old snapshots after a follower completes fetching a snapshot and truncates the log to the latest snapshot
2. Remove old snapshots after a new snapshot is created.
3. Remove old snapshots during recovery after the node is restarted.
Reviewers: Cao Manh Dat<caomanhdat317@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This patch ensures that the constant max batch size defined in `KafkaRaftClient` is propagated to the constructed log configuration in `KafkaMetadataLog`. We also ensure that the fetch max size is set consistently with appropriate testing.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
1. rename INVALID_HIGHWATERMARK to INVALID_HIGH_WATERMARK
2. replace FetchResponse.AbortedTransaction by FetchResponseData.AbortedTransaction
3. remove redundant constructors from FetchResponse.PartitionData
4. rename recordSet to records
5. add helpers "recordsOrFail" and "recordsSize" to FetchResponse to process record casting
Reviewers: Ismael Juma <ismael@juma.me.uk>
Since we expect KIP-631 controller fail-overs to be fairly cheap, tune
the default raft configuration parameters so that we detect node
failures more quickly.
Reduce the broker session timeout as well so that broker failures are
detected more quickly.
Reviewers: Jason Gustafson <jason@confluent.io>, Alok Nikhil <anikhil@confluent.io>
This patch fixes a small shutdown bug. Current logic closes the log twice: once in `KafkaRaftClient`, and once in `RaftManager`. This can lead to errors like the following:
```
[2021-02-18 18:35:12,643] WARN (kafka.utils.CoreUtils$)
java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
at org.apache.kafka.common.record.FileRecords.close(FileRecords.java:204)
at kafka.log.LogSegment.$anonfun$close$4(LogSegment.scala:592)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
at kafka.log.LogSegment.close(LogSegment.scala:592)
at kafka.log.Log.$anonfun$close$4(Log.scala:1038)
at kafka.log.Log.$anonfun$close$4$adapted(Log.scala:1038)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
at kafka.log.Log.$anonfun$close$3(Log.scala:1038)
at kafka.log.Log.close(Log.scala:2433)
at kafka.raft.KafkaMetadataLog.close(KafkaMetadataLog.scala:295)
at kafka.raft.KafkaRaftManager.shutdown(RaftManager.scala:150)
```
I have tended to view `RaftManager` as owning the lifecycle of the log, so I removed the extra call to close in `KafkaRaftClient`.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Ismael Juma <ismael@juma.me.uk>
This patch adds clusterId validation in the `Fetch` API as documented in KIP-595. A new error code `INCONSISTENT_CLUSTER_ID` is returned if the request clusterId does not match the value on the server. If no clusterId is provided, the request is treated as valid.
Reviewers: Jason Gustafson <jason@confluent.io>
1. Type `BatchAccumulator`. Add support for appending records into one or more batches.
2. Type `RaftClient`. Rename `scheduleAppend` to `scheduleAtomicAppend`.
3. Type `RaftClient`. Add a new method `scheduleAppend` which appends records to the log using as many batches as necessary.
4. Increase the batch size from 1MB to 8MB.
Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
The `KafkaMetadataLog` implementation of `ReplicatedLog` validates that batches appended using `appendAsLeader` and `appendAsFollower` have an offset that matches the LEO. This is enforced by `KafkaRaftClient` and `BatchAccumulator`. When creating control batches for the `LeaderChangeMessage` the default base offset of `0` was being used instead of using the LEO. This is fixed by:
1. Changing the implementation for `MockLog` to validate against this and throw an `RuntimeException` if this invariant is violated.
2. Always create a batch for `LeaderChangeMessage` with an offset equal to the LEO.
Reviewers: Jason Gustafson <jason@confluent.io>
This PR adds the KIP-500 BrokerServer and ControllerServer classes and
makes some related changes to get them working. Note that the ControllerServer
does not instantiate a QuorumController object yet, since that will be added in
PR #10070.
* Add BrokerServer and ControllerServer
* Change ApiVersions#computeMaxUsableProduceMagic so that it can handle
endpoints which do not support PRODUCE (such as KIP-500 controller nodes)
* KafkaAdminClientTest: fix some lingering references to decommissionBroker
that should be references to unregisterBroker.
* Make some changes to allow SocketServer to be used by ControllerServer as
we as by the broker.
* We now return a random active Broker ID as the Controller ID in
MetadataResponse for the Raft-based case as per KIP-590.
* Add the RaftControllerNodeProvider
* Add EnvelopeUtils
* Add MetaLogRaftShim
* In ducktape, in config_property.py: use a KIP-500 compatible cluster ID.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
The script `test-raft-server-start.sh` requires the config to be specified with `--config`. I've included this in the README and added an error message for this specific case.
Reviewers: Jason Gustafson <jason@confluent.io>
We don't really need it and it causes problems in older Android versions
and GraalVM native image usage (there are workarounds for the latter).
Move the logic to separate classes that are only invoked when the
relevant compression library is actually used. Place such classes
in their own package and enforce via checkstyle that only these
classes refer to compression library packages.
To avoid cyclic dependencies, moved `BufferSupplier` to the `utils`
package.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch adds a `RecordSerde` implementation for the metadata record format expected by KIP-631.
Reviewers: Colin McCabe <cmccabe@apache.org>, Ismael Juma <mlists@juma.me.uk>
Since the Raft leader is already doing the work of assigning offsets and the leader epoch, we can skip the same logic in `Log.appendAsLeader`. This lets us avoid an unnecessary round of decompression.
Reviewers: dengziming <dengziming1993@gmail.com>, Jason Gustafson <jason@confluent.io>
This patch contains the new handling of `meta.properties` required by the KIP-500 server as specified in KIP-631. When using the self-managed quorum, the `meta.properties` file is required in each log directory with the new `version` property set to 1. It must include the `cluster.id` property and it must have a `node.id` matching that in the configuration.
The behavior of `meta.properties` for the Zookeeper-based `KafkaServer` does not change. We treat `meta.properties` as optional and as if it were `version=0`. We continue to generate the clusterId and/or the brokerId through Zookeeper as needed.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
Adds support for nonzero log start offsets.
Changes to `Log`:
1. Add a new "reason" for increasing the log start offset. This is used by `KafkaMetadataLog` when a snapshot is generated.
2. `LogAppendInfo` should return if it was rolled because of an records append. A log is rolled when a new segment is created. This is used by `KafkaMetadataLog` to in some cases delete the created segment based on the log start offset.
Changes to `KafkaMetadataLog`:
1. Update both append functions to delete old segments based on the log start offset whenever the log is rolled.
2. Update `lastFetchedEpoch` to return the epoch of the latest snapshot whenever the log is empty.
3. Add a function that empties the log whenever the latest snapshot is greater than the replicated log. This is used when first loading the `KafkaMetadataLog` and whenever the `KafkaRaftClient` downloads a snapshot from the leader.
Changes to `KafkaRaftClient`:
1. Improve `validateFetchOffsetAndEpoch` so that it can handle fetch offset and last fetched epoch that are smaller than the log start offset. This is in addition to the existing code that check for a diverging log. This is used by the raft client to determine if the Fetch response should include a diverging epoch or a snapshot id.
2. When a follower finishes fetching a snapshot from the leader fully truncate the local log.
3. When polling the current state the raft client should check if the state machine has generated a new snapshot and update the log start offset accordingly.
Reviewers: Jason Gustafson <jason@confluent.io>
With KIP-595, we previously expect `RaftConfig` to specify the quorum voter endpoints upfront on startup. In the general case, this works fine. However, for testing where the bound port is not known ahead of time, we need a lazier approach that discovers the other voters in the quorum after startup.
In this patch, we take the voter endpoint initialization out of `KafkaRaftClient.initialize` and move it to `RaftManager`. We use a special address to indicate that the voter addresses will be provided later This approach also lends itself well to future use cases where we might discover voter addresses through an external service (for example).
Reviewers: Jason Gustafson <jason@confluent.io>
This patch adds zero-copy support for the `FetchSnapshot` API. Unlike the normal `Fetch` API, records are not assumed to be offset-aligned in `FetchSnapshot` responses. Hence this patch introduces a new `UnalignedRecords` type which allows us to use most of the existing logic to support zero-copy while preserving type safety in the snapshot APIs.
Reviewers: José Armando García Sancio <jsancio@gmail.com>, Jason Gustafson <jason@confluent.io>
The test takes over 1 minute to run, so it should not be considered a
unit test.
Also:
* Replace `test` prefix with `check` prefix for helper methods. A common
mistake is to forget to add the @Test annotation, so it's good to use a
different naming convention for methods that should have the annotation
versus methods that should not.
* Replace `Action` functional interface with built-in `Runnable`.
* Remove unnecessary `assumeTrue`.
* Remove `@FunctionalInterface` from `Invariant` since it's not used
in that way.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
The primary purpose of this patch is to remove the internal `enable.metadata.quorum` configuration. Instead, we rely on `process.roles` to determine if the self-managed quorum has been enabled. As a part of this, I've done the following:
1. Replace the notion of "disabled" APIs with "controller-only" APIs. We previously marked some APIs which were intended only for the KIP-500 as "disabled" so that they would not be unintentionally exposed. For example, the Raft quorum APIs were disabled. Marking them as "controller-only" carries the same effect, but makes the intent that they should be only exposed by the KIP-500 controller clearer.
2. Make `ForwardingManager` optional in `KafkaServer` and `KafkaApis`. Previously we used `null` if forwarding was enabled and relied on the metadata quorum check.
3. Make `zookeeper.connect` an optional configuration if `process.roles` is defined.
4. Update raft README to remove reference to `zookeeper.conntect`
Reviewers: Colin Patrick McCabe <cmccabe@confluent.io>, Boyang Chen <boyang@confluent.io>
This patch moves Raft config definitions from `RaftConfig` to `KafkaConfig`, where they are re-defined as internal configs until we are ready to expose them. It also adds the missing "controller" prefix that was added by KIP-631.
Reviewers: Jason Gustafson <jason@confluent.io>
We would like to be able to use `KafkaRaftClient` for tooling/debugging use cases. For this, we need the localId to be optional so that the client can be used more like a consumer. This is already supported in the `Fetch` protocol by setting `replicaId=-1`, which the Raft implementation checks for. We just need to alter `QuorumState` so that the `localId` is optional. The main benefit of doing this is that it saves tools the need to generate an arbitrary id (which might cause conflicts given limited Int32 space) and it lets the leader avoid any local state for these observers (such as `ReplicaState` inside `LeaderState`).
Reviewers: Ismael Juma <ismael@juma.me.uk>, Boyang Chen <boyang@confluent.io>
It is helpful to delay initialization of the `RaftClient` configuration including the voter string until after construction. This helps in integration test cases where the voter ports may not be known until sockets are bound.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This patch factors out a `RaftManager` class from `TestRaftServer` which will be needed when we integrate this layer into the server. This class encapsulates the logic to build `KafkaRaftClient` as well as its IO thread.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Implements the code necessary for the leader to response to fetch snapshot requests and for the follower to fetch snapshots. This API is described in more detail in KIP-630: https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot. More specifically, this patch includes the following changes:
Leader Changes:
1. Raft leader response to FetchSnapshot request by reading the local snapshot and sending the requested bytes in the response. This implementation currently copies the bytes to memory. This will be fixed in a future PR.
Follower Changes:
1. Raft followers will start fetching snapshot if the leader sends a Fetch response that includes a SnapshotId.
2. Raft followers send FetchSnapshot requests if there is a pending download. The same timer is used for both Fetch and FetchSnapshot requests.
3. Raft follower handle FetchSnapshot responses by comping the bytes to the pending SnapshotWriter. This implementation doesn't fix the replicated log after the snapshot has been downloaded. This will be implemented in a future PR.
Reviewers: Jason Gustafson <jason@confluent.io>
This PR replaces the terms endorsing with acknowledging for voters which have recognised the current leader.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch contains the following improvements:
- Separate inbound/outbound request flows so that we can open the door for concurrent inbound request handling
- Rewrite `KafkaNetworkChannel` to use `InterBrokerSendThread` which fixes a number of bugs/shortcomings
- Get rid of a lot of boilerplate conversions in `KafkaNetworkChannel`
- Improve validation of inbound responses in `KafkaRaftClient` by checking correlationId. This fixes a bug which could cause an out of order Fetch to be applied incorrectly.
Reviewers: David Arthur <mumrah@gmail.com>