Browse Source

KAFKA-5096; Log invalid user configs and use defaults

Kafka Streams does not allow users to modify some consumer configurations.
Currently, it does not allow modifying the value of 'enable.auto.commit'.
If the user modifies this property, currently an exception is thrown.
The following changes were made in this patch:
- Defined a new array 'NON_CONFIGURABLE_CONSUMER_CONFIGS' to hold the names
  of the configuration parameters that is not allowed to be modified
- Defined a new method 'checkIfUnexpectedUserSpecifiedConsumerConfig' to
  check if user overwrote the values of any of the non configurable configuration
  parameters. If so, then log a warning message and reset the default values
- Updated the javadoc to include the configuration parameters that cannot be
  modified by users.
- Updated the corresponding tests in StreamsConfigTest.java to reflect the changes
  made in StreamsConfig.java

Author: Mariam John <mariamj@us.ibm.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Eno Thereska <eno.thereska@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Damian Guy <damian.guy@gmail.com>

Closes #2990 from johnma14/bug/kafka-5096
pull/3587/merge
Mariam John 7 years ago committed by Damian Guy
parent
commit
b7684b47b8
  1. 103
      streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
  2. 48
      streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java

103
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

@ -25,7 +25,6 @@ import org.apache.kafka.common.config.AbstractConfig; @@ -25,7 +25,6 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@ -52,7 +51,7 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED; @@ -52,7 +51,7 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
/**
* Configuration for a {@link KafkaStreams} instance.
* Can also be use to configure the Kafka Streams internal {@link KafkaConsumer} and {@link KafkaProducer}.
* Can also be used to configure the Kafka Streams internal {@link KafkaConsumer} and {@link KafkaProducer}.
* To avoid consumer/producer property conflicts, you should prefix those properties using
* {@link #consumerPrefix(String)} and {@link #producerPrefix(String)}, respectively.
* <p>
@ -73,10 +72,25 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED; @@ -73,10 +72,25 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
*
* StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
* }</pre>
* Kafka Streams required to set at least properties {@link #APPLICATION_ID_CONFIG "application.id"} and
* {@link #BOOTSTRAP_SERVERS_CONFIG "bootstrap.servers"}.
* Furthermore, it is not allowed to enable {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG "enable.auto.commit"} that
* is disabled by Kafka Streams by default.
*
* Kafka Streams requires at least the following properties to be set:
* <ul>
* <li>{@link #APPLICATION_ID_CONFIG "application.id"}</li>
* <li>{@link #BOOTSTRAP_SERVERS_CONFIG "bootstrap.servers"}</li>
* </ul>
*
* By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
* <ul>
* <li>{@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG "enable.auto.commit"} (false) - Streams client will always disable/turn off auto committing</li>
* </ul>
*
* If {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} is set to {@link #EXACTLY_ONCE "exactly_once"}, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
* <ul>
* <li>{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} (read_committed) - Consumers will always read committed data only</li>
* <li>{@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"} (true) - Producer will always have idempotency enabled</li>
* <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION "max.in.flight.requests.per.connection"} (1) - Producer will always have one in-flight request per connection</li>
* </ul>
*
*
* @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig)
* @see ConsumerConfig
@ -285,6 +299,11 @@ public class StreamsConfig extends AbstractConfig { @@ -285,6 +299,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management. This config is deprecated and will be ignored as Streams API does not use Zookeeper anymore.";
private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = new String[] {ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION};
static {
CONFIG = new ConfigDef()
@ -617,22 +636,12 @@ public class StreamsConfig extends AbstractConfig { @@ -617,22 +636,12 @@ public class StreamsConfig extends AbstractConfig {
return configUpdates;
}
private Map<String, Object> getCommonConsumerConfigs() throws ConfigException {
private Map<String, Object> getCommonConsumerConfigs() {
final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
// disable auto commit and throw exception if there is user overridden values,
// this is necessary for streams commit semantics
if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ ", as the streams client will always turn off auto committing.");
}
if (eosEnabled) {
if (clientProvidedProps.containsKey(ConsumerConfig.ISOLATION_LEVEL_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ISOLATION_LEVEL_CONFIG
+ "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' consumers will always read committed data only.");
}
}
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
consumerProps.putAll(clientProvidedProps);
@ -643,7 +652,43 @@ public class StreamsConfig extends AbstractConfig { @@ -643,7 +652,43 @@ public class StreamsConfig extends AbstractConfig {
return consumerProps;
}
private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Object> clientProvidedProps, final String[] nonConfigurableConfigs) {
// Streams does not allow users to configure certain consumer/producer configurations, for example,
// enable.auto.commit. In cases where user tries to override such non-configurable
// consumer/producer configurations, log a warning and remove the user defined value from the Map.
// Thus the default values for these consumer/producer configurations that are suitable for
// Streams will be used instead.
for (final String config: nonConfigurableConfigs) {
if (clientProvidedProps.containsKey(config)) {
final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is set to " + EXACTLY_ONCE + ". Hence, ";
final String nonConfigurableConfigMessage = "Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ";
if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
if (!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config))) {
log.warn(String.format(nonConfigurableConfigMessage, "consumer", config, "", clientProvidedProps.get(config), CONSUMER_DEFAULT_OVERRIDES.get(config)));
clientProvidedProps.remove(config);
}
} else if (eosEnabled) {
if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
if (!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) {
log.warn(String.format(nonConfigurableConfigMessage,
"consumer", config, eosMessage, clientProvidedProps.get(config), CONSUMER_EOS_OVERRIDES.get(config)));
clientProvidedProps.remove(config);
}
} else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
if (!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) {
log.warn(String.format(nonConfigurableConfigMessage,
"producer", config, eosMessage, clientProvidedProps.get(config), PRODUCER_EOS_OVERRIDES.get(config)));
clientProvidedProps.remove(config);
}
}
}
}
}
}
/**
* Get the configs to the {@link KafkaConsumer consumer}.
* Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor over their non-prefixed versions
@ -654,11 +699,10 @@ public class StreamsConfig extends AbstractConfig { @@ -654,11 +699,10 @@ public class StreamsConfig extends AbstractConfig {
* @param groupId consumer groupId
* @param clientId clientId
* @return Map of the consumer configuration.
* @throws ConfigException if {@code "enable.auto.commit"} was set to {@code false} by the user
*/
public Map<String, Object> getConsumerConfigs(final StreamThread streamThread,
final String groupId,
final String clientId) throws ConfigException {
final String clientId) {
final Map<String, Object> consumerProps = getCommonConsumerConfigs();
// add client id with stream client id prefix, and group id
@ -685,9 +729,8 @@ public class StreamsConfig extends AbstractConfig { @@ -685,9 +729,8 @@ public class StreamsConfig extends AbstractConfig {
*
* @param clientId clientId
* @return Map of the consumer configuration.
* @throws ConfigException if {@code "enable.auto.commit"} was set to {@code false} by the user
*/
public Map<String, Object> getRestoreConsumerConfigs(final String clientId) throws ConfigException {
public Map<String, Object> getRestoreConsumerConfigs(final String clientId) {
final Map<String, Object> consumerProps = getCommonConsumerConfigs();
// no need to set group id for a restore consumer
@ -710,17 +753,7 @@ public class StreamsConfig extends AbstractConfig { @@ -710,17 +753,7 @@ public class StreamsConfig extends AbstractConfig {
public Map<String, Object> getProducerConfigs(final String clientId) {
final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
if (eosEnabled) {
if (clientProvidedProps.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
+ "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' producer will always have idempotency enabled.");
}
if (clientProvidedProps.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
+ "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' producer will always have only one in-flight request per connection.");
}
}
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
// generate producer configs from original properties and overridden maps
final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);

48
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java

@ -271,18 +271,20 @@ public class StreamsConfigTest { @@ -271,18 +271,20 @@ public class StreamsConfigTest {
assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
}
@Test(expected = ConfigException.class)
public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
@Test
public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getConsumerConfigs(null, "a", "b");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "a", "b");
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
@Test(expected = ConfigException.class)
public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
@Test
public void shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getRestoreConsumerConfigs("client");
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client");
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
@Test
@ -312,50 +314,56 @@ public class StreamsConfigTest { @@ -312,50 +314,56 @@ public class StreamsConfigTest {
new StreamsConfig(props);
}
@Test(expected = ConfigException.class)
public void shouldThrowExceptionIfConsumerIsolationLevelIsOverriddenIfEosEnabled() {
@Test
public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
}
@Test
public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientrId");
assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
}
@Test(expected = ConfigException.class)
public void shouldThrowExceptionIfProducerEnableIdempotenceIsOverriddenIfEosEnabled() {
@Test
public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getProducerConfigs("clientId");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
}
@Test
public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getProducerConfigs("clientId");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
assertThat((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false));
}
@Test(expected = ConfigException.class)
public void shouldThrowExceptionIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled() {
@Test
public void shouldResetToDefaultIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getProducerConfigs("clientId");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), equalTo(1));
}
@Test
public void shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled() {
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 2);
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getProducerConfigs("clientId");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), equalTo(2));
}
@Test

Loading…
Cancel
Save