Right now if a repartition is required and users choose to name the repartition topic for an aggregation i.e. kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")); The resulting KGroupedStream can't be reused
with optimizations are disabled, as Streams will attempt to create two repartiton topics with the same name.
However, if optimizations are enabled then the resulting KGroupedStream can be re-used
For example the following will work if optimizations are enabled.
This PR provides a unit test proving as much.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Previous PR #6043 reduced throughput for VerifiableProducer in base class, but the streams_standby_replica_test needs higher throughput for consumer to complete verification in 60 seconds
Update system test and kicked off branch builder with 25 repeats https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2201/
Reviewers: Guozhang Wang <wangguoz@gmail.com>
**User Interface Improvement :** If topic doesn't exist then Kafka describe command should throw topic doesn't exist exception, like alter and delete commands
Author: Manohar Vanam <manohar.crazy09@gmail.com>
Reviewers: Vahid Hashemian <vahid.hashemian@gmail.com>, Jason Gustafson <jason@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#5211 from ManoharVanam/KAFKA-7054
Switch to lambda when ever possible instead of old anonymous way
in tools module
Author: Srinivas Reddy <srinivas96alluri@gmail.com>
Author: Srinivas Reddy <mrsrinivas@users.noreply.github.com>
Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#6013 from mrsrinivas/tools-switch-to-java8
This patch disables support for WADL output in the Connect REST API since it was never intended to be exposed.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
This PR addresses a few issues with this system test flakiness. This PR is a cherry-picked duplicate of #6041 but for the trunk branch, hence I won't repeat the inline comments here.
1. Need to grab the monitor before a given operation to observe logs for signal
2. Relied too much on a timely rebalance and only sent a handful of messages.
I've updated the test and ran it here https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2143/ parameterized for 15 repeats all passed.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
After a recent leader election, the leaders high-water mark might lag behind the offset at the beginning of the new epoch (as well as the previous leader's HW). This can lead to offsets going backwards from a client perspective, which is confusing and leads to strange behavior in some clients.
This change causes Partition#fetchOffsetForTimestamp to throw an exception to indicate the offsets are not yet available from the leader. For new clients, a new OFFSET_NOT_AVAILABLE error is added. For existing clients, a LEADER_NOT_AVAILABLE is thrown.
This is an implementation of [KIP-207](https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change).
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Dhruvil Shah <dhruvil@confluent.io>, Jason Gustafson <jason@confluent.io>
Updating the documentation for table operation because I believe it is incorrect.
In PR #5163 the table operation stopped disabling the changelog topic by default and instead moved that optimization to a configuration that is not enabled by default. This PR updates the documentation to reflect the change in behavior and point to the new configuration for optimization.
Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Ensure that channel and selection keys are removed from `Selector` collections before propagating connect exceptions. They are currently cleared on the next `poll()`, but we can't ensure that callers (NetworkClient for example) wont try to connect again before the next `poll` and hence we should clear the collections before re-throwing exceptions from `connect()`.
Reviewers: Jason Gustafson <jason@confluent.io>
The example in the producer's javadoc contained an inconsistent value for `delivery.timeout.ms`. This patch removes the inconsistent config and several unnecessary overrides in order to simplify the example.
Reviewers: huxi <huxi_2b@hotmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Reviewers: Guozhang Wang <guozhang@confluent.io>, Nikolay Izhikov <nIzhikov@gmail.com>, Ismael Juma <ismael@confluent.io>, Bill Bejeck <bill@confluent.io>
When a consumer first joins a group, it doesn't have an assigned memberId. If the rebalance is delayed for some reason, the client may disconnect after a request timeout and retry. Since the client had not received its memberId, then we do not have a way to detect the retry and expire the previously generated member id. This can lead to unbounded growth in the size of the group until the rebalance has completed.
This patch fixes the problem by proactively completing all JoinGroup requests for new members after a timeout of 5 minutes. If the client is still around, we expect it to retry.
Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Boyang Chen <bchen11@outlook.com>, Guozhang Wang <wangguoz@gmail.com>
Older versions of the Produce API should return an error if zstd is used. This validation existed, but it was done during request parsing, which means that instead of returning an error code, the broker disconnected. This patch fixes the issue by moving the validation outside of the parsing logic. It also fixes several other record validations which had the same problem.
Reviewers: Jason Gustafson <jason@confluent.io>
This is a follow-up PR from the previous PR #5779, where KTabeSource always get old values from the store even if sendOldValues. It gets me to make a pass over all the KTable/KStreamXXX processor to push the sendOldValues at the callers in order to avoid unnecessary store reads.
More details: ForwardingCacheFlushListener and TupleForwarder both need sendOldValues as parameters.
a. For ForwardingCacheFlushListener it is not needed at all, since its callers XXXCachedStore already use the sendOldValues values passed from TupleForwarder to avoid getting old values from underlying stores.
b. For TupleForwarder, it actually only need to pass the boolean flag to the cached store; and then it does not need to keep it as its own variable since the cached store already respects the boolean to pass null or the actual value..
The only other minor bug I found from the pass in on KTableJoinMerge, where we always pass old values and ignores sendOldValues.
Reviewers: Matthias J. Sax <mjsax@apache.org>
Refactor the materialization for source KTables in the way that:
If Materialized.as(queryableName) is specified, materialize;
If the downstream operator requires to fetch from this KTable via ValueGetters, materialize;
If the downstream operator requires to send old values, materialize.
Otherwise do not materialize the KTable. E.g. builder.table("topic").filter().toStream().to("topic") would not create any state stores.
There's a couple of minor changes along with PR as well:
KTableImpl's queryableStoreName and isQueryable are merged into queryableStoreName only, and if it is null it means not queryable. As long as it is not null, it should be queryable (i.e. internally generated names will not be used any more).
To achieve this, splitted MaterializedInternal.storeName() and MaterializedInternal.queryableName(). The former can be internally generated and will not be exposed to users. QueryableName can be modified to set to the internal store name if we decide to materialize it during the DSL parsing / physical topology generation phase. And only if queryableName is specified the corresponding KTable is determined to be materialized.
Found some overlapping unit tests among KTableImplTest, and KTableXXTest, removed them.
There are a few typing bugs found along the way, fixed them as well.
-----------------------
This PR is an illustration of experimenting a poc towards logical materializations.
Today we've logically materialized the KTable for filter / mapValues / transformValues if queryableName is not specified via Materialized, but whenever users specify queryableName we will still always materialize. My original goal is to also consider logically materialize for queryable stores, but when implementing it via a wrapped store to apply the transformations on the fly I realized it is tougher than I thought, because we not only need to support fetch or get, but also needs to support range queries, approximateNumEntries, and isOpen etc as well, which are not efficient to support. So in the end I'd suggest we still stick with the rule of always materializing if queryableName is specified, and only consider logical materialization otherwise.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <mjsax@apache.org>
We saw a log statement in which the cause of the failure to write a checkpoint was not properly logged.
This change logs the exception properly and also verifies the log message.
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This test sometimes fails with
```
kafka.tools.MirrorMaker$NoRecordsException
at kafka.tools.MirrorMaker$ConsumerWrapper.receive(MirrorMaker.scala:483)
at kafka.tools.MirrorMakerIntegrationTest$$anonfun$testCommaSeparatedRegex$1.apply$mcZ$sp(MirrorMakerIntegrationTest.scala:92)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:738)
```
The test should catch `NoRecordsException` instead of `TimeoutException`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
This attempts to address the flaky test `SaslAuthenticatorTest.testCannotReauthenticateWithDifferentPrincipal()`
I was not able to reproduce locally even after 150 test runs in a loop, but given the error message:
```
org.junit.ComparisonFailure: expected:
<[6QBJiMZ6o5AqbNAjDTDjWtQSa4alfuUWsYKIy2tt7dz5heDaWZlz21yr8Gl4uEJkQABQXeEL0UebdpufDb5k8SvReSK6wYwQ9huP-9]> but was:<[????����????OAUTHBEARER]>
```
`????����????` seems to mean invalid UTF-8.
We now specify the charset when writing out and reading in bytes.
Reviewers: Ismael Juma <ismael@juma.me.uk>
This is the error message we're after:
"You may not specify an explicit partition assignment when using multiple consumers in the same group."
We apparently changed it midway through #5810 and forgot to update the test.
Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Ismael Juma <ismael@juma.me.uk>
On the follower side, for the empty `LogAppendInfo` retrieved from the leader, fetcherLagStats set the wrong lag for fetcherLagStats due to `nextOffset` is zero.
There were several reported incidents where the log is rolled to a new segment with the same base offset as an active segment, causing KafkaException: Trying to roll a new log segment for topic partition X-N with start offset M while it already exists. In the cases we have seen, this happens to an empty log segment where there is long idle time before the next append and somehow we get to a state where offsetIndex.isFull() returns true due to _maxEntries == 0. This PR recovers from this state by deleting and recreating the segment and all of its associated index files.
Reviewers: Jason Gustafson <jason@confluent.io>
Should be ported back to 2.0
Author: Cyrus Vafadari <cyrus@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#6004 from cyrusv/cyrus-npe
Delayed fetch operations acquire leaderIsrUpdate read lock of one or more Partitions from the fetch request when attempting to complete the fetch operation. While appending new records, complete fetch requests after releasing leaderIsrUpdate of the Partition to which records were appended to avoid deadlocks in request handler threads.
Reviewers: Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>
Changes made as part of this commit.
- Improved error message for better readability at millis validation utility
- Corrected java documentation on `AdvanceInterval` check.
- Added caller specific prefix text to make error message more clear to developers/users.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Jacek Laskowski <jacek@japila.pl>
This is a system test for doing a rolling upgrade of a topology with a named repartition topic.
1. An initial Kafka Streams application is started on 3 nodes. The topology has one operation forcing a repartition and the repartition topic is explicitly named.
2. Each node is started and processing of data is validated
3. Then one node is stopped (full stop is verified)
4. A property is set signaling the node to add operations to the topology before the repartition node which forces a renumbering of all operators (except repartition node)
5. Restart the node and confirm processing records
6. Repeat the steps for the other 2 nodes completing the rolling upgrade
I ran two runs of the system test with 25 repeats in each run for a total of 50 test runs.
All test runs passed
Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>