This test has been completely subsumed by the coverage of the reset integration test, and hence can be removed.

Author: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>

Closes #4184 from guozhangwang/KMinor-remove-fanout-integration
Guozhang Wang
7 years ago
1 changed file with 0 additions and 166 deletions
@@ -1,166 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.integration;

import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.IntegrationTest;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

/**
 * End-to-end integration test that demonstrates "fan-out", using an embedded Kafka cluster.
 * <p>
 * This example shows how you can read from one input topic/stream, transform the data (here:
 * trivially) in two different ways via two intermediate streams, and then write the respective
 * results to two output topics.
 * <p>
 * <pre>
 * {@code
 *
 *                                         +---map()---> stream2 ---to()---> Kafka topic B
 *                                         |
 * Kafka topic A ---stream()--> stream1 ---+
 *                                         |
 *                                         +---map()---> stream3 ---to()---> Kafka topic C
 *
 * }
 * </pre>
 */
@Category({IntegrationTest.class})
public class FanoutIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final long COMMIT_INTERVAL_MS = 300L;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private final MockTime mockTime = CLUSTER.time;
    private static final String INPUT_TOPIC_A = "A";
    private static final String OUTPUT_TOPIC_B = "B";
    private static final String OUTPUT_TOPIC_C = "C";

    @BeforeClass
    public static void startKafkaCluster() throws InterruptedException {
        CLUSTER.createTopics(INPUT_TOPIC_A, OUTPUT_TOPIC_B, OUTPUT_TOPIC_C);
    }

    @Test
    public void shouldFanoutTheInput() throws Exception {
        final List<String> inputValues = Arrays.asList("Hello", "World");
        final List<String> expectedValuesForB = new ArrayList<>();
        final List<String> expectedValuesForC = new ArrayList<>();
        for (final String input : inputValues) {
            expectedValuesForB.add(input.toUpperCase(Locale.getDefault()));
            expectedValuesForC.add(input.toLowerCase(Locale.getDefault()));
        }

        //
        // Step 1: Configure and start the processor topology.
        //
        final StreamsBuilder builder = new StreamsBuilder();

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
        final KStream<byte[], String> stream2 = stream1.mapValues(
            new ValueMapper<String, String>() {
                @Override
                public String apply(final String value) {
                    return value.toUpperCase(Locale.getDefault());
                }
            });
        final KStream<byte[], String> stream3 = stream1.mapValues(
            new ValueMapper<String, String>() {
                @Override
                public String apply(final String value) {
                    return value.toLowerCase(Locale.getDefault());
                }
            });
        stream2.to(OUTPUT_TOPIC_B);
        stream3.to(OUTPUT_TOPIC_C);

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();

        //
        // Step 2: Produce some input data to the input topic.
        //
        final Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, inputValues, producerConfig, mockTime);

        //
        // Step 3: Verify the application's output data.
        //

        // Verify output topic B
        final Properties consumerConfigB = new Properties();
        consumerConfigB.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        consumerConfigB.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-integration-test-standard-consumer-topicB");
        consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        final List<String> actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB,
            OUTPUT_TOPIC_B, inputValues.size());
        assertThat(actualValuesForB, equalTo(expectedValuesForB));

        // Verify output topic C
        final Properties consumerConfigC = new Properties();
        consumerConfigC.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        consumerConfigC.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-integration-test-standard-consumer-topicC");
        consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        final List<String> actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC,
            OUTPUT_TOPIC_C, inputValues.size());
        streams.close();
        assertThat(actualValuesForC, equalTo(expectedValuesForC));
    }
}
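
For reference only, and not part of this commit: the same fan-out wiring can be exercised in-process, without an embedded cluster, using TopologyTestDriver from the kafka-streams-test-utils artifact (available since Kafka 2.4). The sketch below is illustrative; the class name FanoutSketch, the topic names, and the dummy bootstrap address are placeholders and do not come from the removed test or the reset integration test.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Locale;
import java.util.Properties;

public class FanoutSketch {
    public static void main(final String[] args) {
        // Same fan-out shape as the removed test: one source stream,
        // two mapValues() branches, two output topics.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<byte[], String> stream1 =
            builder.stream("A", Consumed.with(Serdes.ByteArray(), Serdes.String()));
        stream1.mapValues(v -> v.toUpperCase(Locale.getDefault()))
            .to("B", Produced.with(Serdes.ByteArray(), Serdes.String()));
        stream1.mapValues(v -> v.toLowerCase(Locale.getDefault()))
            .to("C", Produced.with(Serdes.ByteArray(), Serdes.String()));
        final Topology topology = builder.build();

        // TopologyTestDriver never opens a network connection, so the
        // bootstrap address is a dummy value.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            final TestInputTopic<byte[], String> in = driver.createInputTopic(
                "A", Serdes.ByteArray().serializer(), Serdes.String().serializer());
            final TestOutputTopic<byte[], String> outB = driver.createOutputTopic(
                "B", Serdes.ByteArray().deserializer(), Serdes.String().deserializer());
            final TestOutputTopic<byte[], String> outC = driver.createOutputTopic(
                "C", Serdes.ByteArray().deserializer(), Serdes.String().deserializer());

            // Pipe one record through and read both fan-out branches.
            in.pipeInput(null, "Hello");
            System.out.println(outB.readValue());  // prints "HELLO"
            System.out.println(outC.readValue());  // prints "hello"
        }
    }
}

Driving the topology in-process keeps this kind of check fast and deterministic; per the commit message above, the broker-facing coverage the removed test provided is already exercised by the reset integration test.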