
KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface (#13112)

This patch migrates all the internal APIs of the current group coordinator to the new `GroupCoordinator` interface. It also makes the current implementation package-private to ensure that it is no longer used directly.
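As a hedged illustration of the migration pattern only (a minimal sketch; `OldCoordinator`, `NewCoordinator`, and `heartbeat` are simplified stand-ins, not the real Kafka classes): the old callback-based Scala coordinator becomes package-private, and an adapter exposes it through a future-based interface.

```scala
package group

import java.util.concurrent.CompletableFuture

// Stand-in for the old callback-style coordinator; the patch makes the
// real one package-private so external code cannot use it directly.
private[group] class OldCoordinator {
  def handleHeartbeat(groupId: String, callback: Either[String, Unit] => Unit): Unit =
    callback(Right(())) // pretend every heartbeat succeeds

  def shutdown(): Unit = ()
}

// Stand-in for the new public interface (the real one lives in the
// group-coordinator module as org.apache.kafka.coordinator.group.GroupCoordinator).
trait NewCoordinator {
  def heartbeat(groupId: String): CompletableFuture[Unit]
  def shutdown(): Unit
}

// The adapter translates each callback-based API into a future-based one.
class CoordinatorAdapter(oldCoordinator: OldCoordinator) extends NewCoordinator {
  override def heartbeat(groupId: String): CompletableFuture[Unit] = {
    val future = new CompletableFuture[Unit]()
    oldCoordinator.handleHeartbeat(groupId, {
      case Right(value) => future.complete(value)
      case Left(error)  => future.completeExceptionally(new RuntimeException(error))
    })
    future
  }

  override def shutdown(): Unit = oldCoordinator.shutdown()
}
```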

Reviewers: Justine Olshan <jolshan@confluent.io>
David Jacot, committed by GitHub
commit 2e0a005dd4
Changed files (22, with lines changed per file):

  1. build.gradle (1)
  2. checkstyle/import-control-jmh-benchmarks.xml (1)
  3. core/src/main/java/kafka/server/builders/KafkaApisBuilder.java (4)
  4. core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala (46)
  5. core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala (78)
  6. core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala (25)
  7. core/src/main/scala/kafka/server/AutoTopicCreationManager.scala (4)
  8. core/src/main/scala/kafka/server/BrokerServer.scala (11)
  9. core/src/main/scala/kafka/server/KafkaApis.scala (42)
 10. core/src/main/scala/kafka/server/KafkaBroker.scala (2)
 11. core/src/main/scala/kafka/server/KafkaServer.scala (11)
 12. core/src/main/scala/kafka/server/RequestHandlerHelper.scala (6)
 13. core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala (21)
 14. core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala (2)
 15. core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala (2)
 16. core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala (4)
 17. core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala (10)
 18. core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala (12)
 19. core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (93)
 20. core/src/test/scala/unit/kafka/utils/TestUtils.scala (4)
 21. group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java (90)
 22. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java (2)

build.gradle (1)

@@ -2428,6 +2428,7 @@ project(':jmh-benchmarks') {
}
implementation project(':server-common')
implementation project(':clients')
implementation project(':group-coordinator')
implementation project(':metadata')
implementation project(':storage')
implementation project(':streams')

checkstyle/import-control-jmh-benchmarks.xml (1)

@@ -49,6 +49,7 @@
<allow pkg="kafka.security.authorizer"/>
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.timeline" />

core/src/main/java/kafka/server/builders/KafkaApisBuilder.java (4)

@@ -17,8 +17,6 @@
package kafka.server.builders;
import kafka.coordinator.group.GroupCoordinator;
import kafka.coordinator.group.GroupCoordinatorAdapter;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.server.ApiVersionManager;
@@ -35,6 +33,7 @@ import kafka.server.ReplicaManager;
import kafka.server.metadata.ConfigRepository;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.authorizer.Authorizer;
import java.util.Collections;
@@ -179,7 +178,6 @@ public class KafkaApisBuilder {
metadataSupport,
replicaManager,
groupCoordinator,
new GroupCoordinatorAdapter(groupCoordinator, time),
txnCoordinator,
autoTopicCreationManager,
brokerId,

core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala (46)

@@ -16,7 +16,7 @@
*/
package kafka.coordinator.group
import java.util.Properties
import java.util.{OptionalInt, Properties}
import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.OffsetAndMetadata
import kafka.server._
@@ -48,14 +48,16 @@ import scala.math.max
* used by its callback. The delayed callback may acquire the group lock
* since the delayed operation is completed only if the group lock can be acquired.
*/
class GroupCoordinator(val brokerId: Int,
val groupConfig: GroupConfig,
val offsetConfig: OffsetConfig,
val groupManager: GroupMetadataManager,
val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
val rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
time: Time,
metrics: Metrics) extends Logging {
private[group] class GroupCoordinator(
val brokerId: Int,
val groupConfig: GroupConfig,
val offsetConfig: OffsetConfig,
val groupManager: GroupMetadataManager,
val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
val rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
time: Time,
metrics: Metrics
) extends Logging {
import GroupCoordinator._
type JoinCallback = JoinGroupResult => Unit
@@ -1188,7 +1190,7 @@ class GroupCoordinator(val brokerId: Int,
*
* @param offsetTopicPartitionId The partition we are no longer leading
*/
def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: OptionalInt): Unit = {
info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
groupManager.removeGroupsForPartition(offsetTopicPartitionId, coordinatorEpoch, onGroupUnloaded)
}
@@ -1707,10 +1709,12 @@ object GroupCoordinator {
val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
val NewMemberJoinTimeoutMs: Int = 5 * 60 * 1000
def apply(config: KafkaConfig,
replicaManager: ReplicaManager,
time: Time,
metrics: Metrics): GroupCoordinator = {
private[group] def apply(
config: KafkaConfig,
replicaManager: ReplicaManager,
time: Time,
metrics: Metrics
): GroupCoordinator = {
val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
val rebalancePurgatory = DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
@@ -1729,12 +1733,14 @@ object GroupCoordinator {
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
)
def apply(config: KafkaConfig,
replicaManager: ReplicaManager,
heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
time: Time,
metrics: Metrics): GroupCoordinator = {
private[group] def apply(
config: KafkaConfig,
replicaManager: ReplicaManager,
heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
time: Time,
metrics: Metrics
): GroupCoordinator = {
val offsetConfig = this.offsetConfig(config)
val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,

core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala (78)

@@ -17,26 +17,47 @@
package kafka.coordinator.group
import kafka.common.OffsetAndMetadata
import kafka.server.RequestLocal
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext}
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, TransactionResult}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import java.util
import java.util.Optional
import java.util.{Optional, OptionalInt, Properties}
import java.util.concurrent.CompletableFuture
import java.util.function.IntSupplier
import scala.collection.{immutable, mutable}
import scala.jdk.CollectionConverters._
object GroupCoordinatorAdapter {
def apply(
config: KafkaConfig,
replicaManager: ReplicaManager,
time: Time,
metrics: Metrics
): GroupCoordinatorAdapter = {
new GroupCoordinatorAdapter(
GroupCoordinator(
config,
replicaManager,
time,
metrics
),
time
)
}
}
/**
* GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
* that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
*/
class GroupCoordinatorAdapter(
private[group] class GroupCoordinatorAdapter(
private val coordinator: GroupCoordinator,
private val time: Time
) extends org.apache.kafka.coordinator.group.GroupCoordinator {
@@ -511,4 +532,53 @@ class GroupCoordinatorAdapter(
future
}
override def partitionFor(groupId: String): Int = {
coordinator.partitionFor(groupId)
}
override def onTransactionCompleted(
producerId: Long,
partitions: java.lang.Iterable[TopicPartition],
transactionResult: TransactionResult
): Unit = {
coordinator.scheduleHandleTxnCompletion(
producerId,
partitions.asScala,
transactionResult
)
}
override def onPartitionsDeleted(
topicPartitions: util.List[TopicPartition],
bufferSupplier: BufferSupplier
): Unit = {
coordinator.handleDeletedPartitions(topicPartitions.asScala, RequestLocal(bufferSupplier))
}
override def onElection(
groupMetadataPartitionIndex: Int,
groupMetadataPartitionLeaderEpoch: Int
): Unit = {
coordinator.onElection(groupMetadataPartitionIndex, groupMetadataPartitionLeaderEpoch)
}
override def onResignation(
groupMetadataPartitionIndex: Int,
groupMetadataPartitionLeaderEpoch: OptionalInt
): Unit = {
coordinator.onResignation(groupMetadataPartitionIndex, groupMetadataPartitionLeaderEpoch)
}
override def groupMetadataTopicConfigs(): Properties = {
coordinator.offsetsTopicConfigs
}
override def startup(groupMetadataTopicPartitionCount: IntSupplier): Unit = {
coordinator.startup(() => groupMetadataTopicPartitionCount.getAsInt)
}
override def shutdown(): Unit = {
coordinator.shutdown()
}
}
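The KafkaApis call sites later in this patch all consume these futures with the same `handle[Unit]` shape. A minimal, self-contained sketch of that pattern (with hypothetical `sendResponse`/`sendError` helpers standing in for the real response plumbing):

```scala
import java.util.concurrent.CompletableFuture

object HandleSketch {
  // Hypothetical placeholders for the broker's response plumbing.
  def sendResponse(msg: String): Unit = println(s"response: $msg")
  def sendError(t: Throwable): Unit = println(s"error: ${t.getMessage}")

  def main(args: Array[String]): Unit = {
    val future: CompletableFuture[String] = CompletableFuture.completedFuture("ok")

    // Branch on success or failure once the coordinator completes the future.
    future.handle[Unit] { (response, exception) =>
      if (exception != null) sendError(exception)
      else sendResponse(response)
    }
  }
}
```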

core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala (25)

@@ -20,7 +20,7 @@ package kafka.coordinator.group
import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Optional
import java.util.{Optional, OptionalInt}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.ConcurrentHashMap
@@ -547,7 +547,7 @@ class GroupMetadataManager(brokerId: Int,
onGroupLoaded: GroupMetadata => Unit,
startTimeMs: java.lang.Long
): Unit = {
if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, Some(coordinatorEpoch))) {
if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, OptionalInt.of(coordinatorEpoch))) {
info(s"Not loading offsets and group metadata for $topicPartition " +
s"in epoch $coordinatorEpoch since current epoch is ${epochForPartitionId.get(topicPartition.partition)}")
} else if (!addLoadingPartition(topicPartition.partition)) {
@@ -763,7 +763,7 @@ class GroupMetadataManager(brokerId: Int,
* @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
*/
def removeGroupsForPartition(offsetsPartition: Int,
coordinatorEpoch: Option[Int],
coordinatorEpoch: OptionalInt,
onGroupUnloaded: GroupMetadata => Unit): Unit = {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
@@ -771,7 +771,7 @@ class GroupMetadataManager(brokerId: Int,
}
private [group] def removeGroupsAndOffsets(topicPartition: TopicPartition,
coordinatorEpoch: Option[Int],
coordinatorEpoch: OptionalInt,
onGroupUnloaded: GroupMetadata => Unit): Unit = {
val offsetsPartition = topicPartition.partition
if (maybeUpdateCoordinatorEpoch(offsetsPartition, coordinatorEpoch)) {
@@ -810,19 +810,22 @@ class GroupMetadataManager(brokerId: Int,
*/
private def maybeUpdateCoordinatorEpoch(
partitionId: Int,
epochOpt: Option[Int]
epochOpt: OptionalInt
): Boolean = {
val updatedEpoch = epochForPartitionId.compute(partitionId, (_, currentEpoch) => {
if (currentEpoch == null) {
epochOpt.map(Int.box).orNull
if (epochOpt.isPresent) epochOpt.getAsInt
else null
} else {
epochOpt match {
case Some(epoch) if epoch > currentEpoch => epoch
case _ => currentEpoch
}
if (epochOpt.isPresent && epochOpt.getAsInt > currentEpoch) epochOpt.getAsInt
else currentEpoch
}
})
epochOpt.forall(_ == updatedEpoch)
if (epochOpt.isPresent) {
epochOpt.getAsInt == updatedEpoch
} else {
true
}
}
// visible for testing
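The `maybeUpdateCoordinatorEpoch` rewrite above is the heart of this file's change: the epoch map only ever moves forward, and a caller learns whether its epoch was stale. A self-contained sketch of that technique under simplified names (this is not the actual `GroupMetadataManager` code):

```scala
import java.util.OptionalInt
import java.util.concurrent.ConcurrentHashMap

object EpochSketch {
  private val epochForPartitionId = new ConcurrentHashMap[Int, java.lang.Integer]()

  // Returns true when epochOpt is absent, or when it matches the epoch
  // stored after the update (i.e. it was not stale).
  def maybeUpdateEpoch(partitionId: Int, epochOpt: OptionalInt): Boolean = {
    val updated = epochForPartitionId.compute(partitionId, (_, current) => {
      if (current == null) {
        if (epochOpt.isPresent) epochOpt.getAsInt else null
      } else if (epochOpt.isPresent && epochOpt.getAsInt > current) {
        epochOpt.getAsInt // newer epoch wins
      } else {
        current // keep the existing epoch; never move backwards
      }
    })
    !epochOpt.isPresent || epochOpt.getAsInt == updated
  }

  def main(args: Array[String]): Unit = {
    println(maybeUpdateEpoch(0, OptionalInt.of(1))) // true: first epoch recorded
    println(maybeUpdateEpoch(0, OptionalInt.of(3))) // true: newer epoch accepted
    println(maybeUpdateEpoch(0, OptionalInt.of(2))) // false: stale epoch rejected
  }
}
```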

core/src/main/scala/kafka/server/AutoTopicCreationManager.scala (4)

@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Properties}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
@@ -34,6 +33,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, RequestContext, RequestHeader}
import org.apache.kafka.coordinator.group.GroupCoordinator
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
@@ -237,7 +237,7 @@ class DefaultAutoTopicCreationManager(
.setName(topic)
.setNumPartitions(config.offsetsTopicPartitions)
.setReplicationFactor(config.offsetsTopicReplicationFactor)
.setConfigs(convertToTopicConfigCollections(groupCoordinator.offsetsTopicConfigs))
.setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
case TRANSACTION_STATE_TOPIC_NAME =>
new CreatableTopic()
.setName(topic)

core/src/main/scala/kafka/server/BrokerServer.scala (11)

@@ -18,7 +18,7 @@
package kafka.server
import kafka.cluster.Broker.ServerInfo
import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter}
import kafka.coordinator.group.GroupCoordinatorAdapter
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
@@ -37,6 +37,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.raft
@@ -283,7 +284,12 @@ class BrokerServer(
// Create group coordinator, but don't start it until we've started replica manager.
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
groupCoordinator = GroupCoordinatorAdapter(
config,
replicaManager,
Time.SYSTEM,
metrics
)
val producerIdManagerSupplier = () => ProducerIdManager.rpc(
config.brokerId,
@@ -409,7 +415,6 @@ class BrokerServer(
metadataSupport = raftSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, time),
txnCoordinator = transactionCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.nodeId,

core/src/main/scala/kafka/server/KafkaApis.scala (42)

@@ -20,7 +20,6 @@ package kafka.server
import kafka.admin.AdminUtils
import kafka.api.ElectLeadersRequestOps
import kafka.controller.ReplicaAssignment
import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
@@ -65,6 +64,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
@@ -76,7 +76,7 @@ import java.nio.ByteBuffer
import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Collections, Optional}
import java.util.{Collections, Optional, OptionalInt}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, mutable}
@@ -90,9 +90,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val metadataSupport: MetadataSupport,
val replicaManager: ReplicaManager,
val groupCoordinator: GroupCoordinator,
// newGroupCoordinator is temporary here. It will be removed when
// the migration to the new interface is completed in this class.
val newGroupCoordinator: org.apache.kafka.coordinator.group.GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val autoTopicCreationManager: AutoTopicCreationManager,
val brokerId: Int,
@@ -310,9 +307,9 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
&& partitionState.deletePartition) {
val leaderEpoch = if (partitionState.leaderEpoch >= 0)
Some(partitionState.leaderEpoch)
OptionalInt.of(partitionState.leaderEpoch)
else
None
OptionalInt.empty
groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
} else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
&& partitionState.deletePartition) {
@@ -357,8 +354,9 @@ class KafkaApis(val requestChannel: RequestChannel,
new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
} else {
val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
if (deletedPartitions.nonEmpty)
groupCoordinator.handleDeletedPartitions(deletedPartitions, requestLocal)
if (deletedPartitions.nonEmpty) {
groupCoordinator.onPartitionsDeleted(deletedPartitions.asJava, requestLocal.bufferSupplier)
}
if (zkSupport.adminManager.hasDelayedTopicOperations) {
updateMetadataRequest.partitionStates.forEach { partitionState =>
@@ -538,7 +536,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
.setTopics(authorizedTopicsRequest.asJava)
newGroupCoordinator.commitOffsets(
groupCoordinator.commitOffsets(
request.context,
offsetCommitRequestData,
requestLocal.bufferSupplier
@@ -1437,7 +1435,7 @@ class KafkaApis(val requestChannel: RequestChannel,
groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
requireStable: Boolean
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
newGroupCoordinator.fetchAllOffsets(
groupCoordinator.fetchAllOffsets(
requestContext,
groupOffsetFetch.groupId,
requireStable
@@ -1475,7 +1473,7 @@ class KafkaApis(val requestChannel: RequestChannel,
groupOffsetFetch.topics.asScala
)(_.name)
newGroupCoordinator.fetchOffsets(
groupCoordinator.fetchOffsets(
requestContext,
groupOffsetFetch.groupId,
authorizedTopics.asJava,
@@ -1628,7 +1626,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
newGroupCoordinator.describeGroups(
groupCoordinator.describeGroups(
request.context,
authorizedGroups.asJava
).handle[Unit] { (results, exception) =>
@@ -1663,7 +1661,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val listGroupsRequest = request.body[ListGroupsRequest]
val hasClusterDescribe = authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME, logIfDenied = false)
newGroupCoordinator.listGroups(
groupCoordinator.listGroups(
request.context,
listGroupsRequest.data
).handle[Unit] { (response, exception) =>
@@ -1701,7 +1699,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
newGroupCoordinator.joinGroup(
groupCoordinator.joinGroup(
request.context,
joinGroupRequest.data,
requestLocal.bufferSupplier
@@ -1735,7 +1733,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, syncGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
newGroupCoordinator.syncGroup(
groupCoordinator.syncGroup(
request.context,
syncGroupRequest.data,
requestLocal.bufferSupplier
@@ -1759,7 +1757,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedGroups, unauthorizedGroups) =
authHelper.partitionSeqByAuthorized(request.context, DELETE, GROUP, groups)(identity)
newGroupCoordinator.deleteGroups(
groupCoordinator.deleteGroups(
request.context,
authorizedGroups.toList.asJava,
requestLocal.bufferSupplier
@@ -1800,7 +1798,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, heartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
newGroupCoordinator.heartbeat(
groupCoordinator.heartbeat(
request.context,
heartbeatRequest.data
).handle[Unit] { (response, exception) =>
@@ -1820,7 +1818,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, leaveGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
newGroupCoordinator.leaveGroup(
groupCoordinator.leaveGroup(
request.context,
leaveGroupRequest.normalizedData()
).handle[Unit] { (response, exception) =>
@@ -2312,7 +2310,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// as soon as the end transaction marker has been written for a transactional offset commit,
// call to the group coordinator to materialize the offsets into the cache
try {
groupCoordinator.scheduleHandleTxnCompletion(producerId, successfulOffsetsPartitions, result)
groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result)
} catch {
case e: Exception =>
error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
@@ -2586,7 +2584,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setTransactionalId(txnOffsetCommitRequest.data.transactionalId)
.setTopics(authorizedTopicCommittedOffsets.asJava)
newGroupCoordinator.commitTransactionalOffsets(
groupCoordinator.commitTransactionalOffsets(
request.context,
txnOffsetCommitRequestData,
requestLocal.bufferSupplier
@@ -3215,7 +3213,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setGroupId(offsetDeleteRequest.data.groupId)
.setTopics(authorizedTopicPartitions)
newGroupCoordinator.deleteOffsets(
groupCoordinator.deleteOffsets(
request.context,
offsetDeleteRequestData,
requestLocal.bufferSupplier
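A recurring mechanical detail in these call sites is crossing the Scala/Java boundary (`deletedPartitions.asJava` going in, `partitions.asScala` inside the adapter). A minimal sketch of those conversions, assuming nothing beyond the standard library:

```scala
import java.util
import scala.jdk.CollectionConverters._

object ConvertersSketch {
  def main(args: Array[String]): Unit = {
    // Scala Seq -> java.util.List, as when handing deleted partitions
    // to the new Java interface.
    val scalaSeq: Seq[String] = Seq("__consumer_offsets-0", "__consumer_offsets-1")
    val javaList: util.List[String] = scalaSeq.asJava

    // java.util.List -> Scala collection, as inside the adapter.
    val backToScala: Iterable[String] = javaList.asScala

    println(javaList)
    println(backToScala.mkString(", "))
  }
}
```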

core/src/main/scala/kafka/server/KafkaBroker.scala (2)

@@ -18,7 +18,6 @@
package kafka.server
import com.yammer.metrics.core.MetricName
import kafka.coordinator.group.GroupCoordinator
import kafka.log.LogManager
import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
import kafka.network.SocketServer
@@ -28,6 +27,7 @@ import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.metrics.KafkaYammerMetrics

core/src/main/scala/kafka/server/KafkaServer.scala (11)

@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import kafka.cluster.{Broker, EndPoint}
import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, InconsistentClusterIdException}
import kafka.controller.KafkaController
import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter}
import kafka.coordinator.group.GroupCoordinatorAdapter
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
import kafka.log.LogManager
import kafka.metrics.KafkaMetricsReporter
@@ -49,6 +49,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, Node}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
@@ -444,7 +445,12 @@ class KafkaServer(
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
groupCoordinator = GroupCoordinatorAdapter(
config,
replicaManager,
Time.SYSTEM,
metrics
)
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
/* create producer ids manager */
@@ -506,7 +512,6 @@ class KafkaServer(
metadataSupport = zkSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, time),
txnCoordinator = transactionCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = config.brokerId,

core/src/main/scala/kafka/server/RequestHandlerHelper.scala (6)

@@ -18,7 +18,6 @@
package kafka.server
import kafka.cluster.Partition
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
@@ -27,6 +26,9 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import java.util.OptionalInt
object RequestHandlerHelper {
@@ -46,7 +48,7 @@ object RequestHandlerHelper {
updatedFollowers.foreach { partition =>
if (partition.topic == Topic.GROUP_METADATA_TOPIC_NAME)
groupCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch))
groupCoordinator.onResignation(partition.partitionId, OptionalInt.of(partition.getLeaderEpoch))
else if (partition.topic == Topic.TRANSACTION_STATE_TOPIC_NAME)
txnCoordinator.onResignation(partition.partitionId, Some(partition.getLeaderEpoch))
}

core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala (21)

@@ -17,21 +17,22 @@
package kafka.server.metadata
import java.util.Properties
import java.util.{OptionalInt, Properties}
import java.util.concurrent.atomic.AtomicLong
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.fault.FaultHandler
import scala.collection.mutable
import scala.jdk.CollectionConverters._
object BrokerMetadataPublisher extends Logging {
@@ -174,7 +175,8 @@ class BrokerMetadataPublisher(
delta,
Topic.GROUP_METADATA_TOPIC_NAME,
groupCoordinator.onElection,
groupCoordinator.onResignation)
(partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
s"coordinator with local changes in ${deltaName}", t)
@@ -200,7 +202,7 @@ class BrokerMetadataPublisher(
}
}
if (deletedTopicPartitions.nonEmpty) {
groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, RequestLocal.NoCaching)
groupCoordinator.onPartitionsDeleted(deletedTopicPartitions.asJava, RequestLocal.NoCaching.bufferSupplier)
}
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
@@ -270,6 +272,13 @@ class BrokerMetadataPublisher(
}
}
private def toOptionalInt(option: Option[Int]): OptionalInt = {
option match {
case Some(leaderEpoch) => OptionalInt.of(leaderEpoch)
case None => OptionalInt.empty
}
}
override def publishedOffset: Long = publishedOffsetAtomic.get()
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
@@ -342,8 +351,8 @@ class BrokerMetadataPublisher(
}
try {
// Start the group coordinator.
groupCoordinator.startup(() => metadataCache.numPartitions(
Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(conf.offsetsTopicPartitions))
groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
.getOrElse(conf.offsetsTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t)
}

core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala (2)

@@ -158,7 +158,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers,
numPartitions = servers.head.config.offsetsTopicPartitions,
replicationFactor = numServers,
topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs)
TestMetricsReporter.testReporters.clear()
}

core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala (2)

@@ -109,7 +109,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
}
TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
replicationFactor = 2, servers, servers.head.groupCoordinator.groupMetadataTopicConfigs)
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)

core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala (4)

@@ -17,7 +17,7 @@
package kafka.coordinator.group
import java.util.Optional
import java.util.{Optional, OptionalInt}
import kafka.common.OffsetAndMetadata
import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils._
@@ -190,7 +190,7 @@ class GroupCoordinatorTest {
val otherGroupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, otherGroupPartitionId)
when(replicaManager.getLog(otherGroupMetadataTopicPartition)).thenReturn(None)
// Call removeGroupsAndOffsets so that partition removed from loadingPartitions
groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition, Some(1), group => {})
groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition, OptionalInt.of(1), group => {})
groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition, 1, group => {}, 0L)
assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(otherGroupId)._1)
}

core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala (10)

@@ -20,7 +20,7 @@ package kafka.coordinator.group
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Optional}
import java.util.{Collections, Optional, OptionalInt}
import com.yammer.metrics.core.Gauge
import javax.management.ObjectName
@@ -669,7 +669,7 @@ class GroupMetadataManagerTest {
loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
assertEquals(groupEpoch, groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, Some(groupEpoch), _ => ())
groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, OptionalInt.of(groupEpoch), _ => ())
assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
"Removed group remained in cache")
assertEquals(groupEpoch, groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
@@ -685,7 +685,7 @@ class GroupMetadataManagerTest {
val groupEpoch = 2
loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, Some(groupEpoch), _ => ())
groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, OptionalInt.of(groupEpoch), _ => ())
assertEquals(groupEpoch, groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
"Removed group remained in cache")
@@ -696,7 +696,7 @@ class GroupMetadataManagerTest {
val groupEpoch = 2
val initiallyLoaded = loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, Some(groupEpoch - 1), _ => ())
groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, OptionalInt.of(groupEpoch - 1), _ => ())
assertEquals(groupEpoch, groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()))
val group = groupMetadataManager.getGroup(groupId).getOrElse(throw new AssertionError("Group was not loaded into the cache"))
assertEquals(initiallyLoaded.groupId, group.groupId)
@@ -718,7 +718,7 @@ class GroupMetadataManagerTest {
val groupEpoch = 2
val initiallyLoaded = loadOffsetsAndGroup(groupTopicPartition, groupEpoch)
groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, None, _ => ())
groupMetadataManager.removeGroupsAndOffsets(groupTopicPartition, OptionalInt.empty, _ => ())
assertTrue(groupMetadataManager.getGroup(groupId).isEmpty,
"Removed group remained in cache")
assertEquals(groupEpoch, groupMetadataManager.epochForPartitionId.get(groupTopicPartition.partition()),

core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala (12)

@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Collections, Optional, Properties}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.TestUtils
import org.apache.kafka.clients.{ClientResponse, NodeApiVersions, RequestCompletionHandler}
@@ -38,6 +37,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
@@ -82,7 +82,7 @@ class AutoTopicCreationManagerTest {
@Test
def testCreateOffsetTopic(): Unit = {
Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new Properties)
testCreateTopic(GROUP_METADATA_TOPIC_NAME, true, internalTopicPartitions, internalTopicReplicationFactor)
}
@@ -159,7 +159,7 @@ class AutoTopicCreationManagerTest {
@Test
def testInvalidReplicationFactorForConsumerOffsetsTopic(): Unit = {
Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new Properties)
testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR, Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
}
@@ -177,7 +177,7 @@ class AutoTopicCreationManagerTest {
@Test
def testTopicExistsErrorSwapForConsumerOffsetsTopic(): Unit = {
Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new Properties)
testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS, Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
}
@@ -197,7 +197,7 @@ class AutoTopicCreationManagerTest {
@Test
def testRequestTimeoutErrorSwapForConsumerOffsetTopic(): Unit = {
Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new Properties)
testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT, Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
expectedError = Some(Errors.LEADER_NOT_AVAILABLE))
}
@@ -216,7 +216,7 @@ class AutoTopicCreationManagerTest {
@Test
def testUnknownTopicPartitionForConsumerOffsetTopic(): Unit = {
Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new Properties)
testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
}

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (93)

@@ -26,7 +26,6 @@ import java.util.{Collections, Optional, OptionalInt, OptionalLong, Properties}
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.controller.{ControllerContext, KafkaController}
import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
@@ -91,6 +90,7 @@ import scala.jdk.CollectionConverters._
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.log.internals.{AppendOrigin, FetchParams, FetchPartitionData}
@@ -100,8 +100,6 @@ class KafkaApisTest {
private val requestChannelMetrics: RequestChannel.Metrics = mock(classOf[RequestChannel.Metrics])
private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
private val groupCoordinator: GroupCoordinator = mock(classOf[GroupCoordinator])
private val newGroupCoordinator: org.apache.kafka.coordinator.group.GroupCoordinator =
mock(classOf[org.apache.kafka.coordinator.group.GroupCoordinator])
private val adminManager: ZkAdminManager = mock(classOf[ZkAdminManager])
private val txnCoordinator: TransactionCoordinator = mock(classOf[TransactionCoordinator])
private val controller: KafkaController = mock(classOf[KafkaController])
@@ -191,7 +189,6 @@ class KafkaApisTest {
metadataSupport = metadataSupport,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
newGroupCoordinator = newGroupCoordinator,
txnCoordinator = txnCoordinator,
autoTopicCreationManager = autoTopicCreationManager,
brokerId = brokerId,
@@ -1049,7 +1046,7 @@ class KafkaApisTest {
case CoordinatorType.GROUP =>
topicConfigOverride.put(KafkaConfig.OffsetsTopicPartitionsProp, numBrokersNeeded.toString)
topicConfigOverride.put(KafkaConfig.OffsetsTopicReplicationFactorProp, numBrokersNeeded.toString)
when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new Properties)
authorizeResource(authorizer, AclOperation.DESCRIBE, ResourceType.GROUP,
groupId, AuthorizationResult.ALLOWED)
Topic.GROUP_METADATA_TOPIC_NAME
@@ -1160,7 +1157,7 @@ class KafkaApisTest {
case Topic.GROUP_METADATA_TOPIC_NAME =>
topicConfigOverride.put(KafkaConfig.OffsetsTopicPartitionsProp, numBrokersNeeded.toString)
topicConfigOverride.put(KafkaConfig.OffsetsTopicReplicationFactorProp, numBrokersNeeded.toString)
when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new Properties)
true
case Topic.TRANSACTION_STATE_TOPIC_NAME =>
@@ -1283,7 +1280,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
val future = new CompletableFuture[OffsetCommitResponseData]()
when(newGroupCoordinator.commitOffsets(
when(groupCoordinator.commitOffsets(
requestChannelRequest.context,
offsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1327,7 +1324,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new OffsetCommitRequest.Builder(offsetCommitRequest).build())
val future = new CompletableFuture[OffsetCommitResponseData]()
when(newGroupCoordinator.commitOffsets(
when(groupCoordinator.commitOffsets(
requestChannelRequest.context,
offsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1423,7 +1420,7 @@ class KafkaApisTest {
.setCommittedOffset(50)).asJava)).asJava)
val future = new CompletableFuture[OffsetCommitResponseData]()
when(newGroupCoordinator.commitOffsets(
when(groupCoordinator.commitOffsets(
requestChannelRequest.context,
expectedOffsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1588,7 +1585,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
when(newGroupCoordinator.commitTransactionalOffsets(
when(groupCoordinator.commitTransactionalOffsets(
requestChannelRequest.context,
txnOffsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1632,7 +1629,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build())
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
when(newGroupCoordinator.commitTransactionalOffsets(
when(groupCoordinator.commitTransactionalOffsets(
requestChannelRequest.context,
txnOffsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1728,7 +1725,7 @@ class KafkaApisTest {
.setCommittedOffset(50)).asJava)).asJava)
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
when(newGroupCoordinator.commitTransactionalOffsets(
when(groupCoordinator.commitTransactionalOffsets(
requestChannelRequest.context,
expectedTnxOffsetCommitRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -1829,7 +1826,7 @@ class KafkaApisTest {
val requestLocal = RequestLocal.withThreadConfinedCaching
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
when(newGroupCoordinator.commitTransactionalOffsets(
when(groupCoordinator.commitTransactionalOffsets(
request.context,
offsetCommitRequest.data,
requestLocal.bufferSupplier
@@ -2364,10 +2361,10 @@ class KafkaApisTest {
if (deletePartition) {
if (leaderEpoch >= 0) {
verify(txnCoordinator).onResignation(txnStatePartition.partition, Some(leaderEpoch))
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, Some(leaderEpoch))
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, OptionalInt.of(leaderEpoch))
} else {
verify(txnCoordinator).onResignation(txnStatePartition.partition, None)
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, None)
verify(groupCoordinator).onResignation(groupMetadataPartition.partition, OptionalInt.empty)
}
}
}
@@ -2462,7 +2459,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
when(newGroupCoordinator.deleteGroups(
when(groupCoordinator.deleteGroups(
requestChannelRequest.context,
List("group-1", "group-2", "group-3").asJava,
RequestLocal.NoCaching.bufferSupplier
@@ -2505,7 +2502,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new DeleteGroupsRequest.Builder(deleteGroupsRequest).build())
val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
when(newGroupCoordinator.deleteGroups(
when(groupCoordinator.deleteGroups(
requestChannelRequest.context,
List("group-1", "group-2", "group-3").asJava,
RequestLocal.NoCaching.bufferSupplier
@@ -2564,7 +2561,7 @@ class KafkaApisTest {
}
val future = new CompletableFuture[DeleteGroupsResponseData.DeletableGroupResultCollection]()
when(newGroupCoordinator.deleteGroups(
when(groupCoordinator.deleteGroups(
requestChannelRequest.context,
List("group-2", "group-3").asJava,
RequestLocal.NoCaching.bufferSupplier
@@ -2611,7 +2608,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new DescribeGroupsRequest.Builder(describeGroupsRequest).build())
val future = new CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
when(newGroupCoordinator.describeGroups(
when(groupCoordinator.describeGroups(
requestChannelRequest.context,
describeGroupsRequest.groups
)).thenReturn(future)
@@ -2653,7 +2650,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new DescribeGroupsRequest.Builder(describeGroupsRequest).build())
val future = new CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
when(newGroupCoordinator.describeGroups(
when(groupCoordinator.describeGroups(
requestChannelRequest.context,
describeGroupsRequest.groups
)).thenReturn(future)
@@ -2707,7 +2704,7 @@ class KafkaApisTest {
}
val future = new CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]]()
when(newGroupCoordinator.describeGroups(
when(groupCoordinator.describeGroups(
requestChannelRequest.context,
List("group-2").asJava
)).thenReturn(future)
@@ -2764,7 +2761,7 @@ class KafkaApisTest {
val requestLocal = RequestLocal.withThreadConfinedCaching
val future = new CompletableFuture[OffsetDeleteResponseData]()
when(newGroupCoordinator.deleteOffsets(
when(groupCoordinator.deleteOffsets(
request.context,
offsetDeleteRequest.data,
requestLocal.bufferSupplier
@@ -2857,7 +2854,7 @@ class KafkaApisTest {
).asJava.iterator))
val future = new CompletableFuture[OffsetDeleteResponseData]()
when(newGroupCoordinator.deleteOffsets(
when(groupCoordinator.deleteOffsets(
requestChannelRequest.context,
expectedOffsetDeleteRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -2962,7 +2959,7 @@ class KafkaApisTest {
// The group coordinator is called even if there are no
// topic-partitions left after the validation.
when(newGroupCoordinator.deleteOffsets(
when(groupCoordinator.deleteOffsets(
request.context,
new OffsetDeleteRequestData().setGroupId(group),
RequestLocal.NoCaching.bufferSupplier
@@ -2994,7 +2991,7 @@ class KafkaApisTest {
val request = buildRequest(offsetDeleteRequest)
val future = new CompletableFuture[OffsetDeleteResponseData]()
when(newGroupCoordinator.deleteOffsets(
when(groupCoordinator.deleteOffsets(
request.context,
offsetDeleteRequest.data,
RequestLocal.NoCaching.bufferSupplier
@@ -3386,7 +3383,7 @@ class KafkaApisTest {
.setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
val future = new CompletableFuture[JoinGroupResponseData]()
when(newGroupCoordinator.joinGroup(
when(groupCoordinator.joinGroup(
requestChannelRequest.context,
expectedJoinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3429,7 +3426,7 @@ class KafkaApisTest {
.setSessionTimeoutMs(joinGroupRequest.sessionTimeoutMs)
val future = new CompletableFuture[JoinGroupResponseData]()
when(newGroupCoordinator.joinGroup(
when(groupCoordinator.joinGroup(
requestChannelRequest.context,
expectedJoinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3448,7 +3445,7 @@ class KafkaApisTest {
val expectedJoinGroupResponse = new JoinGroupResponseData()
.setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code)
.setMemberId("member")
.setProtocolName(if (version >= 7) null else GroupCoordinator.NoProtocol)
.setProtocolName(if (version >= 7) null else kafka.coordinator.group.GroupCoordinator.NoProtocol)
future.complete(joinGroupResponse)
val response = verifyNoThrottling[JoinGroupResponse](requestChannelRequest)
@@ -3467,7 +3464,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
val future = new CompletableFuture[JoinGroupResponseData]()
when(newGroupCoordinator.joinGroup(
when(groupCoordinator.joinGroup(
requestChannelRequest.context,
joinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3519,7 +3516,7 @@ class KafkaApisTest {
val requestChannelRequest = buildRequest(new JoinGroupRequest.Builder(joinGroupRequest).build())
val future = new CompletableFuture[JoinGroupResponseData]()
when(newGroupCoordinator.joinGroup(
when(groupCoordinator.joinGroup(
requestChannelRequest.context,
joinGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3562,7 +3559,7 @@ class KafkaApisTest {
.setProtocolName(if (version >= 5) "range" else null)
val future = new CompletableFuture[SyncGroupResponseData]()
when(newGroupCoordinator.syncGroup(
when(groupCoordinator.syncGroup(
requestChannelRequest.context,
expectedSyncGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3599,7 +3596,7 @@ class KafkaApisTest {
.setProtocolName("range")
val future = new CompletableFuture[SyncGroupResponseData]()
when(newGroupCoordinator.syncGroup(
when(groupCoordinator.syncGroup(
requestChannelRequest.context,
expectedSyncGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3652,7 +3649,7 @@ class KafkaApisTest {
.setMemberId("member")
val future = new CompletableFuture[SyncGroupResponseData]()
when(newGroupCoordinator.syncGroup(
when(groupCoordinator.syncGroup(
requestChannelRequest.context,
expectedSyncGroupRequest,
RequestLocal.NoCaching.bufferSupplier
@@ -3694,7 +3691,7 @@ class KafkaApisTest {
.setGenerationId(0)
val future = new CompletableFuture[HeartbeatResponseData]()
when(newGroupCoordinator.heartbeat(
when(groupCoordinator.heartbeat(
requestChannelRequest.context,
expectedHeartbeatRequest
)).thenReturn(future)
@@ -3722,7 +3719,7 @@ class KafkaApisTest {
.setGenerationId(0)
val future = new CompletableFuture[HeartbeatResponseData]()
when(newGroupCoordinator.heartbeat(
when(groupCoordinator.heartbeat(
requestChannelRequest.context,
expectedHeartbeatRequest
)).thenReturn(future)
@@ -3882,7 +3879,7 @@ class KafkaApisTest {
).asJava)
val future = new CompletableFuture[LeaveGroupResponseData]()
when(newGroupCoordinator.leaveGroup(
when(groupCoordinator.leaveGroup(
requestChannelRequest.context,
expectedLeaveGroupRequest
)).thenReturn(future)
@@ -3927,7 +3924,7 @@ class KafkaApisTest {
).asJava)
val future = new CompletableFuture[LeaveGroupResponseData]()
when(newGroupCoordinator.leaveGroup(
when(groupCoordinator.leaveGroup(
requestChannelRequest.context,
expectedLeaveGroupRequest
)).thenReturn(future)
@@ -3980,7 +3977,7 @@ class KafkaApisTest {
).asJava)
val future = new CompletableFuture[LeaveGroupResponseData]()
when(newGroupCoordinator.leaveGroup(
when(groupCoordinator.leaveGroup(
requestChannelRequest.context,
expectedLeaveGroupRequest
)).thenReturn(future)
@@ -4012,7 +4009,7 @@ class KafkaApisTest {
).asJava)
val future = new CompletableFuture[LeaveGroupResponseData]()
when(newGroupCoordinator.leaveGroup(
when(groupCoordinator.leaveGroup(
requestChannelRequest.context,
expectedLeaveGroupRequest
)).thenReturn(future)
@@ -4053,7 +4050,7 @@ class KafkaApisTest {
val requestChannelRequest = makeRequest(version)
val group1Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
when(newGroupCoordinator.fetchOffsets(
when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
"group-1",
List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
@@ -4064,14 +4061,14 @@ class KafkaApisTest {
)).thenReturn(group1Future)
val group2Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
when(newGroupCoordinator.fetchAllOffsets(
when(groupCoordinator.fetchAllOffsets(
requestChannelRequest.context,
"group-2",
false
)).thenReturn(group2Future)
val group3Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
when(newGroupCoordinator.fetchAllOffsets(
when(groupCoordinator.fetchAllOffsets(
requestChannelRequest.context,
"group-3",
false
@@ -4155,7 +4152,7 @@ class KafkaApisTest {
val requestChannelRequest = makeRequest(version)
val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
when(newGroupCoordinator.fetchOffsets(
when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
"group-1",
List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
@@ -4254,7 +4251,7 @@ class KafkaApisTest {
// group-1 is allowed and bar is allowed.
val group1Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
when(newGroupCoordinator.fetchOffsets(
when(groupCoordinator.fetchOffsets(
requestChannelRequest.context,
"group-1",
List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
@@ -4266,7 +4263,7 @@ class KafkaApisTest {
// group-3 is allowed and bar is allowed.
val group3Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
when(newGroupCoordinator.fetchAllOffsets(
when(groupCoordinator.fetchAllOffsets(
requestChannelRequest.context,
"group-3",
false
@@ -4656,7 +4653,7 @@ class KafkaApisTest {
.setStatesFilter(if (version >= 4) List("Stable", "Empty").asJava else List.empty.asJava)
val future = new CompletableFuture[ListGroupsResponseData]()
when(newGroupCoordinator.listGroups(
when(groupCoordinator.listGroups(
requestChannelRequest.context,
expectedListGroupsRequest
)).thenReturn(future)
@@ -4691,7 +4688,7 @@ class KafkaApisTest {
.setStatesFilter(List("Stable", "Empty").asJava)
val future = new CompletableFuture[ListGroupsResponseData]()
when(newGroupCoordinator.listGroups(
when(groupCoordinator.listGroups(
requestChannelRequest.context,
expectedListGroupsRequest
)).thenReturn(future)
@@ -4787,7 +4784,7 @@ class KafkaApisTest {
val expectedListGroupsRequest = new ListGroupsRequestData()
val future = new CompletableFuture[ListGroupsResponseData]()
when(newGroupCoordinator.listGroups(
when(groupCoordinator.listGroups(
requestChannelRequest.context,
expectedListGroupsRequest
)).thenReturn(future)
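The pattern repeated through these hunks is the substance of the test changes: the `newGroupCoordinator` mock is renamed to `groupCoordinator` and is now typed against the new Java interface. A minimal, self-contained sketch of that stubbing style, assuming the two-argument `listGroups` signature shown above (the class and group names are illustrative, not from the patch):

```java
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.group.GroupCoordinator;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class GroupCoordinatorStubExample {

    public static void main(String[] args) {
        // Mock the new interface; KafkaApis only sees this abstraction now.
        GroupCoordinator groupCoordinator = mock(GroupCoordinator.class);

        // The internal APIs are asynchronous: stub them with a future that
        // the test completes after exercising the request path.
        CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
        when(groupCoordinator.listGroups(any(), any(ListGroupsRequestData.class)))
            .thenReturn(future);

        // ... invoke the code under test here ...

        // Completing the future unblocks the response handling.
        future.complete(new ListGroupsResponseData());
    }
}
```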

4
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@@ -505,7 +505,7 @@ object TestUtils extends Logging {
numPartitions = broker.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
replicationFactor = broker.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
brokers = brokers,
topicConfig = broker.groupCoordinator.offsetsTopicConfigs,
topicConfig = broker.groupCoordinator.groupMetadataTopicConfigs,
)
}
@@ -611,7 +611,7 @@ object TestUtils extends Logging {
server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
servers,
server.groupCoordinator.offsetsTopicConfigs)
server.groupCoordinator.groupMetadataTopicConfigs)
}
/**

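The TestUtils change is a pure rename: `offsetsTopicConfigs` becomes `groupMetadataTopicConfigs`, and the returned `Properties` still seed creation of the internal offsets topic. A hedged sketch of that flow using the Admin client (the helper and its plumbing are assumptions, not part of this patch):

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.coordinator.group.GroupCoordinator;

public class OffsetsTopicSetup {

    // Hypothetical helper mirroring what TestUtils does after the rename.
    static void createOffsetsTopic(
        Admin admin,
        GroupCoordinator coordinator,
        int numPartitions,
        short replicationFactor
    ) throws Exception {
        // The coordinator owns the configuration of its internal topic.
        Properties props = coordinator.groupMetadataTopicConfigs();
        Map<String, String> configs = props.stringPropertyNames().stream()
            .collect(Collectors.toMap(name -> name, props::getProperty));

        NewTopic offsetsTopic = new NewTopic(
            Topic.GROUP_METADATA_TOPIC_NAME, // "__consumer_offsets"
            Optional.of(numPartitions),
            Optional.of(replicationFactor)
        ).configs(configs);

        admin.createTopics(List.of(offsetsTopic)).all().get();
    }
}
```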
90
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -37,11 +38,18 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntSupplier;
/**
* Group Coordinator's internal API.
*/
public interface GroupCoordinator {
/**
@@ -215,4 +223,86 @@ public interface GroupCoordinator {
OffsetDeleteRequestData request,
BufferSupplier bufferSupplier
);
/**
* Return the partition index for the given group.
*
* @param groupId The group id.
*
* @return The partition index.
*/
int partitionFor(String groupId);
/**
* Commit or abort the pending transactional offsets for the given partitions.
*
* @param producerId The producer id.
* @param partitions The partitions.
* @param transactionResult The result of the transaction.
*/
void onTransactionCompleted(
long producerId,
Iterable<TopicPartition> partitions,
TransactionResult transactionResult
);
/**
* Remove the offsets of the provided deleted partitions.
*
* @param topicPartitions The deleted partitions.
* @param bufferSupplier The buffer supplier tied to the request thread.
*/
void onPartitionsDeleted(
List<TopicPartition> topicPartitions,
BufferSupplier bufferSupplier
);
/**
* The group coordinator is now the leader for the given partition at the
* given leader epoch. It should load cached state from the partition
* and begin handling requests for groups mapped to it.
*
* @param groupMetadataPartitionIndex The partition index.
* @param groupMetadataPartitionLeaderEpoch The leader epoch of the partition.
*/
void onElection(
int groupMetadataPartitionIndex,
int groupMetadataPartitionLeaderEpoch
);
/**
* The group coordinator is no longer the leader for the given partition
* at the given leader epoch. It should unload cached state and stop
* handling requests for groups mapped to it.
*
* @param groupMetadataPartitionIndex The partition index.
* @param groupMetadataPartitionLeaderEpoch The leader epoch of the partition as an
* optional value. An empty value means that
* the topic was deleted.
*/
void onResignation(
int groupMetadataPartitionIndex,
OptionalInt groupMetadataPartitionLeaderEpoch
);
/**
* Return the configuration properties of the internal group
* metadata topic.
*
* @return Properties of the internal topic.
*/
Properties groupMetadataTopicConfigs();
/**
* Start up the group coordinator.
*
* @param groupMetadataTopicPartitionCount A supplier to get the number of partitions
* of the consumer offsets topic.
*/
void startup(IntSupplier groupMetadataTopicPartitionCount);
/**
* Shutdown the group coordinator.
*/
void shutdown();
}
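Taken together, the lifecycle methods added above let the broker drive the coordinator purely through this interface, with no reference to the Scala implementation. A rough sketch of the expected call ordering; the partition index, epochs, and partition count below are illustrative values, not defaults from the patch:

```java
import java.util.OptionalInt;

import org.apache.kafka.coordinator.group.GroupCoordinator;

public class CoordinatorLifecycle {

    static void run(GroupCoordinator coordinator) {
        // Start the coordinator. The supplier provides the partition count
        // of the group metadata topic when it is needed.
        coordinator.startup(() -> 50);

        // This broker was elected leader of partition 3 at epoch 42:
        // load the groups stored in that partition.
        coordinator.onElection(3, 42);

        // Requests for a group are routed by mapping its id to a partition.
        int partition = coordinator.partitionFor("my-group");

        // Leadership moved away at epoch 43: unload cached state.
        // An empty OptionalInt would mean the topic itself was deleted.
        coordinator.onResignation(3, OptionalInt.of(43));

        // Stop the coordinator on broker shutdown.
        coordinator.shutdown();
    }
}
```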

2
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java

@@ -18,7 +18,6 @@
package org.apache.kafka.jmh.metadata;
import kafka.controller.KafkaController;
import kafka.coordinator.group.GroupCoordinator;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.network.RequestConvertToJson;
@@ -60,6 +59,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.common.MetadataVersion;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
