Implements the following metrics:
kafka.server:type=group-coordinator-metrics,name=num-partitions,state=loading
kafka.server:type=group-coordinator-metrics,name=num-partitions,state=active
kafka.server:type=group-coordinator-metrics,name=num-partitions,state=failed
kafka.server:type=group-coordinator-metrics,name=event-queue-size
kafka.server:type=group-coordinator-metrics,name=partition-load-time-max
kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg
kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-min
kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg
The PR makes these metrics generic so that in the future the transaction coordinator runtime can implement the same metrics in a similar fashion.
Also, CoordinatorLoaderImpl#load will now return LoadSummary which encapsulates the start time, end time, number of records/bytes.
Co-authored-by: David Jacot <djacot@confluent.io>
Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch implements the groups and offsets expiration in the new group coordinator.
Reviewers: Ritika Reddy <rreddy@confluent.io>, David Jacot <djacot@confluent.io>
Replacing the use a hardcoded -1 with a constant (`LEAVE_GROUP_EPOCH`) that provides more clarity. Since static members also have a magic number (-2), this will motivate future commits to use constants instead of hardcoded values.
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
This patch implements DeleteGroups and OffsetDelete API in the new group coordinator.
Reviewers: yangy0000, Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
The Javadoc for `GroupMetadataManager#throwIfMemberEpochIsInvalid` suggests that it throws a `NotCoordinatorException` exception when the member epoch in the consumer heartbeat request is greater than the one known by this coordinator for the given member. This could happen if the heartbeat-ing member got a higher epoch from another coordinator. However the method throws `FencedMemberEpochException` even in this case. This PR corrects the Javadocs to reflect the same.
Reviewers: David Jacot <djacot@confluent.io>
As a first step, we plan to release a preview of the new consumer group rebalance protocol without the client side assignor. This patch removes all the related fields from the ConsumerGroupHeartbeat API for now. Removing fields is fine here because this API is not released yet and not exposed by default. We will add them back while bumping the version of the request when we release this part in the future.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch adds the MemberId and the MemberEpoch fields to the OffsetFetchRequest. Those fields will be populated when the new consumer group protocol is used to ensure that the member fetching the offset has the correct member id and epoch. If it does not, UNKNOWN_MEMBER_ID or STALE_MEMBER_EPOCH are returned to the client.
Our initial idea was to implement the same for the old protocol. The field is called GenerationIdOrMemberEpoch in KIP-848 to materialize this. As a second though, I think that we should only do it for the new protocol. The effort to implement it in the old protocol is not worth it in my opinion.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch makes the styling consistent inside GroupMetadataManagerTest. Also, it adds JoinResult to simplify the JoinGroup API responses in the tests.
Reviewers: David Arthur <mumrah@gmail.com>, David Jacot <djacot@confluent.io>
This patch refactors the GroupCoordinator.fetchOffsets and GroupCoordinator.fetchAllOffsets methods to take an OffsetFetchRequestGroup and to return an OffsetFetchResponseGroup. It prepares the ground for adding the member id and the member epoch to the OffsetFetchRequest. This change also makes those two methods more aligned with the others in the interface.
Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch implements the OffsetFetch API in the new group coordinator.
I found out that implementing the `RequireStable` flag is hard (to not say impossible) in the current model. For the context, the flag is here to ensure that an OffsetRequest request does not return stale offsets if there are pending offsets to be committed. In the scala code, we basically check the pending offsets data structure and if they are any pending offsets, we return the `UNSTABLE_OFFSET_COMMIT` error. This tells the consumer to retry.
In our new model, we don't have the pending offsets data structure. Instead, we use a timeline data structure to handle all the pending/uncommitted changes. Because of this we don't know whether offsets are pending for a particular group. Instead of doing this, I propose to not return the `UNSTABLE_OFFSET_COMMIT` error anymore. Instead, when `RequireStable` is set, we use a write operation to ensure that we read the latest offsets. If they are uncommitted offsets, the write operation ensures that the response is only return when they are committed. This gives a similar behaviour in the end.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch does a few code cleanups in the group-coordinator module.
It renames Coordinator to CoordinatorShard;
It renames ReplicatedGroupCoordinator to GroupCoordinatorShard. I was never really happy with this name. The new name makes more sense to me;
It removes TopicPartition from the GroupMetadataManager. It was only used in log messages. The log context already includes it so we don't have to log it again.
It renames assignors to consumerGroupAssignors.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch updates the `PartitionAssignor` interface to support rack-awareness. The change introduces the `SubscribedTopicDescriber` interface that can be used to retrieve topic metadata such as the number of partitions or the racks from within an assignor. We use an interface because it allows us to wrap internal data structures instead of having to copy them. It is more efficient.
Reviewers: David Jacot <djacot@confluent.io>
Accessing the `CoordinatorContext` in the `CoordinatorRuntime` should be protected by a lock. The runtime guarantees that the context is never access concurrently however it is accessed by multiple threads. The lock is here to ensure that we have a proper memory barrier. The patch does the following:
1) Adds a lock to `CoordinatorContext`;
2) Adds helper methods to get the context and acquire/release the lock.
3) Allow transition from Failed to Loading. Previously, the context was recreated in this case.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch introduces the `OffsetMetadataManager` and implements the `OffsetCommit` API for both the old rebalance protocol and the new rebalance protocol. It introduces version 9 of the API but keeps it as unstable for now. The patch adds unit tests to test the API. Integration tests will be done separately.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch implements the SyncGroup API in the new group coordinator. All the new unit tests are based on the existing scala tests.
Reviewers: David Jacot <djacot@confluent.io>
This patch does a few things:
1) It introduces the `OffsetAndMetadata` class which hold the committed offsets in the group coordinator.
2) It adds methods to deal with OffsetCommit records to `RecordHelpers`.
3) It adds `MetadataVersion#offsetCommitValueVersion` to get the version of the OffsetCommit value record that should be used.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Arthur <mumrah@gmail.com>, Justine Olshan <jolshan@confluent.io>
This patch implements the existing JoinGroup protocol within the new group coordinator.
Some notable differences:
* Methods return a CoordinatorResult to the runtime framework, which includes records to append to the log as well as a future to complete after the append succeeds/fails.
* The coordinator runtime ensures that only a single thread will be processing a group at any given time, therefore there is no more locking on groups.
* Instead of using on purgatories, we rely on the Timer interface to schedule/cancel delayed operations.
Reviewers: David Jacot <djacot@confluent.io>
This patch wires the new group coordinator in BrokerServer (KRaft only). With this, it is now possible to run a cluster with the new group coordinator and to use the ConsumerGroupHeartbeat API by specifying the following two properties:
- group.coordinator.new.enable = true (to enable the new group coordinator)
- unstable.api.versions.enable = true (to enable unreleased APIs)
Note that the new group coordinator does not support all the existing APIs yet.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds the session timeout and the revocation timeout to the new consumer group protocol.
Reviewers: Calvin Liu <caliu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds EventBasedCoordinatorTimer and the CoordinatorTimer interface.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds (1) the logic to propagate a new MetadataImage to the running coordinators; and (2) the logic to ensure that all the consumer groups subscribed to topics with changes will refresh their subscriptions metadata on the next heartbeat. In the mean time, it ensures that freshly loaded consumer groups also refresh their subscriptions metadata on the next heartbeat.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch introduces the GroupCoordinatorService. This is the new (incomplete) implementation of the group coordinator based on the coordinator runtime introduced in https://github.com/apache/kafka/pull/13795.
Reviewers: Divij Vaidya <diviv@amazon.com>, Justine Olshan <jolshan@confluent.io>
This patch introduces the CoordinatorRuntime. The CoordinatorRuntime is a framework which encapsulate all the common features requires to build a coordinator such as the group coordinator. Please refer to the javadoc of that class for the details.
Reviewers: Divij Vaidya <diviv@amazon.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch introduces the `PartitionWriter` interface in the `group-coordinator` module. The `ReplicaManager` resides in the `core` module and it is thus not accessible from the `group-coordinator` one. The `CoordinatorPartitionWriter` is basically an implementation of the interface residing in `core` which interfaces with the `ReplicaManager`.
One notable difference from the usual produce path is that the `PartitionWriter` returns the offset following the written records. This is then used by the coordinator runtime to track when the request associated with the write can be completed.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds the CoordinatorLoader and the CoordinatorPlayback interfaces. The former is used to load (or rebuild) the coordinator state by replaying all the records stored in the partition. The later is the interface that must be implemented by coordinator to support replaying records.
Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch makes the record type generic, moves the class to the runtime package and finally rename the class to follow the naming of the other classes in the runtime package.
Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
Adds CoordinatorEvent, CoordinatorEventProcessor, and MultiThreadedEventProcessor.
Reviewers: Kirk True <ktrue@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds the GroupMetadataManager to the group-coordinator module. This manager is responsible for handling the groups management, the members management and the entire reconciliation process. At this point, only the new consumer group type/protocol is implemented.
The new manager is based on an architecture inspired from the quorum controller. A request can access/read the state but can't mutate it directly. Instead, a list of records is generated together with the response and those records are applied to the state by the runtime framework. We use timeline data structures. Note that the runtime framework is not part of this patch. It will come in a following one.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch introduces `GenericGroup` which rewrite the `GroupMetadata` in Java. The `GenericGroup` is basically a group using the current rebalance protocol in the new group coordinator.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Christo Lolov <lolovc@amazon.com>, David Jacot <djacot@confluent.io>
This patch adds the RangeAssignor on the server for KIP-848. This range assignor is very different from the old client side implementation. We added functionality to make these assignments sticky while also inheriting crucial properties of the range assignor such as facilitating joins and distributing partitions of a topic somewhat equally amongst its subscribers.
Reviewers: Philip Nee <philipnee@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This patch adds the `CurrentAssignmentBuilder` class which encapsulates the reconciliation engine of the consumer group protocol. Given the current state of a member and a desired or target assignment state, the state machine takes the necessary steps to converge the member to its desired state.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds TargetAssignmentBuilder. It is responsible for computing a target assignment for a given group.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds ConsumerGroupMember.
Reviewers: Christo Lolov <lolovc@amazon.com>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
This patch adds ClientAssignor, Assignment, TopicMetadata and VersionedMetadata classes.
Reviewers: Christo Lolov <lolovc@amazon.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch implemented the second part of KIP-915. It bumps the versions of the value records used by the group coordinator and the transaction coordinator to make them flexible versions. The new versions are not used when writing to the partitions but only when reading from the partitions. This allows downgrades from future versions that will include tagged fields.
Reviewers: David Jacot <djacot@confluent.io>
This patch updates the `PartitionAssignor` server-side interface used in the new group coordinator for the new consumer group protocol as follow:
- It switches subscription from topic names to topic ids in order to be closer to the server side implementation.
- It switches assignment from Set to Map<Integer, Set> to be closer to the server side implementation.
- It adds getters for all attributes.
- It makes all attributes final private.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Alexandre Dupriez <alexandre.dupriez@gmail.com>, David Jacot <djacot@confluent.io>
This patch adds the `EventAccumulator` which will be used in the runtime of the new group coordinator. The aim of this accumulator is to basically have a queue per __consumer_group partitions and to ensure that events addressed to the same partitions are not processed concurrently. The accumulator is generic so we could reuse it in different context.
Reviewers: Alexandre Dupriez <alexandre.dupriez@gmail.com>, Justine Olshan <jolshan@confluent.io>