Browse Source

KAFKA-3711: Ensure a RecordingMap is passed to configured instances

See https://issues.apache.org/jira/browse/KAFKA-3711

I've tested locally that this change does indeed resolve the warning I mention in the ticket:
```
org.apache.kafka.clients.consumer.ConsumerConfig: The configuration metric.dropwizard.registry = kafka-metrics was supplied but isn't a known config.
```
where `metric.dropwizard.registry` is a configuration value defined in a custom `MetricReporter` (https://github.com/SimpleFinance/kafka-dropwizard-reporter).

With this change, the above warning no longer appears, as ewencp predicted.

This contribution is my original work and I license the work to the project under the project's open source license.

Author: Jeff Klukas <jeff@klukas.net>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1479 from jklukas/abstractconfig-originals
pull/1481/head
Jeff Klukas 9 years ago committed by Ewen Cheslack-Postava
parent
commit
eb6f04a8b1
  1. 32
      clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
  2. 54
      clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java

32
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java

@ -151,7 +151,7 @@ public class AbstractConfig { @@ -151,7 +151,7 @@ public class AbstractConfig {
* @return a Map containing the settings with the prefix
*/
public Map<String, Object> originalsWithPrefix(String prefix) {
Map<String, Object> result = new RecordingMap<>();
Map<String, Object> result = new RecordingMap<>(prefix);
for (Map.Entry<String, ?> entry : originals.entrySet()) {
if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length())
result.put(entry.getKey().substring(prefix.length()), entry.getValue());
@ -202,7 +202,7 @@ public class AbstractConfig { @@ -202,7 +202,7 @@ public class AbstractConfig {
if (!t.isInstance(o))
throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
if (o instanceof Configurable)
((Configurable) o).configure(this.originals);
((Configurable) o).configure(originals());
return t.cast(o);
}
@ -229,7 +229,7 @@ public class AbstractConfig { @@ -229,7 +229,7 @@ public class AbstractConfig {
if (!t.isInstance(o))
throw new KafkaException(klass + " is not an instance of " + t.getName());
if (o instanceof Configurable)
((Configurable) o).configure(this.originals);
((Configurable) o).configure(originals());
objects.add(t.cast(o));
}
return objects;
@ -256,16 +256,36 @@ public class AbstractConfig { @@ -256,16 +256,36 @@ public class AbstractConfig {
*/
private class RecordingMap<V> extends HashMap<String, V> {
RecordingMap() {}
private final String prefix;
RecordingMap() {
this("");
}
RecordingMap(String prefix) {
this.prefix = prefix;
}
RecordingMap(Map<String, ? extends V> m) {
this(m, "");
}
RecordingMap(Map<String, ? extends V> m, String prefix) {
super(m);
this.prefix = prefix;
}
@Override
public V get(Object key) {
if (key instanceof String)
ignore((String) key);
if (key instanceof String) {
String keyWithPrefix;
if (prefix.isEmpty()) {
keyWithPrefix = (String) key;
} else {
keyWithPrefix = prefix + key;
}
ignore(keyWithPrefix);
}
return super.get(key);
}
}

54
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java

@ -15,6 +15,7 @@ package org.apache.kafka.common.config; @@ -15,6 +15,7 @@ package org.apache.kafka.common.config;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.FakeMetricsReporter;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.junit.Test;
@ -22,6 +23,8 @@ import java.util.HashMap; @@ -22,6 +23,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
@ -44,9 +47,30 @@ public class AbstractConfigTest { @@ -44,9 +47,30 @@ public class AbstractConfigTest {
props.put("foo.bar", "abc");
props.put("setting", "def");
TestConfig config = new TestConfig(props);
Map<String, Object> originalsWithPrefix = config.originalsWithPrefix("foo.");
assertTrue(config.unused().contains("foo.bar"));
originalsWithPrefix.get("bar");
assertFalse(config.unused().contains("foo.bar"));
Map<String, Object> expected = new HashMap<>();
expected.put("bar", "abc");
assertEquals(expected, config.originalsWithPrefix("foo."));
assertEquals(expected, originalsWithPrefix);
}
@Test
public void testUnused() {
Properties props = new Properties();
String configValue = "org.apache.kafka.common.config.AbstractConfigTest$ConfiguredFakeMetricsReporter";
props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
props.put(FakeMetricsReporterConfig.EXTRA_CONFIG, "my_value");
TestConfig config = new TestConfig(props);
assertTrue("metric.extra_config should be marked unused before getConfiguredInstances is called",
config.unused().contains(FakeMetricsReporterConfig.EXTRA_CONFIG));
config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
assertTrue("All defined configurations should be marked as used", config.unused().isEmpty());
}
private void testValidInputs(String configValue) {
@ -91,4 +115,32 @@ public class AbstractConfigTest { @@ -91,4 +115,32 @@ public class AbstractConfigTest {
super(CONFIG, props);
}
}
public static class ConfiguredFakeMetricsReporter extends FakeMetricsReporter {
@Override
public void configure(Map<String, ?> configs) {
FakeMetricsReporterConfig config = new FakeMetricsReporterConfig(configs);
// Calling getString() should have the side effect of marking that config as used.
config.getString(FakeMetricsReporterConfig.EXTRA_CONFIG);
}
}
public static class FakeMetricsReporterConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
public static final String EXTRA_CONFIG = "metric.extra_config";
private static final String EXTRA_CONFIG_DOC = "An extraneous configuration string.";
static {
CONFIG = new ConfigDef().define(
EXTRA_CONFIG, ConfigDef.Type.STRING, "",
ConfigDef.Importance.LOW, EXTRA_CONFIG_DOC);
}
public FakeMetricsReporterConfig(Map<?, ?> props) {
super(CONFIG, props);
}
}
}

Loading…
Cancel
Save