Logging can get spammy during the reconnect blackout period because any requests we send to ConsumerNetworkClient will immediately be failed when poll() returns. This patch checks for connection failures prior to sending fetches and offset lookups and skips sending to any failed nodes. Test cases added for both.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
NetworkClient should use FIFO order when completing inflight requests following a disconnect.
I've added new unit tests for `InFlightRequests` and `NetworkClient` which verify completion order.
Reviewers: Jun Rao <junrao@gmail.com>
We need to reset the auto-commit deadline after sending the offset commit request so that we do not resend it while the request is still inflight.
Added unit tests ensuring this behavior and proper backoff in the case of a failure.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Contention for the lock in ConsumerNetworkClient can lead to a livelock situation in which an active commitSync is unable to make progress because its completion is blocked in the heartbeat thread. The fix is twofold:
1) We change ConsumerNetworkClient to use a fair lock to reduce the chance of each thread getting starved.
2) We eliminate the dependence on the lock in ConsumerNetworkClient for callback completion so that callbacks will not be blocked by an active poll().
Reviewers: Guozhang Wang <wangguoz@gmail.com>
The fix is in two folds:
For tasks that's closed in closeZombieTask, their corresponding partitions are still in runningByPartition so those closed tasks may still be returned in activeTasks and standbyTasks. Adding guards on the returned tasks and if they are closed notify the thread to trigger rebalance immediately.
When triggering a rebalance, un-subscribe and re-subscribe immediately to make sure we are not dependent on the background heartbeat thread timing.
Some minor changes on log4j. More specifically, I moved the log entry of closeZombieTask to its callers with more context information and the action going to take.
I can re-produce the issue with EosIntegrationTest may hand-code the heartbeat thread to GC, and confirmed this patch fixed the issue. Unfortunately this test cannot be added to AK since currently we do not have ways to manipulate the heartbeat thread in unit tests.
Reviewers: Jason Gustafson <jason@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
fixes lgmt.com warnings
cleanup PrintForeachAction and Printed
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Sebastian Bauersfeld <sebastianbauersfeld@gmx.de>, Damian Guy <damian@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This patch fixes a bug in the validation of the inter-broker protocol and the message format version. We should allow the configured message format api version to be greater than the inter-broker protocol api version as long as the actual message format versions are equal. For example, if the message format version is set to 1.0, it is fine for the inter-broker protocol version to be 0.11.0 because they both use message format v2.
I have added a unit test which checks compatibility for all combinations of the message format version and the inter-broker protocol version.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4583 from hachikuji/KAFKA-6328-REOPENED
Add validation checks that the offset range is valid and aligned with the batch count prior to appending to the log. Several unit tests have been added to verify the various invalid cases.
Make it clear in the docs that the rebalance listener is only invoked during an active call to `poll()`. Plus a few additional doc cleanups.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Currently we hold onto all Records references in a multi-partition fetch response until the full response has completed. This can be a problem when the records have been down-converted since they will be occupying a (potentially large) chunk of memory. This patch changes the behavior in MultiSend so that once a Send is completed, we no longer keep a reference to it, which will allow the Records objects to be freed sooner.
I have added a simple unit test to verify that sends are removed as the MultiSend progresses.
Reviewers: Ismael Juma <ismael@juma.me.uk>
`Node` is immutable so this is safe.
With 100 brokers, 150 topics and 350 partitions, `HashSet.contains` in `RecordAccumulator.ready` took about 40% of the application time. It
is caused by re-calculating a hash code of a leader (Node instance) for
every batch entry. Caching the hashCode reduced the time of
`HashSet.contains` in `RecordAccumulator.ready` to ~2%. The
measurements were taken with Flight Recorder.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ted Yu <yuzhihong@gmail.com>, Ismael Juma <ismael@juma.me.uk>
ProducerBatch retains references to MemoryRecordsBuilder and cannot be freed until acks are received. Removing references to buffers used for compression after records are built will enable these to be garbage collected sooner, reducing the risk of OOM.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Lothsahn <Lothsahn@gmail.com>
This fixes two alerts flagged on lgtm.com for Apache Kafka.
This dead code alert where InvalidTypeIdException indirectly extends JsonMappingException. The flagged condition with the type test appears after the type test for the latter and thus makes its body dead. I opted to change the order of the tests. Please let me know if this is the intended behavior.
The second commit addresses this out-of-bounds alert.
More alerts can be found here. Note that my colleague Aditya Sharad addressed some of those in the now outdated #2939.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
Added a second check for race condition where store changelog topic updated during restore, but not if a KTable changelog topic. This will be tricky to test, but I wanted to push the PR to get feedback on the approach.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
Prior to this patch, the consumer always blocks in poll() if there are any partitions which are awaiting their initial positions. This behavior was inconsistent with normal fetch behavior since we allow fetching on available partitions even if one or more of the assigned partitions becomes unavailable _after_ initial offset lookup. With this patch, the consumer will do offset resets asynchronously, which allows other partitions to make progress even if the initial positions for some partitions cannot be found.
I have added several new unit tests in `FetcherTest` and `KafkaConsumerTest` to verify the new behavior. One minor compatibility implication worth mentioning is apparent from the change I made in `DynamicBrokerReconfigurationTest`. Previously it was possible to assume that all partitions had a fetch position after `poll()` completed with a non-empty assignment. This assumption is no longer generally true, but you can force the positions to be updated using the `position()` API which still blocks indefinitely until a position is available.
Note that this this patch also removes the logic to cache committed offsets in `SubscriptionState` since it was no longer needed (the consumer's `committed()` API always does an offset lookup anyway). In addition to avoiding the complexity of maintaining the cache, this avoids wasteful offset lookups to refresh the cache when `commitAsync()` is used.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
* do not use static properties
* use new object to take appID
* capture timeout exception inside condition
Reviewers: Matthias J. Sax <matthias@confluent.io>
If an exception is encountered while sending data to a client connection, that connection is disconnected. If there are staged receives for that connection, they are tracked to process those records. However, if the exception was encountered during processing a `RequestChannel.Request`, the `KafkaChannel` for that connection is muted and won't be processed.
Disable processing of outstanding staged receives if a send fails. This stops the leak of the memory for pending requests and the file descriptor of the TCP socket.
Test that a channel is closed when an exception is raised while writing to a socket that has been closed by the client. Since sending a response requires acks != 0, allow specifying the required acks for test requests in SocketServerTest.scala.
Author: Graham Campbell <graham.campbell@salesforce.com>
Reviewers: Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>, Ted Yu <yuzhihong@gmail.com>
Currently `maybeAutoCommitOffsetsAsync` does not try to find the coordinator if it is unknown. As a result, asynchronous auto-commits will fail indefinitely. This patch changes the behavior to add coordinator discovery to the async auto-commit path.
Keep delegation token implementation internal without exposing implementation details to pluggable classes:
1. KafkaPrincipal#tokenAuthenticated must always be set by SaslServerAuthenticator so that custom PrincipalBuilders cannot override.
2. Replace o.a.k.c.security.scram.DelegationTokenAuthenticationCallback with a more generic ScramExtensionsCallback that can be used to add more extensions in future.
3. Separate out ScramCredentialCallback (KIP-86 makes this a public interface) from delegation token credential callback (which is internal).
Reviewers: Jun Rao <junrao@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
The `MetricNameTemplate` is changed to used a `LinkedHashSet` to maintain the same order of the tags that are passed in. This tag order is then maintained when `Metrics.toHtmlTable` generates the MBean names for each of the metrics.
The `SenderMetricsRegistry` and `FetcherMetricsRegistry` both contain templates used in the producer and consumer, respectively, and these were changed to use a `LinkedHashSet` to maintain the order of the tags.
Before this change, the generated HTML documentation might use MBean names like the following and order them:
```
kafka.connect:type=sink-task-metrics,connector={connector},partition={partition},task={task},topic={topic}
kafka.connect:type=sink-task-metrics,connector={connector},task={task}
```
However, after this change, the documentation would use the following order:
```
kafka.connect:type=sink-task-metrics,connector={connector},task={task}
kafka.connect:type=sink-task-metrics,connector={connector},task={task},topic={topic},partition={partition}
```
This is more readable as the code that is creating the templates has control over the order of the tags.
Note that JMX MBean names use ObjectName that does not maintain order of the properties (tags), so this change should have no impact on the actual JMX MBean names used in the metrics.
cc wushujames
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: James Cheng <jylcheng@yahoo.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#3985 from rhauch/kafka-5987
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Closes#4418 from cmccabe/KAFKA-6254
Dynamic update of listeners as described in KIP-226. This includes:
- Addition of new listeners with listener-prefixed security configs
- Removal of existing listeners
- Password encryption
- sasl.jaas.config property for broker's JAAS config prefixed with listener and mechanism name
Use new AdminClient for describing and altering broker configs using ConfigCommand. Broker quota configs as well as other configs will continue to be processed directly using ZooKeeper until KIP-248 is implemented.
Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>
Currently if users call KafkaConsumer.offsetsForTimes() with a large set of partitions. The consumer will add one topic at a time for the metadata refresh. We should add all the topics to the metadata topics and just do one metadata refresh instead.
Author: Jiangjie Qin <becket.qin@gmail.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#4478 from becketqin/KAFKA-6849
…to check for empty connector name and illegal characters in connector name. This also fixes KAFKA-4938 by removing the check for slashes in connector name from ConnectorsResource.
Author: Ewen Cheslack-Postava <me@ewencp.org>
Author: Soenke Liebau <soenke.liebau@opencore.com>
Reviewers: Gwen Shapira <cshapi@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#2755 from soenkeliebau/KAFKA-4930
Changing KafkaFuture.Future and KafkaFuture.BiConsumer into an interface makes
them a functional interface. This makes them Java 8 lambda compatible.
Author: Colin P. Mccabe <cmccabe@confluent.io>
Author: Steven Aerts <steven.aerts@gmail.com>
Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Xavier Léauté <xl+github@xvrl.net>, Tom Bentley <tbentley@redhat.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4033 from steven-aerts/KAFKA-6018
This PR implements the JIRA issue [KAFKA-4029: SSL support for Connect REST API](https://issues.apache.org/jira/browse/KAFKA-4029) / [KIP-208](https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface).
Summary of the main changes:
- Jetty `HttpClient` is used as HTTP client instead of the one shipped with Java. That allows to keep the SSL configuration for Server and Client be in single place (both use the Jetty `SslContextFactory`). It also has much richer configuration than the JDK client (it is easier to configure things such as supported cipher suites etc.).
- The `RestServer` class has been broker into 3 parts. `RestServer` contains the server it self. `RestClient` contains the HTTP client used for forwarding requests etc. and `SSLUtils` contain some helper classes for configuring SSL. One of the reasons for this was Findbugs complaining about the class complexity.
- A new method `valuesWithPrefixAllOrNothing` has been added to `AbstractConfig` to make it easier to handle the situation that we want to use either only the prefixed SSL options or only the non-prefixed. But not mixed them.
Author: Jakub Scholz <www@scholzj.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4429 from scholzj/kip-208