diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6dbf394ae4b..ca3375689aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -47,6 +47,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -374,12 +376,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } catch (final ProducerFencedException fatal) { throw new TaskMigratedException(this, fatal); } catch (final KafkaException e) { - throw new StreamsException(format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d", + final String stackTrace = getStacktraceString(e); + throw new StreamsException(format("Exception caught in process. taskId=%s, " + + "processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", id(), processorContext.currentNode().name(), record.topic(), record.partition(), - record.offset() + record.offset(), + stackTrace ), e); } finally { processorContext.setCurrentNode(null); @@ -388,6 +393,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator return true; } + private String getStacktraceString(final KafkaException e) { + String stacktrace = null; + try (final StringWriter stringWriter = new StringWriter(); + final PrintWriter printWriter = new PrintWriter(stringWriter)) { + e.printStackTrace(printWriter); + stacktrace = stringWriter.toString(); + } catch (final IOException ioe) { + log.error("Encountered error extracting stacktrace from this exception", ioe); + } + return stacktrace; + } + /** * @throws IllegalStateException if the current node is not null * @throws TaskMigratedException if the task producer got fenced (EOS only)