Browse Source

KAFKA-6657: Add StreamsConfig prefix for different consumers (#4805)

This pull request is for JIRA 6657, for KIP-276.

Added unit tests for new getGlobalConsumerConfigs API and make sure existing restore consumer tests are passing.

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
pull/4961/head
Boyang Chen 7 years ago committed by Guozhang Wang
parent
commit
1b170df31c
  1. 20
      docs/streams/developer-guide/config-streams.html
  2. 20
      streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
  3. 2
      streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
  4. 144
      streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
  5. 11
      streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
  6. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  7. 96
      streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
  8. 4
      streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java

20
docs/streams/developer-guide/config-streams.html

@ -516,7 +516,7 @@ @@ -516,7 +516,7 @@
<h4><a class="toc-backref" href="#id17">Naming</a><a class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
<p>Some consumer and producer configuration parameters use the same parameter name. For example, <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code> and
<code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code> and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code> control retries
for client request. You can avoid duplicate names by prefix parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code> or <code class="docutils literal"><span class="pre">producer</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
for client request. You can avoid duplicate names by prefix parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code> or <code class="docutils literal"><span class="pre">producer.</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="c1">// same value for consumer and producer</span>
<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">,</span> <span class="s">&quot;value&quot;</span><span class="o">);</span>
@ -527,6 +527,24 @@ @@ -527,6 +527,24 @@
<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">consumerPrefix</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span class="s">&quot;consumer-value&quot;</span><span class="o">);</span>
<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">producerPrefix</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span class="s">&quot;producer-value&quot;</span><span class="o">);</span>
</pre></div>
<p>You could further separate consumer configuration by adding different prefixes:</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">main.consumer.</span></code> for main consumer which is the default consumer of stream source.</li>
<li><code class="docutils literal"><span class="pre">restore.consumer.</span></code> for restore consumer which is in charge of state store recovery.</li>
<li><code class="docutils literal"><span class="pre">global.consumer.</span></code> for global consumer which is used in global KTable construction.</li>
</ul>
<p>For example, if you only want to set restore consumer config without touching other consumers' settings, you could simply use <code class="docutils literal"><span class="pre">restore.consumer.</span></code> to set the config.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">streamsSettings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="c1">// same config value for all consumer types</span>
<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&quot;consumer.PARAMETER_NAME&quot;</span><span class="o">,</span> <span class="s">&quot;general-consumer-value&quot;</span><span class="o">);</span>
<span class="c1">// set a different restore consumer config. This would make restore consumer take restore-consumer-value,</span>
<span>// while main consumer and global consumer stay with general-consumer-value</span>
<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&quot;restore.consumer.PARAMETER_NAME&quot;</span><span class="o">,</span> <span class="s">&quot;restore-consumer-value&quot;</span><span class="o">);</span>
<span class="c1">// alternatively, you can use</span>
<span class="n">streamsSettings</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">restoreConsumerPrefix</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span class="s">&quot;restore-consumer-value&quot;</span><span class="o">);</span>
</pre></div>
</div>
<p> Same applied to <code class="docutils literal"><span class="pre">main.consumer.</span></code> and <code class="docutils literal"><span class="pre">main.consumer.</span></code>, if you only want to specify one consumer type config.</p>
</div>
</div>
<div class="section" id="default-values">

20
streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java

