diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index bb199dde202..bbe69a015a2 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -877,7 +877,9 @@ public class ConfigDef { } public String toString() { - if (min == null) + if (min == null && max == null) + return "[...]"; + else if (min == null) return "[...," + max + "]"; else if (max == null) return "[" + min + ",...]"; diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index 8868ee7ac48..ea18cd3fef8 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -461,7 +461,7 @@ public class Metrics implements Closeable { * This is a way to expose existing values as metrics. * * This method is kept for binary compatibility purposes, it has the same behaviour as - * {@link #addMetric(MetricName, MetricValue)}. + * {@link #addMetric(MetricName, MetricValueProvider)}. * * @param metricName The name of the metric * @param measurable The measurable that will be measured by this metric diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 98a77ed06c9..4afa47dda1a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -27,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.DistributedHerder; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.storage.ConfigBackingStore; +import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.KafkaConfigBackingStore; import org.apache.kafka.connect.storage.KafkaOffsetBackingStore; import org.apache.kafka.connect.storage.KafkaStatusBackingStore; @@ -83,10 +84,11 @@ public class ConnectDistributed { Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore); - StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter()); + Converter internalValueConverter = worker.getInternalValueConverter(); + StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); statusBackingStore.configure(config); - ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config); + ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config); DistributedHerder herder = new DistributedHerder(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java index 80764037579..08789497645 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java @@ -104,8 +104,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext { throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized"); } try { - for (TopicPartition partition : partitions) - pausedPartitions.remove(partition); + pausedPartitions.removeAll(Arrays.asList(partitions)); consumer.resume(Arrays.asList(partitions)); } catch (IllegalStateException e) { throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 5c2c72c8d1b..b34e48390e1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -228,7 +228,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { this.offset = -1; this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG); - if (this.topic.equals("")) + if (this.topic == null || this.topic.trim().length() == 0) throw new ConfigException("Must specify topic for connector configuration."); configLog = setupAndCreateKafkaBasedLog(this.topic, config); @@ -406,16 +406,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { // package private for testing KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) { - Map producerProps = new HashMap<>(config.originals()); + Map originals = config.originals(); + Map producerProps = new HashMap<>(originals); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); - - Map consumerProps = new HashMap<>(config.originals()); + Map consumerProps = new HashMap<>(originals); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - Map adminProps = new HashMap<>(config.originals()); + Map adminProps = new HashMap<>(originals); NewTopic topicDescription = TopicAdmin.defineTopic(topic). compacted(). partitions(1). diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 2e4e5232dfd..f29f3c23d03 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -62,21 +62,22 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { @Override public void configure(final WorkerConfig config) { String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG); - if (topic.equals("")) + if (topic == null || topic.trim().length() == 0) throw new ConfigException("Offset storage topic must be specified"); data = new HashMap<>(); - Map producerProps = new HashMap<>(config.originals()); + Map originals = config.originals(); + Map producerProps = new HashMap<>(originals); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); - Map consumerProps = new HashMap<>(config.originals()); + Map consumerProps = new HashMap<>(originals); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - Map adminProps = new HashMap<>(config.originals()); + Map adminProps = new HashMap<>(originals); NewTopic topicDescription = TopicAdmin.defineTopic(topic). compacted(). partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)). diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index ca744324740..8ca21ebb350 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -121,19 +121,20 @@ public class KafkaStatusBackingStore implements StatusBackingStore { @Override public void configure(final WorkerConfig config) { this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG); - if (topic.equals("")) + if (this.topic == null || this.topic.trim().length() == 0) throw new ConfigException("Must specify topic for connector status."); - Map producerProps = new HashMap<>(config.originals()); + Map originals = config.originals(); + Map producerProps = new HashMap<>(originals); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class - Map consumerProps = new HashMap<>(config.originals()); + Map consumerProps = new HashMap<>(originals); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - Map adminProps = new HashMap<>(config.originals()); + Map adminProps = new HashMap<>(originals); NewTopic topicDescription = TopicAdmin.defineTopic(topic). compacted(). partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)). @@ -203,7 +204,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore { } private void sendConnectorStatus(final ConnectorStatus status, boolean safeWrite) { - String connector = status.id(); + String connector = status.id(); CacheEntry entry = getOrAdd(connector); String key = CONNECTOR_STATUS_PREFIX + connector; send(key, status, entry, safeWrite); @@ -233,18 +234,17 @@ public class KafkaStatusBackingStore implements StatusBackingStore { kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null) { - if (exception instanceof RetriableException) { - synchronized (KafkaStatusBackingStore.this) { - if (entry.isDeleted() - || status.generation() != generation - || (safeWrite && !entry.canWriteSafely(status, sequence))) - return; - } - kafkaLog.send(key, value, this); - } else { - log.error("Failed to write status update", exception); + if (exception == null) return; + if (exception instanceof RetriableException) { + synchronized (KafkaStatusBackingStore.this) { + if (entry.isDeleted() + || status.generation() != generation + || (safeWrite && !entry.canWriteSafely(status, sequence))) + return; } + kafkaLog.send(key, value, this); + } else { + log.error("Failed to write status update", exception); } } });