The `--describe` option of ConsumerGroupCommand is expanded, as proposed in [KIP-175](https://cwiki.apache.org/confluence/display/KAFKA/KIP-175%3A+Additional+%27--describe%27+views+for+ConsumerGroupCommand), to support:
* `--describe` or `--describe --offsets`: listing of current group offsets
* `--describe --members` or `--describe --members --verbose`: listing of group members
* `--describe --state`: group status
Example: With a single partition topic `test1` and a double partition topic `test2`, consumers `consumer1` and `consumer11` subscribed to `test`, consumers `consumer2` and `consumer22` and `consumer222` subscribed to `test2`, and all consumers belonging to group `test-group`, this is an output example of the new options above for `test-group`:
```
--describe, or --describe --offsets:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test2 0 0 0 0 consumer2-bad9496d-0889-47ab-98ff-af17d9460382 /127.0.0.1 consumer2
test2 1 0 0 0 consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411 /127.0.0.1 consumer22
test1 0 0 0 0 consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf /127.0.0.1 consumer1
```
```
--describe --members
CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer2-bad9496d-0889-47ab-98ff-af17d9460382 /127.0.0.1 consumer2 1
consumer222-ed2108cd-d368-41f1-8514-5b72aa835bcc /127.0.0.1 consumer222 0
consumer11-dc8295d7-8f3f-4438-9b11-7270bab46760 /127.0.0.1 consumer11 0
consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411 /127.0.0.1 consumer22 1
consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf /127.0.0.1 consumer1 1
```
```
--describe --members --verbose
CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer2-bad9496d-0889-47ab-98ff-af17d9460382 /127.0.0.1 consumer2 1 test2(0)
consumer222-ed2108cd-d368-41f1-8514-5b72aa835bcc /127.0.0.1 consumer222 0 -
consumer11-dc8295d7-8f3f-4438-9b11-7270bab46760 /127.0.0.1 consumer11 0 -
consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411 /127.0.0.1 consumer22 1 test2(1)
consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf /127.0.0.1 consumer1 1 test1(0)
```
```
--describe --state
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
localhost:9092 (0) range Stable 5
```
Note that this PR also addresses the issue reported in [KAFKA-6158](https://issues.apache.org/jira/browse/KAFKA-6158) by dynamically setting the width of columns `TOPIC`, `CONSUMER-ID`, `HOST`, `CLIENT-ID` and `COORDINATOR (ID)`. This avoid truncation of column values when they go over the current fixed width of these columns.
The code has been restructured to better support testing of individual values and also the console output. Unit tests have been updated and extended to take advantage of this restructuring.
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#4271 from vahidhashemian/KAFKA-5526
This PR creates and implements the `ProductionExceptionHandler` as described in [KIP-210](https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce).
I've additionally provided a default implementation preserving the existing behavior. I fixed various compile errors in the tests that resulted from my changing of method signatures, and added tests to cover the new behavior.
Author: Matt Farmer <mfarmer@rsglab.com>
Author: Matt Farmer <matt@frmr.me>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>
Closes#4165 from farmdawgnation/msf/kafka-6086
This changes the Struct's equals and hashCode method to use Arrays#deepEquals and Arrays#deepHashCode, respectively. This resolves a problem where two structs with values of type byte[] would not be considered equal even though the byte arrays' contents are equal. By using deepEquals, the byte arrays' contents are compared instead of ther identity.
Since this changes the behavior of the equals method for byte array values, the behavior of hashCode must change alongside it to ensure the methods still fulfill the general contract of "equal objects must have equal hashCodes".
Test rationale:
All existing unit tests for equals were untouched and continue to work. A new test method was added to verify the behavior of equals and hashCode for Struct instances that contain a byte array value. I verify the reflixivity and transitivity of equals as well as the fact that equal Structs have equal hashCodes
and not-equal structs do not have equal hashCodes.
Author: Tobias Gies <tobias.gies@trivago.com>
Author: Tobias Gies <tobias@tobiasgies.de>
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes#4293 from tobiasgies/feature/kafka-6308-deepequals
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes#4105 from cmccabe/KAFKA-6102
Now that we support re-initializing state stores, we need to clear the segments when the store is closed so that they can be re-opened.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Ted Yu <yuzhihong@gmail.com>
Closes#4324 from dguy/kafka-6360
Fixes a `ConcurrentModificationException` in`AbstractStateManager` that is triggered when a `StateStore` is re-initialized and there are multiple stores in the context.
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, GuozhangWang <wangguoz@gmail.com>
Closes#4317 from dguy/kafka-6349
Fix warn log message in RecordCollectorImpl so it prints the exception message rather than `{}`
Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, GuozhangWang <wangguoz@gmail.com>
Closes#4318 from dguy/minor-logging-record-collector
- Rename `encode` to `legacyEncodeAsString`, we
can remove this when we remove `ZkUtils`.
- Introduce `encodeAsString` that uses Jackson.
- Change `encodeAsBytes` to use Jackson.
- Avoid intermediate string when converting
Broker to json bytes.
The methods that use Jackson only support
Java collections unlike `legacyEncodeAsString`.
Tests were added `encodeAsString` and
`encodeAsBytes`.
Author: umesh chaudhary <umesh9794@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4259 from umesh9794/KAFKA-5631
This is required for ACLs where SSL principals contain
special characters (e.g. comma) that are escaped using
backslash. The strings need to be quoted for JSON to
ensure that the JSON stored in ZK is valid.
Also converted `SslEndToEndAuthorizationTest` to use a
principal with special characters to ensure that this
path is tested.
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4303 from rajinisivaram/KAFKA-6319
- Rename `delete()` to `deleteIfExists()` in `LogSegment`, `AbstractIndex`
and `TxnIndex`. Throw exception in case of IO errors for more informative
errors and to make it less likely that errors are ignored, `boolean` is used
for the case where the file does not exist (like `Files.deleteIfExists()`).
- Fix an instance of delete while open (should fix KAFKA-6322 and
KAFKA-6075).
- `LogSegment.deleteIfExists` no longer throws an exception if any of
the files it tries to delete does not exist (fixes KAFKA-6194).
- Remove unnecessary `FileChannel.force(true)` when deleting file.
- Introduce `LogSegment.open()` and use it to improve encapsulation
and reduce duplication.
- Expand functionality of `LogSegment.onBecomeInactiveSegment()`
to reduce duplication and improve encapsulation.
- Use `AbstractIndex.deleteIfExists()` instead of deleting files manually.
- Improve logging when deleting swap files.
- Use CorruptIndexException instead of IllegalArgumentException.
- Simplify `LogCleaner.cleanSegments()` to reduce duplication and
improve encapsulation.
- A few other clean-ups in Log, LogSegment, etc.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>, Ted Yu <yuzhihong@gmail.com>
Closes#4040 from ijuma/kafka-5829-follow-up
Increase the number of messages produced to make the test more reliable. The test failed in a recent build and also fails intermittently when run locally. Since the producer uses acks=0 and the test stops as soon as a lag is observed, the change shouldn't have a big impact on the time taken to run when lag is observed sooner.
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Jason Gustafson <jason@confluent.io>
Closes#4312 from rajinisivaram/MINOR-replicaverification-test
- set auto.offset.reste to "none" for restore and global consumer
- handle InvalidOffsetException for restore and global consumer
- add corresponding tests
- some minor cleanup
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Damian Guy <damian.guy@gmail.com, Bill Bejeck <bill@confluent.io>, GuozhangWang <wangguoz@gmail.com>
Closes#4215 from mjsax/kafka-6121-restore-global-consumer-handle-reset
The NetworkClient internally ApiVersion requests to each broker following connection establishment. If this request happens to fail (perhaps due to an incompatible broker), the NetworkClient includes the response in the result of poll(). Applications will generally not be expecting this response which may lead to failed assertions (or in the case of AdminClient, an obscure log message).
I've added test cases which await the ApiVersion request sent by NetworkClient to be in-flight, and then disconnect the connection and verify that the response is not included from poll().
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Closes#4280 from hachikuji/KAFKA-6289
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4242 from mjsax/kafka-4857-admit-client
Recent changes are now directly using the SLF4J API, so we should have a direct dependency.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4296 from rhauch/kafka-6313
Also:
- Fix bug in result type of `createSequentialPersistentPath`
- Remove duplicated code from `ReplicationUtils`
- Move `propagateIsrChanges` from `ReplicationUtils` to `KafkaZkClient`
- Add tests
- Minor clean-ups
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>, Ted Yu <yuzhihong@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#4261 from ijuma/zk-data-improvements
Measures the latency of each request.
Updated existing `ZkUtils` test to use `KafkaZkClient`
instead.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#4265 from ijuma/kafka-6065-async-zk-metrics
- Ensure that `partitionsBeingReassigned` is fully populated before
`removePartitionFromReassignedPartitions` is invoked. This is
necessary to avoid premature deletion of the `reassign_partitions`
znode.
- Modify and add tests to verify the fixes.
- Add documentation.
- Use `info` log message if assignedReplicas == newReplicas and
remove control flow based on exceptions.
- General logging improvements.
- Simplify `initializePartitionAssignment` by relying on logic already
present in `maybeTriggerPartitionReassignment`.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4283 from ijuma/kafka-6193-flaky-shouldPerformMultipleReassignmentOperationsOverVariousTopics
* Add maybeThrow method to AsyncResponse
* Update KafkaZkClient to use newly introduced maybeThrow
* Change AsyncResponse from trait to abstract class for
more readable stacktraces (there's no benefit in using a
trait here)
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4266 from omkreddy/KAFKAZKCLEINT_EXCEPTION_CLEANUP
From 0.11 to 1.0, we moved `DescribeClusterOptions timeoutMs(Integer timeoutMs)` from
DescribeClusterOptions to AbstractOptions (similarly for other Options classes). This can
cause code compiled against 0.11.0.x to fail when it is executed with 1.0 kafka-clients jar.
This patch adds back these methods to restore binary compatibility with 0.11.
Author: Dong Lin <lindong28@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4257 from lindong28/KAFKA-6174
1. Add the repartition topics information into ProcessorTopology: personally I do not like leaking this information into the topology but it seems not other simple way around.
2. StreamTask: added one more function to expose the consumed offsets from repartition topics only.
3. TaskManager: use the AdminClient to send the gathered offsets to delete only if a) previous call has completed and client intentionally ignore-and-log any errors, or b) no requests have ever called before.
NOTE that this code depends on the assumption that purge is only called right after the commit has succeeded, hence we presume all consumed offsets are committed.
4. MINOR: Added a few more constructor for ProcessorTopology for cleaner unit tests.
5. MINOR: Extracted MockStateStore out of the deprecated class.
6. MINOR: Made a pass over some unit test classes for clean ups.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>
Closes#4270 from guozhangwang/K6150-purge-repartition-topics
Increase `REQUEST_TIMOUT_MS` to improve a flaky system test until KIP-91 merged
Author: Bill Bejeck <bill@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#4291 from bbejeck/MINOR_increase_request_timeout_for_streams_bounce_test
Updated the System test `stream_broker_compatibility_test.py` to address system test failures as we have removed explicit broker version checking
- Ignore the `0.8.2.2` and `0.9.0.0` tests because the `NetworkClient` only logs `UnsupportedVersionException`s that occur and will continue to retry connecting. Once issue https://issues.apache.org/jira/browse/KAFKA-6297 is addressed, we may re-enable these tests.
- Updated existing tests expected error messages
- Updated Streams code in test for to make sure we fail fast for incompatible brokers
Author: Bill Bejeck <bill@confluent.io>
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4286 from bbejeck/MINOR_fix_broker_compatibility_tests
It's rare, but it can happen that the initial FindCoordinator request returns before the first Metadata request. Both authorization errors are fine for this test case.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Closes#4287 from hachikuji/KAFKA-6118
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4284 from mjsax/minor-improve-eos-docs
When a socket is closed, we must remove corresponding selection keys from internal collections. This fixes an NPE which is caused by attempting to access the selection key's attached channel after it had been cleared after disconnecting.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>
Closes#4276 from hachikuji/KAFKA-6260
`topics.regex` was added in KAFKA-3073. This change fixes the test that invokes `/validate` to ensure that all the configdefs are returned as expected.
Author: Mikkin <mikkin@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4279 from mikkin/KAFKA-6284
If the ControllerEventThread is interrupted when a request is
being sent, it may lead to an IllegalStateException being thrown.
This, in turn, can lead to a NullPointerException in
unregisterPartitionReassignmentIsrChangeHandlers,
To avoid these issues, we make the ControllerEventThread
uninterruptable and we shut it down by clearing the queue
and enqueuing a special event.
To make the code more robust, we also set
ReassignedPartitionsContext.reassignIsrChangeHandler
during construction instead of setting it to null first.
Finally, misleading log messages in ephemeral node
creation have been clarified.
For reference, the relevant log lines from the relevant
flaky test:
```text
[2017-11-15 10:30:13,869] ERROR Error while creating ephemeral at /controller with return code: OK (kafka.zk.KafkaZkClient$CheckedEphemeral:101)
[2017-11-15 10:30:14,155] ERROR Haven't been able to send leader and isr requests, current state of the map is Map(101 -> Map(topic1-0 -> PartitionState(controllerEpoch=2, leader=101, leaderEpoch=3, isr=101, zkVersion=3, replicas=100,102,101, isNew=false)), 100 -> Map(topic1-0 -> PartitionState(controllerEpoch=2, leader=101, leaderEpoch=3, isr=101, zkVersion=3, replicas=100,102,101, isNew=false)), 102 -> Map(topic1-0 -> PartitionState(controllerEpoch=2, leader=101, leaderEpoch=3, isr=101, zkVersion=3, replicas=100,102,101, isNew=false))). Exception message: java.lang.InterruptedException (kafka.controller.ControllerBrokerRequestBatch:101)
[2017-11-15 10:30:14,156] ERROR Haven't been able to send metadata update requests to brokers Set(102, 103, 104, 101, 105), current state of the partition info is Map(topic1-0 -> PartitionState(controllerEpoch=1, leader=101, leaderEpoch=2, isr=[101], zkVersion=2, replicas=[100, 102, 101], offlineReplicas=[100])). Exception message: java.lang.InterruptedException (kafka.controller.ControllerBrokerRequestBatch:101)
[2017-11-15 10:30:14,158] ERROR [Controller id=101] Forcing the controller to resign (kafka.controller.KafkaController:101)
[2017-11-15 10:30:14,158] ERROR [Controller id=101] Error completing reassignment of partition topic1-0 (kafka.controller.KafkaController:107)
java.lang.NullPointerException
at kafka.controller.KafkaController$$anonfun$unregisterPartitionReassignmentIsrChangeHandlers$1.apply(KafkaController.scala:784)
at kafka.controller.KafkaController$$anonfun$unregisterPartitionReassignmentIsrChangeHandlers$1.apply(KafkaController.scala:783)
```
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>
Closes#4219 from ijuma/fix-npe-unregister-zk-listener
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
Closes#3912 from mjsax/kafka-5936-producer-close
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4255 from mjsax/kafka-6259-clean-global-state-dir
add log4j entry
A rebased version of the code.
Author: RichardYuSTUG <yohan.richard.yu2@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Closes#4258 from ConcurrencyPractitioner/trunk
Add EmptyWindowStoreIterator to NoOpWindowStore
Use `topic-` as the prefix of the auto-generated state store name.
Also add a unit test for this functionality.
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Ted Yu <yuzhihong@gmail.com>
Closes#4268 from guozhangwang/K6274-table-source-store-name
This refactoring is discussed in https://github.com/apache/kafka/pull/3624#discussion_r132614639. More specifically:
1. Moved the access of `StreamThread` in `StreamPartitionAssignor` to `TaskManager`, removed any fields stored in `StreamThread` such as `processId` and `clientId` that are only to be used in `StreamPartitionAssignor`, and pass them to `TaskManager` if necessary.
2. Moved any in-memory states, `metadataWithInternalTopics`, `partitionsByHostState`, `standbyTasks`, `activeTasks` to `TaskManager` so that `StreamPartitionAssignor` becomes a stateless thin layer that access TaskManager directly.
3. Remove the reference of `StreamPartitionAssignor` in `StreamThread`, instead consolidate all related functionalities such as `cachedTasksIds ` in `TaskManager` which could be retrieved by the `StreamThread` and the `StreamPartitionAssignor` directly.
4. Finally, removed the two interfaces used for `StreamThread` and `StreamPartitionAssignor`.
5. Some minor fixes on logPrefixes, etc.
Future work: when replacing the StreamsKafkaClient, we would let `StreamPartitionAssignor` to retrieve it from `TaskManager` directly, and also its closing call do not need to be called (`KafkaStreams` will be responsible for closing it).
Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>
Closes#4224 from guozhangwang/K6170-refactor-assignor
Author: Matthias J. Sax <matthias@confluent.io>
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes#4253 from mjsax/minor-imporve-error-message
…from config to own function and added check to create connector call.
Author: Soenke Liebau <soenke.liebau@opencore.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4230 from soenkeliebau/KAFKA-5563
Only expect responseAsString to be set if request logging is
enabled _and_ responseSend is defined.
Also fixed a couple of issues that would manifest themselves
if trace logging is enabled:
- `MemoryRecords.toString` should not throw exception if data is corrupted
- Generate `responseString` correctly if unsupported api versions request is
received.
Unit tests were added for every issue fixed. Also changed
SocketServerTest to run with trace logging enabled as
request logging breakage has been a common issue.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Closes#4250 from ijuma/fix-issues-when-trace-logging-is-enabled
It's failing often and it seems like there are multiple
reasons. PR #4238 will re-enable it.
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Closes#4262 from ijuma/temporarily-disable-test-log-start-offset-checkpoint
Replaced ZkUtils with KafkaZkClient in ReplicaManager and Partition.
Relying on existing tests.
Author: tedyu <yuzhihong@gmail.com>
Reviewers: Jun Rao <junrao@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Closes#4254 from tedyu/trunk
- Updated logging to use string interpolation
- Minor refactors
- Fixed a few typos
Author: Mickael Maison <mickael.maison@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4231 from mimaison/controller_refactor
The upgrade instructions concerning the message format versions did not account
for upgrades from versions prior to 0.11.0.x.
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes#4256 from hachikuji/KAFKA-6328
* Add AdminZkClient class
* Use KafkaZkClient, AdminZkClient in ConfigCommand, TopicCommand
* All the existing tests should work
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Closes#4194 from omkreddy/KAFKA-5646-ZK-ADMIN-UTILS-DYNAMIC-MANAGER