Ignore headers when down-converting to V0/V1, since they are not supported. Added a test case to verify down-conversion sanity in the presence of headers.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
## Summary of the problem
When the `header.converter` is not specified in the worker config or the connector config, a bug in the `Plugins` class causes it to never instantiate the `HeaderConverter` instance, even though there is a default value.
This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, an NPE occurs.
A workaround is to explicitly set the `header.converter` configuration property, but since this property was only added in AK 1.1, upgrading to AK 1.1 is not backward compatible without this configuration change.
## The Changes
The `Plugins.newHeaderConverter` methods were always returning null if the `header.converter` configuration value was not specified in the supplied connector or worker configuration. Thus, even though the `header.converter` property has a default, it was never being used.
The fix is to check whether a `header.converter` property was specified only when the connector configuration is being used, returning null if the connector configuration contains no such property. When the worker configuration is being used, the method simply gets the `header.converter` value (or the default if no value was explicitly set).
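In outline, the corrected resolution looks like this (a hand-written sketch using simplified stand-in types, not the verbatim `Plugins` source; `SimpleHeaderConverter` is the actual default):

```java
import java.util.Collections;
import java.util.Map;

// Illustrative sketch of the fixed lookup order: the connector config may defer
// to the worker config, but the worker config always resolves to a value.
final class HeaderConverterLookup {
    static final String PROP = "header.converter";
    static final String DEFAULT = "org.apache.kafka.connect.storage.SimpleHeaderConverter";

    /** Connector config: return null when unset so the worker-level value applies. */
    static String fromConnectorConfig(Map<String, String> connectorProps) {
        return connectorProps.get(PROP); // null => defer to the worker config
    }

    /** Worker config: always resolve, falling back to the declared default when unset. */
    static String fromWorkerConfig(Map<String, String> workerProps) {
        return workerProps.getOrDefault(PROP, DEFAULT);
    }

    public static void main(String[] args) {
        System.out.println(fromConnectorConfig(Collections.emptyMap())); // null
        System.out.println(fromWorkerConfig(Collections.emptyMap()));    // the default
    }
}
```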
Also, `ConnectorConfig` had the same default value for the `header.converter` property as `WorkerConfig`, which resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified a `header.converter` value. Removing the default makes the log messages sensible, and the Worker still properly instantiates the correct header converter.
Finally, updated comments and added log messages to make it clearer which converters are being used and how they were configured.
## Testing
Several new unit tests for `Plugins.newHeaderConverter` were added to check the various behaviors. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for the `header.converter` configuration:
1. default value
1. worker configuration has `header.converter` explicitly set to the default
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. connector configuration has `header.converter` explicitly set to the default
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. worker configuration has `header.converter` explicitly set to the default, and the connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
The worker created the correct `HeaderConverter` implementation with the correct configuration in all of these tests.
Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumes the records with headers (but didn't do anything with them). This test also passed.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes #4815 from rhauch/kafka-6728
https://github.com/apache/kafka/pull/4760 unintentionally included extra raw class files in the release tarballs by making the .class file output (instead of the jar) of streams a dependency of streams-test-utils. This fixes the issue by instead breaking the circular dependency with a `compileOnly`/`provided` dependency on those sources, while also including the dependency as a test dependency.
I verified by running `gradlew clean installAll releaseTarGzAll` and then checking that the release tarball no longer contains the extraneous files and that the installed pom file has the expected dependencies. The dependency on kafka-streams is now in the `test` scope, but that should be fine: a streams application would only use this artifact if it already had a `compile` dependency on streams (and in weird edge cases the user could specify the right dependencies explicitly). This actually seems to be an improvement over the previous situation, where the actual dependency was not expressed in the pom at all (since the dependency was on the sourceSet output rather than the actual project).
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, John Roesler <vvcephei@users.noreply.github.com>
Closes #4816 from ewencp/fix-streams-dependencies
In the group coordinator, we currently check whether the partition is owned before checking whether it is loading. Since loading is a prerequisite for partition ownership, it means that it is not actually possible to see the COORDINATOR_LOAD_IN_PROGRESS error. The impact is mostly harmless: while loading the group, the client may send unnecessary FindCoordinator requests to rediscover the coordinator. I've fixed the bug and restructured the code to enable testing.
In the process of fixing this bug, the following improvements have been made:
1. We now verify that the groupId is valid in all request handlers.
2. Currently if the coordinator is loading when a SyncGroup is received, we'll return NOT_COORDINATOR. I've changed this to return REBALANCE_IN_PROGRESS since the rebalance state will have been lost on coordinator failover. This effectively forces the consumer to rejoin the group, which seems preferable over unnecessarily rediscovering the coordinator.
3. I added a check for the COORDINATOR_LOAD_IN_PROGRESS handler in SyncGroup. Although we do not currently return this error, it seems reasonable that we might want to some day, so it seems better to get the check in now.
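The reordered validation, sketched here in Java for illustration (the actual group coordinator code is Scala, and these names are simplified stand-ins):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch only: the load-in-progress check must precede the ownership check,
// otherwise COORDINATOR_LOAD_IN_PROGRESS is unreachable, since a partition
// that is still loading is by definition not yet fully owned.
enum CoordinatorError { NONE, INVALID_GROUP_ID, COORDINATOR_LOAD_IN_PROGRESS, NOT_COORDINATOR }

final class GroupValidationSketch {
    private final Set<Integer> loadingPartitions = new HashSet<>();
    private final Set<Integer> ownedPartitions = new HashSet<>();

    CoordinatorError validate(String groupId, int partition) {
        if (groupId == null || groupId.isEmpty())
            return CoordinatorError.INVALID_GROUP_ID;             // item 1 above
        if (loadingPartitions.contains(partition))
            return CoordinatorError.COORDINATOR_LOAD_IN_PROGRESS; // now checked first
        if (!ownedPartitions.contains(partition))
            return CoordinatorError.NOT_COORDINATOR;
        return CoordinatorError.NONE;
    }
}
```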
Reviewers: Guozhang Wang <wangguoz@gmail.com>
KafkaStreams.waitOnState() should check that the state is the given one instead of the hard-coded `NOT_RUNNING`.
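A minimal sketch of the corrected loop (field names are illustrative of how `KafkaStreams` tracks state internally, not the verbatim source):

```java
// Sketch only: block until the requested target state is reached or the timeout expires.
private boolean waitOnState(final State targetState, final long waitMs) {
    final long begin = System.currentTimeMillis();
    synchronized (stateLock) {
        long elapsedMs = 0L;
        while (state != targetState) {   // previously compared against State.NOT_RUNNING
            if (waitMs <= elapsedMs)
                return false;            // timed out before reaching targetState
            try {
                stateLock.wait(waitMs - elapsedMs);
            } catch (final InterruptedException e) {
                // ignore and re-check the state
            }
            elapsedMs = System.currentTimeMillis() - begin;
        }
    }
    return true;
}
```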
Reviewers: Guozhang Wang <wangguoz@gmail.com>
When the stream thread catches a TaskMigratedException, print the full task information at WARN level. In other places, log only at INFO level, plus additional context information.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Prior to this, there have been instances where invalid data was allowed to be persisted in
ZooKeeper, which causes ClassCastExceptions when a broker is restarted and reads this
type-unsafe data.
Adds basic structural and type validation for the reassignment JSON via the
introduction of Scala case classes that map to the expected JSON structure.
The same case classes are also used to deserialize the JSON, avoiding duplication.
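The same idea, sketched in Java with Jackson (the actual change uses Scala case classes and Kafka's JSON helper): deserializing into a typed structure rejects malformed input at parse time instead of failing later with a `ClassCastException`.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

// Hedged Java analogue of the typed-validation idea; the fields mirror the reassignment
// JSON shape: {"version":1,"partitions":[{"topic":...,"partition":...,"replicas":[...]}]}.
final class PartitionReassignment {
    public int version;
    public List<PartitionReplicas> partitions;

    static final class PartitionReplicas {
        public String topic;
        public int partition;
        public List<Integer> replicas;
    }

    static PartitionReassignment parse(String json) throws Exception {
        // Jackson fails fast with a descriptive error if a field has the wrong type,
        // e.g. "replicas": "1,2,3" instead of [1, 2, 3].
        return new ObjectMapper().readValue(json, PartitionReassignment.class);
    }
}
```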
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>, Ismael Juma <ismael@juma.me.uk>
This pull request targets https://issues.apache.org/jira/browse/KAFKA-6386
A minor fix to deprecate the use of `StreamsConfig` in favor of `java.util.Properties`.
I created separate public constructors using `Properties` to replace the old ones,
and prioritized the new functions in the `KafkaStreams.java` file.
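After this change the preferred construction path looks like this (illustrative usage; the topology variable and config values are placeholders):

```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

// Preferred after this change:
KafkaStreams streams = new KafkaStreams(topology, props);
// Deprecated equivalent:
// KafkaStreams streams = new KafkaStreams(topology, new StreamsConfig(props));
```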
Since this is my first time doing an open source contribution, I'm very happy to get
any comments or pointers on how to be more professional and do better next time, thank you Guozhang guozhangwang and Liquan Ishiihara!
Testing strategy: existing unit tests should suffice to cover this change.
Author: cs427fa16staff <bchen11@outlook.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
Closes #4354 from abbccdda/starter
Currently the `initTransactions()` API blocks indefinitely if the broker cannot be reached. This patch changes the behavior to raise a `TimeoutException` after waiting for `max.block.ms`.
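With this change a caller can bound the wait, for example (an illustrative snippet; the broker address and transactional id are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-txn-id");       // hypothetical id
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");               // bound the wait to 10s

KafkaProducer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
try {
    producer.initTransactions(); // previously blocked forever if the broker was unreachable
} catch (TimeoutException e) {
    // With this patch, raised after max.block.ms; safe to retry or fail fast.
    producer.close();
}
```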
Reviewers: Apurva Mehta <apurva@confluent.io>, Jason Gustafson <jason@confluent.io>
The standby-task test failed due to the standby task distribution not being exactly equal. I think this will be the case from time to time, so I've updated the test to make sure the standby task assignment count is not zero.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This resolves the issue found when running the brokers on Windows that prevented the coordinator from sending WriteTxnMarkers requests to complete a transaction.
`Thread.sleep` in `LogManager.deleteLogs` potentially blocks a scheduler thread for up to `log.segment.delete.delay.ms` with a default value of a minute. To avoid this, `deleteLogs` now deletes the logs for which `currentDefaultConfig.fileDeleteDelayMs` has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed are considered for deletion in the next iteration of `deleteLogs`, which is scheduled sooner if required.
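A rough Java-flavored sketch of the new shape (the real method is Scala in `LogManager`; the field and scheduler names here are illustrative, not the actual API):

```java
// Illustrative only: delete just the logs whose delay has elapsed, then
// reschedule this task for the next soonest deadline instead of sleeping.
void deleteLogs() {
    long nextDelayMs = defaultFileDeleteDelayMs;
    Log log;
    while ((log = logsToBeDeleted.peek()) != null) {
        long timeUntilDeletionMs = log.scheduledDeletionMs() - time.milliseconds();
        if (timeUntilDeletionMs <= 0) {
            logsToBeDeleted.poll().delete();           // delay elapsed: safe to delete now
        } else {
            nextDelayMs = Math.min(nextDelayMs, timeUntilDeletionMs);
            break;                                     // head not yet ready; check again later
        }
    }
    scheduler.schedule("kafka-delete-logs", this::deleteLogs, nextDelayMs);
}
```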
Reviewers: Jun Rao <junrao@gmail.com>, Dong Lin <lindong28@gmail.com>, Ted Yu <yuzhihong@gmail.com>
Require a minimum value of 1 for `segment.ms` to avoid division by zero when computing random jitter.
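The hazard is plain modular arithmetic, e.g. (illustrative, not the exact broker code):

```java
// With segment.ms = 0 the modulus below is zero and the JVM throws
// ArithmeticException ("/ by zero") when computing the jitter.
long segmentMs = 0L;                                  // now rejected: minimum is 1
long segmentJitterMs = 60_000L;
long bound = Math.min(segmentJitterMs, segmentMs);    // 0 when segment.ms == 0
long jitter = Math.abs(new java.util.Random().nextInt()) % bound; // throws
```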
Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>
This patch adds logic for handling the PartitionModifications event, so that if the partition count is increased while a topic deletion is still in progress, the controller will restore the data of the path `/brokers/topics/<topic>` to remove the added partitions.
Testing done:
Added a new test method to cover the bug
Author: Lucas Wang <luwang@linkedin.com>
Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>
Closes #4666 from gitlw/prevent_increasing_partition_count_during_topic_deletion
DynamicBrokerReconfigurationTest currently assumes that passwords encoded with one secret will fail with an exception if decoded with another secret, and it configures an old.secret in setUp. This could potentially cause test failures if a password was incorrectly decoded with the wrong secret, since the test writes passwords encoded with the new secret directly to ZooKeeper. Since old.secret is only used in one test for verifying secret rotation, this config can be moved to that test to avoid transient failures.
Reviewers: Ismael Juma <ismael@juma.me.uk>
* Added dependencies so that Trogdor and Connect work with Java 9 and 10
* Updated Jacoco to 0.8.1 so that it works with Java 10
* Updated Gradle to 4.6
* A few minor version bumps (not related to Java9/10 fixes)
I tested manually that we can run ./gradlew test with Java 10
after these changes. There are test failures as EasyMock
and PowerMock will have to be updated to use a newer
ASM version. But compiling successfully and most tests
passing is progress. :)
I also tested manually that Trogdor can be started with Java 10.
It previously failed with a ClassNotFoundError.
Reviewers: Jason Gustafson <jason@confluent.io>
We were unintentionally mutating the cached queue of batches prior to appending to the log. This could have several bad consequences if the append ultimately failed or was truncated. In the reporter's case, it caused the snapshot to be invalid after a segment roll. The snapshot contained producer state at offsets higher than the snapshot offset. If we ever had to load from that snapshot, the state was left inconsistent, which led to an error that ultimately crashed the replica fetcher.
The fix required some refactoring to avoid sharing the same underlying queue inside ProducerAppendInfo. I have added test cases which reproduce the invalid snapshot state. I have also made an effort to clean up logging since it was not easy to track this problem down.
One final note: I have removed the duplicate check inside ProducerStateManager since it was both redundant and incorrect. The redundancy was in the checking of the cached batches: we already check these in Log.analyzeAndValidateProducerState. The incorrectness was the handling of sequence number overflow: we were only handling one very specific case of overflow, but others would have resulted in an invalid assertion. Instead, we now throw OutOfOrderSequenceException.
Reviewers: Apurva Mehta <apurva@confluent.io>, Jun Rao <junrao@gmail.com>
1. Use JmxMixin for SimpleBenchmark (will remove the self-reporting in #4744), only when the loading phase is false (i.e. we are in fact starting the streams app).
2. Report the full JMX metrics in log files; in the returned data, only return the max values. This is because we want to skip the warm-up and cool-down periods that will have lower rate numbers, while the max represents the actual rate at full speed.
3. Incorporates two other improvements to JmxTool: #1241 and #2950
Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Rohan Desai <desai.p.rohan@gmail.com>
Don't return config values from `describeConfigs` if the config type cannot be determined. Obtain config types correctly for listener configs for `describeConfigs` and password encryption.
Reviewers: Jason Gustafson <jason@confluent.io>
TestUtils#produceMessages should always close the KafkaProducer, even
when there is an exception. Otherwise, the test will leak threads when
there is an error.
TestUtils#createNewProducer should create a producer with a
requestTimeoutMs of 30 seconds by default, not around 10 seconds.
This should avoid tests that flake when the load on Jenkins climbs.
Fix two cases where a very short timeout of 2 seconds was getting set.
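The close-on-error pattern is the standard one (a generic Java sketch of the idea; the actual `TestUtils` code is Scala, and `props`/`records` are placeholders):

```java
// Always release the producer's network thread, even when a send fails,
// so a failing test does not leak threads.
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
try {
    for (ProducerRecord<byte[], byte[]> record : records) {
        producer.send(record).get();  // propagate any send error to the test
    }
} finally {
    producer.close();
}
```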
Reviewers: Ismael Juma <ismael@juma.me.uk>
Reworded the SASL authentication sequence documentation to update it for versions >= 1.0.0.
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>, Mickael Maison <mickael.maison@gmail.com>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Added configs to ProducerBenchSpec:
- `topicPrefix`: topic names will be of the format topicPrefix + topic index. If not provided, the default is "produceBenchTopic".
- `partitionsPerTopic`: the number of partitions per topic. If not provided, the default is 1.
- `replicationFactor`: the replication factor per topic. If not provided, the default is 3.
The behavior of the producer bench is changed such that if some or all topics already exist (with topic names = topicPrefix + topic index) and they have the same number of partitions as requested, the worker uses those topics and does not fail. The producer bench fails if one or more existing topics has a partition count different from the expected number of partitions.
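For example, a hypothetical spec fragment using the new fields might look like this (values are placeholders; only the three fields above are new):

```json
{
  "class": "org.apache.kafka.trogdor.workload.ProducerBenchSpec",
  "topicPrefix": "produceBenchTopic",
  "partitionsPerTopic": 3,
  "replicationFactor": 3
}
```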
Added unit tests for WorkerUtils, covering both existing methods and new methods.
Fixed a bug in MockAdminClient where createTopics() would overwrite an existing topic's replication factor and number of partitions, even while correctly completing the appropriate futures exceptionally with TopicExistsException.
Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
- Fix kafkaConfig initialization if there are no dynamic configs defined in ZK.
- Update DynamicListenerConfig.validateReconfiguration() to check that the new listeners are a subset of the listener map.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This is a general change and is a prerequisite for running the streams benchmark with different streams tests. For the streams benchmark itself I will have a separate PR for switching configs. Details:
1. Create a "streams.properties" file under PERSISTENT_ROOT before all the streams tests. For now it will only contain a single config, state.dir, pointing to PERSISTENT_ROOT.
2. For all the system-test-related code, replace the main function parameter state.dir with propsFilename; inside the function, load the props from the file and apply overrides if necessary (see the sketch after this list).
3. Minor fixes.
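The loading step inside each test main function is then roughly (illustrative Java; `Utils.loadProps` is the existing Kafka helper, and the override shown is hypothetical):

```java
// Load the base configs written by the system test, then apply per-test overrides.
Properties props = Utils.loadProps(propsFilename);                   // contains state.dir for now
props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "bench-app"); // hypothetical override
KafkaStreams streams = new KafkaStreams(topology, props);
```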
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
KafkaFutureImpl#addWaiter should be protected, just like KafkaFuture#addWaiter. As described in KIP-218, whenComplete is the public API, not addWaiter.
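Client code should therefore rely on `whenComplete` rather than `addWaiter`, e.g. (an illustrative snippet against the admin client; `adminClient` and `newTopics` are placeholders):

```java
KafkaFuture<Void> future = adminClient.createTopics(newTopics).all();
future.whenComplete((ignored, error) -> {
    if (error != null)
        System.err.println("Topic creation failed: " + error); // hypothetical handling
    else
        System.out.println("Topics created");
});
```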