
KAFKA-4704; Coordinator cache loading fails if groupId is reused for offset storage after group is removed

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2455 from hachikuji/KAFKA-4704
Committed by Ismael Juma · commit 57f0cb2997
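
The failure mode fixed here: a consumer group's members all leave and its metadata is tombstoned in __consumer_offsets, but the same groupId is later reused purely for offset storage, e.g. by a standalone consumer that assigns partitions manually and commits offsets without ever joining the group. When the coordinator later reloads that offsets partition, it encounters committed offsets for a group whose metadata record was removed, and the pre-patch load logic raised the IllegalStateException shown in the removedGroups check below. A minimal sketch of such a standalone commit (illustrative topic, group id and broker address, not taken from the patch):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object ReusedGroupIdCommit {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // assumed local broker
    props.put("group.id", "my-group")                   // same id as the removed group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      val partition = new TopicPartition("foo", 0)
      consumer.assign(Seq(partition).asJava)            // manual assignment: no group membership
      consumer.commitSync(Map(partition -> new OffsetAndMetadata(42L)).asJava)
    } finally consumer.close()
  }
}

The new testOffsetWriteAfterGroupRemoved case in GroupMetadataManagerTest covers exactly this sequence of a group tombstone followed by later offset commits.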
  1. core/src/main/scala/kafka/coordinator/GroupCoordinator.scala (2 changed lines)
  2. core/src/main/scala/kafka/coordinator/GroupMetadata.scala (6 changed lines)
  3. core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala (220 changed lines)
  4. core/src/main/scala/kafka/log/Log.scala (2 changed lines)
  5. core/src/main/scala/kafka/log/LogManager.scala (44 changed lines)
  6. core/src/main/scala/kafka/server/ReplicaManager.scala (12 changed lines)
  7. core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala (215 changed lines)
  8. core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala (16 changed lines)
  9. core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (3 changed lines)
  10. core/src/test/scala/unit/kafka/utils/TestUtils.scala (5 changed lines)

core/src/main/scala/kafka/coordinator/GroupCoordinator.scala (2 changed lines)

@@ -621,7 +621,7 @@ class GroupCoordinator(val brokerId: Int,
val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, protocols)
member.awaitingJoinCallback = callback
group.add(member.memberId, member)
group.add(member)
maybePrepareRebalance(group)
member
}

core/src/main/scala/kafka/coordinator/GroupMetadata.scala (6 changed lines)

@@ -158,7 +158,7 @@ private[coordinator] class GroupMetadata(val groupId: String, initialState: Grou
def has(memberId: String) = members.contains(memberId)
def get(memberId: String) = members(memberId)
def add(memberId: String, member: MemberMetadata) {
def add(member: MemberMetadata) {
if (members.isEmpty)
this.protocolType = Some(member.protocolType)
@@ -167,8 +167,8 @@ private[coordinator] class GroupMetadata(val groupId: String, initialState: Grou
assert(supportsProtocols(member.protocols))
if (leaderId == null)
leaderId = memberId
members.put(memberId, member)
leaderId = member.memberId
members.put(member.memberId, member)
}
def remove(memberId: String) {

core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala (220 changed lines)

@@ -235,7 +235,7 @@ class GroupMetadataManager(val brokerId: Int,
case Some((magicValue, timestampType, timestamp)) =>
val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
Record.create(magicValue, timestampType, timestamp,
GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition.topic, topicPartition.partition),
GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition),
GroupMetadataManager.offsetCommitValue(offsetAndMetadata))
}.toSeq
@@ -367,115 +367,128 @@ class GroupMetadataManager(val brokerId: Int,
/**
* Asynchronously read the partition from the offsets topic and populate the cache
*/
def loadGroupsForPartition(offsetsPartition: Int,
onGroupLoaded: GroupMetadata => Unit) {
def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
def loadGroupsAndOffsets() {
info("Loading offsets and group metadata from " + topicPartition)
def doLoadGroupsAndOffsets() {
info(s"Loading offsets and group metadata from $topicPartition")
inLock(partitionLock) {
if (loadingPartitions.contains(offsetsPartition)) {
info("Offset load from %s already in progress.".format(topicPartition))
info(s"Offset load from $topicPartition already in progress.")
return
} else {
loadingPartitions.add(offsetsPartition)
}
}
val startMs = time.milliseconds()
try {
replicaManager.logManager.getLog(topicPartition) match {
case Some(log) =>
var currOffset = log.logSegments.head.baseOffset
val buffer = ByteBuffer.allocate(config.loadBufferSize)
// loop breaks if leader changes at any time during the load, since getHighWatermark is -1
val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
val removedOffsets = mutable.Set[GroupTopicPartition]()
val loadedGroups = mutable.Map[String, GroupMetadata]()
val removedGroups = mutable.Set[String]()
while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
buffer.clear()
val fileRecords = log.read(currOffset, config.loadBufferSize, minOneMessage = true).records.asInstanceOf[FileRecords]
fileRecords.readInto(buffer, 0)
MemoryRecords.readableRecords(buffer).deepEntries.asScala.foreach { entry =>
val record = entry.record
require(record.hasKey, "Offset entry key should not be null")
val baseKey = GroupMetadataManager.readMessageKey(record.key)
if (baseKey.isInstanceOf[OffsetKey]) {
// load offset
val key = baseKey.key.asInstanceOf[GroupTopicPartition]
if (record.hasNullValue) {
loadedOffsets.remove(key)
removedOffsets.add(key)
} else {
val value = GroupMetadataManager.readOffsetMessageValue(record.value)
loadedOffsets.put(key, value)
removedOffsets.remove(key)
}
loadGroupsAndOffsets(topicPartition, onGroupLoaded)
} catch {
case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
} finally {
inLock(partitionLock) {
ownedPartitions.add(offsetsPartition)
loadingPartitions.remove(offsetsPartition)
}
}
}
scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
}
private[coordinator] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
def highWaterMark = replicaManager.getHighWatermark(topicPartition).getOrElse(-1L)
val startMs = time.milliseconds()
replicaManager.getLog(topicPartition) match {
case None =>
warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
case Some(log) =>
var currOffset = log.logStartOffset
val buffer = ByteBuffer.allocate(config.loadBufferSize)
// loop breaks if leader changes at any time during the load, since getHighWatermark is -1
val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
val removedOffsets = mutable.Set[GroupTopicPartition]()
val loadedGroups = mutable.Map[String, GroupMetadata]()
val removedGroups = mutable.Set[String]()
while (currOffset < highWaterMark && !shuttingDown.get()) {
buffer.clear()
val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true)
.records.asInstanceOf[FileRecords]
val bufferRead = fileRecords.readInto(buffer, 0)
MemoryRecords.readableRecords(bufferRead).deepEntries.asScala.foreach { entry =>
val record = entry.record
require(record.hasKey, "Group metadata/offset entry key should not be null")
GroupMetadataManager.readMessageKey(record.key) match {
case offsetKey: OffsetKey =>
// load offset
val key = offsetKey.key
if (record.hasNullValue) {
loadedOffsets.remove(key)
removedOffsets.add(key)
} else {
// load group metadata
val groupId = baseKey.key.asInstanceOf[String]
val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
if (groupMetadata != null) {
trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
removedGroups.remove(groupId)
loadedGroups.put(groupId, groupMetadata)
} else {
loadedGroups.remove(groupId)
removedGroups.add(groupId)
}
val value = GroupMetadataManager.readOffsetMessageValue(record.value)
loadedOffsets.put(key, value)
removedOffsets.remove(key)
}
currOffset = entry.nextOffset
}
}
val (groupOffsets, noGroupOffsets) = loadedOffsets
.groupBy(_._1.group)
.mapValues(_.map{ case (groupTopicPartition, offsetAndMetadata) => (groupTopicPartition.topicPartition, offsetAndMetadata)})
.partition(value => loadedGroups.contains(value._1))
case groupMetadataKey: GroupMetadataKey =>
// load group metadata
val groupId = groupMetadataKey.key
val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
if (groupMetadata != null) {
trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}")
removedGroups.remove(groupId)
loadedGroups.put(groupId, groupMetadata)
} else {
loadedGroups.remove(groupId)
removedGroups.add(groupId)
}
loadedGroups.values.foreach { group =>
val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
loadGroup(group, offsets)
onGroupLoaded(group)
case unknownKey =>
throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
}
noGroupOffsets.foreach { case (groupId, offsets) =>
val group = new GroupMetadata(groupId)
loadGroup(group, offsets)
onGroupLoaded(group)
}
currOffset = entry.nextOffset
}
}
removedGroups.foreach { groupId =>
if (groupMetadataCache.contains(groupId))
throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
s"loading partition $topicPartition")
}
val (groupOffsets, emptyGroupOffsets) = loadedOffsets
.groupBy(_._1.group)
.mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)} )
.partition { case (group, _) => loadedGroups.contains(group) }
if (!shuttingDown.get())
info("Finished loading offsets from %s in %d milliseconds."
.format(topicPartition, time.milliseconds() - startMs))
case None =>
warn("No log found for " + topicPartition)
loadedGroups.values.foreach { group =>
val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
loadGroup(group, offsets)
onGroupLoaded(group)
}
}
catch {
case t: Throwable =>
error("Error in loading offsets from " + topicPartition, t)
}
finally {
inLock(partitionLock) {
ownedPartitions.add(offsetsPartition)
loadingPartitions.remove(offsetsPartition)
// load groups which store offsets in kafka, but which have no active members and thus no group
// metadata stored in the log
emptyGroupOffsets.foreach { case (groupId, offsets) =>
val group = new GroupMetadata(groupId)
loadGroup(group, offsets)
onGroupLoaded(group)
}
}
removedGroups.foreach { groupId =>
// if the cache already contains a group which should be removed, raise an error. Note that it
// is possible (however unlikely) for a consumer group to be removed, and then to be used only for
// offset storage (i.e. by "simple" consumers)
if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
s"loading partition $topicPartition")
}
if (!shuttingDown.get())
info("Finished loading offsets from %s in %d milliseconds."
.format(topicPartition, time.milliseconds() - startMs))
}
}
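
The heart of the fix is visible above: loaded offsets are split into offsets for groups that also have metadata stored in the log (groupOffsets) and offsets for groups that only ever stored offsets (emptyGroupOffsets), and the removedGroups sanity check now tolerates a cached group that appears in emptyGroupOffsets. A small self-contained sketch of that partitioning step, using stand-in case classes rather than the coordinator's real types:

// Stand-ins for GroupTopicPartition / OffsetAndMetadata, for illustration only.
case class GroupTopicPartition(group: String, partition: String)
case class OffsetAndMetadata(offset: Long)

def splitLoadedOffsets(loadedOffsets: Map[GroupTopicPartition, OffsetAndMetadata],
                       loadedGroups: Set[String])
  : (Map[String, Map[String, OffsetAndMetadata]], Map[String, Map[String, OffsetAndMetadata]]) = {
  val byGroup = loadedOffsets
    .groupBy { case (gtp, _) => gtp.group }
    .map { case (group, offsets) =>
      group -> offsets.map { case (gtp, offset) => gtp.partition -> offset }
    }
  // left: groups with metadata in the log; right: "empty" groups that only store offsets
  byGroup.partition { case (group, _) => loadedGroups.contains(group) }
}

Groups on the left are re-created from their stored metadata; groups on the right are materialized as fresh GroupMetadata instances holding only offsets, which is why a tombstoned group that later shows up again in emptyGroupOffsets is no longer treated as an error during cache loading.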
@@ -568,7 +581,7 @@ class GroupMetadataManager(val brokerId: Int,
partitionOpt.foreach { partition =>
val tombstones = removedOffsets.map { case (topicPartition, offsetAndMetadata) =>
trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
Record.create(magicValue, timestampType, timestamp, commitKey, null)
}.toBuffer
trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.")
@@ -606,16 +619,6 @@ class GroupMetadataManager(val brokerId: Int,
info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
}
private def getHighWatermark(partitionId: Int): Long = {
val partitionOpt = replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, partitionId))
val hw = partitionOpt.map { partition =>
partition.leaderReplicaIfLocal.map(_.highWatermark.messageOffset).getOrElse(-1L)
}.getOrElse(-1L)
hw
}
/*
* Check if the offset metadata length is valid
*/
@@ -816,11 +819,12 @@ object GroupMetadataManager {
*
* @return key for offset commit message
*/
private def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = {
private[coordinator] def offsetCommitKey(group: String, topicPartition: TopicPartition,
versionId: Short = 0): Array[Byte] = {
val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
key.set(OFFSET_KEY_GROUP_FIELD, group)
key.set(OFFSET_KEY_TOPIC_FIELD, topic)
key.set(OFFSET_KEY_PARTITION_FIELD, partition)
key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)
val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
@@ -833,7 +837,7 @@ object GroupMetadataManager {
*
* @return key bytes for group metadata message
*/
def groupMetadataKey(group: String): Array[Byte] = {
private[coordinator] def groupMetadataKey(group: String): Array[Byte] = {
val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
key.set(GROUP_KEY_GROUP_FIELD, group)
@@ -849,7 +853,7 @@ object GroupMetadataManager {
* @param offsetAndMetadata consumer's current offset and metadata
* @return payload for offset commit message
*/
private def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
private[coordinator] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
// generate commit value with schema version 1
val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
@@ -871,9 +875,9 @@ object GroupMetadataManager {
* @param version the version of the value message to use
* @return payload for offset commit message
*/
def groupMetadataValue(groupMetadata: GroupMetadata,
assignment: Map[String, Array[Byte]],
version: Short = 0): Array[Byte] = {
private[coordinator] def groupMetadataValue(groupMetadata: GroupMetadata,
assignment: Map[String, Array[Byte]],
version: Short = 0): Array[Byte] = {
val value = if (version == 0) new Struct(GROUP_METADATA_VALUE_SCHEMA_V0) else new Struct(CURRENT_GROUP_VALUE_SCHEMA)
value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
@@ -1013,7 +1017,7 @@ object GroupMetadataManager {
member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
group.add(memberId, member)
group.add(member)
}
group

core/src/main/scala/kafka/log/Log.scala (2 changed lines)

@@ -316,7 +316,7 @@ class Log(@volatile var dir: File,
/**
* Check if we have the "clean shutdown" file
*/
private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists()
private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists()
/**
* The number of segments in the log.

core/src/main/scala/kafka/log/LogManager.scala (44 changed lines)

@@ -349,13 +349,7 @@ class LogManager(val logDirs: Array[File],
/**
* Get the log if it exists, otherwise return None
*/
def getLog(topicPartition: TopicPartition): Option[Log] = {
val log = logs.get(topicPartition)
if (log == null)
None
else
Some(log)
}
def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition))
/**
* Create a log for the given topic and the given partition
@@ -363,28 +357,20 @@ class LogManager(val logDirs: Array[File],
*/
def createLog(topicPartition: TopicPartition, config: LogConfig): Log = {
logCreationOrDeletionLock synchronized {
var log = logs.get(topicPartition)
// check if the log has already been created in another thread
if(log != null)
return log
// if not, create it
val dataDir = nextLogDir()
val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
dir.mkdirs()
log = new Log(dir,
config,
recoveryPoint = 0L,
scheduler,
time)
logs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicPartition.topic,
topicPartition.partition,
dataDir.getAbsolutePath,
config.originals.asScala.mkString(", ")))
log
// create the log if it has not already been created in another thread
getLog(topicPartition).getOrElse {
val dataDir = nextLogDir()
val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
dir.mkdirs()
val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
logs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicPartition.topic,
topicPartition.partition,
dataDir.getAbsolutePath,
config.originals.asScala.mkString(", ")))
log
}
}
}

core/src/main/scala/kafka/server/ReplicaManager.scala (12 changed lines)

@@ -25,7 +25,7 @@ import kafka.api._
import kafka.cluster.{Partition, Replica}
import kafka.common._
import kafka.controller.KafkaController
import kafka.log.{LogAppendInfo, LogManager}
import kafka.log.{Log, LogAppendInfo, LogManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.utils._
@@ -189,6 +189,8 @@ class ReplicaManager(val config: KafkaConfig,
}
}
def getLog(topicPartition: TopicPartition): Option[Log] = logManager.getLog(topicPartition)
/**
* Try to complete some delayed produce requests with the request key;
* this can be triggered when:
@@ -926,10 +928,16 @@ class ReplicaManager(val config: KafkaConfig,
}
}
private def getLeaderPartitions() : List[Partition] = {
private def getLeaderPartitions(): List[Partition] = {
allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList
}
def getHighWatermark(topicPartition: TopicPartition): Option[Long] = {
getPartition(topicPartition).flatMap { partition =>
partition.leaderReplicaIfLocal.map(_.highWatermark.messageOffset)
}
}
// Flushes the highwatermark value for all partitions to the highwatermark file
def checkpointHighWatermarks() {
val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId))

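The new ReplicaManager.getHighWatermark accessor supports the loader's termination condition: the read loop's bound is replicaManager.getHighWatermark(topicPartition).getOrElse(-1L), so if leadership moves away mid-load the lookup returns None, the bound collapses to -1, and currOffset < highWaterMark stops the loop. A tiny sketch of that behaviour with stubbed lookups (hypothetical values, not broker code):

// Stubbed high-watermark lookups standing in for replicaManager.getHighWatermark.
def bound(lookup: Option[Long]): Long = lookup.getOrElse(-1L)

val currOffset = 15L
assert(currOffset < bound(Some(100L)))    // still leader: keep reading up to the high watermark
assert(!(currOffset < bound(None)))       // leadership lost: bound is -1, the load loop exits
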
core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala (215 changed lines)

@@ -17,20 +17,23 @@
package kafka.coordinator
import java.nio.ByteBuffer
import kafka.api.ApiVersion
import kafka.cluster.Partition
import kafka.common.{OffsetAndMetadata, Topic}
import kafka.log.LogAppendInfo
import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.log.{Log, LogAppendInfo}
import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType}
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record, TimestampType}
import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.{After, Before, Test}
import org.junit.Assert._
import org.junit.{Before, Test}
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import kafka.utils.TestUtils.fail
import scala.collection._
import JavaConverters._
@@ -79,10 +82,153 @@ class GroupMetadataManagerTest {
partition = EasyMock.niceMock(classOf[Partition])
}
@After
def tearDown() {
EasyMock.reset(replicaManager)
EasyMock.reset(partition)
@Test
def testLoadOffsetsWithoutGroup() {
val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
val startOffset = 15L
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
new TopicPartition("foo", 1) -> 455L,
new TopicPartition("bar", 0) -> 8992L
)
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
EasyMock.replay(replicaManager)
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
assertEquals(Empty, group.currentState)
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
}
}
@Test
def testLoadOffsetsWithTombstones() {
val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
val startOffset = 15L
val tombstonePartition = new TopicPartition("foo", 1)
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
tombstonePartition -> 455L,
new TopicPartition("bar", 0) -> 8992L
)
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val tombstone = Record.create(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null)
val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords ++ Seq(tombstone): _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
EasyMock.replay(replicaManager)
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
assertEquals(Empty, group.currentState)
assertEquals(committedOffsets.size - 1, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
if (topicPartition == tombstonePartition)
assertEquals(None, group.offset(topicPartition))
else
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
}
}
@Test
def testLoadOffsetsAndGroup() {
val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
val startOffset = 15L
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
new TopicPartition("foo", 1) -> 455L,
new TopicPartition("bar", 0) -> 8992L
)
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
EasyMock.replay(replicaManager)
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
assertEquals(Stable, group.currentState)
assertEquals(memberId, group.leaderId)
assertEquals(Set(memberId), group.allMembers)
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
}
}
@Test
def testLoadGroupWithTombstone() {
val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
val startOffset = 15L
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
val groupMetadataTombstone = Record.create(GroupMetadataManager.groupMetadataKey(groupId), null)
val records = MemoryRecords.withRecords(startOffset, Seq(groupMetadataRecord, groupMetadataTombstone): _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
EasyMock.replay(replicaManager)
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
assertEquals(None, groupMetadataManager.getGroup(groupId))
}
@Test
def testOffsetWriteAfterGroupRemoved(): Unit = {
// this test case checks the following scenario:
// 1. the group exists at some point in time, but is later removed (because all members left)
// 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets
val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
val startOffset = 15L
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
new TopicPartition("foo", 1) -> 455L,
new TopicPartition("bar", 0) -> 8992L
)
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
val groupMetadataTombstone = Record.create(GroupMetadataManager.groupMetadataKey(groupId), null)
val records = MemoryRecords.withRecords(startOffset,
Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
EasyMock.replay(replicaManager)
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
val group = groupMetadataManager.getGroup(groupId).getOrElse(TestUtils.fail("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
assertEquals(Empty, group.currentState)
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
}
}
@Test
@@ -156,7 +302,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
member.awaitingJoinCallback = _ => ()
group.add(memberId, member)
group.add(member)
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
@@ -185,7 +331,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
member.awaitingJoinCallback = _ => ()
group.add(memberId, member)
group.add(member)
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
@@ -264,7 +410,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
val delayedStoreOpt = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
assertFalse(commitErrors.isEmpty)
val maybeError = commitErrors.get.get(topicPartition)
@@ -557,7 +703,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
member.awaitingJoinCallback = _ => ()
group.add(memberId, member)
group.add(member)
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
@@ -620,4 +766,47 @@ class GroupMetadataManagerTest {
.andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME))
}
private def buildStableGroupRecordWithMember(memberId: String): Record = {
val group = new GroupMetadata(groupId)
group.transitionTo(PreparingRebalance)
val memberProtocols = List(("roundrobin", Array.emptyByteArray))
val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, "consumer", memberProtocols)
group.add(member)
member.awaitingJoinCallback = _ => {}
group.initNextGeneration()
group.transitionTo(Stable)
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]))
Record.create(groupMetadataKey, groupMetadataValue)
}
private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
startOffset: Long,
records: MemoryRecords): Unit = {
val endOffset = startOffset + records.deepEntries.asScala.size
val logMock = EasyMock.mock(classOf[Log])
val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
EasyMock.expect(replicaManager.getHighWatermark(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true)))
.andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
.andReturn(records.buffer)
EasyMock.replay(logMock, fileRecordsMock)
}
private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
groupId: String = groupId): Seq[Record] = {
committedOffsets.map { case (topicPartition, offset) =>
val offsetAndMetadata = OffsetAndMetadata(offset)
val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
Record.create(offsetCommitKey, offsetCommitValue)
}.toSeq
}
}

core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala (16 changed lines)

@@ -180,14 +180,14 @@ class GroupMetadataTest extends JUnitSuite {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
group.add(memberId, member)
group.add(member)
assertEquals("range", group.selectProtocol)
val otherMemberId = "otherMemberId"
val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
group.add(otherMemberId, otherMember)
group.add(otherMember)
// now could be either range or robin since there is no majority preference
assertTrue(Set("range", "roundrobin")(group.selectProtocol))
@@ -195,7 +195,7 @@ class GroupMetadataTest extends JUnitSuite {
val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
group.add(lastMemberId, lastMember)
group.add(lastMember)
// now we should prefer 'roundrobin'
assertEquals("roundrobin", group.selectProtocol)
}
@@ -216,8 +216,8 @@ class GroupMetadataTest extends JUnitSuite {
val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
group.add(memberId, member)
group.add(otherMemberId, otherMember)
group.add(member)
group.add(otherMember)
assertEquals("roundrobin", group.selectProtocol)
}
@@ -230,7 +230,7 @@ class GroupMetadataTest extends JUnitSuite {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
group.add(memberId, member)
group.add(member)
assertTrue(group.supportsProtocols(Set("roundrobin", "foo")))
assertTrue(group.supportsProtocols(Set("range", "foo")))
assertFalse(group.supportsProtocols(Set("foo", "bar")))
@@ -239,7 +239,7 @@ class GroupMetadataTest extends JUnitSuite {
val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
group.add(otherMemberId, otherMember)
group.add(otherMember)
assertTrue(group.supportsProtocols(Set("roundrobin", "foo")))
assertFalse(group.supportsProtocols(Set("range", "foo")))
@@ -253,7 +253,7 @@ class GroupMetadataTest extends JUnitSuite {
group.transitionTo(PreparingRebalance)
member.awaitingJoinCallback = _ => ()
group.add(memberId, member)
group.add(member)
assertEquals(0, group.generationId)
assertNull(group.protocol)

core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (3 changed lines)

@@ -21,7 +21,7 @@ import java.util.Properties
import java.util.concurrent.LinkedBlockingQueue
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.junit.Assert._
import org.junit.Assert.{assertEquals, assertTrue}
import org.easymock.EasyMock
import org.junit.Test
import kafka.api._
@@ -36,7 +36,6 @@ import kafka.utils.TestUtils._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import kafka.utils._
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.Time
@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")

core/src/test/scala/unit/kafka/utils/TestUtils.scala (5 changed lines)

@@ -288,6 +288,11 @@ object TestUtils extends Logging {
props
}
/**
* Fail a test case explicitly. Return Nothing so that we are not constrained by the return type.
*/
def fail(msg: String): Nothing = throw new AssertionError(msg)
/**
* Wrap a single record log buffer.
*/

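The TestUtils.fail helper added above returns Nothing rather than Unit, which is what lets the new tests write getOrElse(fail(...)) without widening the expression's type: Nothing is a subtype of every type, so the whole expression keeps the element type. A minimal illustration (hypothetical values, mirroring the pattern used in GroupMetadataManagerTest):

def fail(msg: String): Nothing = throw new AssertionError(msg)

val maybeGroup: Option[String] = Some("my-group")
// Because Nothing <: String, this expression type-checks as String.
val group: String = maybeGroup.getOrElse(fail("Group was not loaded into the cache"))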