Ensure that channel and selection keys are removed from `Selector` collections before propagating connect exceptions. They are currently cleared on the next `poll()`, but we can't ensure that callers (NetworkClient for example) wont try to connect again before the next `poll` and hence we should clear the collections before re-throwing exceptions from `connect()`.
Reviewers: Jason Gustafson <jason@confluent.io>
The example in the producer's javadoc contained an inconsistent value for `delivery.timeout.ms`. This patch removes the inconsistent config and several unnecessary overrides in order to simplify the example.
Reviewers: huxi <huxi_2b@hotmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Reviewers: Guozhang Wang <guozhang@confluent.io>, Nikolay Izhikov <nIzhikov@gmail.com>, Ismael Juma <ismael@confluent.io>, Bill Bejeck <bill@confluent.io>
When a consumer first joins a group, it doesn't have an assigned memberId. If the rebalance is delayed for some reason, the client may disconnect after a request timeout and retry. Since the client had not received its memberId, then we do not have a way to detect the retry and expire the previously generated member id. This can lead to unbounded growth in the size of the group until the rebalance has completed.
This patch fixes the problem by proactively completing all JoinGroup requests for new members after a timeout of 5 minutes. If the client is still around, we expect it to retry.
Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Boyang Chen <bchen11@outlook.com>, Guozhang Wang <wangguoz@gmail.com>
Older versions of the Produce API should return an error if zstd is used. This validation existed, but it was done during request parsing, which means that instead of returning an error code, the broker disconnected. This patch fixes the issue by moving the validation outside of the parsing logic. It also fixes several other record validations which had the same problem.
Reviewers: Jason Gustafson <jason@confluent.io>
This is a follow-up PR from the previous PR #5779, where KTabeSource always get old values from the store even if sendOldValues. It gets me to make a pass over all the KTable/KStreamXXX processor to push the sendOldValues at the callers in order to avoid unnecessary store reads.
More details: ForwardingCacheFlushListener and TupleForwarder both need sendOldValues as parameters.
a. For ForwardingCacheFlushListener it is not needed at all, since its callers XXXCachedStore already use the sendOldValues values passed from TupleForwarder to avoid getting old values from underlying stores.
b. For TupleForwarder, it actually only need to pass the boolean flag to the cached store; and then it does not need to keep it as its own variable since the cached store already respects the boolean to pass null or the actual value..
The only other minor bug I found from the pass in on KTableJoinMerge, where we always pass old values and ignores sendOldValues.
Reviewers: Matthias J. Sax <mjsax@apache.org>
Refactor the materialization for source KTables in the way that:
If Materialized.as(queryableName) is specified, materialize;
If the downstream operator requires to fetch from this KTable via ValueGetters, materialize;
If the downstream operator requires to send old values, materialize.
Otherwise do not materialize the KTable. E.g. builder.table("topic").filter().toStream().to("topic") would not create any state stores.
There's a couple of minor changes along with PR as well:
KTableImpl's queryableStoreName and isQueryable are merged into queryableStoreName only, and if it is null it means not queryable. As long as it is not null, it should be queryable (i.e. internally generated names will not be used any more).
To achieve this, splitted MaterializedInternal.storeName() and MaterializedInternal.queryableName(). The former can be internally generated and will not be exposed to users. QueryableName can be modified to set to the internal store name if we decide to materialize it during the DSL parsing / physical topology generation phase. And only if queryableName is specified the corresponding KTable is determined to be materialized.
Found some overlapping unit tests among KTableImplTest, and KTableXXTest, removed them.
There are a few typing bugs found along the way, fixed them as well.
-----------------------
This PR is an illustration of experimenting a poc towards logical materializations.
Today we've logically materialized the KTable for filter / mapValues / transformValues if queryableName is not specified via Materialized, but whenever users specify queryableName we will still always materialize. My original goal is to also consider logically materialize for queryable stores, but when implementing it via a wrapped store to apply the transformations on the fly I realized it is tougher than I thought, because we not only need to support fetch or get, but also needs to support range queries, approximateNumEntries, and isOpen etc as well, which are not efficient to support. So in the end I'd suggest we still stick with the rule of always materializing if queryableName is specified, and only consider logical materialization otherwise.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <mjsax@apache.org>
We saw a log statement in which the cause of the failure to write a checkpoint was not properly logged.
This change logs the exception properly and also verifies the log message.
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This test sometimes fails with
```
kafka.tools.MirrorMaker$NoRecordsException
at kafka.tools.MirrorMaker$ConsumerWrapper.receive(MirrorMaker.scala:483)
at kafka.tools.MirrorMakerIntegrationTest$$anonfun$testCommaSeparatedRegex$1.apply$mcZ$sp(MirrorMakerIntegrationTest.scala:92)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:738)
```
The test should catch `NoRecordsException` instead of `TimeoutException`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
This attempts to address the flaky test `SaslAuthenticatorTest.testCannotReauthenticateWithDifferentPrincipal()`
I was not able to reproduce locally even after 150 test runs in a loop, but given the error message:
```
org.junit.ComparisonFailure: expected:
<[6QBJiMZ6o5AqbNAjDTDjWtQSa4alfuUWsYKIy2tt7dz5heDaWZlz21yr8Gl4uEJkQABQXeEL0UebdpufDb5k8SvReSK6wYwQ9huP-9]> but was:<[????����????OAUTHBEARER]>
```
`????����????` seems to mean invalid UTF-8.
We now specify the charset when writing out and reading in bytes.
Reviewers: Ismael Juma <ismael@juma.me.uk>
This is the error message we're after:
"You may not specify an explicit partition assignment when using multiple consumers in the same group."
We apparently changed it midway through #5810 and forgot to update the test.
Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Ismael Juma <ismael@juma.me.uk>
On the follower side, for the empty `LogAppendInfo` retrieved from the leader, fetcherLagStats set the wrong lag for fetcherLagStats due to `nextOffset` is zero.
There were several reported incidents where the log is rolled to a new segment with the same base offset as an active segment, causing KafkaException: Trying to roll a new log segment for topic partition X-N with start offset M while it already exists. In the cases we have seen, this happens to an empty log segment where there is long idle time before the next append and somehow we get to a state where offsetIndex.isFull() returns true due to _maxEntries == 0. This PR recovers from this state by deleting and recreating the segment and all of its associated index files.
Reviewers: Jason Gustafson <jason@confluent.io>
Should be ported back to 2.0
Author: Cyrus Vafadari <cyrus@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#6004 from cyrusv/cyrus-npe
Delayed fetch operations acquire leaderIsrUpdate read lock of one or more Partitions from the fetch request when attempting to complete the fetch operation. While appending new records, complete fetch requests after releasing leaderIsrUpdate of the Partition to which records were appended to avoid deadlocks in request handler threads.
Reviewers: Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>
Changes made as part of this commit.
- Improved error message for better readability at millis validation utility
- Corrected java documentation on `AdvanceInterval` check.
- Added caller specific prefix text to make error message more clear to developers/users.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Jacek Laskowski <jacek@japila.pl>
This is a system test for doing a rolling upgrade of a topology with a named repartition topic.
1. An initial Kafka Streams application is started on 3 nodes. The topology has one operation forcing a repartition and the repartition topic is explicitly named.
2. Each node is started and processing of data is validated
3. Then one node is stopped (full stop is verified)
4. A property is set signaling the node to add operations to the topology before the repartition node which forces a renumbering of all operators (except repartition node)
5. Restart the node and confirm processing records
6. Repeat the steps for the other 2 nodes completing the rolling upgrade
I ran two runs of the system test with 25 repeats in each run for a total of 50 test runs.
All test runs passed
Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This change adds some basic system tests for delegation token based authentication:
- basic delegation token creation
- producing with a delegation token
- consuming with a delegation token
- expiring a delegation token
- producing with an expired delegation token
New files:
- delegation_tokens.py: a wrapper around kafka-delegation-tokens.sh - executed in container where a secure Broker is running (taking advantage of automatic cleanup)
- delegation_tokens_test.py: basic test to validate the lifecycle of a delegation token
Changes were made in the following file to extend their functionality:
- config_property was updated to be able to configure Kafka brokers with delegation token related settings
- jaas.conf template because a broker needs to support multiple login modules when delegation tokens are used
- consule-consumer and verifiable_producer to override KAFKA_OPTS (to specify custom jaas.conf) and the client properties (to authenticate with delegation token).
Author: Attila Sasvari <asasvari@apache.org>
Reviewers: Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Andras Katona <41361962+akatona84@users.noreply.github.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#5660 from asasvari/KAFKA-4544
This is a security page improvement that adds documentation about Kafka authorization primitives to the security page.
Author: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Author: Viktor Somogyi <viktorsomogyi@gmail.com>
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Srinivas <srinivas96alluri@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
Closes#5906 from viktorsomogyi/security-page-improvement
This is minor refactoring that brings in the creation of producer and consumer to the Worker. Currently, the consumer is created in the WorkerSinkTask. This should not affect any functionality and it just makes the code structure easier to understand.
Author: Magesh Nandakumar <magesh.n.kumar@gmail.com>
Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Randall Hauch <rhauch@gmail.com>, Robert Yokota <rayokota@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5842 from mageshn/KAFKA-7551
Includes Update to ConnectRecord string representation to give
visibility into schemas, useful in SMT tracing
Author: Cyrus Vafadari <cyrus@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5860 from cyrusv/cyrus-logging
EndToEndLatency tool produces a dummy record in case the topic does not exist. This behavior was introduced in this PR https://github.com/apache/kafka/pull/5319 as part of updating the tool to use latest consumer API. However, if we run the tool with producer acks == 1, the high watermark may not be updated before we reset consumer offsets to latest. In rare cases when this happens, the tool will throw an exception in the for loop where the consumer will unexpectedly consume the dummy record. As a result, we occasionally see Benchmark.test_end_to_end_latency system test failures.
This PR checks if topic exists, and creates the topic using AdminClient if it does not exist.
Author: Anna Povzner <anna@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5950 from apovzner/fix-EndToEndLatency
- Remove ZKUtils usage from various tests
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>, Ismael Juma <ismael@juma.me.uk>, Satish Duggana <satishd@apache.org>, Jun Rao <junrao@gmail.com>, Ryanne Dolan <ryannedolan@gmail.com>
Closes#5480 from omkreddy/zkutils
A heap dump provided by Patrik Kleindl in https://issues.apache.org/jira/browse/KAFKA-7660 identifies the childrenSensors map in Metrics as keeping references to sensors alive after they have been removed.
This PR fixes it and adds a test to be sure.
Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Gradle 5.0 was released on 26 November. See the release notes for enhancements and fixes: https://docs.gradle.org/5.0/release-notes.html.
A notable bugfix ensures that Javadoc artifacts are cleaned between builds.
The upgraded wrapper was spot-checked against the commands in the README and the
README was updated not to use removed system property `-Dtest.single`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
In StreamsMetricsImpl, the parentSensors map was keeping references to Sensors after the sensors themselves had been removed.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Current code will fall into non-stop loop and send more message to broker, and Stat in PerfCallback method record will throw ArrayIndexOutOfBoundsException
Some deserializer needs the topic name to be able to correctly deserialize the payload of the message.
Console consumer works great with Deserializer<String> however it calls deserializer with topic set as null.
This breaks the API and the topic information is available in the ConsumerRecord.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Gardner Vickers <gardner@vickers.me>, Jun Rao <junrao@gmail.com>
This PR fixes an issue reported from a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results.
I've added a test which fails without the fix.
Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>