Browse Source

KAFKA-5700; Producer should not drop header information when splitting batches

Producer should not drop header information when splitting batches.  This PR also corrects a minor typo in Sender.java, where `spitting and retrying` should be `splitting and retrying`.

Author: huxihx <huxi_2b@hotmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jiangjie Qin <becket.qin@gmail.com>

Closes #3620 from huxihx/KAFKA-5700
pull/3632/merge
huxihx 7 years ago committed by Jiangjie Qin
parent
commit
1cd86284e8
  1. 2
      clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
  2. 2
      clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  3. 40
      clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java

2
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java

@ -128,7 +128,7 @@ public final class ProducerBatch { @@ -128,7 +128,7 @@ public final class ProducerBatch {
return false;
} else {
// No need to get the CRC.
this.recordsBuilder.append(timestamp, key, value);
this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,

2
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

@ -498,7 +498,7 @@ public class Sender implements Runnable { @@ -498,7 +498,7 @@ public class Sender implements Runnable {
(batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
// If the batch is too large, we split the batch and send the split batches again. We do not decrement
// the retry attempts in this case.
log.warn("Got error produce response in correlation id {} on topic-partition {}, spitting and retrying ({} attempts left). Error: {}",
log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
correlationId,
batch.topicPartition,
this.retries - batch.attempts(),

40
clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java

@ -19,6 +19,8 @@ package org.apache.kafka.clients.producer.internals; @@ -19,6 +19,8 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
@ -31,6 +33,7 @@ import org.junit.Test; @@ -31,6 +33,7 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
@ -138,6 +141,43 @@ public class ProducerBatchTest { @@ -138,6 +141,43 @@ public class ProducerBatchTest {
}
}
@Test
public void testSplitPreservesHeaders() {
for (CompressionType compressionType : CompressionType.values()) {
MemoryRecordsBuilder builder = MemoryRecords.builder(
ByteBuffer.allocate(1024),
MAGIC_VALUE_V2,
compressionType,
TimestampType.CREATE_TIME,
0L);
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now);
Header header = new RecordHeader("header-key", "header-value".getBytes());
while (true) {
FutureRecordMetadata future = batch.tryAppend(
now, "hi".getBytes(), "there".getBytes(),
new Header[]{header}, null, now);
if (future == null) {
break;
}
}
Deque<ProducerBatch> batches = batch.split(200);
assertTrue("This batch should be split to multiple small batches.", batches.size() >= 2);
for (ProducerBatch splitProducerBatch : batches) {
for (RecordBatch splitBatch : splitProducerBatch.records().batches()) {
Iterator<Record> iter = splitBatch.iterator();
while (iter.hasNext()) {
Record record = iter.next();
assertTrue("Header size should be 1.", record.headers().length == 1);
assertTrue("Header key should be 'header-key'.", record.headers()[0].key().equals("header-key"));
assertTrue("Header value should be 'header-value'.", new String(record.headers()[0].value()).equals("header-value"));
}
}
}
}
}
@Test
public void testSplitPreservesMagicAndCompressionType() {
for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1, MAGIC_VALUE_V2)) {

Loading…
Cancel
Save