The bug meant that the base offset was the same as the batch size instead of 0 so the broker would always recompress batches.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#2794 from ijuma/fix-records-builder-construction
Fixes deadlock scenario found during local test run:
The main thread was waiting for the coordinator lock.
The thread performing close() was holding the
coordinator lock and polling to find coordinator.
The test expected close() to timeout, but for timing
out, the main thread had to update time, which it
couldn't since it was waiting for the lock. This fix
avoids using coordinator in the main thread during
the close task.
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#2792 from rajinisivaram/MINOR-closetest-deadlock
This may be a reason why we see Jenkins jobs time out at times.
I can reproduce it locally.
With current trunk there is a possibility to run into this:
```sh
"kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
- waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
at java.lang.Thread.run(Thread.java:745)
"appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
- waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
- locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
```
In a nutshell: `KafkaStreams` and `StreamThread` are both
waiting for each other since another intermittent `close`
(eg. from a test) comes along also trying to lock on
`KafkaStreams` :
```sh
"main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1249)
- locked <0x000000077d45a590> (a java.lang.Thread)
at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
- locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
```
=> causing a deadlock.
Fixed this by softer locking on the state change, that guarantees
atomic changes to the state but does not lock on the whole object
(I at least could not find another method that would require more
than atomicly-locked access except for `setState`).
Also qualified the state listeners with their outer-class to make
the whole code-flow around this more readable (having two
interfaces with the same naming for interface and method and then
using them between their two outer classes is crazy hard to read
imo :)).
Easy to reproduced yourself by running
`org.apache.kafka.streams.KafkaStreamsTest` in a loop for a bit
(save yourself some time by running 2-4 in parallel :)). Eventually
it will lock on one of the tests (for me this takes less than 1 min
with 4 parallel runs).
Author: Armin Braun <me@obrown.io>
Author: Armin <me@obrown.io>
Reviewers: Eno Thereska <eno@confluent.io>, Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2791 from original-brownbear/fix-streams-deadlock
This is from the KIP-98 proposal.
The main points of discussion surround the correctness logic, particularly the Log class where incoming entries are validated and duplicates are dropped, and also the producer error handling to ensure that the semantics are sound from the users point of view.
There is some subtlety in the idempotent producer semantics. This patch only guarantees idempotent production upto the point where an error has to be returned to the user. Once we hit a such a non-recoverable error, we can no longer guarantee message ordering nor idempotence without additional logic at the application level.
In particular, if an application wants guaranteed message order without duplicates, then it needs to do the following in the error callback:
1. Close the producer so that no queued batches are sent. This is important for guaranteeing ordering.
2. Read the tail of the log to inspect the last message committed. This is important for avoiding duplicates.
Author: Apurva Mehta <apurva@confluent.io>
Author: hachikuji <jason@confluent.io>
Author: Apurva Mehta <apurva.1618@gmail.com>
Author: Guozhang Wang <wangguoz@gmail.com>
Author: fpj <fpj@apache.org>
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Closes#2735 from apurvam/exactly-once-idempotent-producer
See the JIRA for the full details. Essentially the test assertions depend on receiving reliable events from the consumer processes, but this is not generally possible in the presence of a hard failure (i.e. `kill -9`). Until we solve this problem, the hard failure scenarios will be turned off.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Closes#2771 from hachikuji/KAFKA-4689
Fix for adding state stores with regex defined sources
Author: bbejeck <bbejeck@gmail.com>
Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang
Closes#2618 from bbejeck/KAFKA-4791_unable_to_add_statestore_regex_topics
Author: Jason Gustafson <jason@confluent.io>
Author: Ismael Juma <github@juma.me.uk>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#2762 from hachikuji/ensure-decompression-stream-closed
We got test error `org.apache.kafka.common.errors.TopicExistsException: Topic 'inputTopic' already exists.` in some builds. Can reproduce reliably at local machine. Root cause it async "topic delete" that might not be finished before topic gets re-created.
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Ismael Juma, Damian Guy, Guozhang Wang
Closes#2757 from mjsax/minor-fix-resetintegrationtest
Note: None of the use cases for offset fetch would lead to a `TOPIC_AUTHORIZATION_FAILED` error (fetching offset of an unauthorized partition would return an `UNKNOWN_TOPIC_OR_PARTITION` error). That is why it is being removed from the `PARTITION_ERRORS` list.
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#2653 from vahidhashemian/minor/update_possible_errors_in_offset_fetch_response
Author: Dong Lin <lindong28@gmail.com>
Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2760 from lindong28/KAFKA-4973
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Jun Rao <junrao@gmail.com>, Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Closes#2691 from cmccabe/KAFKA-4902
This is a minor change but it helps to improve the log readability.
Author: Kamal C <kamal.chandraprakash@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#2709 from Kamal15/util
The maxBytes field should be set to DEFAULT_RESPONSE_MAX_BYTES,
the same way as the constructor using the Struct does.
codeveloped with mimaison
Author: Edoardo Comar <ecomar@uk.ibm.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#2694 from edoardocomar/MINOR-FetchRequest
- Improves streams efficiency by more than 200K requests/second (small 100 byte requests)
- Gets streams efficiency very close to pure consumer (see results in https://jenkins.confluent.io/job/system-test-kafka-branch-builder/746/console)
- Maintains same fairness across tasks
- Schedules all records in the queue in-between poll() calls, not just one per task.
Author: Eno Thereska <eno@confluent.io>
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang
Closes#2643 from enothereska/minor-schedule-round-robin
I manually tested that Crc32CTest and AbstractChecksums pass with JDK 9. I also verified that `Java9ChecksumFactory` is used in that case.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#2739 from ijuma/kafka-1449-crc32c
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Michael G. Noll <michael@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Closes#2727 from cmccabe/KAFKA-4944
The transient failures make it harder to spot real failures and we can live without what is being tested (adding security to ZK via a rolling upgrade) until KIP-101 lands.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Apurva Mehta <apurva@confluent.io>, Jun Rao <junrao@gmail.com>
Closes#2742 from ijuma/disable-zk-upgrade-test
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#2738 from hachikuji/streaming-compressed-iterator
This brought down a cluster by causing continuous controller moves.
ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects that aren't thread-safe:
* Selector
* NetworkClient
* SSLEngine (this was the big one for us. We turn on SSL for interbroker communication).
As per the "Concurrency Notes" section from https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html:
> two threads must not attempt to call the same method (either wrap() or unwrap()) concurrently
SSLEngine.wrap gets called in:
* SslTransportLayer.write
* SslTransportLayer.handshake
* SslTransportLayer.close
It turns out that the ZkEventThread and RequestSendThread can concurrently call SSLEngine.wrap:
* ZkEventThread calls SslTransportLayer.close from ControllerChannelManager.removeExistingBroker
* RequestSendThread can call SslTransportLayer.write or SslTransportLayer.handshake from NetworkClient.poll
Suppose the controller moves for whatever reason. The former controller could have had a RequestSendThread who
was in the middle of sending out messages to the cluster while the ZkEventThread began executing
KafkaController.onControllerResignation, which calls ControllerChannelManager.shutdown, which sequentially cleans
up the controller-to-broker queue and connection for every broker in the cluster. This cleanup includes the call
to ControllerChannelManager.removeExistingBroker as mentioned earlier, causing the concurrent call to SSLEngine.wrap.
This concurrent call throws a BufferOverflowException which ControllerChannelManager.removeExistingBroker catches so
the ControllerChannelManager.shutdown moves onto cleaning up the next controller-to-broker queue and connection,
skipping the cleanup steps such as clearing the queue, stopping the RequestSendThread, and removing the entry from its
brokerStateInfo map.
By failing out of the Selector.close, the sensors corresponding to the broker connection has not been cleaned up. Any
later attempt at initializing an identical Selector will result in a sensor collision and therefore cause Selector
initialization to throw an exception. In other words, any later attempts by this broker to become controller again
will fail on initialization. When controller initialization fails, the controller deletes the /controller znode and
lets another broker take over.
Now suppose the controller moves enough times such that every broker hits the BufferOverflowException concurrency
issue. We're now guaranteed to fail controller initialization due to the sensor collision on every controller
transition, so the controller will move across brokers continuously.
This patch avoids the concurrent use of non-threadsafe classes in ControllerChannelManager.removeExistingBroker
by shutting down the RequestSendThread before closing the NetworkClient.
Author: Onur Karaman <okaraman@linkedin.com>
Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2746 from onurkaraman/KAFKA-4959
Author: Dong Lin <lindong28@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jiangjie Qin <becket.qin@gmail.com>
Closes#2476 from lindong28/KAFKA-4586
Author: Armin Braun <me@obrown.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes#2699 from original-brownbear/KAFKA-4569
1. Use Initialization-on-demand holder idiom that relies on JVM lazy-loading instead of explicit initialization check.
2. Method handles were designed to be faster than Core Reflection, particularly if the method handle can be stored in a static final field (the JVM can then optimise the call as if it was a regular method call). Since the code is of similar complexity (and simpler if we consider the whole PR), I am treating this as a clean-up instead of a performance improvement (which would require doing benchmarks).
3. Remove unused `ByteBufferReceive`.
4. I removed the snappy library from the classpath and verified that `CompressionTypeTest` (which uses LZ4) still passes. This shows that the right level of laziness is achieved even if we use one of the lazily loaded compression algorithms.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#2740 from ijuma/use-method-handles-for-compressed-stream-supplier
Thanks to Dong Lin for finding the lastStableOffset issue.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Dong Lin <lindong28@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes#2737 from ijuma/fix-fetch-response-lso
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Jun Rao <junrao@gmail.com>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2614 from hachikuji/exactly-once-message-format
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Michael G. Noll, Eno Thereska, Matthias J. Sax, Elias Levy, Guozhang Wang
Closes#2725 from dguy/minor-processor-java-doc
1. add thread id as prefix in state directory classes; also added logs for lock activities.
2. add logging for task creation / suspension.
3. add more information in rebalance listener logging.
4. add restored number of records into changlog reader.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Eno Thereska, Damian Guy, Ewen Cheslack-Postava
Closes#2702 from guozhangwang/KMinor-streams-task-creation-log4j-improvements
Changing getCanonicalName() references to getName() so that docs update with "$" instead of ".". Also added a connect-plugin-discovery.sh CLI to list all of the transformations available.
Author: Bruce Szalwinski <bruce.szalwinski@cdk.com>
Reviewers: Gwen Shapira
Closes#2720 from bruce-szalwinski/transforms and squashes the following commits:
ec3b5b9 [Bruce Szalwinski] remove connect-plugin-discovery. will submit in a different PR
eba0af7 [Bruce Szalwinski] Key / Value transformations are static nested classes and so are referenced using OuterClass$Key and OuterClass$Value.
Sometimes `ResetIntegrationTest` hangs and thus the build times out. We suspect, that this happens if no data is written into the input topics. Right now, input data is written once and reused for both test cases. If for some reason, the broker gets recreated (between both test cases), no data will be available for the second test method and thus the test hangs.
This change ensures, that input data is written for each test case individually.
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Ismael Juma, Eno Thereska, Guozhang Wang
Closes#2630 from mjsax/minor-reset-integration-test
Debug loggin of the start and end offsets used during state store restoration
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#2718 from dguy/log-restore-offsets
remove unused log field from KStreamTransformValuesProcessor
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#2717 from dguy/remove-unused-log-para
iterate over all keys returned from the rocksdb iterator so we don't miss any results
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Xavier Léauté, Guozhang Wang
Closes#2713 from dguy/window-iter
This uses JUnit Categories to identify integration tests. Adds 2 new build targets:
`integrationTest` and `unitTest`.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska <eno@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Closes#2695 from dguy/junit-categories