From ff557f02ac628edbe220ea69888d39de834527d3 Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Thu, 28 Jul 2016 11:15:45 +0100
Subject: [PATCH] KAFKA-3977; Defer fetch parsing for space efficiency and to
 ensure exceptions are raised to the user

Author: Jason Gustafson

Reviewers: Ewen Cheslack-Postava , Ismael Juma

Closes #1656 from hachikuji/KAFKA-3977
---
 .../clients/consumer/internals/Fetcher.java   | 325 ++++++++++--------
 .../kafka/common/record/Compressor.java       |   4 +-
 .../common/record/InvalidRecordException.java |   4 +-
 .../consumer/internals/FetcherTest.java       | 153 +++++++--
 4 files changed, 310 insertions(+), 176 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ddfb5841e3f..c811a0332b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -38,8 +39,10 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
@@ -59,7 +62,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -83,13 +85,11 @@ public class Fetcher {
     private final Metadata metadata;
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
-    private final List> records;
+    private final List completedFetches;
     private final Deserializer keyDeserializer;
     private final Deserializer valueDeserializer;
 
-    private final Map offsetOutOfRangePartitions;
-    private final Set unauthorizedTopics;
-    private final Map recordTooLargePartitions;
+    private PartitionRecords nextInLineRecords = null;
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
@@ -105,7 +105,6 @@ public class Fetcher {
                    String metricGrpPrefix,
                    Time time,
                    long retryBackoffMs) {
-
         this.time = time;
         this.client = client;
         this.metadata = metadata;
@@ -115,31 +114,37 @@ public class Fetcher {
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
         this.checkCrcs = checkCrcs;
-
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
-
-        this.records = new LinkedList<>();
-        this.offsetOutOfRangePartitions = new HashMap<>();
-        this.unauthorizedTopics = new HashSet<>();
-        this.recordTooLargePartitions = new HashMap<>();
-
+        this.completedFetches = new ArrayList<>();
         this.sensors =
new FetchManagerMetrics(metrics, metricGrpPrefix); this.retryBackoffMs = retryBackoffMs; } /** - * Set-up a fetch request for any node that we have assigned partitions for which doesn't have one. - * + * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have + * an in-flight fetch or pending fetch data. */ public void sendFetches() { for (Map.Entry fetchEntry: createFetchRequests().entrySet()) { - final FetchRequest fetch = fetchEntry.getValue(); - client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch) + final FetchRequest request = fetchEntry.getValue(); + client.send(fetchEntry.getKey(), ApiKeys.FETCH, request) .addListener(new RequestFutureListener() { @Override - public void onSuccess(ClientResponse response) { - handleFetchResponse(response, fetch); + public void onSuccess(ClientResponse resp) { + FetchResponse response = new FetchResponse(resp.responseBody()); + Set partitions = new HashSet<>(response.responseData().keySet()); + FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions); + + for (Map.Entry entry : response.responseData().entrySet()) { + TopicPartition partition = entry.getKey(); + long fetchOffset = request.fetchData().get(partition).offset; + FetchResponse.PartitionData fetchData = entry.getValue(); + completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator)); + } + + sensors.fetchLatency.record(resp.requestLatencyMs()); + sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()); } @Override @@ -152,7 +157,7 @@ public class Fetcher { /** * Update the fetch positions for the provided partitions. - * @param partitions + * @param partitions the partitions to update positions for * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available */ public void updateFetchPositions(Set partitions) { @@ -323,62 +328,6 @@ public class Fetcher { } } - /** - * If any partition from previous fetchResponse contains OffsetOutOfRange error and - * the defaultResetPolicy is NONE, throw OffsetOutOfRangeException - * - * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse - */ - private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException { - Map currentOutOfRangePartitions = new HashMap<>(); - - // filter offsetOutOfRangePartitions to retain only the fetchable partitions - for (Map.Entry entry: this.offsetOutOfRangePartitions.entrySet()) { - if (!subscriptions.isFetchable(entry.getKey())) { - log.debug("Ignoring fetched records for {} since it is no longer fetchable", entry.getKey()); - continue; - } - Long position = subscriptions.position(entry.getKey()); - // ignore partition if the current position != the offset in fetchResponse, e.g. 
after seek() - if (position != null && entry.getValue().equals(position)) - currentOutOfRangePartitions.put(entry.getKey(), entry.getValue()); - } - this.offsetOutOfRangePartitions.clear(); - if (!currentOutOfRangePartitions.isEmpty()) - throw new OffsetOutOfRangeException(currentOutOfRangePartitions); - } - - /** - * If any topic from previous fetchResponse contains an Authorization error, raise an exception - * @throws TopicAuthorizationException - */ - private void throwIfUnauthorizedTopics() throws TopicAuthorizationException { - if (!unauthorizedTopics.isEmpty()) { - Set topics = new HashSet<>(unauthorizedTopics); - unauthorizedTopics.clear(); - throw new TopicAuthorizationException(topics); - } - } - - /** - * If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException - * - * @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned - */ - private void throwIfRecordTooLarge() throws RecordTooLargeException { - Map copiedRecordTooLargePartitions = new HashMap<>(this.recordTooLargePartitions); - this.recordTooLargePartitions.clear(); - - if (!copiedRecordTooLargePartitions.isEmpty()) - throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " - + copiedRecordTooLargePartitions - + " whose size is larger than the fetch size " - + this.fetchSize - + " and hence cannot be ever returned." - + " Increase the fetch size, or decrease the maximum message size the broker will allow.", - copiedRecordTooLargePartitions); - } - /** * Return the fetched records, empty the record buffer and update the consumed position. * @@ -393,60 +342,68 @@ public class Fetcher { return Collections.emptyMap(); } else { Map>> drained = new HashMap<>(); - throwIfOffsetOutOfRange(); - throwIfUnauthorizedTopics(); - throwIfRecordTooLarge(); - - int maxRecords = maxPollRecords; - Iterator> iterator = records.iterator(); - while (iterator.hasNext() && maxRecords > 0) { - PartitionRecords part = iterator.next(); - maxRecords -= append(drained, part, maxRecords); - if (part.isConsumed()) - iterator.remove(); + int recordsRemaining = maxPollRecords; + Iterator completedFetchesIterator = completedFetches.iterator(); + + while (recordsRemaining > 0) { + if (nextInLineRecords == null || nextInLineRecords.isEmpty()) { + if (!completedFetchesIterator.hasNext()) + break; + + CompletedFetch completion = completedFetchesIterator.next(); + completedFetchesIterator.remove(); + nextInLineRecords = parseFetchedData(completion); + } else { + recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining); + } } + return drained; } } private int append(Map>> drained, - PartitionRecords part, + PartitionRecords partitionRecords, int maxRecords) { - if (!subscriptions.isAssigned(part.partition)) { + if (partitionRecords.isEmpty()) + return 0; + + if (!subscriptions.isAssigned(partitionRecords.partition)) { // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call - log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition); + log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition); } else { // note that the consumed position should always be available as long as the partition is still assigned - long position = subscriptions.position(part.partition); - if (!subscriptions.isFetchable(part.partition)) { + long position = 
subscriptions.position(partitionRecords.partition); + if (!subscriptions.isFetchable(partitionRecords.partition)) { // this can happen when a partition is paused before fetched records are returned to the consumer's poll call - log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition); - } else if (part.fetchOffset == position) { - List> partRecords = part.take(maxRecords); + log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition); + } else if (partitionRecords.fetchOffset == position) { + // we are ensured to have at least one record since we already checked for emptiness + List> partRecords = partitionRecords.take(maxRecords); long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; log.trace("Returning fetched records at offset {} for assigned partition {} and update " + - "position to {}", position, part.partition, nextOffset); + "position to {}", position, partitionRecords.partition, nextOffset); - List> records = drained.get(part.partition); + List> records = drained.get(partitionRecords.partition); if (records == null) { records = partRecords; - drained.put(part.partition, records); + drained.put(partitionRecords.partition, records); } else { records.addAll(partRecords); } - subscriptions.position(part.partition, nextOffset); + subscriptions.position(partitionRecords.partition, nextOffset); return partRecords.size(); } else { // these records aren't next in line based on the last consumed position, ignore them // they must be from an obsolete request log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", - part.partition, part.fetchOffset, position); + partitionRecords.partition, partitionRecords.fetchOffset, position); } } - part.discard(); + partitionRecords.discard(); return 0; } @@ -513,10 +470,10 @@ public class Fetcher { private Set fetchablePartitions() { Set fetchable = subscriptions.fetchablePartitions(); - if (records.isEmpty()) - return fetchable; - for (PartitionRecords partitionRecords : records) - fetchable.remove(partitionRecords.partition); + if (nextInLineRecords != null && !nextInLineRecords.isEmpty()) + fetchable.remove(nextInLineRecords.partition); + for (CompletedFetch completedFetch : completedFetches) + fetchable.remove(completedFetch.partition); return fetchable; } @@ -559,30 +516,29 @@ public class Fetcher { /** * The callback for fetch completion */ - private void handleFetchResponse(ClientResponse resp, FetchRequest request) { - int totalBytes = 0; - int totalCount = 0; - FetchResponse response = new FetchResponse(resp.responseBody()); - for (Map.Entry entry : response.responseData().entrySet()) { - TopicPartition tp = entry.getKey(); - FetchResponse.PartitionData partition = entry.getValue(); + private PartitionRecords parseFetchedData(CompletedFetch completedFetch) { + TopicPartition tp = completedFetch.partition; + FetchResponse.PartitionData partition = completedFetch.partitionData; + long fetchOffset = completedFetch.fetchedOffset; + int bytes = 0; + int recordsCount = 0; + PartitionRecords parsedRecords = null; + + try { if (!subscriptions.isFetchable(tp)) { // this can happen when a rebalance happened or a partition consumption paused // while fetch is still in-flight log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp); } else if (partition.errorCode == Errors.NONE.code()) { - long fetchOffset = request.fetchData().get(tp).offset; 
- // we are interested in this fetch only if the beginning offset matches the // current consumed position Long position = subscriptions.position(tp); if (position == null || position != fetchOffset) { - log.debug("Discarding fetch response for partition {} since its offset {} does not match " + + log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " + "the expected offset {}", tp, fetchOffset, position); - continue; + return null; } - int bytes = 0; ByteBuffer buffer = partition.recordSet; MemoryRecords records = MemoryRecords.readableRecords(buffer); List> parsed = new ArrayList<>(); @@ -597,79 +553,95 @@ public class Fetcher { } } + recordsCount = parsed.size(); + this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount); + if (!parsed.isEmpty()) { log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position); + parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed); ConsumerRecord record = parsed.get(parsed.size() - 1); - this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed)); this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset()); } else if (buffer.limit() > 0 && !skippedRecords) { // we did not read a single message from a non-empty buffer // because that message's size is larger than fetch size, in this case // record this exception - this.recordTooLargePartitions.put(tp, fetchOffset); + Map recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset); + throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + + recordTooLargePartitions + + " whose size is larger than the fetch size " + + this.fetchSize + + " and hence cannot be ever returned." + + " Increase the fetch size on the client (using max.partition.fetch.bytes)," + + " or decrease the maximum message size the broker will allow (using message.max.bytes).", + recordTooLargePartitions); } - - this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size()); - totalBytes += bytes; - totalCount += parsed.size(); } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() - || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { + || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { this.metadata.requestUpdate(); } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) { - long fetchOffset = request.fetchData().get(tp).offset; - if (subscriptions.hasDefaultOffsetResetPolicy()) + if (fetchOffset != subscriptions.position(tp)) { + log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" + + "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp)); + } else if (subscriptions.hasDefaultOffsetResetPolicy()) { + log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp); subscriptions.needOffsetReset(tp); - else - this.offsetOutOfRangePartitions.put(tp, fetchOffset); - log.info("Fetch offset {} is out of range, resetting offset", fetchOffset); + } else { + throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset)); + } } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) { log.warn("Not authorized to read from topic {}.", tp.topic()); - unauthorizedTopics.add(tp.topic()); + throw new TopicAuthorizationException(Collections.singleton(tp.topic())); } else if (partition.errorCode == Errors.UNKNOWN.code()) { log.warn("Unknown error fetching data for topic-partition 
{}", tp); } else { throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data"); } + } finally { + completedFetch.metricAggregator.record(tp, bytes, recordsCount); } - this.sensors.bytesFetched.record(totalBytes); - this.sensors.recordsFetched.record(totalCount); - this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()); - this.sensors.fetchLatency.record(resp.requestLatencyMs()); + + return parsedRecords; } /** * Parse the record entry, deserializing the key / value fields if necessary */ private ConsumerRecord parseRecord(TopicPartition partition, LogEntry logEntry) { + Record record = logEntry.record(); + + if (this.checkCrcs && !record.isValid()) + throw new InvalidRecordException("Record for partition " + partition + " at offset " + + logEntry.offset() + " is corrupt (stored crc = " + record.checksum() + + ", computed crc = " + + record.computeChecksum() + + ")"); + try { - if (this.checkCrcs) - logEntry.record().ensureValid(); long offset = logEntry.offset(); - long timestamp = logEntry.record().timestamp(); - TimestampType timestampType = logEntry.record().timestampType(); - ByteBuffer keyBytes = logEntry.record().key(); + long timestamp = record.timestamp(); + TimestampType timestampType = record.timestampType(); + ByteBuffer keyBytes = record.key(); byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes); K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray); - ByteBuffer valueBytes = logEntry.record().value(); + ByteBuffer valueBytes = record.value(); byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes); V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray); return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, - timestamp, timestampType, logEntry.record().checksum(), + timestamp, timestampType, record.checksum(), keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length, valueByteArray == null ? 
ConsumerRecord.NULL_SIZE : valueByteArray.length, key, value); - } catch (KafkaException e) { - throw e; } catch (RuntimeException e) { - throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e); + throw new SerializationException("Error deserializing key/value for partition " + partition + + " at offset " + logEntry.offset(), e); } } private static class PartitionRecords { - public long fetchOffset; - public TopicPartition partition; - public List> records; + private long fetchOffset; + private TopicPartition partition; + private List> records; public PartitionRecords(long fetchOffset, TopicPartition partition, List> records) { this.fetchOffset = fetchOffset; @@ -677,7 +649,7 @@ public class Fetcher { this.records = records; } - private boolean isConsumed() { + private boolean isEmpty() { return records == null || records.isEmpty(); } @@ -687,7 +659,7 @@ public class Fetcher { private List> take(int n) { if (records == null) - return Collections.emptyList(); + return new ArrayList<>(); if (n >= records.size()) { List> res = this.records; @@ -709,7 +681,59 @@ public class Fetcher { } } - private class FetchManagerMetrics { + private static class CompletedFetch { + private final TopicPartition partition; + private final long fetchedOffset; + private final FetchResponse.PartitionData partitionData; + private final FetchResponseMetricAggregator metricAggregator; + + public CompletedFetch(TopicPartition partition, + long fetchedOffset, + FetchResponse.PartitionData partitionData, + FetchResponseMetricAggregator metricAggregator) { + this.partition = partition; + this.fetchedOffset = fetchedOffset; + this.partitionData = partitionData; + this.metricAggregator = metricAggregator; + } + } + + /** + * Since we parse the message data for each partition from each fetch response lazily, fetch-level + * metrics need to be aggregated as the messages from each partition are parsed. This class is used + * to facilitate this incremental aggregation. + */ + private static class FetchResponseMetricAggregator { + private final FetchManagerMetrics sensors; + private final Set unrecordedPartitions; + + private int totalBytes; + private int totalRecords; + + public FetchResponseMetricAggregator(FetchManagerMetrics sensors, + Set partitions) { + this.sensors = sensors; + this.unrecordedPartitions = partitions; + } + + /** + * After each partition is parsed, we update the current metric totals with the total bytes + * and number of records parsed. After all partitions have reported, we write the metric. 
+ */ + public void record(TopicPartition partition, int bytes, int records) { + unrecordedPartitions.remove(partition); + totalBytes += bytes; + totalRecords += records; + + if (unrecordedPartitions.isEmpty()) { + // once all expected partitions from the fetch have reported in, record the metrics + sensors.bytesFetched.record(totalBytes); + sensors.recordsFetched.record(totalRecords); + } + } + } + + private static class FetchManagerMetrics { public final Metrics metrics; public final String metricGrpName; @@ -719,7 +743,6 @@ public class Fetcher { public final Sensor recordsFetchLag; public final Sensor fetchThrottleTimeSensor; - public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) { this.metrics = metrics; this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics"; diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index e23a52e710f..a806975959e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -242,7 +242,7 @@ public class Compressor { // the following two functions also need to be public since they are used in MemoryRecords.iteration - static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) { + public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) { try { switch (type) { case NONE: @@ -271,7 +271,7 @@ public class Compressor { } } - static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) { + public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) { try { switch (type) { case NONE: diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java index 5815b21591d..a1009ca2e38 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java +++ b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.common.record; -public class InvalidRecordException extends RuntimeException { +import org.apache.kafka.common.KafkaException; + +public class InvalidRecordException extends KafkaException { private static final long serialVersionUID = 1; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 8fad30f986b..2fbd43ec7ae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.metrics.KafkaMetric; @@ -38,6 +39,8 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import 
org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.Compressor; +import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.requests.FetchRequest; @@ -47,6 +50,7 @@ import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -128,7 +132,7 @@ public class FetcherTest { consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp); assertEquals(3, records.size()); - assertEquals(4L, (long) subscriptions.position(tp)); // this is the next fetching position + assertEquals(4L, subscriptions.position(tp).longValue()); // this is the next fetching position long offset = 1; for (ConsumerRecord record : records) { assertEquals(offset, record.offset()); @@ -147,9 +151,83 @@ public class FetcherTest { }; } + @Test + public void testFetchedRecordsRaisesOnSerializationErrors() { + // raise an exception from somewhere in the middle of the fetch response + // so that we can verify that our position does not advance after raising + ByteArrayDeserializer deserializer = new ByteArrayDeserializer() { + int i = 0; + @Override + public byte[] deserialize(String topic, byte[] data) { + if (i++ == 1) + throw new SerializationException(); + return data; + } + }; + + Fetcher fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer); + + subscriptions.assignFromUser(Collections.singleton(tp)); + subscriptions.seek(tp, 1); + + client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); + + fetcher.sendFetches(); + consumerClient.poll(0); + try { + fetcher.fetchedRecords(); + fail("fetchedRecords should have raised"); + } catch (SerializationException e) { + // the position should not advance since no data has been returned + assertEquals(1, subscriptions.position(tp).longValue()); + } + } + + @Test + public void testParseInvalidRecord() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + Compressor compressor = new Compressor(buffer, CompressionType.NONE); + + byte[] key = "foo".getBytes(); + byte[] value = "baz".getBytes(); + long offset = 0; + long timestamp = 500L; + + int size = Record.recordSize(key, value); + long crc = Record.computeChecksum(timestamp, key, value, CompressionType.NONE, 0, -1); + + // write one valid record + compressor.putLong(offset); + compressor.putInt(size); + Record.write(compressor, crc, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1); + + // and one invalid record (note the crc) + compressor.putLong(offset); + compressor.putInt(size); + Record.write(compressor, crc + 1, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1); + + compressor.close(); + buffer.flip(); + + subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.seek(tp, 0); + + // normal fetch + fetcher.sendFetches(); + client.prepareResponse(fetchResponse(buffer, Errors.NONE.code(), 100L, 0)); + consumerClient.poll(0); + try { + fetcher.fetchedRecords(); + fail("fetchedRecords should have raised"); + } catch 
(InvalidRecordException e) { + // the position should not advance since no data has been returned + assertEquals(0, subscriptions.position(tp).longValue()); + } + } + @Test public void testFetchMaxPollRecords() { - Fetcher fetcher = createFetcher(2, subscriptions, new Metrics(time)); + Fetcher fetcher = createFetcher(subscriptions, new Metrics(time), 2); List> records; subscriptions.assignFromUser(Arrays.asList(tp)); @@ -162,7 +240,7 @@ public class FetcherTest { consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp); assertEquals(2, records.size()); - assertEquals(3L, (long) subscriptions.position(tp)); + assertEquals(3L, subscriptions.position(tp).longValue()); assertEquals(1, records.get(0).offset()); assertEquals(2, records.get(1).offset()); @@ -170,14 +248,14 @@ public class FetcherTest { consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp); assertEquals(1, records.size()); - assertEquals(4L, (long) subscriptions.position(tp)); + assertEquals(4L, subscriptions.position(tp).longValue()); assertEquals(3, records.get(0).offset()); fetcher.sendFetches(); consumerClient.poll(0); records = fetcher.fetchedRecords().get(tp); assertEquals(2, records.size()); - assertEquals(6L, (long) subscriptions.position(tp)); + assertEquals(6L, subscriptions.position(tp).longValue()); assertEquals(4, records.get(0).offset()); assertEquals(5, records.get(1).offset()); } @@ -203,7 +281,7 @@ public class FetcherTest { consumerClient.poll(0); consumerRecords = fetcher.fetchedRecords().get(tp); assertEquals(3, consumerRecords.size()); - assertEquals(31L, (long) subscriptions.position(tp)); // this is the next fetching position + assertEquals(31L, subscriptions.position(tp).longValue()); // this is the next fetching position assertEquals(15L, consumerRecords.get(0).offset()); assertEquals(20L, consumerRecords.get(1).offset()); @@ -318,11 +396,27 @@ public class FetcherTest { fetcher.sendFetches(); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); consumerClient.poll(0); - assertTrue(subscriptions.isOffsetResetNeeded(tp)); assertEquals(0, fetcher.fetchedRecords().size()); + assertTrue(subscriptions.isOffsetResetNeeded(tp)); assertEquals(null, subscriptions.position(tp)); } + @Test + public void testStaleOutOfRangeError() { + // verify that an out of range error which arrives after a seek + // does not cause us to reset our position or throw an exception + subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.seek(tp, 0); + + fetcher.sendFetches(); + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); + subscriptions.seek(tp, 1); + consumerClient.poll(0); + assertEquals(0, fetcher.fetchedRecords().size()); + assertFalse(subscriptions.isOffsetResetNeeded(tp)); + assertEquals(1, subscriptions.position(tp).longValue()); + } + @Test public void testFetchedRecordsAfterSeek() { subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp)); @@ -368,7 +462,7 @@ public class FetcherTest { // disconnects should have no affect on subscription state assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); - assertEquals(0, (long) subscriptions.position(tp)); + assertEquals(0, subscriptions.position(tp).longValue()); } @Test @@ -380,7 +474,7 @@ public class FetcherTest { fetcher.updateFetchPositions(Collections.singleton(tp)); assertTrue(subscriptions.isFetchable(tp)); - assertEquals(5, (long) subscriptions.position(tp)); + assertEquals(5, 
subscriptions.position(tp).longValue()); } @Test @@ -393,7 +487,7 @@ public class FetcherTest { fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); - assertEquals(5, (long) subscriptions.position(tp)); + assertEquals(5, subscriptions.position(tp).longValue()); } @Test @@ -406,7 +500,7 @@ public class FetcherTest { fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); - assertEquals(5, (long) subscriptions.position(tp)); + assertEquals(5, subscriptions.position(tp).longValue()); } @Test @@ -419,7 +513,7 @@ public class FetcherTest { fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); - assertEquals(5, (long) subscriptions.position(tp)); + assertEquals(5, subscriptions.position(tp).longValue()); } @Test @@ -437,7 +531,7 @@ public class FetcherTest { fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); - assertEquals(5, (long) subscriptions.position(tp)); + assertEquals(5, subscriptions.position(tp).longValue()); } @Test @@ -575,17 +669,36 @@ public class FetcherTest { return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata)); } - private Fetcher createFetcher(int maxPollRecords, - SubscriptionState subscriptions, - Metrics metrics) { + private Fetcher createFetcher(SubscriptionState subscriptions, + Metrics metrics, + int maxPollRecords) { + return createFetcher(subscriptions, metrics, new ByteArrayDeserializer(), new ByteArrayDeserializer(), maxPollRecords); + } + + private Fetcher createFetcher(SubscriptionState subscriptions, Metrics metrics) { + return createFetcher(subscriptions, metrics, Integer.MAX_VALUE); + } + + private Fetcher createFetcher(SubscriptionState subscriptions, + Metrics metrics, + Deserializer keyDeserializer, + Deserializer valueDeserializer) { + return createFetcher(subscriptions, metrics, keyDeserializer, valueDeserializer, Integer.MAX_VALUE); + } + + private Fetcher createFetcher(SubscriptionState subscriptions, + Metrics metrics, + Deserializer keyDeserializer, + Deserializer valueDeserializer, + int maxPollRecords) { return new Fetcher<>(consumerClient, minBytes, maxWaitMs, fetchSize, maxPollRecords, true, // check crc - new ByteArrayDeserializer(), - new ByteArrayDeserializer(), + keyDeserializer, + valueDeserializer, metadata, subscriptions, metrics, @@ -594,8 +707,4 @@ public class FetcherTest { retryBackoffMs); } - - private Fetcher createFetcher(SubscriptionState subscriptions, Metrics metrics) { - return createFetcher(Integer.MAX_VALUE, subscriptions, metrics); - } }
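
Note: for readers of this patch, the core idea — buffer the raw partition data from each fetch response as a CompletedFetch and defer record-set parsing and deserialization until the application drains records, so that corrupt-record, deserialization, out-of-range, and authorization errors are raised from the user's poll rather than being swallowed in the network callback, and the consumed position only advances after a batch is successfully parsed and returned — can be illustrated with the small, self-contained sketch below. This is not Kafka code: the class and method names (DeferredFetchSketch, onFetchResponse, fetchedRecords, parse) are invented for illustration, and the "parsing" is a toy stand-in for record-set decoding, CRC validation, and key/value deserialization.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DeferredFetchSketch {

    // Raw, unparsed data for one partition from a fetch response (analogue of CompletedFetch)
    static class CompletedFetch {
        final long fetchOffset;
        final byte[] rawRecords;

        CompletedFetch(long fetchOffset, byte[] rawRecords) {
            this.fetchOffset = fetchOffset;
            this.rawRecords = rawRecords;
        }
    }

    private final Queue<CompletedFetch> completedFetches = new ArrayDeque<>();
    private long position; // next offset the application expects

    DeferredFetchSketch(long initialPosition) {
        this.position = initialPosition;
    }

    // Network-thread callback: only buffer the raw bytes; no parsing, no exceptions here
    void onFetchResponse(long fetchOffset, byte[] rawRecords) {
        completedFetches.add(new CompletedFetch(fetchOffset, rawRecords));
    }

    // Application thread: parse lazily; a corrupt record throws here, before the
    // position is advanced, so the caller sees the error instead of silently losing data
    List<String> fetchedRecords() {
        List<String> drained = new ArrayList<>();
        while (!completedFetches.isEmpty()) {
            CompletedFetch completion = completedFetches.poll();
            if (completion.fetchOffset != position)
                continue; // stale response from before a seek; ignore it
            List<String> parsed = parse(completion); // may throw
            position += parsed.size();               // advance only after successful parsing
            drained.addAll(parsed);
        }
        return drained;
    }

    private List<String> parse(CompletedFetch completion) {
        List<String> records = new ArrayList<>();
        for (byte b : completion.rawRecords) {
            if (b < 0) // toy stand-in for a CRC or deserialization failure
                throw new IllegalStateException("corrupt record at offset "
                        + (completion.fetchOffset + records.size()));
            records.add("record-" + b);
        }
        return records;
    }

    public static void main(String[] args) {
        DeferredFetchSketch fetcher = new DeferredFetchSketch(0L);
        fetcher.onFetchResponse(0L, new byte[]{1, 2, 3});
        System.out.println(fetcher.fetchedRecords()); // parsing happens here, not in the callback
    }
}

As in the patch, the position is only advanced after a batch is parsed and handed back, which is why the new tests (testFetchedRecordsRaisesOnSerializationErrors, testParseInvalidRecord, testStaleOutOfRangeError) assert that subscriptions.position(tp) stays put when fetchedRecords() raises or when a stale response arrives after a seek.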