Browse Source

KAFKA-1333 follow-up; Add missing files for the coordinator folder

pull/1442/head
Guozhang Wang 10 years ago
parent
commit
71602de0bb
  1. 333
      core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
  2. 52
      core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
  3. 44
      core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
  4. 44
      core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
  5. 62
      core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
  6. 74
      core/src/main/scala/kafka/coordinator/GroupRegistry.scala
  7. 36
      core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala

333
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala

@ -0,0 +1,333 @@ @@ -0,0 +1,333 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator
import org.apache.kafka.common.protocol.Errors
import kafka.common.TopicAndPartition
import kafka.server._
import kafka.utils._
import scala.collection.mutable.HashMap
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
/**
* Kafka coordinator handles consumer group and consumer offset management.
*
* Each Kafka server instantiates a coordinator, which is responsible for a set of
* consumer groups; the consumer groups are assigned to coordinators based on their
* group names.
*/
class ConsumerCoordinator(val config: KafkaConfig,
val zkClient: ZkClient) extends Logging {
this.logIdent = "[Kafka Coordinator " + config.brokerId + "]: "
/* zookeeper listener for topic-partition changes */
private val topicPartitionChangeListeners = new HashMap[String, TopicPartitionChangeListener]
/* the consumer group registry cache */
// TODO: access to this map needs to be synchronized
private val consumerGroupRegistries = new HashMap[String, GroupRegistry]
/* the list of subscribed groups per topic */
// TODO: access to this map needs to be synchronized
private val consumerGroupsPerTopic = new HashMap[String, List[String]]
/* the delayed operation purgatory for heartbeat-based failure detection */
private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
/* the delayed operation purgatory for handling join-group requests */
private var joinGroupPurgatory: DelayedOperationPurgatory[DelayedJoinGroup] = null
/* the delayed operation purgatory for preparing rebalance process */
private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null
/* latest consumer heartbeat bucket's end timestamp in milliseconds */
private var latestHeartbeatBucketEndMs: Long = SystemTime.milliseconds
/**
* Start-up logic executed at the same time when the server starts up.
*/
def startup() {
// Initialize consumer group registries and heartbeat bucket metadata
latestHeartbeatBucketEndMs = SystemTime.milliseconds
// Initialize purgatories for delayed heartbeat, join-group and rebalance operations
heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId)
joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId)
rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId)
}
/**
* Shut-down logic executed at the same time when server shuts down,
* ordering of actions should be reversed from the start-up process
*
*/
def shutdown() {
// De-register all Zookeeper listeners for topic-partition changes
for (topic <- topicPartitionChangeListeners.keys) {
deregisterTopicChangeListener(topic)
}
topicPartitionChangeListeners.clear()
// Shutdown purgatories for delayed heartbeat, join-group and rebalance operations
heartbeatPurgatory.shutdown()
joinGroupPurgatory.shutdown()
rebalancePurgatory.shutdown()
// Clean up consumer group registries metadata
consumerGroupRegistries.clear()
consumerGroupsPerTopic.clear()
}
/**
* Process a join-group request from a consumer to join as a new group member
*/
def consumerJoinGroup(groupId: String,
consumerId: String,
topics: List[String],
sessionTimeoutMs: Int,
partitionAssignmentStrategy: String,
responseCallback:(List[TopicAndPartition], Int, Short) => Unit ) {
// if the group does not exist yet, create one
if (!consumerGroupRegistries.contains(groupId))
createNewGroup(groupId, partitionAssignmentStrategy)
// if the consumer id is unknown or it does exists in
// the group yet, register this consumer to the group
// TODO
// add a delayed join-group operation to the purgatory
// TODO
// if the current group is under rebalance process,
// check if the delayed rebalance operation can be finished
// TODO
// TODO --------------------------------------------------------------
// TODO: this is just a stub for new consumer testing,
// TODO: needs to be replaced with the logic above
// TODO --------------------------------------------------------------
// just return all the partitions of the subscribed topics
val partitionIdsPerTopic = ZkUtils.getPartitionsForTopics(zkClient, topics)
val partitions = partitionIdsPerTopic.flatMap{ case (topic, partitionIds) =>
partitionIds.map(partition => {
TopicAndPartition(topic, partition)
})
}.toList
responseCallback(partitions, 1 /* generation id */, Errors.NONE.code)
info("Handled join-group from consumer " + consumerId + " to group " + groupId)
}
/**
* Process a heartbeat request from a consumer
*/
def consumerHeartbeat(groupId: String,
consumerId: String,
generationId: Int,
responseCallback: Short => Unit) {
// check that the group already exists
// TODO
// check that the consumer has already registered for the group
// TODO
// check if the consumer generation id is correct
// TODO
// remove the consumer from its current heartbeat bucket, and add it back to the corresponding bucket
// TODO
// create the heartbeat response, if partition rebalance is triggered set the corresponding error code
// TODO
info("Handled heartbeat of consumer " + consumerId + " from group " + groupId)
// TODO --------------------------------------------------------------
// TODO: this is just a stub for new consumer testing,
// TODO: needs to be replaced with the logic above
// TODO --------------------------------------------------------------
// always return OK for heartbeat immediately
responseCallback(Errors.NONE.code)
}
/**
* Create a new consumer
*/
private def createNewConsumer(consumerId: String,
topics: List[String],
sessionTimeoutMs: Int,
groupRegistry: GroupRegistry) {
debug("Registering consumer " + consumerId + " for group " + groupRegistry.groupId)
// create the new consumer registry entry
// TODO: specify consumerId as unknown and update at the end of the prepare-rebalance phase
// check if the partition assignment strategy is consistent with the group
// TODO
// add the group to the subscribed topics
// TODO
// schedule heartbeat tasks for the consumer
// TODO
// add the member registry entry to the group
// TODO
// start preparing group partition rebalance
// TODO
info("Registered consumer " + consumerId + " for group " + groupRegistry.groupId)
}
/**
* Create a new consumer group in the registry
*/
private def createNewGroup(groupId: String, partitionAssignmentStrategy: String) {
debug("Creating new group " + groupId)
val groupRegistry = new GroupRegistry(groupId, partitionAssignmentStrategy)
consumerGroupRegistries.put(groupId, groupRegistry)
info("Created new group registry " + groupId)
}
/**
* Callback invoked when a consumer's heartbeat has expired
*/
private def onConsumerHeartbeatExpired(groupId: String, consumerId: String) {
// if the consumer does not exist in group registry anymore, do nothing
// TODO
// record heartbeat failure
// TODO
// if the maximum failures has been reached, mark consumer as failed
// TODO
}
/**
* Callback invoked when a consumer is marked as failed
*/
private def onConsumerFailure(groupId: String, consumerId: String) {
// remove the consumer from its group registry metadata
// TODO
// cut the socket connection to the consumer
// TODO: howto ??
// if the group has no consumer members any more, remove the group
// otherwise start preparing group partition rebalance
// TODO
}
/**
* Prepare partition rebalance for the group
*/
private def prepareRebalance(groupId: String) {
// try to change the group state to PrepareRebalance
// add a task to the delayed rebalance purgatory
// TODO
}
/**
* Start partition rebalance for the group
*/
private def startRebalance(groupId: String) {
// try to change the group state to UnderRebalance
// compute new assignment based on the strategy
// send back the join-group response
// TODO
}
/**
* Fail current partition rebalance for the group
*/
/**
* Register ZK listeners for topic-partition changes
*/
private def registerTopicChangeListener(topic: String) = {
if (!topicPartitionChangeListeners.contains(topic)) {
val listener = new TopicPartitionChangeListener(config)
topicPartitionChangeListeners.put(topic, listener)
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getTopicPath(topic))
zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), listener)
}
}
/**
* De-register ZK listeners for topic-partition changes
*/
private def deregisterTopicChangeListener(topic: String) = {
val listener = topicPartitionChangeListeners.get(topic).get
zkClient.unsubscribeChildChanges(ZkUtils.getTopicPath(topic), listener)
topicPartitionChangeListeners.remove(topic)
}
/**
* Zookeeper listener that catch topic-partition changes
*/
class TopicPartitionChangeListener(val config: KafkaConfig) extends IZkChildListener with Logging {
this.logIdent = "[TopicChangeListener on coordinator " + config.brokerId + "]: "
/**
* Try to trigger a rebalance for each group subscribed in the changed topic
*
* @throws Exception
* On any error.
*/
def handleChildChange(parentPath: String , curChilds: java.util.List[String]) {
debug("Fired for path %s with children %s".format(parentPath, curChilds))
// get the topic
val topic = parentPath.split("/").last
// get groups that subscribed to this topic
val groups = consumerGroupsPerTopic.get(topic).get
for (groupId <- groups) {
prepareRebalance(groupId)
}
}
}
}

