
KAFKA-6607: Commit correct offsets for transactional input data (#8091)

Reviewers: Guozhang Wang <guozhang@confluent.io>
Matthias J. Sax, 5 years ago, committed by GitHub
commit aa0d0ec32a
  1. clients/src/test/java/org/apache/kafka/test/TestUtils.java (1 line changed)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java (35 lines changed)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java (4 lines changed)
  4. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (17 lines changed)
  5. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (14 lines changed)
  6. streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java (65 lines changed)
  7. streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java (66 lines changed)
  8. streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java (46 lines changed)
  9. streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java (123 lines changed)
  10. streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java (69 lines changed)
  11. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java (76 lines changed)
  12. streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java (14 lines changed)

clients/src/test/java/org/apache/kafka/test/TestUtils.java (1 line changed)

@@ -282,7 +282,6 @@ public class TestUtils {
final Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
properties.putAll(additional);

streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java (35 lines changed)

@@ -59,14 +59,14 @@ public class PartitionGroup {
private boolean allBuffered;
public static class RecordInfo {
static class RecordInfo {
RecordQueue queue;
public ProcessorNode node() {
ProcessorNode node() {
return queue.source();
}
public TopicPartition partition() {
TopicPartition partition() {
return queue.partition();
}
@@ -88,7 +88,7 @@ public class PartitionGroup {
long partitionTimestamp(final TopicPartition partition) {
final RecordQueue queue = partitionQueues.get(partition);
if (queue == null) {
throw new NullPointerException("Partition " + partition + " not found.");
throw new IllegalStateException("Partition " + partition + " not found.");
}
return queue.partitionTime();
}
@@ -96,7 +96,7 @@ public class PartitionGroup {
void setPartitionTime(final TopicPartition partition, final long partitionTime) {
final RecordQueue queue = partitionQueues.get(partition);
if (queue == null) {
throw new NullPointerException("Partition " + partition + " not found.");
throw new IllegalStateException("Partition " + partition + " not found.");
}
if (streamTime < partitionTime) {
streamTime = partitionTime;
@@ -152,6 +152,10 @@ public class PartitionGroup {
int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
final RecordQueue recordQueue = partitionQueues.get(partition);
if (recordQueue == null) {
throw new IllegalStateException("Partition " + partition + " not found.");
}
final int oldSize = recordQueue.size();
final int newSize = recordQueue.addRawRecords(rawRecords);
@@ -172,17 +176,27 @@ public class PartitionGroup {
return newSize;
}
public Set<TopicPartition> partitions() {
Set<TopicPartition> partitions() {
return Collections.unmodifiableSet(partitionQueues.keySet());
}
/**
* Return the stream-time of this partition group defined as the largest timestamp seen across all partitions
*/
public long streamTime() {
long streamTime() {
return streamTime;
}
Long headRecordOffset(final TopicPartition partition) {
final RecordQueue recordQueue = partitionQueues.get(partition);
if (recordQueue == null) {
throw new IllegalStateException("Partition " + partition + " not found.");
}
return recordQueue.headRecordOffset();
}
/**
* @throws IllegalStateException if the record's partition does not belong to this partition group
*/
@@ -190,7 +204,7 @@ public class PartitionGroup {
final RecordQueue recordQueue = partitionQueues.get(partition);
if (recordQueue == null) {
throw new IllegalStateException(String.format("Record's partition %s does not belong to this partition-group.", partition));
throw new IllegalStateException("Partition " + partition + " not found.");
}
return recordQueue.size();
@@ -204,14 +218,15 @@ public class PartitionGroup {
return allBuffered;
}
public void close() {
void close() {
clear();
partitionQueues.clear();
streamTime = RecordQueue.UNKNOWN;
}
public void clear() {
void clear() {
nonEmptyQueuesByTime.clear();
totalBuffered = 0;
for (final RecordQueue queue : partitionQueues.values()) {
queue.clear();
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java (4 lines changed)

@@ -156,6 +156,10 @@ public class RecordQueue {
return headRecord == null ? UNKNOWN : headRecord.timestamp;
}
public Long headRecordOffset() {
return headRecord == null ? null : headRecord.offset();
}
/**
* Clear the fifo queue of its elements
*/

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (17 lines changed)

@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -336,7 +337,21 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
final TopicPartition partition = entry.getKey();
final long offset = entry.getValue() + 1L;
Long offset = partitionGroup.headRecordOffset(partition);
if (offset == null) {
try {
offset = consumer.position(partition);
} catch (final TimeoutException error) {
// the `consumer.position()` call should never block, because we know that we did process data
// for the requested partition and thus the consumer should have a valid local position
// that it can return immediately
// hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
throw new IllegalStateException(error);
} catch (final KafkaException fatal) {
throw new StreamsException(fatal);
}
}
final long partitionTime = partitionTimes.get(partition);
consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
}
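
Note on the change above: for each input partition the task now commits the offset of the next record still sitting in its buffer, and only falls back to the consumer's position once the buffer is drained; that position already points past control records such as transaction commit markers. A minimal sketch of that rule, using a hypothetical helper name and simplified types rather than the exact StreamTask code:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class CommitOffsetSketch {
    // Sketch only: headRecordOffset stands for what PartitionGroup#headRecordOffset(partition)
    // would return, i.e. the offset of the next buffered-but-unprocessed record, or null if empty.
    static long offsetToCommit(final TopicPartition partition,
                               final Long headRecordOffset,
                               final Consumer<byte[], byte[]> consumer) {
        // Buffered record pending: commit its offset so it is re-read after a restart.
        // Empty buffer: consumer.position() is already past any transaction markers.
        return headRecordOffset != null ? headRecordOffset : consumer.position(partition);
    }
}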

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (14 lines changed)

@@ -802,6 +802,13 @@ public class StreamThread extends Thread {
throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
}
final long pollLatency = advanceNowAndComputeLatency();
if (records != null && !records.isEmpty()) {
pollSensor.record(pollLatency, now);
addRecordsToTasks(records);
}
// Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
// The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
// Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
@@ -811,13 +818,6 @@ public class StreamThread extends Thread {
return;
}
final long pollLatency = advanceNowAndComputeLatency();
if (records != null && !records.isEmpty()) {
pollSensor.record(pollLatency, now);
addRecordsToTasks(records);
}
// we can always let changelog reader to try restoring in order to initialize the changelogs;
// if there's no active restoring or standby updating it would not try to fetch any data
changelogReader.restore();

streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java (65 lines changed)

@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
@@ -42,7 +41,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -62,9 +60,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -170,25 +168,11 @@ public abstract class AbstractResetIntegrationTest {
private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
private static final int TIMEOUT_MULTIPLIER = 15;
private class ConsumerGroupInactiveCondition implements TestCondition {
@Override
public boolean conditionMet() {
try {
final ConsumerGroupDescription groupDescription = adminClient.describeConsumerGroups(Collections.singletonList(appID)).describedGroups().get(appID).get();
return groupDescription.members().isEmpty();
} catch (final ExecutionException | InterruptedException e) {
return false;
}
}
}
void prepareTest() throws Exception {
prepareConfigs();
prepareEnvironment();
// busy wait until cluster (ie, ConsumerGroupCoordinator) is available
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Test consumer group " + appID + " still active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT);
cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
@@ -286,15 +270,13 @@ public abstract class AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
// RESET
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
cleanGlobal(false, null, null);
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
assertInternalTopicsGotDeleted(null);
@@ -305,8 +287,7 @@ public abstract class AbstractResetIntegrationTest {
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null);
}
@@ -325,8 +306,7 @@ public abstract class AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2, 40);
streams.close();
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
// insert bad record to make sure intermediate user topic gets seekToEnd()
mockTime.sleep(1);
@@ -341,8 +321,7 @@ public abstract class AbstractResetIntegrationTest {
streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfig);
streams.cleanUp();
cleanGlobal(true, null, null);
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
@@ -363,8 +342,7 @@ public abstract class AbstractResetIntegrationTest {
}
assertThat(resultIntermediate.get(10), equalTo(badMessage));
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(true, null, null);
cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
@@ -380,8 +358,7 @@ public abstract class AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
@@ -393,8 +370,7 @@ public abstract class AbstractResetIntegrationTest {
streams.cleanUp();
cleanGlobal(false, "--from-file", resetFile.getAbsolutePath());
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
assertInternalTopicsGotDeleted(null);
@@ -408,8 +384,7 @@ public abstract class AbstractResetIntegrationTest {
result.remove(0);
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null);
}
@@ -423,8 +398,7 @@ public abstract class AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
@@ -441,8 +415,7 @@ public abstract class AbstractResetIntegrationTest {
calendar.add(Calendar.DATE, -1);
cleanGlobal(false, "--to-datetime", format.format(calendar.getTime()));
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
assertInternalTopicsGotDeleted(null);
@@ -455,8 +428,7 @@ public abstract class AbstractResetIntegrationTest {
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null);
}
@@ -470,8 +442,7 @@ public abstract class AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close();
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
"Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
// RESET
final File resetFile = File.createTempFile("reset", ".csv");
@@ -483,8 +454,7 @@ public abstract class AbstractResetIntegrationTest {
streams.cleanUp();
cleanGlobal(false, "--by-duration", "PT1M");
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
assertInternalTopicsGotDeleted(null);
@@ -497,8 +467,7 @@ public abstract class AbstractResetIntegrationTest {
assertThat(resultRerun, equalTo(result));
TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
"Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null);
}

streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java (66 lines changed)

@@ -16,17 +16,24 @@
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -41,7 +48,6 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
@@ -51,6 +57,7 @@ import org.junit.experimental.categories.Category;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -62,6 +69,9 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -118,38 +128,66 @@ public class EosIntegrationTest {
@Test
public void shouldBeAbleToRunWithEosEnabled() throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC);
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
}
@Test
public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true);
try (final Admin adminClient = Admin.create(mkMap(mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())));
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(mkMap(
mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(ConsumerConfig.GROUP_ID_CONFIG, applicationId),
mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class),
mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)))) {
waitForEmptyConsumerGroup(adminClient, applicationId, 5 * MAX_POLL_INTERVAL_MS);
final TopicPartition topicPartition = new TopicPartition(SINGLE_PARTITION_INPUT_TOPIC, 0);
final Collection<TopicPartition> topicPartitions = Collections.singleton(topicPartition);
final long committedOffset = adminClient.listConsumerGroupOffsets(applicationId).partitionsToOffsetAndMetadata().get().get(topicPartition).offset();
consumer.assign(topicPartitions);
final long consumerPosition = consumer.position(topicPartition);
final long endOffset = consumer.endOffsets(topicPartitions).get(topicPartition);
assertThat(committedOffset, equalTo(consumerPosition));
assertThat(committedOffset, equalTo(endOffset));
}
}
@Test
public void shouldBeAbleToRestartAfterClose() throws Exception {
runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC);
runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
}
@Test
public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC);
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false);
}
@Test
public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC);
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false);
}
@Test
public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false);
}
@Test
public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false);
}
private void runSimpleCopyTest(final int numberOfRestarts,
final String inputTopic,
final String throughTopic,
final String outputTopic) throws Exception {
final String outputTopic,
final boolean inputTopicTransactional) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, Long> input = builder.stream(inputTopic);
KStream<Long, Long> output = input;
@@ -179,11 +217,17 @@ public class EosIntegrationTest {
final List<KeyValue<Long, Long>> inputData = prepareData(i * 100, i * 100 + 10L, 0L, 1L);
final Properties producerConfigs = new Properties();
if (inputTopicTransactional) {
producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-input-producer");
}
IntegrationTestUtils.produceKeyValuesSynchronously(
inputTopic,
inputData,
TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
CLUSTER.time
TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, producerConfigs),
CLUSTER.time,
inputTopicTransactional
);
final List<KeyValue<Long, Long>> committedRecords =
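
Background for the transactional-input case: the transactional producer writes a commit marker after the data, so the partition's end offset is one larger than "last record offset + 1"; the new test asserts that the committed offset, the consumer position, and the end offset all coincide. A minimal sketch of writing such transactional input data, with assumed broker address, transactional id, and topic name (the test itself goes through IntegrationTestUtils.produceKeyValuesSynchronously):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;

public final class TransactionalInputSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // assumed broker
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "input-producer");         // assumed id
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

        try (KafkaProducer<Long, Long> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("inputTopic", 0L, 42L));              // assumed topic
            // The commit marker written here occupies one offset in the partition,
            // which is why committing "last consumed offset + 1" falls short of the end offset.
            producer.commitTransaction();
        }
    }
}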

streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java (46 lines changed)

@@ -16,15 +16,11 @@
*/
package org.apache.kafka.streams.integration.utils;
import java.lang.reflect.Field;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import kafka.api.Request;
import kafka.server.KafkaServer;
import kafka.server.MetadataCache;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -55,6 +51,7 @@ import scala.Option;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
@@ -65,12 +62,17 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
@@ -839,6 +841,38 @@ public class IntegrationTestUtils {
});
}
private static class ConsumerGroupInactiveCondition implements TestCondition {
private final Admin adminClient;
private final String applicationId;
private ConsumerGroupInactiveCondition(final Admin adminClient,
final String applicationId) {
this.adminClient = adminClient;
this.applicationId = applicationId;
}
@Override
public boolean conditionMet() {
try {
final ConsumerGroupDescription groupDescription =
adminClient.describeConsumerGroups(Collections.singletonList(applicationId))
.describedGroups()
.get(applicationId)
.get();
return groupDescription.members().isEmpty();
} catch (final ExecutionException | InterruptedException e) {
return false;
}
}
}
public static void waitForEmptyConsumerGroup(final Admin adminClient,
final String applicationId,
final long timeoutMs) throws Exception {
TestUtils.waitForCondition(new IntegrationTestUtils.ConsumerGroupInactiveCondition(adminClient, applicationId), timeoutMs,
"Test consumer group " + applicationId + " still active even after waiting " + timeoutMs + " ms.");
}
private static StateListener getStateListener(final KafkaStreams streams) {
try {
final Field field = streams.getClass().getDeclaredField("stateListener");
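
The reset and EOS tests above use this new helper to block until the application's consumer group has no members before inspecting committed offsets. A typical call, with an assumed application id and timeout, would look like:

// Assumed Admin instance and application id; waits up to 60 seconds for the group to empty.
IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, "my-app-id", 60_000L);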

streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java (123 lines changed)

@@ -39,7 +39,10 @@ import java.util.List;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
public class PartitionGroupTest {
@@ -47,8 +50,8 @@ public class PartitionGroupTest {
private final Serializer<Integer> intSerializer = new IntegerSerializer();
private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
private final TopicPartition randomPartition = new TopicPartition("random-partition", 0);
private final String errMessage = "Partition " + randomPartition + " not found.";
private final TopicPartition unknownPartition = new TopicPartition("unknown-partition", 0);
private final String errMessage = "Partition " + unknownPartition + " not found.";
private final String[] topics = {"topic"};
private final TopicPartition partition1 = new TopicPartition(topics[0], 1);
private final TopicPartition partition2 = new TopicPartition(topics[0], 2);
@@ -88,6 +91,14 @@ public class PartitionGroupTest {
@Test
public void testTimeTracking() {
testFirstBatch();
testSecondBatch();
}
private void testFirstBatch() {
StampedRecord record;
final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
assertEquals(0, group.numBuffered());
// add three 3 records with timestamp 1, 3, 5 to partition-1
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -109,12 +120,13 @@ public class PartitionGroupTest {
// st: -1 since no records was being processed yet
verifyBuffered(6, 3, 3);
assertEquals(1L, group.partitionTimestamp(partition1));
assertEquals(2L, group.partitionTimestamp(partition2));
assertEquals(1L, group.headRecordOffset(partition1).longValue());
assertEquals(2L, group.headRecordOffset(partition2).longValue());
assertEquals(-1L, group.streamTime());
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
StampedRecord record;
final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
// get one record, now the time should be advanced
record = group.nextRecord(info);
// 1:[3, 5]
@@ -123,6 +135,8 @@ public class PartitionGroupTest {
assertEquals(partition1, info.partition());
assertEquals(3L, group.partitionTimestamp(partition1));
assertEquals(2L, group.partitionTimestamp(partition2));
assertEquals(3L, group.headRecordOffset(partition1).longValue());
assertEquals(2L, group.headRecordOffset(partition2).longValue());
assertEquals(1L, group.streamTime());
verifyTimes(record, 1L, 1L);
verifyBuffered(5, 2, 3);
@@ -136,10 +150,17 @@ public class PartitionGroupTest {
assertEquals(partition2, info.partition());
assertEquals(3L, group.partitionTimestamp(partition1));
assertEquals(4L, group.partitionTimestamp(partition2));
assertEquals(3L, group.headRecordOffset(partition1).longValue());
assertEquals(4L, group.headRecordOffset(partition2).longValue());
assertEquals(2L, group.streamTime());
verifyTimes(record, 2L, 2L);
verifyBuffered(4, 2, 2);
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
}
private void testSecondBatch() {
StampedRecord record;
final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
// add 2 more records with timestamp 2, 4 to partition-1
final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -153,6 +174,8 @@ public class PartitionGroupTest {
verifyBuffered(6, 4, 2);
assertEquals(3L, group.partitionTimestamp(partition1));
assertEquals(4L, group.partitionTimestamp(partition2));
assertEquals(3L, group.headRecordOffset(partition1).longValue());
assertEquals(4L, group.headRecordOffset(partition2).longValue());
assertEquals(2L, group.streamTime());
assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
@@ -164,6 +187,8 @@ public class PartitionGroupTest {
assertEquals(partition1, info.partition());
assertEquals(5L, group.partitionTimestamp(partition1));
assertEquals(4L, group.partitionTimestamp(partition2));
assertEquals(5L, group.headRecordOffset(partition1).longValue());
assertEquals(4L, group.headRecordOffset(partition2).longValue());
assertEquals(3L, group.streamTime());
verifyTimes(record, 3L, 3L);
verifyBuffered(5, 3, 2);
@@ -177,6 +202,8 @@ public class PartitionGroupTest {
assertEquals(partition2, info.partition());
assertEquals(5L, group.partitionTimestamp(partition1));
assertEquals(6L, group.partitionTimestamp(partition2));
assertEquals(5L, group.headRecordOffset(partition1).longValue());
assertEquals(6L, group.headRecordOffset(partition2).longValue());
assertEquals(4L, group.streamTime());
verifyTimes(record, 4L, 4L);
verifyBuffered(4, 3, 1);
@@ -190,6 +217,8 @@ public class PartitionGroupTest {
assertEquals(partition1, info.partition());
assertEquals(5L, group.partitionTimestamp(partition1));
assertEquals(6L, group.partitionTimestamp(partition2));
assertEquals(2L, group.headRecordOffset(partition1).longValue());
assertEquals(6L, group.headRecordOffset(partition2).longValue());
assertEquals(5L, group.streamTime());
verifyTimes(record, 5L, 5L);
verifyBuffered(3, 2, 1);
@@ -203,6 +232,8 @@ public class PartitionGroupTest {
assertEquals(partition1, info.partition());
assertEquals(5L, group.partitionTimestamp(partition1));
assertEquals(6L, group.partitionTimestamp(partition2));
assertEquals(4L, group.headRecordOffset(partition1).longValue());
assertEquals(6L, group.headRecordOffset(partition2).longValue());
assertEquals(5L, group.streamTime());
verifyTimes(record, 2L, 5L);
verifyBuffered(2, 1, 1);
@@ -216,6 +247,8 @@ public class PartitionGroupTest {
assertEquals(partition1, info.partition());
assertEquals(5L, group.partitionTimestamp(partition1));
assertEquals(6L, group.partitionTimestamp(partition2));
assertNull(group.headRecordOffset(partition1));
assertEquals(6L, group.headRecordOffset(partition2).longValue());
assertEquals(5L, group.streamTime());
verifyTimes(record, 4L, 5L);
verifyBuffered(1, 0, 1);
@@ -229,6 +262,8 @@ public class PartitionGroupTest {
assertEquals(partition2, info.partition());
assertEquals(5L, group.partitionTimestamp(partition1));
assertEquals(6L, group.partitionTimestamp(partition2));
assertNull(group.headRecordOffset(partition1));
assertNull(group.headRecordOffset(partition2));
assertEquals(6L, group.streamTime());
verifyTimes(record, 6L, 6L);
verifyBuffered(0, 0, 0);
@@ -305,16 +340,78 @@ public class PartitionGroupTest {
}
@Test
public void shouldThrowNullpointerUponSetPartitionTimestampFailure() {
assertThrows(errMessage, NullPointerException.class, () -> {
group.setPartitionTime(randomPartition, 0L);
});
public void shouldThrowIllegalStateExceptionUponAddRecordsIfPartitionUnknown() {
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.addRawRecords(unknownPartition, null));
assertThat(errMessage, equalTo(exception.getMessage()));
}
@Test
public void shouldThrowIllegalStateExceptionUponNumBufferedIfPartitionUnknown() {
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.numBuffered(unknownPartition));
assertThat(errMessage, equalTo(exception.getMessage()));
}
@Test
public void shouldThrowNullpointerUponGetPartitionTimestampFailure() {
assertThrows(errMessage, NullPointerException.class, () -> {
group.partitionTimestamp(randomPartition);
});
public void shouldThrowIllegalStateExceptionUponSetPartitionTimestampIfPartitionUnknown() {
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.setPartitionTime(unknownPartition, 0L));
assertThat(errMessage, equalTo(exception.getMessage()));
}
@Test
public void shouldThrowIllegalStateExceptionUponGetPartitionTimestampIfPartitionUnknown() {
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.partitionTimestamp(unknownPartition));
assertThat(errMessage, equalTo(exception.getMessage()));
}
@Test
public void shouldThrowIllegalStateExceptionUponGetHeadRecordOffsetIfPartitionUnknown() {
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.headRecordOffset(unknownPartition));
assertThat(errMessage, equalTo(exception.getMessage()));
}
@Test
public void shouldEmpyPartitionsOnClean() {
final List<ConsumerRecord<byte[], byte[]>> list = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
group.addRawRecords(partition1, list);
group.clear();
assertThat(group.numBuffered(), equalTo(0));
assertThat(group.streamTime(), equalTo(RecordQueue.UNKNOWN));
assertThat(group.nextRecord(new PartitionGroup.RecordInfo()), equalTo(null));
group.addRawRecords(partition1, list);
}
@Test
public void shouldCleanPartitionsOnClose() {
final List<ConsumerRecord<byte[], byte[]>> list = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
group.addRawRecords(partition1, list);
group.close();
assertThat(group.numBuffered(), equalTo(0));
assertThat(group.streamTime(), equalTo(RecordQueue.UNKNOWN));
assertThat(group.nextRecord(new PartitionGroup.RecordInfo()), equalTo(null));
final IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> group.addRawRecords(partition1, list));
assertThat("Partition topic-1 not found.", equalTo(exception.getMessage()));
}
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java (69 lines changed)

@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -45,29 +46,33 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class RecordQueueTest {
private final Serializer<Integer> intSerializer = new IntegerSerializer();
private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
private final String[] topics = {"topic"};
final InternalMockProcessorContext context = new InternalMockProcessorContext(
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
new MockRecordCollector()
);
private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(new String[] {"topic"}, intDeserializer, intDeserializer);
private final RecordQueue queue = new RecordQueue(
new TopicPartition(topics[0], 1),
new TopicPartition("topic", 1),
mockSourceNodeWithMetrics,
timestampExtractor,
new LogAndFailExceptionHandler(),
context,
new LogContext());
private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(
new TopicPartition(topics[0], 1),
new TopicPartition("topic", 1),
mockSourceNodeWithMetrics,
timestampExtractor,
new LogAndContinueExceptionHandler(),
@@ -89,10 +94,10 @@ public class RecordQueueTest {
@Test
public void testTimeTracking() {
assertTrue(queue.isEmpty());
assertEquals(0, queue.size());
assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
assertNull(queue.headRecordOffset());
// add three 3 out-of-order records with timestamp 2, 1, 3
final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -104,16 +109,19 @@ public class RecordQueueTest {
assertEquals(3, queue.size());
assertEquals(2L, queue.headRecordTimestamp());
assertEquals(2L, queue.headRecordOffset().longValue());
// poll the first record, now with 1, 3
assertEquals(2L, queue.poll().timestamp);
assertEquals(2, queue.size());
assertEquals(1L, queue.headRecordTimestamp());
assertEquals(1L, queue.headRecordOffset().longValue());
// poll the second record, now with 3
assertEquals(1L, queue.poll().timestamp);
assertEquals(1, queue.size());
assertEquals(3L, queue.headRecordTimestamp());
assertEquals(3L, queue.headRecordOffset().longValue());
// add three 3 out-of-order records with timestamp 4, 1, 2
// now with 3, 4, 1, 2
@@ -126,23 +134,28 @@ public class RecordQueueTest {
assertEquals(4, queue.size());
assertEquals(3L, queue.headRecordTimestamp());
assertEquals(3L, queue.headRecordOffset().longValue());
// poll the third record, now with 4, 1, 2
assertEquals(3L, queue.poll().timestamp);
assertEquals(3, queue.size());
assertEquals(4L, queue.headRecordTimestamp());
assertEquals(4L, queue.headRecordOffset().longValue());
// poll the rest records
assertEquals(4L, queue.poll().timestamp);
assertEquals(1L, queue.headRecordTimestamp());
assertEquals(1L, queue.headRecordOffset().longValue());
assertEquals(1L, queue.poll().timestamp);
assertEquals(2L, queue.headRecordTimestamp());
assertEquals(2L, queue.headRecordOffset().longValue());
assertEquals(2L, queue.poll().timestamp);
assertTrue(queue.isEmpty());
assertEquals(0, queue.size());
assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
assertNull(queue.headRecordOffset());
// add three more records with 4, 5, 6
final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -154,23 +167,27 @@ public class RecordQueueTest {
assertEquals(3, queue.size());
assertEquals(4L, queue.headRecordTimestamp());
assertEquals(4L, queue.headRecordOffset().longValue());
// poll one record again, the timestamp should advance now
assertEquals(4L, queue.poll().timestamp);
assertEquals(2, queue.size());
assertEquals(5L, queue.headRecordTimestamp());
assertEquals(5L, queue.headRecordOffset().longValue());
// clear the queue
queue.clear();
assertTrue(queue.isEmpty());
assertEquals(0, queue.size());
assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
assertNull(queue.headRecordOffset());
// re-insert the three records with 4, 5, 6
queue.addRawRecords(list3);
assertEquals(3, queue.size());
assertEquals(4L, queue.headRecordTimestamp());
assertEquals(4L, queue.headRecordOffset().longValue());
}
@Test
@@ -227,22 +244,30 @@ public class RecordQueueTest {
assertEquals(500L, queue.partitionTime());
}
@Test(expected = StreamsException.class)
@Test
public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));
queue.addRawRecords(records);
final StreamsException exception = assertThrows(
StreamsException.class,
() -> queue.addRawRecords(records)
);
assertThat(exception.getCause(), instanceOf(SerializationException.class));
}
@Test(expected = StreamsException.class)
@Test
public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value));
queue.addRawRecords(records);
final StreamsException exception = assertThrows(
StreamsException.class,
() -> queue.addRawRecords(records)
);
assertThat(exception.getCause(), instanceOf(SerializationException.class));
}
@Test
@@ -265,21 +290,29 @@ public class RecordQueueTest {
assertEquals(0, queueThatSkipsDeserializeErrors.size());
}
@Test(expected = StreamsException.class)
@Test
public void shouldThrowOnNegativeTimestamp() {
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
final RecordQueue queue = new RecordQueue(
new TopicPartition(topics[0], 1),
new MockSourceNode<>(topics, intDeserializer, intDeserializer),
new TopicPartition("topic", 1),
mockSourceNodeWithMetrics,
new FailOnInvalidTimestamp(),
new LogAndContinueExceptionHandler(),
new InternalMockProcessorContext(),
new LogContext());
queue.addRawRecords(records);
final StreamsException exception = assertThrows(
StreamsException.class,
() -> queue.addRawRecords(records)
);
assertThat(exception.getMessage(), equalTo("Input record ConsumerRecord(topic = topic, partition = 1, " +
"leaderEpoch = null, offset = 1, CreateTime = -1, serialized key size = 0, serialized value size = 0, " +
"headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = 10) has invalid (negative) " +
"timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without " +
"embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
"Use a different TimestampExtractor to process this data."));
}
@Test
@@ -288,8 +321,8 @@ public class RecordQueueTest {
new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
final RecordQueue queue = new RecordQueue(
new TopicPartition(topics[0], 1),
new MockSourceNode<>(topics, intDeserializer, intDeserializer),
new TopicPartition("topic", 1),
mockSourceNodeWithMetrics,
new LogAndSkipOnInvalidTimestamp(),
new LogAndContinueExceptionHandler(),
new InternalMockProcessorContext(),
@@ -304,7 +337,7 @@ public class RecordQueueTest {
final PartitionTimeTrackingTimestampExtractor timestampExtractor = new PartitionTimeTrackingTimestampExtractor();
final RecordQueue queue = new RecordQueue(
new TopicPartition(topics[0], 1),
new TopicPartition("topic", 1),
mockSourceNodeWithMetrics,
timestampExtractor,
new LogAndFailExceptionHandler(),
@@ -340,7 +373,7 @@ public class RecordQueueTest {
}
class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor {
private static class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor {
private long partitionTime = RecordQueue.UNKNOWN;
public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java (76 lines changed)

@@ -62,8 +62,11 @@ import org.junit.runner.RunWith;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -76,8 +79,10 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.StreamTask.encodeTimestamp;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_24;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -196,6 +201,7 @@ public class StreamTaskTest {
EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes();
consumer.assign(asList(partition1, partition2));
consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), mkEntry(partition2, 0L)));
stateDirectory = new StateDirectory(createConfig(false, "100"), new MockTime(), true);
}
@@ -232,7 +238,7 @@ public class StreamTaskTest {
task.initializeIfNeeded();
// should fail if lock is called
EasyMock.verify(stateDirectory);
verify(stateDirectory);
}
@Test
@@ -241,7 +247,7 @@ public class StreamTaskTest {
EasyMock.replay(stateDirectory);
consumer.commitSync(partitions.stream()
.collect(Collectors.toMap(Function.identity(), tp -> new OffsetAndMetadata(0L, StreamTask.encodeTimestamp(10L)))));
.collect(Collectors.toMap(Function.identity(), tp -> new OffsetAndMetadata(0L, encodeTimestamp(10L)))));
task = createStatelessTask(createConfig(false, "100"), StreamsConfig.METRICS_LATEST);
@@ -283,7 +289,7 @@ public class StreamTaskTest {
assertTrue(source1.initialized);
assertTrue(source2.initialized);
EasyMock.verify(stateDirectory);
verify(stateDirectory);
}
@Test
@@ -695,6 +701,51 @@ public class StreamTaskTest {
assertFalse(task.commitNeeded());
}
@Test
public void shouldCommitNextOffsetFromQueueIfAvailable() {
recordCollector.commit(EasyMock.eq(mkMap(mkEntry(partition1, new OffsetAndMetadata(5L, encodeTimestamp(5L))))));
EasyMock.expectLastCall();
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
task.addRecords(partition1, Arrays.asList(getConsumerRecord(partition1, 0L), getConsumerRecord(partition1, 5L)));
task.process(0L);
task.commit();
verify(recordCollector);
}
@Test
public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() {
recordCollector.commit(EasyMock.eq(mkMap(mkEntry(partition1, new OffsetAndMetadata(3L, encodeTimestamp(0L))))));
EasyMock.expectLastCall();
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();
consumer.addRecord(getConsumerRecord(partition1, 0L));
consumer.addRecord(getConsumerRecord(partition1, 1L));
consumer.addRecord(getConsumerRecord(partition1, 2L));
consumer.poll(Duration.ZERO);
task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L)));
task.process(0L);
task.commit();
verify(recordCollector);
}
private Map<TopicPartition, Long> getCommittetOffsets(final Map<TopicPartition, OffsetAndMetadata> committedOffsetsAndMetadata) {
final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
for (final Map.Entry<TopicPartition, OffsetAndMetadata> e : committedOffsetsAndMetadata.entrySet()) {
committedOffsets.put(e.getKey(), e.getValue().offset());
}
return committedOffsets;
}
@Test
public void shouldRespectCommitRequested() {
task = createStatelessTask(createConfig(false, "100"), StreamsConfig.METRICS_LATEST);
@@ -708,7 +759,7 @@ public class StreamTaskTest {
@Test
public void shouldEncodeAndDecodeMetadata() {
task = createStatelessTask(createConfig(false, "100"), StreamsConfig.METRICS_LATEST);
assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(StreamTask.encodeTimestamp(DEFAULT_TIMESTAMP)));
assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(encodeTimestamp(DEFAULT_TIMESTAMP)));
}
@Test
@@ -984,7 +1035,7 @@ public class StreamTaskTest {
assertTrue(source1.closed);
assertTrue(source2.closed);
EasyMock.verify(stateManager);
verify(stateManager);
}
@Test
@@ -1038,7 +1089,7 @@ public class StreamTaskTest {
task.completeRestoration();
task.commit();
EasyMock.verify(recordCollector);
verify(recordCollector);
}
@Test
@@ -1125,7 +1176,7 @@ public class StreamTaskTest {
task.closeDirty();
EasyMock.verify(stateManager);
verify(stateManager);
}
@Test
@@ -1138,6 +1189,7 @@ public class StreamTaskTest {
Collections.singleton(repartition.topic())
);
consumer.assign(asList(partition1, repartition));
consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
@@ -1266,7 +1318,7 @@ public class StreamTaskTest {
final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
EasyMock.verify(stateManager);
verify(stateManager);
}
@Test
@@ -1294,7 +1346,7 @@ public class StreamTaskTest {
final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
EasyMock.verify(stateManager);
verify(stateManager);
}
@Test
@@ -1321,7 +1373,7 @@ public class StreamTaskTest {
final double expectedCloseTaskMetric = 0.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
EasyMock.verify(stateManager);
verify(stateManager);
EasyMock.reset(stateManager);
EasyMock.replay(stateManager);
}
@@ -1352,7 +1404,7 @@ public class StreamTaskTest {
final double expectedCloseTaskMetric = 0.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
EasyMock.verify(stateManager);
verify(stateManager);
EasyMock.reset(stateManager);
EasyMock.replay(stateManager);
}
@@ -1383,7 +1435,7 @@ public class StreamTaskTest {
final double expectedCloseTaskMetric = 0.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
EasyMock.verify(stateManager);
verify(stateManager);
EasyMock.reset(stateManager);
EasyMock.replay(stateManager);
}

streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java (14 lines changed)

@@ -339,22 +339,28 @@ public class TopologyTestDriver implements Closeable {
offsetsByTopicOrPatternPartition.put(tp, new AtomicLong());
}
consumer.assign(partitionsByInputTopic.values());
final Map<TopicPartition, Long> startOffsets = new HashMap<>();
for (final TopicPartition topicPartition : partitionsByInputTopic.values()) {
startOffsets.put(topicPartition, 0L);
}
consumer.updateBeginningOffsets(startOffsets);
if (globalTopology != null) {
final MockConsumer<byte[], byte[]> globalConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
for (final String topicName : globalTopology.sourceTopics()) {
final TopicPartition partition = new TopicPartition(topicName, 0);
globalPartitionsByInputTopic.put(topicName, partition);
offsetsByTopicOrPatternPartition.put(partition, new AtomicLong());
consumer.updatePartitions(topicName, Collections.singletonList(
globalConsumer.updatePartitions(topicName, Collections.singletonList(
new PartitionInfo(topicName, 0, null, null, null)));
consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
consumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
globalConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
}
globalStateManager = new GlobalStateManagerImpl(
new LogContext("mock "),
globalTopology,
consumer,
globalConsumer,
stateDirectory,
stateRestoreListener,
streamsConfig);
