diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 22d9dffccd1..251c57021c3 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -165,7 +165,7 @@ class Partition(val topicPartition: TopicPartition, private val stateStore: PartitionStateStore, private val delayedOperations: DelayedOperations, private val metadataCache: MetadataCache, - private val logManager: LogManager) extends HostedPartition with Logging with KafkaMetricsGroup { + private val logManager: LogManager) extends Logging with KafkaMetricsGroup { def topic: String = topicPartition.topic def partitionId: Int = topicPartition.partition diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala index dac9f7927ec..236c8d103fd 100644 --- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala +++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala @@ -20,10 +20,9 @@ package kafka.server import java.util.concurrent.TimeUnit -import kafka.cluster.Partition import kafka.metrics.KafkaMetricsGroup -import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.DeleteRecordsResponse import scala.collection._ @@ -74,7 +73,7 @@ class DelayedDeleteRecords(delayMs: Long, // skip those partitions that have already been satisfied if (status.acksPending) { val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match { - case partition: Partition => + case HostedPartition.Online(partition) => partition.leaderReplicaIfLocal match { case Some(_) => val leaderLW = partition.lowWatermarkIfLeader diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 1570d4baf28..f1d1407ff8b 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -22,11 +22,10 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Lock import com.yammer.metrics.core.Meter -import kafka.cluster.Partition import kafka.metrics.KafkaMetricsGroup import kafka.utils.Pool -import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import scala.collection._ @@ -88,7 +87,7 @@ class DelayedProduce(delayMs: Long, // skip those partitions that have already been satisfied if (status.acksPending) { val (hasEnough, error) = replicaManager.getPartition(topicPartition) match { - case partition: Partition => + case HostedPartition.Online(partition) => partition.checkEnoughReplicasReachOffset(status.requiredOffset) case HostedPartition.Offline => diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 64b6beed4ed..7ded9854dbb 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -114,21 +114,24 @@ case class FetchPartitionData(error: Errors = Errors.NONE, * instance when the broker receives a LeaderAndIsr request from the controller indicating * that it should be either a leader or follower of a partition. */ -trait HostedPartition - +sealed trait HostedPartition object HostedPartition { /** * This broker does not have any state for this partition locally. */ - object None extends HostedPartition + final object None extends HostedPartition + + /** + * This broker hosts the partition and it is online. + */ + final case class Online(partition: Partition) extends HostedPartition /** * This broker hosts the partition, but it is in an offline log directory. */ - object Offline extends HostedPartition + final object Offline extends HostedPartition } - object ReplicaManager { val HighWatermarkFilename = "replication-offset-checkpoint" val IsrChangePropagationBlackOut = 5000L @@ -183,8 +186,9 @@ class ReplicaManager(val config: KafkaConfig, /* epoch of the controller that last changed the leader */ @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch private val localBrokerId = config.brokerId - private val allPartitions = new Pool[TopicPartition, HostedPartition](valueFactory = Some(tp => - Partition(tp, time, this))) + private val allPartitions = new Pool[TopicPartition, HostedPartition]( + valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, this))) + ) private val replicaStateChangeLock = new Object val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower) val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats) @@ -320,8 +324,8 @@ class ReplicaManager(val config: KafkaConfig, private def maybeRemoveTopicMetrics(topic: String): Unit = { val topicHasOnlinePartition = allPartitions.values.exists { - case partition: Partition => topic == partition.topic - case _ => false + case HostedPartition.Online(partition) => topic == partition.topic + case HostedPartition.None | HostedPartition.Offline => false } if (!topicHasOnlinePartition) brokerTopicStats.removeMetrics(topic) @@ -335,8 +339,8 @@ class ReplicaManager(val config: KafkaConfig, case HostedPartition.Offline => throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk") - case removedPartition: Partition => - if (allPartitions.remove(topicPartition, removedPartition)) { + case hostedPartition @ HostedPartition.Online(removedPartition) => + if (allPartitions.remove(topicPartition, hostedPartition)) { maybeRemoveTopicMetrics(topicPartition.topic) // this will delete the local log. This call may throw exception if the log is on offline directory removedPartition.delete() @@ -393,14 +397,14 @@ class ReplicaManager(val config: KafkaConfig, // Visible for testing def createPartition(topicPartition: TopicPartition): Partition = { val partition = Partition(topicPartition, time, this) - allPartitions.put(topicPartition, partition) + allPartitions.put(topicPartition, HostedPartition.Online(partition)) partition } def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] = { getPartition(topicPartition) match { - case partition: Partition => Some(partition) - case _ => None + case HostedPartition.Online(partition) => Some(partition) + case HostedPartition.None | HostedPartition.Offline => None } } @@ -408,8 +412,8 @@ class ReplicaManager(val config: KafkaConfig, // the iterator has been constructed could still be returned by this iterator. private def nonOfflinePartitionsIterator: Iterator[Partition] = { allPartitions.values.iterator.flatMap { - case p: Partition => Some(p) - case _ => None + case HostedPartition.Online(partition) => Some(partition) + case HostedPartition.None | HostedPartition.Offline => None } } @@ -419,7 +423,7 @@ class ReplicaManager(val config: KafkaConfig, def getPartitionOrException(topicPartition: TopicPartition, expectLeader: Boolean): Partition = { getPartition(topicPartition) match { - case partition: Partition => + case HostedPartition.Online(partition) => partition case HostedPartition.Offline => @@ -577,7 +581,7 @@ class ReplicaManager(val config: KafkaConfig, throw new KafkaStorageException(s"Log directory $destinationDir is offline") getPartition(topicPartition) match { - case partition: Partition => + case HostedPartition.Online(partition) => // Stop current replica movement if the destinationDir is different from the existing destination log directory if (partition.futureReplicaDirChanged(destinationDir)) { replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition)) @@ -586,7 +590,7 @@ class ReplicaManager(val config: KafkaConfig, case HostedPartition.Offline => throw new KafkaStorageException(s"Partition $topicPartition is offline") - case _ => // Do nothing + case HostedPartition.None => // Do nothing } // If the log for this partition has not been created yet: @@ -776,8 +780,8 @@ class ReplicaManager(val config: KafkaConfig, (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) case t: Throwable => val logStartOffset = getPartition(topicPartition) match { - case partition: Partition => partition.logStartOffset - case _ => -1L + case HostedPartition.Online(partition) => partition.logStartOffset + case HostedPartition.None | HostedPartition.Offline => -1L } brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark() brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark() @@ -1064,11 +1068,11 @@ class ReplicaManager(val config: KafkaConfig, responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR) None - case partition: Partition => Some(partition) + case HostedPartition.Online(partition) => Some(partition) case HostedPartition.None => val partition = Partition(topicPartition, time, this) - allPartitions.putIfNotExists(topicPartition, partition) + allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition)) newPartitions.add(partition) Some(partition) } @@ -1534,7 +1538,7 @@ class ReplicaManager(val config: KafkaConfig, def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = { requestedEpochInfo.map { case (tp, partitionData) => val epochEndOffset = getPartition(tp) match { - case partition: Partition => + case HostedPartition.Online(partition) => partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, partitionData.leaderEpoch, fetchOnlyFromLeader = true) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index c12e19d1f5c..d5d33fb7aac 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -2017,7 +2017,7 @@ class GroupCoordinatorTest { EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) - EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition)) EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) EasyMock.replay(replicaManager, partition) @@ -2558,7 +2558,7 @@ class GroupCoordinatorTest { EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) - EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition)) EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) EasyMock.replay(replicaManager, partition) @@ -2599,7 +2599,7 @@ class GroupCoordinatorTest { EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) - EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition)) EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) EasyMock.replay(replicaManager, partition) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 0487178c474..f9b571e68c2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -17,15 +17,24 @@ package kafka.coordinator.group +import com.yammer.metrics.Metrics +import com.yammer.metrics.core.Gauge +import java.nio.ByteBuffer +import java.util.Collections +import java.util.Optional +import java.util.concurrent.locks.ReentrantLock import kafka.api._ import kafka.cluster.Partition import kafka.common.OffsetAndMetadata import kafka.log.{Log, LogAppendInfo} import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager} +import kafka.server.HostedPartition import kafka.utils.{KafkaScheduler, MockTime, TestUtils} +import kafka.zk.KafkaZkClient import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.OffsetFetchResponse @@ -34,19 +43,8 @@ import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue} import org.junit.{Before, Test} import org.scalatest.Assertions.fail -import java.nio.ByteBuffer -import java.util.Collections -import java.util.Optional - -import com.yammer.metrics.Metrics -import com.yammer.metrics.core.Gauge -import org.apache.kafka.common.internals.Topic - import scala.collection.JavaConverters._ import scala.collection._ -import java.util.concurrent.locks.ReentrantLock - -import kafka.zk.KafkaZkClient class GroupMetadataManagerTest { @@ -2025,7 +2023,7 @@ class GroupMetadataManagerTest { } private def mockGetPartition(): Unit = { - EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(HostedPartition.Online(partition)) EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) }