52
core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala

@ -0,0 +1,52 @@ @@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.HashMap
/**
* Consumer registry metadata contains the following metadata:
*
* Heartbeat metadata:
* 1. negotiated heartbeat session timeout.
* 2. recorded number of timed-out heartbeats.
* 3. associated heartbeat bucket in the purgatory.
*
* Subscription metadata:
* 1. subscribed topic list
* 2. assigned partitions for the subscribed topics.
*/
class ConsumerRegistry(val consumerId: String,
val subscribedTopics: List[String],
val sessionTimeoutMs: Int,
val groupRegistry: GroupRegistry) {
/* number of expired heartbeat recorded */
val numExpiredHeartbeat = new AtomicInteger(0)
/* flag indicating if join group request is received */
val joinGroupReceived = new AtomicBoolean(false)
/* assigned partitions per subscribed topic */
val assignedPartitions = new HashMap[String, List[Int]]
/* associated heartbeat bucket */
var currentHeartbeatBucket = null
}

44
core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala

@ -0,0 +1,44 @@ @@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator
import kafka.server.DelayedOperation
/**
* Delayed heartbeat operations that are added to the purgatory for session-timeout checking
*
* These operations will always be expired. Once it has expired, all its
* currently contained consumers are marked as heartbeat timed out.
*/
class DelayedHeartbeat(sessionTimeout: Long,
bucket: HeartbeatBucket,
expireCallback: (String, String) => Unit)
extends DelayedOperation(sessionTimeout) {
/* this function should never be called */
override def tryComplete(): Boolean = {
throw new IllegalStateException("Delayed heartbeat purgatory should never try to complete any bucket")
}
/* mark all consumers within the heartbeat as heartbeat timed out */
override def onComplete() {
for (registry <- bucket.consumerRegistryList)
expireCallback(registry.groupRegistry.groupId, registry.consumerId)
}
}

