Browse Source

KAFKA-8142: Fix NPE for nulls in Headers (#6484)

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/6493/head
Matthias J. Sax 6 years ago committed by GitHub
parent
commit
1f692bdf53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
  2. 21
      clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
  3. 5
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
  4. 98
      streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java

12
clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java

@ -16,16 +16,17 @@ @@ -16,16 +16,17 @@
*/
package org.apache.kafka.common.header.internals;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.AbstractIterator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.AbstractIterator;
import java.util.Objects;
public class RecordHeaders implements Headers {
@ -61,6 +62,7 @@ public class RecordHeaders implements Headers { @@ -61,6 +62,7 @@ public class RecordHeaders implements Headers {
@Override
public Headers add(Header header) throws IllegalStateException {
Objects.requireNonNull(header, "Header cannot be null.");
canWrite();
headers.add(header);
return this;

21
clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java

@ -16,19 +16,19 @@ @@ -16,19 +16,19 @@
*/
package org.apache.kafka.common.header.internals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class RecordHeadersTest {
@ -206,6 +206,11 @@ public class RecordHeadersTest { @@ -206,6 +206,11 @@ public class RecordHeadersTest {
assertEquals(2, getCount(newHeaders));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNpeWhenAddingNullHeader() {
new RecordHeaders().add(null);
}
private int getCount(Headers headers) {
int count = 0;
Iterator<Header> headerIterator = headers.iterator();

5
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java

@ -90,7 +90,10 @@ public class ProcessorRecordContext implements RecordContext { @@ -90,7 +90,10 @@ public class ProcessorRecordContext implements RecordContext {
if (headers != null) {
for (final Header header : headers) {
size += header.key().toCharArray().length;
size += header.value().length;
final byte[] value = header.value();
if (value != null) {
size += value.length;
}
}
}
return size;

98
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java

@ -0,0 +1,98 @@ @@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ProcessorRecordContextTest {
// timestamp + offset + partition: 8 + 8 + 4
private final static long MIN_SIZE = 20L;
@Test
public void shouldEstimateNullTopicAndNullHeadersAsZeroLength() {
final Headers headers = new RecordHeaders();
final ProcessorRecordContext context = new ProcessorRecordContext(
42L,
73L,
0,
null,
null
);
assertEquals(MIN_SIZE, context.sizeBytes());
}
@Test
public void shouldEstimateEmptyHeaderAsZeroLength() {
final ProcessorRecordContext context = new ProcessorRecordContext(
42L,
73L,
0,
null,
new RecordHeaders()
);
assertEquals(MIN_SIZE, context.sizeBytes());
}
@Test
public void shouldEstimateTopicLength() {
final ProcessorRecordContext context = new ProcessorRecordContext(
42L,
73L,
0,
"topic",
null
);
assertEquals(MIN_SIZE + 5L, context.sizeBytes());
}
@Test
public void shouldEstimateHeadersLength() {
final Headers headers = new RecordHeaders();
headers.add("header-key", "header-value".getBytes());
final ProcessorRecordContext context = new ProcessorRecordContext(
42L,
73L,
0,
null,
headers
);
assertEquals(MIN_SIZE + 10L + 12L, context.sizeBytes());
}
@Test
public void shouldEstimateNullValueInHeaderAsZero() {
final Headers headers = new RecordHeaders();
headers.add("header-key", null);
final ProcessorRecordContext context = new ProcessorRecordContext(
42L,
73L,
0,
null,
headers
);
assertEquals(MIN_SIZE + 10L, context.sizeBytes());
}
}
Loading…
Cancel
Save