Browse Source

MINOR: Enable capture of full stack trace in StreamTask#process (#6310)

Reviewers: Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
pull/6316/head
Bill Bejeck 6 years ago committed by Matthias J. Sax
parent
commit
12e8b8c2c7
  1. 21
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

21
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@ -47,6 +47,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -374,12 +376,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
} catch (final ProducerFencedException fatal) { } catch (final ProducerFencedException fatal) {
throw new TaskMigratedException(this, fatal); throw new TaskMigratedException(this, fatal);
} catch (final KafkaException e) { } catch (final KafkaException e) {
throw new StreamsException(format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d", final String stackTrace = getStacktraceString(e);
throw new StreamsException(format("Exception caught in process. taskId=%s, " +
"processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
id(), id(),
processorContext.currentNode().name(), processorContext.currentNode().name(),
record.topic(), record.topic(),
record.partition(), record.partition(),
record.offset() record.offset(),
stackTrace
), e); ), e);
} finally { } finally {
processorContext.setCurrentNode(null); processorContext.setCurrentNode(null);
@ -388,6 +393,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
return true; return true;
} }
private String getStacktraceString(final KafkaException e) {
String stacktrace = null;
try (final StringWriter stringWriter = new StringWriter();
final PrintWriter printWriter = new PrintWriter(stringWriter)) {
e.printStackTrace(printWriter);
stacktrace = stringWriter.toString();
} catch (final IOException ioe) {
log.error("Encountered error extracting stacktrace from this exception", ioe);
}
return stacktrace;
}
/** /**
* @throws IllegalStateException if the current node is not null * @throws IllegalStateException if the current node is not null
* @throws TaskMigratedException if the task producer got fenced (EOS only) * @throws TaskMigratedException if the task producer got fenced (EOS only)

Loading…
Cancel
Save