|
|
|
@ -593,6 +593,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
@@ -593,6 +593,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|
|
|
|
"replicas": map(int, fields[3].split(','))}) |
|
|
|
|
return {"partitions": partitions} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _connect_setting_reassign_partitions(self, node): |
|
|
|
|
if node.version.reassign_partitions_command_supports_bootstrap_server(): |
|
|
|
|
return "--bootstrap-server %s " % self.bootstrap_servers(self.security_protocol) |
|
|
|
|
else: |
|
|
|
|
return "--zookeeper %s " % self.zk_connect_setting() |
|
|
|
|
|
|
|
|
|
def verify_reassign_partitions(self, reassignment, node=None): |
|
|
|
|
"""Run the reassign partitions admin tool in "verify" mode |
|
|
|
|
""" |
|
|
|
@ -609,7 +616,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
@@ -609,7 +616,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|
|
|
|
cmd = fix_opts_for_new_jvm(node) |
|
|
|
|
cmd += "echo %s > %s && " % (json_str, json_file) |
|
|
|
|
cmd += "%s " % self.path.script("kafka-reassign-partitions.sh", node) |
|
|
|
|
cmd += "--zookeeper %s " % self.zk_connect_setting() |
|
|
|
|
cmd += self._connect_setting_reassign_partitions(node) |
|
|
|
|
cmd += "--reassignment-json-file %s " % json_file |
|
|
|
|
cmd += "--verify " |
|
|
|
|
cmd += "&& sleep 1 && rm -f %s" % json_file |
|
|
|
@ -649,7 +656,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
@@ -649,7 +656,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|
|
|
|
cmd = fix_opts_for_new_jvm(node) |
|
|
|
|
cmd += "echo %s > %s && " % (json_str, json_file) |
|
|
|
|
cmd += "%s " % self.path.script( "kafka-reassign-partitions.sh", node) |
|
|
|
|
cmd += "--zookeeper %s " % self.zk_connect_setting() |
|
|
|
|
cmd += self._connect_setting_reassign_partitions(node) |
|
|
|
|
cmd += "--reassignment-json-file %s " % json_file |
|
|
|
|
cmd += "--execute" |
|
|
|
|
if throttle is not None: |
|
|
|
|