diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index c7edcefc105..36b0c865f46 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -621,7 +621,7 @@ class GroupCoordinator(val brokerId: Int, val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols) member.awaitingJoinCallback = callback - group.add(member.memberId, member) + group.add(member) maybePrepareRebalance(group) member } diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala index 0e49995fa5e..4ea5bdda762 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala @@ -158,7 +158,7 @@ private[coordinator] class GroupMetadata(val groupId: String, initialState: Grou def has(memberId: String) = members.contains(memberId) def get(memberId: String) = members(memberId) - def add(memberId: String, member: MemberMetadata) { + def add(member: MemberMetadata) { if (members.isEmpty) this.protocolType = Some(member.protocolType) @@ -167,8 +167,8 @@ private[coordinator] class GroupMetadata(val groupId: String, initialState: Grou assert(supportsProtocols(member.protocols)) if (leaderId == null) - leaderId = memberId - members.put(memberId, member) + leaderId = member.memberId + members.put(member.memberId, member) } def remove(memberId: String) { diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index 45ed77b9069..c66ce7496ac 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -235,7 +235,7 @@ class GroupMetadataManager(val brokerId: Int, case Some((magicValue, timestampType, timestamp)) => val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) => Record.create(magicValue, timestampType, timestamp, - GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition.topic, topicPartition.partition), + GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition), GroupMetadataManager.offsetCommitValue(offsetAndMetadata)) }.toSeq @@ -367,115 +367,128 @@ class GroupMetadataManager(val brokerId: Int, /** * Asynchronously read the partition from the offsets topic and populate the cache */ - def loadGroupsForPartition(offsetsPartition: Int, - onGroupLoaded: GroupMetadata => Unit) { + def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) { val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition) - scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets) - def loadGroupsAndOffsets() { - info("Loading offsets and group metadata from " + topicPartition) + def doLoadGroupsAndOffsets() { + info(s"Loading offsets and group metadata from $topicPartition") inLock(partitionLock) { if (loadingPartitions.contains(offsetsPartition)) { - info("Offset load from %s already in progress.".format(topicPartition)) + info(s"Offset load from $topicPartition already in progress.") return } else { loadingPartitions.add(offsetsPartition) } } - val startMs = time.milliseconds() try { - replicaManager.logManager.getLog(topicPartition) match { - case Some(log) => - var currOffset = log.logSegments.head.baseOffset - val buffer = ByteBuffer.allocate(config.loadBufferSize) - // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 - val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]() - val removedOffsets = mutable.Set[GroupTopicPartition]() - val loadedGroups = mutable.Map[String, GroupMetadata]() - val removedGroups = mutable.Set[String]() - - while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) { - buffer.clear() - val fileRecords = log.read(currOffset, config.loadBufferSize, minOneMessage = true).records.asInstanceOf[FileRecords] - fileRecords.readInto(buffer, 0) - - MemoryRecords.readableRecords(buffer).deepEntries.asScala.foreach { entry => - val record = entry.record - - require(record.hasKey, "Offset entry key should not be null") - val baseKey = GroupMetadataManager.readMessageKey(record.key) - - if (baseKey.isInstanceOf[OffsetKey]) { - // load offset - val key = baseKey.key.asInstanceOf[GroupTopicPartition] - if (record.hasNullValue) { - loadedOffsets.remove(key) - removedOffsets.add(key) - } else { - val value = GroupMetadataManager.readOffsetMessageValue(record.value) - loadedOffsets.put(key, value) - removedOffsets.remove(key) - } + loadGroupsAndOffsets(topicPartition, onGroupLoaded) + } catch { + case t: Throwable => error(s"Error loading offsets from $topicPartition", t) + } finally { + inLock(partitionLock) { + ownedPartitions.add(offsetsPartition) + loadingPartitions.remove(offsetsPartition) + } + } + } + + scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets) + } + + private[coordinator] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) { + def highWaterMark = replicaManager.getHighWatermark(topicPartition).getOrElse(-1L) + + val startMs = time.milliseconds() + replicaManager.getLog(topicPartition) match { + case None => + warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log") + + case Some(log) => + var currOffset = log.logStartOffset + val buffer = ByteBuffer.allocate(config.loadBufferSize) + // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 + val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]() + val removedOffsets = mutable.Set[GroupTopicPartition]() + val loadedGroups = mutable.Map[String, GroupMetadata]() + val removedGroups = mutable.Set[String]() + + while (currOffset < highWaterMark && !shuttingDown.get()) { + buffer.clear() + val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true) + .records.asInstanceOf[FileRecords] + val bufferRead = fileRecords.readInto(buffer, 0) + + MemoryRecords.readableRecords(bufferRead).deepEntries.asScala.foreach { entry => + val record = entry.record + require(record.hasKey, "Group metadata/offset entry key should not be null") + + GroupMetadataManager.readMessageKey(record.key) match { + case offsetKey: OffsetKey => + // load offset + val key = offsetKey.key + if (record.hasNullValue) { + loadedOffsets.remove(key) + removedOffsets.add(key) } else { - // load group metadata - val groupId = baseKey.key.asInstanceOf[String] - val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value) - if (groupMetadata != null) { - trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}") - removedGroups.remove(groupId) - loadedGroups.put(groupId, groupMetadata) - } else { - loadedGroups.remove(groupId) - removedGroups.add(groupId) - } + val value = GroupMetadataManager.readOffsetMessageValue(record.value) + loadedOffsets.put(key, value) + removedOffsets.remove(key) } - currOffset = entry.nextOffset - } - } - - val (groupOffsets, noGroupOffsets) = loadedOffsets - .groupBy(_._1.group) - .mapValues(_.map{ case (groupTopicPartition, offsetAndMetadata) => (groupTopicPartition.topicPartition, offsetAndMetadata)}) - .partition(value => loadedGroups.contains(value._1)) + case groupMetadataKey: GroupMetadataKey => + // load group metadata + val groupId = groupMetadataKey.key + val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value) + if (groupMetadata != null) { + trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}") + removedGroups.remove(groupId) + loadedGroups.put(groupId, groupMetadata) + } else { + loadedGroups.remove(groupId) + removedGroups.add(groupId) + } - loadedGroups.values.foreach { group => - val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata]) - loadGroup(group, offsets) - onGroupLoaded(group) + case unknownKey => + throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata") } - noGroupOffsets.foreach { case (groupId, offsets) => - val group = new GroupMetadata(groupId) - loadGroup(group, offsets) - onGroupLoaded(group) - } + currOffset = entry.nextOffset + } + } - removedGroups.foreach { groupId => - if (groupMetadataCache.contains(groupId)) - throw new IllegalStateException(s"Unexpected unload of active group $groupId while " + - s"loading partition $topicPartition") - } + val (groupOffsets, emptyGroupOffsets) = loadedOffsets + .groupBy(_._1.group) + .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)} ) + .partition { case (group, _) => loadedGroups.contains(group) } - if (!shuttingDown.get()) - info("Finished loading offsets from %s in %d milliseconds." - .format(topicPartition, time.milliseconds() - startMs)) - case None => - warn("No log found for " + topicPartition) + loadedGroups.values.foreach { group => + val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata]) + loadGroup(group, offsets) + onGroupLoaded(group) } - } - catch { - case t: Throwable => - error("Error in loading offsets from " + topicPartition, t) - } - finally { - inLock(partitionLock) { - ownedPartitions.add(offsetsPartition) - loadingPartitions.remove(offsetsPartition) + + // load groups which store offsets in kafka, but which have no active members and thus no group + // metadata stored in the log + emptyGroupOffsets.foreach { case (groupId, offsets) => + val group = new GroupMetadata(groupId) + loadGroup(group, offsets) + onGroupLoaded(group) } - } + + removedGroups.foreach { groupId => + // if the cache already contains a group which should be removed, raise an error. Note that it + // is possible (however unlikely) for a consumer group to be removed, and then to be used only for + // offset storage (i.e. by "simple" consumers) + if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId)) + throw new IllegalStateException(s"Unexpected unload of active group $groupId while " + + s"loading partition $topicPartition") + } + + if (!shuttingDown.get()) + info("Finished loading offsets from %s in %d milliseconds." + .format(topicPartition, time.milliseconds() - startMs)) } } @@ -568,7 +581,7 @@ class GroupMetadataManager(val brokerId: Int, partitionOpt.foreach { partition => val tombstones = removedOffsets.map { case (topicPartition, offsetAndMetadata) => trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata") - val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition) + val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition) Record.create(magicValue, timestampType, timestamp, commitKey, null) }.toBuffer trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.") @@ -606,16 +619,6 @@ class GroupMetadataManager(val brokerId: Int, info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.") } - private def getHighWatermark(partitionId: Int): Long = { - val partitionOpt = replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, partitionId)) - - val hw = partitionOpt.map { partition => - partition.leaderReplicaIfLocal.map(_.highWatermark.messageOffset).getOrElse(-1L) - }.getOrElse(-1L) - - hw - } - /* * Check if the offset metadata length is valid */ @@ -816,11 +819,12 @@ object GroupMetadataManager { * * @return key for offset commit message */ - private def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = { + private[coordinator] def offsetCommitKey(group: String, topicPartition: TopicPartition, + versionId: Short = 0): Array[Byte] = { val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA) key.set(OFFSET_KEY_GROUP_FIELD, group) - key.set(OFFSET_KEY_TOPIC_FIELD, topic) - key.set(OFFSET_KEY_PARTITION_FIELD, partition) + key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic) + key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition) val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf) byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION) @@ -833,7 +837,7 @@ object GroupMetadataManager { * * @return key bytes for group metadata message */ - def groupMetadataKey(group: String): Array[Byte] = { + private[coordinator] def groupMetadataKey(group: String): Array[Byte] = { val key = new Struct(CURRENT_GROUP_KEY_SCHEMA) key.set(GROUP_KEY_GROUP_FIELD, group) @@ -849,7 +853,7 @@ object GroupMetadataManager { * @param offsetAndMetadata consumer's current offset and metadata * @return payload for offset commit message */ - private def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = { + private[coordinator] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = { // generate commit value with schema version 1 val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA) value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset) @@ -871,9 +875,9 @@ object GroupMetadataManager { * @param version the version of the value message to use * @return payload for offset commit message */ - def groupMetadataValue(groupMetadata: GroupMetadata, - assignment: Map[String, Array[Byte]], - version: Short = 0): Array[Byte] = { + private[coordinator] def groupMetadataValue(groupMetadata: GroupMetadata, + assignment: Map[String, Array[Byte]], + version: Short = 0): Array[Byte] = { val value = if (version == 0) new Struct(GROUP_METADATA_VALUE_SCHEMA_V0) else new Struct(CURRENT_GROUP_VALUE_SCHEMA) value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse("")) @@ -1013,7 +1017,7 @@ object GroupMetadataManager { member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer]) - group.add(memberId, member) + group.add(member) } group diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8dea5ca25c0..25bb83d3f07 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -316,7 +316,7 @@ class Log(@volatile var dir: File, /** * Check if we have the "clean shutdown" file */ - private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists() + private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists() /** * The number of segments in the log. diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 8cd9b34f6e5..761edf9ac76 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -349,13 +349,7 @@ class LogManager(val logDirs: Array[File], /** * Get the log if it exists, otherwise return None */ - def getLog(topicPartition: TopicPartition): Option[Log] = { - val log = logs.get(topicPartition) - if (log == null) - None - else - Some(log) - } + def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition)) /** * Create a log for the given topic and the given partition @@ -363,28 +357,20 @@ class LogManager(val logDirs: Array[File], */ def createLog(topicPartition: TopicPartition, config: LogConfig): Log = { logCreationOrDeletionLock synchronized { - var log = logs.get(topicPartition) - - // check if the log has already been created in another thread - if(log != null) - return log - - // if not, create it - val dataDir = nextLogDir() - val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition) - dir.mkdirs() - log = new Log(dir, - config, - recoveryPoint = 0L, - scheduler, - time) - logs.put(topicPartition, log) - info("Created log for partition [%s,%d] in %s with properties {%s}." - .format(topicPartition.topic, - topicPartition.partition, - dataDir.getAbsolutePath, - config.originals.asScala.mkString(", "))) - log + // create the log if it has not already been created in another thread + getLog(topicPartition).getOrElse { + val dataDir = nextLogDir() + val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition) + dir.mkdirs() + val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time) + logs.put(topicPartition, log) + info("Created log for partition [%s,%d] in %s with properties {%s}." + .format(topicPartition.topic, + topicPartition.partition, + dataDir.getAbsolutePath, + config.originals.asScala.mkString(", "))) + log + } } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index dd8fc03a798..1aa88a2bb60 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -25,7 +25,7 @@ import kafka.api._ import kafka.cluster.{Partition, Replica} import kafka.common._ import kafka.controller.KafkaController -import kafka.log.{LogAppendInfo, LogManager} +import kafka.log.{Log, LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.server.QuotaFactory.UnboundedQuota import kafka.utils._ @@ -189,6 +189,8 @@ class ReplicaManager(val config: KafkaConfig, } } + def getLog(topicPartition: TopicPartition): Option[Log] = logManager.getLog(topicPartition) + /** * Try to complete some delayed produce requests with the request key; * this can be triggered when: @@ -926,10 +928,16 @@ class ReplicaManager(val config: KafkaConfig, } } - private def getLeaderPartitions() : List[Partition] = { + private def getLeaderPartitions(): List[Partition] = { allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList } + def getHighWatermark(topicPartition: TopicPartition): Option[Long] = { + getPartition(topicPartition).flatMap { partition => + partition.leaderReplicaIfLocal.map(_.highWatermark.messageOffset) + } + } + // Flushes the highwatermark value for all partitions to the highwatermark file def checkpointHighWatermarks() { val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId)) diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala index 9478122ffbc..30dfc6366c4 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala @@ -17,20 +17,23 @@ package kafka.coordinator +import java.nio.ByteBuffer + import kafka.api.ApiVersion import kafka.cluster.Partition import kafka.common.{OffsetAndMetadata, Topic} -import kafka.log.LogAppendInfo -import kafka.server.{KafkaConfig, ReplicaManager} +import kafka.log.{Log, LogAppendInfo} +import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager} import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType} +import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record, TimestampType} import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.easymock.{Capture, EasyMock, IAnswer} -import org.junit.{After, Before, Test} -import org.junit.Assert._ +import org.junit.{Before, Test} +import org.junit.Assert.{assertEquals, assertFalse, assertTrue} +import kafka.utils.TestUtils.fail import scala.collection._ import JavaConverters._ @@ -79,10 +82,153 @@ class GroupMetadataManagerTest { partition = EasyMock.niceMock(classOf[Partition]) } - @After - def tearDown() { - EasyMock.reset(replicaManager) - EasyMock.reset(partition) + @Test + def testLoadOffsetsWithoutGroup() { + val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val startOffset = 15L + + val committedOffsets = Map( + new TopicPartition("foo", 0) -> 23L, + new TopicPartition("foo", 1) -> 455L, + new TopicPartition("bar", 0) -> 8992L + ) + + val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) + val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords: _*) + expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records) + + EasyMock.replay(replicaManager) + + groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ()) + + val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) + assertEquals(groupId, group.groupId) + assertEquals(Empty, group.currentState) + assertEquals(committedOffsets.size, group.allOffsets.size) + committedOffsets.foreach { case (topicPartition, offset) => + assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + } + } + + @Test + def testLoadOffsetsWithTombstones() { + val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val startOffset = 15L + + val tombstonePartition = new TopicPartition("foo", 1) + val committedOffsets = Map( + new TopicPartition("foo", 0) -> 23L, + tombstonePartition -> 455L, + new TopicPartition("bar", 0) -> 8992L + ) + + val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) + val tombstone = Record.create(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null) + val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords ++ Seq(tombstone): _*) + + expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records) + + EasyMock.replay(replicaManager) + + groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ()) + + val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) + assertEquals(groupId, group.groupId) + assertEquals(Empty, group.currentState) + assertEquals(committedOffsets.size - 1, group.allOffsets.size) + committedOffsets.foreach { case (topicPartition, offset) => + if (topicPartition == tombstonePartition) + assertEquals(None, group.offset(topicPartition)) + else + assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + } + } + + @Test + def testLoadOffsetsAndGroup() { + val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val startOffset = 15L + val committedOffsets = Map( + new TopicPartition("foo", 0) -> 23L, + new TopicPartition("foo", 1) -> 455L, + new TopicPartition("bar", 0) -> 8992L + ) + + val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) + val memberId = "98098230493" + val groupMetadataRecord = buildStableGroupRecordWithMember(memberId) + val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords ++ Seq(groupMetadataRecord): _*) + + expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records) + + EasyMock.replay(replicaManager) + + groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ()) + + val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) + assertEquals(groupId, group.groupId) + assertEquals(Stable, group.currentState) + assertEquals(memberId, group.leaderId) + assertEquals(Set(memberId), group.allMembers) + assertEquals(committedOffsets.size, group.allOffsets.size) + committedOffsets.foreach { case (topicPartition, offset) => + assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + } + } + + @Test + def testLoadGroupWithTombstone() { + val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val startOffset = 15L + + val memberId = "98098230493" + val groupMetadataRecord = buildStableGroupRecordWithMember(memberId) + val groupMetadataTombstone = Record.create(GroupMetadataManager.groupMetadataKey(groupId), null) + val records = MemoryRecords.withRecords(startOffset, Seq(groupMetadataRecord, groupMetadataTombstone): _*) + + expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records) + + EasyMock.replay(replicaManager) + + groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ()) + + assertEquals(None, groupMetadataManager.getGroup(groupId)) + } + + @Test + def testOffsetWriteAfterGroupRemoved(): Unit = { + // this test case checks the following scenario: + // 1. the group exists at some point in time, but is later removed (because all members left) + // 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets + + val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val startOffset = 15L + + val committedOffsets = Map( + new TopicPartition("foo", 0) -> 23L, + new TopicPartition("foo", 1) -> 455L, + new TopicPartition("bar", 0) -> 8992L + ) + val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) + val memberId = "98098230493" + val groupMetadataRecord = buildStableGroupRecordWithMember(memberId) + val groupMetadataTombstone = Record.create(GroupMetadataManager.groupMetadataKey(groupId), null) + val records = MemoryRecords.withRecords(startOffset, + Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*) + + expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records) + + EasyMock.replay(replicaManager) + + groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ()) + + val group = groupMetadataManager.getGroup(groupId).getOrElse(TestUtils.fail("Group was not loaded into the cache")) + assertEquals(groupId, group.groupId) + assertEquals(Empty, group.currentState) + assertEquals(committedOffsets.size, group.allOffsets.size) + committedOffsets.foreach { case (topicPartition, offset) => + assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + } } @Test @@ -156,7 +302,7 @@ class GroupMetadataManagerTest { val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, protocolType, List(("protocol", Array[Byte]()))) member.awaitingJoinCallback = _ => () - group.add(memberId, member) + group.add(member) group.transitionTo(PreparingRebalance) group.initNextGeneration() @@ -185,7 +331,7 @@ class GroupMetadataManagerTest { val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, protocolType, List(("protocol", Array[Byte]()))) member.awaitingJoinCallback = _ => () - group.add(memberId, member) + group.add(member) group.transitionTo(PreparingRebalance) group.initNextGeneration() @@ -264,7 +410,7 @@ class GroupMetadataManagerTest { commitErrors = Some(errors) } - val delayedStoreOpt = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback) + groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback) assertFalse(commitErrors.isEmpty) val maybeError = commitErrors.get.get(topicPartition) @@ -557,7 +703,7 @@ class GroupMetadataManagerTest { val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, protocolType, List(("protocol", Array[Byte]()))) member.awaitingJoinCallback = _ => () - group.add(memberId, member) + group.add(member) group.transitionTo(PreparingRebalance) group.initNextGeneration() @@ -620,4 +766,47 @@ class GroupMetadataManagerTest { .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)) } + private def buildStableGroupRecordWithMember(memberId: String): Record = { + val group = new GroupMetadata(groupId) + group.transitionTo(PreparingRebalance) + val memberProtocols = List(("roundrobin", Array.emptyByteArray)) + val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, "consumer", memberProtocols) + group.add(member) + member.awaitingJoinCallback = _ => {} + group.initNextGeneration() + group.transitionTo(Stable) + + val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId) + val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte])) + Record.create(groupMetadataKey, groupMetadataValue) + } + + private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition, + startOffset: Long, + records: MemoryRecords): Unit = { + val endOffset = startOffset + records.deepEntries.asScala.size + val logMock = EasyMock.mock(classOf[Log]) + val fileRecordsMock = EasyMock.mock(classOf[FileRecords]) + + EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock)) + EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset) + EasyMock.expect(replicaManager.getHighWatermark(groupMetadataTopicPartition)).andStubReturn(Some(endOffset)) + EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true))) + .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock)) + EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt())) + .andReturn(records.buffer) + + EasyMock.replay(logMock, fileRecordsMock) + } + + private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long], + groupId: String = groupId): Seq[Record] = { + committedOffsets.map { case (topicPartition, offset) => + val offsetAndMetadata = OffsetAndMetadata(offset) + val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition) + val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata) + Record.create(offsetCommitKey, offsetCommitValue) + }.toSeq + } + } diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala index bf695bf7eab..3db78185381 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala @@ -180,14 +180,14 @@ class GroupMetadataTest extends JUnitSuite { val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) - group.add(memberId, member) + group.add(member) assertEquals("range", group.selectProtocol) val otherMemberId = "otherMemberId" val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))) - group.add(otherMemberId, otherMember) + group.add(otherMember) // now could be either range or robin since there is no majority preference assertTrue(Set("range", "roundrobin")(group.selectProtocol)) @@ -195,7 +195,7 @@ class GroupMetadataTest extends JUnitSuite { val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))) - group.add(lastMemberId, lastMember) + group.add(lastMember) // now we should prefer 'roundrobin' assertEquals("roundrobin", group.selectProtocol) } @@ -216,8 +216,8 @@ class GroupMetadataTest extends JUnitSuite { val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte]))) - group.add(memberId, member) - group.add(otherMemberId, otherMember) + group.add(member) + group.add(otherMember) assertEquals("roundrobin", group.selectProtocol) } @@ -230,7 +230,7 @@ class GroupMetadataTest extends JUnitSuite { val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) - group.add(memberId, member) + group.add(member) assertTrue(group.supportsProtocols(Set("roundrobin", "foo"))) assertTrue(group.supportsProtocols(Set("range", "foo"))) assertFalse(group.supportsProtocols(Set("foo", "bar"))) @@ -239,7 +239,7 @@ class GroupMetadataTest extends JUnitSuite { val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte]))) - group.add(otherMemberId, otherMember) + group.add(otherMember) assertTrue(group.supportsProtocols(Set("roundrobin", "foo"))) assertFalse(group.supportsProtocols(Set("range", "foo"))) @@ -253,7 +253,7 @@ class GroupMetadataTest extends JUnitSuite { group.transitionTo(PreparingRebalance) member.awaitingJoinCallback = _ => () - group.add(memberId, member) + group.add(member) assertEquals(0, group.generationId) assertNull(group.protocol) diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 7a00f2a7b69..6358bdc83a0 100755 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -21,7 +21,7 @@ import java.util.Properties import java.util.concurrent.LinkedBlockingQueue import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} -import org.junit.Assert._ +import org.junit.Assert.{assertEquals, assertTrue} import org.easymock.EasyMock import org.junit.Test import kafka.api._ @@ -36,7 +36,6 @@ import kafka.utils.TestUtils._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import kafka.utils._ -import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.utils.Time @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0") diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index f132f9e7870..743756e32cc 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -288,6 +288,11 @@ object TestUtils extends Logging { props } + /** + * Fail a test case explicitly. Return Nothing so that we are not constrained by the return type. + */ + def fail(msg: String): Nothing = throw new AssertionError(msg) + /** * Wrap a single record log buffer. */