diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java index a3b3700a1e0..9783ea0c531 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java @@ -37,7 +37,10 @@ public class StringDeserializer implements Deserializer { @Override public String deserialize(String topic, byte[] data) { try { - return new String(data, encoding); + if (data == null) + return null; + else + return new String(data, encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java index 02db47f8736..636d905ffc7 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java @@ -37,7 +37,10 @@ public class StringSerializer implements Serializer { @Override public byte[] serialize(String topic, String data) { try { - return data.getBytes(encoding); + if (data == null) + return null; + else + return data.getBytes(encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); } diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index d550a3137c0..b6e14975066 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -48,8 +48,10 @@ public class SerializationTest { assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding, str, deserializer.deserialize(mytopic, serializer.serialize(mytopic, str))); - } + assertEquals("Should support null in serialization and deserialization with encoding " + encoding, + null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null))); + } } private SerDeser getStringSerDeser(String encoder) {