44
core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala

@ -0,0 +1,44 @@ @@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator
import kafka.server.DelayedOperation
/**
* Delayed join-group operations that are kept in the purgatory before the partition assignment completed
*
* These operation should never expire; when the rebalance has completed, all consumer's
* join-group operations will be completed by sending back the response with the
* calculated partition assignment.
*/
class DelayedJoinGroup(sessionTimeout: Long,
consumerRegistry: ConsumerRegistry,
responseCallback: () => Unit) extends DelayedOperation(sessionTimeout) {
/* always successfully complete the operation once called */
override def tryComplete(): Boolean = {
forceComplete()
}
/* always assume the partition is already assigned as this delayed operation should never time-out */
override def onComplete() {
// TODO
responseCallback
}
}

62
core/src/main/scala/kafka/coordinator/DelayedRebalance.scala

@ -0,0 +1,62 @@ @@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator
import kafka.server.DelayedOperation
import java.util.concurrent.atomic.AtomicBoolean
/**
* Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
*
* Whenever a join-group request is received, check if all known consumers have requested
* to re-join the group; if yes, complete this operation to proceed rebalance.
*
* When the operation has expired, any known consumers that have not requested to re-join
* the group are marked as failed, and complete this operation to proceed rebalance with
* the rest of the group.
*/
class DelayedRebalance(sessionTimeout: Long,
groupRegistry: GroupRegistry,
rebalanceCallback: String => Unit,
failureCallback: (String, String) => Unit)
extends DelayedOperation(sessionTimeout) {
val allConsumersJoinedGroup = new AtomicBoolean(false)
/* check if all known consumers have requested to re-join group */
override def tryComplete(): Boolean = {
allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft
(true) ((agg, cur) => agg && cur.joinGroupReceived.get()))
if (allConsumersJoinedGroup.get())
forceComplete()
else
false
}
/* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */
override def onComplete() {
groupRegistry.memberRegistries.values.foreach(consumerRegistry =>
if (!consumerRegistry.joinGroupReceived.get())
failureCallback(groupRegistry.groupId, consumerRegistry.consumerId)
)
rebalanceCallback(groupRegistry.groupId)
}
}

74
core/src/main/scala/kafka/coordinator/GroupRegistry.scala

@ -0,0 +1,74 @@ @@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator
import scala.collection.mutable
sealed trait GroupStates { def state: Byte }
/**
* Consumer group is preparing start rebalance
*
* action: respond consumer heartbeat with error code,
* transition: all known consumers has re-joined group => UnderRebalance
*/
case object PrepareRebalance extends GroupStates { val state: Byte = 1 }
/**
* Consumer group is under rebalance
*
* action: send the join-group response with new assignment
* transition: all consumers has heartbeat with the new generation id => Fetching
* new consumer join-group received => PrepareRebalance
*/
case object UnderRebalance extends GroupStates { val state: Byte = 2 }
/**
* Consumer group is fetching data
*
* action: respond consumer heartbeat normally
* transition: consumer failure detected via heartbeat => PrepareRebalance
* consumer join-group received => PrepareRebalance
* zookeeper watcher fired => PrepareRebalance
*/
case object Fetching extends GroupStates { val state: Byte = 3 }
case class GroupState() {
@volatile var currentState: Byte = PrepareRebalance.state
}
/* Group registry contains the following metadata of a registered group in the coordinator:
*
* Membership metadata:
* 1. List of consumers registered in this group
* 2. Partition assignment strategy for this group
*
* State metadata:
* 1. Current group state
* 2. Current group generation id
*/
class GroupRegistry(val groupId: String,
val partitionAssignmentStrategy: String) {
val memberRegistries = new mutable.HashMap[String, ConsumerRegistry]()
val state: GroupState = new GroupState()
var generationId: Int = 1
}

36
core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala

@ -0,0 +1,36 @@ @@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator
import scala.collection.mutable
/**
* A bucket of consumers that are scheduled for heartbeat expiration.
*
* The motivation behind this is to avoid expensive fine-grained per-consumer
* heartbeat expiration but use coarsen-grained methods that group consumers
* with similar deadline together. This will result in some consumers not
* being expired for heartbeats in time but is tolerable.
*/
class HeartbeatBucket(val startMs: Long, endMs: Long) {
/* The list of consumers that are contained in this bucket */
val consumerRegistryList = new mutable.HashSet[ConsumerRegistry]
// TODO
}
Loading…
Cancel
Save