From 71602de0bbf7727f498a812033027f6cbfe34eb8 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Mon, 9 Feb 2015 15:24:44 -0800
Subject: [PATCH] KAFKA-1333 follow-up; Add missing files for the coordinator folder

---
 .../coordinator/ConsumerCoordinator.scala    | 333 ++++++++++++++++++
 .../kafka/coordinator/ConsumerRegistry.scala |  52 +++
 .../kafka/coordinator/DelayedHeartbeat.scala |  44 +++
 .../kafka/coordinator/DelayedJoinGroup.scala |  44 +++
 .../kafka/coordinator/DelayedRebalance.scala |  62 ++++
 .../kafka/coordinator/GroupRegistry.scala    |  74 ++++
 .../kafka/coordinator/HeartbeatBucket.scala  |  36 ++
 7 files changed, 645 insertions(+)
 create mode 100644 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
 create mode 100644 core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
 create mode 100644 core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
 create mode 100644 core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
 create mode 100644 core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
 create mode 100644 core/src/main/scala/kafka/coordinator/GroupRegistry.scala
 create mode 100644 core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala

diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
new file mode 100644
index 00000000000..01cf1d91b70
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator
+
+import org.apache.kafka.common.protocol.Errors
+
+import kafka.common.TopicAndPartition
+import kafka.server._
+import kafka.utils._
+
+import scala.collection.mutable.HashMap
+
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+
+
+/**
+ * Kafka coordinator handles consumer group and consumer offset management.
+ *
+ * Each Kafka server instantiates a coordinator, which is responsible for a set of
+ * consumer groups; the consumer groups are assigned to coordinators based on their
+ * group names.
+ */
+class ConsumerCoordinator(val config: KafkaConfig,
+                          val zkClient: ZkClient) extends Logging {
+
+  this.logIdent = "[Kafka Coordinator " + config.brokerId + "]: "
+
+  /* zookeeper listener for topic-partition changes */
+  private val topicPartitionChangeListeners = new HashMap[String, TopicPartitionChangeListener]
+
+  /* the consumer group registry cache */
+  // TODO: access to this map needs to be synchronized
+  private val consumerGroupRegistries = new HashMap[String, GroupRegistry]
+
+  /* the list of subscribed groups per topic */
+  // TODO: access to this map needs to be synchronized
+  private val consumerGroupsPerTopic = new HashMap[String, List[String]]
+
+  /* the delayed operation purgatory for heartbeat-based failure detection */
+  private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
+
+  /* the delayed operation purgatory for handling join-group requests */
+  private var joinGroupPurgatory: DelayedOperationPurgatory[DelayedJoinGroup] = null
+
+  /* the delayed operation purgatory for preparing rebalance process */
+  private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null
+
+  /* latest consumer heartbeat bucket's end timestamp in milliseconds */
+  private var latestHeartbeatBucketEndMs: Long = SystemTime.milliseconds
+
+  /**
+   * Start-up logic executed when the server starts up.
+   */
+  def startup() {
+
+    // Initialize consumer group registries and heartbeat bucket metadata
+    latestHeartbeatBucketEndMs = SystemTime.milliseconds
+
+    // Initialize purgatories for delayed heartbeat, join-group and rebalance operations
+    heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId)
+    joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId)
+    rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId)
+
+  }
+
+  /**
+   * Shut-down logic executed when the server shuts down;
+   * the ordering of actions should be the reverse of the start-up process
+   *
+   */
+  def shutdown() {
+
+    // De-register all Zookeeper listeners for topic-partition changes
+    for (topic <- topicPartitionChangeListeners.keys) {
+      deregisterTopicChangeListener(topic)
+    }
+    topicPartitionChangeListeners.clear()
+
+    // Shutdown purgatories for delayed heartbeat, join-group and rebalance operations
+    heartbeatPurgatory.shutdown()
+    joinGroupPurgatory.shutdown()
+    rebalancePurgatory.shutdown()
+
+    // Clean up consumer group registries metadata
+    consumerGroupRegistries.clear()
+    consumerGroupsPerTopic.clear()
+  }
+
+  /**
+   * Process a join-group request from a consumer to join as a new group member
+   */
+  def consumerJoinGroup(groupId: String,
+                        consumerId: String,
+                        topics: List[String],
+                        sessionTimeoutMs: Int,
+                        partitionAssignmentStrategy: String,
+                        responseCallback: (List[TopicAndPartition], Int, Short) => Unit) {
+
+    // if the group does not exist yet, create one
+    if (!consumerGroupRegistries.contains(groupId))
+      createNewGroup(groupId, partitionAssignmentStrategy)
+
+    // if the consumer id is unknown or it does not exist in
+    // the group yet, register this consumer to the group
+    // TODO
+
+    // add a delayed join-group operation to the purgatory
+    // TODO
+
+    // if the current group is under rebalance process,
+    // check if the delayed rebalance operation can be finished
+    // TODO
+
+    // TODO --------------------------------------------------------------
+    // TODO: this is just a stub for new consumer testing,
+    // TODO: needs to be replaced with the logic above
+    // TODO --------------------------------------------------------------
+    // just return all the partitions of the subscribed topics
+    val partitionIdsPerTopic = ZkUtils.getPartitionsForTopics(zkClient, topics)
+    val partitions = partitionIdsPerTopic.flatMap{ case (topic, partitionIds) =>
+      partitionIds.map(partition => {
+        TopicAndPartition(topic, partition)
+      })
+    }.toList
+
+    responseCallback(partitions, 1 /* generation id */, Errors.NONE.code)
+
+    info("Handled join-group from consumer " + consumerId + " to group " + groupId)
+  }
+
+  /**
+   * Process a heartbeat request from a consumer
+   */
+  def consumerHeartbeat(groupId: String,
+                        consumerId: String,
+                        generationId: Int,
+                        responseCallback: Short => Unit) {
+
+    // check that the group already exists
+    // TODO
+
+    // check that the consumer has already registered for the group
+    // TODO
+
+    // check if the consumer generation id is correct
+    // TODO
+
+    // remove the consumer from its current heartbeat bucket, and add it back to the corresponding bucket
+    // TODO
+
+    // create the heartbeat response, if partition rebalance is triggered set the corresponding error code
+    // TODO
+
+    info("Handled heartbeat of consumer " + consumerId + " from group " + groupId)
+
+    // TODO --------------------------------------------------------------
+    // TODO: this is just a stub for new consumer testing,
+    // TODO: needs to be replaced with the logic above
+    // TODO --------------------------------------------------------------
+    // always return OK for heartbeat immediately
+    responseCallback(Errors.NONE.code)
+  }
+
+  /**
+   * Create a new consumer
+   */
+  private def createNewConsumer(consumerId: String,
+                                topics: List[String],
+                                sessionTimeoutMs: Int,
+                                groupRegistry: GroupRegistry) {
+    debug("Registering consumer " + consumerId + " for group " + groupRegistry.groupId)
+
+    // create the new consumer registry entry
+    // TODO: specify consumerId as unknown and update at the end of the prepare-rebalance phase
+
+    // check if the partition assignment strategy is consistent with the group
+    // TODO
+
+    // add the group to the subscribed topics
+    // TODO
+
+    // schedule heartbeat tasks for the consumer
+    // TODO
+
+    // add the member registry entry to the group
+    // TODO
+
+    // start preparing group partition rebalance
+    // TODO
+
+    info("Registered consumer " + consumerId + " for group " + groupRegistry.groupId)
+  }
+
+  /**
+   * Create a new consumer group in the registry
+   */
+  private def createNewGroup(groupId: String, partitionAssignmentStrategy: String) {
+    debug("Creating new group " + groupId)
+
+    val groupRegistry = new GroupRegistry(groupId, partitionAssignmentStrategy)
+
+    consumerGroupRegistries.put(groupId, groupRegistry)
+
+    info("Created new group registry " + groupId)
+  }
+
+  /**
+   * Callback invoked when a consumer's heartbeat has expired
+   */
+  private def onConsumerHeartbeatExpired(groupId: String, consumerId: String) {
+
+    // if the consumer does not exist in group registry anymore, do nothing
+    // TODO
+
+    // record heartbeat failure
+    // TODO
+
+    // if the maximum number of failures has been reached, mark the consumer as failed
+    // TODO
+  }
+
+  /**
+   * Callback invoked when a consumer is marked as failed
+   */
+  private def onConsumerFailure(groupId: String, consumerId: String) {
+
+    // remove the consumer from its group registry metadata
+    // TODO
+
+    // cut the socket connection to the consumer
+    // TODO: howto ??
+    // if the group has no consumer members any more, remove the group
+    // otherwise start preparing group partition rebalance
+    // TODO
+
+  }
+
+  /**
+   * Prepare partition rebalance for the group
+   */
+  private def prepareRebalance(groupId: String) {
+
+    // try to change the group state to PrepareRebalance
+
+    // add a task to the delayed rebalance purgatory
+
+    // TODO
+  }
+
+  /**
+   * Start partition rebalance for the group
+   */
+  private def startRebalance(groupId: String) {
+
+    // try to change the group state to UnderRebalance
+
+    // compute new assignment based on the strategy
+
+    // send back the join-group response
+
+    // TODO
+  }
+
+  /**
+   * Fail current partition rebalance for the group
+   */
+
+  /**
+   * Register ZK listeners for topic-partition changes
+   */
+  private def registerTopicChangeListener(topic: String) = {
+    if (!topicPartitionChangeListeners.contains(topic)) {
+      val listener = new TopicPartitionChangeListener(config)
+      topicPartitionChangeListeners.put(topic, listener)
+      ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getTopicPath(topic))
+      zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), listener)
+    }
+  }
+
+  /**
+   * De-register ZK listeners for topic-partition changes
+   */
+  private def deregisterTopicChangeListener(topic: String) = {
+    val listener = topicPartitionChangeListeners.get(topic).get
+    zkClient.unsubscribeChildChanges(ZkUtils.getTopicPath(topic), listener)
+    topicPartitionChangeListeners.remove(topic)
+  }
+
+  /**
+   * Zookeeper listener that catches topic-partition changes
+   */
+  class TopicPartitionChangeListener(val config: KafkaConfig) extends IZkChildListener with Logging {
+
+    this.logIdent = "[TopicChangeListener on coordinator " + config.brokerId + "]: "
+
+    /**
+     * Try to trigger a rebalance for each group subscribed to the changed topic
+     *
+     * @throws Exception
+     *           On any error.
+     */
+    def handleChildChange(parentPath: String, curChilds: java.util.List[String]) {
+      debug("Fired for path %s with children %s".format(parentPath, curChilds))
+
+      // get the topic
+      val topic = parentPath.split("/").last
+
+      // get groups that subscribed to this topic
+      val groups = consumerGroupsPerTopic.get(topic).get
+
+      for (groupId <- groups) {
+        prepareRebalance(groupId)
+      }
+    }
+  }
+}
+
+
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
new file mode 100644
index 00000000000..b65c04d0a5d
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.HashMap
+
+/**
+ * Consumer registry contains the following metadata:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout.
+ * 2. recorded number of timed-out heartbeats.
+ * 3. associated heartbeat bucket in the purgatory.
+ *
+ * Subscription metadata:
+ * 1. subscribed topic list
+ * 2. assigned partitions for the subscribed topics.
+ */
+class ConsumerRegistry(val consumerId: String,
+                       val subscribedTopics: List[String],
+                       val sessionTimeoutMs: Int,
+                       val groupRegistry: GroupRegistry) {
+
+  /* number of expired heartbeats recorded */
+  val numExpiredHeartbeat = new AtomicInteger(0)
+
+  /* flag indicating if a join-group request has been received */
+  val joinGroupReceived = new AtomicBoolean(false)
+
+  /* assigned partitions per subscribed topic */
+  val assignedPartitions = new HashMap[String, List[Int]]
+
+  /* associated heartbeat bucket */
+  var currentHeartbeatBucket: HeartbeatBucket = null
+
+}
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
new file mode 100644
index 00000000000..894d6edb407
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed heartbeat operations that are added to the purgatory for session-timeout checking
+ *
+ * These operations will always be expired. Once an operation has expired, all of its
+ * currently contained consumers are marked as heartbeat timed out.
+ */
+class DelayedHeartbeat(sessionTimeout: Long,
+                       bucket: HeartbeatBucket,
+                       expireCallback: (String, String) => Unit)
+  extends DelayedOperation(sessionTimeout) {
+
+  /* this function should never be called */
+  override def tryComplete(): Boolean = {
+
+    throw new IllegalStateException("Delayed heartbeat purgatory should never try to complete any bucket")
+  }
+
+  /* mark all consumers within the heartbeat bucket as timed out */
+  override def onComplete() {
+    for (registry <- bucket.consumerRegistryList)
+      expireCallback(registry.groupRegistry.groupId, registry.consumerId)
+  }
+}
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
new file mode 100644
index 00000000000..445bfa1bf88
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed join-group operations that are kept in the purgatory before the partition assignment is completed
+ *
+ * These operations should never expire; when the rebalance has completed, all consumers'
+ * join-group operations will be completed by sending back the response with the
+ * calculated partition assignment.
+ */
+class DelayedJoinGroup(sessionTimeout: Long,
+                       consumerRegistry: ConsumerRegistry,
+                       responseCallback: () => Unit) extends DelayedOperation(sessionTimeout) {
+
+  /* always successfully complete the operation once called */
+  override def tryComplete(): Boolean = {
+    forceComplete()
+  }
+
+  /* always assume the partitions are already assigned as this delayed operation should never time-out */
+  override def onComplete() {
+
+    // TODO
+    responseCallback()
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
new file mode 100644
index 00000000000..b3b3749a21d
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+import java.util.concurrent.atomic.AtomicBoolean
+
+
+/**
+ * Delayed rebalance operations that are added to the purgatory when a group is preparing for rebalance
+ *
+ * Whenever a join-group request is received, check if all known consumers have requested
+ * to re-join the group; if yes, complete this operation to proceed with the rebalance.
+ *
+ * When the operation has expired, any known consumers that have not requested to re-join
+ * the group are marked as failed, and this operation is completed to proceed with the
+ * rebalance for the rest of the group.
+ */
+class DelayedRebalance(sessionTimeout: Long,
+                       groupRegistry: GroupRegistry,
+                       rebalanceCallback: String => Unit,
+                       failureCallback: (String, String) => Unit)
+  extends DelayedOperation(sessionTimeout) {
+
+  val allConsumersJoinedGroup = new AtomicBoolean(false)
+
+  /* check if all known consumers have requested to re-join group */
+  override def tryComplete(): Boolean = {
+    allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft
+      (true) ((agg, cur) => agg && cur.joinGroupReceived.get()))
+
+    if (allConsumersJoinedGroup.get())
+      forceComplete()
+    else
+      false
+  }
+
+  /* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */
+  override def onComplete() {
+    groupRegistry.memberRegistries.values.foreach(consumerRegistry =>
+      if (!consumerRegistry.joinGroupReceived.get())
+        failureCallback(groupRegistry.groupId, consumerRegistry.consumerId)
+    )
+
+    rebalanceCallback(groupRegistry.groupId)
+  }
+}
diff --git a/core/src/main/scala/kafka/coordinator/GroupRegistry.scala b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
new file mode 100644
index 00000000000..7d17e102235
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import scala.collection.mutable
+
+sealed trait GroupStates { def state: Byte }
+
+/**
+ * Consumer group is preparing to start rebalance
+ *
+ * action: respond to consumer heartbeats with an error code,
+ * transition: all known consumers have re-joined the group => UnderRebalance
+ */
+case object PrepareRebalance extends GroupStates { val state: Byte = 1 }
+
+/**
+ * Consumer group is under rebalance
+ *
+ * action: send the join-group response with new assignment
+ * transition: all consumers have heartbeated with the new generation id => Fetching
+ *             new consumer join-group received => PrepareRebalance
+ */
+case object UnderRebalance extends GroupStates { val state: Byte = 2 }
+
+/**
+ * Consumer group is fetching data
+ *
+ * action: respond to consumer heartbeats normally
+ * transition: consumer failure detected via heartbeat => PrepareRebalance
+ *             consumer join-group received => PrepareRebalance
+ *             zookeeper watcher fired => PrepareRebalance
+ */
+case object Fetching extends GroupStates { val state: Byte = 3 }
+
+case class GroupState() {
+  @volatile var currentState: Byte = PrepareRebalance.state
+}
+
+/* Group registry contains the following metadata of a registered group in the coordinator:
+ *
+ * Membership metadata:
+ * 1. List of consumers registered in this group
+ * 2. Partition assignment strategy for this group
+ *
+ * State metadata:
+ * 1. Current group state
+ * 2. Current group generation id
+ */
+class GroupRegistry(val groupId: String,
+                    val partitionAssignmentStrategy: String) {
+
+  val memberRegistries = new mutable.HashMap[String, ConsumerRegistry]()
+
+  val state: GroupState = new GroupState()
+
+  var generationId: Int = 1
+}
+
diff --git a/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
new file mode 100644
index 00000000000..821e26e97ea
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import scala.collection.mutable
+
+/**
+ * A bucket of consumers that are scheduled for heartbeat expiration.
+ *
+ * The motivation behind this is to avoid expensive fine-grained per-consumer
+ * heartbeat expiration and instead use a coarse-grained method that groups
+ * consumers with similar deadlines together. This may result in some consumers
+ * not being expired exactly on time, but that is tolerable.
+ */
+class HeartbeatBucket(val startMs: Long, endMs: Long) {
+
+  /* The set of consumers that are contained in this bucket */
+  val consumerRegistryList = new mutable.HashSet[ConsumerRegistry]
+
+  // TODO
+}
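
Note (not part of the patch): HeartbeatBucket above is still a TODO stub, so the following is a small, self-contained Scala sketch of the coarse-grained expiration idea its comment describes -- consumers whose heartbeat deadlines fall into the same time window share one bucket, and a whole bucket is expired at once. All names here (BucketedHeartbeatScheduler, Consumer, Bucket, bucketWidthMs) are invented for illustration and are not the coordinator's actual API.

import scala.collection.mutable

// Hypothetical stand-ins for the patch's ConsumerRegistry and HeartbeatBucket types.
case class Consumer(id: String, sessionTimeoutMs: Long)

class Bucket(val startMs: Long, val endMs: Long) {
  val consumers = mutable.HashSet[Consumer]()
}

class BucketedHeartbeatScheduler(bucketWidthMs: Long) {

  // buckets keyed by their end timestamp
  private val buckets = mutable.HashMap[Long, Bucket]()

  // round a heartbeat deadline up to the end of the bucket that covers it
  private def bucketEnd(deadlineMs: Long): Long =
    ((deadlineMs + bucketWidthMs - 1) / bucketWidthMs) * bucketWidthMs

  // place a consumer into the coarse-grained bucket covering its session deadline
  def schedule(consumer: Consumer, nowMs: Long): Bucket = {
    val end = bucketEnd(nowMs + consumer.sessionTimeoutMs)
    val bucket = buckets.getOrElseUpdate(end, new Bucket(end - bucketWidthMs, end))
    bucket.consumers += consumer
    bucket
  }

  // expire every bucket whose end timestamp has passed, returning the timed-out consumers
  def expire(nowMs: Long): Seq[Consumer] = {
    val expiredKeys = buckets.keys.filter(_ <= nowMs).toSeq
    val expired = expiredKeys.flatMap(k => buckets(k).consumers.toSeq)
    expiredKeys.foreach(buckets.remove)
    expired
  }
}

object HeartbeatBucketSketch extends App {
  val scheduler = new BucketedHeartbeatScheduler(bucketWidthMs = 1000L)
  scheduler.schedule(Consumer("c1", sessionTimeoutMs = 30000L), nowMs = 0L)
  scheduler.schedule(Consumer("c2", sessionTimeoutMs = 30200L), nowMs = 0L)
  // both consumers land in buckets ending at or before 31000 ms, so both expire together
  println(scheduler.expire(nowMs = 31000L).map(_.id).sorted)  // List(c1, c2)
}

Rounding every deadline up to a bucket's end timestamp is what trades expiration precision (at most bucketWidthMs of slack) for having far fewer delayed operations to track in the purgatory.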
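
Also illustrative only: the group state transitions documented in GroupRegistry.scala (PrepareRebalance -> UnderRebalance -> Fetching, and back to PrepareRebalance on consumer failures, new join-group requests, or topic-partition changes) can be read as a small state machine. The event names below are invented for this sketch and are not defined anywhere in the patch.

object GroupStateMachineSketch extends App {

  sealed trait State
  case object PrepareRebalance extends State
  case object UnderRebalance extends State
  case object Fetching extends State

  sealed trait Event
  case object AllConsumersRejoined extends Event        // every known consumer sent join-group
  case object AllConsumersOnNewGeneration extends Event // every consumer heartbeated with the new generation id
  case object ConsumerFailed extends Event              // heartbeat expiration detected
  case object JoinGroupReceived extends Event           // a (new) consumer asked to join
  case object TopicPartitionChanged extends Event       // zookeeper watcher fired

  def transition(state: State, event: Event): State = (state, event) match {
    case (PrepareRebalance, AllConsumersRejoined)        => UnderRebalance
    case (UnderRebalance,   AllConsumersOnNewGeneration) => Fetching
    case (UnderRebalance,   JoinGroupReceived)           => PrepareRebalance
    case (Fetching,         ConsumerFailed)              => PrepareRebalance
    case (Fetching,         JoinGroupReceived)           => PrepareRebalance
    case (Fetching,         TopicPartitionChanged)       => PrepareRebalance
    case (s, _)                                          => s  // other events leave the state unchanged
  }

  val events = List(AllConsumersRejoined, AllConsumersOnNewGeneration, TopicPartitionChanged)
  println(events.scanLeft(PrepareRebalance: State)(transition))
  // List(PrepareRebalance, UnderRebalance, Fetching, PrepareRebalance)
}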