diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d336499be91..e24659c72a6 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -44,6 +44,7 @@ object Defaults { val ZkSessionTimeoutMs = 6000 val ZkSyncTimeMs = 2000 val ZkEnableSecureAcls = false + val ZkMaxInFlightRequests = 10 /** ********* General Configuration ***********/ val BrokerIdGenerationEnable = true @@ -231,6 +232,7 @@ object KafkaConfig { val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms" val ZkSyncTimeMsProp = "zookeeper.sync.time.ms" val ZkEnableSecureAclsProp = "zookeeper.set.acl" + val ZkMaxInFlightRequestsProp = "zookeeper.max.in.flight.requests" /** ********* General Configuration ***********/ val BrokerIdGenerationEnableProp = "broker.id.generation.enable" val MaxReservedBrokerIdProp = "reserved.broker.max.id" @@ -418,6 +420,7 @@ object KafkaConfig { val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used" val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader" val ZkEnableSecureAclsDoc = "Set client to use secure ACLs" + val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged requests the client will send to Zookeeper before blocking." /** ********* General Configuration ***********/ val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed." val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id" @@ -695,6 +698,7 @@ object KafkaConfig { .define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc) .define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc) .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc) + .define(ZkMaxInFlightRequestsProp, INT, Defaults.ZkMaxInFlightRequests, atLeast(1), HIGH, ZkMaxInFlightRequestsDoc) /** ********* General Configuration ***********/ .define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BrokerIdGenerationEnable, MEDIUM, BrokerIdGenerationEnableDoc) @@ -916,7 +920,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp)) val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp) val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp) - + val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp) /** ********* General Configuration ***********/ val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp) val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index f8111ff8b93..dff83db50ba 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -222,7 +222,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, new StateChangeHandler { + config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, new StateChangeHandler { override def onReconnectionTimeout(): Unit = { error("Reconnection timeout.") } diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 0ff34c0239d..149e7eb0051 100644 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -18,7 +18,7 @@ package kafka.zookeeper import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} -import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit} +import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, Semaphore, TimeUnit} import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock} import kafka.utils.Logging @@ -37,9 +37,13 @@ import scala.collection.JavaConverters._ * @param connectString comma separated host:port pairs, each corresponding to a zk server * @param sessionTimeoutMs session timeout in milliseconds * @param connectionTimeoutMs connection timeout in milliseconds + * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking. * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread. */ -class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int, +class ZooKeeperClient(connectString: String, + sessionTimeoutMs: Int, + connectionTimeoutMs: Int, + maxInFlightRequests: Int, stateChangeHandler: StateChangeHandler) extends Logging { this.logIdent = "[ZooKeeperClient] " private val initializationLock = new ReentrantReadWriteLock() @@ -47,6 +51,7 @@ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition() private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala + private val inFlightRequests = new Semaphore(maxInFlightRequests) info(s"Initializing a new session to $connectString.") @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher) @@ -81,9 +86,17 @@ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size) requests.foreach { request => - send(request) { response => - responseQueue.add(response) - countDownLatch.countDown() + inFlightRequests.acquire() + try { + send(request) { response => + responseQueue.add(response) + inFlightRequests.release() + countDownLatch.countDown() + } + } catch { + case e: Throwable => + inFlightRequests.release() + throw e } } countDownLatch.await() diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index fabd9f75f42..9c459d839e3 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -539,6 +539,7 @@ class KafkaConfigTest { case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean") + case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 8d064f8bcb6..77ac748dc04 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -17,7 +17,7 @@ package kafka.zk import kafka.common.TopicAndPartition -import kafka.utils.ZkUtils +import kafka.server.Defaults import kafka.zookeeper.ZooKeeperClient import org.apache.kafka.common.TopicPartition import org.junit.Assert.{assertEquals, assertFalse, assertTrue} @@ -36,7 +36,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Before override def setUp() { super.setUp() - zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Defaults.ZkMaxInFlightRequests, null) zkClient = new KafkaZkClient(zooKeeperClient, false) } @@ -100,7 +100,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertTrue(zkClient.getTopicPartitionCount(topic).isEmpty) // create a topic path - zkClient.createRecursive(ZkUtils.getTopicPath(topic)) + zkClient.createRecursive(TopicZNode.path(topic)) val assignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]() assignment.put(new TopicAndPartition(topic, 0), Seq(0,1)) diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index d5952214267..50a065f0d64 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -19,7 +19,8 @@ package kafka.zookeeper import java.net.UnknownHostException import java.nio.charset.StandardCharsets import java.util.UUID -import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit} import javax.security.auth.login.Configuration import kafka.zk.ZooKeeperTestHarness @@ -41,23 +42,23 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test(expected = classOf[UnknownHostException]) def testUnresolvableConnectString(): Unit = { - new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null) + new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue, null) } @Test(expected = classOf[ZooKeeperClientTimeoutException]) def testConnectionTimeout(): Unit = { zookeeper.shutdown() - new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null) + new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue, null) } @Test def testConnection(): Unit = { - new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) } @Test def testDeleteNonExistentZNode(): Unit = { - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1)) assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode) } @@ -65,7 +66,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testDeleteExistingZNode(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1)) @@ -74,7 +75,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testExistsNonExistentZNode(): Unit = { - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath)) assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode) } @@ -82,7 +83,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testExistsExistingZNode(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath)) @@ -91,7 +92,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testGetDataNonExistentZNode(): Unit = { - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath)) assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode) } @@ -100,7 +101,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { def testGetDataExistingZNode(): Unit = { import scala.collection.JavaConverters._ val data = bytes - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) @@ -111,7 +112,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testSetDataNonExistentZNode(): Unit = { - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1)) assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode) } @@ -120,7 +121,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { def testSetDataExistingZNode(): Unit = { import scala.collection.JavaConverters._ val data = bytes - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) @@ -133,7 +134,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testGetAclNonExistentZNode(): Unit = { - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath)) assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode) } @@ -141,7 +142,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testGetAclExistingZNode(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath)) @@ -152,14 +153,14 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testSetAclNonExistentZNode(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val setAclResponse = zooKeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1)) assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode) } @Test def testGetChildrenNonExistentZNode(): Unit = { - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath)) assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode) } @@ -167,7 +168,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testGetChildrenExistingZNode(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) @@ -183,7 +184,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { val child2 = "child2" val child1Path = mockPath + "/" + child1 val child2Path = mockPath + "/" + child2 - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) @@ -202,7 +203,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testPipelinedGetData(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) val createResponses = createRequests.map(zooKeeperClient.handleRequest) createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)) @@ -219,7 +220,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testMixedPipeline(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) @@ -234,7 +235,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testZNodeChangeHandlerForCreation(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) val zNodeChangeHandler = new ZNodeChangeHandler { override def handleCreation(): Unit = { @@ -255,7 +256,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testZNodeChangeHandlerForDeletion(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) val zNodeChangeHandler = new ZNodeChangeHandler { override def handleDeletion(): Unit = { @@ -278,7 +279,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testZNodeChangeHandlerForDataChange(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) val zNodeChangeHandler = new ZNodeChangeHandler { override def handleDataChange(): Unit = { @@ -301,7 +302,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testZNodeChildChangeHandlerForChildChange(): Unit = { import scala.collection.JavaConverters._ - val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null) val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1) val zNodeChildChangeHandler = new ZNodeChildChangeHandler { override def handleChildChange(): Unit = { @@ -331,9 +332,42 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { stateChangeHandlerCountDownLatch.countDown() } } - new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler) + new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, stateChangeHandler) assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) } + @Test + def testConnectionLossRequestTermination(): Unit = { + val batchSize = 10 + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, null) + zookeeper.shutdown() + val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i")) + val countDownLatch = new CountDownLatch(1) + val running = new AtomicBoolean(true) + val unexpectedResponses = new ArrayBlockingQueue[GetDataResponse](batchSize) + val requestThread = new Thread { + override def run(): Unit = { + while (running.get()) { + val responses = zooKeeperClient.handleRequests(requests) + val suffix = responses.dropWhile(response => response.resultCode != Code.CONNECTIONLOSS) + if (!suffix.forall(response => response.resultCode == Code.CONNECTIONLOSS)) + responses.foreach(unexpectedResponses.add) + if (!unexpectedResponses.isEmpty || suffix.nonEmpty) + running.set(false) + } + countDownLatch.countDown() + } + } + requestThread.start() + val requestThreadTerminated = countDownLatch.await(30, TimeUnit.SECONDS) + if (!requestThreadTerminated) { + running.set(false) + requestThread.join(5000) + fail("Failed to receive a CONNECTIONLOSS response code after zookeeper has shutdown.") + } else if (!unexpectedResponses.isEmpty) { + fail(s"Received an unexpected non-CONNECTIONLOSS response code after a CONNECTIONLOSS response code from a single batch: $unexpectedResponses") + } + } + private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8) }