During `onPartitionsAssigned`, first close and remove any suspended `StandbyTask`s that are no longer assigned to this consumer.
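A minimal sketch of that cleanup, assuming a hypothetical task map keyed by a task-id string (the real code works with `TaskId` to `StandbyTask` maps inside the stream thread):
```
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified types; illustrative only.
class SuspendedStandbyCleanupSketch {
    static void closeUnassigned(final Map<String, AutoCloseable> suspendedStandbyTasks,
                                final Set<String> newlyAssignedTaskIds) throws Exception {
        final Iterator<Map.Entry<String, AutoCloseable>> it =
            suspendedStandbyTasks.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, AutoCloseable> entry = it.next();
            if (!newlyAssignedTaskIds.contains(entry.getKey())) {
                entry.getValue().close();   // close the standby task this consumer no longer owns
                it.remove();                // and drop it from the suspended set
            }
        }
    }
}
```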
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes #2266 from dguy/kafka-4540
The `RecordCollectorImpl` currently drops messages on the floor if an exception is non-null in the producer callback. This will result in message loss and violates at-least-once processing.
Rather than just log an error in the callback, save the exception in a field. On subsequent calls to `send`, `flush`, `close`, first check for the existence of an exception and throw a `StreamsException` if it is non-null. Also, in the callback, if an exception has already occurred, the `offsets` map should not be updated.
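A minimal sketch of that pattern, assuming a simplified collector (the class and field names here are illustrative, not the actual `RecordCollectorImpl`):
```
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.streams.errors.StreamsException;

// Simplified, hypothetical collector; not the actual RecordCollectorImpl.
class ExceptionRememberingCollector {
    private final KafkaProducer<byte[], byte[]> producer;
    private volatile Exception sendException;   // remembered from the producer callback

    ExceptionRememberingCollector(final KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void send(final ProducerRecord<byte[], byte[]> record) {
        checkForException();   // a previous send failure fails this and all later calls
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                if (exception != null && sendException == null) {
                    sendException = exception;   // remember the error; skip any offset bookkeeping
                }
            }
        });
    }

    void flush() {
        checkForException();
        producer.flush();
    }

    void close() {
        checkForException();
        producer.close();
    }

    private void checkForException() {
        if (sendException != null) {
            throw new StreamsException("previous record send failed", sendException);
        }
    }
}
```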
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes #2249 from dguy/kafka-4473
partitionsByHostState and metadataWithInternalTopics need to be updated on each call to onAssignment(), otherwise they contain invalid/stale metadata.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes #2256 from dguy/4534
Tasks that don't have any `StateStore`s won't have a `StandbyTask`, so `createStandbyTask` can return `null`. We need to check for this in `StandbyTaskCreator.createTask(...)`.
Also, the checkpointed offsets for `StandbyTask`s are never loaded.
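A sketch of that null check, with hypothetical, simplified types (the real code uses `TaskId` and `StandbyTask`):
```
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified types; illustrative only.
class StandbyTaskCreatorSketch {
    private final Map<String, Object> standbyTasks = new HashMap<>();

    void createTask(final String taskId, final boolean hasStateStores) {
        // stand-in for createStandbyTask(...), which returns null for tasks without any StateStores
        final Object task = hasStateStores ? new Object() : null;
        if (task != null) {
            standbyTasks.put(taskId, task);
        }
    }
}
```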
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2255 from dguy/kafka-4539
If a KafkaStreams app is using Standby Tasks then `StreamPartitionAssignor` will add the standby partitions to the partitionsByHostState map for each host. This is incorrect, as the partitionsByHostState map is used to resolve which host is hosting a particular store for a key.
The result is that metadata lookups for interactive queries can return an incorrect host.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2254 from dguy/KAFKA-4537
This fixes a problem where the KafkaStreams instance state transition gets stuck on rebalance (thanks to dguy for pointing it out). Also adjusts the test in QueryableStateIntegrationTest.
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang
Closes #2252 from enothereska/hotfix_state_never_running
When building a topology with tables and StateStores, the StateStores are mapped to the source topic names. This map is retrieved via TopologyBuilder.stateStoreNameToSourceTopics() and is used in Interactive Queries to find the source topics and partitions when resolving the partitions that particular keys will be in.
There is an issue whereby this mapping for a table that is originally created with `builder.table("topic", "table")`, and then subsequently used in a join, is changed to the internal repartition topic. This is because the mapping is updated during the call to `topology.connectProcessorAndStateStores(..)`.
In the case that the stateStoreNameToSourceTopics Map already has a value for the state store name, it should not update the Map.
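A sketch of that guard, with hypothetical method and parameter names (not the actual `TopologyBuilder` code): the mapping is only recorded the first time a store is connected.
```
import java.util.Map;
import java.util.Set;

// Hypothetical method/parameter names; not the actual TopologyBuilder code.
class StoreTopicMappingSketch {
    static void connect(final Map<String, Set<String>> stateStoreNameToSourceTopics,
                        final String storeName,
                        final Set<String> sourceTopics) {
        // keep the original mapping (e.g. "topic" from builder.table("topic", "table"))
        // instead of overwriting it with an internal repartition topic on a later connect
        if (!stateStoreNameToSourceTopics.containsKey(storeName)) {
            stateStoreNameToSourceTopics.put(storeName, sourceTopics);
        }
    }
}
```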
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes #2250 from dguy/kafka-4532
Clear and remove the NamedCache from the ThreadCache when a CachingKeyValueStore or CachingWindowStore is closed.
Validate that the store is open when doing any queries or writes to Caching State Stores.
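An illustrative sketch of the open check, assuming a simplified store wrapper (not the actual `CachingKeyValueStore` code):
```
import org.apache.kafka.streams.errors.InvalidStateStoreException;

// Simplified wrapper; illustrative only.
class OpenCheckSketch {
    private final String storeName;
    private volatile boolean open = true;

    OpenCheckSketch(final String storeName) {
        this.storeName = storeName;
    }

    void close() {
        // in the real fix the NamedCache is also cleared and removed from the ThreadCache here
        open = false;
    }

    void validateStoreOpen() {
        if (!open) {
            throw new InvalidStateStoreException("Store " + storeName + " is currently closed");
        }
    }
}
```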
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes #2235 from dguy/kafka-4516
- break loop in StreamPartitionAssigner.assign() in case partition metadata is missing
- fix state transition issue (follow up to KAFKA-3637: Add method that checks if streams are initialised)
- some test improvements
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Eno Thereska, Ismael Juma, Guozhang Wang
Closes #2209 from mjsax/kafka-4476-stuck-on-missing-metadata
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang
Closes #2225 from enothereska/KAFKA-4486-exception-commit
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Michael G. Noll, Eno Thereska, Damian Guy, Guozhang Wang
Closes #2117 from mjsax/kafka-4393-improveInvalidTsHandling
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes #2121 from guozhangwang/K4392-race-dir-cleanup
The NamedCache wasn't correctly dealing with its re-entrant nature. This would result in the LRU becoming corrupted, and the above exception occurring during eviction. For example:
- Cache A: dirty key 1
- eviction runs on Cache A
- Node for key 1 gets marked as clean
- Entry for key 1 gets flushed downstream
- Downstream there is a processor that also refers to the table fronted by Cache A
- Downstream processor puts key 2 into Cache A
- This triggers an eviction of key 1 again (it is still the oldest node as it hasn't been removed from the LRU)
- As the Node for key 1 is clean, flush doesn't run and it is immediately removed from the cache
- So now the dirty key set contains key 1, but the value doesn't exist in the cache
- Downstream processor tries to put key 1 into the cache; it fails as key 1 is in the dirty key set
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes #2226 from dguy/cache-bug
Instead of throwing `UnsupportedOperationException` from `StandbyTask.recordCollector()` return a No-op implementation of `RecordCollector`.
Refactored `RecordCollector` to have an interface and impl.
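A sketch of the no-op idea, assuming a simplified `RecordCollector`-like interface (the real interface has more methods; the names here are illustrative):
```
import org.apache.kafka.clients.producer.ProducerRecord;

// Simplified interface; the real RecordCollector has more methods.
interface SimpleRecordCollector {
    <K, V> void send(ProducerRecord<K, V> record);
    void flush();
    void close();
}

// No-op implementation a standby task can return instead of throwing
// UnsupportedOperationException: every call is silently ignored.
class NoOpRecordCollectorSketch implements SimpleRecordCollector {
    @Override
    public <K, V> void send(final ProducerRecord<K, V> record) { /* no-op */ }

    @Override
    public void flush() { /* no-op */ }

    @Override
    public void close() { /* no-op */ }
}
```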
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes #2212 from dguy/standby-task
Also:
* Make all implementations of `Time` thread-safe as they are accessed from multiple threads in some cases.
* Change the default implementation of `MockTime` to use two separate variables for `nanoTime` and `currentTimeMillis` as they have different origins (see the sketch below).
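A thread-safe sketch of that change, using hypothetical class and field names rather than the actual `MockTime`:
```
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical names; illustrative only.
class TwoOriginMockTime {
    private final AtomicLong timeMs = new AtomicLong(System.currentTimeMillis());
    private final AtomicLong highResTimeNs = new AtomicLong(System.nanoTime());

    public long milliseconds() {
        return timeMs.get();
    }

    public long nanoseconds() {
        return highResTimeNs.get();
    }

    public void sleep(final long ms) {
        // advance both clocks by the same duration while keeping their distinct origins
        timeMs.addAndGet(ms);
        highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms));
    }
}
```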
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Shikhar Bhushan <shikhar@confluent.io>, Jason Gustafson <jason@confluent.io>, Eno Thereska <eno.thereska@gmail.com>, Damian Guy <damian.guy@gmail.com>
Closes #2095 from ijuma/kafka-2247-consolidate-time-interfaces
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Ismael Juma, Dan Norwood, Xavier Léauté, Damian Guy, Michael G. Noll, Matthias J. Sax, Guozhang Wang
Closes #2135 from enothereska/KAFKA-3637-streams-state
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes #2171 from enothereska/KAFKA-4427-topicgroups-with-no-tasks
- bug-fix follow up
- Resetter fails if no intermediate topic is used, because seekToEnd() commits ALL partitions to EOL (see the sketch below)
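A sketch of the intended behaviour, with hypothetical names (not the actual StreamsResetter code): only the intermediate-topic partitions are seeked to the end.
```
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper; illustrative only.
class ResetSketch {
    static void seekToEndOfIntermediateTopics(final KafkaConsumer<byte[], byte[]> client,
                                              final Collection<TopicPartition> assignment,
                                              final Set<String> intermediateTopics) {
        final List<TopicPartition> intermediatePartitions = new ArrayList<>();
        for (final TopicPartition tp : assignment) {
            if (intermediateTopics.contains(tp.topic())) {
                intermediatePartitions.add(tp);
            }
        }
        // guard against the empty case: seekToEnd() with an empty collection
        // would seek ALL currently assigned partitions to the end
        if (!intermediatePartitions.isEmpty()) {
            client.seekToEnd(intermediatePartitions);
        }
    }
}
```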
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Michael G. Noll, Roger Hoover, Guozhang Wang
Closes #2138 from mjsax/kafka-4331-streams-resetter-bugfix
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang
Closes #2133 from enothereska/KAFKA-4355-topic-not-found
Added `timeout` and `timeUnit` to `KafkaStreams.close(..)`. Close now runs on a separate thread which is `join`ed with the provided `timeout`.
Changed `state` in `KafkaStreams` to use an enum.
Added a system test to ensure we don't deadlock on close when an uncaught exception handler that calls `System.exit(..)` is used and there is also a shutdown hook that calls `KafkaStreams.close(...)`.
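A sketch of the close-with-timeout mechanism, assuming a hypothetical shutdown `Runnable` (not the actual `KafkaStreams` internals):
```
import java.util.concurrent.TimeUnit;

// Hypothetical helper; illustrative only.
class TimedCloseSketch {
    public boolean close(final long timeout, final TimeUnit timeUnit, final Runnable shutdown) {
        final Thread shutdownThread = new Thread(shutdown, "kafka-streams-close-thread");
        shutdownThread.setDaemon(true);   // don't keep the JVM alive if shutdown hangs
        shutdownThread.start();
        try {
            shutdownThread.join(timeUnit.toMillis(timeout));
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // true if shutdown completed within the timeout, false otherwise
        return !shutdownThread.isAlive();
    }
}
```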
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang
Closes #2097 from dguy/kafka-4366
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes #2124 from enothereska/KAFKA-4359-intergration-tests-commit1
The `StoreChangeLogger` currently keeps a cache of dirty and removed keys and will batch the changelog records such that we don't send a record for each update. However, with KIP-63 this is unnecessary as the batching and de-duping is done by the caching layer. Further, the `StoreChangeLogger` relies on `context.timestamp()`, which is likely to be incorrect when caching is enabled.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang
Closes #2103 from dguy/store-change-logger
Fix incorrect logging when unable to create an active task. The output was `Failed to create an active task %s:`; it should include the taskId.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Ismael Juma, Eno Thereska
Closes #2109 from dguy/minor-logging
Remove `keySerde`, `valSerde`, `OUTERTHIS_NAME`, `OUTEROTHER_NAME`, `LEFTTHIS_NAME`, `LEFTOTHER_NAME` from `KTableImpl` as they are all unused fields.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes #2119 from dguy/minor-ktable-unused
Given a topology like the one below, if a record arriving in `tableOne` causes a cache eviction, it will trigger the `leftJoin` that will do a `get` from `reducer-store`. If the key is not currently cached in `reducer-store`, but is in the backing store, it will be put into the cache, and it may also trigger an eviction. If it does trigger an eviction and the eldest entry is dirty, it will flush the dirty keys. It is at this point that a ClassCastException is thrown. This occurs because the ProcessorContext is still set to the context of the `leftJoin` and the next child in the topology is `mapValues`.
We need to set the correct `ProcessorNode`, on the context, in the `ForwardingCacheFlushListener` prior to calling `context.forward`. We also need to remember to reset the `ProcessorNode` to the previous node once `context.forward` has completed.
```
final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
        @Override
        public KeyValue<String, Long> apply(final Long key, final String value) {
            return new KeyValue<>(value, key);
        }
    }, Serdes.String(), Serdes.Long())
    .reduce(new Reducer<Long>() {..}, new Reducer<Long>() {..}, "reducer-store");
one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {..})
    .mapValues(new ValueMapper<String, String>() {..});
```
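A sketch of the flush-listener fix described above, using hypothetical context and node types (the real classes are `InternalProcessorContext` and `ProcessorNode`; the method names here are simplified):
```
// Hypothetical, simplified context type; illustrative only.
interface NodeContext {
    Object currentNode();
    void setCurrentNode(Object node);
    void forward(Object key, Object value);
}

class ForwardingFlushListenerSketch {
    // forward a flushed cache entry downstream from the node that owns the cache,
    // then restore whatever node the context pointed at before
    static void forward(final NodeContext context, final Object cacheOwnerNode,
                        final Object key, final Object value) {
        final Object previous = context.currentNode();
        context.setCurrentNode(cacheOwnerNode);
        try {
            context.forward(key, value);
        } finally {
            context.setCurrentNode(previous);
        }
    }
}
```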
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes #2051 from dguy/kafka-4311
Remove commented out code and System.out.println from KTableKTableJoinIntegrationTest
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang
Closes #2092 from dguy/cleanup-comments
- increased timeout to stabilize test
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Eno Thereska, Guozhang Wang
Closes #2082 from mjsax/kafka-4352-hotfix
Enable user-provided consumer and producer configs to override the Streams default configs.
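An illustrative sketch (the application id and broker address are hypothetical): a consumer-level setting supplied by the user in the streams configuration should take precedence over the Streams default.
```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

class OverrideExampleSketch {
    static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        // a consumer-level setting the user expects to win over the Streams default
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        return props;
    }
}
```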
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2084 from dguy/kafka-4361
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang, Ismael Juma, Michael G. Noll, Eno Thereska
Closes #2076 from mjsax/hotfixTSExtractor
KTableSource is always materialized since the introduction of Interactive Queries (IQ):
- removed flag KTableSource#materialized
- removed MaterializedKTableSourceProcessor
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Eno Thereska, Guozhang Wang
Closes #2065 from mjsax/kafka-4302-simplify-ktablesource
1. Create a new `ClientMetadata` to collapse `Set<String> consumerMemberIds`, `ClientState<TaskId> state`, and `HostInfo hostInfo`.
2. Stop reusing `stateChangelogTopicToTaskIds` and `internalSourceTopicToTaskIds` to access the (sub-)topology's internal repartition and changelog topics, for clarity; also use the source topics' num.partitions to set the num.partitions for repartition topics, and clarify that there must NOT be cycles, since otherwise the while loop will fail.
3. `ensure-copartition` at the end to modify the number of partitions for repartition topics if necessary to be equal to other co-partition topics.
4. Refactor `ClientState` as well and update the logic of `TaskAssignor` for clarity as well.
5. Change the default `clientId` from `applicationId-suffix` to `applicationId-processId`, where `processId` is a UUID, to avoid conflicts of clientIds that are from different JVMs, and hence conflicts in metrics (see the sketch after this list).
6. Enforce `assignment` partitions to have the same size, and hence 1-1 mapping to `activeTask` taskIds.
7. Remove the `AssignmentSupplier` class by always constructing the `partitionsByHostState` before assigning tasks to consumers within a client.
8. Remove all unnecessary member variables in `StreamPartitionAssignor`.
9. Some other minor fixes on unit tests, e.g. remove `test only` functions with java field reflection.
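A sketch of item 5 above: deriving the default clientId from the applicationId plus a per-process UUID (hypothetical helper, not the actual StreamsConfig code).
```
import java.util.UUID;

// Hypothetical helper; illustrative only.
class ClientIdSketch {
    static String defaultClientId(final String applicationId) {
        final UUID processId = UUID.randomUUID();   // one per JVM/process
        return applicationId + "-" + processId;     // avoids clientId clashes across JVMs
    }
}
```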
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Xavier Léauté, Matthias J. Sax, Eno Thereska, Jason Gustafson
Closes #2012 from guozhangwang/K4117-stream-partitionassignro-cleanup
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes #2068 from mjsax/hotfixImproveWindowRetentionTimeJavaDoc
There should be only one case where these clean-ups have a functional impact: repeated identical logs were replaced with a single log for the stale controller epoch case.
The rest should just make the code easier to read and make it a bit less wasteful. I did this exercise because unused variables sometimes mask bugs.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes #1985 from ijuma/remove-unused
- fixed consumer group dead condition
- disabled state store cache
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes #2056 from mjsax/KAFKA-4058-instableResetToolTest
This PR makes a couple of enhancements to the `--describe` option of `ConsumerGroupCommand`:
1. Listing members with no assigned partitions.
2. Showing the member id along with the owner of each partition (owner is supposed to be the logical application id and all members in the same group are supposed to set the same owner).
3. Printing a warning indicating whether ZooKeeper based or new consumer API based information is being reported.
It also adds unit tests to verify the added functionality.
Note: The third request on the corresponding JIRA (listing active offsets for empty groups of new consumers) is not implemented as part of this PR, and has been moved to its own JIRA (KAFKA-3853).
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes #1336 from vahidhashemian/KAFKA-3144