diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 8a2fb2d9a42..7ae999ec619 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} import org.I0Itec.zkclient.serialize.ZkSerializer +import org.apache.kafka.common.config.ConfigException import collection._ import kafka.api.LeaderAndIsr import org.apache.zookeeper.data.Stat @@ -212,7 +213,7 @@ object ZkUtils extends Logging { */ def makeSurePersistentPathExists(client: ZkClient, path: String) { if (!client.exists(path)) - client.createPersistent(path, true) // won't throw NoNodeException or NodeExistsException + new ZkPath(client).createPersistent(path, true) // won't throw NoNodeException or NodeExistsException } /** @@ -220,20 +221,22 @@ object ZkUtils extends Logging { */ private def createParentPath(client: ZkClient, path: String): Unit = { val parentDir = path.substring(0, path.lastIndexOf('/')) - if (parentDir.length != 0) - client.createPersistent(parentDir, true) + if (parentDir.length != 0) { + new ZkPath(client).createPersistent(parentDir, true) + } } /** * Create an ephemeral node with the given path and data. Create parents if necessary. */ private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = { + val zkPath = new ZkPath(client) try { - client.createEphemeral(path, data) + zkPath.createEphemeral(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) - client.createEphemeral(path, data) + zkPath.createEphemeral(path, data) } } } @@ -312,18 +315,19 @@ object ZkUtils extends Logging { * Create an persistent node with the given path and data. Create parents if necessary. */ def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = { + val zkPath = new ZkPath(client) try { - client.createPersistent(path, data) + zkPath.createPersistent(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) - client.createPersistent(path, data) + zkPath.createPersistent(path, data) } } } def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = { - client.createPersistentSequential(path, data) + new ZkPath(client).createPersistentSequential(path, data) } /** @@ -338,7 +342,7 @@ object ZkUtils extends Logging { case e: ZkNoNodeException => { createParentPath(client, path) try { - client.createPersistent(path, data) + new ZkPath(client).createPersistent(path, data) } catch { case e: ZkNodeExistsException => client.writeData(path, data) @@ -409,7 +413,7 @@ object ZkUtils extends Logging { } catch { case e: ZkNoNodeException => { createParentPath(client, path) - client.createEphemeral(path, data) + new ZkPath(client).createEphemeral(path, data) } case e2: Throwable => throw e2 } @@ -806,3 +810,25 @@ class ZKConfig(props: VerifiableProperties) { /** how far a ZK follower can be behind a ZK leader */ val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000) } + +class ZkPath(client: ZkClient) { + if (!client.exists("/")) { + throw new ConfigException("Zookeeper namespace does not exist") + } + + def createPersistent(path: String, data: Object) { + client.createPersistent(path, data) + } + + def createPersistent(path: String, createParents: Boolean) { + client.createPersistent(path, createParents) + } + + def createEphemeral(path: String, data: Object) { + client.createEphemeral(path, data) + } + + def createPersistentSequential(path: String, data: Object): String = { + client.createPersistentSequential(path, data) + } +} diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala new file mode 100644 index 00000000000..9897b2fa8f8 --- /dev/null +++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package unit.kafka.zk + +import junit.framework.Assert +import kafka.consumer.ConsumerConfig +import kafka.utils.{TestUtils, ZKStringSerializer, ZkUtils} +import kafka.zk.ZooKeeperTestHarness +import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.common.config.ConfigException +import org.scalatest.junit.JUnit3Suite + +class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { + + val path: String = "/some_dir" + val zkSessionTimeoutMs = 1000 + val zkConnectWithInvalidRoot: String = zkConnect + "/ghost" + + def testCreatePersistentPathThrowsException { + val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, + "test", "1")) + var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, + config.zkConnectionTimeoutMs, + ZKStringSerializer) + try { + ZkUtils.createPersistentPath(zkClient, path) + fail("Failed to throw ConfigException for missing zookeeper root node") + } catch { + case configException: ConfigException => + case exception: Throwable => fail("Should have thrown ConfigException") + } + } + + def testCreatePersistentPath { + val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) + var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, + ZKStringSerializer) + try { + ZkUtils.createPersistentPath(zkClient, path) + } catch { + case exception: Throwable => fail("Failed to create persistent path") + } + + Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path)); + } + + def testMakeSurePersistsPathExistsThrowsException { + val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, + "test", "1")) + var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, + config.zkConnectionTimeoutMs, + ZKStringSerializer) + try { + ZkUtils.makeSurePersistentPathExists(zkClient, path) + fail("Failed to throw ConfigException for missing zookeeper root node") + } catch { + case configException: ConfigException => + case exception: Throwable => fail("Should have thrown ConfigException") + } + } + + def testMakeSurePersistsPathExists { + val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) + var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, + ZKStringSerializer) + try { + ZkUtils.makeSurePersistentPathExists(zkClient, path) + } catch { + case exception: Throwable => fail("Failed to create persistent path") + } + + Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path)); + } + + def testCreateEphemeralPathThrowsException { + val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, + "test", "1")) + var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, + config.zkConnectionTimeoutMs, + ZKStringSerializer) + try { + ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata") + fail("Failed to throw ConfigException for missing zookeeper root node") + } catch { + case configException: ConfigException => + case exception: Throwable => fail("Should have thrown ConfigException") + } + } + + def testCreateEphemeralPathExists { + val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) + var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, + ZKStringSerializer) + try { + ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata") + } catch { + case exception: Throwable => fail("Failed to create ephemeral path") + } + + Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path)); + } + + def testCreatePersistentSequentialThrowsException { + val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, + "test", "1")) + var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, + config.zkConnectionTimeoutMs, + ZKStringSerializer) + try { + ZkUtils.createSequentialPersistentPath(zkClient, path) + fail("Failed to throw ConfigException for missing zookeeper root node") + } catch { + case configException: ConfigException => + case exception: Throwable => fail("Should have thrown ConfigException") + } + } + + def testCreatePersistentSequentialExists { + val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) + var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, + ZKStringSerializer) + + var actualPath: String = "" + try { + actualPath = ZkUtils.createSequentialPersistentPath(zkClient, path) + } catch { + case exception: Throwable => fail("Failed to create persistent path") + } + + Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath)); + } +}