|
|
|
@ -25,7 +25,10 @@ import org.apache.kafka.common.Node;
@@ -25,7 +25,10 @@ import org.apache.kafka.common.Node;
|
|
|
|
|
import org.apache.kafka.common.config.ConfigException; |
|
|
|
|
import org.apache.kafka.common.metrics.Sensor; |
|
|
|
|
import org.apache.kafka.common.network.Selectable; |
|
|
|
|
import org.apache.kafka.common.serialization.Deserializer; |
|
|
|
|
import org.apache.kafka.common.serialization.Serde; |
|
|
|
|
import org.apache.kafka.common.serialization.Serdes; |
|
|
|
|
import org.apache.kafka.common.serialization.Serializer; |
|
|
|
|
import org.apache.kafka.common.serialization.StringDeserializer; |
|
|
|
|
import org.apache.kafka.common.serialization.StringSerializer; |
|
|
|
|
import org.apache.kafka.common.utils.Utils; |
|
|
|
@ -33,7 +36,9 @@ import org.apache.kafka.streams.errors.StreamsException;
@@ -33,7 +36,9 @@ import org.apache.kafka.streams.errors.StreamsException;
|
|
|
|
|
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; |
|
|
|
|
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; |
|
|
|
|
import org.apache.kafka.streams.kstream.Consumed; |
|
|
|
|
import org.apache.kafka.streams.kstream.Grouped; |
|
|
|
|
import org.apache.kafka.streams.kstream.Materialized; |
|
|
|
|
import org.apache.kafka.streams.kstream.Produced; |
|
|
|
|
import org.apache.kafka.streams.processor.AbstractProcessor; |
|
|
|
|
import org.apache.kafka.streams.processor.ThreadMetadata; |
|
|
|
|
import org.apache.kafka.streams.processor.internals.GlobalStreamThread; |
|
|
|
@ -74,6 +79,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,6 +79,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
|
import static java.util.Arrays.asList; |
|
|
|
|
import static org.easymock.EasyMock.anyObject; |
|
|
|
|
import static org.easymock.EasyMock.eq; |
|
|
|
|
import static org.easymock.EasyMock.expect; |
|
|
|
|
import static org.easymock.EasyMock.expectLastCall; |
|
|
|
|
import static org.easymock.EasyMock.mock; |
|
|
|
|
import static org.easymock.EasyMock.replay; |
|
|
|
|
import static org.easymock.EasyMock.verify; |
|
|
|
|
import static org.junit.Assert.assertEquals; |
|
|
|
|
import static org.junit.Assert.assertFalse; |
|
|
|
|
import static org.junit.Assert.assertNotNull; |
|
|
|
@ -725,6 +737,131 @@ public class KafkaStreamsTest {
@@ -725,6 +737,131 @@ public class KafkaStreamsTest {
|
|
|
|
|
startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, true); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
@Test |
|
|
|
|
public void shouldInitializeUserSerdes() { |
|
|
|
|
final Deserializer mockSourceKeyDeserialzer = mock(Deserializer.class); |
|
|
|
|
mockSourceKeyDeserialzer.configure(anyObject(), eq(true)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
final Deserializer mockSourceValueDeserialzer = mock(Deserializer.class); |
|
|
|
|
mockSourceValueDeserialzer.configure(anyObject(), eq(false)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
|
|
|
|
|
final Serde mockSourceKeySerde = mock(Serde.class); |
|
|
|
|
final Serde mockSourceValueSerde = mock(Serde.class); |
|
|
|
|
expect(mockSourceKeySerde.deserializer()).andReturn(mockSourceKeyDeserialzer).anyTimes(); |
|
|
|
|
expect(mockSourceValueSerde.deserializer()).andReturn(mockSourceValueDeserialzer).anyTimes(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
final Serializer mockThroughKeySerializer = mock(Serializer.class); |
|
|
|
|
mockThroughKeySerializer.configure(anyObject(), eq(true)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
final Serializer mockThroughValueSerializer = mock(Serializer.class); |
|
|
|
|
mockThroughValueSerializer.configure(anyObject(), eq(false)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
final Deserializer mockThroughKeyDeserializer = mock(Deserializer.class); |
|
|
|
|
mockThroughKeyDeserializer.configure(anyObject(), eq(true)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
final Deserializer mockThroughValueDeserializer = mock(Deserializer.class); |
|
|
|
|
mockThroughValueDeserializer.configure(anyObject(), eq(false)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
|
|
|
|
|
final Serde mockThroughKeySerde = mock(Serde.class); |
|
|
|
|
final Serde mockThroughValueSerde = mock(Serde.class); |
|
|
|
|
expect(mockThroughKeySerde.serializer()).andReturn(mockThroughKeySerializer).anyTimes(); |
|
|
|
|
expect(mockThroughValueSerde.serializer()).andReturn(mockThroughValueSerializer).anyTimes(); |
|
|
|
|
expect(mockThroughKeySerde.deserializer()).andReturn(mockThroughKeyDeserializer).anyTimes(); |
|
|
|
|
expect(mockThroughValueSerde.deserializer()).andReturn(mockThroughValueDeserializer).anyTimes(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
final Serializer mockGroupedKeySerializer = mock(Serializer.class); |
|
|
|
|
mockGroupedKeySerializer.configure(anyObject(), eq(true)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
final Serializer mockGroupedValueSerializer = mock(Serializer.class); |
|
|
|
|
mockGroupedValueSerializer.configure(anyObject(), eq(false)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
final Deserializer mockGroupedKeyDeserializer = mock(Deserializer.class); |
|
|
|
|
mockGroupedKeyDeserializer.configure(anyObject(), eq(true)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
final Deserializer mockGroupedValueDeserializer = mock(Deserializer.class); |
|
|
|
|
mockGroupedValueDeserializer.configure(anyObject(), eq(false)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
|
|
|
|
|
final Serde mockGroupedKeySerde = mock(Serde.class); |
|
|
|
|
final Serde mockGroupedValueSerde = mock(Serde.class); |
|
|
|
|
expect(mockGroupedKeySerde.serializer()).andReturn(mockGroupedKeySerializer).anyTimes(); |
|
|
|
|
expect(mockGroupedValueSerde.serializer()).andReturn(mockGroupedValueSerializer).anyTimes(); |
|
|
|
|
expect(mockGroupedKeySerde.deserializer()).andReturn(mockGroupedKeyDeserializer).anyTimes(); |
|
|
|
|
expect(mockGroupedValueSerde.deserializer()).andReturn(mockGroupedValueDeserializer).anyTimes(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
final Serializer mockOutputKeySerializer = mock(Serializer.class); |
|
|
|
|
mockOutputKeySerializer.configure(anyObject(), eq(true)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
final Serializer mockOutputValueSerializer = mock(Serializer.class); |
|
|
|
|
mockOutputValueSerializer.configure(anyObject(), eq(false)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
|
|
|
|
|
final Serde mockOutputKeySerde = mock(Serde.class); |
|
|
|
|
final Serde mockOutputValueSerde = mock(Serde.class); |
|
|
|
|
expect(mockOutputKeySerde.serializer()).andReturn(mockOutputKeySerializer).anyTimes(); |
|
|
|
|
expect(mockOutputValueSerde.serializer()).andReturn(mockOutputValueSerializer).anyTimes(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
final Deserializer mockGlobalKeyDeserializer = mock(Deserializer.class); |
|
|
|
|
mockGlobalKeyDeserializer.configure(anyObject(), eq(true)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
final Deserializer mockGlobalValueDeserializer = mock(Deserializer.class); |
|
|
|
|
mockGlobalValueDeserializer.configure(anyObject(), eq(false)); |
|
|
|
|
expectLastCall(); |
|
|
|
|
|
|
|
|
|
final Serde mockGlobalKeySerde = mock(Serde.class); |
|
|
|
|
final Serde mockGlobalValueSerde = mock(Serde.class); |
|
|
|
|
expect(mockGlobalKeySerde.deserializer()).andReturn(mockGlobalKeyDeserializer).anyTimes(); |
|
|
|
|
expect(mockGlobalValueSerde.deserializer()).andReturn(mockGlobalValueDeserializer).anyTimes(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
builder |
|
|
|
|
.stream("anyTopic", Consumed.with(mockSourceKeySerde, mockSourceValueSerde)) |
|
|
|
|
.through("anyOtherTopic", Produced.with(mockThroughKeySerde, mockThroughValueSerde)) |
|
|
|
|
.selectKey(KeyValue::pair) |
|
|
|
|
.groupByKey(Grouped.with(mockGroupedKeySerde, mockGroupedValueSerde)) |
|
|
|
|
.count() |
|
|
|
|
.toStream() |
|
|
|
|
.to("anyOutput", Produced.with(mockOutputKeySerde, mockOutputValueSerde)); |
|
|
|
|
builder.globalTable("anyGlobal", Consumed.with(mockGlobalKeySerde, mockGlobalValueSerde)); |
|
|
|
|
|
|
|
|
|
replay( |
|
|
|
|
mockSourceKeyDeserialzer, mockSourceValueDeserialzer, mockSourceKeySerde, mockSourceValueSerde, |
|
|
|
|
mockThroughKeySerializer, mockThroughKeyDeserializer, mockThroughKeySerde, |
|
|
|
|
mockThroughValueSerializer, mockThroughValueDeserializer, mockThroughValueSerde, |
|
|
|
|
mockGroupedKeySerializer, mockGroupedKeyDeserializer, mockGroupedKeySerde, |
|
|
|
|
mockGroupedValueSerializer, mockGroupedValueDeserializer, mockGroupedValueSerde, |
|
|
|
|
mockOutputKeySerializer, mockOutputValueSerializer, mockOutputKeySerde, mockOutputValueSerde, |
|
|
|
|
mockGlobalKeyDeserializer, mockGlobalValueDeserializer, mockGlobalKeySerde, mockGlobalValueSerde); |
|
|
|
|
|
|
|
|
|
KafkaStreams kafkaStreams = null; |
|
|
|
|
try { |
|
|
|
|
kafkaStreams = new KafkaStreams(builder.build(), props); |
|
|
|
|
} finally { |
|
|
|
|
if (kafkaStreams != null) { |
|
|
|
|
kafkaStreams.close(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
verify( |
|
|
|
|
mockSourceKeyDeserialzer, mockSourceValueDeserialzer, |
|
|
|
|
mockThroughKeySerializer, mockThroughValueSerializer, mockThroughKeyDeserializer, mockThroughValueDeserializer, |
|
|
|
|
mockGroupedKeySerializer, mockGroupedValueSerializer, mockGroupedKeyDeserializer, mockGroupedValueDeserializer, |
|
|
|
|
mockOutputKeySerializer, mockOutputValueSerializer, |
|
|
|
|
mockGlobalKeyDeserializer, mockGlobalValueDeserializer); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
private Topology getStatefulTopology(final String inputTopic, |
|
|
|
|
final String outputTopic, |
|
|
|
|