Browse Source

KAFKA-2350; KafkaConsumer pause/resume API

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael, Ashish, Guozhang

Closes #100 from hachikuji/KAFKA-2350 and squashes the following commits:

250e823 [Jason Gustafson] KAFKA-2350; KafkaConsumer pause/resume API
pull/100/merge
Jason Gustafson 9 years ago committed by Guozhang Wang
parent
commit
be82a2afc9
  1. 10
      clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
  2. 48
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  3. 39
      clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
  4. 8
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  5. 45
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  6. 238
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  7. 2
      clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
  8. 36
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  9. 58
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  10. 32
      core/src/test/scala/integration/kafka/api/ConsumerTest.scala

10
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java

@ -118,6 +118,16 @@ public interface Consumer<K, V> extends Closeable { @@ -118,6 +118,16 @@ public interface Consumer<K, V> extends Closeable {
*/
public Map<String, List<PartitionInfo>> listTopics();
/**
* @see KafkaConsumer#pause(TopicPartition...)
*/
public void pause(TopicPartition... partitions);
/**
* @see KafkaConsumer#resume(TopicPartition...)
*/
public void resume(TopicPartition... partitions);
/**
* @see KafkaConsumer#close()
*/

48
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@ -43,7 +43,6 @@ import java.util.Arrays; @@ -43,7 +43,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -852,9 +851,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -852,9 +851,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void commit(CommitType commitType, ConsumerCommitCallback callback) {
acquire();
try {
// need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
Map<TopicPartition, Long> allConsumed = new HashMap<TopicPartition, Long>(this.subscriptions.allConsumed());
commit(allConsumed, commitType, callback);
commit(subscriptions.allConsumed(), commitType, callback);
} finally {
release();
}
@ -941,7 +938,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -941,7 +938,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public long position(TopicPartition partition) {
acquire();
try {
if (!this.subscriptions.assignedPartitions().contains(partition))
if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.consumed(partition);
if (offset == null) {
@ -972,7 +969,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -972,7 +969,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
acquire();
try {
Long committed;
if (subscriptions.assignedPartitions().contains(partition)) {
if (subscriptions.isAssigned(partition)) {
committed = this.subscriptions.committed(partition);
if (committed == null) {
coordinator.refreshCommittedOffsetsIfNeeded();
@ -1040,6 +1037,45 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -1040,6 +1037,45 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
/**
* Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return
* any records from these partitions until they have been resumed using {@link #resume(TopicPartition...)}.
* Note that this method does not affect partition subscription. In particular, it does not cause a group
* rebalance when automatic assignment is used.
* @param partitions The partitions which should be paused
*/
@Override
public void pause(TopicPartition... partitions) {
acquire();
try {
for (TopicPartition partition: partitions) {
log.debug("Pausing partition {}", partition);
subscriptions.pause(partition);
}
} finally {
release();
}
}
/**
* Resume any partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
* {@link #poll(long)} will return records from these partitions if there are any to be fetched.
* If the partitions were not previously paused, this method is a no-op.
* @param partitions The partitions which should be resumed
*/
@Override
public void resume(TopicPartition... partitions) {
acquire();
try {
for (TopicPartition partition: partitions) {
log.debug("Resuming partition {}", partition);
subscriptions.resume(partition);
}
} finally {
release();
}
}
@Override
public void close() {
acquire();

39
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

@ -12,6 +12,12 @@ @@ -12,6 +12,12 @@
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -20,12 +26,6 @@ import java.util.Map; @@ -20,12 +26,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.MetricName;
/**
* A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
* threadsafe </i>
@ -83,9 +83,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> { @@ -83,9 +83,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
ensureNotClosed();
// update the consumed offset
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
List<ConsumerRecord<K, V>> recs = entry.getValue();
if (!recs.isEmpty())
this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
if (!subscriptions.isPaused(entry.getKey())) {
List<ConsumerRecord<K, V>> recs = entry.getValue();
if (!recs.isEmpty())
this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
}
}
ConsumerRecords<K, V> copy = new ConsumerRecords<K, V>(this.records);
@ -96,7 +98,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> { @@ -96,7 +98,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
public synchronized void addRecord(ConsumerRecord<K, V> record) {
ensureNotClosed();
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
this.subscriptions.assignedPartitions().add(tp);
ArrayList<TopicPartition> currentAssigned = new ArrayList<>(this.subscriptions.assignedPartitions());
if (!currentAssigned.contains(tp)) {
currentAssigned.add(tp);
this.subscriptions.changePartitionAssignment(currentAssigned);
}
subscriptions.seek(tp, record.offset());
List<ConsumerRecord<K, V>> recs = this.records.get(tp);
if (recs == null) {
recs = new ArrayList<ConsumerRecord<K, V>>();
@ -188,6 +195,18 @@ public class MockConsumer<K, V> implements Consumer<K, V> { @@ -188,6 +195,18 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
this.partitions.put(topic, partitions);
}
@Override
public void pause(TopicPartition... partitions) {
for (TopicPartition partition : partitions)
subscriptions.pause(partition);
}
@Override
public void resume(TopicPartition... partitions) {
for (TopicPartition partition : partitions)
subscriptions.resume(partition);
}
@Override
public synchronized void close() {
ensureNotClosed();

8
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java

@ -119,7 +119,9 @@ public final class Coordinator { @@ -119,7 +119,9 @@ public final class Coordinator {
Map<TopicPartition, Long> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
TopicPartition tp = entry.getKey();
this.subscriptions.committed(tp, entry.getValue());
// verify assignment is still active
if (subscriptions.isAssigned(tp))
this.subscriptions.committed(tp, entry.getValue());
}
this.subscriptions.commitsRefreshed();
}
@ -459,7 +461,9 @@ public final class Coordinator { @@ -459,7 +461,9 @@ public final class Coordinator {
short errorCode = entry.getValue();
if (errorCode == Errors.NONE.code()) {
log.debug("Committed offset {} for partition {}", offset, tp);
subscriptions.committed(tp, offset);
if (subscriptions.isAssigned(tp))
// update the local cache only if the partition is still assigned
subscriptions.committed(tp, offset);
} else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
|| errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
coordinatorDead();

45
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@ -143,8 +143,7 @@ public class Fetcher<K, V> { @@ -143,8 +143,7 @@ public class Fetcher<K, V> {
public void updateFetchPositions(Set<TopicPartition> partitions) {
// reset the fetch position to the committed position
for (TopicPartition tp : partitions) {
// skip if we already have a fetch position
if (subscriptions.fetched(tp) != null)
if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
continue;
// TODO: If there are several offsets to reset, we could submit offset requests in parallel
@ -222,7 +221,10 @@ public class Fetcher<K, V> { @@ -222,7 +221,10 @@ public class Fetcher<K, V> {
log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
long offset = listOffset(partition, timestamp);
this.subscriptions.seek(partition, offset);
// we might lose the assignment while fetching the offset, so check it is still active
if (subscriptions.isAssigned(partition))
this.subscriptions.seek(partition, offset);
}
/**
@ -259,11 +261,15 @@ public class Fetcher<K, V> { @@ -259,11 +261,15 @@ public class Fetcher<K, V> {
if (this.subscriptions.partitionAssignmentNeeded()) {
return Collections.emptyMap();
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
for (PartitionRecords<K, V> part : this.records) {
if (!subscriptions.isFetchable(part.partition)) {
log.debug("Ignoring fetched records for {} since it is no longer fetchable", part.partition);
continue;
}
Long consumed = subscriptions.consumed(part.partition);
if (this.subscriptions.assignedPartitions().contains(part.partition)
&& consumed != null && part.fetchOffset == consumed) {
if (consumed != null && part.fetchOffset == consumed) {
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
if (records == null) {
records = part.records;
@ -354,8 +360,8 @@ public class Fetcher<K, V> { @@ -354,8 +360,8 @@ public class Fetcher<K, V> {
*/
private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
// create the fetch info
Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Node, Map<TopicPartition, FetchRequest.PartitionData>>();
for (TopicPartition partition : subscriptions.assignedPartitions()) {
Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
for (TopicPartition partition : subscriptions.fetchablePartitions()) {
Node node = cluster.leaderFor(partition);
if (node == null) {
metadata.requestUpdate();
@ -363,16 +369,17 @@ public class Fetcher<K, V> { @@ -363,16 +369,17 @@ public class Fetcher<K, V> {
// if there is a leader and no in-flight requests, issue a new fetch
Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {
fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
fetch = new HashMap<>();
fetchable.put(node, fetch);
}
long offset = this.subscriptions.fetched(partition);
fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
}
}
// create the fetches
Map<Node, FetchRequest> requests = new HashMap<Node, FetchRequest>();
Map<Node, FetchRequest> requests = new HashMap<>();
for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
Node node = entry.getKey();
FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
@ -399,15 +406,7 @@ public class Fetcher<K, V> { @@ -399,15 +406,7 @@ public class Fetcher<K, V> {
if (!subscriptions.assignedPartitions().contains(tp)) {
log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
} else if (partition.errorCode == Errors.NONE.code()) {
int bytes = 0;
ByteBuffer buffer = partition.recordSet;
MemoryRecords records = MemoryRecords.readableRecords(buffer);
long fetchOffset = request.fetchData().get(tp).offset;
List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
for (LogEntry logEntry : records) {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
@ -422,7 +421,15 @@ public class Fetcher<K, V> { @@ -422,7 +421,15 @@ public class Fetcher<K, V> {
continue;
}
if (parsed.size() > 0) {
int bytes = 0;
ByteBuffer buffer = partition.recordSet;
MemoryRecords records = MemoryRecords.readableRecords(buffer);
List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
for (LogEntry logEntry : records) {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}
if (!parsed.isEmpty()) {
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.subscriptions.fetched(tp, record.offset() + 1);
this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));

238
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java

@ -23,7 +23,25 @@ import java.util.Map; @@ -23,7 +23,25 @@ import java.util.Map;
import java.util.Set;
/**
* A class for tracking the topics, partitions, and offsets for the consumer
* A class for tracking the topics, partitions, and offsets for the consumer. A partition
* is "assigned" either directly with {@link #subscribe(TopicPartition)} (manual assignment)
* or with {@link #changePartitionAssignment(List)} (automatic assignment).
*
* Once assigned, the partition is not considered "fetchable" until its initial position has
* been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
* position which is used to set the offset of the next fetch, and a consumed position
* which is the last offset that has been returned to the user. You can suspend fetching
* from a partition through {@link #pause(TopicPartition)} without affecting the fetched/consumed
* offsets. The partition will remain unfetchable until the {@link #resume(TopicPartition)} is
* used. You can also query the pause state independently with {@link #isPaused(TopicPartition)}.
*
* Note that pause state as well as fetch/consumed positions are not preserved when partition
* assignment is changed either with {@link #unsubscribe(TopicPartition)} or
* {@link #changePartitionAssignment(List)}.
*
* This class also maintains a cache of the latest commit position for each of the assigned
* partitions. This is updated through {@link #committed(TopicPartition, long)} and can be used
* to set the initial fetch position (e.g. {@link Fetcher#resetOffset(TopicPartition)}.
*/
public class SubscriptionState {
@ -34,16 +52,7 @@ public class SubscriptionState { @@ -34,16 +52,7 @@ public class SubscriptionState {
private final Set<TopicPartition> subscribedPartitions;
/* the list of partitions currently assigned */
private final Set<TopicPartition> assignedPartitions;
/* the offset exposed to the user */
private final Map<TopicPartition, Long> consumed;
/* the current point we have fetched up to */
private final Map<TopicPartition, Long> fetched;
/* the last committed offset for each partition */
private final Map<TopicPartition, Long> committed;
private final Map<TopicPartition, TopicPartitionState> assignedPartitions;
/* do we need to request a partition assignment from the coordinator? */
private boolean needsPartitionAssignment;
@ -51,28 +60,21 @@ public class SubscriptionState { @@ -51,28 +60,21 @@ public class SubscriptionState {
/* do we need to request the latest committed offsets from the coordinator? */
private boolean needsFetchCommittedOffsets;
/* Partitions that need to be reset before fetching */
private Map<TopicPartition, OffsetResetStrategy> resetPartitions;
/* Default offset reset strategy */
private OffsetResetStrategy offsetResetStrategy;
public SubscriptionState(OffsetResetStrategy offsetResetStrategy) {
this.offsetResetStrategy = offsetResetStrategy;
this.subscribedTopics = new HashSet<String>();
this.subscribedPartitions = new HashSet<TopicPartition>();
this.assignedPartitions = new HashSet<TopicPartition>();
this.consumed = new HashMap<TopicPartition, Long>();
this.fetched = new HashMap<TopicPartition, Long>();
this.committed = new HashMap<TopicPartition, Long>();
private final OffsetResetStrategy defaultResetStrategy;
public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
this.defaultResetStrategy = defaultResetStrategy;
this.subscribedTopics = new HashSet<>();
this.subscribedPartitions = new HashSet<>();
this.assignedPartitions = new HashMap<>();
this.needsPartitionAssignment = false;
this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
this.resetPartitions = new HashMap<TopicPartition, OffsetResetStrategy>();
}
public void subscribe(String topic) {
if (this.subscribedPartitions.size() > 0)
throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
if (!this.subscribedPartitions.isEmpty())
throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
if (!this.subscribedTopics.contains(topic)) {
this.subscribedTopics.add(topic);
this.needsPartitionAssignment = true;
@ -95,10 +97,10 @@ public class SubscriptionState { @@ -95,10 +97,10 @@ public class SubscriptionState {
}
public void subscribe(TopicPartition tp) {
if (this.subscribedTopics.size() > 0)
throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
if (!this.subscribedTopics.isEmpty())
throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
this.subscribedPartitions.add(tp);
this.assignedPartitions.add(tp);
addAssignedPartition(tp);
}
public void unsubscribe(TopicPartition partition) {
@ -110,17 +112,10 @@ public class SubscriptionState { @@ -110,17 +112,10 @@ public class SubscriptionState {
private void clearPartition(TopicPartition tp) {
this.assignedPartitions.remove(tp);
this.committed.remove(tp);
this.fetched.remove(tp);
this.consumed.remove(tp);
this.resetPartitions.remove(tp);
}
public void clearAssignment() {
this.assignedPartitions.clear();
this.committed.clear();
this.fetched.clear();
this.consumed.clear();
this.needsPartitionAssignment = !subscribedTopics().isEmpty();
}
@ -129,21 +124,26 @@ public class SubscriptionState { @@ -129,21 +124,26 @@ public class SubscriptionState {
}
public Long fetched(TopicPartition tp) {
return this.fetched.get(tp);
return assignedState(tp).fetched;
}
public void fetched(TopicPartition tp, long offset) {
if (!this.assignedPartitions.contains(tp))
throw new IllegalArgumentException("Can't change the fetch position for a partition you are not currently subscribed to.");
this.fetched.put(tp, offset);
assignedState(tp).fetched(offset);
}
private TopicPartitionState assignedState(TopicPartition tp) {
TopicPartitionState state = this.assignedPartitions.get(tp);
if (state == null)
throw new IllegalStateException("No current assignment for partition " + tp);
return state;
}
public void committed(TopicPartition tp, long offset) {
this.committed.put(tp, offset);
assignedState(tp).committed(offset);
}
public Long committed(TopicPartition tp) {
return this.committed.get(tp);
return assignedState(tp).committed;
}
public void needRefreshCommits() {
@ -157,15 +157,22 @@ public class SubscriptionState { @@ -157,15 +157,22 @@ public class SubscriptionState {
public void commitsRefreshed() {
this.needsFetchCommittedOffsets = false;
}
public void seek(TopicPartition tp, long offset) {
fetched(tp, offset);
consumed(tp, offset);
resetPartitions.remove(tp);
assignedState(tp).seek(offset);
}
public Set<TopicPartition> assignedPartitions() {
return this.assignedPartitions;
return this.assignedPartitions.keySet();
}
public Set<TopicPartition> fetchablePartitions() {
Set<TopicPartition> fetchable = new HashSet<>();
for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet()) {
if (entry.getValue().isFetchable())
fetchable.add(entry.getKey());
}
return fetchable;
}
public boolean partitionsAutoAssigned() {
@ -173,49 +180,52 @@ public class SubscriptionState { @@ -173,49 +180,52 @@ public class SubscriptionState {
}
public void consumed(TopicPartition tp, long offset) {
if (!this.assignedPartitions.contains(tp))
throw new IllegalArgumentException("Can't change the consumed position for a partition you are not currently subscribed to.");
this.consumed.put(tp, offset);
assignedState(tp).consumed(offset);
}
public Long consumed(TopicPartition partition) {
return this.consumed.get(partition);
public Long consumed(TopicPartition tp) {
return assignedState(tp).consumed;
}
public Map<TopicPartition, Long> allConsumed() {
return this.consumed;
Map<TopicPartition, Long> allConsumed = new HashMap<>();
for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet()) {
TopicPartitionState state = entry.getValue();
if (state.hasValidPosition)
allConsumed.put(entry.getKey(), state.consumed);
}
return allConsumed;
}
public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
this.resetPartitions.put(partition, offsetResetStrategy);
this.fetched.remove(partition);
this.consumed.remove(partition);
assignedState(partition).awaitReset(offsetResetStrategy);
}
public void needOffsetReset(TopicPartition partition) {
needOffsetReset(partition, offsetResetStrategy);
needOffsetReset(partition, defaultResetStrategy);
}
public boolean isOffsetResetNeeded(TopicPartition partition) {
return resetPartitions.containsKey(partition);
}
public boolean isOffsetResetNeeded() {
return !resetPartitions.isEmpty();
return assignedState(partition).awaitingReset;
}
public OffsetResetStrategy resetStrategy(TopicPartition partition) {
return resetPartitions.get(partition);
return assignedState(partition).resetStrategy;
}
public boolean hasAllFetchPositions() {
return this.fetched.size() >= this.assignedPartitions.size();
for (TopicPartitionState state : assignedPartitions.values())
if (!state.hasValidPosition)
return false;
return true;
}
public Set<TopicPartition> missingFetchPositions() {
Set<TopicPartition> copy = new HashSet<TopicPartition>(this.assignedPartitions);
copy.removeAll(this.fetched.keySet());
return copy;
Set<TopicPartition> missing = new HashSet<>(this.assignedPartitions.keySet());
for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet())
if (!entry.getValue().hasValidPosition)
missing.add(entry.getKey());
return missing;
}
public boolean partitionAssignmentNeeded() {
@ -227,9 +237,99 @@ public class SubscriptionState { @@ -227,9 +237,99 @@ public class SubscriptionState {
if (!this.subscribedTopics.contains(tp.topic()))
throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
this.clearAssignment();
this.assignedPartitions.addAll(assignments);
for (TopicPartition tp: assignments)
addAssignedPartition(tp);
this.needsPartitionAssignment = false;
}
public boolean isAssigned(TopicPartition tp) {
return assignedPartitions.containsKey(tp);
}
public boolean isPaused(TopicPartition tp) {
return isAssigned(tp) && assignedState(tp).paused;
}
public boolean isFetchable(TopicPartition tp) {
return isAssigned(tp) && assignedState(tp).isFetchable();
}
public void pause(TopicPartition tp) {
assignedState(tp).pause();
}
public void resume(TopicPartition tp) {
assignedState(tp).resume();
}
private void addAssignedPartition(TopicPartition tp) {
this.assignedPartitions.put(tp, new TopicPartitionState());
}
private static class TopicPartitionState {
private Long consumed; // offset exposed to the user
private Long fetched; // current fetch position
private Long committed; // last committed position
private boolean hasValidPosition; // whether we have valid consumed and fetched positions
private boolean paused; // whether this partition has been paused by the user
private boolean awaitingReset; // whether we are awaiting reset
private OffsetResetStrategy resetStrategy; // the reset strategy if awaitingReset is set
public TopicPartitionState() {
this.paused = false;
this.consumed = null;
this.fetched = null;
this.committed = null;
this.awaitingReset = false;
this.hasValidPosition = false;
this.resetStrategy = null;
}
private void awaitReset(OffsetResetStrategy strategy) {
this.awaitingReset = true;
this.resetStrategy = strategy;
this.consumed = null;
this.fetched = null;
this.hasValidPosition = false;
}
private void seek(long offset) {
this.consumed = offset;
this.fetched = offset;
this.awaitingReset = false;
this.resetStrategy = null;
this.hasValidPosition = true;
}
private void fetched(long offset) {
if (!hasValidPosition)
throw new IllegalStateException("Cannot update fetch position without valid consumed/fetched positions");
this.fetched = offset;
}
private void consumed(long offset) {
if (!hasValidPosition)
throw new IllegalStateException("Cannot update consumed position without valid consumed/fetched positions");
this.consumed = offset;
}
private void committed(Long offset) {
this.committed = offset;
}
private void pause() {
this.paused = true;
}
private void resume() {
this.paused = false;
}
private boolean isFetchable() {
return !paused && hasValidPosition;
}
}
}

2
clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java

@ -29,7 +29,7 @@ public class MockConsumerTest { @@ -29,7 +29,7 @@ public class MockConsumerTest {
@Test
public void testSimpleMock() {
consumer.subscribe("topic");
consumer.subscribe("test");
assertEquals(0, consumer.poll(1000).count());
ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");

36
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

@ -45,6 +45,7 @@ import java.util.List; @@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class FetcherTest {
@ -99,8 +100,7 @@ public class FetcherTest { @@ -99,8 +100,7 @@ public class FetcherTest {
public void testFetchNormal() {
List<ConsumerRecord<byte[], byte[]>> records;
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);
subscriptions.seek(tp, 0);
// normal fetch
fetcher.initFetches(cluster);
@ -121,8 +121,7 @@ public class FetcherTest { @@ -121,8 +121,7 @@ public class FetcherTest {
public void testFetchDuringRebalance() {
subscriptions.subscribe(topicName);
subscriptions.changePartitionAssignment(Arrays.asList(tp));
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
@ -135,11 +134,33 @@ public class FetcherTest { @@ -135,11 +134,33 @@ public class FetcherTest {
assertTrue(fetcher.fetchedRecords().isEmpty());
}
@Test
public void testInFlightFetchOnPausedPartition() {
subscriptions.subscribe(tp);
subscriptions.seek(tp, 0);
fetcher.initFetches(cluster);
subscriptions.pause(tp);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
consumerClient.poll(0);
assertNull(fetcher.fetchedRecords().get(tp));
}
@Test
public void testFetchOnPausedPartition() {
subscriptions.subscribe(tp);
subscriptions.seek(tp, 0);
subscriptions.pause(tp);
fetcher.initFetches(cluster);
assertTrue(client.requests().isEmpty());
}
@Test
public void testFetchFailed() {
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 0);
subscriptions.consumed(tp, 0);
subscriptions.seek(tp, 0);
// fetch with not leader
fetcher.initFetches(cluster);
@ -169,8 +190,7 @@ public class FetcherTest { @@ -169,8 +190,7 @@ public class FetcherTest {
@Test
public void testFetchOutOfRange() {
subscriptions.subscribe(tp);
subscriptions.fetched(tp, 5);
subscriptions.consumed(tp, 5);
subscriptions.seek(tp, 5);
// fetch with out of range
fetcher.initFetches(cluster);

58
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static java.util.Arrays.asList;
@ -37,12 +38,13 @@ public class SubscriptionStateTest { @@ -37,12 +38,13 @@ public class SubscriptionStateTest {
state.subscribe(tp0);
assertEquals(Collections.singleton(tp0), state.assignedPartitions());
state.committed(tp0, 1);
state.fetched(tp0, 1);
state.consumed(tp0, 1);
state.seek(tp0, 1);
assertTrue(state.isFetchable(tp0));
assertAllPositions(tp0, 1L);
state.unsubscribe(tp0);
assertTrue(state.assignedPartitions().isEmpty());
assertAllPositions(tp0, null);
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp0));
}
@Test
@ -52,10 +54,15 @@ public class SubscriptionStateTest { @@ -52,10 +54,15 @@ public class SubscriptionStateTest {
assertEquals(5L, (long) state.fetched(tp0));
assertEquals(5L, (long) state.consumed(tp0));
state.needOffsetReset(tp0);
assertTrue(state.isOffsetResetNeeded());
assertFalse(state.isFetchable(tp0));
assertTrue(state.isOffsetResetNeeded(tp0));
assertEquals(null, state.fetched(tp0));
assertEquals(null, state.consumed(tp0));
// seek should clear the reset and make the partition fetchable
state.seek(tp0, 0);
assertTrue(state.isFetchable(tp0));
assertFalse(state.isOffsetResetNeeded(tp0));
}
@Test
@ -65,15 +72,27 @@ public class SubscriptionStateTest { @@ -65,15 +72,27 @@ public class SubscriptionStateTest {
assertTrue(state.assignedPartitions().isEmpty());
assertTrue(state.partitionsAutoAssigned());
state.changePartitionAssignment(asList(tp0));
state.seek(tp0, 1);
state.committed(tp0, 1);
state.fetched(tp0, 1);
state.consumed(tp0, 1);
assertAllPositions(tp0, 1L);
state.changePartitionAssignment(asList(tp1));
assertAllPositions(tp0, null);
assertTrue(state.isAssigned(tp1));
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp1));
assertEquals(Collections.singleton(tp1), state.assignedPartitions());
}
@Test
public void partitionPause() {
state.subscribe(tp0);
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
state.pause(tp0);
assertFalse(state.isFetchable(tp0));
state.resume(tp0);
assertTrue(state.isFetchable(tp0));
}
@Test
public void topicUnsubscription() {
final String topic = "test";
@ -83,24 +102,37 @@ public class SubscriptionStateTest { @@ -83,24 +102,37 @@ public class SubscriptionStateTest {
assertTrue(state.partitionsAutoAssigned());
state.changePartitionAssignment(asList(tp0));
state.committed(tp0, 1);
state.fetched(tp0, 1);
state.consumed(tp0, 1);
state.seek(tp0, 1);
assertAllPositions(tp0, 1L);
state.changePartitionAssignment(asList(tp1));
assertAllPositions(tp0, null);
assertFalse(state.isAssigned(tp0));
assertEquals(Collections.singleton(tp1), state.assignedPartitions());
state.unsubscribe(topic);
assertEquals(0, state.subscribedTopics().size());
assertTrue(state.assignedPartitions().isEmpty());
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = IllegalStateException.class)
public void invalidConsumedPositionUpdate() {
state.subscribe("test");
state.changePartitionAssignment(asList(tp0));
state.consumed(tp0, 0);
}
@Test(expected = IllegalStateException.class)
public void invalidFetchPositionUpdate() {
state.subscribe("test");
state.changePartitionAssignment(asList(tp0));
state.fetched(tp0, 0);
}
@Test(expected = IllegalStateException.class)
public void cantChangeFetchPositionForNonAssignedPartition() {
state.fetched(tp0, 1);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = IllegalStateException.class)
public void cantChangeConsumedPositionForNonAssignedPartition() {
state.consumed(tp0, 1);
}

32
core/src/test/scala/integration/kafka/api/ConsumerTest.scala

@ -18,7 +18,7 @@ import org.apache.kafka.clients.producer.ProducerConfig @@ -18,7 +18,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import kafka.utils.{TestUtils, Logging}
import kafka.server.KafkaConfig
@ -254,6 +254,34 @@ class ConsumerTest extends IntegrationTestHarness with Logging { @@ -254,6 +254,34 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
}
}
def testPartitionPauseAndResume() {
sendRecords(5)
this.consumers(0).subscribe(tp)
consumeRecords(this.consumers(0), 5, 0)
this.consumers(0).pause(tp)
sendRecords(5)
assertTrue(this.consumers(0).poll(0).isEmpty)
this.consumers(0).resume(tp)
consumeRecords(this.consumers(0), 5, 5)
}
def testPauseStateNotPreservedByRebalance() {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
val consumer0 = new KafkaConsumer(this.consumerConfig, null, new ByteArrayDeserializer(), new ByteArrayDeserializer())
sendRecords(5)
consumer0.subscribe(topic)
consumeRecords(consumer0, 5, 0)
consumer0.pause(tp)
// subscribe to a new topic to trigger a rebalance
consumer0.subscribe("topic2")
// after rebalance, our position should be reset and our pause state lost,
// so we should be able to consume from the beginning
consumeRecords(consumer0, 0, 5)
}
private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
var callsToAssigned = 0
var callsToRevoked = 0
@ -264,7 +292,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { @@ -264,7 +292,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
info("onPartitionsRevoked called.")
callsToRevoked += 1
}
}
}
private def sendRecords(numRecords: Int): Unit = {

Loading…
Cancel
Save