The latter return `Iterable` instead of `Iterator` so that the enhanced for loop can be used in Java.
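For illustration, a minimal Java sketch of the difference (method names follow the PR branch but are otherwise illustrative, not the actual log classes):

```java
import java.util.Arrays;
import java.util.Iterator;

public class IterableVsIterator {
    static Iterable<String> deepEntries() {          // usable with for-each
        return Arrays.asList("a", "b", "c");
    }

    static Iterator<String> deepEntriesIterator() {  // must be consumed manually
        return Arrays.asList("a", "b", "c").iterator();
    }

    public static void main(String[] args) {
        for (String entry : deepEntries())           // enhanced for loop works
            System.out.println(entry);

        Iterator<String> it = deepEntriesIterator(); // explicit iteration required
        while (it.hasNext())
            System.out.println(it.next());
    }
}
```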
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes #2261 from ijuma/deepEntries-shallowEntries
Tasks that don't have any `StateStore`s won't have a `StandbyTask`, so `createStandbyTask` can return `null`. We need to check for this in `StandbyTaskCreator.createTask(...)`.
Also, the checkpointed offsets for `StandbyTask`s are never loaded.
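A minimal sketch of the guard described above, with all names assumed rather than taken from the actual Streams internals:

```java
import java.util.HashMap;
import java.util.Map;

public class StandbyTaskCreatorSketch {
    static class StandbyTask { }

    private final Map<Integer, StandbyTask> standbyTasks = new HashMap<>();

    // Returns null when the task has no StateStores to maintain.
    StandbyTask createStandbyTask(int taskId) {
        boolean hasStateStores = taskId % 2 == 0;   // stand-in for the real check
        return hasStateStores ? new StandbyTask() : null;
    }

    void createTask(int taskId) {
        StandbyTask task = createStandbyTask(taskId);
        if (task != null)                           // guard against the null case
            standbyTasks.put(taskId, task);
    }
}
```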
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2255 from dguy/kafka-4539
Fix OffsetIndex overflow when replicating a highly compacted topic.
https://issues.apache.org/jira/browse/KAFKA-4451
Author: Michael Schiff <schiff.michael@gmail.com>
Author: Michael Schiff <michael.schiff@tubemogul.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes #2210 from michaelschiff/bug/4451
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>, Jun Rao <junrao@gmail.com>
Closes #2140 from hachikuji/KAFKA4390
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes #2193 from enothereska/KAFKA-4405-prefetch
Improve consumer metric collection by collecting and recording metrics per topic.
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes #1684 from vahidhashemian/KAFKA-4000
Collecting socket server metrics during shutdown may throw NullPointerException
Author: Xavier Léauté <xavier@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes #2221 from xvrl/fix-metrics-npe-on-shutdown
Also:
* Make all implementations of `Time` thread-safe as they are accessed from multiple threads in some cases.
* Change the default implementation of `MockTime` to use two separate variables for `nanoTime` and `currentTimeMillis` as they have different origins.
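A hedged sketch of what such a thread-safe mock clock can look like, keeping separate variables for the two clocks since their origins differ (field and method names are illustrative):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class MockTimeSketch {
    // Separate origins: wall-clock millis vs. the JVM's high-resolution timer.
    private final AtomicLong timeMs = new AtomicLong(System.currentTimeMillis());
    private final AtomicLong highResTimeNs = new AtomicLong(System.nanoTime());

    public long milliseconds() { return timeMs.get(); }

    public long nanoseconds() { return highResTimeNs.get(); }

    public void sleep(long ms) {                     // advance both clocks atomically
        timeMs.addAndGet(ms);
        highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms));
    }
}
```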
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Shikhar Bhushan <shikhar@confluent.io>, Jason Gustafson <jason@confluent.io>, Eno Thereska <eno.thereska@gmail.com>, Damian Guy <damian.guy@gmail.com>
Closes #2095 from ijuma/kafka-2247-consolidate-time-interfaces
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>
Closes #2190 from hachikuji/KAFKA-4469
Author: Antony Stubbs <antony.stubbs@gmail.com>
Reviewers: Eno Thereska <eno.thereska@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes #2157 from astubbs/trunk
Process requests received from channels before they were closed. For consumers, wait for coordinator requests to complete before returning from close.
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes #1836 from rajinisivaram/KAFKA-3703
Author: Mickael Maison <mickael.maison@gmail.com>
Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes #1827 from mimaison/KAFKA-4081
[KAFKA-4284](https://issues.apache.org/jira/browse/KAFKA-4284)
Even though `Partitioner` has a `close` method, it is not called when the producer is closed. Serializers, interceptors and metrics are all closed, so partitioners should be closed too.
To be able to use the same mechanism to close the partitioner as the serializers, etc., I had to make the `Partitioner` interface extend `Closeable`. Since this doesn't change the interface's methods, it feels OK and should be backwards compatible.
Looking at [KAFKA-2091](https://issues.apache.org/jira/browse/KAFKA-2091) (d6c45c70fb), which introduced the `Partitioner` interface, it looks like the intention was that the producer should close the partitioner.
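For illustration, a sketch of a custom `Partitioner` that holds a resource; with this change the producer invokes `close()` on it during `Producer.close()`, just as it already does for serializers and interceptors (the class below is hypothetical):

```java
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class AuditingPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // acquire resources, read custom settings, etc.
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0;  // illustrative fallback for unkeyed records
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
        // with this change, the producer calls this method on close(),
        // so resources acquired in configure() can be released here
    }
}
```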
This contribution is my original work and I license the work to the project under the project's open source license.
Author: Theo <theo@iconara.net>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes #2000 from iconara/kafka-4284
I think the Javadoc should describe what happens if `wakeup()` is called when no other thread is currently blocking. This may be important in some cases, e.g. trying to shut down a poll thread, followed by manually committing offsets.
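An illustrative shutdown pattern (not from the PR itself) showing why the semantics matter: `wakeup()` aborts a blocking `poll()` with `WakeupException`, and if no thread is blocking when `wakeup()` is called, the next blocking call throws instead, so the signal is not lost:

```java
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PollLoop implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);

    PollLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
        consumer.subscribe(Collections.singletonList("my-topic"));
    }

    @Override
    public void run() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // process records ...
            }
        } catch (WakeupException e) {
            // expected: shutdown() was called from another thread
        } finally {
            consumer.commitSync();   // offsets can still be committed manually
            consumer.close();
        }
    }

    public void shutdown() {         // safe even if poll() is not currently active
        running.set(false);
        consumer.wakeup();
    }
}
```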
Author: Stig Rohde Døssing <sdo@it-minds.dk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes #2093 from srdo/minor-expand-wakeup-javadoc
Fixes a bug that inappropriately applied the backoff as the interval between metadata updates even when the current metadata was already outdated.
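A hedged sketch in the spirit of the client's `Metadata.timeToNextUpdate()`: the refresh backoff should only guard against back-to-back update attempts, not impose itself as the regular interval once the metadata is known to be stale (all fields and values below are illustrative, not the client's actual internals):

```java
public class MetadataTimingSketch {
    private boolean needUpdate;                     // set when metadata is known stale
    private long lastRefreshMs;                     // last attempt, successful or not
    private long lastSuccessfulRefreshMs;           // last successful refresh
    private final long metadataExpireMs = 5 * 60 * 1000L;  // regular refresh interval
    private final long refreshBackoffMs = 100L;             // anti-hammering backoff

    public synchronized void requestUpdate() {
        this.needUpdate = true;                     // e.g. after a stale-metadata error
    }

    public synchronized long timeToNextUpdate(long nowMs) {
        // Expire immediately if an update was explicitly requested; only
        // otherwise does the regular refresh interval apply.
        long timeToExpire = needUpdate
                ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        // The short backoff still applies after a very recent attempt, but it
        // no longer acts as the interval between updates of stale metadata.
        long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }
}
```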
Author: Yuto Kawamura <kawamuray.dadada@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes #1707 from kawamuray/KAFKA-4024-metadata-backoff
1. Create a new `ClientMetadata` to collapse `Set<String> consumerMemberIds`, `ClientState<TaskId> state`, and `HostInfo hostInfo`.
2. Stop reusing `stateChangelogTopicToTaskIds` and `internalSourceTopicToTaskIds` to access the (sub-)topology's internal repartition and changelog topics, for clarity; also use the source topics' num.partitions to set the num.partitions for repartition topics, and clarify that the topology must NOT have cycles, since otherwise the while loop would never terminate.
3. `ensure-copartition` at the end to modify the number of partitions for repartition topics if necessary to be equal to other co-partition topics.
4. Refactor `ClientState` as well and update the logic of `TaskAssignor` for clarity as well.
5. Change the default `clientId` from `applicationId-suffix` to `applicationId-processId`, where `processId` is a UUID, to avoid conflicts between clientIds from different JVMs, and hence conflicts in metrics (see the sketch after this list).
6. Enforce `assignment` partitions to have the same size, and hence 1-1 mapping to `activeTask` taskIds.
7. Remove the `AssignmentSupplier` class by always constructing the `partitionsByHostState` before assigning tasks to consumers within a client.
8. Remove all unnecessary member variables in `StreamPartitionAssignor`.
9. Some other minor fixes to unit tests, e.g. removing `test only` functions via Java field reflection.
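A minimal sketch of item 5, assuming nothing beyond the description above:

```java
import java.util.UUID;

public class ClientIdSketch {
    public static void main(String[] args) {
        String applicationId = "my-streams-app";     // hypothetical application id
        UUID processId = UUID.randomUUID();          // unique per JVM/process
        String clientId = applicationId + "-" + processId;
        // Two JVMs running the same application now get distinct clientIds,
        // so their metrics no longer collide.
        System.out.println(clientId);
    }
}
```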
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Xavier Léauté, Matthias J. Sax, Eno Thereska, Jason Gustafson
Closes #2012 from guozhangwang/K4117-stream-partitionassignro-cleanup
There should be only one case where these clean-ups have a functional impact: replaced repeated identical logs with a single log for the stale controller epoch case.
The rest should just make the code easier to read and make it a bit less wasteful. I did this exercise because unused variables sometimes mask bugs.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes #1985 from ijuma/remove-unused
Increase timeout in test to avoid transient failures due to long GC or slow machine.
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes #2057 from rajinisivaram/KAFKA-2089
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes #2031 from hachikuji/KAFKA-4303
Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes #1995 from kkonstantine/KAFKA-4254-Update-producers-metadata-before-failing-on-non-existent-partition
And improve readability by adding proper punctuation.
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes #2002 from vahidhashemian/doc/fix_typos
Author: Jiangjie Qin <becket.qin@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes #1993 from becketqin/KAFKA-4274
Reopening of https://github.com/apache/kafka/pull/1428
Author: Edoardo Comar <ecomar@uk.ibm.com>
Author: Mickael Maison <mickael.maison@gmail.com>
Reviewers: Grant Henke <granthenke@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes #1908 from edoardocomar/KAFKA-3396
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Jiangjie Qin <becket.qin@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes #1936 from hachikuji/KAFKA-3824
Temporarily disable the offset commit (when auto commit is enabled) in the new consumer's `unsubscribe()` method as a workaround for the issue reported in [KAFKA-3491](https://issues.apache.org/jira/browse/KAFKA-3491).
For now, a call to `unsubscribe()` can be made to reset the offsets in case processing the batch received from the most recent `poll()` is interrupted (due to some exception).
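An illustrative use of the workaround (topic name and the `process()` failure mode are assumptions):

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReprocessOnFailure {

    static void pollLoop(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("events"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            try {
                process(records);
            } catch (RuntimeException e) {
                // unsubscribe() now skips the auto commit, so positions reset
                // and uncommitted records are redelivered after resubscribing
                consumer.unsubscribe();
                consumer.subscribe(Collections.singletonList("events"));
            }
        }
    }

    static void process(ConsumerRecords<String, String> records) {
        // hypothetical handler that may throw
    }
}
```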
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes #1944 from vahidhashemian/KAFKA-4234
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Eno Thereska <eno.thereska@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes #1938 from mjsax/kafka-4058-resetIntegartionTest
Author: Jiangjie Qin <becket.qin@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Joel Koshy <jjkoshy.w@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes #1939 from becketqin/KAFKA-4235
Author: Jiangjie Qin <becket.qin@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes #1897 from becketqin/KAFKA-4194
Technically this does not strictly adhere to RFC 952; however, it is valid for domain names, URLs and URIs, so we should loosen the requirements a tad.
Author: Ryan Pridgeon <ryan.n.pridgeon@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes #1856 from rnpridgeon/KAFKA-3719
Author: Jiangjie Qin <becket.qin@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Closes #1852 from becketqin/KAFKA-4148
When the consumer is not subscribed to any topic or, in the case of manual assignment, is not assigned any partition, calling `poll()` should raise an exception.
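A sketch of the resulting behavior; the exact exception type is assumed to be `IllegalStateException` here:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollWithoutSubscription {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.poll(100);   // no subscribe()/assign() has happened: fails fast
        } catch (IllegalStateException e) {
            System.out.println("poll() without subscription: " + e.getMessage());
        }
    }
}
```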
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes #1839 from vahidhashemian/KAFKA-4135
Implementation and tests for secure quotas at <user> and <user, client-id> levels as described in KIP-55. Also adds dynamic default quotas for <client-id>, <user> and <user-client-id>. For each client connection, the most specific quota matching the connection is used, with user quota taking precedence over client-id quota.
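A simplified sketch of that matching rule (the lookup keys and `Quota` type are illustrative, not the broker's classes; the real scheme also supports per-level dynamic defaults):

```java
import java.util.HashMap;
import java.util.Map;

public class QuotaResolverSketch {
    static class Quota {
        final double bytesPerSec;
        Quota(double bytesPerSec) { this.bytesPerSec = bytesPerSec; }
    }

    private final Map<String, Quota> overrides = new HashMap<>();
    private final Quota defaultQuota = new Quota(1_048_576);

    // Most specific match wins; user quotas beat client-id quotas.
    Quota quotaFor(String user, String clientId) {
        Quota q = overrides.get("user:" + user + "/client:" + clientId); // <user, client-id>
        if (q == null) q = overrides.get("user:" + user);                // <user>
        if (q == null) q = overrides.get("client:" + clientId);         // <client-id>
        return q != null ? q : defaultQuota;                            // dynamic default
    }
}
```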
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes #1753 from rajinisivaram/KAFKA-3492
This PR implements KIP-78: Cluster Identifiers [(link)](https://cwiki.apache.org/confluence/display/KAFKA/KIP-78%3A+Cluster+Id#KIP-78:ClusterId-Overview) and includes the following changes:
1. Changes to broker code
- generate cluster id and store it in Zookeeper
- update protocol to add cluster id to metadata request and response
- add ClusterResourceListener interface, ClusterResource class and ClusterMetadataListeners utility class (see the sketch after this list)
- send ClusterResource events to the metric reporters
2. Changes to client code
- update Cluster and Metadata code to support cluster id
- update clients for sending ClusterResource events to interceptors, (de)serializers and metric reporters
3. Integration tests for interceptors, (de)serializers and metric reporters for clients, and for protocol changes and metric reporters for the broker.
4. System tests for upgrading from previous versions.
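For example, a pluggable client component such as a metrics reporter, interceptor or (de)serializer can receive the cluster id by implementing the new interface (the listener class below is hypothetical):

```java
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;

public class ClusterIdLogger implements ClusterResourceListener {
    @Override
    public void onUpdate(ClusterResource clusterResource) {
        // invoked once cluster metadata (including the cluster id) is available
        System.out.println("Connected to cluster: " + clusterResource.clusterId());
    }
}
```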
Author: Sumit Arrawatia <sumit.arrawatia@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes #1830 from arrawatia/kip-78
This applies to Replication Quotas, based on KIP-73 [(link)](https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas), originally motivated by KAFKA-1464.
System Tests Run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/544/
**This first PR demonstrates the approach**.
**_Overview of Change_**
The guts of this change are relatively small. Throttling occurs on both leader and follower sides. A single class tracks the throttled throughput in and out of each broker (**_ReplicationQuotaManager_**).
On the follower side, the Follower Throttled Rate is calculated as fetch responses arrive. Then, before the next fetch request is sent, we check to see if the quota is violated, removing throttled partitions from the request if it is. This is all encapsulated in a few lines of code in the **_ReplicaFetcherThread_**. There is existing code to handle the temporal back-off if the request ends up being empty.
On the leader side it's a little more complex. When a fetch request arrives in the leader, it is built, partition by partition, in **_ReplicaManager.readFromLocalLog_**. As we put each partition into the fetch response, we check if the total size fits in the current quota. If the quota is exceeded, the partition will not be added to the fetch response. Importantly, we don't increase the quota at this point, we just check to see if the bytes will fit.
Now, if there aren't enough bytes to send the response immediately, which is common if we're catching up and throttled, then the request will be put in purgatory. I've added some simple code to **_DelayedFetch_** to handle throttled partitions (throttled partitions are checked against the quota, rather than the messages available in the log).
When the delayed fetch completes, and exits purgatory, _**ReplicaManager.readFromLocalLog**_ will be called again. This is why _**ReplicaManager.readFromLocalLog**_ does not actually increase the quota, it just checks whether enough bytes are available for a partition.
Finally, when there are enough bytes to be sent, or the delayed fetch times out, the response will be sent. Before it is sent the throttled-outbound-rate is increased, based on the size of throttled partitions being sent. This is at the end of _**KafkaApis.handleFetchRequest**_, exactly where client quotas are recorded.
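A hedged Java sketch of this check-then-record pattern (the broker code itself is Scala; every name below is illustrative): the quota is consulted when the response is built, but only recorded when the bytes are actually sent, which is what allows purgatory to re-evaluate the check without double counting.

```java
public class ReplicationQuotaSketch {
    private final long quotaBytesPerSec;
    private long windowStartMs = System.currentTimeMillis();
    private long bytesInWindow = 0;

    public ReplicationQuotaSketch(long quotaBytesPerSec) {
        this.quotaBytesPerSec = quotaBytesPerSec;
    }

    // Called while building the response (cf. ReplicaManager.readFromLocalLog):
    // a pure check, nothing is recorded yet, so it can be re-evaluated later.
    public synchronized boolean isQuotaExceeded(long nowMs) {
        double elapsedSec = Math.max(nowMs - windowStartMs, 1) / 1000.0;
        return bytesInWindow / elapsedSec > quotaBytesPerSec;
    }

    // Called once the response is actually sent (cf. KafkaApis.handleFetchRequest).
    public synchronized void record(long bytesSent, long nowMs) {
        if (nowMs - windowStartMs > 1000) {      // crude 1-second window reset
            windowStartMs = nowMs;
            bytesInWindow = 0;
        }
        bytesInWindow += bytesSent;
    }
}
```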
There is an acceptance test which asserts the whole throttling process stabilises on the desired value. This covers a number of use cases including many-to-many replication. See **_ReplicationQuotaTest_**.
Note:
It should be noted that this protocol can over-request. The request is built based on the quota at time t1 (_ReplicaManager.readFromLocalLog_). The bytes in the response are recorded at time t2 (end of _KafkaApis.handleFetchRequest_), where t2 > t1. For this reason I originally included an OverRequestedRate as a JMX metric, but testing has not revealed any obvious issue. Over-requesting is quickly compensated by subsequent requests, stabilising close to the quota value.
_**Main stuff left to do:**_
- The fetch size is currently unbounded. This will be addressed in KIP-74, but we need to ensure requests don’t go beyond the throttle window.
- There are two failures showing up in the system tests on this branch: StreamsSmokeTest.test_streams (which looks like it fails regularly) and OffsetValidationTest.test_broker_rolling_bounce (which I need to look into)
_**Stuff left to do that could be deferred:**_
- Add the extra metrics specified in the KIP.
- There are no system tests.
- There is no validation for the cluster size / throttle combination that could lead to ISR dropouts.
Author: Ben Stopford <benstopford@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Apurva Mehta <apurva@confluent.io>, Jun Rao <junrao@gmail.com>
Closes #1776 from benstopford/rep-quotas-v2