Segmented state stores turn on bulk loading of the underlying RocksDB
when restoring. This is correct for segmented state stores that are
restoring on active tasks, where onRestoreStart() and onRestoreEnd() in
RocksDBSegmentsBatchingRestoreCallback take care of toggling bulk loading
mode on and off. However, restoreAll() in RocksDBSegmentsBatchingRestoreCallback
might also turn on bulk loading mode. When this happens on a standby task,
bulk loading mode is never turned off. That leads to a steadily increasing
number of open file descriptors in RocksDB, because in bulk loading mode
RocksDB continuously creates new files but never compacts them (which is
the intended behaviour of bulk loading).
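For context, a minimal sketch of what toggling bulk loading involves, using only the plain RocksDB Java API (illustrative, not the actual RocksDBSegmentsBatchingRestoreCallback code):

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class BulkLoadToggleSketch {

    // Options used while restoring: auto-compaction is disabled, so SST files
    // accumulate until compaction is explicitly triggered again.
    public static Options bulkLoadOptions() {
        return new Options()
            .setCreateIfMissing(true)
            .prepareForBulkLoad();
    }

    // Must run once restoration finishes; if it never runs (the standby case
    // described above), the files created during bulk loading are never merged
    // and open file descriptors keep growing.
    public static void finishBulkLoad(final RocksDB db) throws RocksDBException {
        db.compactRange();
    }
}
```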
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
* MINOR: Fix typo in RecordAccumulator
* MINOR: Fix typo in several files
Reviewers: Ron Dagostino <rdagostino@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>
A standby task could also be at risk of getting into an illegal state when not being closed during handleLostAll:
1. The standby task was initializing in CREATED state, and a task corrupted exception was thrown from registerStateStores
2. The task corrupted exception was caught, and a commit was attempted for the non-affected tasks
3. The task commit failed due to a task migrated exception
4. handleLostAll didn't close the standby task, leaving it in CREATED state
5. After the next rebalance completed, the same task was assigned back as a standby task
6. An illegal argument exception was thrown because the state store was already registered
Reviewers: A. Sophie Blee-Goldman <ableegoldman@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
As stated, we couldn't wait for handleRebalanceComplete in the case of handleLostAll, as we already closed the active task as dirty, and could potentially require its offset in the next thread.runOnce call.
Co-authored-by: Guozhang Wang <wangguoz@gmail.com>
Reviewers: A. Sophie Blee-Goldman <ableegoldman@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Fixes EmbeddedKafkaCluster.deleteTopicAndWait for use with kafka_2.13
Reviewers: Boyang Chen <boyang@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, John Roesler <vvcephei@apache.org>
This is a prerequisite for KAFKA-9501 and will also be useful for KAFKA-9603.
There should be no logical changes here: the main difference is the removal of StandbyContextImpl in preparation for contexts to transition between active and standby.
Also includes some minor cleanup, e.g. pulling the ReadOnly/ReadWrite decorators out into a separate file.
Reviewers: Bruno Cadonna <bruno@confluent.io>, John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
The store's registered callback could also be a restore listener, in which case it should be triggered along with the user-specified global listener.
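A minimal sketch of such a callback, using only the public StateRestoreCallback and StateRestoreListener interfaces (the class itself is hypothetical):

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class ListeningRestoreCallback implements StateRestoreCallback, StateRestoreListener {

    @Override
    public void restore(final byte[] key, final byte[] value) {
        // apply the restored record to the store
    }

    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        // with this fix, invoked in addition to the global restore listener
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) { }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) { }
}
```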
Reviewers: Boyang Chen <boyang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
A small hotfix to avoid an extra probing rebalance the first time an application is launched.
This should particularly improve the testing experience.
Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@apache.org>
Validate that the assignment is always balanced with respect to the following (a sketch of one such check appears after the list):
* active assignment balance
* stateful assignment balance
* task-parallel balance
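A rough sketch of a balance check of this kind, with hypothetical helper names (the real assertions live in the assignor test code):

```java
import java.util.Collection;

final class BalanceCheckSketch {

    // An assignment is considered balanced if the per-client task counts do
    // not differ by more than the allowed balance factor.
    static boolean isBalanced(final Collection<Integer> tasksPerClient, final int balanceFactor) {
        final int max = tasksPerClient.stream().mapToInt(Integer::intValue).max().orElse(0);
        final int min = tasksPerClient.stream().mapToInt(Integer::intValue).min().orElse(0);
        return max - min <= balanceFactor;
    }
}
```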
Reviewers: Bruno Cadonna <bruno@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>
Drops idempotent updates from KTable source operators.
Specifically, it drops updates in which the value is unchanged
and the timestamp is the same or larger.
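A minimal sketch of the check described above (the real logic lives inside the KTable source processor; the class and method names here are hypothetical):

```java
import java.util.Arrays;

final class IdempotentUpdateCheck {

    // An update is idempotent, and can be dropped, if the serialized value is
    // unchanged and the timestamp does not move backwards.
    static boolean canDrop(final byte[] oldValue, final long oldTimestamp,
                           final byte[] newValue, final long newTimestamp) {
        return Arrays.equals(oldValue, newValue) && newTimestamp >= oldTimestamp;
    }
}
```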
Implements: KIP-557
Reviewers: Bruno Cadonna <bruno@confluent.io>, John Roesler <vvcephei@apache.org>
We spotted a case in the soak test where a standby task could be in CREATED state during a commit, which causes an IllegalStateException. To prevent this from happening, the fix is to always enforce a state check.
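A minimal sketch of such a guard, assuming state names that mirror the task lifecycle (the actual check lives in the task code):

```java
// Hypothetical enum mirroring the task lifecycle states.
enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

final class CommitGuard {

    // Only tasks that have completed initialization may be committed.
    static void ensureCommittable(final TaskState state) {
        if (state == TaskState.CREATED || state == TaskState.CLOSED) {
            throw new IllegalStateException("Cannot commit a task in state " + state);
        }
    }
}
```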
Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Currently we add "Data" to all generated classnames in order to avoid naming collisions with existing Request/Response objects. Generated classes for other persistent schema definitions (such as those used in `GroupCoordinator` and `TransactionCoordinator`) will not necessarily have the same problem, so it would be nice if the generated types could use the name defined in the schema directly.
Reviewers: Boyang Chen <boyang@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
Compared with all other test cases, shouldAllowConcurrentAccesses starts an async producer that sends records throughout the test, rather than just synchronously sending and acking a few records before we start the Streams application. Right after the Streams app is started, we check that at least one record has been sent to the output topic (i.e. has completed processing). However, since only this test starts the producer asynchronously and does not wait for it to complete, it is possible that the async producer takes too long to produce some records, causing the test to fail.
To follow what the other tests do, I let this test first send one round of records synchronously before starting the async producer.
I also fixed some new Scala warnings along with this PR.
Reviewers: Matthias J. Sax <matthias@confluent.io>
1. Added a recordInternal function that all other public record functions delegate to, so that shouldRecord is only checked once.
2. In Streams, pass the current wall-clock time along inside InternalProcessorContext during process / punctuate, so it can be passed in to the record function and reduce the calling frequency of SystemTime.milliseconds().
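A minimal sketch of the pattern in (1), with hypothetical names rather than the actual Sensor implementation:

```java
public class SensorSketch {

    private volatile boolean shouldRecord = true;

    public void record(final double value) {
        recordInternal(value, System.currentTimeMillis());
    }

    public void record(final double value, final long timeMs) {
        recordInternal(value, timeMs);
    }

    // Single gate for all public overloads: shouldRecord is checked exactly once.
    private void recordInternal(final double value, final long timeMs) {
        if (!shouldRecord) {
            return;
        }
        // ... update the underlying stats with (value, timeMs)
    }
}
```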
Reviewers: John Roesler <vvcephei@apache.org>
This reverts commit 29e08fd2c2.
There turned out to be more problems than expected with adding the generic parameters.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Since we cannot guarantee to reassign the correct number of
standby tasks when reusing the previous assignment, and the
reassignment is rather a micro-optimization, it is removed
to keep the algorithm correct and simple.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <vvcephei@apache.org>
This PR fixes two major issues:
1. When calling KafkaStreams#store we can always get an InvalidStateStoreException, and even waiting for the Streams state to become RUNNING is not sufficient (this is also how OptimizedKTableIntegrationTest failed). So I wrapped all such calls with a util wrapper that catches that exception and retries.
2. While troubleshooting this issue, I also realized a potential bug in test-util's produceKeyValuesSynchronously, which creates a new producer for each record to send in that batch --- i.e. if you are sending N records with a single call, within that call it will create N producers used to send one record each, which is very slow and costly.
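A minimal sketch of a retry wrapper like the one described in (1); the helper name and retry policy are illustrative, not the actual test utility:

```java
import java.util.function.Supplier;
import org.apache.kafka.streams.errors.InvalidStateStoreException;

final class StoreQueryRetry {

    // Retries the store query until it succeeds or the timeout is reached.
    static <T> T retryOnInvalidStateStore(final Supplier<T> query,
                                          final long timeoutMs) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return query.get();
            } catch (final InvalidStateStoreException e) {
                if (System.currentTimeMillis() > deadline) {
                    throw e;
                }
                Thread.sleep(100L); // store not queryable yet (e.g. rebalancing)
            }
        }
    }
}
```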
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>
* add a config to set the TaskAssignor
* set the default assignor to HighAvailabilityTaskAssignor
* fix broken tests (with some TODOs in the system tests)
Implements: KIP-441
Reviewers: Bruno Cadonna <bruno@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>
These two options are essentially incompatible, as caching will do nothing to reduce downstream traffic and writes when it has to allow non-unique keys (skipping records where the value is also the same is a separate issue, see KIP-557). But enabling caching on a store that's configured to retain duplicates is actually more than just ineffective, and currently causes incorrect results.
We should just log a warning and disable caching whenever a store is retaining duplicates to avoid introducing a regression. Maybe when 3.0 comes around we should consider throwing an exception instead to alert the user more aggressively.
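A minimal sketch of the problematic combination, using only public APIs (store name and parameters are illustrative):

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class DuplicatesWithCachingSketch {

    public static StoreBuilder<WindowStore<String, String>> build() {
        return Stores.windowStoreBuilder(
                Stores.persistentWindowStore(
                        "join-store",
                        Duration.ofHours(1),    // retention
                        Duration.ofMinutes(5),  // window size
                        true),                  // retainDuplicates
                Serdes.String(),
                Serdes.String())
            // Ineffective with retained duplicates and, before this change,
            // could produce incorrect results; it is now disabled with a warning.
            .withCachingEnabled();
    }
}
```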
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, John Roesler <john@confluent.io>
When debugging KAFKA-9388, I found the reason the second test method takes much longer (10s) than the previous one (~500ms) is that they used the same app.id. When the previous clients are shut down, they do not send a leave-group request, and hence we are still depending on the session timeout (10s) for the members to be removed from the group.
When the second test is triggered, its members join the same group because of the same application id, and the prepare-rebalance phase would wait for the full rebalance timeout before it kicks out the previous members.
Setting different application ids resolves such issues for integration tests --- I did a quick search and found some other integration tests have the same issue. And after this PR my local unit test runtime reduced from about 14min to 7min.
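A minimal sketch of giving each test its own application.id (config values are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UniqueAppIdSketch {

    public static Properties streamsConfig(final String testName, final String bootstrapServers) {
        final Properties props = new Properties();
        // Unique per test method, so each test forms a fresh consumer group
        // instead of waiting out the previous group's session timeout.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + testName + "-" + System.currentTimeMillis());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }
}
```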
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, John Roesler <john@confluent.io>
1. In both the RocksDBMetrics and Metrics integration tests, we do not need to wait for the consumer to consume records from the output topics, since the sensors / metrics are registered upon task creation.
2. Merged the two RocksDB test cases into one app that creates two state stores (non-segmented and segmented).
With these two changes, local runtime of these two tests reduced from 2min+ and 3min+ to under a minute.
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
One of the new RocksDB unit tests creates a non-temporary RocksDB directory wherever the test is run from, with some RocksDB files left behind after the test(s) are done. We should use the tempDirectory utility for this test.
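A minimal sketch of opening the test DB under a temp directory instead of the working directory (using the JDK temp dir here; Kafka's own tempDirectory test utility serves the same purpose):

```java
import java.io.File;
import java.nio.file.Files;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class TempDirRocksDbSketch {

    public static RocksDB openTestDb() throws Exception {
        // Create the store under a temp directory so nothing is left behind
        // in the working directory after the test finishes.
        final File dbDir = Files.createTempDirectory("rocksdb-test").toFile();
        final Options options = new Options().setCreateIfMissing(true);
        return RocksDB.open(options, dbDir.getAbsolutePath());
    }
}
```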
Reviewers: Guozhang Wang <wangguoz@gmail.com>