See KIP-31 and KIP-32 for details.
A few notes on the patch:
1. This patch implements KIP-31 and KIP-32. The patch includes features in both KAFKA-3025, KAFKA-3026 and KAFKA-3036
2. All unit tests passed.
3. The unit tests were run with new and old message format.
4. When message format conversion occurs during consumption, the consumer will not be able to detect the message size too large situation. I did not try to fix this because the situation seems rare and only happen during migration phase.
Author: Jiangjie Qin <becket.qin@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>
Author: Jiangjie (Becket) Qin <becket.qin@gmail.com>
Reviewers: Jason Gustafson <jason@confluent.io>, Anna Povzner <anna@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>, Jun Rao <junrao@gmail.com>
Closes#764 from becketqin/KAFKA-3025
Removing streams' specific config params from producer/consumer configs to reduce warning messages.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#906 from ymatsuda/clean_config
Pass through the root StateStore in the init method so the inner StateStore can register that object.
Author: tomdearman <tom.dearman@gmail.com>
Reviewers: Yasuhiro Matsuda
Closes#904 from tomdearman/KAFKA-3229
* We need to poll periodically even when all partitions are paused in order to respond to a possible rebalance promptly.
* There is a race condition when two (or more) threads try to clean up the same state directory. One of the thread fails with FileNotFoundException. Thus the new code simply catches it and ignore.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Gwen Shapira
Closes#893 from ymatsuda/hotfix
* During window store initialization, we have to open segments in the segment id order and update ```currentSegmentId```, otherwise cleanup won't work.
* ```getSegment()``` should not create a segment and clean up old segments if the segment id is greater than ```currentSegmentId```. Segment maintenance should be driven not by query but only by data insertion.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#891 from ymatsuda/hotfix2
Buffered records of change logs must be cleared upon reassignment of standby tasks.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#889 from ymatsuda/hotfix
guozhangwang
A window store should open all existing segments. This is important for segment cleanup, and it also ensures that the first fetch() call returns the hits, the values in the search range. (previously, it missed the hits in fetch() immediately after initialization).
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#886 from ymatsuda/hotfix3
The range is inclusive according to KeyValueStore's java doc.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#883 from ymatsuda/minor
* RocksDBStore.putInternal should bypass logging.
* StoreChangeLogger should not call context.recordCollector() when nothing to log
* This is for standby tasks. In standby task, recordCollector() throws an exception. There should be nothing to log anyway.
* fixed ConcurrentModificationException in StreamThread
guozhangwang
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#877 from ymatsuda/hotfix2
workround partition ordering not preserved by the consumer group management.
guozhangwang
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#868 from ymatsuda/partitionOrder
guozhangwang
Temporarily disabled state store access checking.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#864 from ymatsuda/fix_table_lookup
We need to close producer first before closing tasks to make sure all messages are acked and hence checkpoint offsets are updated before closing tasks and their state. It was re-ordered mistakenly before.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Yasuhiro Matsuda <yasuhiro@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#845 from guozhangwang/KStreamState
Changes include:
1) Move logging logic from MeteredXXXStore to internal stores, and leave WindowedStore API clean by removed all internalPut/Get functions.
2) Wrap common logging behavior of InMemory and LRUCache stores into one class.
3) Fix a bug for StoreChangeLogger where byte arrays are not comparable in HashSet by using a specified RawStoreChangeLogger.
4) Add a caching layer on top of RocksDBStore with object caching, it relies on the object's equals and hashCode function to be consistent with serdes.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Yasuhiro Matsuda <yasuhiro@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#826 from guozhangwang/K3060
guozhangwang
removing an unused class, FilteredIterator, and its test.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Gwen Shapira
Closes#816 from ymatsuda/remove_obsolete_class
It behaves better on Windows and provides more useful error messages.
Also:
* Minor inconsistency fix in `kafka.server.OffsetCheckpoint`.
* Remove delete from `streams.state.OffsetCheckpoint` constructor (similar to the change in `kafka.server.OffsetCheckpoint` in 836cb19633 (diff-2503b32f29cbbd61ed8316f127829455L29)).
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#771 from ijuma/kafka-3105-use-atomic-move-with-fallback-instead-of-rename
guozhangwang
When ```WindowedSerializer``` is specified in ```to(...)``` or ```through(...)``` for a key, we use ```WindowedStreamPartitioner```.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#779 from ymatsuda/partitioner
Added option to use custom partitioning logic within each topology sink.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Guozhang Wang
Closes#309 from rhauch/kafka-2649
guozhangwang
An implementation of local store for join window. This implementation uses "rolling" of RocksDB instances for timestamp based truncation.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#726 from ymatsuda/windowed_join
guozhangwang
At DAG level, `KTable<K,V>` sends (key, (new value, old value)) to down stream. This is done by wrapping the new value and the old value in an instance of `Change<V>` class and sending it as a "value" part of the stream. The old value is omitted (set to null) by default for optimization. When any downstream processor needs to use the old value, the framework should enable it (see `KTableImpl.enableSendingOldValues()` and implementations of `KTableProcessorSupplier.enableSensingOldValues()`).
NOTE: This is meant to be used by aggregation. But, if there is a use case like a SQL database trigger, we can add a new KTable method to expose this.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#672 from ymatsuda/trigger
guozhangwang
* a test for ktable state store creation
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#661 from ymatsuda/more_ktable_test
onurkaraman becketqin Do you have time to review this patch? It addresses the ticket that jjkoshy filed in KAFKA-2668.
Author: Dong Lin <lindong28@gmail.com>
Reviewers: Onur Karaman <okaraman@linkedin.com>, Joel Koshy <jjkoshy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Jun Rao <junrao@gmail.com>
Closes#328 from lindong28/KAFKA-2668