Refactor internal store wrapping for improved maintainability.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This condition is a fatal error, so error level is warranted, to provide more context on why Streams shuts down.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>
1. In the caching layer's flush listener call, we should always write to the underlying store, before flushing (see #4331 's point 4) for detailed explanation). When fixing 4331, it only touches on KV stores, but it turns out that we should fix for window and session store as well.
2. Also apply the optimization that was in session-store already: when the new value bytes and old value bytes are all null (this is possible e.g. if there is a put(K, V) followed by a remove(K) or put(K, null) and these two operations only hit the cache), upon flushing this mean the underlying store does not have this value at all and also no intermediate value has been sent to downstream as well. We can skip both putting a null to the underlying store as well as calling the flush listener sending `null -> null` in this case.
Modifies corresponding unit tests.
Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
JUnit 4.13 fixes the issue where `Category` and `Parameterized` annotations
could not be used together. It also deprecates `ExpectedException` and
`assertThat`. Given this, we:
- Replace `ExpectedException` with the newly introduced `assertThrows`.
- Replace `Assert.assertThat` with `MatcherAssert.assertThat`.
- Annotate `AbstractLogCleanerIntegrationTest` with `IntegrationTest` category.
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, David Arthur <mumrah@gmail.com>
This PR addressed the following test failure:
```
java.lang.AssertionError:
Expected: a string starting with "process-state-manager-test Failed to write offset checkpoint file to ["
but: was "[AdminClient clientId=adminclient-874] Connection to node -1 (localhost/127.0.0.1:8080) could not be established. Broker may not be available."
```
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This PR adds a upgrade notes and changes examples to use the bootstrap-server.
Author: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Reviewers: Srinivas <srinivas96alluri@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#6118 from viktorsomogyi/topiccommand-adminclient-doc
Reviewers: Damian Guy <damian@confluent.io>, John Roesler <john@confluent.io>, Boyang Chen <bchen11@outlook.com>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
#2972 tried to fix a bug about flushing operation, but it was not complete, since findSessions(key, earliestEnd, latestStart) does not guarantee to only return a single entry since its semantics are to return any sessions whose end > earliestEnd and whose start < latestStart.
I've tried various ways to fix it completely and I ended up having to add a single-point query to the public ReadOnlySessionStore API for the exact needed semantics. It is used for flushing to read the old values (otherwise the wrong old values will be sent downstreams, hence it is a correctness issue) and also for getting the value for value-getters (it is for perf only).
This PR enables BloomFilters for RocksDB to speed up point lookups.
The request for this has been around for some time - https://issues.apache.org/jira/browse/KAFKA-4850
For testing, I've done the following
Ran the standard streams suite of unit and integration tests
Kicked off the simple benchmark test with bloom filters enabled
Kicked off the simple benchmark test with bloom filters not enabled
Kicked off streams system tests
Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, John Roesler <john@confluent.io>
Let findSessions(final K key) to call on underlying bytes store directly, using the more restricted range.
Fix the conservative upper range for multi-key range in session schema.
Minor: removed unnecessary private WrappedSessionStoreBytesIterator class as it is only used in unit test.
Minor: removed unnecessary schema#init function by using the direct bytes-to-binary function.
Please read the original PR for more detailed explanation of the root cause of the bug.
Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>, John Roesler <john@confluent.io>
Note we can only remove this handling in 2.2 but not in 2.1 since #6124 is only in 2.2.
Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>, Matthias J. Sax <matthias@confluent.io>
In RocksDBStore.openDB we call
Files.createDirectories(dir.getParentFile().toPath());
return RocksDB.open(options, dir.getAbsolutePath());
We would also add the absolute file path as well to avoid the extra logging.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
I've observed several reports of sudden unexpected streamthread shutdown with the log entry like:
State transition from PENDING_SHUTDOWN to DEAD
but there is no related error logs before this line at all. I suspect this is because we intentionally do not log for KafkaException and there's some edge cases where we miss internally and hence caused this. I'm adding the ERROR level log entry here in order to reveal more information in case I saw this again in the future.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>, Ismael Juma <ismael@confuent.io>
The existing javadoc for PartitionGroup is a little confusing.
It's relatively important for these concepts to be clear, since
they form the basis for stream-time in Kafka Streams.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
KIP-291 Implementation : Added code to separate controller connections and requests from the data plane.
Tested with local deployment that the controller request are handled by the control plane and other requests are handled by the data plane.
Also added unit tests in order to test the functionality.
Author: Lucas Wang <luwang@linkedin.com>,
Author: Mayuresh Gharat <gharatmayuresh15@gmail.com>
Reviewers: Joel Koshy <jjkoshy@gmail.com>, Jun Rao <junrao@gmail.com>
Using AdminClient#alterConfigs, topic `retention.ms` property can be assigned to a value lesser than -1. This leads to inconsistency while describing the topic configuration. We should not allow values lesser than -1.
Author: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>,Matthias J. Sax <matthias@confluent.io>
Closes#6082 from kamalcph/KAFKA-7781
We recently improved the handling of the InternalTopicManager retries with #6085. The AdminClient will throw an InvalidTopicException if the topic is not found. We need to ignore that exception as when calling AdminClient#describe we may not have had a chance to create the topic yet, especially with the case of internal topics
I've created a new test asserting that when an InvalidTopicException is thrown when the topic is not found we continue on.
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>, Guozhang Wang <guozhang@confluent.io>
The StreamsUpgradeTest::test_upgrade_downgrade_brokers used sleep calls in the test which led to flaky test performance and as a result, we placed an @ignore annotation on the test. This PR uses log events instead of the sleep calls hence we can now remove the @ignore setting.
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
While looking into KAFKA-7657, I found there are a few loopholes in this logic:
We kept a map of thread-name to thread-state and a global-thread state at the KafkaStreams instance-level, in addition to the instance state itself. stateLock is used when accessing the instance state, however when we are in the thread state change callback, we are accessing both the thread-states as well as the instance state at the same time in the callers of setState without a lock, which is vulnerable to concurrent multi-stream threads. The fix is a) introduce a threadStatesLock in addition to the stateLock, which should always be grabbed to modify the thread-states map before the stateLock for modifying the instance level; and we also defer the checking of the instance-level state inside the setState call.
When transiting to state.RUNNING, we check if all threads are either in RUNNING or DEAD state, this is because some threads maybe dead at the rebalance period but we should still proceed to RUNNING if the rest of threads are still transiting to RUNNING.
Added unit test for 2) above. Also simplified another test as a nit change.
Reviewers: John Roesler <vvcephei@users.noreply.github.com>, Matthias J. Sax <mjsax@apache.org>
While looking into KAFKA-7657, I found there are a few loopholes in this logic:
1. We kept a map of thread-name to thread-state and a global-thread state at the KafkaStreams instance-level, in addition to the instance state itself. stateLock is used when accessing the instance state, however when we are in the thread state change callback, we are accessing both the thread-states as well as the instance state at the same time in the callers of setState without a lock, which is vulnerable to concurrent multi-stream threads. The fix is a) introduce a threadStatesLock in addition to the stateLock, which should always be grabbed to modify the thread-states map before the stateLock for modifying the instance level; and we also defer the checking of the instance-level state inside the setState call.
2. When transiting to state.RUNNING, we check if all threads are either in RUNNING or DEAD state, this is because some threads maybe dead at the rebalance period but we should still proceed to RUNNING if the rest of threads are still transiting to RUNNING.
Added unit test for 2) above. Also simplified another test as a nit change.
Reviewers: John Roesler <vvcephei@users.noreply.github.com>, Matthias J. Sax <mjsax@apache.org>
Currently the TimeWindowedSerde does not deserialize the windowed keys from a changelog topic properly. There are a few assumptions made in the TimeWindowedDeserializer that prevents the changelog windowed keys from being correctly deserialized. This PR will introduce a new WindowSerde to allow proper deserialization of changelog windowed keys.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
1. The retry loop of the InternalTopicManager would just be: a) describe topics, and exclude those which already exist with the right num.partitions, b) for the remaining topics, try to create them. Remove any inner loops.
2. In CreateTopicResponse and MetadataResponse (for describe topic), handle the special error code of TopicExist and UnknownTopicOrPartition in order to retry in the next loop.
3. Do not handle TimeoutException since it should already been handled inside AdminClient.
Add corresponding unit tests for a) topic marked for deletion but not complete yet, in which case metadata response would not contain this topic, but create topic would return error TopicExists; b) request keep getting timed out.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This pull request replaces HashMap with LinkedHashMap to guarantee ordering of metrics tags.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <guozhang@confluent.io>, John Roesler <vvcephei@users.noreply.github.com>
Right now if a repartition is required and users choose to name the repartition topic for an aggregation i.e. kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping")); The resulting KGroupedStream can't be reused
with optimizations are disabled, as Streams will attempt to create two repartiton topics with the same name.
However, if optimizations are enabled then the resulting KGroupedStream can be re-used
For example the following will work if optimizations are enabled.
This PR provides a unit test proving as much.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Updating the documentation for table operation because I believe it is incorrect.
In PR #5163 the table operation stopped disabling the changelog topic by default and instead moved that optimization to a configuration that is not enabled by default. This PR updates the documentation to reflect the change in behavior and point to the new configuration for optimization.
Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>