Browse Source

MINOR: Log exception thrown by consumer.poll() in VerifiableConsumer (#6368)

SecurityTest.test_client_ssl_endpoint_validation_failure is failing because it greps for 'SSLHandshakeException in the consumer and producer log files. With the fix for KAKFA-7773, the test uses the VerifiableConsumer instead of the ConsoleConsumer, which does not log the exception stack trace to the service log. This patch catches exceptions in the VerifiableConsumer and logs them in order to fix the test. Tested by running the test locally.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
pull/7328/head
Bob Barrett 6 years ago committed by Jason Gustafson
parent
commit
6acd58a423
  1. 8
      tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java

8
tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java

@ -43,6 +43,8 @@ import org.apache.kafka.common.errors.WakeupException; @@ -43,6 +43,8 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
@ -82,6 +84,8 @@ import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; @@ -82,6 +84,8 @@ import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
*/
public class VerifiableConsumer implements Closeable, OffsetCommitCallback, ConsumerRebalanceListener {
private static final Logger log = LoggerFactory.getLogger(VerifiableConsumer.class);
private final ObjectMapper mapper = new ObjectMapper();
private final PrintStream out;
private final KafkaConsumer<String, String> consumer;
@ -233,6 +237,10 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons @@ -233,6 +237,10 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
}
} catch (WakeupException e) {
// ignore, we are closing
log.trace("Caught WakeupException because consumer is shutdown, ignore and terminate.", e);
} catch (Throwable t) {
// Log the error so it goes to the service log and not stdout
log.error("Error during processing, terminating consumer process: ", t);
} finally {
consumer.close();
printJson(new ShutdownComplete());

Loading…
Cancel
Save