Browse Source

MINOR: Make PushHttpMetricsReporter API compatible with releases back to 0.8.2.2

This is follow up to #4072 which added the PushHttpMetricsReporter and converted some services to use it. We somehow missed some compatibility issues that made the ProducerPerformance tool fail when using a newer tools jar with older common/clients jar, which we do with some system tests so we have all the features we need in the tool but can build compatibility tests for older releases.

This just adjusts some API usage to make the tool compatible with all previous releases.

I have a full run of the tests starting [here](https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1122/)

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #4214 from ewencp/fix-compatibility-sanity-check-tests
pull/4212/merge
Ewen Cheslack-Postava 7 years ago
parent
commit
54371e63d3
  1. 29
      tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java

29
tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java

@ -24,9 +24,9 @@ import org.apache.kafka.common.MetricName; @@ -24,9 +24,9 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,15 +87,15 @@ public class PushHttpMetricsReporter implements MetricsReporter { @@ -87,15 +87,15 @@ public class PushHttpMetricsReporter implements MetricsReporter {
"The URL to report metrics to")
.define(METRICS_PERIOD_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH,
"The frequency at which metrics should be reported, in second")
.define(METRICS_HOST_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW,
"The hostname to report with each metric; if null, defaults to the FQDN that can be automatically" +
.define(METRICS_HOST_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
"The hostname to report with each metric; if empty, defaults to the FQDN that can be automatically" +
"determined")
.define(CLIENT_ID_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
"Client ID to identify the application, generally inherited from the " +
"producer/consumer/streams/connect instance");
public PushHttpMetricsReporter() {
time = Time.SYSTEM;
time = new SystemTime();
executor = Executors.newSingleThreadScheduledExecutor();
}
@ -106,17 +106,17 @@ public class PushHttpMetricsReporter implements MetricsReporter { @@ -106,17 +106,17 @@ public class PushHttpMetricsReporter implements MetricsReporter {
@Override
public void configure(Map<String, ?> configs) {
AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs, true) { };
PushHttpMetricsReporterConfig config = new PushHttpMetricsReporterConfig(CONFIG_DEF, configs);
try {
url = new URL(config.getString(METRICS_URL_CONFIG));
} catch (MalformedURLException e) {
throw new ConfigException("Malformed metrics.url", e);
}
int period = config.getInt(METRICS_PERIOD_CONFIG);
int period = config.getInteger(METRICS_PERIOD_CONFIG);
clientId = config.getString(CLIENT_ID_CONFIG);
host = config.getString(METRICS_HOST_CONFIG);
if (host == null) {
if (host == null || host.isEmpty()) {
try {
host = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
@ -161,7 +161,7 @@ public class PushHttpMetricsReporter implements MetricsReporter { @@ -161,7 +161,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new InterruptException("Interrupted when shutting down PushHttpMetricsReporter", e);
throw new KafkaException("Interrupted when shutting down PushHttpMetricsReporter", e);
}
}
@ -316,4 +316,17 @@ public class PushHttpMetricsReporter implements MetricsReporter { @@ -316,4 +316,17 @@ public class PushHttpMetricsReporter implements MetricsReporter {
return value;
}
}
// The signature for getInt changed from returning int to Integer so to remain compatible with 0.8.2.2 jars
// for system tests we replace it with a custom version that works for all versions.
private static class PushHttpMetricsReporterConfig extends AbstractConfig {
public PushHttpMetricsReporterConfig(ConfigDef definition, Map<?, ?> originals) {
super(definition, originals);
}
public Integer getInteger(String key) {
return (Integer) get(key);
}
}
}

Loading…
Cancel
Save