We would like to be able to use `KafkaRaftClient` for tooling/debugging use cases. For this, we need the localId to be optional so that the client can be used more like a consumer. This is already supported in the `Fetch` protocol by setting `replicaId=-1`, which the Raft implementation checks for. We just need to alter `QuorumState` so that the `localId` is optional. The main benefit of doing this is that it spares tools from having to generate an arbitrary id (which might cause conflicts given the limited Int32 space) and it lets the leader avoid keeping any local state for these observers (such as `ReplicaState` inside `LeaderState`).
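As a rough sketch of the idea (the construction details below are simplified assumptions, not the actual `QuorumState` implementation), the observer case can be modeled with an `OptionalInt` so that tools never need to pick an id:

```java
import java.util.OptionalInt;

// Simplified sketch: an observer (e.g. a debugging tool) has no localId and
// identifies itself in Fetch requests with replicaId = -1, like a consumer.
public class QuorumStateSketch {
    private final OptionalInt localId;

    public QuorumStateSketch(OptionalInt localId) {
        this.localId = localId;
    }

    public boolean isObserver() {
        // Voters must have an id; observers do not.
        return !localId.isPresent();
    }

    public int fetchReplicaId() {
        // -1 is the sentinel the Fetch protocol already accepts for non-replicas.
        return localId.orElse(-1);
    }

    public static void main(String[] args) {
        QuorumStateSketch tool = new QuorumStateSketch(OptionalInt.empty());
        System.out.println(tool.isObserver());      // true
        System.out.println(tool.fetchReplicaId());  // -1
    }
}
```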
Reviewers: Ismael Juma <ismael@juma.me.uk>, Boyang Chen <boyang@confluent.io>
It is helpful to delay initialization of the `RaftClient` configuration including the voter string until after construction. This helps in integration test cases where the voter ports may not be known until sockets are bound.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This patch factors out a `RaftManager` class from `TestRaftServer` which will be needed when we integrate this layer into the server. This class encapsulates the logic to build `KafkaRaftClient` as well as its IO thread.
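For illustration only (the real `RaftManager` and `KafkaRaftClient` APIs differ), the encapsulated IO thread amounts to a loop that repeatedly polls the client until shutdown:

```java
// Hypothetical, simplified view of what RaftManager encapsulates: building the
// client and driving it from a dedicated IO thread.
public class RaftIoThreadSketch implements Runnable {
    private volatile boolean running = true;
    private final Pollable client;

    interface Pollable {
        void poll();  // stand-in for KafkaRaftClient's poll loop
    }

    public RaftIoThreadSketch(Pollable client) {
        this.client = client;
    }

    @Override
    public void run() {
        while (running) {
            client.poll();  // handles inbound requests, elections, and appends
        }
    }

    public void shutdown() {
        running = false;
    }
}
```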
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Implements the code necessary for the leader to respond to FetchSnapshot requests and for the follower to fetch snapshots. This API is described in more detail in KIP-630: https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot. More specifically, this patch includes the following changes:
Leader Changes:
1. The Raft leader responds to FetchSnapshot requests by reading the local snapshot and sending the requested bytes in the response. This implementation currently copies the bytes into memory; this will be fixed in a future PR.
Follower Changes:
1. Raft followers will start fetching a snapshot if the leader sends a Fetch response that includes a SnapshotId.
2. Raft followers send FetchSnapshot requests if there is a pending download. The same timer is used for both Fetch and FetchSnapshot requests.
3. Raft followers handle FetchSnapshot responses by copying the bytes into the pending SnapshotWriter (see the sketch after this list). This implementation doesn't fix up the replicated log after the snapshot has been downloaded; this will be implemented in a future PR.
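A minimal sketch of the follower-side copy step (the `SnapshotWriter` shown here is a simplified stand-in, not the actual interface):

```java
import java.nio.ByteBuffer;

public class FetchSnapshotSketch {
    // Simplified stand-in for the pending snapshot writer on the follower.
    interface SnapshotWriter {
        void append(ByteBuffer chunk);
        void freeze();  // finalize once all bytes have been downloaded
    }

    // Hypothetical shape of handling one FetchSnapshot response chunk.
    static long handleChunk(SnapshotWriter writer,
                            ByteBuffer chunk,
                            long position,
                            long snapshotSize) {
        long chunkSize = chunk.remaining();
        writer.append(chunk);
        long nextPosition = position + chunkSize;
        if (nextPosition >= snapshotSize) {
            writer.freeze();  // download complete
        }
        return nextPosition;  // position for the next FetchSnapshot request
    }
}
```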
Reviewers: Jason Gustafson <jason@confluent.io>
This PR replaces the term "endorsing" with "acknowledging" for voters which have recognized the current leader.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch contains the following improvements:
- Separate inbound/outbound request flows so that we can open the door for concurrent inbound request handling
- Rewrite `KafkaNetworkChannel` to use `InterBrokerSendThread` which fixes a number of bugs/shortcomings
- Get rid of a lot of boilerplate conversions in `KafkaNetworkChannel`
- Improve validation of inbound responses in `KafkaRaftClient` by checking correlationId. This fixes a bug which could cause an out of order Fetch to be applied incorrectly.
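On the last point, a hedged sketch of the kind of correlationId check this adds (illustrative only; the actual validation in `KafkaRaftClient` lives alongside other response checks and differs in detail):

```java
import java.util.OptionalLong;

public class CorrelationCheckSketch {
    private OptionalLong inflightCorrelationId = OptionalLong.empty();

    public void prepareRequest(long correlationId) {
        inflightCorrelationId = OptionalLong.of(correlationId);
    }

    // Drop stale or out-of-order responses instead of applying them, which
    // prevents an old Fetch response from being applied incorrectly.
    public boolean acceptResponse(long correlationId) {
        boolean matches = inflightCorrelationId.isPresent()
            && inflightCorrelationId.getAsLong() == correlationId;
        if (matches) {
            inflightCorrelationId = OptionalLong.empty();
        }
        return matches;
    }
}
```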
Reviewers: David Arthur <mumrah@gmail.com>
This patch adds logic to complete fetches immediately after resigning by returning the BROKER_NOT_AVAILABLE error. This ensures that the new election cannot be delayed by fetches which are stuck in purgatory.
Reviewers: Jason Gustafson <jason@confluent.io>
When there are no pending operations, the raft IO thread can block indefinitely waiting for a network event. We rely on asynchronous wakeups to break out of the blocking wait so that we can respond to a scheduled append. The current logic already does this, but only for the case when the linger time has already elapsed during the call to `scheduleAppend`. It is possible instead that, after a single call to `scheduleAppend` starts the linger timer, the application does not make any additional appends. In this case, we still need the IO thread to wake up when the linger timer expires. This patch fixes the problem by ensuring that the IO thread gets woken up after the first append which begins the linger timer.
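A simplified sketch of the idea (the names below are illustrative, not the actual `KafkaRaftClient` internals): the first append that starts the linger timer must also wake the IO thread so it can schedule a timed poll instead of blocking indefinitely.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LingerWakeupSketch {
    private final AtomicBoolean lingerTimerStarted = new AtomicBoolean(false);
    private final Wakeable ioThread;

    interface Wakeable {
        void wakeup();  // stand-in for interrupting a blocking network poll
    }

    public LingerWakeupSketch(Wakeable ioThread) {
        this.ioThread = ioThread;
    }

    public void scheduleAppend(Object records) {
        boolean startedTimer = lingerTimerStarted.compareAndSet(false, true);
        // ... accumulate records ...
        if (startedTimer) {
            // Without this wakeup, the IO thread could block past the linger
            // deadline if no further appends arrive.
            ioThread.wakeup();
        }
    }
}
```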
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch ensures that the leader is included among the voters in the `LeaderChangeMessage`. It also adds an additional field for the set of granting voters, which was originally specified in KIP-595.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This patch adds a unit test for `UnattachedState`, similar to `ResignedStateTest` and `VotedStateTest`.
Reviewers: Jason Gustafson <jason@confluent.io>
This PR adds support for generating snapshot for KIP-630.
1. Adds the interfaces `RawSnapshotWriter` and `RawSnapshotReader` and the implementations `FileRawSnapshotWriter` and `FileRawSnapshotReader` respectively. These interfaces and implementations are a low-level API for writing and reading snapshots. They are internal to the Raft implementation and are not exposed to the users of `RaftClient`. They operate at the `Record` level. These types are exposed to the `RaftClient` through the `ReplicatedLog` interface.
2. Adds a buffered snapshot writer: `SnapshotWriter<T>`. This is a higher-level type that is exposed through the `RaftClient` interface. A future PR will add the related `SnapshotReader<T>`, which will be used by the state machine to load a snapshot.
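As a hedged illustration of the layering (the interfaces here are simplified stand-ins; the real signatures may differ), the buffered `SnapshotWriter<T>` sits on top of a `RawSnapshotWriter` and turns typed records into `Record`-level writes:

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Function;

public class SnapshotLayeringSketch {
    // Low-level, internal writer: operates on raw bytes/records.
    interface RawSnapshotWriter {
        void append(ByteBuffer rawRecords);
        void freeze();
    }

    // Higher-level, user-facing writer: accepts typed records and serializes them.
    static class SnapshotWriter<T> {
        private final RawSnapshotWriter raw;
        private final Function<T, ByteBuffer> serializer;

        SnapshotWriter(RawSnapshotWriter raw, Function<T, ByteBuffer> serializer) {
            this.raw = raw;
            this.serializer = serializer;
        }

        void append(List<T> records) {
            for (T record : records) {
                raw.append(serializer.apply(record));  // buffering elided for brevity
            }
        }

        void freeze() {
            raw.freeze();
        }
    }
}
```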
Reviewers: Jason Gustafson <jason@confluent.io>
When initializing the raft state machine after shutting down as a leader, we were previously entering the "unattached" state, which means we have no leader and no voted candidate. This was a bug because it allowed a reinitialized leader to cast a vote for a candidate in the same epoch that it was already the leader of. This patch fixes the problem by introducing a new "resigned" state which allows us to retain the leader state so that we cannot change our vote and we will not accept additional appends.
This patch also revamps the shutdown logic to make use of the new "resigned" state. Previously we had a separate path in `KafkaRaftClient.poll` for the shutdown logic which resulted in some duplication. Now we instead incorporate shutdown behavior into each state's respective logic.
Finally, this patch changes the shutdown logic so that `EndQuorumEpoch` is only sent by resigning leaders. Previously we allowed this request to be sent by candidates as well.
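A minimal sketch of the vote-handling difference the new state introduces (purely illustrative; not the actual `QuorumState` transition code):

```java
public class ResignedStateSketch {
    enum State { UNATTACHED, RESIGNED, LEADER }

    // In the resigned state we remember that we were the leader of this epoch,
    // so a Vote request for the same epoch must be rejected.
    static boolean canGrantVote(State state, int currentEpoch, int requestEpoch) {
        if (requestEpoch < currentEpoch) {
            return false;  // stale candidate
        }
        if (requestEpoch == currentEpoch
                && (state == State.LEADER || state == State.RESIGNED)) {
            // A reinitialized leader that fell into UNATTACHED could previously
            // grant this vote, violating the protocol.
            return false;
        }
        return true;  // higher epochs may still be granted
    }
}
```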
Reviewers: dengziming <dengziming1993@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
In #9418, we added a listener to the `RaftClient` interface. In that patch, it was used only to send commit notifications for writes from the leader. In this PR, we extend the `handleCommit` API to accept all committed data and we remove the pull-based `read` API. Additionally, we add two new callbacks to the listener interface in order to notify the state machine when the raft client has claimed or resigned leadership.
Finally, this patch allows the `RaftClient` to support multiple listeners. This is necessary for KIP-500 because we will have one listener for the controller role and one for the broker role.
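A hedged sketch of how a state machine might hook into these callbacks (the method signatures below are illustrative and may not match the final `RaftClient.Listener` interface exactly):

```java
import java.util.List;

public class ListenerSketch {
    // Simplified stand-in for the listener contract described above.
    interface Listener<T> {
        void handleCommit(long lastCommittedOffset, List<T> committedRecords);
        void handleLeaderChange(int epoch, boolean isLeader);
    }

    static class MetadataStateMachine implements Listener<String> {
        private boolean isLeader = false;

        @Override
        public void handleCommit(long lastCommittedOffset, List<String> committedRecords) {
            // Apply committed records regardless of whether this node wrote them.
            committedRecords.forEach(record -> { /* apply to in-memory state */ });
        }

        @Override
        public void handleLeaderChange(int epoch, boolean isLeader) {
            this.isLeader = isLeader;  // claim or resign controller duties
        }
    }
}
```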
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Boyang Chen <boyang@confluent.io>
This patch fixes a test case in `QuorumStateTest`. The method name is "testVotedToUnattachedHigherEpoch," but the code initialized in the unattached state instead of the voted state.
Reviewers: Jason Gustafson <jason@confluent.io>
The patch adds `quorum.append.linger.ms` behavior to the raft implementation. This gives users a powerful knob to tune the impact of fsync. When an append is accepted from the state machine, it is held in an accumulator (similar to the producer) until the configured linger time is exceeded. This allows the implementation to amortize fsync overhead at the expense of some write latency.
The patch also improves our methodology for testing performance. Up to now, we have relied on the producer performance test, but it is difficult to simulate expected controller loads because producer performance is limited by other factors such as the number of producer clients and head-of-line blocking. Instead, this patch adds a workload generator which runs on the leader after election.
Finally, this patch brings us nearer to the write semantics expected by the KIP-500 controller. It makes the following changes:
- Introduce `RecordSerde<T>` interface which abstracts the underlying log implementation from `RaftClient`. The generic type is carried over to `RaftClient<T>` and is exposed through the read/write APIs.
- `RaftClient.append` is changed to `RaftClient.scheduleAppend` and returns the last offset of the expected log append.
- `RaftClient.scheduleAppend` accepts a list of records and ensures that the full set are included in a single batch.
- Introduce `RaftClient.Listener` with a single `handleCommit` API which will eventually replace `RaftClient.read` in order to surface committed data to the controller state machine. Currently `handleCommit` is only used for records appended by the leader.
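For illustration only (the exact interface shapes may differ from the real ones), a trivial `RecordSerde<T>` for string records and the corresponding `scheduleAppend` usage could look roughly like this:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class SerdeSketch {
    // Hypothetical, simplified serde contract: the real RecordSerde<T> also
    // deals with sizing and protocol-level readers/writers.
    interface RecordSerde<T> {
        ByteBuffer serialize(T record);
        T deserialize(ByteBuffer buffer);
    }

    static class StringSerde implements RecordSerde<String> {
        @Override
        public ByteBuffer serialize(String record) {
            return ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public String deserialize(ByteBuffer buffer) {
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Illustrative caller: all records in one scheduleAppend call end up in a
    // single batch, and the returned value is the expected last offset.
    interface RaftClient<T> {
        long scheduleAppend(int epoch, List<T> records);
    }

    static void example(RaftClient<String> client, int epoch) {
        long lastOffset = client.scheduleAppend(epoch, Arrays.asList("a", "b", "c"));
        System.out.println("expected last offset: " + lastOffset);
    }
}
```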
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
There is a lot of functionality in `KafkaRaftClientTest` that is useful for writing other tests. This patch refactors that functionality into a separate class so it can be reused.
Reviewers: Jason Gustafson <jason@confluent.io>
* Replace quorum.bootstrap.servers and quorum.bootstrap.voters with
quorum.voters.
* Remove seemingly unused `verbose` config.
* Use constant to avoid unnecessary repeated concatenation.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch adds missing flush logic to `KafkaRaftClient`. The initial flushing behavior is simplistic. We guarantee that the leader will not replicate above the last flushed offset and we guarantee that the follower will not fetch data above its own flush point. More sophisticated flush behavior is proposed in KAFKA-10526.
We have also extended the simulation test so that it covers flush behavior. When a node is shutdown, all unflushed data is lost. We were able to confirm that the monotonic high watermark invariant fails without the added `flush` calls.
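A minimal sketch of the leader-side invariant (illustrative names; not the actual `KafkaRaftClient` code): replication and high watermark advancement are bounded by the last flushed offset.

```java
public class FlushBoundSketch {
    private long logEndOffset = 0;
    private long flushedOffset = 0;

    void append(int recordCount) {
        logEndOffset += recordCount;  // appended but possibly unflushed
    }

    void flush() {
        flushedOffset = logEndOffset;  // fsync the log up to the end offset
    }

    // The leader only exposes flushed data to followers; unflushed data could
    // be lost on a crash and must not count toward the high watermark.
    long maxReplicatableOffset() {
        return flushedOffset;
    }
}
```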
This patch also piggybacks a fix to the `TestRaftServer` implementation. The initial check-in contained a bug which caused `RequestChannel` to fail sending responses because the disabled APIs did not have metrics registered. As a result, it was impossible to elect leaders.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
One of the invariants that the raft replication protocol relies on is that each record is uniquely identified by leader epoch and offset. This can be violated if a leader remains elected with the same epoch between restarts since unflushed data could be lost.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
There is a minor difference in behavior between the epoch caching logic in `MockLog` from the behavior in `LeaderEpochFileCache`. The latter ensures that every new epoch/start offset entry added to the cache increases monotonically over the previous entries. This patch brings the behavior of `MockLog` in line.
It also simplifies the `assignEpochStartOffset` api in `ReplicatedLog`. We always intend to use the log end offset, so this patch removes the start offset parameter.
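For illustration (a simplified stand-in for the cache, not the actual `LeaderEpochFileCache` or `MockLog` code), the monotonicity check amounts to rejecting entries whose epoch or start offset does not increase over the previous entry:

```java
import java.util.ArrayList;
import java.util.List;

public class EpochCacheSketch {
    static class EpochEntry {
        final int epoch;
        final long startOffset;

        EpochEntry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    private final List<EpochEntry> entries = new ArrayList<>();

    // Only accept entries that increase both epoch and start offset
    // monotonically over the latest cached entry.
    void assign(int epoch, long startOffset) {
        if (!entries.isEmpty()) {
            EpochEntry last = entries.get(entries.size() - 1);
            if (epoch <= last.epoch || startOffset < last.startOffset) {
                return;  // ignore non-monotonic assignment in this sketch
            }
        }
        entries.add(new EpochEntry(epoch, startOffset));
    }
}
```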
Reviewers: Boyang Chen <boyang@confluent.io>
I also removed a test class with no tests currently (Jason filed KAFKA-10519 to fill the test gap).
Reviewers: Jason Gustafson <jason@confluent.io>
This is the core Raft implementation specified by KIP-595: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum. We have created a separate "raft" module where most of the logic resides. The new APIs introduced in this patch in order to support Raft election and such are disabled in the server until the integration with the controller is complete. Until then, there is a standalone server which can be used for testing the performance of the Raft implementation. See `raft/README.md` for details.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Boyang Chen <boyang@confluent.io>
Co-authored-by: Boyang Chen <boyang@confluent.io>
Co-authored-by: Guozhang Wang <wangguoz@gmail.com>