This patch reverts the removal of the --execute option in the offset reset tool and the change to the default behavior when no options were present. For consistency, this patch adds the --execute flag to the streams reset tool, but keeps its current default behavior. A note has been added to both of these commands to warn the user that future default behavior will be to prompt before acting.
Test cases were not actually validating that offsets were committed when the --execute option was present, so I have fixed that and added basic assertions for the dry-run behavior. I also removed some duplicated test boilerplate.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Update `KafkaController.brokerInfo` when listeners are updated since this value is used to register the broker in ZooKeeper if there ZK session expires. Also added test to verify values in ZK after session expiry.
This patch fixes a bug in the validation of the inter-broker protocol and the message format version. We should allow the configured message format api version to be greater than the inter-broker protocol api version as long as the actual message format versions are equal. For example, if the message format version is set to 1.0, it is fine for the inter-broker protocol version to be 0.11.0 because they both use message format v2.
I have added a unit test which checks compatibility for all combinations of the message format version and the inter-broker protocol version.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4583 from hachikuji/KAFKA-6328-REOPENED
Add validation checks that the offset range is valid and aligned with the batch count prior to appending to the log. Several unit tests have been added to verify the various invalid cases.
The log cleaner should not naively remove the partition from in progress map without checking the partition state. This may cause the other thread calling `LogCleanerManager.abortAndPauseCleaning()` to hang indefinitely.
Enable deep-iteration option when print-data-log is enabled in DumpLogSegments. Otherwise data is not printed.
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
ZooKeeperClient acquires initializationLock#writeLock to establish a new connection while processing session expiry WatchEvent. ZooKeeperClient#handleRequests acquires initializationLock#readLock, allowing multiple batches of requests to be processed concurrently, but preventing reconnections while processing requests. At the moment, handleRequests holds onto the readLock throughout the method, even while waiting for responses and inflight requests to complete. But responses cannot be delivered if event thread is blocked on the writeLock to process session expiry event. This results in a deadlock. During broker shutdown, the shutdown thread is also blocked since it needs the readLock to perform ZooKeeperClient#unregisterStateChangeHandler, which cannot be acquired if a session expiry had occurred earlier since this thread gets queued behind the event handler thread waiting for writeLock.
This commit reduces locking in ZooKeeperClient#handleRequests to just the non-blocking send, so that session expiry handling doesn't get blocked when a send is blocked waiting for responses. Also moves session expiry handling to a separate thread so that Kafka controller doesn't block the event handler thread when processing session expiry.
Prior to this patch, the consumer always blocks in poll() if there are any partitions which are awaiting their initial positions. This behavior was inconsistent with normal fetch behavior since we allow fetching on available partitions even if one or more of the assigned partitions becomes unavailable _after_ initial offset lookup. With this patch, the consumer will do offset resets asynchronously, which allows other partitions to make progress even if the initial positions for some partitions cannot be found.
I have added several new unit tests in `FetcherTest` and `KafkaConsumerTest` to verify the new behavior. One minor compatibility implication worth mentioning is apparent from the change I made in `DynamicBrokerReconfigurationTest`. Previously it was possible to assume that all partitions had a fetch position after `poll()` completed with a non-empty assignment. This assumption is no longer generally true, but you can force the positions to be updated using the `position()` API which still blocks indefinitely until a position is available.
Note that this this patch also removes the logic to cache committed offsets in `SubscriptionState` since it was no longer needed (the consumer's `committed()` API always does an offset lookup anyway). In addition to avoiding the complexity of maintaining the cache, this avoids wasteful offset lookups to refresh the cache when `commitAsync()` is used.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
If an exception is encountered while sending data to a client connection, that connection is disconnected. If there are staged receives for that connection, they are tracked to process those records. However, if the exception was encountered during processing a `RequestChannel.Request`, the `KafkaChannel` for that connection is muted and won't be processed.
Disable processing of outstanding staged receives if a send fails. This stops the leak of the memory for pending requests and the file descriptor of the TCP socket.
Test that a channel is closed when an exception is raised while writing to a socket that has been closed by the client. Since sending a response requires acks != 0, allow specifying the required acks for test requests in SocketServerTest.scala.
Author: Graham Campbell <graham.campbell@salesforce.com>
Reviewers: Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>, Ted Yu <yuzhihong@gmail.com>
1. Handle listener-not-found in MetadataCache since this can occur when listeners are being updated. To avoid breaking clients, this is handled in the same way as broker-not-available so that clients may retry.
2. Set retries=1000 for listener reconfiguration tests to avoid transient failures when metadata cache has not been updated
3. Remove IdlePercent metric when Processor is deleted, add test
4. Reduce log segment size used during reconfiguration to avoid timeout while waiting for log rolling
5.Test markPartitionsForTruncation after fetcher thread resize
6. Move per-processor ResponseQueueSize metric back to RequestChannel.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
Add locking to access AbstractFetcherThread#partitionStates during dynamic thread update. Also make testing of thread updates that trigger retries more resilient.
Reviewers: Jason Gustafson <jason@confluent.io>
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Closes#4418 from cmccabe/KAFKA-6254
Dynamic update of listeners as described in KIP-226. This includes:
- Addition of new listeners with listener-prefixed security configs
- Removal of existing listeners
- Password encryption
- sasl.jaas.config property for broker's JAAS config prefixed with listener and mechanism name
Use new AdminClient for describing and altering broker configs using ConfigCommand. Broker quota configs as well as other configs will continue to be processed directly using ZooKeeper until KIP-248 is implemented.
Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>
This patch ensures that truncation to an empty segment forces resizing of the index file in order to prevent premature rolling.
I have added unit tests which verify that appends are permitted following truncation to an empty segment. Without the fix, this test case reproduces the failure in which the rolled segment matches the current active segment.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jiangjie (Becket) Qin <becket.qin@gmail.com>
Closes#4498 from hachikuji/KAFKA-6492
Remove caching of brokerId in DynamicBrokerConfig constructor and delay initialization until brokerId is set in KafkaConfig.
Reviewers: Jason Gustafson <jason@confluent.io>
Dynamic metrics reporter updates described in KIP-226. This includes:
- Addition and removal of metrics reporters
- Reconfiguration of custom metrics reporter configs
- Tests for metrics reporter updates at default cluster-level and as per-broker config for testing
Reviewers: Jason Gustafson <jason@confluent.io>
Dynamic resize of broker thread pools as described in KIP-226:
- num.network.threads
- num.io.threads
- num.replica.fetchers
- num.recovery.threads.per.data.dir
- background.threads
Reviewers: Jason Gustafson <jason@confluent.io>
The cause for compilation error in JDK 9.0 was an ambiguity issue in scalac:
```
both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit
and method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit
match argument types (java.util.Properties)
newProps.putAll(props)
```
Author: RichardYuSTUG <yohan.richard.yu2@gmail.com>
Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
Closes#4482 from ConcurrencyPractitioner/trunk
- Use newly added pause method in LogCleaner and ControllerChannelManager classes
- Remove LogCleaner, Cleaner exclusions from findbugs-exclude.xml
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
With this patch, simple consumer groups which only use Kafka for offset storage will be viewable using the `--list` option in consumer-groups.sh. In addition, this patch fixes a bug in the offset loading logic which caused us to lose the protocol type of empty groups on coordinator failover. I also did some cleanup of the various consumer group command test cases.
For testing, I have added new integration tests which cover listing and describing simple consumer groups. I also added unit tests to cover loading empty groups with assertions on the protocol type.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Enable dynamic broker configuration (see KIP-226 for details). Includes
- Base implementation to allow specific broker configs and custom configs to be dynamically updated
- Extend DescribeConfigsRequest/Response to return all synonym configs and their sources in the order of precedence
- Extend AdminClient to alter dynamic broker configs
- Dynamic update of SSL keystores
Reviewers: Ted Yu <yuzhihong@gmail.com>, Jason Gustafson <jason@confluent.io>
add check to KafkaApis, add unit test specific to follower fetch
developed with @mimaison
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>
- Add capability to create delegation token
- Add authentication based on delegation token.
- Add capability to renew/expire delegation tokens.
- Add units tests and integration tests
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#3616 from omkreddy/KAFKA-4541
This is the implementation of KIP-225.
It marks the previous metrics as deprecated in the documentation and adds new metrics using tags.
Testing verifies that both the new and the old metric report the same value.
Author: cmolter <cmolter@apple.com>
Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>
Closes#4362 from lahabana/kafka-5890
- Add missing locking/volatile in MetadataCache.aliveEndPoint
- Fix topic metadata not to throw BrokerNotAvailableException
when listeners are inconsistent. Add test verifying the fix. As
part of this fix, renamed Broker methods to follow Map
convention where the `get` version returns `Option`.
Reviewers: Jason Gustafson <jason@confluent.io>
- Fix zk session state and session change rate metric names: type
should be SessionExpireListener instead of KafkaHealthCheck. Test
verifying the fix was included.
- Handle missing controller in controlled shutdown in the same way as if
the broker is not registered (i.e. retry after backoff).
- Restructure BrokerInfo to reduce duplication. It now contains a
Broker instance and the JSON serde is done in BrokerIdZNode
since `Broker` does not contain all the fields.
- Remove dead code from `ZooKeeperClient.initialize` and remove
redundant `close` calls.
- Move ACL handling and persistent paths definition from ZkUtils to
ZkData (and call ZkData from ZkUtils).
- Remove ZooKeeperClientWrapper and ZooKeeperClientMetrics from
ZkUtils (avoids metrics clash if third party users create a ZkUtils
instance in the same process as the broker).
- Introduce factory method in KafkaZkClient that creates
ZooKeeperClient and remove metric name defaults from
ZooKeeperClient.
- Fix a few instances where ZooKeeperClient was not closed in tests.
- Update a few TestUtils methods to use KafkaZkClient instead of
ZkUtils.
- Add test verifying SessionState metric.
- Various clean-ups.
Testing: mostly relying on existing tests, but added a couple
of new tests as mentioned above.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Jun Rao <junrao@gmail.com>
Closes#4359 from ijuma/kafka-6320-kafka-health-zk-metrics-follow-up
Updates:
- Gradle, gradle plugins and maven artifact updated
- Bug fix updates for ZooKeeper, Jackson, EasyMock and Snappy
Not updated:
- RocksDB as it often causes issues, so better done separately
- args4j as our test coverage is weak and the update was a
feature release
Also fixed scala-reflect version to match scala-library.
Release notes for ZooKeeper 3.4.11:
https://zookeeper.apache.org/doc/r3.4.11/releasenotes.html
A notable fix is improved handling of UnknownHostException:
https://issues.apache.org/jira/browse/ZOOKEEPER-2614
Manually tested that IntelliJ import and build still works.
Relying on existing test suite otherwise.
Reviewers: Jun Rao <junrao@gmail.com>
* Moved metrics in KafkaHealthCheck to ZookeeperClient.
* Converted remaining ZkUtils usage in KafkaServer to ZookeeperClient and removed ZkUtils from KafkaServer.
* Made the re-creation of ZooKeeper during ZK session expiration with infinite retries.
* Added unit tests for all new methods in KafkaZkClient.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
* Use KafkaZkClient in ReassignPartitionsCommand
* Use KafkaZkClient in PreferredReplicaLeaderElectionCommand
* Updated test classes to use new methods
* All existing tests should pass
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#4260 from omkreddy/KAFKA-5647-ADMINCOMMANDS
(WIP: this commit isn't ready to be reviewed yet. I was checking the travis-ci build with the configuration changes in my account and opened the PR prematurely against trunk. I will make it consistent with Contribution guidelines once it's well tested.)
https://issues.apache.org/jira/browse/KAFKA-5473
Design:
`zookeeper.connection.retry.timeout.ms` => this determines how long to wait before triggering the shutdown. The default is 60000ms.
Currently the implementation only handles the `handleSessionEstablishmentError` by waiting for the sessionTimeout.
Author: Prasanna Gautam <prasannagautam@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Closes#3990 from prasincs/KAFKA-5473
The `--describe` option of ConsumerGroupCommand is expanded, as proposed in [KIP-175](https://cwiki.apache.org/confluence/display/KAFKA/KIP-175%3A+Additional+%27--describe%27+views+for+ConsumerGroupCommand), to support:
* `--describe` or `--describe --offsets`: listing of current group offsets
* `--describe --members` or `--describe --members --verbose`: listing of group members
* `--describe --state`: group status
Example: With a single partition topic `test1` and a double partition topic `test2`, consumers `consumer1` and `consumer11` subscribed to `test`, consumers `consumer2` and `consumer22` and `consumer222` subscribed to `test2`, and all consumers belonging to group `test-group`, this is an output example of the new options above for `test-group`:
```
--describe, or --describe --offsets:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test2 0 0 0 0 consumer2-bad9496d-0889-47ab-98ff-af17d9460382 /127.0.0.1 consumer2
test2 1 0 0 0 consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411 /127.0.0.1 consumer22
test1 0 0 0 0 consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf /127.0.0.1 consumer1
```
```
--describe --members
CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer2-bad9496d-0889-47ab-98ff-af17d9460382 /127.0.0.1 consumer2 1
consumer222-ed2108cd-d368-41f1-8514-5b72aa835bcc /127.0.0.1 consumer222 0
consumer11-dc8295d7-8f3f-4438-9b11-7270bab46760 /127.0.0.1 consumer11 0
consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411 /127.0.0.1 consumer22 1
consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf /127.0.0.1 consumer1 1
```
```
--describe --members --verbose
CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer2-bad9496d-0889-47ab-98ff-af17d9460382 /127.0.0.1 consumer2 1 test2(0)
consumer222-ed2108cd-d368-41f1-8514-5b72aa835bcc /127.0.0.1 consumer222 0 -
consumer11-dc8295d7-8f3f-4438-9b11-7270bab46760 /127.0.0.1 consumer11 0 -
consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411 /127.0.0.1 consumer22 1 test2(1)
consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf /127.0.0.1 consumer1 1 test1(0)
```
```
--describe --state
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
localhost:9092 (0) range Stable 5
```
Note that this PR also addresses the issue reported in [KAFKA-6158](https://issues.apache.org/jira/browse/KAFKA-6158) by dynamically setting the width of columns `TOPIC`, `CONSUMER-ID`, `HOST`, `CLIENT-ID` and `COORDINATOR (ID)`. This avoid truncation of column values when they go over the current fixed width of these columns.
The code has been restructured to better support testing of individual values and also the console output. Unit tests have been updated and extended to take advantage of this restructuring.
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#4271 from vahidhashemian/KAFKA-5526
- Rename `encode` to `legacyEncodeAsString`, we
can remove this when we remove `ZkUtils`.
- Introduce `encodeAsString` that uses Jackson.
- Change `encodeAsBytes` to use Jackson.
- Avoid intermediate string when converting
Broker to json bytes.
The methods that use Jackson only support
Java collections unlike `legacyEncodeAsString`.
Tests were added `encodeAsString` and
`encodeAsBytes`.
Author: umesh chaudhary <umesh9794@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4259 from umesh9794/KAFKA-5631
This is required for ACLs where SSL principals contain
special characters (e.g. comma) that are escaped using
backslash. The strings need to be quoted for JSON to
ensure that the JSON stored in ZK is valid.
Also converted `SslEndToEndAuthorizationTest` to use a
principal with special characters to ensure that this
path is tested.
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4303 from rajinisivaram/KAFKA-6319
- Rename `delete()` to `deleteIfExists()` in `LogSegment`, `AbstractIndex`
and `TxnIndex`. Throw exception in case of IO errors for more informative
errors and to make it less likely that errors are ignored, `boolean` is used
for the case where the file does not exist (like `Files.deleteIfExists()`).
- Fix an instance of delete while open (should fix KAFKA-6322 and
KAFKA-6075).
- `LogSegment.deleteIfExists` no longer throws an exception if any of
the files it tries to delete does not exist (fixes KAFKA-6194).
- Remove unnecessary `FileChannel.force(true)` when deleting file.
- Introduce `LogSegment.open()` and use it to improve encapsulation
and reduce duplication.
- Expand functionality of `LogSegment.onBecomeInactiveSegment()`
to reduce duplication and improve encapsulation.
- Use `AbstractIndex.deleteIfExists()` instead of deleting files manually.
- Improve logging when deleting swap files.
- Use CorruptIndexException instead of IllegalArgumentException.
- Simplify `LogCleaner.cleanSegments()` to reduce duplication and
improve encapsulation.
- A few other clean-ups in Log, LogSegment, etc.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>, Ted Yu <yuzhihong@gmail.com>
Closes#4040 from ijuma/kafka-5829-follow-up