From be82a2afc9e38adc0109dc694834ca5947128877 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 30 Jul 2015 14:23:43 -0700 Subject: [PATCH] KAFKA-2350; KafkaConsumer pause/resume API Author: Jason Gustafson Reviewers: Ismael, Ashish, Guozhang Closes #100 from hachikuji/KAFKA-2350 and squashes the following commits: 250e823 [Jason Gustafson] KAFKA-2350; KafkaConsumer pause/resume API --- .../kafka/clients/consumer/Consumer.java | 10 + .../kafka/clients/consumer/KafkaConsumer.java | 48 +++- .../kafka/clients/consumer/MockConsumer.java | 39 ++- .../consumer/internals/Coordinator.java | 8 +- .../clients/consumer/internals/Fetcher.java | 45 ++-- .../consumer/internals/SubscriptionState.java | 238 +++++++++++++----- .../clients/consumer/MockConsumerTest.java | 2 +- .../consumer/internals/FetcherTest.java | 36 ++- .../internals/SubscriptionStateTest.java | 58 ++++- .../integration/kafka/api/ConsumerTest.scala | 32 ++- 10 files changed, 386 insertions(+), 130 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 23e410b7d93..158e1ea1e35 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -118,6 +118,16 @@ public interface Consumer extends Closeable { */ public Map> listTopics(); + /** + * @see KafkaConsumer#pause(TopicPartition...) + */ + public void pause(TopicPartition... partitions); + + /** + * @see KafkaConsumer#resume(TopicPartition...) + */ + public void resume(TopicPartition... partitions); + /** * @see KafkaConsumer#close() */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 923ff999d1b..7851644c704 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -43,7 +43,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.ConcurrentModificationException; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -852,9 +851,7 @@ public class KafkaConsumer implements Consumer { public void commit(CommitType commitType, ConsumerCommitCallback callback) { acquire(); try { - // need defensive copy to ensure offsets are not removed before completion (e.g. 
in rebalance) - Map allConsumed = new HashMap(this.subscriptions.allConsumed()); - commit(allConsumed, commitType, callback); + commit(subscriptions.allConsumed(), commitType, callback); } finally { release(); } @@ -941,7 +938,7 @@ public class KafkaConsumer implements Consumer { public long position(TopicPartition partition) { acquire(); try { - if (!this.subscriptions.assignedPartitions().contains(partition)) + if (!this.subscriptions.isAssigned(partition)) throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer."); Long offset = this.subscriptions.consumed(partition); if (offset == null) { @@ -972,7 +969,7 @@ public class KafkaConsumer implements Consumer { acquire(); try { Long committed; - if (subscriptions.assignedPartitions().contains(partition)) { + if (subscriptions.isAssigned(partition)) { committed = this.subscriptions.committed(partition); if (committed == null) { coordinator.refreshCommittedOffsetsIfNeeded(); @@ -1040,6 +1037,45 @@ public class KafkaConsumer implements Consumer { } } + /** + * Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return + * any records from these partitions until they have been resumed using {@link #resume(TopicPartition...)}. + * Note that this method does not affect partition subscription. In particular, it does not cause a group + * rebalance when automatic assignment is used. + * @param partitions The partitions which should be paused + */ + @Override + public void pause(TopicPartition... partitions) { + acquire(); + try { + for (TopicPartition partition: partitions) { + log.debug("Pausing partition {}", partition); + subscriptions.pause(partition); + } + } finally { + release(); + } + } + + /** + * Resume any partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to + * {@link #poll(long)} will return records from these partitions if there are any to be fetched. + * If the partitions were not previously paused, this method is a no-op. + * @param partitions The partitions which should be resumed + */ + @Override + public void resume(TopicPartition... 
partitions) { + acquire(); + try { + for (TopicPartition partition: partitions) { + log.debug("Resuming partition {}", partition); + subscriptions.resume(partition); + } + } finally { + release(); + } + } + @Override public void close() { acquire(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 5b22fa0bcb4..b07e760c28b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -12,6 +12,12 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -20,12 +26,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.apache.kafka.clients.consumer.internals.SubscriptionState; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.MetricName; - /** * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is not * threadsafe @@ -83,9 +83,11 @@ public class MockConsumer implements Consumer { ensureNotClosed(); // update the consumed offset for (Map.Entry>> entry : this.records.entrySet()) { - List> recs = entry.getValue(); - if (!recs.isEmpty()) - this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset()); + if (!subscriptions.isPaused(entry.getKey())) { + List> recs = entry.getValue(); + if (!recs.isEmpty()) + this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset()); + } } ConsumerRecords copy = new ConsumerRecords(this.records); @@ -96,7 +98,12 @@ public class MockConsumer implements Consumer { public synchronized void addRecord(ConsumerRecord record) { ensureNotClosed(); TopicPartition tp = new TopicPartition(record.topic(), record.partition()); - this.subscriptions.assignedPartitions().add(tp); + ArrayList currentAssigned = new ArrayList<>(this.subscriptions.assignedPartitions()); + if (!currentAssigned.contains(tp)) { + currentAssigned.add(tp); + this.subscriptions.changePartitionAssignment(currentAssigned); + } + subscriptions.seek(tp, record.offset()); List> recs = this.records.get(tp); if (recs == null) { recs = new ArrayList>(); @@ -188,6 +195,18 @@ public class MockConsumer implements Consumer { this.partitions.put(topic, partitions); } + @Override + public void pause(TopicPartition... partitions) { + for (TopicPartition partition : partitions) + subscriptions.pause(partition); + } + + @Override + public void resume(TopicPartition... 
partitions) { + for (TopicPartition partition : partitions) + subscriptions.resume(partition); + } + @Override public synchronized void close() { ensureNotClosed(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index 6026b23a5cb..cd5cdc35014 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -119,7 +119,9 @@ public final class Coordinator { Map offsets = fetchCommittedOffsets(subscriptions.assignedPartitions()); for (Map.Entry entry : offsets.entrySet()) { TopicPartition tp = entry.getKey(); - this.subscriptions.committed(tp, entry.getValue()); + // verify assignment is still active + if (subscriptions.isAssigned(tp)) + this.subscriptions.committed(tp, entry.getValue()); } this.subscriptions.commitsRefreshed(); } @@ -459,7 +461,9 @@ public final class Coordinator { short errorCode = entry.getValue(); if (errorCode == Errors.NONE.code()) { log.debug("Committed offset {} for partition {}", offset, tp); - subscriptions.committed(tp, offset); + if (subscriptions.isAssigned(tp)) + // update the local cache only if the partition is still assigned + subscriptions.committed(tp, offset); } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { coordinatorDead(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 9f71451e1b0..956197b7aa7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -143,8 +143,7 @@ public class Fetcher { public void updateFetchPositions(Set partitions) { // reset the fetch position to the committed position for (TopicPartition tp : partitions) { - // skip if we already have a fetch position - if (subscriptions.fetched(tp) != null) + if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp)) continue; // TODO: If there are several offsets to reset, we could submit offset requests in parallel @@ -222,7 +221,10 @@ public class Fetcher { log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase()); long offset = listOffset(partition, timestamp); - this.subscriptions.seek(partition, offset); + + // we might lose the assignment while fetching the offset, so check it is still active + if (subscriptions.isAssigned(partition)) + this.subscriptions.seek(partition, offset); } /** @@ -259,11 +261,15 @@ public class Fetcher { if (this.subscriptions.partitionAssignmentNeeded()) { return Collections.emptyMap(); } else { - Map>> drained = new HashMap>>(); + Map>> drained = new HashMap<>(); for (PartitionRecords part : this.records) { + if (!subscriptions.isFetchable(part.partition)) { + log.debug("Ignoring fetched records for {} since it is no longer fetchable", part.partition); + continue; + } + Long consumed = subscriptions.consumed(part.partition); - if (this.subscriptions.assignedPartitions().contains(part.partition) - && consumed != null && part.fetchOffset == consumed) { + if (consumed != null && part.fetchOffset == consumed) { List> records = drained.get(part.partition); if (records == null) { records = part.records; @@ -354,8 +360,8 @@ 
public class Fetcher { */ private Map createFetchRequests(Cluster cluster) { // create the fetch info - Map> fetchable = new HashMap>(); - for (TopicPartition partition : subscriptions.assignedPartitions()) { + Map> fetchable = new HashMap<>(); + for (TopicPartition partition : subscriptions.fetchablePartitions()) { Node node = cluster.leaderFor(partition); if (node == null) { metadata.requestUpdate(); @@ -363,16 +369,17 @@ public class Fetcher { // if there is a leader and no in-flight requests, issue a new fetch Map fetch = fetchable.get(node); if (fetch == null) { - fetch = new HashMap(); + fetch = new HashMap<>(); fetchable.put(node, fetch); } + long offset = this.subscriptions.fetched(partition); fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize)); } } // create the fetches - Map requests = new HashMap(); + Map requests = new HashMap<>(); for (Map.Entry> entry : fetchable.entrySet()) { Node node = entry.getKey(); FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue()); @@ -399,15 +406,7 @@ public class Fetcher { if (!subscriptions.assignedPartitions().contains(tp)) { log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp); } else if (partition.errorCode == Errors.NONE.code()) { - int bytes = 0; - ByteBuffer buffer = partition.recordSet; - MemoryRecords records = MemoryRecords.readableRecords(buffer); long fetchOffset = request.fetchData().get(tp).offset; - List> parsed = new ArrayList>(); - for (LogEntry logEntry : records) { - parsed.add(parseRecord(tp, logEntry)); - bytes += logEntry.size(); - } // we are interested in this fetch only if the beginning offset matches the // current consumed position @@ -422,7 +421,15 @@ public class Fetcher { continue; } - if (parsed.size() > 0) { + int bytes = 0; + ByteBuffer buffer = partition.recordSet; + MemoryRecords records = MemoryRecords.readableRecords(buffer); + List> parsed = new ArrayList>(); + for (LogEntry logEntry : records) { + parsed.add(parseRecord(tp, logEntry)); + bytes += logEntry.size(); + } + if (!parsed.isEmpty()) { ConsumerRecord record = parsed.get(parsed.size() - 1); this.subscriptions.fetched(tp, record.offset() + 1); this.records.add(new PartitionRecords(fetchOffset, tp, parsed)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 8a2cb1237ea..6788ee67df8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -23,7 +23,25 @@ import java.util.Map; import java.util.Set; /** - * A class for tracking the topics, partitions, and offsets for the consumer + * A class for tracking the topics, partitions, and offsets for the consumer. A partition + * is "assigned" either directly with {@link #subscribe(TopicPartition)} (manual assignment) + * or with {@link #changePartitionAssignment(List)} (automatic assignment). + * + * Once assigned, the partition is not considered "fetchable" until its initial position has + * been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch + * position which is used to set the offset of the next fetch, and a consumed position + * which is the last offset that has been returned to the user. 
You can suspend fetching + * from a partition through {@link #pause(TopicPartition)} without affecting the fetched/consumed + * offsets. The partition will remain unfetchable until the {@link #resume(TopicPartition)} is + * used. You can also query the pause state independently with {@link #isPaused(TopicPartition)}. + * + * Note that pause state as well as fetch/consumed positions are not preserved when partition + * assignment is changed either with {@link #unsubscribe(TopicPartition)} or + * {@link #changePartitionAssignment(List)}. + * + * This class also maintains a cache of the latest commit position for each of the assigned + * partitions. This is updated through {@link #committed(TopicPartition, long)} and can be used + * to set the initial fetch position (e.g. {@link Fetcher#resetOffset(TopicPartition)}. */ public class SubscriptionState { @@ -34,16 +52,7 @@ public class SubscriptionState { private final Set subscribedPartitions; /* the list of partitions currently assigned */ - private final Set assignedPartitions; - - /* the offset exposed to the user */ - private final Map consumed; - - /* the current point we have fetched up to */ - private final Map fetched; - - /* the last committed offset for each partition */ - private final Map committed; + private final Map assignedPartitions; /* do we need to request a partition assignment from the coordinator? */ private boolean needsPartitionAssignment; @@ -51,28 +60,21 @@ public class SubscriptionState { /* do we need to request the latest committed offsets from the coordinator? */ private boolean needsFetchCommittedOffsets; - /* Partitions that need to be reset before fetching */ - private Map resetPartitions; - /* Default offset reset strategy */ - private OffsetResetStrategy offsetResetStrategy; - - public SubscriptionState(OffsetResetStrategy offsetResetStrategy) { - this.offsetResetStrategy = offsetResetStrategy; - this.subscribedTopics = new HashSet(); - this.subscribedPartitions = new HashSet(); - this.assignedPartitions = new HashSet(); - this.consumed = new HashMap(); - this.fetched = new HashMap(); - this.committed = new HashMap(); + private final OffsetResetStrategy defaultResetStrategy; + + public SubscriptionState(OffsetResetStrategy defaultResetStrategy) { + this.defaultResetStrategy = defaultResetStrategy; + this.subscribedTopics = new HashSet<>(); + this.subscribedPartitions = new HashSet<>(); + this.assignedPartitions = new HashMap<>(); this.needsPartitionAssignment = false; this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up - this.resetPartitions = new HashMap(); } public void subscribe(String topic) { - if (this.subscribedPartitions.size() > 0) - throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive"); + if (!this.subscribedPartitions.isEmpty()) + throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive"); if (!this.subscribedTopics.contains(topic)) { this.subscribedTopics.add(topic); this.needsPartitionAssignment = true; @@ -95,10 +97,10 @@ public class SubscriptionState { } public void subscribe(TopicPartition tp) { - if (this.subscribedTopics.size() > 0) - throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive"); + if (!this.subscribedTopics.isEmpty()) + throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive"); this.subscribedPartitions.add(tp); - this.assignedPartitions.add(tp); + 
addAssignedPartition(tp); } public void unsubscribe(TopicPartition partition) { @@ -110,17 +112,10 @@ public class SubscriptionState { private void clearPartition(TopicPartition tp) { this.assignedPartitions.remove(tp); - this.committed.remove(tp); - this.fetched.remove(tp); - this.consumed.remove(tp); - this.resetPartitions.remove(tp); } public void clearAssignment() { this.assignedPartitions.clear(); - this.committed.clear(); - this.fetched.clear(); - this.consumed.clear(); this.needsPartitionAssignment = !subscribedTopics().isEmpty(); } @@ -129,21 +124,26 @@ public class SubscriptionState { } public Long fetched(TopicPartition tp) { - return this.fetched.get(tp); + return assignedState(tp).fetched; } public void fetched(TopicPartition tp, long offset) { - if (!this.assignedPartitions.contains(tp)) - throw new IllegalArgumentException("Can't change the fetch position for a partition you are not currently subscribed to."); - this.fetched.put(tp, offset); + assignedState(tp).fetched(offset); + } + + private TopicPartitionState assignedState(TopicPartition tp) { + TopicPartitionState state = this.assignedPartitions.get(tp); + if (state == null) + throw new IllegalStateException("No current assignment for partition " + tp); + return state; } public void committed(TopicPartition tp, long offset) { - this.committed.put(tp, offset); + assignedState(tp).committed(offset); } public Long committed(TopicPartition tp) { - return this.committed.get(tp); + return assignedState(tp).committed; } public void needRefreshCommits() { @@ -157,15 +157,22 @@ public class SubscriptionState { public void commitsRefreshed() { this.needsFetchCommittedOffsets = false; } - + public void seek(TopicPartition tp, long offset) { - fetched(tp, offset); - consumed(tp, offset); - resetPartitions.remove(tp); + assignedState(tp).seek(offset); } public Set assignedPartitions() { - return this.assignedPartitions; + return this.assignedPartitions.keySet(); + } + + public Set fetchablePartitions() { + Set fetchable = new HashSet<>(); + for (Map.Entry entry : assignedPartitions.entrySet()) { + if (entry.getValue().isFetchable()) + fetchable.add(entry.getKey()); + } + return fetchable; } public boolean partitionsAutoAssigned() { @@ -173,49 +180,52 @@ public class SubscriptionState { } public void consumed(TopicPartition tp, long offset) { - if (!this.assignedPartitions.contains(tp)) - throw new IllegalArgumentException("Can't change the consumed position for a partition you are not currently subscribed to."); - this.consumed.put(tp, offset); + assignedState(tp).consumed(offset); } - public Long consumed(TopicPartition partition) { - return this.consumed.get(partition); + public Long consumed(TopicPartition tp) { + return assignedState(tp).consumed; } public Map allConsumed() { - return this.consumed; + Map allConsumed = new HashMap<>(); + for (Map.Entry entry : assignedPartitions.entrySet()) { + TopicPartitionState state = entry.getValue(); + if (state.hasValidPosition) + allConsumed.put(entry.getKey(), state.consumed); + } + return allConsumed; } public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) { - this.resetPartitions.put(partition, offsetResetStrategy); - this.fetched.remove(partition); - this.consumed.remove(partition); + assignedState(partition).awaitReset(offsetResetStrategy); } public void needOffsetReset(TopicPartition partition) { - needOffsetReset(partition, offsetResetStrategy); + needOffsetReset(partition, defaultResetStrategy); } public boolean 
isOffsetResetNeeded(TopicPartition partition) { - return resetPartitions.containsKey(partition); - } - - public boolean isOffsetResetNeeded() { - return !resetPartitions.isEmpty(); + return assignedState(partition).awaitingReset; } public OffsetResetStrategy resetStrategy(TopicPartition partition) { - return resetPartitions.get(partition); + return assignedState(partition).resetStrategy; } public boolean hasAllFetchPositions() { - return this.fetched.size() >= this.assignedPartitions.size(); + for (TopicPartitionState state : assignedPartitions.values()) + if (!state.hasValidPosition) + return false; + return true; } public Set missingFetchPositions() { - Set copy = new HashSet(this.assignedPartitions); - copy.removeAll(this.fetched.keySet()); - return copy; + Set missing = new HashSet<>(this.assignedPartitions.keySet()); + for (Map.Entry entry : assignedPartitions.entrySet()) + if (!entry.getValue().hasValidPosition) + missing.add(entry.getKey()); + return missing; } public boolean partitionAssignmentNeeded() { @@ -227,9 +237,99 @@ public class SubscriptionState { if (!this.subscribedTopics.contains(tp.topic())) throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic."); this.clearAssignment(); - this.assignedPartitions.addAll(assignments); + for (TopicPartition tp: assignments) + addAssignedPartition(tp); this.needsPartitionAssignment = false; } + public boolean isAssigned(TopicPartition tp) { + return assignedPartitions.containsKey(tp); + } + + public boolean isPaused(TopicPartition tp) { + return isAssigned(tp) && assignedState(tp).paused; + } + + public boolean isFetchable(TopicPartition tp) { + return isAssigned(tp) && assignedState(tp).isFetchable(); + } + + public void pause(TopicPartition tp) { + assignedState(tp).pause(); + } + + public void resume(TopicPartition tp) { + assignedState(tp).resume(); + } + + private void addAssignedPartition(TopicPartition tp) { + this.assignedPartitions.put(tp, new TopicPartitionState()); + } + + private static class TopicPartitionState { + private Long consumed; // offset exposed to the user + private Long fetched; // current fetch position + private Long committed; // last committed position + + private boolean hasValidPosition; // whether we have valid consumed and fetched positions + private boolean paused; // whether this partition has been paused by the user + private boolean awaitingReset; // whether we are awaiting reset + private OffsetResetStrategy resetStrategy; // the reset strategy if awaitingReset is set + + public TopicPartitionState() { + this.paused = false; + this.consumed = null; + this.fetched = null; + this.committed = null; + this.awaitingReset = false; + this.hasValidPosition = false; + this.resetStrategy = null; + } + + private void awaitReset(OffsetResetStrategy strategy) { + this.awaitingReset = true; + this.resetStrategy = strategy; + this.consumed = null; + this.fetched = null; + this.hasValidPosition = false; + } + + private void seek(long offset) { + this.consumed = offset; + this.fetched = offset; + this.awaitingReset = false; + this.resetStrategy = null; + this.hasValidPosition = true; + } + + private void fetched(long offset) { + if (!hasValidPosition) + throw new IllegalStateException("Cannot update fetch position without valid consumed/fetched positions"); + this.fetched = offset; + } + + private void consumed(long offset) { + if (!hasValidPosition) + throw new IllegalStateException("Cannot update consumed position without valid consumed/fetched positions"); + this.consumed = 
offset; + } + + private void committed(Long offset) { + this.committed = offset; + } + + private void pause() { + this.paused = true; + } + + private void resume() { + this.paused = false; + } + + private boolean isFetchable() { + return !paused && hasValidPosition; + } + + } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 26b6b409843..d4da64216c3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -29,7 +29,7 @@ public class MockConsumerTest { @Test public void testSimpleMock() { - consumer.subscribe("topic"); + consumer.subscribe("test"); assertEquals(0, consumer.poll(1000).count()); ConsumerRecord rec1 = new ConsumerRecord("test", 0, 0, "key1", "value1"); ConsumerRecord rec2 = new ConsumerRecord("test", 0, 1, "key2", "value2"); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 06e29906365..56850bbd4d0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class FetcherTest { @@ -99,8 +100,7 @@ public class FetcherTest { public void testFetchNormal() { List> records; subscriptions.subscribe(tp); - subscriptions.fetched(tp, 0); - subscriptions.consumed(tp, 0); + subscriptions.seek(tp, 0); // normal fetch fetcher.initFetches(cluster); @@ -121,8 +121,7 @@ public class FetcherTest { public void testFetchDuringRebalance() { subscriptions.subscribe(topicName); subscriptions.changePartitionAssignment(Arrays.asList(tp)); - subscriptions.fetched(tp, 0); - subscriptions.consumed(tp, 0); + subscriptions.seek(tp, 0); fetcher.initFetches(cluster); @@ -135,11 +134,33 @@ public class FetcherTest { assertTrue(fetcher.fetchedRecords().isEmpty()); } + @Test + public void testInFlightFetchOnPausedPartition() { + subscriptions.subscribe(tp); + subscriptions.seek(tp, 0); + + fetcher.initFetches(cluster); + subscriptions.pause(tp); + + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L)); + consumerClient.poll(0); + assertNull(fetcher.fetchedRecords().get(tp)); + } + + @Test + public void testFetchOnPausedPartition() { + subscriptions.subscribe(tp); + subscriptions.seek(tp, 0); + + subscriptions.pause(tp); + fetcher.initFetches(cluster); + assertTrue(client.requests().isEmpty()); + } + @Test public void testFetchFailed() { subscriptions.subscribe(tp); - subscriptions.fetched(tp, 0); - subscriptions.consumed(tp, 0); + subscriptions.seek(tp, 0); // fetch with not leader fetcher.initFetches(cluster); @@ -169,8 +190,7 @@ public class FetcherTest { @Test public void testFetchOutOfRange() { subscriptions.subscribe(tp); - subscriptions.fetched(tp, 5); - subscriptions.consumed(tp, 5); + subscriptions.seek(tp, 5); // fetch with out of range fetcher.initFetches(cluster); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index c47f3fb699d..1ba6f7a34db 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static java.util.Arrays.asList; @@ -37,12 +38,13 @@ public class SubscriptionStateTest { state.subscribe(tp0); assertEquals(Collections.singleton(tp0), state.assignedPartitions()); state.committed(tp0, 1); - state.fetched(tp0, 1); - state.consumed(tp0, 1); + state.seek(tp0, 1); + assertTrue(state.isFetchable(tp0)); assertAllPositions(tp0, 1L); state.unsubscribe(tp0); assertTrue(state.assignedPartitions().isEmpty()); - assertAllPositions(tp0, null); + assertFalse(state.isAssigned(tp0)); + assertFalse(state.isFetchable(tp0)); } @Test @@ -52,10 +54,15 @@ public class SubscriptionStateTest { assertEquals(5L, (long) state.fetched(tp0)); assertEquals(5L, (long) state.consumed(tp0)); state.needOffsetReset(tp0); - assertTrue(state.isOffsetResetNeeded()); + assertFalse(state.isFetchable(tp0)); assertTrue(state.isOffsetResetNeeded(tp0)); assertEquals(null, state.fetched(tp0)); assertEquals(null, state.consumed(tp0)); + + // seek should clear the reset and make the partition fetchable + state.seek(tp0, 0); + assertTrue(state.isFetchable(tp0)); + assertFalse(state.isOffsetResetNeeded(tp0)); } @Test @@ -65,15 +72,27 @@ public class SubscriptionStateTest { assertTrue(state.assignedPartitions().isEmpty()); assertTrue(state.partitionsAutoAssigned()); state.changePartitionAssignment(asList(tp0)); + state.seek(tp0, 1); state.committed(tp0, 1); - state.fetched(tp0, 1); - state.consumed(tp0, 1); assertAllPositions(tp0, 1L); state.changePartitionAssignment(asList(tp1)); - assertAllPositions(tp0, null); + assertTrue(state.isAssigned(tp1)); + assertFalse(state.isAssigned(tp0)); + assertFalse(state.isFetchable(tp1)); assertEquals(Collections.singleton(tp1), state.assignedPartitions()); } + @Test + public void partitionPause() { + state.subscribe(tp0); + state.seek(tp0, 100); + assertTrue(state.isFetchable(tp0)); + state.pause(tp0); + assertFalse(state.isFetchable(tp0)); + state.resume(tp0); + assertTrue(state.isFetchable(tp0)); + } + @Test public void topicUnsubscription() { final String topic = "test"; @@ -83,24 +102,37 @@ public class SubscriptionStateTest { assertTrue(state.partitionsAutoAssigned()); state.changePartitionAssignment(asList(tp0)); state.committed(tp0, 1); - state.fetched(tp0, 1); - state.consumed(tp0, 1); + state.seek(tp0, 1); assertAllPositions(tp0, 1L); state.changePartitionAssignment(asList(tp1)); - assertAllPositions(tp0, null); + assertFalse(state.isAssigned(tp0)); assertEquals(Collections.singleton(tp1), state.assignedPartitions()); state.unsubscribe(topic); assertEquals(0, state.subscribedTopics().size()); assertTrue(state.assignedPartitions().isEmpty()); } - - @Test(expected = IllegalArgumentException.class) + + @Test(expected = IllegalStateException.class) + public void invalidConsumedPositionUpdate() { + state.subscribe("test"); + state.changePartitionAssignment(asList(tp0)); + state.consumed(tp0, 0); + } + + @Test(expected = IllegalStateException.class) + public void invalidFetchPositionUpdate() { + state.subscribe("test"); + 
state.changePartitionAssignment(asList(tp0)); + state.fetched(tp0, 0); + } + + @Test(expected = IllegalStateException.class) public void cantChangeFetchPositionForNonAssignedPartition() { state.fetched(tp0, 1); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IllegalStateException.class) public void cantChangeConsumedPositionForNonAssignedPartition() { state.consumed(tp0, 1); } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 0c2755f7240..4ea49f20ae4 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -18,7 +18,7 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{PartitionInfo, TopicPartition} import kafka.utils.{TestUtils, Logging} import kafka.server.KafkaConfig @@ -254,6 +254,34 @@ class ConsumerTest extends IntegrationTestHarness with Logging { } } + def testPartitionPauseAndResume() { + sendRecords(5) + this.consumers(0).subscribe(tp) + consumeRecords(this.consumers(0), 5, 0) + this.consumers(0).pause(tp) + sendRecords(5) + assertTrue(this.consumers(0).poll(0).isEmpty) + this.consumers(0).resume(tp) + consumeRecords(this.consumers(0), 5, 5) + } + + def testPauseStateNotPreservedByRebalance() { + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test + val consumer0 = new KafkaConsumer(this.consumerConfig, null, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + + sendRecords(5) + consumer0.subscribe(topic) + consumeRecords(consumer0, 5, 0) + consumer0.pause(tp) + + // subscribe to a new topic to trigger a rebalance + consumer0.subscribe("topic2") + + // after rebalance, our position should be reset and our pause state lost, + // so we should be able to consume from the beginning + consumeRecords(consumer0, 0, 5) + } + private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback { var callsToAssigned = 0 var callsToRevoked = 0 @@ -264,7 +292,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) { info("onPartitionsRevoked called.") callsToRevoked += 1 - } + } } private def sendRecords(numRecords: Int): Unit = {