
KAFKA-4935; Deprecate client checksum API and compute lazy partial checksum for magic v2

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3123 from hachikuji/KAFKA-4935
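
In short: message format v2 (magic v2) replaces the per-record CRC with a batch-level CRC, so the deprecated client checksum accessors now fall back to a lazily computed "partial" checksum over the record timestamp, serialized key size and serialized value size. The sketch below is a minimal standalone illustration of that computation; the class name is made up, java.util.zip.CRC32C requires Java 9+, and the actual implementation (DefaultRecord.computePartialChecksum in the diff below) uses Kafka's internal Crc32C and Checksums helpers, which feed the same fields in big-endian order.

import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

public class PartialChecksumSketch {
    // Mirrors DefaultRecord.computePartialChecksum: CRC32C over timestamp, key size and value size.
    public static long computePartialChecksum(long timestamp, int serializedKeySize, int serializedValueSize) {
        ByteBuffer buf = ByteBuffer.allocate(16); // one 8-byte long plus two 4-byte ints, big-endian by default
        buf.putLong(timestamp);
        buf.putInt(serializedKeySize);
        buf.putInt(serializedValueSize);
        CRC32C crc = new CRC32C();
        crc.update(buf.array(), 0, buf.position());
        return crc.getValue();
    }

    public static void main(String[] args) {
        System.out.println(computePartialChecksum(242341324L, 3, 5));
    }
}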
Committed by: Ismael Juma (commit cea319a4ad)
  1. clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java (23 changed lines)
  2. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (2 changed lines)
  3. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (4 changed lines)
  4. clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java (7 changed lines)
  5. clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java (43 changed lines)
  6. clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java (10 changed lines)
  7. clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java (4 changed lines)
  8. clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java (6 changed lines)
  9. clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java (6 changed lines)
  10. clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java (5 changed lines)
  11. clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java (67 changed lines)
  12. clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java (3 changed lines)
  13. clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java (71 changed lines)
  14. clients/src/main/java/org/apache/kafka/common/record/Record.java (5 changed lines)
  15. clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java (12 changed lines)
  16. clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java (79 changed lines)
  17. clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java (6 changed lines)
  18. clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java (29 changed lines)
  19. clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java (2 changed lines)
  20. clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (8 changed lines)
  21. clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java (16 changed lines)
  22. clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java (3 changed lines)
  23. core/src/main/scala/kafka/tools/DumpLogSegments.scala (6 changed lines)
  24. docs/upgrade.html (2 changed lines)

23
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java

@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
@@ -36,13 +37,14 @@ public class ConsumerRecord<K, V> {
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;
/**
* Creates a record to be received from a specified topic and partition (provided for
* compatibility with Kafka 0.9 before the message format supported timestamps and before
@@ -63,7 +65,6 @@ public class ConsumerRecord<K, V> {
NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
}
/**
* Creates a record to be received from a specified topic and partition (provided for
* compatibility with Kafka 0.10 before the message format supported headers).
@@ -89,7 +90,8 @@ public class ConsumerRecord<K, V> {
int serializedValueSize,
K key,
V value) {
this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key, value, new RecordHeaders());
this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize,
key, value, new RecordHeaders());
}
/**
@@ -112,7 +114,7 @@ public class ConsumerRecord<K, V> {
long offset,
long timestamp,
TimestampType timestampType,
long checksum,
Long checksum,
int serializedKeySize,
int serializedValueSize,
K key,
@@ -191,8 +193,19 @@ public class ConsumerRecord<K, V> {
/**
* The checksum (CRC32) of the record.
*
* @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the
* checksum returned by the broker may not match what was computed by the producer.
* It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally,
* message format v2 does not include a record-level checksum (for performance, the record checksum
* was replaced with a batch checksum). To maintain compatibility, a partial checksum computed from
* the record timestamp, serialized key size, and serialized value size is returned instead, but
* this should not be depended on for end-to-end reliability.
*/
@Deprecated
public long checksum() {
if (checksum == null)
this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize);
return this.checksum;
}
@@ -215,7 +228,7 @@
@Override
public String toString() {
return "ConsumerRecord(topic = " + topic() + ", partition = " + partition() + ", offset = " + offset()
+ ", " + timestampType + " = " + timestamp + ", checksum = " + checksum
+ ", " + timestampType + " = " + timestamp
+ ", serialized key size = " + serializedKeySize
+ ", serialized value size = " + serializedValueSize
+ ", headers = " + headers

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@@ -917,7 +917,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
timestamp, timestampType, record.checksum(),
timestamp, timestampType, record.checksumOrNull(),
keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
key, value, headers);

4
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@@ -1014,8 +1014,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (this.interceptors != null) {
if (metadata == null) {
this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1, -1),
exception);
this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP,
Long.valueOf(-1L), -1, -1), exception);
} else {
this.interceptors.onAcknowledgement(metadata, exception);
}

7
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

@@ -251,11 +251,10 @@ public class MockProducer<K, V> implements Producer<K, V> {
partition = partition(record, this.cluster);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
ProduceRequestResult result = new ProduceRequestResult(topicPartition);
FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0, 0, 0);
FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
long offset = nextOffset(topicPartition);
Completion completion = new Completion(offset,
new RecordMetadata(topicPartition, 0, offset, RecordBatch.NO_TIMESTAMP, 0, 0, 0),
result, callback);
Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset,
RecordBatch.NO_TIMESTAMP, Long.valueOf(0L), 0, 0), result, callback);
if (!this.transactionInFlight)
this.sent.add(record);

43
clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java

@@ -17,7 +17,7 @@
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.DefaultRecord;
/**
* The metadata for a record that has been acknowledged by the server
@@ -36,15 +36,17 @@ public final class RecordMetadata {
// user provided one. Otherwise, it will be the producer local time when the producer record was handed to the
// producer.
private final long timestamp;
private final long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
private final TopicPartition topicPartition;
private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp, long
checksum, int serializedKeySize, int serializedValueSize) {
super();
this.offset = offset;
private volatile Long checksum;
public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
Long checksum, int serializedKeySize, int serializedValueSize) {
// ignore the relativeOffset if the base offset is -1,
// since this indicates the offset is unknown
this.offset = baseOffset == -1 ? baseOffset : baseOffset + relativeOffset;
this.timestamp = timestamp;
this.checksum = checksum;
this.serializedKeySize = serializedKeySize;
@@ -52,17 +54,14 @@ public final class RecordMetadata {
this.topicPartition = topicPartition;
}
/**
* @deprecated As of 0.11.0. Use {@link RecordMetadata#RecordMetadata(TopicPartition, long, long, long, Long, int, int)}.
*/
@Deprecated
public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) {
this(topicPartition, baseOffset, relativeOffset, RecordBatch.NO_TIMESTAMP, -1, -1, -1);
}
public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset,
long timestamp, long checksum, int serializedKeySize, int serializedValueSize) {
// ignore the relativeOffset if the base offset is -1,
// since this indicates the offset is unknown
this(topicPartition, baseOffset == -1 ? baseOffset : baseOffset + relativeOffset,
timestamp, checksum, serializedKeySize, serializedValueSize);
public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
long checksum, int serializedKeySize, int serializedValueSize) {
this(topicPartition, baseOffset, relativeOffset, timestamp, Long.valueOf(checksum), serializedKeySize,
serializedValueSize);
}
/**
@@ -81,8 +80,20 @@ public final class RecordMetadata {
/**
* The checksum (CRC32) of the record.
*
* @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the
* computed checksum may not match what was stored on the broker, or what will be returned to the consumer.
* It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally,
* message format v2 does not include a record-level checksum (for performance, the record checksum
* was replaced with a batch checksum). To maintain compatibility, a partial checksum computed from
* the record timestamp, serialized key size, and serialized value size is returned instead, but
* this should not be depended on for end-to-end reliability.
*/
@Deprecated
public long checksum() {
if (checksum == null)
// The checksum is null only for message format v2 and above, which do not have a record-level checksum.
this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize);
return this.checksum;
}
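
The producer-side accessor RecordMetadata.checksum() is deprecated in the same way. A hedged sketch of a send callback that relies only on the non-deprecated metadata fields (broker address and topic name are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerMetadataExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    // Offset, partition and timestamp remain reliable; metadata.checksum() is deprecated
                    // and, under magic v2, is only a partial checksum computed on the client.
                    System.out.printf("partition=%d offset=%d timestamp=%d%n",
                            metadata.partition(), metadata.offset(), metadata.timestamp());
                }
            });
        }
    }
}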

10
clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java

@@ -31,13 +31,13 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
private final ProduceRequestResult result;
private final long relativeOffset;
private final long createTimestamp;
private final long checksum;
private final Long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
private volatile FutureRecordMetadata nextRecordMetadata = null;
public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp,
long checksum, int serializedKeySize, int serializedValueSize) {
Long checksum, int serializedKeySize, int serializedValueSize) {
this.result = result;
this.relativeOffset = relativeOffset;
this.createTimestamp = createTimestamp;
@@ -96,14 +96,10 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
return value();
}
long checksum() {
Long checksumOrNull() {
return this.checksum;
}
long relativeOffset() {
return this.relativeOffset;
}
RecordMetadata value() {
if (nextRecordMetadata != null)
return nextRecordMetadata.value();

4
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java

@@ -103,7 +103,7 @@ public final class ProducerBatch {
if (!recordsBuilder.hasRoomFor(timestamp, key, value)) {
return null;
} else {
long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
@@ -131,7 +131,7 @@
this.maxRecordSize = Math.max(this.maxRecordSize,
AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers));
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, thunk.future.checksum(),
timestamp, thunk.future.checksumOrNull(),
key == null ? -1 : key.remaining(),
value == null ? -1 : value.remaining());
// Chain the future to the original thunk.

6
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java

@@ -111,10 +111,10 @@ public class ProducerInterceptors<K, V> implements Closeable {
} else {
if (interceptTopicPartition == null) {
interceptTopicPartition = new TopicPartition(record.topic(),
record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
}
interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1, -1),
exception);
interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception);
}
} catch (Exception e) {
// do not propagate interceptor exceptions, just log

6
clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java

@@ -54,10 +54,8 @@ public class RecordHeaders implements Headers {
this.headers = new ArrayList<>((Collection<Header>) headers);
} else {
this.headers = new ArrayList<>();
Iterator<Header> iterator = headers.iterator();
while (iterator.hasNext()) {
this.headers.add(iterator.next());
}
for (Header header : headers)
this.headers.add(header);
}
}

5
clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java

@@ -109,6 +109,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
return outerRecord().timestampType() == timestampType;
}
@Override
public Long checksumOrNull() {
return checksum();
}
@Override
public long checksum() {
return outerRecord().checksum();

67
clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java

@@ -77,7 +77,6 @@ public class DefaultRecord implements Record {
private final ByteBuffer key;
private final ByteBuffer value;
private final Header[] headers;
private Long checksum = null;
private DefaultRecord(int sizeInBytes,
byte attributes,
@@ -122,10 +121,8 @@
}
@Override
public long checksum() {
if (checksum == null)
checksum = computeChecksum(timestamp, key, value);
return checksum;
public Long checksumOrNull() {
return null;
}
@Override
@@ -174,14 +171,14 @@
}
/**
* Write the record to `out` and return its crc.
* Write the record to `out` and return its size.
*/
public static long writeTo(DataOutputStream out,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) throws IOException {
public static int writeTo(DataOutputStream out,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) throws IOException {
int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
ByteUtils.writeVarint(sizeInBytes, out);
@@ -230,18 +227,18 @@
}
}
return computeChecksum(timestampDelta, key, value);
return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
}
/**
* Write the record to `out` and return its crc.
* Write the record to `out` and return its size.
*/
public static long writeTo(ByteBuffer out,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) {
public static int writeTo(ByteBuffer out,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) {
try {
return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta, timestampDelta,
key, value, headers);
@@ -251,24 +248,6 @@
}
}
/**
* Compute the checksum of the record from the timestamp, key and value payloads
*/
private static long computeChecksum(long timestamp,
ByteBuffer key,
ByteBuffer value) {
Checksum crc = Crc32C.create();
Checksums.updateLong(crc, timestamp);
if (key != null)
Checksums.update(crc, key, key.remaining());
if (value != null)
Checksums.update(crc, value, value.remaining());
return crc.getValue();
}
@Override
public boolean hasMagic(byte magic) {
return magic >= MAGIC_VALUE_V2;
@@ -493,14 +472,18 @@
return size;
}
static int recordSizeUpperBound(byte[] key, byte[] value, Header[] headers) {
return recordSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
}
static int recordSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
int keySize = key == null ? -1 : key.remaining();
int valueSize = value == null ? -1 : value.remaining();
return MAX_RECORD_OVERHEAD + sizeOf(keySize, valueSize, headers);
}
public static long computePartialChecksum(long timestamp, int serializedKeySize, int serializedValueSize) {
Checksum checksum = Crc32C.create();
Checksums.updateLong(checksum, timestamp);
Checksums.updateInt(checksum, serializedKeySize);
Checksums.updateInt(checksum, serializedValueSize);
return checksum.getValue();
}
}
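
A second change visible in this file is that DefaultRecord.writeTo now returns the number of bytes written (including the varint length prefix) instead of a record CRC, which no longer exists under magic v2. The sketch below exercises the internal class directly, purely to illustrate the changed return value; application code would not normally call it.

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.Record;

public class WriteToSizeExample {
    public static void main(String[] args) {
        ByteBuffer out = ByteBuffer.allocate(128);
        ByteBuffer key = ByteBuffer.wrap("key".getBytes());
        ByteBuffer value = ByteBuffer.wrap("value".getBytes());
        // Returns the varint length prefix plus the record body size written into `out`.
        int written = DefaultRecord.writeTo(out, 0, 0L, key, value, Record.EMPTY_HEADERS);
        System.out.println("wrote " + written + " bytes, buffer position = " + out.position());
    }
}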

3
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java

@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Crc32C;
import org.apache.kafka.common.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
@@ -30,7 +31,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.kafka.common.utils.Utils;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
@@ -493,4 +493,5 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
}
}
}

71
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java

@@ -344,7 +344,10 @@ public class MemoryRecordsBuilder {
return writtenCompressed;
}
private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
/**
* Append a record and return its checksum for message format v0 and v1, or null for v2 and above.
*/
private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
ByteBuffer value, Header[] headers) {
try {
if (isControlRecord != isControlBatch)
@@ -363,10 +366,12 @@
if (baseTimestamp == null)
baseTimestamp = timestamp;
if (magic > RecordBatch.MAGIC_VALUE_V1)
return appendDefaultRecord(offset, timestamp, key, value, headers);
else
if (magic > RecordBatch.MAGIC_VALUE_V1) {
appendDefaultRecord(offset, timestamp, key, value, headers);
return null;
} else {
return appendLegacyRecord(offset, timestamp, key, value);
}
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
}
@@ -379,9 +384,9 @@
* @param key The record key
* @param value The record value
* @param headers The record headers if there are any
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) {
public Long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) {
return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), headers);
}
@@ -392,9 +397,9 @@
* @param key The record key
* @param value The record value
* @param headers The record headers if there are any
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
public Long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
return appendWithOffset(offset, false, timestamp, key, value, headers);
}
@@ -404,9 +409,9 @@
* @param timestamp The record timestamp
* @param key The record key
* @param value The record value
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
public Long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) {
return appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
}
@@ -416,9 +421,9 @@
* @param timestamp The record timestamp
* @param key The record key
* @param value The record value
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
public Long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
return appendWithOffset(offset, timestamp, key, value, Record.EMPTY_HEADERS);
}
@@ -426,21 +431,20 @@
* Append a new record at the given offset.
* @param offset The absolute offset of the record in the log buffer
* @param record The record to append
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
public long appendWithOffset(long offset, SimpleRecord record) {
public Long appendWithOffset(long offset, SimpleRecord record) {
return appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
}
/**
* Append a new record at the next sequential offset.
* @param timestamp The record timestamp
* @param key The record key
* @param value The record value
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
public long append(long timestamp, ByteBuffer key, ByteBuffer value) {
public Long append(long timestamp, ByteBuffer key, ByteBuffer value) {
return append(timestamp, key, value, Record.EMPTY_HEADERS);
}
@@ -450,9 +454,9 @@
* @param key The record key
* @param value The record value
* @param headers The record headers if there are any
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
public long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
}
@@ -461,9 +465,9 @@
* @param timestamp The record timestamp
* @param key The record key
* @param value The record value
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
public long append(long timestamp, byte[] key, byte[] value) {
public Long append(long timestamp, byte[] key, byte[] value) {
return append(timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
}
@@ -473,18 +477,18 @@
* @param key The record key
* @param value The record value
* @param headers The record headers if there are any
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
public long append(long timestamp, byte[] key, byte[] value, Header[] headers) {
public Long append(long timestamp, byte[] key, byte[] value, Header[] headers) {
return append(timestamp, wrapNullable(key), wrapNullable(value), headers);
}
/**
* Append a new record at the next sequential offset.
* @param record The record to append
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
public long append(SimpleRecord record) {
public Long append(SimpleRecord record) {
return appendWithOffset(nextSequentialOffset(), record);
}
@@ -493,9 +497,9 @@
* @param timestamp The record timestamp
* @param type The control record type (cannot be UNKNOWN)
* @param value The control record value
* @return crc of the record
* @return CRC of the record or null if record-level CRC is not supported for the message format
*/
private long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
private Long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
Struct keyStruct = type.recordKey();
ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
keyStruct.writeTo(key);
@@ -503,7 +507,10 @@
return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
}
public long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
/**
* Return CRC of the record or null if record-level CRC is not supported for the message format
*/
public Long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
if (producerId == RecordBatch.NO_PRODUCER_ID)
throw new IllegalArgumentException("End transaction marker requires a valid producerId");
if (!isTransactional)
@@ -568,15 +575,13 @@
appendWithOffset(nextSequentialOffset(), record);
}
private long appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - baseTimestamp;
long crc = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
// TODO: The crc is useless for the new message format. Maybe we should let writeTo return the written size?
recordWritten(offset, timestamp, DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, key, value, headers));
return crc;
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}
private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
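
With these changes the MemoryRecordsBuilder append methods return Long so that "no record-level checksum" can be signalled with null under magic v2, while magic v0 and v1 still get the legacy record CRC. A hedged sketch of that contract against the internal API (the builder overload and constants are the ones used by the tests further down in this diff):

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class AppendChecksumContract {
    public static void main(String[] args) {
        for (byte magic : new byte[]{RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2}) {
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(256), magic,
                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            // Long (possibly null) rather than long: null means the format has no per-record CRC.
            Long checksum = builder.append(System.currentTimeMillis(), "key".getBytes(), "value".getBytes());
            System.out.println("magic=" + magic + " checksum=" + checksum);
            builder.close();
        }
    }
}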

5
clients/src/main/java/org/apache/kafka/common/record/Record.java

@@ -54,9 +54,10 @@ public interface Record {
/**
* Get a checksum of the record contents.
* @return a 4-byte unsigned checksum represented as a long
* @return A 4-byte unsigned checksum represented as a long or null if the message format does not
* include a checksum (i.e. for v2 and above)
*/
long checksum();
Long checksumOrNull();
/**
* Check whether the record has a valid checksum.

12
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
@@ -25,6 +26,7 @@ import static org.junit.Assert.assertEquals;
public class ConsumerRecordTest {
@Test
@SuppressWarnings("deprecation")
public void testOldConstructor() {
String topic = "topic";
int partition = 0;
@@ -46,5 +48,15 @@ public class ConsumerRecordTest {
assertEquals(new RecordHeaders(), record.headers());
}
@Test
@SuppressWarnings("deprecation")
public void testNullChecksumInConstructor() {
String key = "key";
String value = "value";
long timestamp = 242341324L;
ConsumerRecord<String, String> record = new ConsumerRecord<>("topic", 0, 23L, timestamp,
TimestampType.CREATE_TIME, null, key.length(), value.length(), key, value, new RecordHeaders());
assertEquals(DefaultRecord.computePartialChecksum(timestamp, key.length(), value.length()), record.checksum());
}
}

79
clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java

@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.DefaultRecord;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class RecordMetadataTest {
@Test
@SuppressWarnings("deprecation")
public void testConstructionWithMissingRelativeOffset() {
TopicPartition tp = new TopicPartition("foo", 0);
long timestamp = 2340234L;
int keySize = 3;
int valueSize = 5;
Long checksum = 908923L;
RecordMetadata metadata = new RecordMetadata(tp, -1L, -1L, timestamp, checksum, keySize, valueSize);
assertEquals(tp.topic(), metadata.topic());
assertEquals(tp.partition(), metadata.partition());
assertEquals(timestamp, metadata.timestamp());
assertEquals(-1L, metadata.offset());
assertEquals(checksum.longValue(), metadata.checksum());
assertEquals(keySize, metadata.serializedKeySize());
assertEquals(valueSize, metadata.serializedValueSize());
}
@Test
@SuppressWarnings("deprecation")
public void testConstructionWithRelativeOffset() {
TopicPartition tp = new TopicPartition("foo", 0);
long timestamp = 2340234L;
int keySize = 3;
int valueSize = 5;
long baseOffset = 15L;
long relativeOffset = 3L;
Long checksum = 908923L;
RecordMetadata metadata = new RecordMetadata(tp, baseOffset, relativeOffset, timestamp, checksum,
keySize, valueSize);
assertEquals(tp.topic(), metadata.topic());
assertEquals(tp.partition(), metadata.partition());
assertEquals(timestamp, metadata.timestamp());
assertEquals(baseOffset + relativeOffset, metadata.offset());
assertEquals(checksum.longValue(), metadata.checksum());
assertEquals(keySize, metadata.serializedKeySize());
assertEquals(valueSize, metadata.serializedValueSize());
}
@Test
@SuppressWarnings("deprecation")
public void testNullChecksum() {
long timestamp = 2340234L;
int keySize = 3;
int valueSize = 5;
RecordMetadata metadata = new RecordMetadata(new TopicPartition("foo", 0), 15L, 3L, timestamp, null,
keySize, valueSize);
assertEquals(DefaultRecord.computePartialChecksum(timestamp, keySize, valueSize), metadata.checksum());
}
}

6
clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java

@@ -46,7 +46,7 @@ public class RecordSendTest {
public void testTimeout() throws Exception {
ProduceRequestResult request = new ProduceRequestResult(topicPartition);
FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset,
RecordBatch.NO_TIMESTAMP, 0, 0, 0);
RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
assertFalse("Request is not completed", future.isDone());
try {
future.get(5, TimeUnit.MILLISECONDS);
@@ -66,7 +66,7 @@
@Test(expected = ExecutionException.class)
public void testError() throws Exception {
FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L),
relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, 0);
relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
future.get();
}
@@ -76,7 +76,7 @@
@Test
public void testBlocking() throws Exception {
FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L),
relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, 0);
relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0);
assertEquals(baseOffset + relOffset, future.get().offset());
}

29
clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java

@@ -18,17 +18,21 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ProducerBatchTest {
@@ -38,6 +42,31 @@ public class ProducerBatchTest {
private final MemoryRecordsBuilder memoryRecordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(128),
CompressionType.NONE, TimestampType.CREATE_TIME, 128);
@Test
public void testChecksumNullForMagicV2() {
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
assertNotNull(future);
assertNull(future.checksumOrNull());
}
@Test
public void testAppendedChecksumMagicV0AndV1() {
for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) {
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic,
CompressionType.NONE, TimestampType.CREATE_TIME, 128);
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
byte[] key = "hi".getBytes();
byte[] value = "there".getBytes();
FutureRecordMetadata future = batch.tryAppend(now, key, value, Record.EMPTY_HEADERS, null, now);
assertNotNull(future);
byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
long expectedChecksum = LegacyRecord.computeChecksum(magic, attributes, now, key, value);
assertEquals(expectedChecksum, future.checksumOrNull().longValue());
}
}
/**
* A {@link ProducerBatch} configured using a very large linger value and a timestamp preceding its create
* time is interpreted correctly as not expired when the linger time is larger than the difference

2
clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java

@@ -144,7 +144,7 @@ public class ProducerInterceptorsTest {
ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
// verify onAck is called on all interceptors
RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0, 0);
RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, Long.valueOf(0L), 0, 0);
interceptors.onAcknowledgement(meta, null);
assertEquals(2, onAckCount);

8
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java

@@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients.producer.internals;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
@@ -33,8 +30,8 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.ApiVersionsResponse;
@@ -52,7 +49,10 @@ import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

16
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java

@@ -322,6 +322,22 @@ public class MemoryRecordsBuilderTest {
}
}
@Test
public void testAppendedChecksumConsistency() {
ByteBuffer buffer = ByteBuffer.allocate(512);
for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
Long checksumOrNull = builder.append(1L, "key".getBytes(), "value".getBytes());
MemoryRecords memoryRecords = builder.build();
List<Record> records = TestUtils.toList(memoryRecords.records());
assertEquals(1, records.size());
assertEquals(checksumOrNull, records.get(0).checksumOrNull());
}
}
@Test
public void testSmallWriteLimit() {
// with a small write limit, we always allow at least one record to be added

3
clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java

@@ -56,6 +56,7 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
}
@Override
@SuppressWarnings("deprecation")
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
// This will ensure that we get the cluster metadata when onConsume is called for the first time
@@ -99,4 +100,4 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
public void onUpdate(ClusterResource clusterResource) {
CLUSTER_META.set(clusterResource);
}
}

6
core/src/main/scala/kafka/tools/DumpLogSegments.scala

@@ -353,11 +353,13 @@ object DumpLogSegments {
print("offset: " + record.offset + " position: " + validBytes +
" " + batch.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid +
" keysize: " + record.keySize + " valuesize: " + record.valueSize + " magic: " + batch.magic +
" compresscodec: " + batch.compressionType + " crc: " + record.checksum)
" compresscodec: " + batch.compressionType)
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
print(" sequence: " + record.sequence +
print(" crc: " + batch.checksum + " sequence: " + record.sequence +
" headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
} else {
print(" crc: " + record.checksumOrNull)
}
if (batch.isControlBatch) {

2
docs/upgrade.html

@@ -68,7 +68,7 @@
individual messages is only reduced by the overhead of the batch format. This similarly affects the
producer's <code>batch.size</code> configuration.</li>
<li>GC log rotation is enabled by default, see KAFKA-3754 for details.</li>
<li>Deprecated constructors of MetricName and Cluster classes have been removed.</li>
<li>Deprecated constructors of RecordMetadata, MetricName and Cluster classes have been removed.</li>
<li>Added user headers support through a new Headers interface providing user headers read and write access.</li>
<li>ProducerRecord and ConsumerRecord expose the new Headers API via <code>Headers headers()</code> method call.</li>
<li>ExtendedSerializer and ExtendedDeserializer interfaces are introduced to support serialization and deserialization for headers. Headers will be ignored if the configured serializer and deserializer are not the above classes.</li>
