This brought down a cluster by causing continuous controller moves.
ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects that aren't thread-safe:
* Selector
* NetworkClient
* SSLEngine (this was the big one for us. We turn on SSL for interbroker communication).
As per the "Concurrency Notes" section from https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html:
> two threads must not attempt to call the same method (either wrap() or unwrap()) concurrently
SSLEngine.wrap gets called in:
* SslTransportLayer.write
* SslTransportLayer.handshake
* SslTransportLayer.close
It turns out that the ZkEventThread and RequestSendThread can concurrently call SSLEngine.wrap:
* ZkEventThread calls SslTransportLayer.close from ControllerChannelManager.removeExistingBroker
* RequestSendThread can call SslTransportLayer.write or SslTransportLayer.handshake from NetworkClient.poll
Suppose the controller moves for whatever reason. The former controller could have had a RequestSendThread that
was in the middle of sending out messages to the cluster while the ZkEventThread began executing
KafkaController.onControllerResignation, which calls ControllerChannelManager.shutdown, which sequentially cleans
up the controller-to-broker queue and connection for every broker in the cluster. This cleanup includes the call
to ControllerChannelManager.removeExistingBroker mentioned earlier, causing the concurrent call to SSLEngine.wrap.
This concurrent call throws a BufferOverflowException, which ControllerChannelManager.removeExistingBroker catches,
so ControllerChannelManager.shutdown moves on to cleaning up the next controller-to-broker queue and connection,
skipping cleanup steps such as clearing the queue, stopping the RequestSendThread, and removing the entry from its
brokerStateInfo map.
By failing out of Selector.close, the sensors corresponding to the broker connection have not been cleaned up. Any
later attempt to initialize an identical Selector will result in a sensor collision and therefore cause Selector
initialization to throw an exception. In other words, any later attempt by this broker to become controller again
will fail on initialization. When controller initialization fails, the controller deletes the /controller znode and
lets another broker take over.
Now suppose the controller moves enough times such that every broker hits the BufferOverflowException concurrency
issue. We're now guaranteed to fail controller initialization due to the sensor collision on every controller
transition, so the controller will move across brokers continuously.
This patch avoids the concurrent use of non-thread-safe classes in ControllerChannelManager.removeExistingBroker
by shutting down the RequestSendThread before closing the NetworkClient.
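Roughly, the ordering fix looks like the sketch below. The class and field names here (`BrokerState`, `requestThread`, `networkClient`) are illustrative stand-ins rather than the actual controller internals; the point is only that the sender thread is stopped and joined before the shared client, and hence the SSLEngine, is closed.
```scala
import java.io.Closeable

// Illustrative stand-ins for the controller's per-broker state: `requestThread`
// plays the role of RequestSendThread and `networkClient` the shared NetworkClient.
final class BrokerState(val requestThread: Thread, val networkClient: Closeable)

object RemoveBrokerSketch {
  def removeExistingBroker(state: BrokerState): Unit = {
    // Stop and join the sender first, so nothing can still be inside
    // SslTransportLayer.write/handshake (and hence SSLEngine.wrap) ...
    state.requestThread.interrupt()
    state.requestThread.join()
    // ... before close() drives SSLEngine.wrap for the SSL close handshake.
    state.networkClient.close()
  }
}
```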
Author: Onur Karaman <okaraman@linkedin.com>
Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2746 from onurkaraman/KAFKA-4959
Author: Dong Lin <lindong28@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jiangjie Qin <becket.qin@gmail.com>
Closes#2476 from lindong28/KAFKA-4586
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Jun Rao <junrao@gmail.com>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2614 from hachikuji/exactly-once-message-format
This uses JUnit Categories to identify integration tests. Adds 2 new build targets:
`integrationTest` and `unitTest`.
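For context, a hedged sketch of how a test can be tagged with a JUnit category so a Gradle target can include or exclude it; `IntegrationTest` is assumed here to be a plain marker trait and the test class is hypothetical.
```scala
import org.junit.Test
import org.junit.experimental.categories.Category

// Assumed marker trait used as the JUnit category.
trait IntegrationTest

@Category(Array(classOf[IntegrationTest]))
class BrokerRoundTripIntegrationTest {   // hypothetical test
  @Test
  def producesAndConsumes(): Unit = {
    // ... start a broker, produce a record, consume it back ...
  }
}
```
With such a marker in place, the `integrationTest` and `unitTest` targets can filter on the category.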
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska <eno@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Closes#2695 from dguy/junit-categories
Fixes related to the handling of MAX_POLL_INTERVAL_MS_CONFIG during deadlock and of CommitFailedException when partitions are revoked.
Author: Sachin Mittal <sjmittal@gmail.com>
Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang
Closes#2642 from sjmittal/trunk
This problem is otherwise hard to debug, as the error
returned to the client (“Coordinator not available”) is not
very informative.
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#2652 from enothereska/minor-warning-enforcement-offset
If request logging is enabled, `ProduceRequest` can be accessed
and mutated concurrently from a network thread (which calls
`toString`) and a request handler thread (which calls
`clearPartitionRecords()`).
That can lead to a `ConcurrentModificationException` when iterating
the `partitionRecords` map.
The underlying thread-safety issue has existed since the server
started using the Java implementation of ProduceRequest in 0.10.0.
However, we were incorrectly not clearing the underlying struct until
0.10.2, so `toString` itself was thread-safe until that change. In 0.10.2,
`toString` is no longer thread-safe and we could potentially see a
`NullPointerException` given the right set of interleavings between
`toString` and `clearPartitionRecords` although we haven't seen that
happen yet.
In trunk, we changed the requests to have a `toStruct` method
instead of creating a struct in the constructor and `toString` was
no longer printing the contents of the `Struct`. This accidentally
fixed the race condition, but it meant that request logging was less
useful.
A couple of days ago, `AbstractRequest.toString` was changed to
print the contents of the request by calling `toStruct().toString()`
and reintroduced the race condition. The impact is more visible
because we iterate over a `HashMap`, which proactively
checks for concurrent modification (unlike arrays).
We will need a separate PR for 0.10.2.
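The failure mode is easy to reproduce outside Kafka. The demo below (not the Kafka code) has one thread clear a `java.util.HashMap`, standing in for `clearPartitionRecords()`, while another iterates it, standing in for `toString`; the fail-fast iterator then throws `ConcurrentModificationException` on most runs.
```scala
import java.util.{ConcurrentModificationException, HashMap => JHashMap}

object ConcurrentClearDemo extends App {
  val partitionRecords = new JHashMap[String, Array[Byte]]()
  (0 until 100000).foreach(i => partitionRecords.put(s"topic-$i", Array.emptyByteArray))

  // "request handler" thread: clears the map, like clearPartitionRecords()
  val clearer = new Thread(new Runnable {
    override def run(): Unit = partitionRecords.clear()
  })
  // "network" thread: iterates the map, like toString
  val printer = new Thread(new Runnable {
    override def run(): Unit =
      try {
        val it = partitionRecords.entrySet().iterator()
        while (it.hasNext) it.next()   // HashMap's iterator is fail-fast
      } catch {
        case e: ConcurrentModificationException => println(s"caught: $e")
      }
  })

  printer.start(); clearer.start()
  printer.join(); clearer.join()       // the exception is likely, not guaranteed, on any given run
}
```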
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Onur Karaman <okaraman@linkedin.com>, Jun Rao <junrao@gmail.com>
Closes#2689 from ijuma/produce-request-thread-safety
The record should be created with CreateTime (like in the producer). The conversion to
LogAppendTime happens automatically (if necessary).
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#2657 from ijuma/kafka-4861-log-append-time-breaks-group-data-manager
Increase the reliability of the one temporal comparison in ReassignPartitionsClusterTest by imposing a delay after ZK is updated. This should be more reliable than just increasing the amount of data.
This relates to a previous PR: https://github.com/apache/kafka/pull/1982
Author: Ben Stopford <benstopford@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#1997 from benstopford/KAFKA-4266
A couple of updates were missed in the [PR](https://github.com/apache/kafka/pull/2475) that replaced the use of error codes with Errors objects.
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#2635 from vahidhashemian/minor/Errors_refactoring_leftover
* Turned off Nagle on the sending sockets to force the socket to physically acknowledge after the first write in `sendRequest`
* Added a `200ms` delay between write attempts (necessary on Linux, but not Mac)
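For reference, a minimal sketch of the first bullet: disabling Nagle's algorithm on a plain socket so each small write is pushed to the wire immediately. The endpoint and payload are placeholders, not the test's actual request bytes.
```scala
import java.net.Socket

object NoNagleSketch extends App {
  // Placeholder endpoint; in the real test this is the socket used by sendRequest.
  val socket = new Socket("localhost", 9092)
  // setTcpNoDelay(true) turns off Nagle's algorithm, so each small write is sent
  // immediately instead of being coalesced with the next one.
  socket.setTcpNoDelay(true)
  socket.getOutputStream.write(Array[Byte](0, 0, 0, 0))   // placeholder payload
  socket.getOutputStream.flush()
  Thread.sleep(200)                                       // the inter-write delay mentioned above
  socket.close()
}
```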
Author: Armin Braun <armin.braun@1und1.de>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#2632 from original-brownbear/KAFKA-3182
This applies to new-consumer-based groups and would avoid scenarios in which a user issues a `--describe` query while the group is initializing.
Example: The following could occur for a newly created group.
```
kafkakafka:~/workspace/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Error: Executing consumer group command failed due to The group coordinator is not available.
```
With this PR the group is queried repeatedly at specific intervals within a preset (and configurable) timeout `group-init-timeout` to circumvent unfortunate situations like the one above.
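A hedged sketch of the retry loop this implies is shown below: keep querying until the coordinator answers or a configurable timeout elapses. The `describeGroup` function and result type are placeholders, not the actual tool's API.
```scala
import scala.annotation.tailrec

object DescribeWithRetrySketch {
  // Placeholder result type; the real tool inspects the error code returned by the broker.
  sealed trait DescribeResult
  case object CoordinatorNotAvailable extends DescribeResult
  final case class GroupDescription(members: Seq[String]) extends DescribeResult

  def describeWithRetry(describeGroup: () => DescribeResult,
                        initTimeoutMs: Long,            // the configurable group-init timeout
                        retryBackoffMs: Long = 100L): DescribeResult = {
    val deadline = System.currentTimeMillis() + initTimeoutMs
    @tailrec
    def loop(): DescribeResult = describeGroup() match {
      case CoordinatorNotAvailable if System.currentTimeMillis() < deadline =>
        Thread.sleep(retryBackoffMs)                     // group still initializing: try again
        loop()
      case other => other                                // success, or give up after the timeout
    }
    loop()
  }
}
```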
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#2538 from vahidhashemian/KAFKA-2857
If the leader node of one or more partitions in a consumer subscription is temporarily unavailable, request a metadata refresh so that partitions skipped for assignment don't have to wait for metadata expiry before reassignment. A metadata refresh is also requested if a subscribed topic or an assigned partition doesn't exist.
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes#2622 from rajinisivaram/KAFKA-4631
fetcherThreadMap was keyed off brokerId + fetcherId instead of broker + fetcherId, which did not consider the case where the broker's port is changed.
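A minimal sketch of the distinction, using illustrative case classes rather than the real fetcher-manager types: a key built from the full broker endpoint distinguishes a broker that comes back on a new port, whereas a key built only from the broker id and fetcher id does not.
```scala
// Illustrative types only, not the actual fetcher-manager classes.
final case class BrokerEndPoint(id: Int, host: String, port: Int)
final case class FetcherKey(broker: BrokerEndPoint, fetcherId: Int)

object FetcherKeySketch extends App {
  val before = FetcherKey(BrokerEndPoint(1, "broker-1", 9092), fetcherId = 0)
  val after  = FetcherKey(BrokerEndPoint(1, "broker-1", 9093), fetcherId = 0) // same broker id, new port

  // Keyed on the full endpoint, the restarted broker gets a fresh entry (and fetcher thread);
  // keyed only on (brokerId, fetcherId), the stale entry for the old port would be reused.
  println(before == after)                                                            // false
  println((before.broker.id, before.fetcherId) == (after.broker.id, after.fetcherId)) // true
}
```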
Author: huxi <huxi@zhenrongbao.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#2606 from amethystic/kafka4811_ReplicaFetchThread_fail_create
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#2303 from mjsax/licenseHeader
It’s a simple matter of creating the internal topic before trying to send
to it. Otherwise, we could get an `UnknownTopicOrPartitionException`
in some cases.
Without the change, I could reproduce a failure in less than
5 runs. With the change, 30 consecutive runs succeeded.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes#2584 from ijuma/test-cannot-send-to-internal-topic-transient-failure
It contained this step:
```
val canShutdown = isShuttingDown.compareAndSet(false, true)
if (canShutdown && shutdownLatch.getCount > 0) {
```
without any fallback for the case of `shutdownLatch.getCount == 0`. So in the case
of `shutdownLatch.getCount == 0` (when a previous call to the shutdown method
was right about to finish) you would set `isShuttingDown` to true again without any
possibility of ever getting the server started (since `startup` will check
`isShuttingDown` before setting up a new latch with count 1).
Long story short: concurrent calls to shutdown can get the server locked in a broken state.
This fixes the reported error:
```
java.lang.IllegalStateException: Kafka server is still shutting down, cannot re-start!
  at kafka.server.KafkaServer.startup(KafkaServer.scala:184)
  at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply$mcVI$sp(KafkaServerTestHarness.scala:117)
  at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
  at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
  at scala.collection.immutable.Range.foreach(Range.scala:160)
  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
  at kafka.integration.KafkaServerTestHarness$class.restartDeadBrokers(KafkaServerTestHarness.scala:116)
  at kafka.api.ConsumerBounceTest.restartDeadBrokers(ConsumerBounceTest.scala:34)
  at kafka.api.ConsumerBounceTest$BounceBrokerScheduler.doWork(ConsumerBounceTest.scala:158)
```
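One way to close the gap is to order the checks so the flag is only taken when a shutdown is actually pending; the sketch below mirrors the shape of the problem rather than reproducing KafkaServer itself.
```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Self-contained sketch of the race, not the actual KafkaServer code.
class ServerLifecycle {
  @volatile private var shutdownLatch = new CountDownLatch(1)
  private val isShuttingDown = new AtomicBoolean(false)

  def shutdown(): Unit = {
    // Checking the latch before taking the flag means a call that arrives after a
    // completed shutdown is a no-op, instead of leaving isShuttingDown stuck at true.
    if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
      try {
        // ... stop components ...
      } finally {
        shutdownLatch.countDown()
        isShuttingDown.set(false)
      }
    }
  }

  def startup(): Unit = {
    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
    shutdownLatch = new CountDownLatch(1)
    // ... start components ...
  }
}
```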
Author: Armin Braun <me@obrown.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#2568 from original-brownbear/KAFKA-4198
The intent is good, but it needs to take into account broker configs as well.
See KAFKA-4788 for more details.
This reverts commit 4ca5abe8ee.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#2588 from ijuma/kafka-4788
Also move the `requireTimestamp`-to-`minVersion` logic from `Fetcher` to
`ListOffsetRequest.Builder.forConsumer()`.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Jason Gustafson <jason@confluent.io>
Closes#2580 from ijuma/move-proto-utils-to-api-keys
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>, Dong Lin <lindong28@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2489 from cmccabe/KAFKA-4708
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Closes#2543 from hachikuji/remove-message-writer
Fixed ClassCastException resulting from missing type hint in request logging.
Author: Armin Braun <me@obrown.io>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#2571 from original-brownbear/fix-logging-err-response
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Ewen Cheslack-Postava <me@ewencp.org>
Closes#2566 from hachikuji/hotfix-request-logging
More details:
* Replaced `struct` field in Request/Response with a `toStruct` method. This
makes the performance model (including memory usage) easier to understand.
Note that requests have `toStruct()` while responses have `toStruct(version)`.
* Replaced mutable `version` field in `Request.Builder` with an immutable
field `desiredVersion` and a `version` parameter passed to the `build` method.
* Optimised `handleFetchRequest` to avoid unnecessary creation of `Struct`
instances (from 4 to 2 in the worst case and 2 to 1 in the best case).
* Various clean-ups in request/response classes and their test. In particular,
it is now clear what we are testing. Previously, it looked like we were testing
more than we really were.
With this in place, we could remove `AbstractRequest.Builder` in the future by
doing the following:
* Change `AbstractRequest.toStruct` to accept a version (like responses).
* Change `AbstractRequest.version` to be `desiredVersion` (like `Builder`).
* Change `ClientRequest` to take `AbstractRequest`.
* Move validation from the `build` methods to the request constructors or
static factory methods.
* Anything else required for the code to compile again.
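As a rough illustration of the builder change (illustrative stand-in types only, not the real request hierarchy): the desired version is fixed when the builder is constructed, the concrete version is supplied to `build`, and serialization happens on demand in `toStruct` instead of eagerly in a stored field.
```scala
// Illustrative stand-in types, not the real AbstractRequest / Builder hierarchy.
final case class Struct(version: Short, fields: Map[String, Any])

abstract class Request(val version: Short) {
  def toStruct: Struct                                  // built on demand, never stored in a field
}

abstract class Builder(val desiredVersion: Short) {     // immutable, unlike the old mutable `version`
  def build(version: Short): Request                    // the concrete version is chosen at build time
}

final class PingRequest(v: Short) extends Request(v) {
  override def toStruct: Struct = Struct(version, Map("ping" -> true))
}

final class PingRequestBuilder(dv: Short) extends Builder(dv) {
  override def build(version: Short): Request = new PingRequest(version)
}
```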
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes#2513 from ijuma/separate-struct
Author: Jiangjie Qin <becket.qin@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2544 from becketqin/KAFKA-4340_follow_up
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>
Closes#2545 from hachikuji/KAFKA-4761
…porter.configure
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Closes#2540 from cmccabe/KAFKA-4756
`ConsumerGroup` should be `Group`.
Author: Dustin Cote <dustin@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#2534 from cotedm/acl-znode-correction
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes#2475 from vahidhashemian/minor/use_explicit_Errors_type_when_possible
Added a general explanation of the tool and what it does. Also added a few details to the arguments.
Author: Gwen Shapira <cshapi@gmail.com>
Reviewers: Matthias J. Sax, Michael G. Noll, Guozhang Wang
Closes#2503 from gwenshap/KAFKA-4733
Author: Jiangjie Qin <becket.qin@gmail.com>
Reviewers: Dong Lin <lindong28@gmail.com>, Jun Rao <junrao@gmail.com>
Closes#2501 from becketqin/KAFKA-4734
In https://issues.apache.org/jira/browse/KAFKA-4521 we fixed a potential message reordering bug in MirrorMaker. However, the patch introduced another bug that can cause deadlock during MirrorMaker shutdown. The deadlock happens if the ZooKeeper listener thread calls requestAndWaitForCommit() after the MirrorMaker thread has already exited its loop of consuming and producing messages.
This patch fixes the problem by setting `iter` to `null` in `MirrorMakerOldConsumer.cleanup()`. If the ZooKeeper listener thread calls `requestAndWaitForCommit()` after `cleanup()`, it will not block waiting for a commit notification since `iter == null`. If the ZooKeeper listener thread calls `requestAndWaitForCommit()` before `cleanup()`, then `cleanup()` will call `notifyAll()` to unblock the ZooKeeper listener thread.
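The handshake described above can be sketched as follows; the field and method names are stand-ins for the MirrorMaker internals, not the actual implementation.
```scala
// Illustrative sketch of the wait/notify handshake between the two threads.
class CommitGate {
  private val lock = new Object
  @volatile private var iter: Iterator[Array[Byte]] = Iterator.empty

  // Called from the ZooKeeper listener thread.
  def requestAndWaitForCommit(): Unit = lock.synchronized {
    if (iter != null) {
      // ... signal the consumer loop that a commit is wanted ...
      lock.wait()          // a production version would re-check a condition in a loop here
    }
    // iter == null: the MirrorMaker thread already left its loop, nothing to wait for.
  }

  // Called from the MirrorMaker thread when it exits its consume/produce loop.
  def cleanup(): Unit = lock.synchronized {
    iter = null
    lock.notifyAll()       // release any listener thread already parked in wait()
  }
}
```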
Author: Dong Lin <lindong28@gmail.com>
Reviewers: Jiangjie Qin <becket.qin@gmail.com>
Closes#2504 from lindong28/KAFKA-4735
The OfflinePartitionsCount and PreferredReplicaImbalanceCount metrics now check for
topics being deleted.
Added an integration test which polls the metrics while topics are being
created and deleted.
Developed with mimaison
Author: Edoardo Comar <ecomar@uk.ibm.com>
Reviewers: Dong Lin <lindong28@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Closes#2325 from edoardocomar/KAFKA-4441
With this change, the consumer will be considered initialized in the
ProduceConsumeValidate tests once its partitions have been assigned.
Author: Apurva Mehta <apurva.1618@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#2347 from apurvam/KAFKA-4588-fix-race-between-producer-consumer-start
Author: Maysam Yabandeh <myabandeh@dropbox.com>
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#2474 from ijuma/kafka-4039-deadlock-during-shutdown
Kafka brokers have a config called "offsets.topic.replication.factor" that specifies the replication factor for the "__consumer_offsets" topic. The problem is that this config isn't being enforced. If an attempt to create the internal topic is made when there are fewer brokers than "offsets.topic.replication.factor", the topic ends up getting created anyway with the current number of live brokers. The current behavior is pretty surprising when you have clients or tooling running as the cluster is getting set up. Even if your cluster ends up being huge, you'll find out much later that __consumer_offsets was set up with no replication.
The cluster not meeting the "offsets.topic.replication.factor" requirement on the internal topic is another way of saying the cluster isn't fully setup yet.
The right behavior should be for "offsets.topic.replication.factor" to be enforced. Topic creation of the internal topic should fail with GROUP_COORDINATOR_NOT_AVAILABLE until the "offsets.topic.replication.factor" requirement is met. This closely resembles the behavior of regular topic creation when the requested replication factor exceeds the current size of the cluster, as the request fails with error INVALID_REPLICATION_FACTOR.
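In other words, the gating rule is roughly the sketch below; the names are illustrative, not the broker's actual topic-creation code path.
```scala
// Hedged sketch of the gating rule described above.
object OffsetsTopicGate {
  sealed trait CreationResult
  case object CreateWithConfiguredReplicationFactor extends CreationResult
  case object GroupCoordinatorNotAvailable extends CreationResult

  def offsetsTopicCreation(liveBrokerCount: Int, offsetsTopicReplicationFactor: Int): CreationResult =
    if (liveBrokerCount < offsetsTopicReplicationFactor)
      GroupCoordinatorNotAvailable            // cluster not fully set up yet: fail the request
    else
      CreateWithConfiguredReplicationFactor   // never silently shrink the RF to the live broker count
}
```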
Author: Onur Karaman <okaraman@linkedin.com>
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#2177 from onurkaraman/KAFKA-3959
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2455 from hachikuji/KAFKA-4704
In some places an indexed for loop was used where a foreach would do, so those were replaced.
In one file an if / else if / else chain was replaced with a match expression.
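A small before/after sketch of the two clean-ups, on made-up snippets:
```scala
object StyleCleanupSketch extends App {
  val topics = Seq("a", "b", "c")

  // Before: indexed for loop and an if / else if / else chain.
  for (i <- 0 until topics.length) println(topics(i))
  def describeOld(code: Int): String =
    if (code == 0) "none" else if (code == 1) "retriable" else "fatal"

  // After: foreach and a match expression express the same logic more directly.
  topics.foreach(println)
  def describeNew(code: Int): String = code match {
    case 0 => "none"
    case 1 => "retriable"
    case _ => "fatal"
  }

  println(describeOld(1) == describeNew(1))  // true
}
```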
Author: Akash Sethi <akash.sethi@knoldus.in>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#2435 from akashsethi24/trunk
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#2443 from ijuma/close-create-topics-policy-during-shutdown
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
Closes#2406 from ijuma/kafka-4636-per-listener-security-settings
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2436 from hachikuji/hotfix-offset-deletion
This behaviour was changed in 8b3c6c0, but it caused interceptor
test failures (which rely on callbacks) and since we’re so close to
code freeze, it’s better to be conservative.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#2440 from ijuma/kafka-4699-callbacks-invoked-before-future-is-completed