* Implement MockAdminClient.deleteTopics
* Use MockAdminClient instead of MockKafkaAdminClientEnv in StreamsResetterTest
* Rename MockKafkaAdminClientEnv to AdminClientUnitTestEnv
* Use MockAdminClient instead of MockKafkaAdminClientEnv in TopicAdminTest
* Rename KafkaAdminClient to AdminClientUnitTestEnv in KafkaAdminClientTest.java
* Migrate StreamThreadTest to MockAdminClient
* Fix style errors
* Address review comments
* Fix MockAdminClient call
Reviewers: Matthias J. Sax <matthias@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
- Fix zk session state and session change rate metric names: type
should be SessionExpireListener instead of KafkaHealthCheck. Test
verifying the fix was included.
- Handle missing controller in controlled shutdown in the same way as if
the broker is not registered (i.e. retry after backoff).
- Restructure BrokerInfo to reduce duplication. It now contains a
Broker instance and the JSON serde is done in BrokerIdZNode
since `Broker` does not contain all the fields.
- Remove dead code from `ZooKeeperClient.initialize` and remove
redundant `close` calls.
- Move ACL handling and persistent paths definition from ZkUtils to
ZkData (and call ZkData from ZkUtils).
- Remove ZooKeeperClientWrapper and ZooKeeperClientMetrics from
ZkUtils (avoids metrics clash if third party users create a ZkUtils
instance in the same process as the broker).
- Introduce factory method in KafkaZkClient that creates
ZooKeeperClient and remove metric name defaults from
ZooKeeperClient.
- Fix a few instances where ZooKeeperClient was not closed in tests.
- Update a few TestUtils methods to use KafkaZkClient instead of
ZkUtils.
- Add test verifying SessionState metric.
- Various clean-ups.
Testing: mostly relying on existing tests, but added a couple
of new tests as mentioned above.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Jun Rao <junrao@gmail.com>
Closes#4359 from ijuma/kafka-6320-kafka-health-zk-metrics-follow-up
Increase commit interval to make it less likely that we flush the cache in-between.
To make it fool-proof, only compare the "final" result records if cache is enabled.
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4364 from mjsax/kafka-6256-flaky-kstream-ktable-join-with-caching-test
* Removed code duplicate from GlobalProcessorContextImpl and ProcessorContextImpl to parent class AbstractProcessorContext
* Exchanged concrete implementations with interfaces to make code more maintainable
* Refactored major code duplicates in InternalTopologyBuilder
* Formatted function parameters as per code review
Added final to code introduced in this PR
* Added missing finals to putNodeGroupName function
Rearranged parameters for resetTopicsPattern function
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
* ensure topics are created with correct partitions BEFORE building the metadata for our stream tasks
* Added a test case. The test should fail with the old logic, because:
While stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition is created correctly with four partitions, the StreamPartitionAssignor will only assign three tasks to the topic. Test passes with the new logic.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, Ted Yu <yuzhihong@gmail.com>
* Return offset of next record of records left after restore completed
* Changed check for restoring partition to remove the "+1" in the guard condition
Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
* KAFKA-6383: complete shutdown for CREATED StreamThreads
When transitioning StreamThread from CREATED to PENDING_SHUTDOWN
free up resources from the caller, rather than the stream thread,
since in this case the stream thread was never actually started.
Have StreamThread.setState return the old state. If the old state is
CREATED in StreamThread.shutdown() then start the thread so that it
can free the resources owned by the StreamThread.
Add a KafkaStreams test to verify that the producer gets closed even
if KafkaStreams was not started
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
1. added functions for KafkaStreams and KafkaClientSupplier.
2. added prefix for admin client in StreamsConfig.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Jason Gustafson <jason@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Closes#4338 from guozhangwang/K6150-doc-changes
* Moved metrics in KafkaHealthCheck to ZookeeperClient.
* Converted remaining ZkUtils usage in KafkaServer to ZookeeperClient and removed ZkUtils from KafkaServer.
* Made the re-creation of ZooKeeper during ZK session expiration with infinite retries.
* Added unit tests for all new methods in KafkaZkClient.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4322 from mjsax/kafka-6126-remove-topic-check-on-rebalance-2
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4342 from mjsax/kafka-4263-concurrentAccess
* Use KafkaZkClient in ReassignPartitionsCommand
* Use KafkaZkClient in PreferredReplicaLeaderElectionCommand
* Updated test classes to use new methods
* All existing tests should pass
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#4260 from omkreddy/KAFKA-5647-ADMINCOMMANDS
1. Create default internal topic configs in StreamsConfig, especially for repartition topics change the segment size and time to smaller value.
2. Consolidate the default internal topic settings to InternalTopicManager and simplify InternalTopicConfig correspondingly.
3. Add an integration test for purging data.
4. MINOR: change TopologyBuilderException to IllegalStateException in StreamPartitionAssignor (part of https://issues.apache.org/jira/browse/KAFKA-5660).
Here are a few public facing APIs that get added:
1. AbstractConfig#originalsWithPrefix(String prefix, boolean strip): this for simplify the logic of passing admin and topic prefixed configs to consumer properties.
2. KafkaStreams constructor with Time object for convienent mocking in tests.
Will update KIP-204 accordingly if people re-votes these changes.
Author: Guozhang Wang <wangguoz@gmail.com>
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>
Closes#4315 from guozhangwang/K6150-segment-size
This is a workaround until KIP-91 is merged. We tried increasing the timeout multiple times already but tests are still flaky.
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4329 from mjsax/hotfix-system-tests
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4335 from mjsax/minor-improve-KafkaStreams-javadocs
This PR creates and implements the `ProductionExceptionHandler` as described in [KIP-210](https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce).
I've additionally provided a default implementation preserving the existing behavior. I fixed various compile errors in the tests that resulted from my changing of method signatures, and added tests to cover the new behavior.
Author: Matt Farmer <mfarmer@rsglab.com>
Author: Matt Farmer <matt@frmr.me>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>
Closes#4165 from farmdawgnation/msf/kafka-6086
Now that we support re-initializing state stores, we need to clear the segments when the store is closed so that they can be re-opened.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Ted Yu <yuzhihong@gmail.com>
Closes#4324 from dguy/kafka-6360
Fixes a `ConcurrentModificationException` in`AbstractStateManager` that is triggered when a `StateStore` is re-initialized and there are multiple stores in the context.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, GuozhangWang <wangguoz@gmail.com>
Closes#4317 from dguy/kafka-6349
Fix warn log message in RecordCollectorImpl so it prints the exception message rather than `{}`
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, GuozhangWang <wangguoz@gmail.com>
Closes#4318 from dguy/minor-logging-record-collector
- set auto.offset.reste to "none" for restore and global consumer
- handle InvalidOffsetException for restore and global consumer
- add corresponding tests
- some minor cleanup
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Damian Guy <damian.guy@gmail.com, Bill Bejeck <bill@confluent.io>, GuozhangWang <wangguoz@gmail.com>
Closes#4215 from mjsax/kafka-6121-restore-global-consumer-handle-reset
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4242 from mjsax/kafka-4857-admit-client
1. Add the repartition topics information into ProcessorTopology: personally I do not like leaking this information into the topology but it seems not other simple way around.
2. StreamTask: added one more function to expose the consumed offsets from repartition topics only.
3. TaskManager: use the AdminClient to send the gathered offsets to delete only if a) previous call has completed and client intentionally ignore-and-log any errors, or b) no requests have ever called before.
NOTE that this code depends on the assumption that purge is only called right after the commit has succeeded, hence we presume all consumed offsets are committed.
4. MINOR: Added a few more constructor for ProcessorTopology for cleaner unit tests.
5. MINOR: Extracted MockStateStore out of the deprecated class.
6. MINOR: Made a pass over some unit test classes for clean ups.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>
Closes#4270 from guozhangwang/K6150-purge-repartition-topics
Increase `REQUEST_TIMOUT_MS` to improve a flaky system test until KIP-91 merged
Author: Bill Bejeck <bill@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#4291 from bbejeck/MINOR_increase_request_timeout_for_streams_bounce_test
Updated the System test `stream_broker_compatibility_test.py` to address system test failures as we have removed explicit broker version checking
- Ignore the `0.8.2.2` and `0.9.0.0` tests because the `NetworkClient` only logs `UnsupportedVersionException`s that occur and will continue to retry connecting. Once issue https://issues.apache.org/jira/browse/KAFKA-6297 is addressed, we may re-enable these tests.
- Updated existing tests expected error messages
- Updated Streams code in test for to make sure we fail fast for incompatible brokers
Author: Bill Bejeck <bill@confluent.io>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4286 from bbejeck/MINOR_fix_broker_compatibility_tests
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4284 from mjsax/minor-improve-eos-docs
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4255 from mjsax/kafka-6259-clean-global-state-dir
add log4j entry
A rebased version of the code.
Author: RichardYuSTUG <yohan.richard.yu2@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4258 from ConcurrencyPractitioner/trunk
Add EmptyWindowStoreIterator to NoOpWindowStore
Use `topic-` as the prefix of the auto-generated state store name.
Also add a unit test for this functionality.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Ted Yu <yuzhihong@gmail.com>
Closes#4268 from guozhangwang/K6274-table-source-store-name
This refactoring is discussed in https://github.com/apache/kafka/pull/3624#discussion_r132614639. More specifically:
1. Moved the access of `StreamThread` in `StreamPartitionAssignor` to `TaskManager`, removed any fields stored in `StreamThread` such as `processId` and `clientId` that are only to be used in `StreamPartitionAssignor`, and pass them to `TaskManager` if necessary.
2. Moved any in-memory states, `metadataWithInternalTopics`, `partitionsByHostState`, `standbyTasks`, `activeTasks` to `TaskManager` so that `StreamPartitionAssignor` becomes a stateless thin layer that access TaskManager directly.
3. Remove the reference of `StreamPartitionAssignor` in `StreamThread`, instead consolidate all related functionalities such as `cachedTasksIds ` in `TaskManager` which could be retrieved by the `StreamThread` and the `StreamPartitionAssignor` directly.
4. Finally, removed the two interfaces used for `StreamThread` and `StreamPartitionAssignor`.
5. Some minor fixes on logPrefixes, etc.
Future work: when replacing the StreamsKafkaClient, we would let `StreamPartitionAssignor` to retrieve it from `TaskManager` directly, and also its closing call do not need to be called (`KafkaStreams` will be responsible for closing it).
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>
Closes#4224 from guozhangwang/K6170-refactor-assignor
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4253 from mjsax/minor-imporve-error-message
Handle TimeoutException in Producer callback and retry sending input data
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Damian Guy <damian.guy@gmail.com>
Closes#4244 from mjsax/improve-flaky-system-test
Remove the flag in `ProcessorStateManager` that checks if a store is persistent when registering it as a standby task.
Updated the smoke test to use an in-memory store.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
Closes#4239 from dguy/kafka-6214
1. Add The AdminClient into Kafka Streams, which is shared among all the threads.
2. Add ADMIN_PREFIX to StreamsConfig.
3. Also made a few tweaks on the metrics of the AdminClient, which is slightly different from the StreamsKafkaClient (note these changes will not be reflected in this PR but only take place when we eventually replace StreamsKafkaClient):
3.1. "clientId" tag will be set as "clientId-admin": in StreamsKafkaClient it is whatever user sets, and hence could even be null.
3.2. "groupPrefix" will be set as "admin-client": in StreamsKafkaClient it will be "kafka-client".
So the metrics from `StreamsKafkaClient` to `AdminClient` would be changed from
`kafka.admin.client:type=kafka-client-metrics,client-id=`
to
`kafka.admin.client:type=admin-client-metrics,client-id=myApp-UUID-admin`
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Ted Yu <yuzhihong@gmail.com>
Closes#4211 from guozhangwang/K6170-admin-client
Implements KIP-224:
- adding new StreamsConfig `retires`
- uses `retires` and `retry.backoff.ms` to handle TimeoutException in GlobalStateManager
- adds two new tests to trigger TimeoutException in global consumer
- some minor code cleanup to reduce number of parameters we need to pass around
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4206 from mjsax/kafka-6122-global-consumer-timeout-exception
Clarify that state directory must use `storeName`
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4228 from mjsax/minor-state-store-javadoc
- change segment delimiter to .
- added upgrade path
- added test for old and new upgrade path
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4210 from mjsax/kafka-6167-windows-issue
- set streams state.dir to test-dir (default /tmp is not reliable)
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Ted Yu <yuzhihong@gmail.com>
Closes#4221 from mjsax/minor-fix-instable-tests
Calculate offset using consumer.position() in GlobalStateManagerImp#restoreState
Author: Alex Good <alexjsgood@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4197 from alexjg/0.11.0
(cherry picked from commit 1321d89484)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
This test has been completely subsumed by the coverage of reset integration test, and hence can be removed.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>
Closes#4184 from guozhangwang/KMinor-remove-fanout-integration
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>
Closes#4186 from guozhangwang/K6179-cleanup-timestamp-tracker-on-clear
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4148 from mjsax/kafka-6120-recordCollector