From aa0d0ec32ac53099ddf33e04be2a1701e539dffa Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 12 Feb 2020 12:19:34 -0800 Subject: [PATCH] KAFKA-6607: Commit correct offsets for transactional input data (#8091) Reviewers: Guozhang Wang --- .../java/org/apache/kafka/test/TestUtils.java | 1 - .../processor/internals/PartitionGroup.java | 35 +++-- .../processor/internals/RecordQueue.java | 4 + .../processor/internals/StreamTask.java | 17 ++- .../processor/internals/StreamThread.java | 14 +- .../AbstractResetIntegrationTest.java | 65 +++------ .../integration/EosIntegrationTest.java | 66 ++++++++-- .../utils/IntegrationTestUtils.java | 46 ++++++- .../internals/PartitionGroupTest.java | 123 ++++++++++++++++-- .../processor/internals/RecordQueueTest.java | 69 +++++++--- .../processor/internals/StreamTaskTest.java | 76 +++++++++-- .../kafka/streams/TopologyTestDriver.java | 14 +- 12 files changed, 399 insertions(+), 131 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index ad2ad995034..14eab7284db 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -282,7 +282,6 @@ public class TestUtils { final Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.ACKS_CONFIG, "all"); - properties.put(ProducerConfig.RETRIES_CONFIG, 0); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); properties.putAll(additional); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 365a7a2ad93..05cf66ad072 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -59,14 +59,14 @@ public class PartitionGroup { private boolean allBuffered; - public static class RecordInfo { + static class RecordInfo { RecordQueue queue; - public ProcessorNode node() { + ProcessorNode node() { return queue.source(); } - public TopicPartition partition() { + TopicPartition partition() { return queue.partition(); } @@ -88,7 +88,7 @@ public class PartitionGroup { long partitionTimestamp(final TopicPartition partition) { final RecordQueue queue = partitionQueues.get(partition); if (queue == null) { - throw new NullPointerException("Partition " + partition + " not found."); + throw new IllegalStateException("Partition " + partition + " not found."); } return queue.partitionTime(); } @@ -96,7 +96,7 @@ public class PartitionGroup { void setPartitionTime(final TopicPartition partition, final long partitionTime) { final RecordQueue queue = partitionQueues.get(partition); if (queue == null) { - throw new NullPointerException("Partition " + partition + " not found."); + throw new IllegalStateException("Partition " + partition + " not found."); } if (streamTime < partitionTime) { streamTime = partitionTime; @@ -152,6 +152,10 @@ public class PartitionGroup { int addRawRecords(final TopicPartition partition, final Iterable> rawRecords) { final RecordQueue recordQueue = partitionQueues.get(partition); + if (recordQueue == null) { + throw new IllegalStateException("Partition " + partition + " not found."); + } + final int oldSize = recordQueue.size(); final int newSize = recordQueue.addRawRecords(rawRecords); @@ -172,17 +176,27 @@ public class PartitionGroup { return newSize; } - public Set partitions() { + Set partitions() { return Collections.unmodifiableSet(partitionQueues.keySet()); } /** * Return the stream-time of this partition group defined as the largest timestamp seen across all partitions */ - public long streamTime() { + long streamTime() { return streamTime; } + Long headRecordOffset(final TopicPartition partition) { + final RecordQueue recordQueue = partitionQueues.get(partition); + + if (recordQueue == null) { + throw new IllegalStateException("Partition " + partition + " not found."); + } + + return recordQueue.headRecordOffset(); + } + /** * @throws IllegalStateException if the record's partition does not belong to this partition group */ @@ -190,7 +204,7 @@ public class PartitionGroup { final RecordQueue recordQueue = partitionQueues.get(partition); if (recordQueue == null) { - throw new IllegalStateException(String.format("Record's partition %s does not belong to this partition-group.", partition)); + throw new IllegalStateException("Partition " + partition + " not found."); } return recordQueue.size(); @@ -204,14 +218,15 @@ public class PartitionGroup { return allBuffered; } - public void close() { + void close() { clear(); partitionQueues.clear(); streamTime = RecordQueue.UNKNOWN; } - public void clear() { + void clear() { nonEmptyQueuesByTime.clear(); + totalBuffered = 0; for (final RecordQueue queue : partitionQueues.values()) { queue.clear(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 2dabf7a95c6..00601a7e92f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -156,6 +156,10 @@ public class RecordQueue { return headRecord == null ? UNKNOWN : headRecord.timestamp; } + public Long headRecordOffset() { + return headRecord == null ? null : headRecord.offset(); + } + /** * Clear the fifo queue of its elements */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index f025b00e039..06832921345 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -336,7 +337,21 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, final Map consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size()); for (final Map.Entry entry : consumedOffsets.entrySet()) { final TopicPartition partition = entry.getKey(); - final long offset = entry.getValue() + 1L; + Long offset = partitionGroup.headRecordOffset(partition); + if (offset == null) { + try { + offset = consumer.position(partition); + } catch (final TimeoutException error) { + // the `consumer.position()` call should never block, because we know that we did process data + // for the requested partition and thus the consumer should have a valid local position + // that it can return immediately + + // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException` + throw new IllegalStateException(error); + } catch (final KafkaException fatal) { + throw new StreamsException(fatal); + } + } final long partitionTime = partitionTimes.get(partition); consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime))); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 27324fdf7c5..7efbbc759ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -802,6 +802,13 @@ public class StreamThread extends Thread { throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration"); } + final long pollLatency = advanceNowAndComputeLatency(); + + if (records != null && !records.isEmpty()) { + pollSensor.record(pollLatency, now); + addRecordsToTasks(records); + } + // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned(). // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation @@ -811,13 +818,6 @@ public class StreamThread extends Thread { return; } - final long pollLatency = advanceNowAndComputeLatency(); - - if (records != null && !records.isEmpty()) { - pollSensor.record(pollLatency, now); - addRecordsToTasks(records); - } - // we can always let changelog reader to try restoring in order to initialize the changelogs; // if there's no active restoring or standby updating it would not try to fetch any data changelogReader.restore(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index 675286bd259..73ec0d91623 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration; import kafka.tools.StreamsResetter; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; @@ -42,7 +41,6 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.AfterClass; import org.junit.Assert; @@ -62,9 +60,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutionException; import static java.time.Duration.ofMillis; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -170,25 +168,11 @@ public abstract class AbstractResetIntegrationTest { private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; private static final int TIMEOUT_MULTIPLIER = 15; - private class ConsumerGroupInactiveCondition implements TestCondition { - @Override - public boolean conditionMet() { - try { - final ConsumerGroupDescription groupDescription = adminClient.describeConsumerGroups(Collections.singletonList(appID)).describedGroups().get(appID).get(); - return groupDescription.members().isEmpty(); - } catch (final ExecutionException | InterruptedException e) { - return false; - } - } - } - void prepareTest() throws Exception { prepareConfigs(); prepareEnvironment(); - // busy wait until cluster (ie, ConsumerGroupCoordinator) is available - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Test consumer group " + appID + " still active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT); cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN); @@ -286,15 +270,13 @@ public abstract class AbstractResetIntegrationTest { final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); // RESET streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.cleanUp(); cleanGlobal(false, null, null); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); assertInternalTopicsGotDeleted(null); @@ -305,8 +287,7 @@ public abstract class AbstractResetIntegrationTest { assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); cleanGlobal(false, null, null); } @@ -325,8 +306,7 @@ public abstract class AbstractResetIntegrationTest { final List> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2, 40); streams.close(); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); // insert bad record to make sure intermediate user topic gets seekToEnd() mockTime.sleep(1); @@ -341,8 +321,7 @@ public abstract class AbstractResetIntegrationTest { streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfig); streams.cleanUp(); cleanGlobal(true, null, null); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC); @@ -363,8 +342,7 @@ public abstract class AbstractResetIntegrationTest { } assertThat(resultIntermediate.get(10), equalTo(badMessage)); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); cleanGlobal(true, null, null); cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC); @@ -380,8 +358,7 @@ public abstract class AbstractResetIntegrationTest { final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); // RESET final File resetFile = File.createTempFile("reset", ".csv"); @@ -393,8 +370,7 @@ public abstract class AbstractResetIntegrationTest { streams.cleanUp(); cleanGlobal(false, "--from-file", resetFile.getAbsolutePath()); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); assertInternalTopicsGotDeleted(null); @@ -408,8 +384,7 @@ public abstract class AbstractResetIntegrationTest { result.remove(0); assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); cleanGlobal(false, null, null); } @@ -423,8 +398,7 @@ public abstract class AbstractResetIntegrationTest { final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); // RESET final File resetFile = File.createTempFile("reset", ".csv"); @@ -441,8 +415,7 @@ public abstract class AbstractResetIntegrationTest { calendar.add(Calendar.DATE, -1); cleanGlobal(false, "--to-datetime", format.format(calendar.getTime())); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); assertInternalTopicsGotDeleted(null); @@ -455,8 +428,7 @@ public abstract class AbstractResetIntegrationTest { assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); cleanGlobal(false, null, null); } @@ -470,8 +442,7 @@ public abstract class AbstractResetIntegrationTest { final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); // RESET final File resetFile = File.createTempFile("reset", ".csv"); @@ -483,8 +454,7 @@ public abstract class AbstractResetIntegrationTest { streams.cleanUp(); cleanGlobal(false, "--by-duration", "PT1M"); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); assertInternalTopicsGotDeleted(null); @@ -497,8 +467,7 @@ public abstract class AbstractResetIntegrationTest { assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(new ConsumerGroupInactiveCondition(), TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group " + appID + " did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); cleanGlobal(false, null, null); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 902015184b7..f508ab0f2c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -16,17 +16,24 @@ */ package org.apache.kafka.streams.integration; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; @@ -41,7 +48,6 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.IntegrationTest; - import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; import org.junit.Before; @@ -51,6 +57,7 @@ import org.junit.experimental.categories.Category; import java.io.File; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -62,6 +69,9 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.equalTo; @@ -118,38 +128,66 @@ public class EosIntegrationTest { @Test public void shouldBeAbleToRunWithEosEnabled() throws Exception { - runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC); + runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false); + } + + @Test + public void shouldCommitCorrectOffsetIfInputTopicIsTransactional() throws Exception { + runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, true); + + try (final Admin adminClient = Admin.create(mkMap(mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()))); + final Consumer consumer = new KafkaConsumer<>(mkMap( + mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ConsumerConfig.GROUP_ID_CONFIG, applicationId), + mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class), + mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)))) { + + waitForEmptyConsumerGroup(adminClient, applicationId, 5 * MAX_POLL_INTERVAL_MS); + + final TopicPartition topicPartition = new TopicPartition(SINGLE_PARTITION_INPUT_TOPIC, 0); + final Collection topicPartitions = Collections.singleton(topicPartition); + + final long committedOffset = adminClient.listConsumerGroupOffsets(applicationId).partitionsToOffsetAndMetadata().get().get(topicPartition).offset(); + + consumer.assign(topicPartitions); + final long consumerPosition = consumer.position(topicPartition); + final long endOffset = consumer.endOffsets(topicPartitions).get(topicPartition); + + assertThat(committedOffset, equalTo(consumerPosition)); + assertThat(committedOffset, equalTo(endOffset)); + } } @Test public void shouldBeAbleToRestartAfterClose() throws Exception { - runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC); + runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false); } @Test public void shouldBeAbleToCommitToMultiplePartitions() throws Exception { - runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC); + runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC, false); } @Test public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception { - runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC); + runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC, false); } @Test public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception { - runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC); + runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, false); } @Test public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception { - runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC); + runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, false); } private void runSimpleCopyTest(final int numberOfRestarts, final String inputTopic, final String throughTopic, - final String outputTopic) throws Exception { + final String outputTopic, + final boolean inputTopicTransactional) throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final KStream input = builder.stream(inputTopic); KStream output = input; @@ -179,11 +217,17 @@ public class EosIntegrationTest { final List> inputData = prepareData(i * 100, i * 100 + 10L, 0L, 1L); + final Properties producerConfigs = new Properties(); + if (inputTopicTransactional) { + producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-input-producer"); + } + IntegrationTestUtils.produceKeyValuesSynchronously( inputTopic, inputData, - TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class), - CLUSTER.time + TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, producerConfigs), + CLUSTER.time, + inputTopicTransactional ); final List> committedRecords = diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index ef508a0894b..42d203c97d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -16,15 +16,11 @@ */ package org.apache.kafka.streams.integration.utils; -import java.lang.reflect.Field; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import kafka.api.Request; import kafka.server.KafkaServer; import kafka.server.MetadataCache; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -55,6 +51,7 @@ import scala.Option; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; @@ -65,12 +62,17 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout; @@ -839,6 +841,38 @@ public class IntegrationTestUtils { }); } + private static class ConsumerGroupInactiveCondition implements TestCondition { + private final Admin adminClient; + private final String applicationId; + + private ConsumerGroupInactiveCondition(final Admin adminClient, + final String applicationId) { + this.adminClient = adminClient; + this.applicationId = applicationId; + } + + @Override + public boolean conditionMet() { + try { + final ConsumerGroupDescription groupDescription = + adminClient.describeConsumerGroups(Collections.singletonList(applicationId)) + .describedGroups() + .get(applicationId) + .get(); + return groupDescription.members().isEmpty(); + } catch (final ExecutionException | InterruptedException e) { + return false; + } + } + } + + public static void waitForEmptyConsumerGroup(final Admin adminClient, + final String applicationId, + final long timeoutMs) throws Exception { + TestUtils.waitForCondition(new IntegrationTestUtils.ConsumerGroupInactiveCondition(adminClient, applicationId), timeoutMs, + "Test consumer group " + applicationId + " still active even after waiting " + timeoutMs + " ms."); + } + private static StateListener getStateListener(final KafkaStreams streams) { try { final Field field = streams.getClass().getDeclaredField("stateListener"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 3584f9c9a30..ef31d84a1d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -39,7 +39,10 @@ import java.util.List; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; public class PartitionGroupTest { @@ -47,8 +50,8 @@ public class PartitionGroupTest { private final Serializer intSerializer = new IntegerSerializer(); private final Deserializer intDeserializer = new IntegerDeserializer(); private final TimestampExtractor timestampExtractor = new MockTimestampExtractor(); - private final TopicPartition randomPartition = new TopicPartition("random-partition", 0); - private final String errMessage = "Partition " + randomPartition + " not found."; + private final TopicPartition unknownPartition = new TopicPartition("unknown-partition", 0); + private final String errMessage = "Partition " + unknownPartition + " not found."; private final String[] topics = {"topic"}; private final TopicPartition partition1 = new TopicPartition(topics[0], 1); private final TopicPartition partition2 = new TopicPartition(topics[0], 2); @@ -88,6 +91,14 @@ public class PartitionGroupTest { @Test public void testTimeTracking() { + testFirstBatch(); + testSecondBatch(); + } + + private void testFirstBatch() { + StampedRecord record; + final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); + assertEquals(0, group.numBuffered()); // add three 3 records with timestamp 1, 3, 5 to partition-1 final List> list1 = Arrays.asList( @@ -109,12 +120,13 @@ public class PartitionGroupTest { // st: -1 since no records was being processed yet verifyBuffered(6, 3, 3); + assertEquals(1L, group.partitionTimestamp(partition1)); + assertEquals(2L, group.partitionTimestamp(partition2)); + assertEquals(1L, group.headRecordOffset(partition1).longValue()); + assertEquals(2L, group.headRecordOffset(partition2).longValue()); assertEquals(-1L, group.streamTime()); assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); - StampedRecord record; - final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); - // get one record, now the time should be advanced record = group.nextRecord(info); // 1:[3, 5] @@ -123,6 +135,8 @@ public class PartitionGroupTest { assertEquals(partition1, info.partition()); assertEquals(3L, group.partitionTimestamp(partition1)); assertEquals(2L, group.partitionTimestamp(partition2)); + assertEquals(3L, group.headRecordOffset(partition1).longValue()); + assertEquals(2L, group.headRecordOffset(partition2).longValue()); assertEquals(1L, group.streamTime()); verifyTimes(record, 1L, 1L); verifyBuffered(5, 2, 3); @@ -136,10 +150,17 @@ public class PartitionGroupTest { assertEquals(partition2, info.partition()); assertEquals(3L, group.partitionTimestamp(partition1)); assertEquals(4L, group.partitionTimestamp(partition2)); + assertEquals(3L, group.headRecordOffset(partition1).longValue()); + assertEquals(4L, group.headRecordOffset(partition2).longValue()); assertEquals(2L, group.streamTime()); verifyTimes(record, 2L, 2L); verifyBuffered(4, 2, 2); assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); + } + + private void testSecondBatch() { + StampedRecord record; + final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); // add 2 more records with timestamp 2, 4 to partition-1 final List> list3 = Arrays.asList( @@ -153,6 +174,8 @@ public class PartitionGroupTest { verifyBuffered(6, 4, 2); assertEquals(3L, group.partitionTimestamp(partition1)); assertEquals(4L, group.partitionTimestamp(partition2)); + assertEquals(3L, group.headRecordOffset(partition1).longValue()); + assertEquals(4L, group.headRecordOffset(partition2).longValue()); assertEquals(2L, group.streamTime()); assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); @@ -164,6 +187,8 @@ public class PartitionGroupTest { assertEquals(partition1, info.partition()); assertEquals(5L, group.partitionTimestamp(partition1)); assertEquals(4L, group.partitionTimestamp(partition2)); + assertEquals(5L, group.headRecordOffset(partition1).longValue()); + assertEquals(4L, group.headRecordOffset(partition2).longValue()); assertEquals(3L, group.streamTime()); verifyTimes(record, 3L, 3L); verifyBuffered(5, 3, 2); @@ -177,6 +202,8 @@ public class PartitionGroupTest { assertEquals(partition2, info.partition()); assertEquals(5L, group.partitionTimestamp(partition1)); assertEquals(6L, group.partitionTimestamp(partition2)); + assertEquals(5L, group.headRecordOffset(partition1).longValue()); + assertEquals(6L, group.headRecordOffset(partition2).longValue()); assertEquals(4L, group.streamTime()); verifyTimes(record, 4L, 4L); verifyBuffered(4, 3, 1); @@ -190,6 +217,8 @@ public class PartitionGroupTest { assertEquals(partition1, info.partition()); assertEquals(5L, group.partitionTimestamp(partition1)); assertEquals(6L, group.partitionTimestamp(partition2)); + assertEquals(2L, group.headRecordOffset(partition1).longValue()); + assertEquals(6L, group.headRecordOffset(partition2).longValue()); assertEquals(5L, group.streamTime()); verifyTimes(record, 5L, 5L); verifyBuffered(3, 2, 1); @@ -203,6 +232,8 @@ public class PartitionGroupTest { assertEquals(partition1, info.partition()); assertEquals(5L, group.partitionTimestamp(partition1)); assertEquals(6L, group.partitionTimestamp(partition2)); + assertEquals(4L, group.headRecordOffset(partition1).longValue()); + assertEquals(6L, group.headRecordOffset(partition2).longValue()); assertEquals(5L, group.streamTime()); verifyTimes(record, 2L, 5L); verifyBuffered(2, 1, 1); @@ -216,6 +247,8 @@ public class PartitionGroupTest { assertEquals(partition1, info.partition()); assertEquals(5L, group.partitionTimestamp(partition1)); assertEquals(6L, group.partitionTimestamp(partition2)); + assertNull(group.headRecordOffset(partition1)); + assertEquals(6L, group.headRecordOffset(partition2).longValue()); assertEquals(5L, group.streamTime()); verifyTimes(record, 4L, 5L); verifyBuffered(1, 0, 1); @@ -229,6 +262,8 @@ public class PartitionGroupTest { assertEquals(partition2, info.partition()); assertEquals(5L, group.partitionTimestamp(partition1)); assertEquals(6L, group.partitionTimestamp(partition2)); + assertNull(group.headRecordOffset(partition1)); + assertNull(group.headRecordOffset(partition2)); assertEquals(6L, group.streamTime()); verifyTimes(record, 6L, 6L); verifyBuffered(0, 0, 0); @@ -305,16 +340,78 @@ public class PartitionGroupTest { } @Test - public void shouldThrowNullpointerUponSetPartitionTimestampFailure() { - assertThrows(errMessage, NullPointerException.class, () -> { - group.setPartitionTime(randomPartition, 0L); - }); + public void shouldThrowIllegalStateExceptionUponAddRecordsIfPartitionUnknown() { + final IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> group.addRawRecords(unknownPartition, null)); + assertThat(errMessage, equalTo(exception.getMessage())); + } + + @Test + public void shouldThrowIllegalStateExceptionUponNumBufferedIfPartitionUnknown() { + final IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> group.numBuffered(unknownPartition)); + assertThat(errMessage, equalTo(exception.getMessage())); } @Test - public void shouldThrowNullpointerUponGetPartitionTimestampFailure() { - assertThrows(errMessage, NullPointerException.class, () -> { - group.partitionTimestamp(randomPartition); - }); + public void shouldThrowIllegalStateExceptionUponSetPartitionTimestampIfPartitionUnknown() { + final IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> group.setPartitionTime(unknownPartition, 0L)); + assertThat(errMessage, equalTo(exception.getMessage())); + } + + @Test + public void shouldThrowIllegalStateExceptionUponGetPartitionTimestampIfPartitionUnknown() { + final IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> group.partitionTimestamp(unknownPartition)); + assertThat(errMessage, equalTo(exception.getMessage())); + } + + @Test + public void shouldThrowIllegalStateExceptionUponGetHeadRecordOffsetIfPartitionUnknown() { + final IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> group.headRecordOffset(unknownPartition)); + assertThat(errMessage, equalTo(exception.getMessage())); + } + + @Test + public void shouldEmpyPartitionsOnClean() { + final List> list = Arrays.asList( + new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); + group.addRawRecords(partition1, list); + + group.clear(); + + assertThat(group.numBuffered(), equalTo(0)); + assertThat(group.streamTime(), equalTo(RecordQueue.UNKNOWN)); + assertThat(group.nextRecord(new PartitionGroup.RecordInfo()), equalTo(null)); + + group.addRawRecords(partition1, list); + } + + public void shouldCleanPartitionsOnClose() { + final List> list = Arrays.asList( + new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); + group.addRawRecords(partition1, list); + + group.close(); + + assertThat(group.numBuffered(), equalTo(0)); + assertThat(group.streamTime(), equalTo(RecordQueue.UNKNOWN)); + assertThat(group.nextRecord(new PartitionGroup.RecordInfo()), equalTo(null)); + + final IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> group.addRawRecords(partition1, list)); + assertThat("Partition topic-1 not found.", equalTo(exception.getMessage())); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index c32feb297fb..c98950ebc10 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; @@ -45,29 +46,33 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; public class RecordQueueTest { private final Serializer intSerializer = new IntegerSerializer(); private final Deserializer intDeserializer = new IntegerDeserializer(); private final TimestampExtractor timestampExtractor = new MockTimestampExtractor(); - private final String[] topics = {"topic"}; final InternalMockProcessorContext context = new InternalMockProcessorContext( StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), new MockRecordCollector() ); - private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer); + private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(new String[] {"topic"}, intDeserializer, intDeserializer); private final RecordQueue queue = new RecordQueue( - new TopicPartition(topics[0], 1), + new TopicPartition("topic", 1), mockSourceNodeWithMetrics, timestampExtractor, new LogAndFailExceptionHandler(), context, new LogContext()); private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue( - new TopicPartition(topics[0], 1), + new TopicPartition("topic", 1), mockSourceNodeWithMetrics, timestampExtractor, new LogAndContinueExceptionHandler(), @@ -89,10 +94,10 @@ public class RecordQueueTest { @Test public void testTimeTracking() { - assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); + assertNull(queue.headRecordOffset()); // add three 3 out-of-order records with timestamp 2, 1, 3 final List> list1 = Arrays.asList( @@ -104,16 +109,19 @@ public class RecordQueueTest { assertEquals(3, queue.size()); assertEquals(2L, queue.headRecordTimestamp()); + assertEquals(2L, queue.headRecordOffset().longValue()); // poll the first record, now with 1, 3 assertEquals(2L, queue.poll().timestamp); assertEquals(2, queue.size()); assertEquals(1L, queue.headRecordTimestamp()); + assertEquals(1L, queue.headRecordOffset().longValue()); // poll the second record, now with 3 assertEquals(1L, queue.poll().timestamp); assertEquals(1, queue.size()); assertEquals(3L, queue.headRecordTimestamp()); + assertEquals(3L, queue.headRecordOffset().longValue()); // add three 3 out-of-order records with timestamp 4, 1, 2 // now with 3, 4, 1, 2 @@ -126,23 +134,28 @@ public class RecordQueueTest { assertEquals(4, queue.size()); assertEquals(3L, queue.headRecordTimestamp()); + assertEquals(3L, queue.headRecordOffset().longValue()); // poll the third record, now with 4, 1, 2 assertEquals(3L, queue.poll().timestamp); assertEquals(3, queue.size()); assertEquals(4L, queue.headRecordTimestamp()); + assertEquals(4L, queue.headRecordOffset().longValue()); // poll the rest records assertEquals(4L, queue.poll().timestamp); assertEquals(1L, queue.headRecordTimestamp()); + assertEquals(1L, queue.headRecordOffset().longValue()); assertEquals(1L, queue.poll().timestamp); assertEquals(2L, queue.headRecordTimestamp()); + assertEquals(2L, queue.headRecordOffset().longValue()); assertEquals(2L, queue.poll().timestamp); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); + assertNull(queue.headRecordOffset()); // add three more records with 4, 5, 6 final List> list3 = Arrays.asList( @@ -154,23 +167,27 @@ public class RecordQueueTest { assertEquals(3, queue.size()); assertEquals(4L, queue.headRecordTimestamp()); + assertEquals(4L, queue.headRecordOffset().longValue()); // poll one record again, the timestamp should advance now assertEquals(4L, queue.poll().timestamp); assertEquals(2, queue.size()); assertEquals(5L, queue.headRecordTimestamp()); + assertEquals(5L, queue.headRecordOffset().longValue()); // clear the queue queue.clear(); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); + assertNull(queue.headRecordOffset()); // re-insert the three records with 4, 5, 6 queue.addRawRecords(list3); assertEquals(3, queue.size()); assertEquals(4L, queue.headRecordTimestamp()); + assertEquals(4L, queue.headRecordOffset().longValue()); } @Test @@ -227,22 +244,30 @@ public class RecordQueueTest { assertEquals(500L, queue.partitionTime()); } - @Test(expected = StreamsException.class) + @Test public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() { final byte[] key = Serdes.Long().serializer().serialize("foo", 1L); final List> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue)); - queue.addRawRecords(records); + final StreamsException exception = assertThrows( + StreamsException.class, + () -> queue.addRawRecords(records) + ); + assertThat(exception.getCause(), instanceOf(SerializationException.class)); } - @Test(expected = StreamsException.class) + @Test public void shouldThrowStreamsExceptionWhenValueDeserializationFails() { final byte[] value = Serdes.Long().serializer().serialize("foo", 1L); final List> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value)); - queue.addRawRecords(records); + final StreamsException exception = assertThrows( + StreamsException.class, + () -> queue.addRawRecords(records) + ); + assertThat(exception.getCause(), instanceOf(SerializationException.class)); } @Test @@ -265,21 +290,29 @@ public class RecordQueueTest { assertEquals(0, queueThatSkipsDeserializeErrors.size()); } - - @Test(expected = StreamsException.class) + @Test public void shouldThrowOnNegativeTimestamp() { final List> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); final RecordQueue queue = new RecordQueue( - new TopicPartition(topics[0], 1), - new MockSourceNode<>(topics, intDeserializer, intDeserializer), + new TopicPartition("topic", 1), + mockSourceNodeWithMetrics, new FailOnInvalidTimestamp(), new LogAndContinueExceptionHandler(), new InternalMockProcessorContext(), new LogContext()); - queue.addRawRecords(records); + final StreamsException exception = assertThrows( + StreamsException.class, + () -> queue.addRawRecords(records) + ); + assertThat(exception.getMessage(), equalTo("Input record ConsumerRecord(topic = topic, partition = 1, " + + "leaderEpoch = null, offset = 1, CreateTime = -1, serialized key size = 0, serialized value size = 0, " + + "headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = 10) has invalid (negative) " + + "timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without " + + "embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + + "Use a different TimestampExtractor to process this data.")); } @Test @@ -288,8 +321,8 @@ public class RecordQueueTest { new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); final RecordQueue queue = new RecordQueue( - new TopicPartition(topics[0], 1), - new MockSourceNode<>(topics, intDeserializer, intDeserializer), + new TopicPartition("topic", 1), + mockSourceNodeWithMetrics, new LogAndSkipOnInvalidTimestamp(), new LogAndContinueExceptionHandler(), new InternalMockProcessorContext(), @@ -304,7 +337,7 @@ public class RecordQueueTest { final PartitionTimeTrackingTimestampExtractor timestampExtractor = new PartitionTimeTrackingTimestampExtractor(); final RecordQueue queue = new RecordQueue( - new TopicPartition(topics[0], 1), + new TopicPartition("topic", 1), mockSourceNodeWithMetrics, timestampExtractor, new LogAndFailExceptionHandler(), @@ -340,7 +373,7 @@ public class RecordQueueTest { } - class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor { + private static class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor { private long partitionTime = RecordQueue.UNKNOWN; public long extract(final ConsumerRecord record, final long partitionTime) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 912b75df9bc..9909e9c0e99 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -62,8 +62,11 @@ import org.junit.runner.RunWith; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -76,8 +79,10 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.processor.internals.StreamTask.encodeTimestamp; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_24; +import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -196,6 +201,7 @@ public class StreamTaskTest { EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes(); consumer.assign(asList(partition1, partition2)); + consumer.updateBeginningOffsets(mkMap(mkEntry(partition1, 0L), mkEntry(partition2, 0L))); stateDirectory = new StateDirectory(createConfig(false, "100"), new MockTime(), true); } @@ -232,7 +238,7 @@ public class StreamTaskTest { task.initializeIfNeeded(); // should fail if lock is called - EasyMock.verify(stateDirectory); + verify(stateDirectory); } @Test @@ -241,7 +247,7 @@ public class StreamTaskTest { EasyMock.replay(stateDirectory); consumer.commitSync(partitions.stream() - .collect(Collectors.toMap(Function.identity(), tp -> new OffsetAndMetadata(0L, StreamTask.encodeTimestamp(10L))))); + .collect(Collectors.toMap(Function.identity(), tp -> new OffsetAndMetadata(0L, encodeTimestamp(10L))))); task = createStatelessTask(createConfig(false, "100"), StreamsConfig.METRICS_LATEST); @@ -283,7 +289,7 @@ public class StreamTaskTest { assertTrue(source1.initialized); assertTrue(source2.initialized); - EasyMock.verify(stateDirectory); + verify(stateDirectory); } @Test @@ -695,6 +701,51 @@ public class StreamTaskTest { assertFalse(task.commitNeeded()); } + @Test + public void shouldCommitNextOffsetFromQueueIfAvailable() { + recordCollector.commit(EasyMock.eq(mkMap(mkEntry(partition1, new OffsetAndMetadata(5L, encodeTimestamp(5L)))))); + EasyMock.expectLastCall(); + + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); + + task.addRecords(partition1, Arrays.asList(getConsumerRecord(partition1, 0L), getConsumerRecord(partition1, 5L))); + task.process(0L); + task.commit(); + + verify(recordCollector); + } + + @Test + public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() { + recordCollector.commit(EasyMock.eq(mkMap(mkEntry(partition1, new OffsetAndMetadata(3L, encodeTimestamp(0L)))))); + EasyMock.expectLastCall(); + + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + task.initializeIfNeeded(); + task.completeRestoration(); + + consumer.addRecord(getConsumerRecord(partition1, 0L)); + consumer.addRecord(getConsumerRecord(partition1, 1L)); + consumer.addRecord(getConsumerRecord(partition1, 2L)); + consumer.poll(Duration.ZERO); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L))); + task.process(0L); + task.commit(); + + verify(recordCollector); + } + + private Map getCommittetOffsets(final Map committedOffsetsAndMetadata) { + final Map committedOffsets = new HashMap<>(); + for (final Map.Entry e : committedOffsetsAndMetadata.entrySet()) { + committedOffsets.put(e.getKey(), e.getValue().offset()); + } + return committedOffsets; + } + @Test public void shouldRespectCommitRequested() { task = createStatelessTask(createConfig(false, "100"), StreamsConfig.METRICS_LATEST); @@ -708,7 +759,7 @@ public class StreamTaskTest { @Test public void shouldEncodeAndDecodeMetadata() { task = createStatelessTask(createConfig(false, "100"), StreamsConfig.METRICS_LATEST); - assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(StreamTask.encodeTimestamp(DEFAULT_TIMESTAMP))); + assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(encodeTimestamp(DEFAULT_TIMESTAMP))); } @Test @@ -984,7 +1035,7 @@ public class StreamTaskTest { assertTrue(source1.closed); assertTrue(source2.closed); - EasyMock.verify(stateManager); + verify(stateManager); } @Test @@ -1038,7 +1089,7 @@ public class StreamTaskTest { task.completeRestoration(); task.commit(); - EasyMock.verify(recordCollector); + verify(recordCollector); } @Test @@ -1125,7 +1176,7 @@ public class StreamTaskTest { task.closeDirty(); - EasyMock.verify(stateManager); + verify(stateManager); } @Test @@ -1138,6 +1189,7 @@ public class StreamTaskTest { Collections.singleton(repartition.topic()) ); consumer.assign(asList(partition1, repartition)); + consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L))); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()); EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes(); @@ -1266,7 +1318,7 @@ public class StreamTaskTest { final double expectedCloseTaskMetric = 1.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - EasyMock.verify(stateManager); + verify(stateManager); } @Test @@ -1294,7 +1346,7 @@ public class StreamTaskTest { final double expectedCloseTaskMetric = 1.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - EasyMock.verify(stateManager); + verify(stateManager); } @Test @@ -1321,7 +1373,7 @@ public class StreamTaskTest { final double expectedCloseTaskMetric = 0.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - EasyMock.verify(stateManager); + verify(stateManager); EasyMock.reset(stateManager); EasyMock.replay(stateManager); } @@ -1352,7 +1404,7 @@ public class StreamTaskTest { final double expectedCloseTaskMetric = 0.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - EasyMock.verify(stateManager); + verify(stateManager); EasyMock.reset(stateManager); EasyMock.replay(stateManager); } @@ -1383,7 +1435,7 @@ public class StreamTaskTest { final double expectedCloseTaskMetric = 0.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - EasyMock.verify(stateManager); + verify(stateManager); EasyMock.reset(stateManager); EasyMock.replay(stateManager); } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 52e5aa25c3d..98e059e2485 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -339,22 +339,28 @@ public class TopologyTestDriver implements Closeable { offsetsByTopicOrPatternPartition.put(tp, new AtomicLong()); } consumer.assign(partitionsByInputTopic.values()); + final Map startOffsets = new HashMap<>(); + for (final TopicPartition topicPartition : partitionsByInputTopic.values()) { + startOffsets.put(topicPartition, 0L); + } + consumer.updateBeginningOffsets(startOffsets); if (globalTopology != null) { + final MockConsumer globalConsumer = new MockConsumer<>(OffsetResetStrategy.NONE); for (final String topicName : globalTopology.sourceTopics()) { final TopicPartition partition = new TopicPartition(topicName, 0); globalPartitionsByInputTopic.put(topicName, partition); offsetsByTopicOrPatternPartition.put(partition, new AtomicLong()); - consumer.updatePartitions(topicName, Collections.singletonList( + globalConsumer.updatePartitions(topicName, Collections.singletonList( new PartitionInfo(topicName, 0, null, null, null))); - consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L)); - consumer.updateEndOffsets(Collections.singletonMap(partition, 0L)); + globalConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L)); + globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L)); } globalStateManager = new GlobalStateManagerImpl( new LogContext("mock "), globalTopology, - consumer, + globalConsumer, stateDirectory, stateRestoreListener, streamsConfig);