This patch implements KIP-281, which adds a configurable timeout to the consumer performance tool with a default value of 10 seconds. The old timeout was hard-coded as 1 second. Additionally, this patch adds a warning message when the tool exits after a timeout rather than returning silently.
Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jason Gustafson <jason@confluent.io>
This reverts commit c9ec292135 (#5011). That
commit introduces an anonymous inner class which retains a
reference to the non-serializable outer class `KafkaMbean`
breaking Serialization. This means that reading JMX metrics
via JConsole or JmxTool no longer works since RMI relies
on Java Serialization.
Reviewers: Jason Gustafson <jason@confluent.io>, Dong Lin <lindong28@gmail.com>, Ismael Juma <ismael@juma.me.uk>
- Removed internal kafka.admin.AdminClient.deleteRecordsBefore since it's
no longer used.
- Removed redundant tests and rewrote non redundant ones to use the Java
AdminClient.
Reviewers: Viktor Somogyi <viktor.somogyi@cloudera.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Changes to keep the operation name as is and make the sensor name unique.
Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Specifying an invalid config (i.e. something other than `CreateTime` or
`LogAppendTime`) via `TopicCommand` would previously cause the
broker to fail on start-up.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
In #4919 we propagate the SerDes for each of these aggregation operators.
As @guozhangwang mentioned in that PR:
```
reduce: inherit the key and value serdes from the parent XXImpl class.
count: inherit the key serdes, enforce setting the Serdes.Long() for value serdes.
aggregate: inherit the key serdes, do not set for value serdes internally.
```
Although it's all good for reduce and count, it is quiet unsafe to have aggregate without Materialized given. In fact I don't see why we would not give a Materialized for the aggregate since the result type will always be different (otherwise use reduce) and also the value Serde is simply not propagated.
This has been discussed previously in a broader PR before but I believe for aggregate we could pass implicitly a Materialized the same way we pass a Joined, just to avoid the stupid case. Then if the user wants to specialize, he can give his own Materialized.
Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>
Implementation for lazy down-conversion in a chunked manner for efficient memory usage during down-conversion. This pull request is mainly to get initial feedback on the direction of the patch. The patch includes all the main components from KIP-283.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch contains a few follow-up improvements/cleanup for KIP-266:
- Add upgrade notes
- Add missing `commitSync(Duration)` API
- Improve timeout messages and fix some naming inconsistencies
- Various small cleanups
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit allows secrets in Connect configs to be externalized and replaced with variable references of the form `${provider:[path:]key}`, where the "path" is optional.
There are 2 main additions to `org.apache.kafka.common.config`: a `ConfigProvider` and a `ConfigTransformer`. The `ConfigProvider` is an interface that allows key-value pairs to be provided by an external source for a given "path". An a TTL can be associated with the key-value pairs returned from the path. The `ConfigTransformer` will use instances of `ConfigProvider` to replace variable references in a set of configuration values.
In the Connect framework, `ConfigProvider` classes can be specified in the worker config, and then variable references can be used in the connector config. In addition, the herder can be configured to restart connectors (or not) based on the TTL returned from a `ConfigProvider`. The main class that performs restarts and transformations is `WorkerConfigTransformer`.
Finally, a `configs()` method has been added to both `SourceTaskContext` and `SinkTaskContext`. This allows connectors to get configs with variables replaced by the latest values from instances of `ConfigProvider`.
Most of the other changes in the Connect framework are threading various objects through classes to enable the above functionality.
Author: Robert Yokota <rayokota@gmail.com>
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5068 from rayokota/KAFKA-6886-connect-secrets
This PR implements the features described in this KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
This PR changes the Connect framework to allow it to automatically deal with errors encountered while processing records in a Connector. The following behavior changes are introduced here:
**Retry on Failure**: Retry the failed operation a configurable number of times, with backoff between each retry.
**Task Tolerance Limits**: Tolerate a configurable number of failures in a task.
We also add the following ways to report errors, along with sufficient context to simplify the debugging process:
**Log Error Context**: The error information along with processing context is logged along with standard application logs.
**Dead Letter Queue**: Produce the original message into a Kafka topic (applicable only to sink connectors).
New **metrics** which will monitor the number of failures, and the behavior of the response handler are added.
The changes proposed here **are backward compatible**. The current behavior in Connect is to kill the task on the first error in any stage. This will remain the default behavior if the connector does not override any of the new configurations which are provided as part of this feature.
Testing: added multiple unit tests to test the retry and tolerance logic.
Author: Arjun Satish <arjun@confluent.io>
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Magesh Nandakumar <magesh.n.kumar@gmail.com>, Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5065 from wicknicks/KAFKA-6378
Currently, the AbstractStream class defines a copy-constructor that allow to extend KStream and KTable APIs with new methods without impacting the public interface.
However adding new processor or/and store to the topology is made throught the internalTopologyBuilder that is not accessible from AbstractStream subclasses defined outside of the package (package visibility).
Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This implements KIP-219, where a broker returns a response with throttle time on
quota violation immediately after processing the corresponding request. After
the response is sent out, the broker will keep the channel muted until the
throttle time is over. Also, on receiving a response with throttle time, client
will block outgoing communication to the broker for the specified throttle time.
See PR 4830, 5064 and 5094 for all the review history
Author: Jon Lee <jonlee@jonlee-ld1.linkedin.biz>
Reviewers: Jun Rao <junrao@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>, Dong Lin <lindong28@gmail.com>
Closes#5064 from jonlee2/kip-219
This patch implements the consumer timeout APIs from KIP-266 (everything except `poll()`, which was done separately).
Reviewers: John Roesler <john@confluent.io>, Jason Gustafson <jason@confluent.io>
KafkaController currently writes reassignment znode once for every partition that has been successfully reassigned. This is unnecessary and controller should be able to update reassignment znode once to remove all partitions that have been reassigned from the reassignment znode.
Author: Dong Lin <dolin@linkedin.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4659 from lindong28/KAFKA-6617
This PR provides the implementation for KIP-285 and also a reference implementation for authenticating BasicAuth credentials using JAAS LoginModule
Author: Magesh Nandakumar <magesh.n.kumar@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <wicknicks@users.noreply.github.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4931 from mageshn/KIP-285
*[KIP-305](https://cwiki.apache.org/confluence/display/KAFKA/KIP-305%3A+Add+Connect+primitive+number+converters) has been approved.*
Added converters and header converters for the primitive number types for which Kafka already had serializers and deserializers. All extend a common base class, `NumberConverter`, that encapsulates most of the shared functionality. Unit tests were added to check the basic functionality.
These classes are not used by any other Connect code, and must be explicitly used in Connect workers and connectors.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Arjun Satish <wicknicks@users.noreply.github.com>, Magesh Nandakumar <magesh.n.kumar@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5034 from rhauch/kafka-6913
Implementation of [KIP-174](https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig)
Configuration properties 'internal.key.converter' and 'internal.value.converter'
are deprecated, and default to org.apache.kafka.connect.json.JsonConverter.
Warnings are logged if values are specified for either, or if properties that
appear to configure instances of internal converters (i.e., ones prefixed with
either 'internal.key.converter.' or 'internal.value.converter.') are given.
The property 'schemas.enable' is also defaulted to false for internal
JsonConverter instances (both for keys and values) if it isn't specified.
Documentation and code have also been updated with deprecation notices and
annotations, respectively.
Unit tests have been updated in `PluginsTest` to account for the new defaults for `schemas.enable` for internal key/value converters, and to ensure that (for the time being), internal key/value converters are still configurable despite being deprecated.
Author: Chris Egerton <chrise@confluent.io>
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4693 from C0urante/kafka-5540
Refresh metadata if broker connection fails so that new calls are sent only to nodes that are alive and requests to controller are sent to the new controller if controller changes due to broker failure. Also reassign calls that could not be sent.
Reviewers: Dong Lin <lindong28@gmail.com>, Jason Gustafson <jason@confluent.io>
KAFKA-6921 removed deprecated scala producer. This pull request removes the now unnecessary findbugs exclusion that matched one of the affected classes.
Document cases where `IllegalStateException` is raised when attempting an invalid operation on an unassigned partition. Also change `position()` to raise `IllegalStateException` when called on an unassigned partition for consistency.
No longer needed since we dropped support for Java 7.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Dong Lin <lindong28@gmail.com>
Closes#5083 from ijuma/remove-max-perm-size
Add the new stricter-timeout version of `poll` proposed in KIP-266.
The pre-existing variant `poll(long timeout)` would block indefinitely for metadata
updates if they were needed, then it would issue a fetch and poll for `timeout` ms
for new records. The initial indefinite metadata block caused applications to become
stuck when the brokers became unavailable. The existence of the timeout parameter
made the indefinite block especially unintuitive.
This PR adds `poll(Duration timeout)` with the semantics:
1. iff a metadata update is needed:
1. send (asynchronous) metadata requests
2. poll for metadata responses (counts against timeout)
- if no response within timeout, **return an empty collection immediately**
2. if there is fetch data available, **return it immediately**
3. if there is no fetch request in flight, send fetch requests
4. poll for fetch responses (counts against timeout)
- if no response within timeout, **return an empty collection** (leaving async fetch request for the next poll)
- if we get a response, **return the response**
The old method, `poll(long timeout)` is deprecated, but we do not change its semantics, so it remains:
1. iff a metadata update is needed:
1. send (asynchronous) metadata requests
2. poll for metadata responses *indefinitely until we get it*
2. if there is fetch data available, **return it immediately**
3. if there is no fetch request in flight, send fetch requests
4. poll for fetch responses (counts against timeout)
- if no response within timeout, **return an empty collection** (leaving async fetch request for the next poll)
- if we get a response, **return the response**
One notable usage is prohibited by the new `poll`: previously, you could call `poll(0)` to block for metadata updates, for example to initialize the client, supposedly without fetching records. Note, though, that this behavior is not according to any contract, and there is no guarantee that `poll(0)` won't return records the first time it's called. Therefore, it has always been unsafe to ignore the response.
This KIP adds the following functionality related to SASL/OAUTHBEARER:
1) Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to flexibly retrieve an access token from an OAuth 2 authorization server based on the declaration of a custom login CallbackHandler implementation and have that access token transparently and automatically transmitted to a broker for authentication.
2) Allow brokers to flexibly validate provided access tokens when a client establishes a connection based on the declaration of a custom SASL Server CallbackHandler implementation.
3) Provide implementations of the above retrieval and validation features based on an unsecured JSON Web Token that function out-of-the-box with minimal configuration required (i.e. implementations of the two types of callback handlers mentioned above will be used by default with no need to explicitly declare them).
4) Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to transparently retrieve a new access token in the background before the existing access token expires in case the client has to open new connections.
AdminClient should backoff when retrying a Call. Fixed and added a unit test
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Dong Lin <lindong28@gmail.com>
Closes#5077 from hachikuji/admin-client-retry-backoff
consumer offset path in zookeeper should be /consumers/${group}/offsets/${topic}/${partition} instead of /consumers/${group}/offset/${topic}/${partition}. Added `s` to the word `offset`.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Manikumar Reddy O <manikumar.reddy@gmail.com>, Jun Rao <junrao@gmail.com>
When any metric (e.g. per-partition metric) is created or deleted,
registerMBean() is called which in turn calls getMBeanInfo().getClassName().
However, KafkaMbean.getMBeanInfo() instantiates an array of all sensors even
though we only need the class name. This costs a lot of CPU to register
sensors when consumer with large partition assignment starts. For example, it
takes 5 minutes to start a consumer with 35k partitions. This patch reduces the
consumer startup time seconds.
Author: radai-rosenblatt <radai.rosenblatt@gmail.com>
Reviewers: Satish Duggana <satish.duggana@gmail.com>, Dong Lin <lindong28@gmail.com>
Closes#5011 from radai-rosenblatt/fun-with-jmx
The Signal classes are not available in the compile classpath
if --release is used so we use reflection as a workaround.
As part of that moved the code to Java and added a simple
unit test.
Also disabled the signal handler if the IBM JDK is being used
due to KAFKA-6918.
Manually tested shutdown via ctrl+c and verified that
the message is printed.
* Removed Scala producers, request classes, kafka.tools.ProducerPerformance, encoders,
tests.
* Updated ConsoleProducer to remove Scala producer support (removed `BaseProducer`
and several options that are not used by the Java producer).
* Updated a few Scala consumer tests to use the new producer (including a minor
refactor of `produceMessages` methods in `TestUtils`).
* Updated `ClientUtils.fetchTopicMetadata` to use `SimpleConsumer` instead of
`SyncProducer`.
* Removed `TestKafkaAppender` as it looks useless and it defined an `Encoder`.
* Minor import clean-ups
No new tests added since behaviour should remain the same after these changes.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Dong Lin <lindong28@gmail.com>
Closes#5045 from ijuma/kafka-6921-remove-old-producer
Fix the check, add unit test to verify the change, update `DynamicBrokerReconfigurationTest` to avoid dynamic keystore update in tests which are not expected to update keystores.
Avoid dependence on the internal __consumer_offsets topic to handle `listConsumerGroups()` since it unnecessarily requires users to have Describe access on an internal topic. Instead we query each broker independently. For most clusters, this amounts to the same thing since the default number of partitions for __consumer_offsets is 50. This also provides better encapsulation since it avoids exposing the use of __consumer_offsets, which gives us more flexibility in the future.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Dong Lin <lindong28@gmail.com>
Closes#5007 from hachikuji/remove-admin-use-of-offsets-topic
Little back story on this. Was helping a user over email. This could be much easier to debug if we assume that the connector developer might not return valid configs. For example Intellij will generate a stub that returns a null. This was the case that inspired this JIRA.
Author: Jeremy Custenborder <jcustenborder@gmail.com>
Reviewers: Jason Gustafson <jason@confluent.io>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#3762 from jcustenborder/KAFKA-5807
These constructors should be public to allow users to write test cases using them. We follow a similar pattern for the other domain objects that we expose in `AdminClient` (e.g. `TopicDescription`).
Reviewers: Ismael Juma <ismael@juma.me.uk>
If the internal metadata request fails, we must reset the state inside `AdminClientMetadataManager` or we will be stuck indefinitely in the `UPDATE_PENDING` state and have no way to fetch new metadata.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Dong Lin <lindong28@gmail.com>
Closes#5057 from hachikuji/fix-admin-client-metadata-update-failure
We no longer need them since we now require Java 8.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Andras Beni <andrasbeni@cloudera.com>, Manikumar Reddy O <manikumar.reddy@gmail.com>, Dong Lin <lindong28@gmail.com>
Closes#5049 from ijuma/remove-base64