From 342c8172d7f4065e78fc29ca4c56fae0289d861d Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Thu, 4 Oct 2018 20:34:50 -0700 Subject: [PATCH] KAFKA-7476: Fix Date-based types in SchemaProjector Various converters (AvroConverter and JsonConverter) produce a SchemaAndValue consisting of a logical schema type and a java.util.Date. This is a fix for SchemaProjector to properly handle the Date. Author: Robert Yokota Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #5736 from rayokota/KAFKA-7476 (cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d) Signed-off-by: Ewen Cheslack-Postava --- .../apache/kafka/connect/data/SchemaProjector.java | 2 +- .../kafka/connect/data/SchemaProjectorTest.java | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java index ad0caf85f83..323d18d40a7 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -161,7 +161,7 @@ public class SchemaProjector { assert source.type().isPrimitive(); assert target.type().isPrimitive(); Object result; - if (isPromotable(source.type(), target.type())) { + if (isPromotable(source.type(), target.type()) && record instanceof Number) { Number numberRecord = (Number) record; switch (target.type()) { case INT8: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java index 0b1760be7a2..08b357593a8 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -351,6 +351,17 @@ public class SchemaProjectorTest { projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); assertEquals(34567L, projected); + java.util.Date date = new java.util.Date(); + + projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA); + assertEquals(date, projected); + + projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA); + assertEquals(date, projected); + + projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA); + assertEquals(date, projected); + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); for (Schema logicalTypeSchema: logicalTypeSchemas) { try {