Prior to this patch, the consumer always blocks in poll() if there are any partitions which are awaiting their initial positions. This behavior was inconsistent with normal fetch behavior since we allow fetching on available partitions even if one or more of the assigned partitions becomes unavailable _after_ initial offset lookup. With this patch, the consumer will do offset resets asynchronously, which allows other partitions to make progress even if the initial positions for some partitions cannot be found.
I have added several new unit tests in `FetcherTest` and `KafkaConsumerTest` to verify the new behavior. One minor compatibility implication worth mentioning is apparent from the change I made in `DynamicBrokerReconfigurationTest`. Previously it was possible to assume that all partitions had a fetch position after `poll()` completed with a non-empty assignment. This assumption is no longer generally true, but you can force the positions to be updated using the `position()` API which still blocks indefinitely until a position is available.
Note that this this patch also removes the logic to cache committed offsets in `SubscriptionState` since it was no longer needed (the consumer's `committed()` API always does an offset lookup anyway). In addition to avoiding the complexity of maintaining the cache, this avoids wasteful offset lookups to refresh the cache when `commitAsync()` is used.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
* do not use static properties
* use new object to take appID
* capture timeout exception inside condition
Reviewers: Matthias J. Sax <matthias@confluent.io>
If an exception is encountered while sending data to a client connection, that connection is disconnected. If there are staged receives for that connection, they are tracked to process those records. However, if the exception was encountered during processing a `RequestChannel.Request`, the `KafkaChannel` for that connection is muted and won't be processed.
Disable processing of outstanding staged receives if a send fails. This stops the leak of the memory for pending requests and the file descriptor of the TCP socket.
Test that a channel is closed when an exception is raised while writing to a socket that has been closed by the client. Since sending a response requires acks != 0, allow specifying the required acks for test requests in SocketServerTest.scala.
Author: Graham Campbell <graham.campbell@salesforce.com>
Reviewers: Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>, Ted Yu <yuzhihong@gmail.com>
Currently `maybeAutoCommitOffsetsAsync` does not try to find the coordinator if it is unknown. As a result, asynchronous auto-commits will fail indefinitely. This patch changes the behavior to add coordinator discovery to the async auto-commit path.
Keep delegation token implementation internal without exposing implementation details to pluggable classes:
1. KafkaPrincipal#tokenAuthenticated must always be set by SaslServerAuthenticator so that custom PrincipalBuilders cannot override.
2. Replace o.a.k.c.security.scram.DelegationTokenAuthenticationCallback with a more generic ScramExtensionsCallback that can be used to add more extensions in future.
3. Separate out ScramCredentialCallback (KIP-86 makes this a public interface) from delegation token credential callback (which is internal).
Reviewers: Jun Rao <junrao@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
The `MetricNameTemplate` is changed to used a `LinkedHashSet` to maintain the same order of the tags that are passed in. This tag order is then maintained when `Metrics.toHtmlTable` generates the MBean names for each of the metrics.
The `SenderMetricsRegistry` and `FetcherMetricsRegistry` both contain templates used in the producer and consumer, respectively, and these were changed to use a `LinkedHashSet` to maintain the order of the tags.
Before this change, the generated HTML documentation might use MBean names like the following and order them:
```
kafka.connect:type=sink-task-metrics,connector={connector},partition={partition},task={task},topic={topic}
kafka.connect:type=sink-task-metrics,connector={connector},task={task}
```
However, after this change, the documentation would use the following order:
```
kafka.connect:type=sink-task-metrics,connector={connector},task={task}
kafka.connect:type=sink-task-metrics,connector={connector},task={task},topic={topic},partition={partition}
```
This is more readable as the code that is creating the templates has control over the order of the tags.
Note that JMX MBean names use ObjectName that does not maintain order of the properties (tags), so this change should have no impact on the actual JMX MBean names used in the metrics.
cc wushujames
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: James Cheng <jylcheng@yahoo.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#3985 from rhauch/kafka-5987
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Closes#4418 from cmccabe/KAFKA-6254
Dynamic update of listeners as described in KIP-226. This includes:
- Addition of new listeners with listener-prefixed security configs
- Removal of existing listeners
- Password encryption
- sasl.jaas.config property for broker's JAAS config prefixed with listener and mechanism name
Use new AdminClient for describing and altering broker configs using ConfigCommand. Broker quota configs as well as other configs will continue to be processed directly using ZooKeeper until KIP-248 is implemented.
Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>
Currently if users call KafkaConsumer.offsetsForTimes() with a large set of partitions. The consumer will add one topic at a time for the metadata refresh. We should add all the topics to the metadata topics and just do one metadata refresh instead.
Author: Jiangjie Qin <becket.qin@gmail.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#4478 from becketqin/KAFKA-6849
…to check for empty connector name and illegal characters in connector name. This also fixes KAFKA-4938 by removing the check for slashes in connector name from ConnectorsResource.
Author: Ewen Cheslack-Postava <me@ewencp.org>
Author: Soenke Liebau <soenke.liebau@opencore.com>
Reviewers: Gwen Shapira <cshapi@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#2755 from soenkeliebau/KAFKA-4930
Changing KafkaFuture.Future and KafkaFuture.BiConsumer into an interface makes
them a functional interface. This makes them Java 8 lambda compatible.
Author: Colin P. Mccabe <cmccabe@confluent.io>
Author: Steven Aerts <steven.aerts@gmail.com>
Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Xavier Léauté <xl+github@xvrl.net>, Tom Bentley <tbentley@redhat.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4033 from steven-aerts/KAFKA-6018
This PR implements the JIRA issue [KAFKA-4029: SSL support for Connect REST API](https://issues.apache.org/jira/browse/KAFKA-4029) / [KIP-208](https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface).
Summary of the main changes:
- Jetty `HttpClient` is used as HTTP client instead of the one shipped with Java. That allows to keep the SSL configuration for Server and Client be in single place (both use the Jetty `SslContextFactory`). It also has much richer configuration than the JDK client (it is easier to configure things such as supported cipher suites etc.).
- The `RestServer` class has been broker into 3 parts. `RestServer` contains the server it self. `RestClient` contains the HTTP client used for forwarding requests etc. and `SSLUtils` contain some helper classes for configuring SSL. One of the reasons for this was Findbugs complaining about the class complexity.
- A new method `valuesWithPrefixAllOrNothing` has been added to `AbstractConfig` to make it easier to handle the situation that we want to use either only the prefixed SSL options or only the non-prefixed. But not mixed them.
Author: Jakub Scholz <www@scholzj.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4429 from scholzj/kip-208
Dynamic metrics reporter updates described in KIP-226. This includes:
- Addition and removal of metrics reporters
- Reconfiguration of custom metrics reporter configs
- Tests for metrics reporter updates at default cluster-level and as per-broker config for testing
Reviewers: Jason Gustafson <jason@confluent.io>
This keeps a separate count of the number of in flight requests so that sensor threads will not need to deal with ConcurrentModfiicationException.
This would probably still be correct with volatile rather than AtomicInteger, but FindBugs flags the use of volatile as the count is incremented and decremented.
Author: Sean McCauliff <smccauliff@linkedin.com>
Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>
Closes#4460 from smccauliff/KAFKA-6345
The org.apache.kafka.common.utils.Base64 class defers Base64 encoding/decoding to the java.util.Base64 class beginning with JRE 1.8 but leverages javax.xml.bind.DatatypeConverter under JRE 1.7. The implementation of the encodeToString(bytes[]) method returned under JRE 1.7 by Base64.urlEncoderNoPadding() blindly removed the last two trailing characters of the Base64 encoding under the assumption that they would always be the string "==" but that is incorrect; padding can be "=", "==", or non-existent. This commit fixes that problem.
The commit also adds a Base64.urlDecoder() method that defers to java.util.Base64 under JRE 1.8+ but leverages javax.xml.bind.DatatypeConverter under JRE 1.7.
Finally, there is a unit test to confirm that encode/decode are inverses in both the Base64 and Base64URL cases.
* KAFKA-3625: Add public test utils for Kafka Streams
- add new artifact test-utils
- add TopologyTestDriver
- add MockTime, TestRecord, add TestRecordFactory
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>
In MetadataResponse deserialization, if the partition leader key is set
to -1, the leader is set to null. The MetadataResponse#toStruct code
should handle this correctly as well.
Also fix a case in KafkaApis where we were not taking into account the
possibility of the leader being null.
RequestResponseTest should test this as well.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
This is something I did after my working hours, I would ask people reviewing this do the same, don't take time for this during your work hours.
I try to keep such a PR as limited as possible, for clarity of reading.
==========
Using an empty string concat in order to achieve the String representation of the value you want is bad for 2 reasons, as explained here: (https://stackoverflow.com/questions/1572708/is-conversion-to-string-using-int-value-bad-practice
Readability: it shows what you're trying to do.
Depending on your compiler, it might attempt to create your String by first creating a StringBuffer, appending your value to it and then doing .toString() on that. Which is inefficient.
Also, the Metrics.java file had an empty string being added for the sole reason that the page width forced a string to continue on a new line. Removed that.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Enable dynamic broker configuration (see KIP-226 for details). Includes
- Base implementation to allow specific broker configs and custom configs to be dynamically updated
- Extend DescribeConfigsRequest/Response to return all synonym configs and their sources in the order of precedence
- Extend AdminClient to alter dynamic broker configs
- Dynamic update of SSL keystores
Reviewers: Ted Yu <yuzhihong@gmail.com>, Jason Gustafson <jason@confluent.io>
When the coordinator is marked unknown, we explicitly disconnect its connection and cancel pending requests. Currently the disconnect happens before the coordinator state is set to null, which means that callbacks which inspect the coordinator state will see it still as active. If there are offset commit requests which need to be cancelled, each request callback will inspect the coordinator state and attempt to mark the coordinator dead again. In the worst case, if there are many pending offset commit requests that need to be cancelled, this can cause a stack overflow. To fix the problem, we need to set the coordinator to null prior to cancelling pending requests.
I have added a test case which reproduced the stack overflow with many pending offset commits. I have also added a basic test case to verify that callbacks for in-flight or unsent requests see the coordinator as unknown which prevents them from attempting to resend.
This patch also includes some minor cleanups which were noticed along the way.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
- Add capability to create delegation token
- Add authentication based on delegation token.
- Add capability to renew/expire delegation tokens.
- Add units tests and integration tests
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#3616 from omkreddy/KAFKA-4541
This is the implementation of KIP-225.
It marks the previous metrics as deprecated in the documentation and adds new metrics using tags.
Testing verifies that both the new and the old metric report the same value.
Author: cmolter <cmolter@apple.com>
Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>
Closes#4362 from lahabana/kafka-5890
* Implement MockAdminClient.deleteTopics
* Use MockAdminClient instead of MockKafkaAdminClientEnv in StreamsResetterTest
* Rename MockKafkaAdminClientEnv to AdminClientUnitTestEnv
* Use MockAdminClient instead of MockKafkaAdminClientEnv in TopicAdminTest
* Rename KafkaAdminClient to AdminClientUnitTestEnv in KafkaAdminClientTest.java
* Migrate StreamThreadTest to MockAdminClient
* Fix style errors
* Address review comments
* Fix MockAdminClient call
Reviewers: Matthias J. Sax <matthias@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
LogContext to have two different implementations of Logger. One will be picked based on availability of LocationAwareLogger API.
Reviewers: Jason Gustafson <jason@confluent.io>
* Return offset of next record of records left after restore completed
* Changed check for restoring partition to remove the "+1" in the guard condition
Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
We should remove the map entry from mbeans if it becomes
empty during metric removal.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Satish Duggana <satish.duggana@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Utils with static methods should not be instantiated, hence marking the classes `final` and adding a `private` constructor.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>