Browse Source

KAFKA-5894; add the notion of max inflight requests to async ZooKeeperClient

ZooKeeperClient is a zookeeper client that encourages pipelined requests to zookeeper. We want to add the notion of max inflight requests to the client for several reasons:
1. to bound memory overhead associated with async requests on the client.
2. to not overwhelm the zookeeper ensemble with a burst of requests.

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ted Yu <yuzhihong@gmail.com>, Jun Rao <junrao@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #3860 from onurkaraman/KAFKA-5894
pull/3860/merge
Onur Karaman 7 years ago
parent
commit
58138126ce
  1. 6
      core/src/main/scala/kafka/server/KafkaConfig.scala
  2. 2
      core/src/main/scala/kafka/server/KafkaServer.scala
  3. 23
      core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
  4. 1
      core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
  5. 6
      core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
  6. 84
      core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala

6
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -44,6 +44,7 @@ object Defaults { @@ -44,6 +44,7 @@ object Defaults {
val ZkSessionTimeoutMs = 6000
val ZkSyncTimeMs = 2000
val ZkEnableSecureAcls = false
val ZkMaxInFlightRequests = 10
/** ********* General Configuration ***********/
val BrokerIdGenerationEnable = true
@ -231,6 +232,7 @@ object KafkaConfig { @@ -231,6 +232,7 @@ object KafkaConfig {
val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
val ZkEnableSecureAclsProp = "zookeeper.set.acl"
val ZkMaxInFlightRequestsProp = "zookeeper.max.in.flight.requests"
/** ********* General Configuration ***********/
val BrokerIdGenerationEnableProp = "broker.id.generation.enable"
val MaxReservedBrokerIdProp = "reserved.broker.max.id"
@ -418,6 +420,7 @@ object KafkaConfig { @@ -418,6 +420,7 @@ object KafkaConfig {
val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used"
val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged requests the client will send to Zookeeper before blocking."
/** ********* General Configuration ***********/
val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
@ -695,6 +698,7 @@ object KafkaConfig { @@ -695,6 +698,7 @@ object KafkaConfig {
.define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc)
.define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc)
.define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc)
.define(ZkMaxInFlightRequestsProp, INT, Defaults.ZkMaxInFlightRequests, atLeast(1), HIGH, ZkMaxInFlightRequestsDoc)
/** ********* General Configuration ***********/
.define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BrokerIdGenerationEnable, MEDIUM, BrokerIdGenerationEnableDoc)
@ -916,7 +920,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra @@ -916,7 +920,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp)
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)

2
core/src/main/scala/kafka/server/KafkaServer.scala

@ -222,7 +222,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP @@ -222,7 +222,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, new StateChangeHandler {
config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, new StateChangeHandler {
override def onReconnectionTimeout(): Unit = {
error("Reconnection timeout.")
}

23
core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala

@ -18,7 +18,7 @@ @@ -18,7 +18,7 @@
package kafka.zookeeper
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, Semaphore, TimeUnit}
import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
import kafka.utils.Logging
@ -37,9 +37,13 @@ import scala.collection.JavaConverters._ @@ -37,9 +37,13 @@ import scala.collection.JavaConverters._
* @param connectString comma separated host:port pairs, each corresponding to a zk server
* @param sessionTimeoutMs session timeout in milliseconds
* @param connectionTimeoutMs connection timeout in milliseconds
* @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking.
* @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread.
*/
class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int,
class ZooKeeperClient(connectString: String,
sessionTimeoutMs: Int,
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
stateChangeHandler: StateChangeHandler) extends Logging {
this.logIdent = "[ZooKeeperClient] "
private val initializationLock = new ReentrantReadWriteLock()
@ -47,6 +51,7 @@ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi @@ -47,6 +51,7 @@ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
private val inFlightRequests = new Semaphore(maxInFlightRequests)
info(s"Initializing a new session to $connectString.")
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
@ -81,9 +86,17 @@ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi @@ -81,9 +86,17 @@ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
requests.foreach { request =>
send(request) { response =>
responseQueue.add(response)
countDownLatch.countDown()
inFlightRequests.acquire()
try {
send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
}
} catch {
case e: Throwable =>
inFlightRequests.release()
throw e
}
}
countDownLatch.await()

1
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

@ -539,6 +539,7 @@ class KafkaConfigTest { @@ -539,6 +539,7 @@ class KafkaConfigTest {
case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")

6
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala

@ -17,7 +17,7 @@ @@ -17,7 +17,7 @@
package kafka.zk
import kafka.common.TopicAndPartition
import kafka.utils.ZkUtils
import kafka.server.Defaults
import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.TopicPartition
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
@ -36,7 +36,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -36,7 +36,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Before
override def setUp() {
super.setUp()
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Defaults.ZkMaxInFlightRequests, null)
zkClient = new KafkaZkClient(zooKeeperClient, false)
}
@ -100,7 +100,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -100,7 +100,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicPartitionCount(topic).isEmpty)
// create a topic path
zkClient.createRecursive(ZkUtils.getTopicPath(topic))
zkClient.createRecursive(TopicZNode.path(topic))
val assignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]()
assignment.put(new TopicAndPartition(topic, 0), Seq(0,1))

84
core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala

@ -19,7 +19,8 @@ package kafka.zookeeper @@ -19,7 +19,8 @@ package kafka.zookeeper
import java.net.UnknownHostException
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
import javax.security.auth.login.Configuration
import kafka.zk.ZooKeeperTestHarness
@ -41,23 +42,23 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -41,23 +42,23 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test(expected = classOf[UnknownHostException])
def testUnresolvableConnectString(): Unit = {
new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null)
new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue, null)
}
@Test(expected = classOf[ZooKeeperClientTimeoutException])
def testConnectionTimeout(): Unit = {
zookeeper.shutdown()
new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null)
new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue, null)
}
@Test
def testConnection(): Unit = {
new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
}
@Test
def testDeleteNonExistentZNode(): Unit = {
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode)
}
@ -65,7 +66,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -65,7 +66,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testDeleteExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
@ -74,7 +75,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -74,7 +75,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testExistsNonExistentZNode(): Unit = {
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode)
}
@ -82,7 +83,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -82,7 +83,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testExistsExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
@ -91,7 +92,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -91,7 +92,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testGetDataNonExistentZNode(): Unit = {
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode)
}
@ -100,7 +101,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -100,7 +101,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
def testGetDataExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val data = bytes
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@ -111,7 +112,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -111,7 +112,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testSetDataNonExistentZNode(): Unit = {
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode)
}
@ -120,7 +121,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -120,7 +121,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
def testSetDataExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val data = bytes
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@ -133,7 +134,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -133,7 +134,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testGetAclNonExistentZNode(): Unit = {
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode)
}
@ -141,7 +142,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -141,7 +142,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testGetAclExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
@ -152,14 +153,14 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -152,14 +153,14 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testSetAclNonExistentZNode(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val setAclResponse = zooKeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode)
}
@Test
def testGetChildrenNonExistentZNode(): Unit = {
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode)
}
@ -167,7 +168,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -167,7 +168,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testGetChildrenExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@ -183,7 +184,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -183,7 +184,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
val child2 = "child2"
val child1Path = mockPath + "/" + child1
val child2Path = mockPath + "/" + child2
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@ -202,7 +203,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -202,7 +203,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testPipelinedGetData(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
val createResponses = createRequests.map(zooKeeperClient.handleRequest)
createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode))
@ -219,7 +220,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -219,7 +220,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testMixedPipeline(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@ -234,7 +235,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -234,7 +235,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChangeHandlerForCreation(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChangeHandler = new ZNodeChangeHandler {
override def handleCreation(): Unit = {
@ -255,7 +256,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -255,7 +256,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChangeHandlerForDeletion(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChangeHandler = new ZNodeChangeHandler {
override def handleDeletion(): Unit = {
@ -278,7 +279,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -278,7 +279,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChangeHandlerForDataChange(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChangeHandler = new ZNodeChangeHandler {
override def handleDataChange(): Unit = {
@ -301,7 +302,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -301,7 +302,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChildChangeHandlerForChildChange(): Unit = {
import scala.collection.JavaConverters._
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
override def handleChildChange(): Unit = {
@ -331,9 +332,42 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -331,9 +332,42 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
stateChangeHandlerCountDownLatch.countDown()
}
}
new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, stateChangeHandler)
assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
@Test
def testConnectionLossRequestTermination(): Unit = {
val batchSize = 10
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, null)
zookeeper.shutdown()
val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
val countDownLatch = new CountDownLatch(1)
val running = new AtomicBoolean(true)
val unexpectedResponses = new ArrayBlockingQueue[GetDataResponse](batchSize)
val requestThread = new Thread {
override def run(): Unit = {
while (running.get()) {
val responses = zooKeeperClient.handleRequests(requests)
val suffix = responses.dropWhile(response => response.resultCode != Code.CONNECTIONLOSS)
if (!suffix.forall(response => response.resultCode == Code.CONNECTIONLOSS))
responses.foreach(unexpectedResponses.add)
if (!unexpectedResponses.isEmpty || suffix.nonEmpty)
running.set(false)
}
countDownLatch.countDown()
}
}
requestThread.start()
val requestThreadTerminated = countDownLatch.await(30, TimeUnit.SECONDS)
if (!requestThreadTerminated) {
running.set(false)
requestThread.join(5000)
fail("Failed to receive a CONNECTIONLOSS response code after zookeeper has shutdown.")
} else if (!unexpectedResponses.isEmpty) {
fail(s"Received an unexpected non-CONNECTIONLOSS response code after a CONNECTIONLOSS response code from a single batch: $unexpectedResponses")
}
}
private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
}

Loading…
Cancel
Save