diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index fee3c211141..52b9674af8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -35,25 +35,6 @@ import java.util.Set;
 
 public class LeaderAndIsrRequest extends AbstractRequest {
 
-    public static class PartitionState {
-        public final int controllerEpoch;
-        public final int leader;
-        public final int leaderEpoch;
-        public final List<Integer> isr;
-        public final int zkVersion;
-        public final Set<Integer> replicas;
-
-        public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
-            this.controllerEpoch = controllerEpoch;
-            this.leader = leader;
-            this.leaderEpoch = leaderEpoch;
-            this.isr = isr;
-            this.zkVersion = zkVersion;
-            this.replicas = replicas;
-        }
-
-    }
-
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id);
 
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
new file mode 100644
index 00000000000..e7666321628
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class PartitionState {
+    public final int controllerEpoch;
+    public final int leader;
+    public final int leaderEpoch;
+    public final List<Integer> isr;
+    public final int zkVersion;
+    public final Set<Integer> replicas;
+
+    public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
+        this.controllerEpoch = controllerEpoch;
+        this.leader = leader;
+        this.leaderEpoch = leaderEpoch;
+        this.isr = isr;
+        this.zkVersion = zkVersion;
+        this.replicas = replicas;
+    }
+
+    @Override
+    public String toString() {
+        return "PartitionState(controllerEpoch=" + controllerEpoch +
+                ", leader=" + leader +
+                ", leaderEpoch=" + leaderEpoch +
+                ", isr=" + Arrays.toString(isr.toArray()) +
+                ", zkVersion=" + zkVersion +
+                ", replicas=" + Arrays.toString(replicas.toArray()) + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 27f89fa5796..1c217896383 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -33,24 +33,6 @@ import java.util.Set;
 
 public class UpdateMetadataRequest extends AbstractRequest {
 
-    public static final class PartitionState {
-        public final int controllerEpoch;
-        public final int leader;
-        public final int leaderEpoch;
-        public final List<Integer> isr;
-        public final int zkVersion;
-        public final Set<Integer> replicas;
-
-        public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
-            this.controllerEpoch = controllerEpoch;
-            this.leader = leader;
-            this.leaderEpoch = leaderEpoch;
-            this.isr = isr;
-            this.zkVersion = zkVersion;
-            this.replicas = replicas;
-        }
-    }
-
     public static final class Broker {
         public final int id;
         public final Map<SecurityProtocol, EndPoint> endPoints;
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2f53a3ce36a..afeece7d5cb 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -376,15 +376,15 @@ public class RequestResponseTest {
     }
 
     private AbstractRequest createLeaderAndIsrRequest() {
-        Map<TopicPartition, LeaderAndIsrRequest.PartitionState> partitionStates = new HashMap<>();
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);
         List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new LeaderAndIsrRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new LeaderAndIsrRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
         Set<Node> leaders = new HashSet<>(Arrays.asList(
                 new Node(0, "test0", 1223),
@@ -402,15 +402,15 @@
     @SuppressWarnings("deprecation")
     private AbstractRequest createUpdateMetadataRequest(int version, String rack) {
-        Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);
         List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new UpdateMetadataRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new UpdateMetadataRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
         if (version == 0) {
             Set<Broker> liveBrokers = new HashSet<>(Arrays.asList(
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a561a97037f..28d3c8d35c1 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
 
 import kafka.common._
 import kafka.utils._
-import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
 import kafka.log.LogConfig
@@ -26,17 +26,15 @@
 import kafka.server._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
-
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
+
 import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException}
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
-
 import scala.collection.JavaConverters._
-
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.requests.PartitionState
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@@ -166,7 +164,7 @@ class Partition(val topic: String,
    * from the time when this broker was the leader last time) and setting the new leader and ISR.
    * If the leader replica id does not change, return false to indicate the replica manager.
    */
-  def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+  def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
@@ -207,7 +205,7 @@ class Partition(val topic: String,
    * Make the local replica the follower by setting the new leader and ISR to empty
    * If the leader replica id does not change, return false to indicate the replica manager
    */
-  def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
       val newLeaderBrokerId: Int = partitionStateInfo.leader
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 32478ca7a35..30070040a39 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUp
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.requests
 import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, TopicPartition}
@@ -362,7 +363,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
       }
       val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
         val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
-        val partitionState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderIsr.leader,
+        val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
           leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
           partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
         )
@@ -379,7 +380,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
               broker, p._1)))
       val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
         val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
-        val partitionState = new UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader,
+        val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
           leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
           partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
         )
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index b387f2efab3..f493e7d96d0 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -30,8 +30,7 @@ import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
-import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, UpdateMetadataRequest}
 
 /**
  * A cache for the state (e.g., current leader) of each partition. This cache is updated through
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b1cf68b5c11..77df0299d30 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
 import java.io.{File, IOException}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
@@ -28,17 +29,18 @@ import kafka.log.{LogAppendInfo, LogManager}
 import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import org.I0Itec.zkclient.IZkChildListener
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, ReplicaNotAvailableException, RecordTooLargeException,
-InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException,
-InvalidTimestampException}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException,
+  InvalidTopicException, NotLeaderForPartitionException, OffsetOutOfRangeException,
+  RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException,
+  UnknownTopicOrPartitionException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.TopicConstants
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time => JTime}
+
 import scala.collection._
 import scala.collection.JavaConverters._
 
@@ -611,7 +613,7 @@ class ReplicaManager(val config: KafkaConfig,
       controllerEpoch = leaderAndISRRequest.controllerEpoch
 
       // First check partition's leader epoch
-      val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
+      val partitionState = new mutable.HashMap[Partition, PartitionState]()
       leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
         val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition)
         val partitionLeaderEpoch = partition.getLeaderEpoch()
@@ -680,7 +682,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeLeaders(controllerId: Int,
                           epoch: Int,
-                          partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
+                          partitionState: Map[Partition, PartitionState],
                           correlationId: Int,
                           responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
     partitionState.foreach(state =>
@@ -751,7 +753,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeFollowers(controllerId: Int,
                             epoch: Int,
-                            partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
+                            partitionState: Map[Partition, PartitionState],
                             correlationId: Int,
                             responseMap: mutable.Map[TopicPartition, Short],
                             metadataCache: MetadataCache) : Set[Partition] = {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b1f0283a621..8f23c49c8b6 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -191,7 +191,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   }
 
   private def createUpdateMetadataRequest = {
-    val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
+    val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
     val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava, null)).asJava
     new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers)
@@ -220,7 +220,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
 
   private def createLeaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
-      Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+      Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
       Set(new Node(brokerId, "localhost", 0)).asJava)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 3c30b6bed79..2e24aae4575 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -18,11 +18,10 @@ package kafka.server
 
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
 
 import scala.collection.JavaConverters._
 
 import kafka.api.LeaderAndIsr
-import org.apache.kafka.common.requests.{AbstractRequestResponse, LeaderAndIsrRequest, LeaderAndIsrResponse}
+import org.apache.kafka.common.requests.{AbstractRequestResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, PartitionState}
 import org.junit.Assert._
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.cluster.Broker
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 770513c5cfe..b34c93df787 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -22,8 +22,8 @@ import util.Arrays.asList
 import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.UpdateMetadataRequest
-import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint, PartitionState}
+import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint}
 import org.junit.Test
 import org.junit.Assert._
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 57398562385..bfb66b99201 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,19 +24,18 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
 import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
-import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
+import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
-import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Test, Before, After}
+import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
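
The net effect of the patch is that the two identical nested classes, LeaderAndIsrRequest.PartitionState and UpdateMetadataRequest.PartitionState, are replaced by the single top-level org.apache.kafka.common.requests.PartitionState, which both request types now share and which gains a toString() for readable logging. A minimal sketch of caller code after the change (the class name PartitionStateExample and the topic/broker values are illustrative, mirroring the updated RequestResponseTest, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.PartitionState;

    public class PartitionStateExample {
        public static void main(String[] args) {
            // Both LeaderAndIsrRequest and UpdateMetadataRequest are now built from the same type,
            // so one map of partition states can feed either request.
            Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
            partitionStates.put(new TopicPartition("topic5", 105),
                    new PartitionState(0,                                  // controllerEpoch
                            2,                                             // leader
                            1,                                             // leaderEpoch
                            new ArrayList<>(Arrays.asList(1, 2)),          // isr
                            2,                                             // zkVersion
                            new HashSet<>(Arrays.asList(1, 2, 3, 4))));    // replicas

            // The new toString() prints roughly:
            // PartitionState(controllerEpoch=0, leader=2, leaderEpoch=1, isr=[1, 2], zkVersion=2, replicas=[1, 2, 3, 4])
            System.out.println(partitionStates.get(new TopicPartition("topic5", 105)));
        }
    }

In ControllerChannelManager.scala the class is referenced as requests.PartitionState via the added "import org.apache.kafka.common.requests", presumably to avoid a clash with the controller package's own PartitionState type rather than for any functional reason.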