Closed the last PR I made for this because I accidentally borked it with my other PR. Small error; I figure this is from copy-pasting the above doc
Author: Nikki Thean <nthean@etsy.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#2336 from nixsticks/trunk
When creating the source node in TopologyBuilder, we need to decorate its input topics if they are inner (i.e. repartition) topics with the prefix.
Also did some minor cleanup in the printing function for better visualization in debugging.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Eno Thereska, Damian Guy, Eno Thereska, Jun Rao
Closes#2320 from guozhangwang/KMinor-source-topic-fix
Remove use of TestTimestampExtractor as it causes the logs to roll and segments get deleted.
Remove the wcnt example as it is dependent on the TestTimestampExtractor - windowed counting is covered elsewhere.
Change all aggregate operations to use TimeWindow as use of UnlimitedWindow was causing logs to roll and segments being deleted.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Guozhang Wang, Eno Thereska
Closes#2319 from dguy/smoke-test
Add support for SessionWindows based on design detailed in https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows.
This includes refactoring of the RocksDBWindowStore such that functionality common with the RocksDBSessionStore isn't duplicated.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska <eno.thereska@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#2166 from dguy/kafka-3452-session-merge
Jason recently cleaned things up significantly by consolidating the Message/Record classes
into the common Java code in the clients module. While reviewing that, I noticed a few things
that could be improved a little more. To make reviewing easier, there will be multiple PRs.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Jason Gustafson <jason@confluent.io>
Closes#2271 from ijuma/records-minor-fixes
In `shutdownTasksAndState` and `suspendTasksAndState` we commit offsets BEFORE we flush any state. This is wrong as if an exception occurs during a flush, we may violate the at-least-once guarantees, that is we would have committed some offsets but NOT sent the processed data on to other Sinks.
Also during suspend and shutdown, we should try and complete all tasks even when exceptions occur. We should just keep track of the exception and rethrow it at the end if necessary. This helps with ensuring that StateStores etc are closed.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#2281 from dguy/kafka-4561
This was changed in b58b6a1bef and caused the `ReplicaVerificationToolTest.test_replica_lags`
system test to start failing.
I also added a unit test and a couple of other minor clean-ups.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#2280 from ijuma/kafka-4554-fix-replica-buffer-verify-checksum
During `onPartitionsAssigned` first close, and remove, any suspended `StandbyTasks` that are no longer assigned to this consumer.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#2266 from dguy/kafka-4540
The `RecordCollectorImpl` currently drops messages on the floor if an exception is non-null in the producer callback. This will result in message loss and violates at-least-once processing.
Rather than just log an error in the callback, save the exception in a field. On subsequent calls to `send`, `flush`, `close`, first check for the existence of an exception and throw a `StreamsException` if it is non-null. Also, in the callback, if an exception has already occurred, the `offsets` map should not be updated.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#2249 from dguy/kafka-4473
partitionsByHostState and metadataWithInternalTopics need to be updated on each call to onAssignment() otherwise they contain invalid/stale metadata.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes#2256 from dguy/4534
Tasks that don't have any `StateStore`s wont have a `StandbyTask`, so `createStandbyTask` can return `null`. We need to check for this in `StandbyTaskCreator.createTask(...)`
Also, the checkpointed offsets for `StandbyTask`s are never loaded.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes#2255 from dguy/kafka-4539
If a KafkaStreams app is using Standby Tasks then `StreamPartitionAssignor` will add the standby partitions to the partitionsByHostState map for each host. This is incorrect as the partitionHostState map is used to resolve which host is hosting a particular store for a key.
The result is that doing metadata lookups for interactive queries can return an incorrect host
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes#2254 from dguy/KAFKA-4537
This fixes a problem where the Kafka instance state transition gets stuck on rebalance (Thanks to dguy for pointing out). Also adjusts the test in QueryableStateIntegration test.
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang
Closes#2252 from enothereska/hotfix_state_never_running
When building a topology with tables and StateStores, the StateStores are mapped to the source topic names. This map is retrieved via TopologyBuilder.stateStoreNameToSourceTopics() and is used in Interactive Queries to find the source topics and partitions when resolving the partitions that particular keys will be in.
There is an issue where by this mapping for a table that is originally created with builder.table("topic", "table");, and then is subsequently used in a join, is changed to the internal repartition topic. This is because the mapping is updated during the call to topology.connectProcessorAndStateStores(..).
In the case that the stateStoreNameToSourceTopics Map already has a value for the state store name it should not update the Map.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes#2250 from dguy/kafka-4532
Clear and remove the NamedCache from the ThreadCache when a CachingKeyValueStore or CachingWindowStore is closed.
Validate that the store is open when doing any queries or writes to Caching State Stores.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes#2235 from dguy/kafka-4516
- break loop in StreamPartitionAssigner.assign() in case partition metadata is missing
- fit state transition issue (follow up to KAFKA-3637: Add method that checks if streams are initialised)
- some test improvements
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Eno Thereska, Ismael Juma, Guozhang Wang
Closes#2209 from mjsax/kafka-4476-stuck-on-missing-metadata
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang
Closes#2225 from enothereska/KAFKA-4486-exception-commit
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Michael G. Noll, Eno Thereska, Damian Guy, Guozhang Wang
Closes#2117 from mjsax/kafka-4393-improveInvalidTsHandling
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#2121 from guozhangwang/K4392-race-dir-cleanup
The NamedCache wasn't correctly dealing with its re-entrant nature. This would result in the LRU becoming corrupted, and the above exception occurring during eviction. For example:
Cache A: dirty key 1
eviction runs on Cache A
Node for key 1 gets marked as clean
Entry for key 1 gets flushed downstream
Downstream there is a processor that also refers to the table fronted by Cache A
Downstream processor puts key 2 into Cache A
This triggers an eviction of key 1 again ( it is still the oldest node as hasn't been removed from the LRU)
As the Node for key 1 is clean flush doesn't run and it is immediately removed from the cache.
So now we have dirtyKey set with key =1, but the value doesn't exist in the cache.
Downstream processor tries to put key = 1 into the cache, it fails as key =1 is in the dirtyKeySet.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes#2226 from dguy/cache-bug
Instead of throwing `UnsupportedOperationException` from `StandbyTask.recordCollector()` return a No-op implementation of `RecordCollector`.
Refactored `RecordCollector` to have an interface and impl.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes#2212 from dguy/standby-task
Also:
* Make all implementations of `Time` thread-safe as they are accessed from multiple threads in some cases.
* Change default implementation of `MockTime` to use two separate variables for `nanoTime` and `currentTimeMillis` as they have different `origins`.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Shikhar Bhushan <shikhar@confluent.io>, Jason Gustafson <jason@confluent.io>, Eno Thereska <eno.thereska@gmail.com>, Damian Guy <damian.guy@gmail.com>
Closes#2095 from ijuma/kafka-2247-consolidate-time-interfaces
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Ismael Juma, Dan Norwood, Xavier Léauté, Damian Guy, Michael G. Noll, Matthias J. Sax, Guozhang Wang
Closes#2135 from enothereska/KAFKA-3637-streams-state
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#2171 from enothereska/KAFKA-4427-topicgroups-with-no-tasks
- bug-fix follow up
- Resetter fails if no intermediate topic is used because seekToEnd() commit ALL partitions to EOL
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Michael G. Noll, Roger Hoover, Guozhang Wang
Closes#2138 from mjsax/kafka-4331-streams-resetter-bugfix
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang
Closes#2133 from enothereska/KAFKA-4355-topic-not-found
Added `timeout` and `timeUnit` to `KafkaStreams.close(..)`. Now do close on a thread and `join` that thread with the provided `timeout`.
Changed `state` in `KafkaStreams` to use an enum.
Added system test to ensure we don't deadlock on close when an uncaught exception handler that calls `System.exit(..)` is used and there is also a shutdown hook that calls `KafkaStreams.close(...)`
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang
Closes#2097 from dguy/kafka-4366
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#2124 from enothereska/KAFKA-4359-intergration-tests-commit1
The `StoreChangeLogger` currently keeps a cache of dirty and removed keys and will batch the changelog records such that we don't send a record for each update. However, with KIP-63 this is unnecessary as the batching and de-duping is done by the caching layer. Further, the `StoreChangeLogger` relies on `context.timestamp()` which is likely to be incorrect when caching is enabled
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang
Closes#2103 from dguy/store-change-logger
Fix incorrect logging when unable to create an active task. The output was: Failed to create an active task %s:
It should have the taskId.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Ismael Juma, Eno Thereska
Closes#2109 from dguy/minor-logging
Remove `keySerde`, `valSerde`, `OUTERTHIS_NAME`, `OUTEROTHER_NAME`, `LEFTTHIS_NAME`, `LEFTOTHER_NAME` from `KTableImpl` as they are all unused fields
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#2119 from dguy/minor-ktable-unused
Given a topology like the one below. If a record arriving in `tableOne` causes a cache eviction, it will trigger the `leftJoin` that will do a `get` from `reducer-store`. If the key is not currently cached in `reducer-store`, but is in the backing store, it will be put into the cache, and it may also trigger an eviction. If it does trigger an eviction and the eldest entry is dirty it will flush the dirty keys. It is at this point that a ClassCastException is thrown. This occurs because the ProcessorContext is still set to the context of the `leftJoin` and the next child in the topology is `mapValues`.
We need to set the correct `ProcessorNode`, on the context, in the `ForwardingCacheFlushListener` prior to calling `context.forward`. We also need to remember to reset the `ProcessorNode` to the previous node once `context.forward` has completed.
```
final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
Override
public KeyValue<String, Long> apply(final Long key, final String value) {
return new KeyValue<>(value, key);
}
}, Serdes.String(), Serdes.Long())
.reduce(new Reducer<Long>() {..}, new Reducer<Long>() {..}, "reducer-store");
one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {..})
.mapValues(new ValueMapper<String, String>() {..});
```
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes#2051 from dguy/kafka-4311