Fix incorrect logging when unable to create an active task. The output was: Failed to create an active task %s:
It should have the taskId.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Ismael Juma, Eno Thereska
Closes#2109 from dguy/minor-logging
Remove `keySerde`, `valSerde`, `OUTERTHIS_NAME`, `OUTEROTHER_NAME`, `LEFTTHIS_NAME`, `LEFTOTHER_NAME` from `KTableImpl` as they are all unused fields
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#2119 from dguy/minor-ktable-unused
Given a topology like the one below. If a record arriving in `tableOne` causes a cache eviction, it will trigger the `leftJoin` that will do a `get` from `reducer-store`. If the key is not currently cached in `reducer-store`, but is in the backing store, it will be put into the cache, and it may also trigger an eviction. If it does trigger an eviction and the eldest entry is dirty it will flush the dirty keys. It is at this point that a ClassCastException is thrown. This occurs because the ProcessorContext is still set to the context of the `leftJoin` and the next child in the topology is `mapValues`.
We need to set the correct `ProcessorNode`, on the context, in the `ForwardingCacheFlushListener` prior to calling `context.forward`. We also need to remember to reset the `ProcessorNode` to the previous node once `context.forward` has completed.
```
final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
Override
public KeyValue<String, Long> apply(final Long key, final String value) {
return new KeyValue<>(value, key);
}
}, Serdes.String(), Serdes.Long())
.reduce(new Reducer<Long>() {..}, new Reducer<Long>() {..}, "reducer-store");
one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {..})
.mapValues(new ValueMapper<String, String>() {..});
```
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes#2051 from dguy/kafka-4311
Remove commented out code and System.out.println from KTableKTableJoinIntegrationTest
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang
Closes#2092 from dguy/cleanup-comments
- increased timeout to stabilize test
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Eno Thereska, Guozhang Wang
Closes#2082 from mjsax/kafka-4352-hotfix
Enable user provided consumer and producer configs to override the streams default configs.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes#2084 from dguy/kafka-4361
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang, Ismael Juma, Michael G. Noll, Eno Thereska
Closes#2076 from mjsax/hotfixTSExtractor
KTableSource is always materialized since IQ:
- removed flag KTableSource#materialized
- removed MaterializedKTableSourceProcessor
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Eno Thereska, Guozhang Wang
Closes#2065 from mjsax/kafka-4302-simplify-ktablesource
1. Create a new `ClientMetadata` to collapse `Set<String> consumerMemberIds`, `ClientState<TaskId> state`, and `HostInfo hostInfo`.
2. Stop reusing `stateChangelogTopicToTaskIds` and `internalSourceTopicToTaskIds` to access the (sub-)topology's internal repartition and changelog topics for clarity; also use the source topics num.partitions to set the num.partitions for repartition topics, and clarify to NOT have cycles since otherwise the while loop will fail.
3. `ensure-copartition` at the end to modify the number of partitions for repartition topics if necessary to be equal to other co-partition topics.
4. Refactor `ClientState` as well and update the logic of `TaskAssignor` for clarity as well.
5. Change default `clientId` from `applicationId-suffix` to `applicationId-processId` where `processId` is an UUID to avoid conflicts of clientIds that are from different JVMs, and hence conflicts in metrics.
6. Enforce `assignment` partitions to have the same size, and hence 1-1 mapping to `activeTask` taskIds.
7. Remove the `AssignmentSupplier` class by always construct the `partitionsByHostState` before assigning tasks to consumers within a client.
8. Remove all unnecessary member variables in `StreamPartitionAssignor`.
9. Some other minor fixes on unit tests, e.g. remove `test only` functions with java field reflection.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Xavier Léauté, Matthias J. Sax, Eno Thereska, Jason Gustafson
Closes#2012 from guozhangwang/K4117-stream-partitionassignro-cleanup
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#2068 from mjsax/hotfixImproveWindowRetentionTimeJavaDoc
There should be only one cases where these clean-ups have a functional impact: replaced repeated identical logs with a single log for the stale controller epoch case.
The rest should just make the code easier to read and make it a bit less wasteful. I did this exercise because unused variables sometimes mask bugs.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#1985 from ijuma/remove-unused
- fixed consumer group dead condition
- disabled state store cache
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#2056 from mjsax/KAFKA-4058-instableResetToolTest
This PR makes a couple of enhancements to the `--describe` option of `ConsumerGroupCommand`:
1. Listing members with no assigned partitions.
2. Showing the member id along with the owner of each partition (owner is supposed to be the logical application id and all members in the same group are supposed to set the same owner).
3. Printing a warning indicating whether ZooKeeper based or new consumer API based information is being reported.
It also adds unit tests to verify the added functionality.
Note: The third request on the corresponding JIRA (listing active offsets for empty groups of new consumers) is not implemented as part of this PR, and has been moved to its own JIRA (KAFKA-3853).
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes#1336 from vahidhashemian/KAFKA-3144
…d out of topicGroups method. The topicGroups method only called from StreamPartitionAssignor when KafkaStreams object is the leader, needs to be executed for clients.
Author: bbejeck <bbejeck@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#2005 from bbejeck/KAFKA-4269_multiple_kstream_instances_mult_consumers_npe
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Dan Norwood, Matthias J. Sax, Jason Gustafson, Eno Thereska
Closes#2026 from guozhangwang/KMinor-improve-logging
If evict is called on a NamedCache and the cache is empty an NPE is thrown. This was reported on the user list from a developer running 0.10.1.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes#2024 from dguy/cache-bug
Records that are deleted/removed from the CachingKeyValueStore shouldn't appear in range and all queries.
Modified the iterator such that it skips over the deleted records.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes#2001 from dguy/kafka-4283
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#1940 from mjsax/hotfix-resetTool
restoreConsumer.assign(..) in removeStandbyTasks was logging an (ignorable) exception due to the restoreConsumer being closed. Moved the restoreConsumer.assign(..) to shutdownTasksAndState as this is done prior to the closing of consumers.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#1986 from dguy/hotfix-assign
Changed the ordering in `StreamThread.shutdown`
1. commitAll (we need to commit so that any cached data is flushed through the topology)
2. close all tasks
3. producer.flush() - so any records produced during close are flushed and we have offsets for them
4. close all state managers
5. close producers/consumers
6. remove the tasks
Also in `onPartitionsRevoked`
1. commitAll
2. close all tasks
3. producer.flush
4. close all state managers
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes#1970 from dguy/kafka-4253
pass through user-defined consumer and producer properties from StreamsConfig to ConsumerConfig and ProducerConfig.
For example, consumer interceptor support, i.e, consumer.interceptor.classes=SomeClass
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#1943 from dguy/interceptor
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Eno Thereska <eno.thereska@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#1938 from mjsax/kafka-4058-resetIntegartionTest
Change the creation of the directories, in the StateDirectory constructor, to use mkdirs so any parents get created. Throw an exception if the directory doesn't exist and couldn't be created
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Michael G. Noll, Eno Thereska, Guozhang Wang
Closes#1942 from dguy/kafka-4233
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Eno Thereska, Damian Guy
Closes#1929 from guozhangwang/KMinor-check-zero-num-partitions and squashes the following commits:
c8edc2d [Guozhang Wang] fix unit test
3127f9c [Guozhang Wang] do not call partitioner if num partitions is non-positive
Turning off auto topic creation in the EmbeddedKafkaCluster used by Streams as it can cause race conditions that lead to build hangs.
Fixed the couple of tests that needed to have some topics manually created
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska <eno.thereska@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#1941 from dguy/disable-auto-topic-create
Keep track of open Rocks DB iterators. When a store is closed, close all open iterators.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#1917 from dguy/kafka-4223
KafkaExceptions currently thrown from within StreamThread/StreamTask currently bubble up without any additional context. This makes it hard to figure out where something went wrong, i.e, which topic had the serialization exception etc
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#1819 from dguy/kafka-3708 and squashes the following commits:
d6feaa8 [Damian Guy] address comments
15b89e7 [Damian Guy] merge trunk
6b8a8af [Damian Guy] catch exceptions in various places and throw more informative versions
c86eeda [Damian Guy] fix conflicts
8f37e2c [Damian Guy] add some context to exceptions
Adding the test so we know that the State Stores with logging disabled or without a topic don't throw any exceptions.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#1916 from dguy/state-store-logging-disabled
If some StreamsMetadataState methods are called before the onChange method is called a NullPointerException was being thrown. Added null check for cluster in isInitialized method
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#1920 from dguy/fix-npe-streamsmetadata
The original commit interval of 30 seconds might be too large in some cases, e.g., when the verifier finishes before those 30 seconds have elapsed.
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy, Guozhang Wang
Closes#1899 from enothereska/hotfix-smoke-test-commit-interval
Remove isValidCleanupPolicy and related fields as they are never used.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes#1888 from dguy/minor-remove-unused
missing javadoc on public API method PersistenKeyValueFactory.enableCaching
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes#1891 from dguy/minor-java-doc
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#1887 from enothereska/hotfix-metadata-unavailable
The contribution is my original work and I license the work to the project under the project's open source license.
guozhangwang
Author: Elias Levy <fearsome.lucidity@gmail.com>
Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang
Closes#1846 from eliaslevy/KAFKA-4153
During rebalance operations the Cluster object gets set to Cluster.empty(). This can result in NPEs when doing certain operation on StreamsMetadataState. This should throw a StreamsException if the Cluster is empty as it is not yet (re-)initialized
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes#1845 from dguy/streams-meta-hotfix
standby tasks should be assigned per consumer not per process
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes#1862 from dguy/kafka-4175