From 1f692bdf53af4a80b7fd256de4e94ff1d17fc861 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 21 Mar 2019 23:53:56 -0700 Subject: [PATCH] KAFKA-8142: Fix NPE for nulls in Headers (#6484) Reviewers: Bill Bejeck , Guozhang Wang --- .../header/internals/RecordHeaders.java | 12 ++- .../header/internals/RecordHeadersTest.java | 21 ++-- .../internals/ProcessorRecordContext.java | 5 +- .../internals/ProcessorRecordContextTest.java | 98 +++++++++++++++++++ 4 files changed, 122 insertions(+), 14 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java index 577e758348d..5801bed99cd 100644 --- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java +++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java @@ -16,16 +16,17 @@ */ package org.apache.kafka.common.header.internals; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.utils.AbstractIterator; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; - -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.record.Record; -import org.apache.kafka.common.utils.AbstractIterator; +import java.util.Objects; public class RecordHeaders implements Headers { @@ -61,6 +62,7 @@ public class RecordHeaders implements Headers { @Override public Headers add(Header header) throws IllegalStateException { + Objects.requireNonNull(header, "Header cannot be null."); canWrite(); headers.add(header); return this; diff --git a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java index 39c1c9c9a7b..5b9f95ea91f 100644 --- a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java +++ b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java @@ -16,19 +16,19 @@ */ package org.apache.kafka.common.header.internals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.junit.Test; import java.io.IOException; import java.util.Arrays; import java.util.Iterator; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class RecordHeadersTest { @@ -206,6 +206,11 @@ public class RecordHeadersTest { assertEquals(2, getCount(newHeaders)); } + @Test(expected = NullPointerException.class) + public void shouldThrowNpeWhenAddingNullHeader() { + new RecordHeaders().add(null); + } + private int getCount(Headers headers) { int count = 0; Iterator
headerIterator = headers.iterator(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 7e1466eb32d..00012746bd0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -90,7 +90,10 @@ public class ProcessorRecordContext implements RecordContext { if (headers != null) { for (final Header header : headers) { size += header.key().toCharArray().length; - size += header.value().length; + final byte[] value = header.value(); + if (value != null) { + size += value.length; + } } } return size; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java new file mode 100644 index 00000000000..1ea646fce2f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ProcessorRecordContextTest { + // timestamp + offset + partition: 8 + 8 + 4 + private final static long MIN_SIZE = 20L; + + @Test + public void shouldEstimateNullTopicAndNullHeadersAsZeroLength() { + final Headers headers = new RecordHeaders(); + final ProcessorRecordContext context = new ProcessorRecordContext( + 42L, + 73L, + 0, + null, + null + ); + + assertEquals(MIN_SIZE, context.sizeBytes()); + } + + @Test + public void shouldEstimateEmptyHeaderAsZeroLength() { + final ProcessorRecordContext context = new ProcessorRecordContext( + 42L, + 73L, + 0, + null, + new RecordHeaders() + ); + + assertEquals(MIN_SIZE, context.sizeBytes()); + } + + @Test + public void shouldEstimateTopicLength() { + final ProcessorRecordContext context = new ProcessorRecordContext( + 42L, + 73L, + 0, + "topic", + null + ); + + assertEquals(MIN_SIZE + 5L, context.sizeBytes()); + } + + @Test + public void shouldEstimateHeadersLength() { + final Headers headers = new RecordHeaders(); + headers.add("header-key", "header-value".getBytes()); + final ProcessorRecordContext context = new ProcessorRecordContext( + 42L, + 73L, + 0, + null, + headers + ); + + assertEquals(MIN_SIZE + 10L + 12L, context.sizeBytes()); + } + + @Test + public void shouldEstimateNullValueInHeaderAsZero() { + final Headers headers = new RecordHeaders(); + headers.add("header-key", null); + final ProcessorRecordContext context = new ProcessorRecordContext( + 42L, + 73L, + 0, + null, + headers + ); + + assertEquals(MIN_SIZE + 10L, context.sizeBytes()); + } +}