Refresh metadata if broker connection fails so that new calls are sent only to nodes that are alive and requests to controller are sent to the new controller if controller changes due to broker failure. Also reassign calls that could not be sent.
Reviewers: Dong Lin <lindong28@gmail.com>, Jason Gustafson <jason@confluent.io>
Document cases where `IllegalStateException` is raised when attempting an invalid operation on an unassigned partition. Also change `position()` to raise `IllegalStateException` when called on an unassigned partition for consistency.
Add the new stricter-timeout version of `poll` proposed in KIP-266.
The pre-existing variant `poll(long timeout)` would block indefinitely for metadata
updates if they were needed, then it would issue a fetch and poll for `timeout` ms
for new records. The initial indefinite metadata block caused applications to become
stuck when the brokers became unavailable. The existence of the timeout parameter
made the indefinite block especially unintuitive.
This PR adds `poll(Duration timeout)` with the semantics:
1. iff a metadata update is needed:
1. send (asynchronous) metadata requests
2. poll for metadata responses (counts against timeout)
- if no response within timeout, **return an empty collection immediately**
2. if there is fetch data available, **return it immediately**
3. if there is no fetch request in flight, send fetch requests
4. poll for fetch responses (counts against timeout)
- if no response within timeout, **return an empty collection** (leaving async fetch request for the next poll)
- if we get a response, **return the response**
The old method, `poll(long timeout)` is deprecated, but we do not change its semantics, so it remains:
1. iff a metadata update is needed:
1. send (asynchronous) metadata requests
2. poll for metadata responses *indefinitely until we get it*
2. if there is fetch data available, **return it immediately**
3. if there is no fetch request in flight, send fetch requests
4. poll for fetch responses (counts against timeout)
- if no response within timeout, **return an empty collection** (leaving async fetch request for the next poll)
- if we get a response, **return the response**
One notable usage is prohibited by the new `poll`: previously, you could call `poll(0)` to block for metadata updates, for example to initialize the client, supposedly without fetching records. Note, though, that this behavior is not according to any contract, and there is no guarantee that `poll(0)` won't return records the first time it's called. Therefore, it has always been unsafe to ignore the response.
This KIP adds the following functionality related to SASL/OAUTHBEARER:
1) Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to flexibly retrieve an access token from an OAuth 2 authorization server based on the declaration of a custom login CallbackHandler implementation and have that access token transparently and automatically transmitted to a broker for authentication.
2) Allow brokers to flexibly validate provided access tokens when a client establishes a connection based on the declaration of a custom SASL Server CallbackHandler implementation.
3) Provide implementations of the above retrieval and validation features based on an unsecured JSON Web Token that function out-of-the-box with minimal configuration required (i.e. implementations of the two types of callback handlers mentioned above will be used by default with no need to explicitly declare them).
4) Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to transparently retrieve a new access token in the background before the existing access token expires in case the client has to open new connections.
AdminClient should backoff when retrying a Call. Fixed and added a unit test
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Dong Lin <lindong28@gmail.com>
Closes#5077 from hachikuji/admin-client-retry-backoff
When any metric (e.g. per-partition metric) is created or deleted,
registerMBean() is called which in turn calls getMBeanInfo().getClassName().
However, KafkaMbean.getMBeanInfo() instantiates an array of all sensors even
though we only need the class name. This costs a lot of CPU to register
sensors when consumer with large partition assignment starts. For example, it
takes 5 minutes to start a consumer with 35k partitions. This patch reduces the
consumer startup time seconds.
Author: radai-rosenblatt <radai.rosenblatt@gmail.com>
Reviewers: Satish Duggana <satish.duggana@gmail.com>, Dong Lin <lindong28@gmail.com>
Closes#5011 from radai-rosenblatt/fun-with-jmx
The Signal classes are not available in the compile classpath
if --release is used so we use reflection as a workaround.
As part of that moved the code to Java and added a simple
unit test.
Also disabled the signal handler if the IBM JDK is being used
due to KAFKA-6918.
Manually tested shutdown via ctrl+c and verified that
the message is printed.
Fix the check, add unit test to verify the change, update `DynamicBrokerReconfigurationTest` to avoid dynamic keystore update in tests which are not expected to update keystores.
Avoid dependence on the internal __consumer_offsets topic to handle `listConsumerGroups()` since it unnecessarily requires users to have Describe access on an internal topic. Instead we query each broker independently. For most clusters, this amounts to the same thing since the default number of partitions for __consumer_offsets is 50. This also provides better encapsulation since it avoids exposing the use of __consumer_offsets, which gives us more flexibility in the future.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Dong Lin <lindong28@gmail.com>
Closes#5007 from hachikuji/remove-admin-use-of-offsets-topic
These constructors should be public to allow users to write test cases using them. We follow a similar pattern for the other domain objects that we expose in `AdminClient` (e.g. `TopicDescription`).
Reviewers: Ismael Juma <ismael@juma.me.uk>
If the internal metadata request fails, we must reset the state inside `AdminClientMetadataManager` or we will be stuck indefinitely in the `UPDATE_PENDING` state and have no way to fetch new metadata.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Dong Lin <lindong28@gmail.com>
Closes#5057 from hachikuji/fix-admin-client-metadata-update-failure
We no longer need them since we now require Java 8.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Andras Beni <andrasbeni@cloudera.com>, Manikumar Reddy O <manikumar.reddy@gmail.com>, Dong Lin <lindong28@gmail.com>
Closes#5049 from ijuma/remove-base64
* Set --source, --target and --release to 1.8.
* Build Scala 2.12 by default.
* Remove some conditionals in the build file now that Java 8
is the minimum version.
* Bump the version of Jetty, Jersey and Checkstyle (the newer
versions require Java 8).
* Fixed issues uncovered by the new version if Checkstyle.
* A couple of minor updates to handle an incompatible source
change in the new version of Jetty.
* Add dependency to jersey-hk2 to fix failing tests caused
by Jersey upgrade.
* Update release script to use Java 8 and to take into account
that Scala 2.12 is now built by default.
* While we're at it, bump the version of Gradle, Gradle plugins,
ScalaLogging, JMH and apache directory api.
* Minor documentation updates including the readme and upgrade
notes. A number of Streams Java 7 examples can be removed
subsequently.
test_broker_type_bounce_at_start tries to validate that when the controller is down, the streams client will always fail trying to create the topic; with the current behavior of admin client it is actually not always true: the actual behavior depends on the admin client internals as well as when the controller becomes unavailable during the leader assign partitions phase. I'd suggest at least ignore this test for now until the admin client has more stable (personally I'd even suggest removing this test as its coverage benefits is smaller than its introduced issues to me).
Also adding a few more log4j entries as a result of investigating this issue.
Reviewers: Matthias J. Sax <matthias@confluent.io>
* The consumer groups API should expose group state and coordinator information. This information is needed by administrative tools and scripts that access consume groups.
* The partition assignment will be empty when the group is rebalancing. Fix an issue where the adminclient attempted to deserialize this empty buffer.
* Remove nulls from the API and make all collections immutable.
* DescribeConsumerGroupsResult#all should return a result as expected, rather than Void
* Fix exception text for GroupIdNotFoundException, GroupNotEmptyException. It was being filled in as "The group id The group id does not exist was not found" and similar.
Reviewers: Attila Sasvari <asasvari@apache.org>, Andras Beni <andrasbeni@cloudera.com>, Dong Lin <lindong28@gmail.com>, Jason Gustafson <jason@confluent.io>
We would like to also export the producer metrics from StreamThread just like consumer metrics, so that we could gain more visibility of stream application. The approach is to pass in the threadProducer into the StreamThread so that we could export its metrics in dynamic.
Note that this is a pure internal change that doesn't require a KIP, and in the future we also want to export admin client metrics. A followup KIP for admin client will be created once this is merged.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Also include a few clean-ups:
* Method/variable/parameter renames to make them consistent with
the class name
* Return `ApiVersion` from `minSupportedFor`
* Use `values` to remove some code duplication
* Reduce duplication in `ApiVersion` by introducing the `shortVersion`
method and building the versions map programatically
* Avoid unnecessary `regex` in `ApiVersion.apply`
* Added scaladoc to a few methods
Some of these were originally discussed in:
https://github.com/apache/kafka/pull/4583#pullrequestreview-98089400
Added a test for `ApiVersion.shortVersion`. Relying on existing tests
for the rest since there is no change in behaviour.
Reviewers: Jason Gustafson <jason@confluent.io>
Implementation of KIP-279 as described here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over
In summary:
- Added leader_epoch to OFFSET_FOR_LEADER_EPOCH_RESPONSE
- Leader replies with the pair( largest epoch less than or equal to the requested epoch, the end offset of this epoch)
- If Follower does not know about the leader epoch that leader replies with, it truncates to the end offset of largest leader epoch less than leader epoch that leader replied with, and sends another OffsetForLeaderEpoch request. That request contains the largest leader epoch less than leader epoch that leader replied with.
Reviewers: Dong Lin <lindong28@gmail.com>, Jun Rao <junrao@gmail.com>
When AdminClient gets a NOT_CONTROLLER error, it should refresh its metadata and retry the request, rather than making the end-user deal with NotControllerException.
Move AdminClient's metadata management outside of NetworkClient and into AdminMetadataManager. This will make it easier to do more sophisticated metadata management in the future, such as implementing a NodeProvider which fetches the leaders for topics.
Rather than manipulating newCalls directly, the AdminClient service thread now drains it directly into pendingCalls. This minimizes the amount of locking we have to do, since pendingCalls is only accessed from the service thread.
Grow buffers in log cleaner to hold one message set after sanity check even if message set is bigger than max.message.bytes.
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
This patch removed a completedFetch from the completedFetches queue upon a failed parse if it contains no records. The following scenario explains why this is needed for an instance of this case – i.e. in TopicAuthorizationException.
0. Let's assume a scenario, in which the consumer is attempting to read from a topic without the necessary read permission.
1. In Fetcher#fetchedRecords(), after peeking the completedFetches, the Fetcher#parseCompletedFetch(CompletedFetch) throws a TopicAuthorizationException (as expected).
2. Fetcher#fetchedRecords() passes the TopicAuthorizationException up without having a chance to poll completedFetches. So, the same completedFetch remains at the completedFetches queue.
3. Upon following calls to Fetcher#fetchedRecords(), peeking the completedFetches will always return the same completedFetch independent of any updates to the ACL that the topic is trying to read from.
4. Hence, despite the creation of an ACL with correct permissions, once the consumer sees the TopicAuthorizationException, it will be unable to recover without a bounce.
Author: Adem Efe Gencer <agencer@linkedin.com>
Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>
Closes#4974 from efeg/fix/parseCompletedFetchRemainsInQueue
Switch from sum operations to subtraction to avoid type casting in checks and type overflow during `FlieLogInputStream` work, especially in cases where property `log.segment.bytes` was set close to the `Integer.MAX_VALUE` and used as a `position` inside `nextBatch()` function.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Log cleaner grows buffers when result.messagesRead is zero. This contains the number of filtered messages read from source which can be zero when transactions are used because batches may be discarded. Log cleaner incorrectly assumes that messages were not read because the buffer was too small and attempts to double the buffer size unnecessarily, failing with an exception if the buffer is already max.message.bytes. Additional check for discarded batches has been added to avoid growing buffers when batches are discarded.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
For the UNKNOWN_TOPIC_OR_PARTITION error, we could change the consumer's behavior to retry after this error. While this is a rare case since the user would not commit offsets for topics unless they had been able to fetch from them, but this doesn't really handle the situation where the broker hasn't received any metadata updates.
Reviewers: Jason Gustafson <jason@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Prevent exception thrown by metric reporters to impact request processing and other reporters.
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
The current Iterator-based ListConsumerGroups API is synchronous. The API should be asynchronous to fit in with the other AdminClient APIs. Also fix some error handling corner cases.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
General cleanup of Streams code, mostly resolving compiler warnings and re-formatting.
The regular testing suite should be sufficient.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Quota tests wait for throttle metric to be updated without waiting for requests to complete to avoid waiting for potentially large throttle times. This requires the test to read metric values while a broker may be updating the value, resulting in exception in the test. Since this issue can also occur with JMX metrics reporter, change synchronization on metrics with sensors to use the sensor as lock.
Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
SimpleBenchmark:
1.a Do not rely on manual num.records / bytes collection on atomic integers.
1.b Rely on config files for num.threads, bootstrap.servers, etc.
1.c Add parameters for key skewness and value size.
1.d Refactor the tests for loading phase, adding tumbling-windowed count.
1.e For consumer / consumeproduce, collect metrics on consumer instead.
1.f Force stop the test after 3 minutes, this is based on empirical numbers of 10M records.
Other tests: use config for kafka bootstrap servers.
streams_simple_benchmark.py: only use scale 1 for system test, remove yahoo from benchmark tests.
Note that the JMX based metrics is more accurate than the manually collected metrics.
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Refactored the return types in consumer group APIs the following way:
```
Map<TopicPartition, KafkaFuture<Void>> DeleteConsumerGroupsResult#deletedGroups()
Map<TopicPartition, KafkaFuture<ConsumerGroupDescription>> DescribeConsumerGroupsResult#describedGroups()
KafkaFuture<Collection<ConsumerGroupListing>> ListConsumerGroupsResult#listings()
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> ListConsumerGroupOffsetsResult#partitionsToOffsetAndMetadata()
```
* For DeleteConsumerGroupsResult and DescribeConsumerGroupsResult, for each group id we have two round-trips to get the coordinator, and then send the delete / describe request; I leave the potential optimization of batching requests for future work.
* For ListConsumerGroupOffsetsResult, it is a simple single round-trip and hence the whole map is wrapped as a Future.
* ListConsumerGroupsResult, it is the most tricky one: we would only know how many futures we should wait for after the first listNode returns, and hence I constructed the flattened future in the middle wrapped with the underlying map of futures; also added an iterator API to compensate the "fail the whole future if any broker returns error" behavior. The iterator future will throw exception on the failing brokers, while return the consumer for other succeeded brokers.
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Jason Gustafson <jason@confluent.io>
This change makes adding a metric to a sensor idempotent.
That is, if the metric is already added to the sensor, the method
returns with success.
The current behavior is that any attempt to register a second metric
with the same name is an error.
Testing strategy: There is a new unit test covering this behavior
Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>