Implement destroying tasks and workers. This means erasing all record of them on the Coordinator and the Agent.
Workers should be identified by unique 64-bit worker IDs, rather than by the names of the tasks they are implementing. This ensures that when a task is destroyed and re-created with the same task ID, the old workers will be not be treated as part of the new task instance.
Fix some return results from RPCs. In some cases RPCs were returning values that were never used. Attempting to re-create the same task ID with different arguments should fail. Add RequestConflictException to represent HTTP error code 409 (CONFLICT) for this scenario.
If only one worker in a task stops, don't stop all the other workers for that task, unless the worker that stopped had an error.
Reviewers: Anna Povzner <anna@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
SimpleBenchmark:
1.a Do not rely on manual num.records / bytes collection on atomic integers.
1.b Rely on config files for num.threads, bootstrap.servers, etc.
1.c Add parameters for key skewness and value size.
1.d Refactor the tests for loading phase, adding tumbling-windowed count.
1.e For consumer / consumeproduce, collect metrics on consumer instead.
1.f Force stop the test after 3 minutes, this is based on empirical numbers of 10M records.
Other tests: use config for kafka bootstrap servers.
streams_simple_benchmark.py: only use scale 1 for system test, remove yahoo from benchmark tests.
Note that the JMX based metrics is more accurate than the manually collected metrics.
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Refactored the return types in consumer group APIs the following way:
```
Map<TopicPartition, KafkaFuture<Void>> DeleteConsumerGroupsResult#deletedGroups()
Map<TopicPartition, KafkaFuture<ConsumerGroupDescription>> DescribeConsumerGroupsResult#describedGroups()
KafkaFuture<Collection<ConsumerGroupListing>> ListConsumerGroupsResult#listings()
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> ListConsumerGroupOffsetsResult#partitionsToOffsetAndMetadata()
```
* For DeleteConsumerGroupsResult and DescribeConsumerGroupsResult, for each group id we have two round-trips to get the coordinator, and then send the delete / describe request; I leave the potential optimization of batching requests for future work.
* For ListConsumerGroupOffsetsResult, it is a simple single round-trip and hence the whole map is wrapped as a Future.
* ListConsumerGroupsResult, it is the most tricky one: we would only know how many futures we should wait for after the first listNode returns, and hence I constructed the flattened future in the middle wrapped with the underlying map of futures; also added an iterator API to compensate the "fail the whole future if any broker returns error" behavior. The iterator future will throw exception on the failing brokers, while return the consumer for other succeeded brokers.
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Jason Gustafson <jason@confluent.io>
Removed timeout from get call that caused the test to fail occasionally, this will instead fall back to the wrapping waitUntilTrue timeout. Also added unnesting of exceptions from ExecutionException that was originally missing and put the retrieved value for lowWatermark in the fail message for better readability in case of test failure.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
This change makes adding a metric to a sensor idempotent.
That is, if the metric is already added to the sensor, the method
returns with success.
The current behavior is that any attempt to register a second metric
with the same name is an error.
Testing strategy: There is a new unit test covering this behavior
Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
We had a regression in #4788 which caused the offset commit/fetch/describe APIs to fail if the groupId was empty. This should be allowed for backwards compatibility. Additionally, I have modified DeleteGroups to allow removal of the empty group, which was missed in the initial implementation. I've added a test case to ensure that we do not miss this again in the future.
Reviewers: Ismael Juma <ismael@juma.me.uk>
AsyncProducerTest gets an error about an incorrect mock when the logging
level is turned up. Instead of usIng a mock, just create a real
SyncProducerConfig object, since the object is simple to create.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Move creation of quota callback instance out of KafkaConfig constructor to QuotaFactory.instantiate to avoid creating a callback instance for every KafkaConfig since we create temporary KafkaConfigs during dynamic config updates.
Reviewers: Jun Rao <junrao@gmail.com>
The toString() for ConfigResource was using { } instead of ( ) which is inconsistent with the existing toStrings in the code, while toString for Resource was using a mix of ( and }.
Added consumer only workload to Trogdor. The topics must already be pre-populated. The spec lets the user request topic pattern and range of partitions to assign to [startPartition, endPartition].
Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
This patch fixes an edge case in producer shutdown which prevents `close()` from completing due to a pending request which will never be sent due to shutdown initiation. I have added a test case which reproduces the scenario.
Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>
* Upgrade EasyMock to 3.6 which adds support for Java 10
by upgrading to ASM 6.1.1.
* Ensure that Jacoco is truly disabled for the `core` project.
This was the original intent, since it's in Scala, but it had not
been achieved. This is important because the Jacoco agent
fails when it tries to instrument the classes compiled by
scalac with Java 10.
Currently, WorkerUtils will be able to create topics when there is no security. To be able to work with secure kafka, WorkerUtils.createTopic() needs to be able to take security configs. This PR adds commonClientConf field to both producer bench and roundtrip workload specs so that users can specify security and other common configs once for producer/consumer and adminClient. Also added adminClientConf field to workload specs so that users can specify adminClient specific configs if they want to. For completeness, added consumerConf and producerConf to roundtrip workload spec.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Colin P. Mccabe <cmccabe@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
As Frederic reported on mailing list under the subject "kafka-streams Invalid transition attempted from state READY to state ABORTING_TRANSACTION", producer#abortTransaction should only be called when transactionInFlight is true.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
After the punctuate() call, we would like to double check on the scheduled flag since the call itself may cancel it.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, John Roesler <john@confluent.io>
Add a validation check to make sure max.connections.per.ip.overrides is configured when max.connections.per.ip is set zero. Also clean up the config description.
Implementation of KIP-86. Client, server and login callback handlers have been made configurable for both brokers and clients.
Reviewers: Jun Rao <junrao@gmail.com>, Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
The invalid topic name is already handled locally so it is unnecessary to send the DeleteTopicsRequest. This PR adds a count to MockClient for testing.
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Jason Gustafson <jason@confluent.io>
Ignore headers when down-converting to V0/V1 since they are not supported. Added a test-case to verify down-conversion sanity in presence of headers.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
## Summary of the problem
When the `header.converter` is not specified in the worker config or the connector config, a bug in the `Plugins` test causes it to never instantiate the `HeaderConverter` instance, even though there is a default value.
This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs.
A workaround is to explicitly set the `header.converter` configuration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change.
## The Changes
The `Plugins.newHeaderConverter` methods were always returning null if the `header.converter` configuration value was not specified in the supplied connector or worker configuration. Thus, even though the `header.converter` property has a default, it was never being used.
The fix was to only check whether a `header.converter` property was specified when the connector configuration was being used, and if no such property exists in the connector configuration to return null. Then, when the worker configuration is being used, the method simply gets the `header.converter` value (or the default if no value was explicitly set).
Also, the ConnectorConfig had the same default value for the `header.converter` property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the `header.converter` value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter.
Finally, updated comments and added log messages to make it more clear which converters are being used and how they are being converted.
## Testing
Several new unit tests for `Plugins.newHeaderConverter` were added to check the various behavior. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for the `header.converter` configuration:
1. default value
1. worker configuration has `header.converter` explicitly set to the default
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. connector configuration has `header.converter` explicitly set to the default
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. worker configuration has `header.converter` explicitly set to the default, and the connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
The worker created the correct `HeaderConverter` implementation with the correct configuration in all of these tests.
Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumes the records with headers (but didn't do anything with them). This test also passed.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4815 from rhauch/kafka-6728
https://github.com/apache/kafka/pull/4760 unintentionally included extra raw class files in the release tarballs by making the .class file output (instead of the jar) for a streams a dependency of the streams-test-utils. This fixes that issue by instead breaking the circular dependency by using a `compileOnly`/`provided` dependency on those sources and also including the dependency as a test dependency.
I verified by using `gradlew clean installAll releaseTarGzAll`, then checking that the release tarball doesn't have the extraneous files and the installed pom file has the expected dependencies. The dependency on kafka-streams is now in the `test` scope, but that should be fine since a streams application would only use this dependency if it already had a dependency on streams in `compile` (or in weird edge cases the user could handle specifying the right dependencies). This actually seems to even be an improvement over the previous situation where the actual dependency was not expressed in the pom at all (since the dependency was on the sourceSet output rather than the actual project).
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, John Roesler <vvcephei@users.noreply.github.com>
Closes#4816 from ewencp/fix-streams-dependencies
In the group coordinator, we currently check whether the partition is owned before checking whether it is loading. Since loading is a prerequisite for partition ownership, it means that it is not actually possible to see the COORDINATOR_LOAD_IN_PROGRESS error. The impact is mostly harmless: while loading the group, the client may send unnecessary FindCoordinator requests to rediscover the coordinator. I've fixed the bug and restructured the code to enable testing.
In the process of fixing this bug, the following improvements have been made:
1. We now verify valid groupId in all request handlers.
2. Currently if the coordinator is loading when a SyncGroup is received, we'll return NOT_COORDINATOR. I've changed this to return REBALANCE_IN_PROGRESS since the rebalance state will have been lost on coordinator failover. This effectively forces the consumer to rejoin the group, which seems preferable over unnecessarily rediscovering the coordinator.
3. I added a check for the COORDINATOR_LOAD_IN_PROGRESS handler in SyncGroup. Although we do not currently return this error, it seems reasonable that we might want to some day, so it seems better to get the check in now.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
KafkaStreams.waitOnState() should check the state to be the given one instead of the hard-coded `NOT_RUNNING`.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
In the stream thread capture of the TaskMigration exception, print the task full information in WARN. In other places only log as INFO, plus additional context information.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Prior to this, there have been instances where invalid data was allowed to be persisted in
ZooKeeper, which causes ClassCastExceptions when a broker is restarted and reads this
type-unsafe data.
Adds basic structural and type validation for the reassignment JSON via
introduction of Scala case classes that map to the expected JSON
structure. Also use the Scala case classes to deserialize the JSON
to avoid duplication.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>, Ismael Juma <ismael@juma.me.uk>