@ -19,6 +19,7 @@ package org.apache.kafka.streams; @@ -19,6 +19,7 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.processor.StateStore;
import java.util.Map;
@ -32,7 +33,7 @@ public interface KafkaClientSupplier { @@ -32,7 +33,7 @@ public interface KafkaClientSupplier {
/**
* Create an {@link AdminClient} which is used for internal topic management.
*
* @param config Supplied by the {@link StreamsConfig} given to the {@link KafkaStreams}
* @param config Supplied by the {@link java.util.Properties} given to the {@link KafkaStreams}
* @return an instance of {@link AdminClient}
*/
AdminClient getAdminClient(final Map<String, Object> config);
@ -41,7 +42,7 @@ public interface KafkaClientSupplier { @@ -41,7 +42,7 @@ public interface KafkaClientSupplier {
* Create a {@link Producer} which is used to write records to sink topics.
*
* @param config {@link StreamsConfig#getProducerConfigs(String) producer config} which is supplied by the
* {@link StreamsConfig} given to the {@link KafkaStreams} instance
* {@link java.util.Properties} given to the {@link KafkaStreams} instance
* @return an instance of Kafka producer
*/
Producer<byte[], byte[]> getProducer(final Map<String, Object> config);
@ -49,8 +50,8 @@ public interface KafkaClientSupplier { @@ -49,8 +50,8 @@ public interface KafkaClientSupplier {
/**
* Create a {@link Consumer} which is used to read records of source topics.
*
* @param config {@link StreamsConfig#getConsumerConfigs(String, String) consumer config} which is
* supplied by the {@link StreamsConfig} given to the {@link KafkaStreams} instance
* @param config {@link StreamsConfig#getMainConsumerConfigs(String, String) consumer config} which is
* supplied by the {@link java.util.Properties} given to the {@link KafkaStreams} instance
* @return an instance of Kafka consumer
*/
Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config);
@ -59,8 +60,17 @@ public interface KafkaClientSupplier { @@ -59,8 +60,17 @@ public interface KafkaClientSupplier {
* Create a {@link Consumer} which is used to read records to restore {@link StateStore}s.
*
* @param config {@link StreamsConfig#getRestoreConsumerConfigs(String) restore consumer config} which is supplied
* by the {@link StreamsConfig} given to the {@link KafkaStreams}
* by the {@link java.util.Properties} given to the {@link KafkaStreams}
* @return an instance of Kafka consumer
*/
Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config);
/**
* Create a {@link Consumer} which is used to consume records for {@link GlobalKTable}.
*
* @param config {@link StreamsConfig#getGlobalConsumerConfigs(String) global consumer config} which is supplied
* by the {@link java.util.Properties} given to the {@link KafkaStreams}
* @return an instance of Kafka consumer
*/
Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config);
}

2
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@ -708,7 +708,7 @@ public class KafkaStreams { @@ -708,7 +708,7 @@ public class KafkaStreams {
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
config,
clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
stateDirectory,
cacheSizePerThread,
metrics,

144
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

@ -154,6 +154,33 @@ public class StreamsConfig extends AbstractConfig { @@ -154,6 +154,33 @@ public class StreamsConfig extends AbstractConfig {
*/
public static final String CONSUMER_PREFIX = "consumer.";
/**
* Prefix used to override {@link KafkaConsumer consumer} configs for the main consumer client from
* the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
* 1. main.consumer.[config-name]
* 2. consumer.[config-name]
* 3. [config-name]
*/
public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
/**
* Prefix used to override {@link KafkaConsumer consumer} configs for the restore consumer client from
* the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
* 1. restore.consumer.[config-name]
* 2. consumer.[config-name]
* 3. [config-name]
*/
public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
/**
* Prefix used to override {@link KafkaConsumer consumer} configs for the global consumer client from
* the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
* 1. global.consumer.[config-name]
* 2. consumer.[config-name]
* 3. [config-name]
*/
public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
/**
* Prefix used to isolate {@link KafkaProducer producer} configs from other client configs.
* It is recommended to use {@link #producerPrefix(String)} to add this prefix to {@link ProducerConfig producer
@ -639,6 +666,39 @@ public class StreamsConfig extends AbstractConfig { @@ -639,6 +666,39 @@ public class StreamsConfig extends AbstractConfig {
return CONSUMER_PREFIX + consumerProp;
}
/**
* Prefix a property with {@link #MAIN_CONSUMER_PREFIX}. This is used to isolate {@link ConsumerConfig main consumer configs}
* from other client configs.
*
* @param consumerProp the consumer property to be masked
* @return {@link #MAIN_CONSUMER_PREFIX} + {@code consumerProp}
*/
public static String mainConsumerPrefix(final String consumerProp) {
return MAIN_CONSUMER_PREFIX + consumerProp;
}
/**
* Prefix a property with {@link #RESTORE_CONSUMER_PREFIX}. This is used to isolate {@link ConsumerConfig restore consumer configs}
* from other client configs.
*
* @param consumerProp the consumer property to be masked
* @return {@link #RESTORE_CONSUMER_PREFIX} + {@code consumerProp}
*/
public static String restoreConsumerPrefix(final String consumerProp) {
return RESTORE_CONSUMER_PREFIX + consumerProp;
}
/**
* Prefix a property with {@link #GLOBAL_CONSUMER_PREFIX}. This is used to isolate {@link ConsumerConfig global consumer configs}
* from other client configs.
*
* @param consumerProp the consumer property to be masked
* @return {@link #GLOBAL_CONSUMER_PREFIX} + {@code consumerProp}
*/
public static String globalConsumerPrefix(final String consumerProp) {
return GLOBAL_CONSUMER_PREFIX + consumerProp;
}
/**
* Prefix a property with {@link #PRODUCER_PREFIX}. This is used to isolate {@link ProducerConfig producer configs}
* from other client configs.
@ -771,10 +831,37 @@ public class StreamsConfig extends AbstractConfig { @@ -771,10 +831,37 @@ public class StreamsConfig extends AbstractConfig {
* @param groupId consumer groupId
* @param clientId clientId
* @return Map of the consumer configuration.
* @Deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String)}
*/
@Deprecated
public Map<String, Object> getConsumerConfigs(final String groupId,
final String clientId) {
final Map<String, Object> consumerProps = getCommonConsumerConfigs();
return getMainConsumerConfigs(groupId, clientId);
}
/**
* Get the configs to the {@link KafkaConsumer main consumer}.
* Properties using the prefix {@link #MAIN_CONSUMER_PREFIX} will be used in favor over
* the properties prefixed with {@link #CONSUMER_PREFIX} and the non-prefixed versions
* (read the override precedence ordering in {@link #MAIN_CONSUMER_PREFIX)
* except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed
* version as we only support reading/writing from/to the same Kafka Cluster.
* If not specified by {@link #MAIN_CONSUMER_PREFIX}, main consumer will share the general consumer configs
* prefixed by {@link #CONSUMER_PREFIX}.
*
* @param groupId consumer groupId
* @param clientId clientId
* @return Map of the consumer configuration.
*/
public Map<String, Object> getMainConsumerConfigs(final String groupId,
final String clientId) {
Map<String, Object> consumerProps = getCommonConsumerConfigs();
// Get main consumer override configs
Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
for (Map.Entry<String, Object> entry: mainConsumerProps.entrySet()) {
consumerProps.put(entry.getKey(), entry.getValue());
}
// add client id with stream client id prefix, and group id
consumerProps.put(APPLICATION_ID_CONFIG, groupId);
@ -821,23 +908,64 @@ public class StreamsConfig extends AbstractConfig { @@ -821,23 +908,64 @@ public class StreamsConfig extends AbstractConfig {
/**
* Get the configs for the {@link KafkaConsumer restore-consumer}.
* Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor over their non-prefixed versions
* Properties using the prefix {@link #RESTORE_CONSUMER_PREFIX} will be used in favor over
* the properties prefixed with {@link #CONSUMER_PREFIX} and the non-prefixed versions
* (read the override precedence ordering in {@link #RESTORE_CONSUMER_PREFIX)
* except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed
* version as we only support reading/writing from/to the same Kafka Cluster.
* If not specified by {@link #RESTORE_CONSUMER_PREFIX}, restore consumer will share the general consumer configs
* prefixed by {@link #CONSUMER_PREFIX}.
*
* @param clientId clientId
* @return Map of the consumer configuration.
* @return Map of the restore consumer configuration.
*/
public Map<String, Object> getRestoreConsumerConfigs(final String clientId) {
final Map<String, Object> consumerProps = getCommonConsumerConfigs();
Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
// Get restore consumer override configs
Map<String, Object> restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
for (Map.Entry<String, Object> entry: restoreConsumerProps.entrySet()) {
baseConsumerProps.put(entry.getKey(), entry.getValue());
}
// no need to set group id for a restore consumer
consumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
// add client id with stream client id prefix
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
return consumerProps;
return baseConsumerProps;
}
/**
* Get the configs for the {@link KafkaConsumer global consumer}.
* Properties using the prefix {@link #GLOBAL_CONSUMER_PREFIX} will be used in favor over
* the properties prefixed with {@link #CONSUMER_PREFIX} and the non-prefixed versions
* (read the override precedence ordering in {@link #GLOBAL_CONSUMER_PREFIX)
* except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed
* version as we only support reading/writing from/to the same Kafka Cluster.
* If not specified by {@link #GLOBAL_CONSUMER_PREFIX}, global consumer will share the general consumer configs
* prefixed by {@link #CONSUMER_PREFIX}.
*
* @param clientId clientId
* @return Map of the global consumer configuration.
*/
public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
// Get global consumer override configs
Map<String, Object> globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
for (Map.Entry<String, Object> entry: globalConsumerProps.entrySet()) {
baseConsumerProps.put(entry.getKey(), entry.getValue());
}
// no need to set group id for a global consumer
baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
// add client id with stream client id prefix
baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer");
baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
return baseConsumerProps;
}
/**

11
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java

@ -35,17 +35,22 @@ public class DefaultKafkaClientSupplier implements KafkaClientSupplier { @@ -35,17 +35,22 @@ public class DefaultKafkaClientSupplier implements KafkaClientSupplier {
}
@Override
public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
}
@Override
public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
@Override
public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
@Override
public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
}

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -641,7 +641,7 @@ public class StreamThread extends Thread { @@ -641,7 +641,7 @@ public class StreamThread extends Thread {
log.info("Creating consumer client");
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final Map<String, Object> consumerConfigs = config.getConsumerConfigs(applicationId, threadClientId);
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, threadClientId);
consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
String originalReset = null;
if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {

96
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java

@ -96,7 +96,7 @@ public class StreamsConfigTest { @@ -96,7 +96,7 @@ public class StreamsConfigTest {
public void testGetConsumerConfigs() {
final String groupId = "example-application";
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId, clientId);
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId);
assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
@ -115,7 +115,7 @@ public class StreamsConfigTest { @@ -115,7 +115,7 @@ public class StreamsConfigTest {
final String groupId = "example-application";
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId, clientId);
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
@ -135,11 +135,21 @@ public class StreamsConfigTest { @@ -135,11 +135,21 @@ public class StreamsConfigTest {
final String groupId = "example-application";
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId, clientId);
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
}
@Test
public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
final String groupId = "example-application";
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
}
@Test
public void testGetRestoreConsumerConfigs() {
final String clientId = "client";
@ -185,7 +195,7 @@ public class StreamsConfigTest { @@ -185,7 +195,7 @@ public class StreamsConfigTest {
props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
@ -202,7 +212,7 @@ public class StreamsConfigTest { @@ -202,7 +212,7 @@ public class StreamsConfigTest {
public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(consumerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
}
@ -238,7 +248,7 @@ public class StreamsConfigTest { @@ -238,7 +248,7 @@ public class StreamsConfigTest {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
@ -265,7 +275,7 @@ public class StreamsConfigTest { @@ -265,7 +275,7 @@ public class StreamsConfigTest {
public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put("custom.property.host", "host");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
@ -282,7 +292,7 @@ public class StreamsConfigTest { @@ -282,7 +292,7 @@ public class StreamsConfigTest {
props.put(consumerPrefix("custom.property.host"), "host1");
props.put(producerPrefix("custom.property.host"), "host2");
props.put(adminClientPrefix("custom.property.host"), "host3");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
@ -319,7 +329,7 @@ public class StreamsConfigTest { @@ -319,7 +329,7 @@ public class StreamsConfigTest {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
}
@ -344,7 +354,7 @@ public class StreamsConfigTest { @@ -344,7 +354,7 @@ public class StreamsConfigTest {
public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("a", "b");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b");
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
@ -356,10 +366,66 @@ public class StreamsConfigTest { @@ -356,10 +366,66 @@ public class StreamsConfigTest {
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
@Test
public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("clientId");
assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
}
@Test
public void testGetGlobalConsumerConfigs() {
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId);
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-global-consumer");
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
}
@Test
public void shouldSupportPrefixedGlobalConsumerConfigs() {
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
@Test
public void shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(consumerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
}
@Test
public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() {
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("groupId");
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
@Test
public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("client");
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
@Test
public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs("clientId");
assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
}
@Test
public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
}
@ -388,7 +454,9 @@ public class StreamsConfigTest { @@ -388,7 +454,9 @@ public class StreamsConfigTest {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
String isoLevel = (String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
String name = READ_COMMITTED.name();
assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
}
@ -396,7 +464,7 @@ public class StreamsConfigTest { @@ -396,7 +464,7 @@ public class StreamsConfigTest {
public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientrId");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientrId");
assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
}
@ -423,7 +491,7 @@ public class StreamsConfigTest { @@ -423,7 +491,7 @@ public class StreamsConfigTest {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));

4
streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java

@ -83,4 +83,8 @@ public class MockClientSupplier implements KafkaClientSupplier { @@ -83,4 +83,8 @@ public class MockClientSupplier implements KafkaClientSupplier {
return restoreConsumer;
}
@Override
public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
return restoreConsumer;
}
}

Loading…
Cancel
Save