|
|
|
@ -66,7 +66,7 @@ public class KRaftMigrationZkWriter {
@@ -66,7 +66,7 @@ public class KRaftMigrationZkWriter {
|
|
|
|
|
private static final String CREATE_TOPIC = "CreateTopic"; |
|
|
|
|
private static final String UPDATE_TOPIC = "UpdateTopic"; |
|
|
|
|
private static final String DELETE_TOPIC = "DeleteTopic"; |
|
|
|
|
private static final String UPDATE_PARTITON = "UpdatePartition"; |
|
|
|
|
private static final String UPDATE_PARTITION = "UpdatePartition"; |
|
|
|
|
private static final String DELETE_PARTITION = "DeletePartition"; |
|
|
|
|
private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig"; |
|
|
|
|
private static final String DELETE_BROKER_CONFIG = "DeleteBrokerConfig"; |
|
|
|
@ -232,7 +232,7 @@ public class KRaftMigrationZkWriter {
@@ -232,7 +232,7 @@ public class KRaftMigrationZkWriter {
|
|
|
|
|
newPartitions.forEach((topicId, partitionMap) -> { |
|
|
|
|
TopicImage topic = topicsImage.getTopic(topicId); |
|
|
|
|
operationConsumer.accept( |
|
|
|
|
UPDATE_PARTITON, |
|
|
|
|
UPDATE_PARTITION, |
|
|
|
|
"Creating additional partitions for Topic " + topic.name() + ", ID " + topicId, |
|
|
|
|
migrationState -> migrationClient.topicClient().updateTopicPartitions( |
|
|
|
|
Collections.singletonMap(topic.name(), partitionMap), |
|
|
|
@ -242,7 +242,7 @@ public class KRaftMigrationZkWriter {
@@ -242,7 +242,7 @@ public class KRaftMigrationZkWriter {
|
|
|
|
|
changedPartitions.forEach((topicId, partitionMap) -> { |
|
|
|
|
TopicImage topic = topicsImage.getTopic(topicId); |
|
|
|
|
operationConsumer.accept( |
|
|
|
|
UPDATE_PARTITON, |
|
|
|
|
UPDATE_PARTITION, |
|
|
|
|
"Updating Partitions for Topic " + topic.name() + ", ID " + topicId, |
|
|
|
|
migrationState -> migrationClient.topicClient().updateTopicPartitions( |
|
|
|
|
Collections.singletonMap(topic.name(), partitionMap), |
|
|
|
@ -295,7 +295,7 @@ public class KRaftMigrationZkWriter {
@@ -295,7 +295,7 @@ public class KRaftMigrationZkWriter {
|
|
|
|
|
Map<Integer, PartitionRegistration> changedPartitions = topicDelta.partitionChanges(); |
|
|
|
|
if (!newPartitions.isEmpty()) { |
|
|
|
|
operationConsumer.accept( |
|
|
|
|
UPDATE_PARTITON, |
|
|
|
|
UPDATE_PARTITION, |
|
|
|
|
"Create new partitions for Topic " + topicDelta.name() + ", ID " + topicId, |
|
|
|
|
migrationState -> migrationClient.topicClient().createTopicPartitions( |
|
|
|
|
Collections.singletonMap(topicDelta.name(), newPartitions), |
|
|
|
@ -306,7 +306,7 @@ public class KRaftMigrationZkWriter {
@@ -306,7 +306,7 @@ public class KRaftMigrationZkWriter {
|
|
|
|
|
// Need a final for the lambda
|
|
|
|
|
final Map<Integer, PartitionRegistration> finalChangedPartitions = changedPartitions; |
|
|
|
|
operationConsumer.accept( |
|
|
|
|
UPDATE_PARTITON, |
|
|
|
|
UPDATE_PARTITION, |
|
|
|
|
"Updating Partitions for Topic " + topicDelta.name() + ", ID " + topicId, |
|
|
|
|
migrationState -> migrationClient.topicClient().updateTopicPartitions( |
|
|
|
|
Collections.singletonMap(topicDelta.name(), finalChangedPartitions), |
|
|
|
|