This patch adds a framework to automatically generate the request/response classes for Kafka's protocol. The code will be updated to use the generated classes in follow-up patches. Below is a brief summary of the included components:
**buildSrc/src**
The message generator code is here. This code is automatically re-run by gradle when one of the schema files changes. The entire directory is processed at once to minimize the number of times we have to start a new JVM. We use Jackson to translate the JSON files into Java objects.
**clients/src/main/java/org/apache/kafka/common/protocol/Message.java**
This is the interface implemented by all automatically generated messages.
**clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java**
Some utility functions used by the generated message code.
**clients/src/main/java/org/apache/kafka/common/protocol/Readable.java, Writable.java, ByteBufferAccessor.java**
The generated message code uses these classes for writing to a buffer.
**clients/src/main/message/README.md**
This README file explains how the JSON schemas work.
**clients/src/main/message/\*.json**
The JSON files in this directory implement every supported version of every Kafka API. The unit tests automatically validate that the generated schemas match the hand-written schemas in our code. Additionally, there are some things like request and response headers that have schemas here.
**clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashSet.java**
I added an optimization here for empty sets. This is useful here because I want all messages to start with empty sets by default prior to being loaded with data. This is similar to the "empty list" optimizations in the `java.util.ArrayList` class.
Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Ismael Juma <ismael@juma.me.uk>, Bob Barrett <bob.barrett@outlook.com>, Jason Gustafson <jason@confluent.io>
* Update KafkaAdminClient#describeTopics to throw UnknownTopicOrPartitionException.
* Remove unused method: WorkerUtils#getMatchingTopicPartitions.
* Add some JavaDoc.
Reviewed-by: Colin P. McCabe <cmccabe@apache.org>, Ryanne Dolan <ryannedolan@gmail.com>
There is a race condition in ReplicaFetcherThread, where we can update PartitionFetchState with the new leader epoch (same leader) before handling the OffsetsForLeaderEpoch response with FENCED_LEADER_EPOCH error which causes removing partition from partitionStates, which in turn causes no fetching until the next LeaderAndIsr.
This patch adds logic to ensure that the leader epoch doesn't change while an OffsetsForLeaderEpoch request is in flight (which could happen with back-to-back leader elections). If it has changed, we ignore the response.
Also added toString() implementation to PartitionData, because some log messages did not show useful info which I found while investigating the above system test failure.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Use `info` for failed authentications and `debug` for successful ones.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
This patch changes the behavior of KafkaProducer.waitOnMetadata to wait up to max.block.ms when the partition specified in the produce request is out of the range of partitions present in the metadata. This improves the user experience in the case when partitions are added to a topic and a client attempts to produce to one of the new partitions before the metadata has propagated to the brokers. Tested with unit tests.
Reviewers: Arjun Satish <arjun@confluent.io>, Jason Gustafson <jason@confluent.io>
Lookup client host name after every full iteration through the addresses returned.
Reviewers: Loïc Monney <loicmonney@github.com>, Edoardo Comar <ecomar@uk.ibm.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
1. The retry loop of the InternalTopicManager would just be: a) describe topics, and exclude those which already exist with the right num.partitions, b) for the remaining topics, try to create them. Remove any inner loops.
2. In CreateTopicResponse and MetadataResponse (for describe topic), handle the special error code of TopicExist and UnknownTopicOrPartition in order to retry in the next loop.
3. Do not handle TimeoutException since it should already been handled inside AdminClient.
Add corresponding unit tests for a) topic marked for deletion but not complete yet, in which case metadata response would not contain this topic, but create topic would return error TopicExists; b) request keep getting timed out.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This pull request replaces HashMap with LinkedHashMap to guarantee ordering of metrics tags.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <guozhang@confluent.io>, John Roesler <vvcephei@users.noreply.github.com>
Check `running` in `Sender.maybeWaitForProducerId` to ensure that the producer can be closed while awaiting initialization of the producerId.
Reviewers: Jason Gustafson <jason@confluent.io>
Replace `channel` by `fileRecords` in potentially thrown KafkaException
descriptions when loading/writing `FileChannelRecordBatch`. This makes exception
messages more readable (channel only shows an object hashcode, fileRecords shows
the path of the file being read and start/end positions in the file).
Reviewers: Jason Gustafson <jason@confluent.io>
After a recent leader election, the leaders high-water mark might lag behind the offset at the beginning of the new epoch (as well as the previous leader's HW). This can lead to offsets going backwards from a client perspective, which is confusing and leads to strange behavior in some clients.
This change causes Partition#fetchOffsetForTimestamp to throw an exception to indicate the offsets are not yet available from the leader. For new clients, a new OFFSET_NOT_AVAILABLE error is added. For existing clients, a LEADER_NOT_AVAILABLE is thrown.
This is an implementation of [KIP-207](https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change).
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Dhruvil Shah <dhruvil@confluent.io>, Jason Gustafson <jason@confluent.io>
Ensure that channel and selection keys are removed from `Selector` collections before propagating connect exceptions. They are currently cleared on the next `poll()`, but we can't ensure that callers (NetworkClient for example) wont try to connect again before the next `poll` and hence we should clear the collections before re-throwing exceptions from `connect()`.
Reviewers: Jason Gustafson <jason@confluent.io>
The example in the producer's javadoc contained an inconsistent value for `delivery.timeout.ms`. This patch removes the inconsistent config and several unnecessary overrides in order to simplify the example.
Reviewers: huxi <huxi_2b@hotmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Older versions of the Produce API should return an error if zstd is used. This validation existed, but it was done during request parsing, which means that instead of returning an error code, the broker disconnected. This patch fixes the issue by moving the validation outside of the parsing logic. It also fixes several other record validations which had the same problem.
Reviewers: Jason Gustafson <jason@confluent.io>
This attempts to address the flaky test `SaslAuthenticatorTest.testCannotReauthenticateWithDifferentPrincipal()`
I was not able to reproduce locally even after 150 test runs in a loop, but given the error message:
```
org.junit.ComparisonFailure: expected:
<[6QBJiMZ6o5AqbNAjDTDjWtQSa4alfuUWsYKIy2tt7dz5heDaWZlz21yr8Gl4uEJkQABQXeEL0UebdpufDb5k8SvReSK6wYwQ9huP-9]> but was:<[????����????OAUTHBEARER]>
```
`????����????` seems to mean invalid UTF-8.
We now specify the charset when writing out and reading in bytes.
Reviewers: Ismael Juma <ismael@juma.me.uk>
A heap dump provided by Patrik Kleindl in https://issues.apache.org/jira/browse/KAFKA-7660 identifies the childrenSensors map in Metrics as keeping references to sensors alive after they have been removed.
This PR fixes it and adds a test to be sure.
Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
While metrics like Min, Avg and Max make sense to respective use Double.MAX_VALUE, 0.0 and Double.MIN_VALUE as default values to ease computation logic, exposing those values makes reading them a bit misleading. For instance, how would you differentiate whether your -avg metric has a value of 0 because it was given samples of 0 or no samples were fed to it?
It makes sense to standardize on the output of these metrics with something that clearly denotes that no values have been recorded.
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
…eturned by poll() if there are any records to return
The MockConsumer behaves unlike the real consumer in that it can return a non-empty ConsumerRecords from poll, that also has a count of 0. This change makes the MockConsumer only add partitions to the ConsumerRecords if there are records to return for those partitions.
A unit test in MockConsumerTest demonstrates the issue.
Author: Stig Rohde Døssing <stigdoessing@gmail.com>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#5901 from srdo/KAFKA-7616
Improve the default group id behavior by:
* changing the default consumer group to null, where no offset commit or fetch, or group management operations are allowed
* deprecating the use of empty (`""`) consumer group on the client
Reviewers: Jason Gustafson <jason@confluent.io>
ReplicaFetcherThread.shutdown attempts to close the fetcher's Selector while the thread is running. This in unsafe and can result in `Selector.close()` failing with an exception. The exception is caught and logged at debug level, but this can lead to socket leak if the shutdown is due to dynamic config update rather than broker shutdown. This PR changes the shutdown logic to close Selector after the replica fetcher thread is shutdown, with a wakeup() and flag to terminate blocking sends first.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
- Use Xlint:all with 3 exclusions (filed KAFKA-7613 to remove the exclusions)
- Use the same javac options when compiling tests (seems accidental that
we didn't do this before)
- Replaced several deprecated method calls with non-deprecated ones:
- `KafkaConsumer.poll(long)` and `KafkaConsumer.close(long)`
- `Class.newInstance` and `new Integer/Long` (deprecated since Java 9)
- `scala.Console` (deprecated in Scala 2.11)
- `PartitionData` taking a timestamp (one of them seemingly a bug)
- `JsonMappingException` single parameter constructor
- Fix unnecessary usage of raw types in several places.
- Add @SuppressWarnings for deprecations, unchecked and switch fallthrough in
several places.
- Scala clean-ups (var -> val, ETA expansion warnings, avoid reflective calls)
- Use lambdas to simplify code in a few places
- Add @SafeVarargs, fix varargs usage and remove unnecessary `Utils.mkList` method
Reviewers: Matthias J. Sax <mjsax@apache.org>, Manikumar Reddy <manikumar.reddy@gmail.com>, Randall Hauch <rhauch@gmail.com>, Bill Bejeck <bill@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
The problem is the concurrent metadata updates in the foreground and in the heartbeat thread. Changed the code to use ConsumerNetworkClient.poll, which enforces mutual exclusion when accessing the underlying client.
As part of KIP-320, the ListOffsets API should return the leader epoch of any fetched offset. We either get this epoch from the log itself for a timestamp query or from the epoch cache if we are searching the earliest or latest offset in the log. When handling queries for the latest offset, we have elected to choose the current leader epoch, which is consistent with other handling (e.g. OffsetsForTimes).
Reviewers: Jun Rao <junrao@gmail.com>
This patch makes two improvements to internal metadata handling logic and testing:
1. It reduce dependence on the public object `Cluster` for internal metadata propagation since it is not easy to evolve. As an example, we need to propagate leader epochs from the metadata response to `Metadata`, but it is not straightforward to do this without exposing it in `PartitionInfo` since that is what `Cluster` uses internally. By doing this change, we are able to remove some redundant `Cluster` building logic.
2. We want to make the metadata handling in `MockClient` simpler and more consistent. Currently we have mix of metadata update mechanisms which are internally inconsistent with each other and do not match the implementation in `NetworkClient`.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
KIP-368 implementation to enable periodic re-authentication of SASL clients. Also adds a broker configuration option to terminate client connections that do not re-authenticate within the configured interval.
Just a doc change
Author: John Eismeier <john.eismeier@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4573 from jeis2497052/trunk
Decrease the lower bound for expected available memory, as thread
scheduling entails that a variable amount of deallocation happens by
the point of assertion.
Also make minor clarifications to test logic and comments.
The passing rate improved from 98% to 100% locally after these
changes (100+ runs).
Reviewers: Ismael Juma <ismael@juma.me.uk>
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Reduce tick interval of the mock timer and avoid large timer increments to avoid hitting idle expiry on the client-side before delayed close is processed by the server. Also reduce poll interval in the server to make the test complete faster (since delayed close is only processed when poll returns).
Reviewers: Ismael Juma <ismael@juma.me.uk>
Add CreatePartitionsRequest.PartitionDetails similar to CreateTopicsRequest.TopicDetails to avoid references from `common.requests` package to `clients`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Update `ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup` test to work in environments where ipv6 is not enabled and `InetAddress.getAllByName` doesn't return ipv6 addresses.
Reviewers: Ismael Juma <ismael@juma.me.uk>