Browse Source

KAFKA-7476: Fix Date-based types in SchemaProjector

Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.

Author: Robert Yokota <rayokota@gmail.com>

Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #5736 from rayokota/KAFKA-7476

(cherry picked from commit 3edd8e7333)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
pull/4886/head
Robert Yokota 6 years ago committed by Ewen Cheslack-Postava
parent
commit
342c8172d7
  1. 2
      connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
  2. 11
      connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java

2
connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java

@ -161,7 +161,7 @@ public class SchemaProjector { @@ -161,7 +161,7 @@ public class SchemaProjector {
assert source.type().isPrimitive();
assert target.type().isPrimitive();
Object result;
if (isPromotable(source.type(), target.type())) {
if (isPromotable(source.type(), target.type()) && record instanceof Number) {
Number numberRecord = (Number) record;
switch (target.type()) {
case INT8:

11
connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java

@ -351,6 +351,17 @@ public class SchemaProjectorTest { @@ -351,6 +351,17 @@ public class SchemaProjectorTest {
projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
assertEquals(34567L, projected);
java.util.Date date = new java.util.Date();
projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
assertEquals(date, projected);
projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
assertEquals(date, projected);
projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
assertEquals(date, projected);
Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
for (Schema logicalTypeSchema: logicalTypeSchemas) {
try {

Loading…
Cancel
Save