Browse Source

KAFKA-3189: Kafka server returns UnknownServerException for inherited exceptions

… exceptions

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #856 from granthenke/inherited-errors
pull/856/merge
Grant Henke 9 years ago committed by Ewen Cheslack-Postava
parent
commit
cd15321e0d
  1. 13
      clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
  2. 17
      clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java
  3. 32
      core/src/test/scala/unit/kafka/message/MessageTest.scala

13
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

@ -192,10 +192,17 @@ public enum Errors { @@ -192,10 +192,17 @@ public enum Errors {
}
/**
* Return the error instance associated with this exception (or UNKNOWN if there is none)
* Return the error instance associated with this exception or any of its superclasses (or UNKNOWN if there is none).
* If there are multiple matches in the class hierarchy, the first match starting from the bottom is used.
*/
public static Errors forException(Throwable t) {
Errors error = classToError.get(t.getClass());
return error == null ? UNKNOWN : error;
Class clazz = t.getClass();
while (clazz != null) {
Errors error = classToError.get(clazz);
if (error != null)
return error;
clazz = clazz.getSuperclass();
}
return UNKNOWN;
}
}

17
clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java

@ -24,6 +24,7 @@ import java.util.HashSet; @@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Test;
public class ErrorsTest {
@ -60,4 +61,20 @@ public class ErrorsTest { @@ -60,4 +61,20 @@ public class ErrorsTest {
assertNull("The NONE error should not have an exception", Errors.NONE.exception());
}
@Test
public void testForExceptionInheritance() {
class ExtendedTimeoutException extends TimeoutException { }
Errors expectedError = Errors.forException(new TimeoutException());
Errors actualError = Errors.forException(new ExtendedTimeoutException());
assertEquals("forException should match super classes", expectedError, actualError);
}
@Test
public void testForExceptionDefault() {
Errors error = Errors.forException(new ApiException());
assertEquals("forException should default to unknown", Errors.UNKNOWN, error);
}
}

32
core/src/test/scala/unit/kafka/message/MessageTest.scala

@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@ -19,6 +19,8 @@ package kafka.message @@ -19,6 +19,8 @@ package kafka.message
import java.nio._
import java.util.HashMap
import org.apache.kafka.common.protocol.Errors
import scala.collection._
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
@ -27,15 +29,15 @@ import kafka.utils.TestUtils @@ -27,15 +29,15 @@ import kafka.utils.TestUtils
import kafka.utils.CoreUtils
import org.apache.kafka.common.utils.Utils
case class MessageTestVal(val key: Array[Byte],
val payload: Array[Byte],
val codec: CompressionCodec,
case class MessageTestVal(val key: Array[Byte],
val payload: Array[Byte],
val codec: CompressionCodec,
val message: Message)
class MessageTest extends JUnitSuite {
var messages = new mutable.ArrayBuffer[MessageTestVal]()
@Before
def setUp(): Unit = {
val keys = Array(null, "key".getBytes, "".getBytes)
@ -44,7 +46,7 @@ class MessageTest extends JUnitSuite { @@ -44,7 +46,7 @@ class MessageTest extends JUnitSuite {
for(k <- keys; v <- vals; codec <- codecs)
messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
}
@Test
def testFieldValues {
for(v <- messages) {
@ -73,7 +75,7 @@ class MessageTest extends JUnitSuite { @@ -73,7 +75,7 @@ class MessageTest extends JUnitSuite {
assertFalse("Message with invalid checksum should be invalid", v.message.isValid)
}
}
@Test
def testEquality() {
for(v <- messages) {
@ -84,7 +86,7 @@ class MessageTest extends JUnitSuite { @@ -84,7 +86,7 @@ class MessageTest extends JUnitSuite {
assertTrue("Should equal another message with the same content.", v.message.equals(copy))
}
}
@Test
def testIsHashable() {
// this is silly, but why not
@ -94,6 +96,14 @@ class MessageTest extends JUnitSuite { @@ -94,6 +96,14 @@ class MessageTest extends JUnitSuite {
for(v <- messages)
assertEquals(v.message, m.get(v.message))
}
@Test
def testExceptionMapping() {
val expected = Errors.CORRUPT_MESSAGE
val actual = Errors.forException(new InvalidMessageException())
assertEquals("InvalidMessageException should map to a corrupt message error", expected, actual)
}
}

Loading…
Cancel
Save