Browse Source

MINOR: Improve PartitionState logging and remove duplication of code

Currently, logs involving PartitionState are not very helpful.

```
	Broker 449 cached leader info org.apache.kafka.common.requests.UpdateMetadataRequest$PartitionState3285d64a for partition <topic>-<partition> in response to UpdateMetadata request sent by controller 356 epoch 138 with correlation id 0

	TRACE state.change.logger: Broker 449 received LeaderAndIsr request org.apache.kafka.common.requests.LeaderAndIsrRequest$PartitionState66d6a8eb correlation id 3 from controller 356 epoch 138 for partition [<topic>,<partition>]
```

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1609 from SinghAsDev/partitionState
pull/1647/head
Ashish Singh 8 years ago committed by Ismael Juma
parent
commit
0e5700fb68
  1. 19
      clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
  2. 46
      clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
  3. 18
      clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
  4. 16
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  5. 12
      core/src/main/scala/kafka/cluster/Partition.scala
  6. 5
      core/src/main/scala/kafka/controller/ControllerChannelManager.scala
  7. 3
      core/src/main/scala/kafka/server/MetadataCache.scala
  8. 18
      core/src/main/scala/kafka/server/ReplicaManager.scala
  9. 4
      core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  10. 3
      core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
  11. 4
      core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
  12. 7
      core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

19
clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java

@ -35,25 +35,6 @@ import java.util.Set; @@ -35,25 +35,6 @@ import java.util.Set;
public class LeaderAndIsrRequest extends AbstractRequest {
public static class PartitionState {
public final int controllerEpoch;
public final int leader;
public final int leaderEpoch;
public final List<Integer> isr;
public final int zkVersion;
public final Set<Integer> replicas;
public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
this.controllerEpoch = controllerEpoch;
this.leader = leader;
this.leaderEpoch = leaderEpoch;
this.isr = isr;
this.zkVersion = zkVersion;
this.replicas = replicas;
}
}
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id);
private static final String CONTROLLER_ID_KEY_NAME = "controller_id";

46
clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java

@ -0,0 +1,46 @@ @@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.requests;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
public class PartitionState {
public final int controllerEpoch;
public final int leader;
public final int leaderEpoch;
public final List<Integer> isr;
public final int zkVersion;
public final Set<Integer> replicas;
public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
this.controllerEpoch = controllerEpoch;
this.leader = leader;
this.leaderEpoch = leaderEpoch;
this.isr = isr;
this.zkVersion = zkVersion;
this.replicas = replicas;
}
@Override
public String toString() {
return "PartitionState(controllerEpoch=" + controllerEpoch +
", leader=" + leader +
", leaderEpoch=" + leaderEpoch +
", isr=" + Arrays.toString(isr.toArray()) +
", zkVersion=" + zkVersion +
", replicas=" + Arrays.toString(replicas.toArray()) + ")";
}
}

18
clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java

