## Summary of the problem
When the `header.converter` is not specified in the worker config or the connector config, a bug in the `Plugins` test causes it to never instantiate the `HeaderConverter` instance, even though there is a default value.
This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs.
A workaround is to explicitly set the `header.converter` configuration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change.
## The Changes
The `Plugins.newHeaderConverter` methods were always returning null if the `header.converter` configuration value was not specified in the supplied connector or worker configuration. Thus, even though the `header.converter` property has a default, it was never being used.
The fix was to only check whether a `header.converter` property was specified when the connector configuration was being used, and if no such property exists in the connector configuration to return null. Then, when the worker configuration is being used, the method simply gets the `header.converter` value (or the default if no value was explicitly set).
Also, the ConnectorConfig had the same default value for the `header.converter` property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the `header.converter` value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter.
Finally, updated comments and added log messages to make it more clear which converters are being used and how they are being converted.
## Testing
Several new unit tests for `Plugins.newHeaderConverter` were added to check the various behavior. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for the `header.converter` configuration:
1. default value
1. worker configuration has `header.converter` explicitly set to the default
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. connector configuration has `header.converter` explicitly set to the default
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. worker configuration has `header.converter` explicitly set to the default, and the connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
The worker created the correct `HeaderConverter` implementation with the correct configuration in all of these tests.
Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumes the records with headers (but didn't do anything with them). This test also passed.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4815 from rhauch/kafka-6728
Changed WorkerSinkTaskContext to only resume the consumer topic partitions when the connector/task is not in the paused state.
The context tracks the set of topic partitions that are explicitly paused/resumed by the connector, and when the WorkerSinkTask resumes the tasks it currently resumes all topic partitions *except* those that are still explicitly paused in the context. Therefore, the change above should result in the desired behavior.
Several debug statements were added to record when the context is called by the connector.
This can be backported to older releases, since this bug goes back to 0.10 or 0.9.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4716 from rhauch/kafka-6661
Any exception thrown by calls within a `main()` method are not logged unless explicitly done so. This change simply adds a try-catch block around most of the content of the distributed and standalone `main()` methods.
**NOTE: This should be backported to the `1.1` branch, and is currently a blocker for 1.1.**
The `connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink` system test is failing with the SASL configuration without a sufficient explanation. During the test, the Connect worker fails to start, but the Connect log contains no useful information. There are actual several things compounding to cause the failure and make it difficult to understand the problem.
First, the `tests/kafkatest/tests/connect/templates/connect_standalone.properties` is only adding in the broker's security configuration with the `producer.` and `consumer.` prefixes, but is not adding them with no prefix. The worker uses the AdminClient to connect to the broker to get the Kafka cluster ID and to manage the three internal topics, and the AdminClient is configured via top-level properties. Because the SASL test requires the clients all connect using SASL, the lack of broker security configs means the AdminClient was attempting and failing to connect to the broker. This is corrected by adding the broker's security configuration to the Connect worker configuration file at the top-level. (This was already being done in the `connect_distributed.properties` file.)
Second, the default `request.timeout.ms` for the AdminClient (and the other clients) is 120 seconds, so the AdminClient was retrying for 120 seconds before it would give up and thrown an error. However, the test was only waiting for 60 seconds before determining that the service failed to start. This can be corrected by setting `request.timeout.ms=10000` in the Connect distributed and standalone worker configurations.
Third, the Connect workers were recently changed to lookup the Kafka cluster ID before it started the herder. This is unlike the older uses of the AdminClient to find and manage the internal topics, where failure to connect was not necessarily logged correctly but nevertheless still skipped over, relying upon broker auto-topic creation to create the internal topics. (This may be why the test did not fail prior to the recent change to always require a successful AdminClient connection.) Although the worker never got this far in its startup process, the fact that we missed such an error since the prior releases means that failure to connect with the AdminClient was not being properly reported.
The `ConnectStandaloneFileTest.test_file_source_and_sink` system tests were run locally prior to this fix, and they failed as with the nightlies. Once these fixes were made, the locally run system tests passed.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <me@ewencp.org>
Closes#4610 from rhauch/kafka-6577-trunk
Enabling scans for super types in reflections is required in order to discover Connect plugins.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Sending the response code of an http request issued via `RestClient` in Connect to stdout seems like a unconventional choice.
This PR redirects the responds code with a message in the logs at DEBUG level (usually the same level as the one that the caller of `RestClient.httpRequest` uses.
This fix will also fix system tests that broke by outputting this response code to stdout.
Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Damian Guy <damian.guy@gmail.com>
Closes#4591 from kkonstantine/MINOR-Redirect-response-code-in-Connect-RestClient-to-logs-instead-of-stdout
https://github.com/apache/kafka/pull/4356 added `batch.size` config property to `FileStreamSourceConnector` but the property was added as required without a default in config definition (`ConfigDef`). This results in validation error during connector startup.
Unit tests were added for both `FileStreamSourceConnector` and `FileStreamSinkConnector` to avoid such issues in the future.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
This is a small change to parallelize plugin scanning. This may help in some environments where otherwise plugin scanning is slow.
Author: Robert Yokota <rayokota@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4561 from rayokota/K6503-improve-plugin-scanning
JsonConverter should use object equality rather than reference equality in `convertToJson`.
Reviewers: Bartlomiej Tartanus <bartektartanus@gmail.com>, Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Corrected the parsing of invalid list values. A list can only be parsed if it contains elements that have a common type, and a map can only be parsed if it contains keys with a common type and values with a common type.
Reviewers: Arjun Satish <arjun@confluent.io>, Magesh Nandakumar <magesh.n.kumar@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Jason Gustafson <jason@confluent.io>
Changed call to use the overload of ConnectSchema.validateValue() method with the field name passed in. Ensure that field in put call is not null.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
The commits for KIP-145 (KAFKA-5142) changed how the Connect workers instantiate and configure the Converters, and also added the ability to do the same for the new HeaderConverters. However, the last few commits removed the default value for the `converter.type` property for Converters and HeaderConverters, and this broke how the internal converters were being created.
This change corrects the behavior so that the `converter.type` property is always set by the worker (or by the Plugins class), which means the existing Converter implementations will not have to do this. The built-in JsonConverter, ByteArrayConverter, and StringConverter also implement HeaderConverter which implements Configurable, but the Worker and Plugins methods do not yet use the `Configurable.configure(Map)` method and instead still use the `Converter.configure(Map,boolean)`.
Several tests were modified, and a new PluginsTest was added to verify the new behavior in Plugins for instantiating and configuring the Converter and HeaderConverter instances.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4512 from rhauch/kafka-6513
This change ensures that when sensors are created, they are unique to the metric group associated with the task that created them. Previously the sensors were being shared between task metric groups, causing incorrect metrics.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
KAFKA-3073 added topic regex support for sink connectors. The addition requires that you only specify one of topics or topics.regex settings. This is being validated in one place, but not during submission of connectors. This PR adds validation at `AbstractHerder.validateConnectorConfig` and `WorkerConnector.initialize`.
This adds a test of the new behavior to `AbstractHerderTest`.
Author: Jeff Klukas <jeff@klukas.net>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4251 from jklukas/connect-topics-validation
Submitting a fail safe fix for rare IOExceptions on symbolic links.
The fix is submitted without a test case since it does seem easy to reproduce such type of failures (just having a broken symbolic link does not reproduce the issue) and it's considered pretty low risk.
If accepted, needs to be ported at least to 1.0, if not 0.11
Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4481 from kkonstantine/KAFKA-6288-Broken-symlink-interrupts-scanning-the-plugin-path
**[KIP-145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect) has been accepted, and this PR implements KIP-145 except without the SMTs.**
Changed the Connect API and runtime to support message headers as described in [KIP-145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect).
The new `Header` interface defines an immutable representation of a Kafka header (key-value pair) with support for the Connect value types and schemas. This interface provides methods for easily converting between many of the built-in primitive, structured, and logical data types.
The new `Headers` interface defines an ordered collection of headers and is used to track all headers associated with a `ConnectRecord` (and thus `SourceRecord` and `SinkRecord`). This does allow multiple headers with the same key. The `Headers` contains methods for adding, removing, finding, and modifying headers. Convenience methods allow connectors and transforms to easily use and modify the headers for a record.
A new `HeaderConverter` interface is also defined to enable the Connect runtime framework to be able to serialize and deserialize headers between the in-memory representation and Kafka’s byte[] representation. A new `SimpleHeaderConverter` implementation has been added, and this serializes to strings and deserializes by inferring the schemas (`Struct` header values are serialized without the schemas, so they can only be deserialized as `Map` instances without a schema.) The `StringConverter`, `JsonConverter`, and `ByteArrayConverter` have all been extended to also be `HeaderConverter` implementations. Each connector can be configured with a different header converter, although by default the `SimpleHeaderConverter` is used to serialize header values as strings without schemas.
Unit and integration tests are added for `ConnectHeader` and `ConnectHeaders`, the two implementation classes for headers. Additional test methods are added for the methods added to the `Converter` implementations. Finally, the `ConnectRecord` object is already used heavily, so only limited tests need to be added while quite a few of the existing tests already cover the changes.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Arjun Satish <arjun@confluent.io>, Ted Yu <yuzhihong@gmail.com>, Magesh Nandakumar <magesh.n.kumar@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4319 from rhauch/kafka-5142-b
…to check for empty connector name and illegal characters in connector name. This also fixes KAFKA-4938 by removing the check for slashes in connector name from ConnectorsResource.
Author: Ewen Cheslack-Postava <me@ewencp.org>
Author: Soenke Liebau <soenke.liebau@opencore.com>
Reviewers: Gwen Shapira <cshapi@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#2755 from soenkeliebau/KAFKA-4930
This PR implements the JIRA issue [KAFKA-4029: SSL support for Connect REST API](https://issues.apache.org/jira/browse/KAFKA-4029) / [KIP-208](https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface).
Summary of the main changes:
- Jetty `HttpClient` is used as HTTP client instead of the one shipped with Java. That allows to keep the SSL configuration for Server and Client be in single place (both use the Jetty `SslContextFactory`). It also has much richer configuration than the JDK client (it is easier to configure things such as supported cipher suites etc.).
- The `RestServer` class has been broker into 3 parts. `RestServer` contains the server it self. `RestClient` contains the HTTP client used for forwarding requests etc. and `SSLUtils` contain some helper classes for configuring SSL. One of the reasons for this was Findbugs complaining about the class complexity.
- A new method `valuesWithPrefixAllOrNothing` has been added to `AbstractConfig` to make it easier to handle the situation that we want to use either only the prefixed SSL options or only the non-prefixed. But not mixed them.
Author: Jakub Scholz <www@scholzj.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4429 from scholzj/kip-208
Exclusion for packages that need not be loaded in isolation needs to be extended to all the `org.apache.kafka` packages (that do not belong to transforms and the other whitelisted packages). Most notably, this refers to any classes in `kafka-clients` package.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Adds alphanumeric ordering of dependencies as they added to a Connect plugin's class loader path.
This makes the layout of the dependencies consistent across systems and deployments. Dependencies should still, in principle, not include conflicts and ideally order should not matter.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
`loadClass` needs to be synchronized to protect subsequent calls to `defineClass`.
Details in the javadoc of this PR as well as here too: https://docs.oracle.com/javase/7/docs/technotes/guides/lang/cl-mt.html
/cc ewencp rhauch
Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4428 from kkonstantine/KAFKA-6277-Make-loadClass-thread-safe-for-class-loaders-of-Connect-plugins
Making clear that implementations of poll() shouldn't block indefinitely in order to allow the task instance to transition to PAUSED state.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
When using Kafka Connect with a cluster that doesn't allow the user to
create topics (due to ACL configuration), Connect fails when trying to
create its internal topics even if these topics already exist. This is
incorrect behavior according to the documentation, which mentions that
R/W access should be enough.
This happens specifically when using Aiven Kafka, which does not permit
creation of topics via the Kafka Admin Client API.
The patch ignores the returned error, similar to the behavior for older
brokers that don't support the API.
* Implement MockAdminClient.deleteTopics
* Use MockAdminClient instead of MockKafkaAdminClientEnv in StreamsResetterTest
* Rename MockKafkaAdminClientEnv to AdminClientUnitTestEnv
* Use MockAdminClient instead of MockKafkaAdminClientEnv in TopicAdminTest
* Rename KafkaAdminClient to AdminClientUnitTestEnv in KafkaAdminClientTest.java
* Migrate StreamThreadTest to MockAdminClient
* Fix style errors
* Address review comments
* Fix MockAdminClient call
Reviewers: Matthias J. Sax <matthias@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
We are closing the metricGroups created in a Worker, Source task and Sink task before populating them with new metrics. This helps in cases where an Exception is thrown when previously created groups were not cleaned up correctly.
Signed-off-by: Arjun Satish <arjunconfluent.io>
Author: Arjun Satish <arjun@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4397 from wicknicks/KAFKA-6252
When the source file of `FileStreamSource` is a large file, `FileStreamSourceTask.poll()` will result in OOM. This pull request added `batch.size` parameter which can restrict the poll size.
*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*
*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*
Author: Study <ph.study@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4356 from phstudy/KAFKA-4335
This changes the Struct's equals and hashCode method to use Arrays#deepEquals and Arrays#deepHashCode, respectively. This resolves a problem where two structs with values of type byte[] would not be considered equal even though the byte arrays' contents are equal. By using deepEquals, the byte arrays' contents are compared instead of ther identity.
Since this changes the behavior of the equals method for byte array values, the behavior of hashCode must change alongside it to ensure the methods still fulfill the general contract of "equal objects must have equal hashCodes".
Test rationale:
All existing unit tests for equals were untouched and continue to work. A new test method was added to verify the behavior of equals and hashCode for Struct instances that contain a byte array value. I verify the reflixivity and transitivity of equals as well as the fact that equal Structs have equal hashCodes
and not-equal structs do not have equal hashCodes.
Author: Tobias Gies <tobias.gies@trivago.com>
Author: Tobias Gies <tobias@tobiasgies.de>
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes#4293 from tobiasgies/feature/kafka-6308-deepequals
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes#4105 from cmccabe/KAFKA-6102
…from config to own function and added check to create connector call.
Author: Soenke Liebau <soenke.liebau@opencore.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4230 from soenkeliebau/KAFKA-5563
There are more methods that had to be touched than I anticipated when writing [the KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-215%3A+Add+topic+regex+support+for+Connect+sinks).
The implementation here is now complete and includes a test that verifies that there's a call to `consumer.subscribe(Pattern, RebalanceHandler)` when `topics.regex` is provided.
Author: Jeff Klukas <jeff@klukas.net>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4151 from jklukas/connect-topics.regex
Re-arrange order of comparisons in equals() to evaluate non-composite fields first
Cache hash code
Author: tedyu <yuzhihong@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4176 from tedyu/trunk
The ConnectExceptionMapper was originally intended to handle ConnectException errors for some expected cases where we just want to always convert them to a certain response and the ExceptionMapper was the easiest way to do that uniformly across the API. However, in the case that it's not an expected subclass, we should log the information at the error level so the user can track down the cause of the error.
This is only an initial improvement. We should probably also add a more general ExceptionMapper to handle other exceptions we may not have caught and converted to ConnectException.
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes#4227 from ewencp/better-connect-error-logging
Changed the condition in **if** statement
**(schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))** which
requires two comparisons in worst case with
**(!LOGICAL_NAME.equals(schema.name()))** which requires single comparison
in all cases and _avoids null pointer exception.
![kafka_optimize_if](https://user-images.githubusercontent.com/32234013/32872271-afe0b954-ca3a-11e7-838d-6a3bc416b807.JPG)
_
Author: sachinbhalekar <sachinbansibhalekar@gmail.com>
Author: sachinbhalekar <32234013+sachinbhalekar@users.noreply.github.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4225 from sachinbhalekar/trunk
Author: Richard Yu <richardyu@Richards-Air.attlocal.net>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#4110 from ConcurrencyPractitioner/trunk
Added metrics to the Connect worker and rebalancing metrics to the distributed herder.
This is built on top of #3987, and I can rebase this PR once that is merged.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4011 from rhauch/kafka-5903
A new mechanism was added recently to the Metrics framework to make it easier to generate the documentation. It uses a registry with a MetricsNameTemplate for each metric, and then those templates are used when creating the actual metrics. The metrics framework provides utilities that can generate the HTML documentation from the registry of templates.
This change moves the recently-added Connect metrics over to use these templates and to then generate the metric documentation for Connect.
This PR is based upon #3975 and can be rebased once that has been merged.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#3987 from rhauch/kafka-5990