From 0a508a436c928c0440e8b90dda98c22dc0ec244c Mon Sep 17 00:00:00 2001 From: umesh chaudhary Date: Tue, 12 Dec 2017 16:00:04 +0200 Subject: [PATCH] KAFKA-5631; Use Jackson for serialising to JSON - Rename `encode` to `legacyEncodeAsString`, we can remove this when we remove `ZkUtils`. - Introduce `encodeAsString` that uses Jackson. - Change `encodeAsBytes` to use Jackson. - Avoid intermediate string when converting Broker to json bytes. The methods that use Jackson only support Java collections unlike `legacyEncodeAsString`. Tests were added `encodeAsString` and `encodeAsBytes`. Author: umesh chaudhary Reviewers: Ismael Juma Closes #4259 from umesh9794/KAFKA-5631 --- .../main/scala/kafka/admin/AdminUtils.scala | 6 +- .../scala/kafka/admin/LogDirsCommand.scala | 16 ++-- .../admin/ReassignPartitionsCommand.scala | 13 ++- .../main/scala/kafka/api/LeaderAndIsr.scala | 4 +- .../src/main/scala/kafka/cluster/Broker.scala | 14 +-- .../consumer/ZookeeperConsumerConnector.scala | 10 ++- .../transaction/ProducerIdManager.scala | 4 +- .../main/scala/kafka/security/auth/Acl.scala | 3 +- .../scala/kafka/tools/DumpLogSegments.scala | 16 ++-- core/src/main/scala/kafka/utils/Json.scala | 30 ++++--- core/src/main/scala/kafka/utils/ZkUtils.scala | 17 ++-- core/src/main/scala/kafka/zk/ZkData.scala | 46 +++++----- .../scala/unit/kafka/admin/AdminTest.scala | 4 +- .../consumer/PartitionAssignorTest.scala | 6 +- .../unit/kafka/security/auth/AclTest.scala | 5 +- .../server/DynamicConfigChangeTest.scala | 9 +- .../scala/unit/kafka/utils/JsonTest.scala | 87 +++++++++++++++---- .../kafka/utils/ReplicationUtilsTest.scala | 5 +- .../scala/unit/kafka/utils/ZkUtilsTest.scala | 2 +- gradle/dependencies.gradle | 2 +- 20 files changed, 187 insertions(+), 112 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 4d0ad588648..f21b94281c3 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -25,7 +25,7 @@ import java.util.Random import java.util.Properties import kafka.common.TopicAlreadyMarkedForDeletionException -import org.apache.kafka.common.errors.{BrokerNotAvailableException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException} +import org.apache.kafka.common.errors._ import collection.{Map, Set, mutable, _} import scala.collection.JavaConverters._ @@ -628,7 +628,7 @@ object AdminUtils extends Logging with AdminUtilities { // create the change notification val seqNode = ZkUtils.ConfigChangesPath + "/" + EntityConfigChangeZnodePrefix - val content = Json.encode(getConfigChangeZnodeData(sanitizedEntityPath)) + val content = Json.legacyEncodeAsString(getConfigChangeZnodeData(sanitizedEntityPath)) zkUtils.createSequentialPersistentPath(seqNode, content) } @@ -641,7 +641,7 @@ object AdminUtils extends Logging with AdminUtilities { */ private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) { val map = Map("version" -> 1, "config" -> config.asScala) - zkUtils.updatePersistentPath(entityPath, Json.encode(map)) + zkUtils.updatePersistentPath(entityPath, Json.legacyEncodeAsString(map)) } /** diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala index 6a167a23dca..d8e1beb364f 100644 --- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala +++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala @@ -56,7 +56,7 @@ object LogDirsCommand { } private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirInfo]], topicSet: Set[String]): String = { - Json.encode(Map( + Json.encodeAsString(Map( "version" -> 1, "brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) => Map( @@ -73,13 +73,13 @@ object LogDirsCommand { "size" -> replicaInfo.size, "offsetLag" -> replicaInfo.offsetLag, "isFuture" -> replicaInfo.isFuture - ) - } - ) - } - ) - } - )) + ).asJava + }.asJava + ).asJava + }.asJava + ).asJava + }.asJava + ).asJava) } private def createAdminClient(opts: LogDirsCommandOptions): JAdminClient = { diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index e8aad7f167b..811e56e8775 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -43,7 +43,6 @@ object ReassignPartitionsCommand extends Logging { private[admin] val AnyLogDir = "any" def main(args: Array[String]): Unit = { - val opts = validateAndParseArgs(args) val zkConnect = opts.options.valueOf(opts.zkConnectOpt) val zkUtils = ZkUtils(zkConnect, @@ -224,17 +223,17 @@ object ReassignPartitionsCommand extends Logging { def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], replicaLogDirAssignment: Map[TopicPartitionReplica, String]): String = { - Json.encode(Map( + Json.encodeAsString(Map( "version" -> 1, "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) => Map( "topic" -> topic, "partition" -> partition, - "replicas" -> replicas, - "log_dirs" -> replicas.map(r => replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(topic, partition, r), AnyLogDir)) - ) - } - )) + "replicas" -> replicas.asJava, + "log_dirs" -> replicas.map(r => replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(topic, partition, r), AnyLogDir)).asJava + ).asJava + }.asJava + ).asJava) } // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala index 7a83cf3321b..cb59575ea7a 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala @@ -17,8 +17,6 @@ package kafka.api -import kafka.utils._ - object LeaderAndIsr { val initialLeaderEpoch: Int = 0 val initialZKVersion: Int = 0 @@ -43,6 +41,6 @@ case class LeaderAndIsr(leader: Int, def newEpochAndZkVersion = newLeaderAndIsr(leader, isr) override def toString: String = { - Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr)) + s"LeaderAndIsr(leader=$leader, leaderEpoch=$leaderEpoch, isr=$isr, zkVersion=$zkVersion)" } } diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index a148dfd7540..df3be98dbef 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -24,6 +24,9 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Time +import scala.collection.Map +import scala.collection.JavaConverters._ + /** * A Kafka broker. * A broker has an id and a collection of end-points. @@ -127,12 +130,12 @@ object Broker { } } - def toJson(version: Int, id: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int, - rack: Option[String]): String = { + def toJsonBytes(version: Int, id: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int, + rack: Option[String]): Array[Byte] = { val jsonMap = collection.mutable.Map(VersionKey -> version, HostKey -> host, PortKey -> port, - EndpointsKey -> advertisedEndpoints.map(_.connectionString).toArray, + EndpointsKey -> advertisedEndpoints.map(_.connectionString).toBuffer.asJava, JmxPortKey -> jmxPort, TimestampKey -> Time.SYSTEM.milliseconds().toString ) @@ -141,10 +144,9 @@ object Broker { if (version >= 4) { jsonMap += (ListenerSecurityProtocolMapKey -> advertisedEndpoints.map { endPoint => endPoint.listenerName.value -> endPoint.securityProtocol.name - }.toMap) + }.toMap.asJava) } - - Json.encode(jsonMap) + Json.encodeAsBytes(jsonMap.asJava) } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index bb5fc0fc2d8..759da4fe7c0 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -45,6 +45,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState import scala.collection._ import scala.collection.JavaConverters._ + /** * This class handles the consumers interaction with zookeeper * @@ -272,8 +273,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) { info("begin registering consumer " + consumerIdString + " in ZK") val timestamp = Time.SYSTEM.milliseconds.toString - val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern, - "timestamp" -> timestamp)) + + val consumerRegistrationInfo = Json.encodeAsString(Map("version" -> 1, + "subscription" -> topicCount.getTopicCountMap.asJava, + "pattern" -> topicCount.pattern, + "timestamp" -> timestamp + ).asJava) + val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs. consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala index 5d320852465..716e3d1a7fc 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala @@ -22,6 +22,8 @@ import kafka.common.KafkaException import kafka.utils.{Json, Logging, ZkUtils} import kafka.zk.KafkaZkClient +import scala.collection.JavaConverters._ + /** * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way * such that the same producerId will not be assigned twice across multiple transaction coordinators. @@ -37,7 +39,7 @@ object ProducerIdManager extends Logging { Json.encodeAsBytes(Map("version" -> CurrentVersion, "broker" -> producerIdBlock.brokerId, "block_start" -> producerIdBlock.blockStartId.toString, - "block_end" -> producerIdBlock.blockEndId.toString) + "block_end" -> producerIdBlock.blockEndId.toString).asJava ) } diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala index 4e2cba4032f..67f3d9592f2 100644 --- a/core/src/main/scala/kafka/security/auth/Acl.scala +++ b/core/src/main/scala/kafka/security/auth/Acl.scala @@ -20,6 +20,7 @@ package kafka.security.auth import kafka.utils.Json import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.SecurityUtils +import scala.collection.JavaConverters._ object Acl { val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*") @@ -71,7 +72,7 @@ object Acl { } def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = { - Map(Acl.VersionKey -> Acl.CurrentVersion, Acl.AclsKey -> acls.map(acl => acl.toMap).toList) + Map(Acl.VersionKey -> Acl.CurrentVersion, Acl.AclsKey -> acls.map(acl => acl.toMap.asJava).toList.asJava) } } diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index fe82dc2d72f..127c5702674 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.KafkaException import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils -import scala.collection.mutable +import scala.collection.{Map, mutable} import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConverters._ @@ -334,12 +334,14 @@ object DumpLogSegments { } }.mkString("{", ",", "}") - val keyString = Json.encode(Map("metadata" -> groupId)) - val valueString = Json.encode(Map( - "protocolType" -> protocolType, - "protocol" -> group.protocol, - "generationId" -> group.generationId, - "assignment" -> assignment)) + val keyString = Json.encodeAsString(Map("metadata" -> groupId).asJava) + + val valueString = Json.encodeAsString(Map( + "protocolType" -> protocolType, + "protocol" -> group.protocol, + "generationId" -> group.generationId, + "assignment" -> assignment + ).asJava) (Some(keyString), Some(valueString)) } diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala index c654d4518a1..e8e7d8a79a8 100644 --- a/core/src/main/scala/kafka/utils/Json.scala +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -59,9 +59,11 @@ object Json { * T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T] * Any other type will result in an exception. * - * This method does not properly handle non-ascii characters. + * This implementation is inefficient, so we recommend `encodeAsString` or `encodeAsBytes` (the latter is preferred + * if possible). This method supports scala Map implementations while the other two do not. Once this functionality + * is no longer required, we can remove this method. */ - def encode(obj: Any): String = { + def legacyEncodeAsString(obj: Any): String = { obj match { case null => "null" case b: Boolean => b.toString @@ -69,22 +71,26 @@ object Json { case n: Number => n.toString case m: Map[_, _] => "{" + m.map { - case (k, v) => encode(k) + ":" + encode(v) + case (k, v) => legacyEncodeAsString(k) + ":" + legacyEncodeAsString(v) case elem => throw new IllegalArgumentException(s"Invalid map element '$elem' in $obj") }.mkString(",") + "}" - case a: Array[_] => encode(a.toSeq) - case i: Iterable[_] => "[" + i.map(encode).mkString(",") + "]" + case a: Array[_] => legacyEncodeAsString(a.toSeq) + case i: Iterable[_] => "[" + i.map(legacyEncodeAsString).mkString(",") + "]" case other: AnyRef => throw new IllegalArgumentException(s"Unknown argument of type ${other.getClass}: $other") } } /** - * Encode an object into a JSON value in bytes. This method accepts any type T where - * T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T] - * Any other type will result in an exception. - * - * This method does not properly handle non-ascii characters. - */ - def encodeAsBytes(obj: Any): Array[Byte] = encode(obj).getBytes(StandardCharsets.UTF_8) + * Encode an object into a JSON string. This method accepts any type supported by Jackson's ObjectMapper in + * the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid + * a jackson-scala dependency). + */ + def encodeAsString(obj: Any): String = mapper.writeValueAsString(obj) + /** + * Encode an object into a JSON value in bytes. This method accepts any type supported by Jackson's ObjectMapper in + * the default configuration. That is, Java collections are supported, but Scala collections are not (to avoid + * a jackson-scala dependency). + */ + def encodeAsBytes(obj: Any): Array[Byte] = mapper.writeValueAsBytes(obj) } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index b378280d05f..2c079e58d4b 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,6 +17,7 @@ package kafka.utils +import java.nio.charset.StandardCharsets import java.util.concurrent.{CountDownLatch, TimeUnit} import kafka.admin._ @@ -198,15 +199,15 @@ object ZkUtils { } def controllerZkData(brokerId: Int, timestamp: Long): String = { - Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)) + Json.legacyEncodeAsString(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)) } def preferredReplicaLeaderElectionZkData(partitions: scala.collection.Set[TopicAndPartition]): String = { - Json.encode(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))) + Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))) } def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { - Json.encode(Map( + Json.legacyEncodeAsString(Map( "version" -> 1, "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) => Map( @@ -315,8 +316,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper, object ClusterId { def toJson(id: String) = { - val jsonMap = Map("version" -> "1", "id" -> id) - Json.encode(jsonMap) + Json.legacyEncodeAsString(Map("version" -> "1", "id" -> id)) } def fromJson(clusterIdJson: String): String = { @@ -457,7 +457,8 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper, val brokerIdPath = BrokerIdsPath + "/" + id // see method documentation for reason why we do this val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2 - val json = Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack) + val json = new String(Broker.toJsonBytes(version, id, host, port, advertisedEndpoints, jmxPort, rack), + StandardCharsets.UTF_8) registerBrokerInZk(brokerIdPath, json) info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(","))) @@ -486,7 +487,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper, } def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = { - Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch, + Json.legacyEncodeAsString(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch, "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)) } @@ -494,7 +495,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper, * Get JSON partition to replica map from zookeeper. */ def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = { - Json.encode(Map("version" -> 1, "partitions" -> map)) + Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> map)) } /** diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 8bd32d0ed70..2223001843d 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -27,13 +27,15 @@ import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.utils.Json import org.apache.kafka.common.TopicPartition import org.apache.zookeeper.data.Stat +import scala.collection.JavaConverters._ // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes). object ControllerZNode { def path = "/controller" - def encode(brokerId: Int, timestamp: Long): Array[Byte] = - Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)) + def encode(brokerId: Int, timestamp: Long): Array[Byte] = { + Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString).asJava) + } def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js => js.asJsonObject("brokerid").to[Int] } @@ -68,7 +70,7 @@ object BrokerIdZNode { rack: Option[String], apiVersion: ApiVersion): Array[Byte] = { val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2 - Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack).getBytes(UTF_8) + Broker.toJsonBytes(version, id, host, port, advertisedEndpoints, jmxPort, rack) } def decode(id: Int, bytes: Array[Byte]): Broker = { @@ -83,8 +85,10 @@ object TopicsZNode { object TopicZNode { def path(topic: String) = s"${TopicsZNode.path}/$topic" def encode(assignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = { - val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas } - Json.encodeAsBytes(Map("version" -> 1, "partitions" -> assignmentJson)) + val assignmentJson = assignment.map { case (partition, replicas) => + partition.partition.toString -> replicas.asJava + } + Json.encodeAsBytes(Map("version" -> 1, "partitions" -> assignmentJson.asJava).asJava) } def decode(topic: String, bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = { Json.parseBytes(bytes).flatMap { js => @@ -113,7 +117,7 @@ object TopicPartitionStateZNode { val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch Json.encodeAsBytes(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch, - "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)) + "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr.asJava).asJava) } def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = { Json.parseBytes(bytes).map { js => @@ -135,8 +139,7 @@ object ConfigEntityTypeZNode { object ConfigEntityZNode { def path(entityType: String, entityName: String) = s"${ConfigEntityTypeZNode.path(entityType)}/$entityName" def encode(config: Properties): Array[Byte] = { - import scala.collection.JavaConverters._ - Json.encodeAsBytes(Map("version" -> 1, "config" -> config.asScala)) + Json.encodeAsBytes(Map("version" -> 1, "config" -> config).asJava) } def decode(bytes: Array[Byte]): Properties = { val props = new Properties() @@ -157,8 +160,8 @@ object ConfigEntityChangeNotificationZNode { object ConfigEntityChangeNotificationSequenceZNode { val SequenceNumberPrefix = "config_change_" def createPath = s"${ConfigEntityChangeNotificationZNode.path}/$SequenceNumberPrefix" - def encode(sanitizedEntityPath : String): Array[Byte] = Json.encodeAsBytes(Map("version" -> 2, "entity_path" -> sanitizedEntityPath)) - def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8) + def encode(sanitizedEntityPath: String): Array[Byte] = Json.encodeAsBytes( + Map("version" -> 2, "entity_path" -> sanitizedEntityPath).asJava) } object IsrChangeNotificationZNode { @@ -169,8 +172,8 @@ object IsrChangeNotificationSequenceZNode { val SequenceNumberPrefix = "isr_change_" def path(sequenceNumber: String = "") = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber" def encode(partitions: collection.Set[TopicPartition]): Array[Byte] = { - val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition)) - Json.encodeAsBytes(Map("version" -> IsrChangeNotificationHandler.Version, "partitions" -> partitionsJson)) + val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition).asJava) + Json.encodeAsBytes(Map("version" -> IsrChangeNotificationHandler.Version, "partitions" -> partitionsJson.asJava).asJava) } def decode(bytes: Array[Byte]): Set[TopicPartition] = { @@ -195,8 +198,9 @@ object LogDirEventNotificationSequenceZNode { val SequenceNumberPrefix = "log_dir_event_" val LogDirFailureEvent = 1 def path(sequenceNumber: String) = s"${LogDirEventNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber" - def encode(brokerId: Int) = - Json.encodeAsBytes(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent)) + def encode(brokerId: Int) = { + Json.encodeAsBytes(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent).asJava) + } def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js => js.asJsonObject("broker").to[Int] } @@ -219,9 +223,9 @@ object ReassignPartitionsZNode { def path = s"${AdminZNode.path}/reassign_partitions" def encode(reassignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = { val reassignmentJson = reassignment.map { case (tp, replicas) => - Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas) - } - Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson)) + Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas.asJava).asJava + }.asJava + Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson).asJava) } def decode(bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js => val reassignmentJson = js.asJsonObject @@ -242,8 +246,8 @@ object PreferredReplicaElectionZNode { def path = s"${AdminZNode.path}/preferred_replica_election" def encode(partitions: Set[TopicPartition]): Array[Byte] = { val jsonMap = Map("version" -> 1, - "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))) - Json.encodeAsBytes(jsonMap) + "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition).asJava).asJava) + Json.encodeAsBytes(jsonMap.asJava) } def decode(bytes: Array[Byte]): Set[TopicPartition] = Json.parseBytes(bytes).map { js => val partitionsJson = js.asJsonObject("partitions").asJsonArray @@ -296,7 +300,9 @@ object ResourceTypeZNode { object ResourceZNode { def path(resource: Resource) = s"${AclZNode.path}/${resource.resourceType}/${resource.name}" - def encode(acls: Set[Acl]): Array[Byte] = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls)) + def encode(acls: Set[Acl]): Array[Byte] = { + Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava) + } def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion) } diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 0c9bd6e4c6d..d1e758d4c70 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -532,8 +532,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { // Write config without notification to ZK. val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000") - val map = Map("version" -> 1, "config" -> configMap) - zkUtils.updatePersistentPath(ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encode(map)) + val map = Map("version" -> 1, "config" -> configMap.asJava) + zkUtils.updatePersistentPath(ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encodeAsString(map.asJava)) val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client) assertEquals("Must have 1 overriden client config", 1, configInZk.size) diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala index 3012112b58d..12fcba6dc74 100644 --- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -153,7 +153,7 @@ private object PartitionAssignorTest extends Logging { private case class StaticSubscriptionInfo(streamCounts: Map[String, Int]) extends SubscriptionInfo { def registrationString = - Json.encode(Map("version" -> 1, + Json.legacyEncodeAsString(Map("version" -> 1, "subscription" -> streamCounts, "pattern" -> "static", "timestamp" -> 1234.toString)) @@ -166,7 +166,7 @@ private object PartitionAssignorTest extends Logging { private case class WildcardSubscriptionInfo(streamCount: Int, regex: String, isWhitelist: Boolean) extends SubscriptionInfo { def registrationString = - Json.encode(Map("version" -> 1, + Json.legacyEncodeAsString(Map("version" -> 1, "subscription" -> Map(regex -> streamCount), "pattern" -> (if (isWhitelist) "white_list" else "black_list"))) @@ -206,7 +206,7 @@ private object PartitionAssignorTest extends Logging { scenario.topicPartitionCounts.foreach { case(topic, partitionCount) => val replicaAssignment = Map((0 until partitionCount).map(partition => (partition.toString, Seq(0))):_*) EasyMock.expect(zkClient.readData("/brokers/topics/%s".format(topic), new Stat())) - .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment)) + .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment)) EasyMock.expectLastCall().anyTimes() } diff --git a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala index dfdd85face3..beeac376653 100644 --- a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala @@ -22,6 +22,7 @@ import kafka.utils.Json import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.{Assert, Test} import org.scalatest.junit.JUnitSuite +import scala.collection.JavaConverters._ class AclTest extends JUnitSuite { @@ -36,9 +37,9 @@ class AclTest extends JUnitSuite { val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Deny, "host1", Read) val acls = Set[Acl](acl1, acl2, acl3) - val jsonAcls = Json.encode(Acl.toJsonCompatibleMap(acls)) + val jsonAcls = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava) - Assert.assertEquals(acls, Acl.fromBytes(jsonAcls.getBytes(UTF_8))) + Assert.assertEquals(acls, Acl.fromBytes(jsonAcls)) Assert.assertEquals(acls, Acl.fromBytes(AclJson.getBytes(UTF_8))) } diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 61da4200c0e..d596d0f81e2 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -31,6 +31,7 @@ import kafka.admin.AdminOperationException import org.apache.kafka.common.TopicPartition import scala.collection.Map +import scala.collection.JavaConverters._ class DynamicConfigChangeTest extends KafkaServerTestHarness { def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) @@ -195,7 +196,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Incorrect Map. No version try { val jsonMap = Map("v" -> 1, "x" -> 2) - configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap)) + configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap.asJava)) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } catch { @@ -204,7 +205,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Version is provided. EntityType is incorrect try { val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x") - configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap)) + configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap.asJava)) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } catch { @@ -214,7 +215,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // EntityName isn't provided try { val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic) - configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap)) + configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap.asJava)) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } catch { @@ -223,7 +224,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Everything is provided val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x") - configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap)) + configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap.asJava)) // Verify that processConfigChanges was only called once EasyMock.verify(handler) diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala b/core/src/test/scala/unit/kafka/utils/JsonTest.scala index 93509b4c16a..fa2a030f0b5 100644 --- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala +++ b/core/src/test/scala/unit/kafka/utils/JsonTest.scala @@ -16,6 +16,8 @@ */ package kafka.utils +import java.nio.charset.StandardCharsets + import org.junit.Assert._ import org.junit.Test import com.fasterxml.jackson.databind.JsonNode @@ -47,7 +49,7 @@ class JsonTest { // Test with encoder that properly escapes backslash and quotes val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""") - val encoded = Json.encode(map) + val encoded = Json.legacyEncodeAsString(map) val decoded = Json.parseFull(encoded) assertEquals(Json.parseFull("""{"foo1":"bar1\\,bar2", "foo2":"\\bar"}"""), decoded) @@ -57,24 +59,71 @@ class JsonTest { } @Test - def testJsonEncoding() { - assertEquals("null", Json.encode(null)) - assertEquals("1", Json.encode(1)) - assertEquals("1", Json.encode(1L)) - assertEquals("1", Json.encode(1.toByte)) - assertEquals("1", Json.encode(1.toShort)) - assertEquals("1.0", Json.encode(1.0)) - assertEquals("\"str\"", Json.encode("str")) - assertEquals("true", Json.encode(true)) - assertEquals("false", Json.encode(false)) - assertEquals("[]", Json.encode(Seq())) - assertEquals("[1,2,3]", Json.encode(Seq(1,2,3))) - assertEquals("[1,\"2\",[3]]", Json.encode(Seq(1,"2",Seq(3)))) - assertEquals("{}", Json.encode(Map())) - assertEquals("{\"a\":1,\"b\":2}", Json.encode(Map("a" -> 1, "b" -> 2))) - assertEquals("{\"a\":[1,2],\"c\":[3,4]}", Json.encode(Map("a" -> Seq(1,2), "c" -> Seq(3,4)))) - assertEquals(""""str1\\,str2"""", Json.encode("""str1\,str2""")) - assertEquals(""""\"quoted\""""", Json.encode(""""quoted"""")) + def testLegacyEncodeAsString() { + assertEquals("null", Json.legacyEncodeAsString(null)) + assertEquals("1", Json.legacyEncodeAsString(1)) + assertEquals("1", Json.legacyEncodeAsString(1L)) + assertEquals("1", Json.legacyEncodeAsString(1.toByte)) + assertEquals("1", Json.legacyEncodeAsString(1.toShort)) + assertEquals("1.0", Json.legacyEncodeAsString(1.0)) + assertEquals(""""str"""", Json.legacyEncodeAsString("str")) + assertEquals("true", Json.legacyEncodeAsString(true)) + assertEquals("false", Json.legacyEncodeAsString(false)) + assertEquals("[]", Json.legacyEncodeAsString(Seq())) + assertEquals("[1,2,3]", Json.legacyEncodeAsString(Seq(1,2,3))) + assertEquals("""[1,"2",[3]]""", Json.legacyEncodeAsString(Seq(1,"2",Seq(3)))) + assertEquals("{}", Json.legacyEncodeAsString(Map())) + assertEquals("""{"a":1,"b":2}""", Json.legacyEncodeAsString(Map("a" -> 1, "b" -> 2))) + assertEquals("""{"a":[1,2],"c":[3,4]}""", Json.legacyEncodeAsString(Map("a" -> Seq(1,2), "c" -> Seq(3,4)))) + assertEquals(""""str1\\,str2"""", Json.legacyEncodeAsString("""str1\,str2""")) + assertEquals(""""\"quoted\""""", Json.legacyEncodeAsString(""""quoted"""")) + + } + + @Test + def testEncodeAsString() { + assertEquals("null", Json.encodeAsString(null)) + assertEquals("1", Json.encodeAsString(1)) + assertEquals("1", Json.encodeAsString(1L)) + assertEquals("1", Json.encodeAsString(1.toByte)) + assertEquals("1", Json.encodeAsString(1.toShort)) + assertEquals("1.0", Json.encodeAsString(1.0)) + assertEquals(""""str"""", Json.encodeAsString("str")) + assertEquals("true", Json.encodeAsString(true)) + assertEquals("false", Json.encodeAsString(false)) + assertEquals("[]", Json.encodeAsString(Seq().asJava)) + assertEquals("[null]", Json.encodeAsString(Seq(null).asJava)) + assertEquals("[1,2,3]", Json.encodeAsString(Seq(1,2,3).asJava)) + assertEquals("""[1,"2",[3],null]""", Json.encodeAsString(Seq(1,"2",Seq(3).asJava,null).asJava)) + assertEquals("{}", Json.encodeAsString(Map().asJava)) + assertEquals("""{"a":1,"b":2,"c":null}""", Json.encodeAsString(Map("a" -> 1, "b" -> 2, "c" -> null).asJava)) + assertEquals("""{"a":[1,2],"c":[3,4]}""", Json.encodeAsString(Map("a" -> Seq(1,2).asJava, "c" -> Seq(3,4).asJava).asJava)) + assertEquals("""{"a":[1,2],"b":[3,4],"c":null}""", Json.encodeAsString(Map("a" -> Seq(1,2).asJava, "b" -> Seq(3,4).asJava, "c" -> null).asJava)) + assertEquals(""""str1\\,str2"""", Json.encodeAsString("""str1\,str2""")) + assertEquals(""""\"quoted\""""", Json.encodeAsString(""""quoted"""")) + } + + @Test + def testEncodeAsBytes() { + assertEquals("null", new String(Json.encodeAsBytes(null), StandardCharsets.UTF_8)) + assertEquals("1", new String(Json.encodeAsBytes(1), StandardCharsets.UTF_8)) + assertEquals("1", new String(Json.encodeAsBytes(1L), StandardCharsets.UTF_8)) + assertEquals("1", new String(Json.encodeAsBytes(1.toByte), StandardCharsets.UTF_8)) + assertEquals("1", new String(Json.encodeAsBytes(1.toShort), StandardCharsets.UTF_8)) + assertEquals("1.0", new String(Json.encodeAsBytes(1.0), StandardCharsets.UTF_8)) + assertEquals(""""str"""", new String(Json.encodeAsBytes("str"), StandardCharsets.UTF_8)) + assertEquals("true", new String(Json.encodeAsBytes(true), StandardCharsets.UTF_8)) + assertEquals("false", new String(Json.encodeAsBytes(false), StandardCharsets.UTF_8)) + assertEquals("[]", new String(Json.encodeAsBytes(Seq().asJava), StandardCharsets.UTF_8)) + assertEquals("[null]", new String(Json.encodeAsBytes(Seq(null).asJava), StandardCharsets.UTF_8)) + assertEquals("[1,2,3]", new String(Json.encodeAsBytes(Seq(1,2,3).asJava), StandardCharsets.UTF_8)) + assertEquals("""[1,"2",[3],null]""", new String(Json.encodeAsBytes(Seq(1,"2",Seq(3).asJava,null).asJava), StandardCharsets.UTF_8)) + assertEquals("{}", new String(Json.encodeAsBytes(Map().asJava), StandardCharsets.UTF_8)) + assertEquals("""{"a":1,"b":2,"c":null}""", new String(Json.encodeAsBytes(Map("a" -> 1, "b" -> 2, "c" -> null).asJava), StandardCharsets.UTF_8)) + assertEquals("""{"a":[1,2],"c":[3,4]}""", new String(Json.encodeAsBytes(Map("a" -> Seq(1,2).asJava, "c" -> Seq(3,4).asJava).asJava), StandardCharsets.UTF_8)) + assertEquals("""{"a":[1,2],"b":[3,4],"c":null}""", new String(Json.encodeAsBytes(Map("a" -> Seq(1,2).asJava, "b" -> Seq(3,4).asJava, "c" -> null).asJava), StandardCharsets.UTF_8)) + assertEquals(""""str1\\,str2"""", new String(Json.encodeAsBytes("""str1\,str2"""), StandardCharsets.UTF_8)) + assertEquals(""""\"quoted\""""", new String(Json.encodeAsBytes(""""quoted""""), StandardCharsets.UTF_8)) } } diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala index 987160d8b62..05f8379b0bf 100644 --- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala @@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition import org.junit.Assert._ import org.junit.{Before, Test} import org.easymock.EasyMock +import scala.collection.JavaConverters._ class ReplicationUtilsTest extends ZooKeeperTestHarness { private val zkVersion = 1 @@ -34,8 +35,8 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness { private val controllerEpoch = 1 private val isr = List(1, 2) private val topicPath = s"/brokers/topics/$topic/partitions/$partition/state" - private val topicData = Json.encode(Map("controller_epoch" -> controllerEpoch, "leader" -> leader, - "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr)) + private val topicData = Json.encodeAsString(Map("controller_epoch" -> controllerEpoch, "leader" -> leader, + "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr).asJava) @Before override def setUp() { diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala index ecd07068f74..9f78124d031 100755 --- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala @@ -96,7 +96,7 @@ class ZkUtilsTest extends ZooKeeperTestHarness { val controllerEpoch = 1 val isr = List(1, 2) val topicPath = s"/brokers/topics/$topic/partitions/$partition/state" - val topicData = Json.encode(Map("controller_epoch" -> controllerEpoch, "leader" -> leader, + val topicData = Json.legacyEncodeAsString(Map("controller_epoch" -> controllerEpoch, "leader" -> leader, "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr)) zkUtils.createPersistentPath(topicPath, topicData) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 24362418b3a..6f30e7adec1 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -73,7 +73,7 @@ versions += [ zkclient: "0.10", zookeeper: "3.4.10", jfreechart: "1.0.0", - mavenArtifact: "3.5.0", + mavenArtifact: "3.5.0" ] libs += [