@ -33,24 +33,6 @@ import java.util.Set; @@ -33,24 +33,6 @@ import java.util.Set;
public class UpdateMetadataRequest extends AbstractRequest {
public static final class PartitionState {
public final int controllerEpoch;
public final int leader;
public final int leaderEpoch;
public final List<Integer> isr;
public final int zkVersion;
public final Set<Integer> replicas;
public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
this.controllerEpoch = controllerEpoch;
this.leader = leader;
this.leaderEpoch = leaderEpoch;
this.isr = isr;
this.zkVersion = zkVersion;
this.replicas = replicas;
}
}
public static final class Broker {
public final int id;
public final Map<SecurityProtocol, EndPoint> endPoints;

16
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -376,15 +376,15 @@ public class RequestResponseTest { @@ -376,15 +376,15 @@ public class RequestResponseTest {
}
private AbstractRequest createLeaderAndIsrRequest() {
Map<TopicPartition, LeaderAndIsrRequest.PartitionState> partitionStates = new HashMap<>();
Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = Arrays.asList(1, 2);
List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
partitionStates.put(new TopicPartition("topic5", 105),
new LeaderAndIsrRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
partitionStates.put(new TopicPartition("topic5", 1),
new LeaderAndIsrRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
partitionStates.put(new TopicPartition("topic20", 1),
new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
Set<Node> leaders = new HashSet<>(Arrays.asList(
new Node(0, "test0", 1223),
@ -402,15 +402,15 @@ public class RequestResponseTest { @@ -402,15 +402,15 @@ public class RequestResponseTest {
@SuppressWarnings("deprecation")
private AbstractRequest createUpdateMetadataRequest(int version, String rack) {
Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = Arrays.asList(1, 2);
List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
partitionStates.put(new TopicPartition("topic5", 105),
new UpdateMetadataRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
partitionStates.put(new TopicPartition("topic5", 1),
new UpdateMetadataRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
partitionStates.put(new TopicPartition("topic20", 1),
new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
if (version == 0) {
Set<Node> liveBrokers = new HashSet<>(Arrays.asList(

12
core/src/main/scala/kafka/cluster/Partition.scala

@ -18,7 +18,7 @@ package kafka.cluster @@ -18,7 +18,7 @@ package kafka.cluster
import kafka.common._
import kafka.utils._
import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.admin.AdminUtils
import kafka.api.LeaderAndIsr
import kafka.log.LogConfig
@ -26,17 +26,15 @@ import kafka.server._ @@ -26,17 +26,15 @@ import kafka.server._
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import scala.collection.JavaConverters._
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.requests.PartitionState
/**
* Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@ -166,7 +164,7 @@ class Partition(val topic: String, @@ -166,7 +164,7 @@ class Partition(val topic: String,
* from the time when this broker was the leader last time) and setting the new leader and ISR.
* If the leader replica id does not change, return false to indicate the replica manager.
*/
def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
@ -207,7 +205,7 @@ class Partition(val topic: String, @@ -207,7 +205,7 @@ class Partition(val topic: String,
* Make the local replica the follower by setting the new leader and ISR to empty
* If the leader replica id does not change, return false to indicate the replica manager
*/
def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
val newLeaderBrokerId: Int = partitionStateInfo.leader

5
core/src/main/scala/kafka/controller/ControllerChannelManager.scala

@ -28,6 +28,7 @@ import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUp @@ -28,6 +28,7 @@ import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUp
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.requests
import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicPartition}
@ -362,7 +363,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging @@ -362,7 +363,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
}
val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
val partitionState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderIsr.leader,
val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
)
@ -379,7 +380,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging @@ -379,7 +380,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
broker, p._1)))
val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
val partitionState = new UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader,
val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
)

3
core/src/main/scala/kafka/server/MetadataCache.scala

@ -30,8 +30,7 @@ import kafka.utils.CoreUtils._ @@ -30,8 +30,7 @@ import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.common.Node
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, UpdateMetadataRequest}
/**
* A cache for the state (e.g., current leader) of each partition. This cache is updated through

18
core/src/main/scala/kafka/server/ReplicaManager.scala

@ -19,6 +19,7 @@ package kafka.server @@ -19,6 +19,7 @@ package kafka.server
import java.io.{File, IOException}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.cluster.{Partition, Replica}
@ -28,17 +29,18 @@ import kafka.log.{LogAppendInfo, LogManager} @@ -28,17 +29,18 @@ import kafka.log.{LogAppendInfo, LogManager}
import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.I0Itec.zkclient.IZkChildListener
import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, ReplicaNotAvailableException, RecordTooLargeException,
InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException,
InvalidTimestampException}
import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException,
InvalidTopicException, NotLeaderForPartitionException, OffsetOutOfRangeException,
RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.TopicConstants
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest}
import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time => JTime}
import scala.collection._
import scala.collection.JavaConverters._
@ -611,7 +613,7 @@ class ReplicaManager(val config: KafkaConfig, @@ -611,7 +613,7 @@ class ReplicaManager(val config: KafkaConfig,
controllerEpoch = leaderAndISRRequest.controllerEpoch
// First check partition's leader epoch
val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
val partitionState = new mutable.HashMap[Partition, PartitionState]()
leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition)
val partitionLeaderEpoch = partition.getLeaderEpoch()
@ -680,7 +682,7 @@ class ReplicaManager(val config: KafkaConfig, @@ -680,7 +682,7 @@ class ReplicaManager(val config: KafkaConfig,
*/
private def makeLeaders(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
partitionState: Map[Partition, PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
partitionState.foreach(state =>
@ -751,7 +753,7 @@ class ReplicaManager(val config: KafkaConfig, @@ -751,7 +753,7 @@ class ReplicaManager(val config: KafkaConfig,
*/
private def makeFollowers(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
partitionState: Map[Partition, PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Short],
metadataCache: MetadataCache) : Set[Partition] = {

4
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

@ -191,7 +191,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { @@ -191,7 +191,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
}
private def createUpdateMetadataRequest = {
val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava, null)).asJava
new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers)
@ -220,7 +220,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { @@ -220,7 +220,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
private def createLeaderAndIsrRequest = {
new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
Set(new Node(brokerId, "localhost", 0)).asJava)
}

3
core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala

@ -18,11 +18,10 @@ @@ -18,11 +18,10 @@
package kafka.server
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
import scala.collection.JavaConverters._
import kafka.api.LeaderAndIsr
import org.apache.kafka.common.requests.{AbstractRequestResponse, LeaderAndIsrRequest, LeaderAndIsrResponse}
import org.apache.kafka.common.requests.{AbstractRequestResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, PartitionState}
import org.junit.Assert._
import kafka.utils.{CoreUtils, TestUtils}
import kafka.cluster.Broker

4
core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala

@ -22,8 +22,8 @@ import util.Arrays.asList @@ -22,8 +22,8 @@ import util.Arrays.asList
import kafka.common.BrokerEndPointNotAvailableException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.UpdateMetadataRequest
import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint, PartitionState}
import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest}
import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint}
import org.junit.Test
import org.junit.Assert._

7
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

@ -24,19 +24,18 @@ import java.util.concurrent.atomic.AtomicBoolean @@ -24,19 +24,18 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
import kafka.cluster.Broker
import kafka.common.TopicAndPartition
import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{MockTime => JMockTime}
import org.apache.kafka.common.{Node, TopicPartition}
import org.easymock.EasyMock
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{Test, Before, After}
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
import scala.collection.Map

Loading…
Cancel
Save