diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index f918b616024..760bd67299d 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -429,8 +429,13 @@ object PreferredReplicaElectionZNode { }.map(_.toSet).getOrElse(Set.empty) } +//old consumer path znode +object ConsumerPathZNode { + def path = "/consumers" +} + object ConsumerOffset { - def path(group: String, topic: String, partition: Integer) = s"/consumers/${group}/offsets/${topic}/${partition}" + def path(group: String, topic: String, partition: Integer) = s"${ConsumerPathZNode.path}/${group}/offsets/${topic}/${partition}" def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8) def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new String(_, UTF_8).toLong) } @@ -721,7 +726,7 @@ object ZkData { // These are persistent ZK paths that should exist on kafka broker startup. val PersistentZkPaths = Seq( - "/consumers", // old consumer path + ConsumerPathZNode.path, // old consumer path BrokerIdsZNode.path, TopicsZNode.path, ConfigEntityChangeNotificationZNode.path, @@ -743,7 +748,8 @@ object ZkData { } def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = { - if (isSecure) { + //Old Consumer path is kept open as different consumers will write under this node. + if (!ConsumerPathZNode.path.equals(path) && isSecure) { val acls = new ArrayBuffer[ACL] acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala if (!sensitivePath(path)) diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index 19fa19dafbc..1cdbe4b2a0e 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -19,10 +19,10 @@ package kafka.security.auth import kafka.admin.ZkSecurityMigrator import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils} -import kafka.zk.ZooKeeperTestHarness +import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness} import org.apache.kafka.common.KafkaException import org.apache.kafka.common.security.JaasUtils -import org.apache.zookeeper.data.ACL +import org.apache.zookeeper.data.{ACL, Stat} import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -304,4 +304,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { } } } + + @Test + def testConsumerOffsetPathAcls(): Unit = { + zkClient.makeSurePersistentPathExists(ConsumerPathZNode.path) + + val consumerPathAcls = zkClient.currentZooKeeper.getACL(ConsumerPathZNode.path, new Stat()) + assertTrue("old consumer znode path acls are not open", consumerPathAcls.asScala.forall(TestUtils.isAclUnsecure)) + } }