Browse Source

KAFKA-1664 Kafka does not properly parse multiple ZK nodes with non-root chroot; reviewed by Neha Narkhede and Jun Rao

pull/51/head
Ashish Singh 10 years ago committed by Neha Narkhede
parent
commit
b56f5973c7
  1. 46
      core/src/main/scala/kafka/utils/ZkUtils.scala
  2. 147
      core/src/test/scala/unit/kafka/zk/ZKPathTest.scala

46
core/src/main/scala/kafka/utils/ZkUtils.scala

@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
ZkMarshallingError, ZkBadVersionException} ZkMarshallingError, ZkBadVersionException}
import org.I0Itec.zkclient.serialize.ZkSerializer import org.I0Itec.zkclient.serialize.ZkSerializer
import org.apache.kafka.common.config.ConfigException
import collection._ import collection._
import kafka.api.LeaderAndIsr import kafka.api.LeaderAndIsr
import org.apache.zookeeper.data.Stat import org.apache.zookeeper.data.Stat
@ -212,7 +213,7 @@ object ZkUtils extends Logging {
*/ */
def makeSurePersistentPathExists(client: ZkClient, path: String) { def makeSurePersistentPathExists(client: ZkClient, path: String) {
if (!client.exists(path)) if (!client.exists(path))
client.createPersistent(path, true) // won't throw NoNodeException or NodeExistsException new ZkPath(client).createPersistent(path, true) // won't throw NoNodeException or NodeExistsException
} }
/** /**
@ -220,20 +221,22 @@ object ZkUtils extends Logging {
*/ */
private def createParentPath(client: ZkClient, path: String): Unit = { private def createParentPath(client: ZkClient, path: String): Unit = {
val parentDir = path.substring(0, path.lastIndexOf('/')) val parentDir = path.substring(0, path.lastIndexOf('/'))
if (parentDir.length != 0) if (parentDir.length != 0) {
client.createPersistent(parentDir, true) new ZkPath(client).createPersistent(parentDir, true)
}
} }
/** /**
* Create an ephemeral node with the given path and data. Create parents if necessary. * Create an ephemeral node with the given path and data. Create parents if necessary.
*/ */
private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = { private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
val zkPath = new ZkPath(client)
try { try {
client.createEphemeral(path, data) zkPath.createEphemeral(path, data)
} catch { } catch {
case e: ZkNoNodeException => { case e: ZkNoNodeException => {
createParentPath(client, path) createParentPath(client, path)
client.createEphemeral(path, data) zkPath.createEphemeral(path, data)
} }
} }
} }
@ -312,18 +315,19 @@ object ZkUtils extends Logging {
* Create an persistent node with the given path and data. Create parents if necessary. * Create an persistent node with the given path and data. Create parents if necessary.
*/ */
def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = { def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
val zkPath = new ZkPath(client)
try { try {
client.createPersistent(path, data) zkPath.createPersistent(path, data)
} catch { } catch {
case e: ZkNoNodeException => { case e: ZkNoNodeException => {
createParentPath(client, path) createParentPath(client, path)
client.createPersistent(path, data) zkPath.createPersistent(path, data)
} }
} }
} }
def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = { def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = {
client.createPersistentSequential(path, data) new ZkPath(client).createPersistentSequential(path, data)
} }
/** /**
@ -338,7 +342,7 @@ object ZkUtils extends Logging {
case e: ZkNoNodeException => { case e: ZkNoNodeException => {
createParentPath(client, path) createParentPath(client, path)
try { try {
client.createPersistent(path, data) new ZkPath(client).createPersistent(path, data)
} catch { } catch {
case e: ZkNodeExistsException => case e: ZkNodeExistsException =>
client.writeData(path, data) client.writeData(path, data)
@ -409,7 +413,7 @@ object ZkUtils extends Logging {
} catch { } catch {
case e: ZkNoNodeException => { case e: ZkNoNodeException => {
createParentPath(client, path) createParentPath(client, path)
client.createEphemeral(path, data) new ZkPath(client).createEphemeral(path, data)
} }
case e2: Throwable => throw e2 case e2: Throwable => throw e2
} }
@ -806,3 +810,25 @@ class ZKConfig(props: VerifiableProperties) {
/** how far a ZK follower can be behind a ZK leader */ /** how far a ZK follower can be behind a ZK leader */
val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000) val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
} }
class ZkPath(client: ZkClient) {
if (!client.exists("/")) {
throw new ConfigException("Zookeeper namespace does not exist")
}
def createPersistent(path: String, data: Object) {
client.createPersistent(path, data)
}
def createPersistent(path: String, createParents: Boolean) {
client.createPersistent(path, createParents)
}
def createEphemeral(path: String, data: Object) {
client.createEphemeral(path, data)
}
def createPersistentSequential(path: String, data: Object): String = {
client.createPersistentSequential(path, data)
}
}

147
core/src/test/scala/unit/kafka/zk/ZKPathTest.scala

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package unit.kafka.zk
import junit.framework.Assert
import kafka.consumer.ConsumerConfig
import kafka.utils.{TestUtils, ZKStringSerializer, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.config.ConfigException
import org.scalatest.junit.JUnit3Suite
class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
val path: String = "/some_dir"
val zkSessionTimeoutMs = 1000
val zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
def testCreatePersistentPathThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
ZKStringSerializer)
try {
ZkUtils.createPersistentPath(zkClient, path)
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
case configException: ConfigException =>
case exception: Throwable => fail("Should have thrown ConfigException")
}
}
def testCreatePersistentPath {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
try {
ZkUtils.createPersistentPath(zkClient, path)
} catch {
case exception: Throwable => fail("Failed to create persistent path")
}
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path));
}
def testMakeSurePersistsPathExistsThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
ZKStringSerializer)
try {
ZkUtils.makeSurePersistentPathExists(zkClient, path)
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
case configException: ConfigException =>
case exception: Throwable => fail("Should have thrown ConfigException")
}
}
def testMakeSurePersistsPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
try {
ZkUtils.makeSurePersistentPathExists(zkClient, path)
} catch {
case exception: Throwable => fail("Failed to create persistent path")
}
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path));
}
def testCreateEphemeralPathThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
ZKStringSerializer)
try {
ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
case configException: ConfigException =>
case exception: Throwable => fail("Should have thrown ConfigException")
}
}
def testCreateEphemeralPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
try {
ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
} catch {
case exception: Throwable => fail("Failed to create ephemeral path")
}
Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path));
}
def testCreatePersistentSequentialThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
ZKStringSerializer)
try {
ZkUtils.createSequentialPersistentPath(zkClient, path)
fail("Failed to throw ConfigException for missing zookeeper root node")
} catch {
case configException: ConfigException =>
case exception: Throwable => fail("Should have thrown ConfigException")
}
}
def testCreatePersistentSequentialExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
ZKStringSerializer)
var actualPath: String = ""
try {
actualPath = ZkUtils.createSequentialPersistentPath(zkClient, path)
} catch {
case exception: Throwable => fail("Failed to create persistent path")
}
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath));
}
}
Loading…
Cancel
Save