Browse Source

KAFKA-7287: Set open ACL for old consumer znode path (#5503)

Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>, Satish Duggana <satishd@apache.org>, Jun Rao <junrao@gmail.com>
pull/5584/head
Manikumar Reddy O 6 years ago committed by Jun Rao
parent
commit
9e23af3115
  1. 12
      core/src/main/scala/kafka/zk/ZkData.scala
  2. 12
      core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala

12
core/src/main/scala/kafka/zk/ZkData.scala

@ -429,8 +429,13 @@ object PreferredReplicaElectionZNode { @@ -429,8 +429,13 @@ object PreferredReplicaElectionZNode {
}.map(_.toSet).getOrElse(Set.empty)
}
//old consumer path znode
object ConsumerPathZNode {
def path = "/consumers"
}
object ConsumerOffset {
def path(group: String, topic: String, partition: Integer) = s"/consumers/${group}/offsets/${topic}/${partition}"
def path(group: String, topic: String, partition: Integer) = s"${ConsumerPathZNode.path}/${group}/offsets/${topic}/${partition}"
def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8)
def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new String(_, UTF_8).toLong)
}
@ -721,7 +726,7 @@ object ZkData { @@ -721,7 +726,7 @@ object ZkData {
// These are persistent ZK paths that should exist on kafka broker startup.
val PersistentZkPaths = Seq(
"/consumers", // old consumer path
ConsumerPathZNode.path, // old consumer path
BrokerIdsZNode.path,
TopicsZNode.path,
ConfigEntityChangeNotificationZNode.path,
@ -743,7 +748,8 @@ object ZkData { @@ -743,7 +748,8 @@ object ZkData {
}
def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = {
if (isSecure) {
//Old Consumer path is kept open as different consumers will write under this node.
if (!ConsumerPathZNode.path.equals(path) && isSecure) {
val acls = new ArrayBuffer[ACL]
acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala
if (!sensitivePath(path))

12
core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala

@ -19,10 +19,10 @@ package kafka.security.auth @@ -19,10 +19,10 @@ package kafka.security.auth
import kafka.admin.ZkSecurityMigrator
import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.data.ACL
import org.apache.zookeeper.data.{ACL, Stat}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@ -304,4 +304,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { @@ -304,4 +304,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
}
}
}
@Test
def testConsumerOffsetPathAcls(): Unit = {
zkClient.makeSurePersistentPathExists(ConsumerPathZNode.path)
val consumerPathAcls = zkClient.currentZooKeeper.getACL(ConsumerPathZNode.path, new Stat())
assertTrue("old consumer znode path acls are not open", consumerPathAcls.asScala.forall(TestUtils.isAclUnsecure))
}
}

Loading…
Cancel
Save