
MINOR: Misc improvements on runtime / storage / metrics / config parts (#4525)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
Benedict Jin 7 years ago committed by Guozhang Wang
parent commit 2693e9be74
  1. clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java (4 changed lines)
  2. clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (2 changed lines)
  3. connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java (6 changed lines)
  4. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java (3 changed lines)
  5. connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java (10 changed lines)
  6. connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java (9 changed lines)
  7. connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java (32 changed lines)

clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java (4 changed lines)

@@ -877,7 +877,9 @@ public class ConfigDef {
         }
         public String toString() {
-            if (min == null)
+            if (min == null && max == null)
+                return "[...]";
+            else if (min == null)
                 return "[...," + max + "]";
             else if (max == null)
                 return "[" + min + ",...]";

clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (2 changed lines)

@@ -461,7 +461,7 @@ public class Metrics implements Closeable {
      * This is a way to expose existing values as metrics.
      *
      * This method is kept for binary compatibility purposes, it has the same behaviour as
-     * {@link #addMetric(MetricName, MetricValue)}.
+     * {@link #addMetric(MetricName, MetricValueProvider)}.
      *
      * @param metricName The name of the metric
      * @param measurable The measurable that will be measured by this metric
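Editor's note: the javadoc fix points the {@link} at the MetricValueProvider overload, which is the one that actually exists for exposing existing values. A minimal usage sketch, assuming a standalone Metrics registry; the metric name and group here are illustrative, not taken from the commit:

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Gauge;
    import org.apache.kafka.common.metrics.Metrics;

    public class QueueSizeMetricExample {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            MetricName name = metrics.metricName("queue-size", "example-group",
                    "Current number of queued items");
            // Gauge<T> extends MetricValueProvider<T>; the lambda is re-evaluated on each read.
            metrics.addMetric(name, (Gauge<Integer>) (config, now) -> 42);
            metrics.close();
        }
    }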

connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java (6 changed lines)

@@ -27,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;

@@ -83,10 +84,11 @@ public class ConnectDistributed {
         Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
+        Converter internalValueConverter = worker.getInternalValueConverter();
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
         statusBackingStore.configure(config);
-        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);
+        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config);
         DistributedHerder herder = new DistributedHerder(config, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java (3 changed lines)

@@ -104,8 +104,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
             throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
         }
         try {
-            for (TopicPartition partition : partitions)
-                pausedPartitions.remove(partition);
+            pausedPartitions.removeAll(Arrays.asList(partitions));
             consumer.resume(Arrays.asList(partitions));
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
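Editor's note: the bulk Collection.removeAll(...) call is behaviourally equivalent to removing the resumed partitions one at a time. A small self-contained sketch of that equivalence; the topic names are illustrative:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;

    public class RemoveAllExample {
        public static void main(String[] args) {
            Set<TopicPartition> paused = new HashSet<>(Arrays.asList(
                    new TopicPartition("topic-a", 0),
                    new TopicPartition("topic-b", 1),
                    new TopicPartition("topic-c", 2)));

            TopicPartition[] resumed = {new TopicPartition("topic-a", 0), new TopicPartition("topic-c", 2)};

            // One bulk call instead of a per-element loop; the resulting set is identical.
            paused.removeAll(Arrays.asList(resumed));

            System.out.println(paused); // only topic-b-1 remains paused
        }
    }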

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java (10 changed lines)

@@ -228,7 +228,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         this.offset = -1;
         this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
-        if (this.topic.equals(""))
+        if (this.topic == null || this.topic.trim().length() == 0)
             throw new ConfigException("Must specify topic for connector configuration.");
         configLog = setupAndCreateKafkaBasedLog(this.topic, config);

@@ -406,16 +406,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     // package private for testing
     KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
-        Map<String, Object> producerProps = new HashMap<>(config.originals());
+        Map<String, Object> originals = config.originals();
+        Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
-        Map<String, Object> consumerProps = new HashMap<>(config.originals());
+        Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        Map<String, Object> adminProps = new HashMap<>(originals);
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
                 compacted().
                 partitions(1).
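Editor's note: the stricter topic check now rejects null and whitespace-only values rather than only the literal empty string, and it no longer throws NullPointerException on a null topic. A hedged, standalone illustration of the difference (not Connect code, just the two predicates side by side):

    public class TopicCheckExample {
        // Old check: only catches the literal empty string, and NPEs when topic is null.
        static boolean rejectedByOldCheck(String topic) {
            return topic.equals("");
        }

        // New check: also catches null and whitespace-only values.
        static boolean rejectedByNewCheck(String topic) {
            return topic == null || topic.trim().length() == 0;
        }

        public static void main(String[] args) {
            System.out.println(rejectedByNewCheck(null));               // true (old check would throw)
            System.out.println(rejectedByNewCheck("   "));              // true (old check returns false)
            System.out.println(rejectedByNewCheck(""));                 // true (both agree)
            System.out.println(rejectedByNewCheck("connect-configs"));  // false
        }
    }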

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java (9 changed lines)

@@ -62,21 +62,22 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     @Override
     public void configure(final WorkerConfig config) {
         String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
-        if (topic.equals(""))
+        if (topic == null || topic.trim().length() == 0)
             throw new ConfigException("Offset storage topic must be specified");
         data = new HashMap<>();
-        Map<String, Object> producerProps = new HashMap<>(config.originals());
+        Map<String, Object> originals = config.originals();
+        Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
-        Map<String, Object> consumerProps = new HashMap<>(config.originals());
+        Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        Map<String, Object> adminProps = new HashMap<>(originals);
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
                 compacted().
                 partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
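Editor's note: all three backing stores apply the same cleanup, calling config.originals() once and reusing the returned map as the seed for the producer, consumer, and admin property maps instead of rebuilding it three times. A minimal sketch of the pattern with plain maps; the stand-in originals() method and property keys are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    public class OriginalsReuseExample {
        // Stand-in for WorkerConfig.originals(), which builds a fresh copy on every call.
        static Map<String, Object> originals() {
            Map<String, Object> props = new HashMap<>();
            props.put("bootstrap.servers", "localhost:9092");
            return props;
        }

        public static void main(String[] args) {
            // Fetch the originals once...
            Map<String, Object> originals = originals();

            // ...and seed each client's properties from that single copy.
            Map<String, Object> producerProps = new HashMap<>(originals);
            Map<String, Object> consumerProps = new HashMap<>(originals);
            Map<String, Object> adminProps = new HashMap<>(originals);

            // Each map is an independent copy, so per-client overrides do not leak.
            producerProps.put("retries", Integer.MAX_VALUE);
            System.out.println(consumerProps.containsKey("retries")); // false
        }
    }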

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java (32 changed lines)

@@ -121,19 +121,20 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
     @Override
     public void configure(final WorkerConfig config) {
         this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
-        if (topic.equals(""))
+        if (this.topic == null || this.topic.trim().length() == 0)
             throw new ConfigException("Must specify topic for connector status.");
-        Map<String, Object> producerProps = new HashMap<>(config.originals());
+        Map<String, Object> originals = config.originals();
+        Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
-        Map<String, Object> consumerProps = new HashMap<>(config.originals());
+        Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        Map<String, Object> adminProps = new HashMap<>(originals);
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
                 compacted().
                 partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).

@@ -203,7 +204,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
     }
     private void sendConnectorStatus(final ConnectorStatus status, boolean safeWrite) {
-        String connector = status.id();
+        String connector = status.id();
         CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
         String key = CONNECTOR_STATUS_PREFIX + connector;
         send(key, status, entry, safeWrite);

@@ -233,18 +234,17 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
-                if (exception != null) {
-                    if (exception instanceof RetriableException) {
-                        synchronized (KafkaStatusBackingStore.this) {
-                            if (entry.isDeleted()
-                                || status.generation() != generation
-                                || (safeWrite && !entry.canWriteSafely(status, sequence)))
-                                return;
-                        }
-                        kafkaLog.send(key, value, this);
-                    } else {
-                        log.error("Failed to write status update", exception);
-                    }
+                if (exception == null) return;
+                if (exception instanceof RetriableException) {
+                    synchronized (KafkaStatusBackingStore.this) {
+                        if (entry.isDeleted()
+                            || status.generation() != generation
+                            || (safeWrite && !entry.canWriteSafely(status, sequence)))
+                            return;
+                    }
+                    kafkaLog.send(key, value, this);
+                } else {
+                    log.error("Failed to write status update", exception);
                 }
             }
         });
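Editor's note: the last hunk is a pure guard-clause refactor; returning early when there is no exception removes one level of nesting without changing behaviour. A stripped-down, hedged sketch of the same pattern around a producer Callback; the class name and the retry hook are illustrative, not Connect's actual code:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.RetriableException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class EarlyReturnCallbackExample implements Callback {
        private static final Logger log = LoggerFactory.getLogger(EarlyReturnCallbackExample.class);

        private final Runnable retry; // illustrative hook that would re-send the record

        public EarlyReturnCallbackExample(Runnable retry) {
            this.retry = retry;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // Guard clause: the happy path leaves immediately instead of wrapping
            // everything else in "if (exception != null) { ... }".
            if (exception == null) return;

            if (exception instanceof RetriableException) {
                retry.run();        // transient failure: try the write again
            } else {
                log.error("Failed to write status update", exception);
            }
        }
    }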
