Browse Source

MINOR: Remove unused TopicAndPartition and remove unused symbols (#7119)

With the removal of ZkUtils and AdminUtils, TopicAndPartition is finally
unused.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
pull/7129/head
Ismael Juma 5 years ago committed by GitHub
parent
commit
acf507ce9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      core/src/main/scala/kafka/admin/AclCommand.scala
  2. 30
      core/src/main/scala/kafka/common/TopicAndPartition.scala
  3. 2
      core/src/main/scala/kafka/controller/KafkaController.scala
  4. 2
      core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
  5. 2
      core/src/main/scala/kafka/log/LogCleanerManager.scala
  6. 5
      core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
  7. 2
      core/src/main/scala/kafka/server/KafkaApis.scala
  8. 4
      core/src/main/scala/kafka/server/KafkaConfig.scala
  9. 2
      core/src/main/scala/kafka/zk/KafkaZkClient.scala

4
core/src/main/scala/kafka/admin/AclCommand.scala

@ -280,9 +280,9 @@ object AclCommand extends Logging { @@ -280,9 +280,9 @@ object AclCommand extends Logging {
private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter,
listPrincipal: Option[KafkaPrincipal] = None): Map[Resource, Set[Acl]] =
if (listPrincipal.isEmpty)
authorizer.getAcls().filter { case (resource, acl) => filter.matches(resource.toPattern) }
authorizer.getAcls().filter { case (resource, _) => filter.matches(resource.toPattern) }
else
authorizer.getAcls(listPrincipal.get).filter { case (resource, acl) => filter.matches(resource.toPattern) }
authorizer.getAcls(listPrincipal.get).filter { case (resource, _) => filter.matches(resource.toPattern) }
}

30
core/src/main/scala/kafka/common/TopicAndPartition.scala

@ -1,30 +0,0 @@ @@ -1,30 +0,0 @@
package kafka.common
import org.apache.kafka.common.TopicPartition
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Convenience case class since (topic, partition) pairs are ubiquitous.
*/
case class TopicAndPartition(topic: String, partition: Int) {
def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
override def toString: String = s"$topic-$partition"
}

2
core/src/main/scala/kafka/controller/KafkaController.scala

@ -976,7 +976,7 @@ class KafkaController(val config: KafkaConfig, @@ -976,7 +976,7 @@ class KafkaController(val config: KafkaConfig,
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}")
true
case (partition, Left(e)) =>
case (_, Left(e)) =>
throw e
}.getOrElse(false)
case None =>

2
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

@ -312,7 +312,7 @@ class GroupMetadataManager(brokerId: Int, @@ -312,7 +312,7 @@ class GroupMetadataManager(brokerId: Int,
// construct the message set to append
if (filteredOffsetMetadata.isEmpty) {
// compute the final error codes for the commit response
val commitStatus = offsetMetadata.map { case (k, v) => k -> Errors.OFFSET_METADATA_TOO_LARGE }
val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE }
responseCallback(commitStatus)
None
} else {

2
core/src/main/scala/kafka/log/LogCleanerManager.scala

@ -333,7 +333,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], @@ -333,7 +333,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
case None => false
case Some(state) =>
state match {
case LogCleaningPaused(s) =>
case _: LogCleaningPaused =>
true
case _ =>
false

5
core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala

@ -271,15 +271,14 @@ class SimpleAclAuthorizer extends Authorizer with Logging { @@ -271,15 +271,14 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
for (rType <- resourceTypes) {
val resourceType = Try(ResourceType.fromString(rType))
resourceType match {
case Success(resourceTypeObj) => {
case Success(resourceTypeObj) =>
val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj)
for (resourceName <- resourceNames) {
val resource = new Resource(resourceTypeObj, resourceName, store.patternType)
val versionedAcls = getAclsFromZk(resource)
updateCache(resource, versionedAcls)
}
}
case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType")
case Failure(_) => warn(s"Ignoring unknown ResourceType: $rType")
}
}
})

2
core/src/main/scala/kafka/server/KafkaApis.scala

@ -1336,7 +1336,7 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1336,7 +1336,7 @@ class KafkaApis(val requestChannel: RequestChannel,
private def authorizedOperations(session: RequestChannel.Session, resource: Resource): Int = {
val authorizedOps = authorizer match {
case None => resource.resourceType.supportedOperations
case Some(auth) => resource.resourceType.supportedOperations
case Some(_) => resource.resourceType.supportedOperations
.filter(operation => authorize(session, operation, resource))
}

4
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -1311,8 +1311,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO @@ -1311,8 +1311,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, securityProtocol) => listenerName }
def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, securityProtocol) => securityProtocol }
def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName }
def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol }
def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1

2
core/src/main/scala/kafka/zk/KafkaZkClient.scala

@ -1721,7 +1721,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo @@ -1721,7 +1721,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
SetDataOp(path, data, 0)))
)
val stat = response.resultCode match {
case code@ Code.OK =>
case Code.OK =>
val setDataResult = response.zkOpResults(1).rawOpResult.asInstanceOf[SetDataResult]
setDataResult.getStat
case Code.NODEEXISTS =>

Loading…
Cancel
Save