Browse Source

KAFKA-5293; Do not apply exponential backoff if users have overridden…

… reconnect.backoff.ms

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3174 from cmccabe/KAFKA-5293
pull/3189/merge
Colin P. Mccabe 8 years ago committed by Ismael Juma
parent
commit
b3036c5861
  1. 27
      clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
  2. 5
      clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
  3. 5
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
  4. 5
      clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  5. 85
      clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
  6. 6
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
  7. 2
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
  8. 5
      streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

27
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java

@ -16,16 +16,22 @@
*/ */
package org.apache.kafka.clients; package org.apache.kafka.clients;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Some configurations shared by both producer and consumer * Some configurations shared by both producer and consumer
*/ */
public class CommonClientConfigs { public class CommonClientConfigs {
private static final Logger log = LoggerFactory.getLogger(CommonClientConfigs.class);
/* /*
* NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE. * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
@ -90,4 +96,25 @@ public class CommonClientConfigs {
return names; return names;
} }
/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not cexplicitly onfigured.
*
* @param config The config object.
* @param parsedValues The parsedValues as provided to postProcessParsedConfig.
*
* @return The new values which have been set as described in postProcessParsedConfig.
*/
public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractConfig config,
Map<String, Object> parsedValues) {
HashMap<String, Object> rval = new HashMap<>();
if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
log.debug("Disabling exponential reconnect backoff because " + RECONNECT_BACKOFF_MS_CONFIG +
" is set, but " + RECONNECT_BACKOFF_MAX_MS_CONFIG + " is not.");
rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
}
return rval;
}
} }

5
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java

@ -160,6 +160,11 @@ public class AdminClientConfig extends AbstractConfig {
.withClientSaslSupport(); .withClientSaslSupport();
} }
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
}
AdminClientConfig(Map<?, ?> props) { AdminClientConfig(Map<?, ?> props) {
super(CONFIG, props); super(CONFIG, props);
} }

5
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

@ -447,6 +447,11 @@ public class ConsumerConfig extends AbstractConfig {
} }
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
}
public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs, public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer, Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer) { Deserializer<?> valueDeserializer) {

5
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

@ -328,6 +328,11 @@ public class ProducerConfig extends AbstractConfig {
TRANSACTIONAL_ID_DOC); TRANSACTIONAL_ID_DOC);
} }
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
}
public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs, public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
Serializer<?> keySerializer, Serializer<?> valueSerializer) { Serializer<?> keySerializer, Serializer<?> valueSerializer) {
Map<String, Object> newConfigs = new HashMap<>(); Map<String, Object> newConfigs = new HashMap<>();

85
clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.junit.Assert.assertEquals;
public class CommonClientConfigsTest {
private static class TestConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
static {
CONFIG = new ConfigDef()
.define(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG,
ConfigDef.Type.LONG,
50L,
atLeast(0L),
ConfigDef.Importance.LOW,
"")
.define(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
ConfigDef.Type.LONG,
1000L,
atLeast(0L),
ConfigDef.Importance.LOW,
"");
}
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
}
public TestConfig(Map<?, ?> props) {
super(CONFIG, props);
}
}
@Test
public void testExponentialBackoffDefaults() throws Exception {
TestConfig defaultConf = new TestConfig(Collections.emptyMap());
assertEquals(Long.valueOf(50L),
defaultConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG));
assertEquals(Long.valueOf(1000L),
defaultConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG));
TestConfig bothSetConfig = new TestConfig(new HashMap<String, Object>() {{
put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "123");
put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "12345");
}});
assertEquals(Long.valueOf(123L),
bothSetConfig.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG));
assertEquals(Long.valueOf(12345L),
bothSetConfig.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG));
TestConfig reconnectBackoffSetConf = new TestConfig(new HashMap<String, Object>() {{
put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "123");
}});
assertEquals(Long.valueOf(123L),
reconnectBackoffSetConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG));
assertEquals(Long.valueOf(123L),
reconnectBackoffSetConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG));
}
}

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.connect.runtime; package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Importance;
@ -180,6 +181,11 @@ public class WorkerConfig extends AbstractConfig {
); );
} }
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
}
public static List<String> pluginLocations(Map<String, String> props) { public static List<String> pluginLocations(Map<String, String> props) {
String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG); String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
return locationList == null return locationList == null

2
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java

@ -184,7 +184,7 @@ public class DistributedConfig extends WorkerConfig {
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, .define(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
ConfigDef.Type.LONG, ConfigDef.Type.LONG,
50L, 1000L,
atLeast(0L), atLeast(0L),
ConfigDef.Importance.LOW, ConfigDef.Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC) CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)

5
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

@ -415,7 +415,7 @@ public class StreamsConfig extends AbstractConfig {
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(RECONNECT_BACKOFF_MAX_MS_CONFIG, .define(RECONNECT_BACKOFF_MAX_MS_CONFIG,
Type.LONG, Type.LONG,
50L, 1000L,
atLeast(0L), atLeast(0L),
ConfigDef.Importance.LOW, ConfigDef.Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC) CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
@ -569,7 +569,8 @@ public class StreamsConfig extends AbstractConfig {
@Override @Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) { protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
final Map<String, Object> configUpdates = new HashMap<>(); final Map<String, Object> configUpdates =
CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
final boolean eosEnabled = EXACTLY_ONCE.equals(parsedValues.get(PROCESSING_GUARANTEE_CONFIG)); final boolean eosEnabled = EXACTLY_ONCE.equals(parsedValues.get(PROCESSING_GUARANTEE_CONFIG));
if (eosEnabled && !originals().containsKey(COMMIT_INTERVAL_MS_CONFIG)) { if (eosEnabled && !originals().containsKey(COMMIT_INTERVAL_MS_CONFIG)) {

Loading…
Cancel
Save