guozhangwang
An optimization which may reduce unnecessary poll for standby tasks.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#535 from ymatsuda/remove_empty_standby_task
guozhangwang
* added a new config param "num.standby.replicas" (the default value is 0).
* added a new abstract class AbstractTask
* added StandbyTask as a subclass of AbstractTask
* modified StreamTask to a subclass of AbstractTask
* StreamThread
* standby tasks are created by calling StreamThread.addStandbyTask() from onPartitionsAssigned()
* standby tasks are destroyed by calling StreamThread.removeStandbyTasks() from onPartitionRevoked()
* In addStandbyTasks(), change log partitions are assigned to restoreConsumer.
* In removeStandByTasks(), change log partitions are removed from restoreConsumer.
* StreamThread polls change log records using restoreConsumer in the runLoop with timeout=0.
* If records are returned, StreamThread calls StandbyTask.update and pass records to each standby tasks.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#526 from ymatsuda/standby_task
Fails when order of elements is incorrect
Author: Grant Henke <granthenke@gmail.com>
Reviewers: Yasuhiro Matsuda
Closes#510 from granthenke/streams-test
guozhangwang
When the rebalance happens each consumer reports the following information to the coordinator.
* Client UUID (a unique id assigned to an instance of KafkaStreaming)
* Task ids of previously running tasks
* Task ids of valid local states on the client's state directory
TaskAssignor does the following
* Assign a task to a client which was running it previously. If there is no such client, assign a task to a client which has its valid local state.
* Try to balance the load among stream threads.
* A client may have more than one stream threads. The assignor tries to assign tasks to a client proportionally to the number of threads.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#497 from ymatsuda/task_assignment
* Added StateStoreSupplier
* StateStore
* Added init(ProcessorContext context) method
* TopologyBuilder
* Added addStateStore(StateStoreSupplier supplier, String... processNames)
* Added connectProessorAndStateStores(String processorName, String... stateStoreNames)
* This is for the case processors are not created when a store is added to the topology. (used by KStream)
* KStream
* add stateStoreNames to process(), transform(), transformValues().
* Refactored existing state stores to implement StateStoreSupplier
guozhangwang
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#387 from ymatsuda/state_store_supplier
StreamThread should keep calling consumer.poll() even when no task is assigned. This is necessary to get a task.
guozhangwang
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#373 from ymatsuda/no_task
guozhangwang
* A task id is now a class, ```TaskId```, that has a topic group id and a partition id fields.
* ```TopologyBuilder``` assigns a topic group id to a topic group. Related methods are changed accordingly.
* A state store uses the partition id part of the task id as the change log partition id.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#365 from ymatsuda/task_id
guozhangwang
* added ```PartitionGrouper``` (abstract class)
* This class is responsible for grouping partitions. Each group forms a task.
* Users may implement this class for custom grouping.
* added ```DefaultPartitionGrouper```
* our default implementation of ```PartitionGrouper```
* added ```KafkaStreamingPartitionAssignor```
* We always use this as ```PartitionAssignor``` of stream consumers.
* Actual grouping is delegated to ```PartitionGrouper```.
* ```TopologyBuilder```
* added ```topicGroups()```
* This returns groups of related topics according to the topology
* added ```copartitionSources(sourceNodes...)```
* This is used by DSL layer. It asserts the specified source nodes must be copartitioned.
* added ```copartitionGroups()```
* This returns groups of copartitioned topics
* KStream layer
* keep track of source nodes to determine copartition sources when steams are joined
* source nodes are set to null when partitioning property is not preserved (ex. ```map()```, ```transform()```), and this indicates the stream is no longer joinable
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#353 from ymatsuda/grouping
Added a new `KeyValueStore` implementation called `InMemoryLRUCacheStore` that keeps a maximum number of entries in-memory, and as the size exceeds the capacity the least-recently used entry is removed from the store and the backing topic. Also added unit tests for this new store and the existing `InMemoryKeyValueStore` and `RocksDBKeyValueStore` implementations. A new `KeyValueStoreTestDriver` class simplifies all of the other tests, and can be used by other libraries to help test their own custom implementations.
This PR depends upon [KAFKA-2593](https://issues.apache.org/jira/browse/KAFKA-2593) and its PR at https://github.com/apache/kafka/pull/255. Once that PR is merged, I can rebase this PR if desired.
Two issues were uncovered when creating these new unit tests, and both are also addressed as separate (small) commits in this PR:
* The `RocksDBKeyValueStore` initialization was not creating the file system directory if missing.
* `MeteredKeyValueStore` was casting to `ProcessorContextImpl` to access the `RecordCollector`, which prevent using `MeteredKeyValueStore` implementations in tests where something other than `ProcessorContextImpl` was used. The fix was to introduce a `RecordCollector.Supplier` interface to define this `recordCollector()` method, and change `ProcessorContextImpl` and `MockProcessorContext` to both implement this interface. Now, `MeteredKeyValueStore` can cast to the new interface to access the record collector rather than to a single concrete implementation, making it possible to use any and all current stores inside unit tests.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Edward Ribeiro, Guozhang Wang
Closes#256 from rhauch/kafka-2594
guozhangwang
StreamTaskTest did not set up a temp directory for each test. This occasionally caused interference between tests through state directory locking.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#317 from ymatsuda/fix_StreamTaskTest
guozhangwang
This change aims to remove unnecessary ```consumer.poll(0)``` calls.
* ```once``` after some partition is resumed
* whenever the size of the top queue in any task is below ```BUFFERED_RECORDS_PER_PARTITION_CONFIG```
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#315 from ymatsuda/less_poll_zero
Add support for the key value stores to use specified serializers and deserializers (aka, "serdes"). Prior to this change, the stores were limited to only the default serdes specified in the topology's configuration and exposed to the processors via the ProcessorContext.
Now, using InMemoryKeyValueStore and RocksDBKeyValueStore are similar: both are parameterized on the key and value types, and both have similar multiple static factory methods. The static factory methods either take explicit key and value serdes, take key and value class types so the serdes can be inferred (only for the built-in serdes for string, integer, long, and byte array types), or use the default serdes on the ProcessorContext.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Guozhang Wang
Closes#255 from rhauch/kafka-2593
guozhangwang
Fix the order of flushing. Undoing the change I did sometime ago.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#304 from ymatsuda/flush_order
guozhangwang
* added back type safe stateful transform methods (kstream.transform() and kstream.transformValues())
* changed kstream.process() to void
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes#292 from ymatsuda/transform_method
A few of Kafka Stream's interfaces and classes are not as well-aligned with Java 8's functional interfaces. By making these changes, when Kafka moves to Java 8 these classes can extend standard Java 8 functional interfaces while remaining backward compatible. This will make it easier for developers to use Kafka Streams, and may allow us to eventually remove these custom interfaces and just use the standard Java 8 interfaces.
The changes include:
1. The 'apply' method of KStream's `Predicate` functional interface was renamed to `test` to match the method name on `java.util.function.BiPredicate`. This will allow KStream's `Predicate` to extend `BiPredicate` when Kafka moves to Java 8, and for the `KStream.filter` and `filterOut` methods to accept `BiPredicate`.
2. Renamed the `ProcessorDef` and `WindowDef` interfaces to `ProcessorSupplier` and `WindowSupplier`, respectively. Also the `SlidingWindowDef` class was renamed to `SlidingWindowSupplier`, and the `MockProcessorDef` test class was renamed to `MockProcessorSupplier`. The `instance()` method in all were renamed to `get()`, so that all of these can extend/implement Java 8's `java.util.function.Supplier<T>` interface in the future with no other changes and while remaining backward compatible. Variable names that used some form of "def" were changed to use "supplier".
These two sets of changes were made in separate commits.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ismael Juma, Guozhang Wang
Closes#270 from rhauch/kafka-2600
guozhangwang
This code change properly types ProcessorDef. This also makes KStream.process() typesafe.
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Ismael Juma, Guozhang Wang
Closes#289 from ymatsuda/typing_ProcessorDef
ymatsuda junrao Could you take a quick look? The current unit test is failing on this.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Ismael Juma, Jun Rao
Closes#276 from guozhangwang/HF-ProcessorStateManager
Remove state storage upon unclean shutdown and fix streaming metrics used for local state.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Edward Ribeiro, Yasuhiro Matsuda, Jun Rao
Closes#265 from guozhangwang/K2591
This work has been contributed by Jesse Anderson, Randall Hauch, Yasuhiro Matsuda and Guozhang Wang. The detailed design can be found in https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client.
Author: Guozhang Wang <wangguoz@gmail.com>
Author: Yasuhiro Matsuda <yasuhiro.matsuda@gmail.com>
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Author: ymatsuda <yasuhiro.matsuda@gmail.com>
Author: Randall Hauch <rhauch@gmail.com>
Author: Jesse Anderson <jesse@smokinghand.com>
Author: Ismael Juma <ismael@juma.me.uk>
Author: Jesse Anderson <eljefe6a@gmail.com>
Reviewers: Ismael Juma, Randall Hauch, Edward Ribeiro, Gwen Shapira, Jun Rao, Jay Kreps, Yasuhiro Matsuda, Guozhang Wang
Closes#130 from guozhangwang/streaming