From b79b179716b5f8bacb870a53a5a9216a0687b3c9 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 29 Sep 2017 10:21:57 +0100 Subject: [PATCH] KAFKA-5949; Follow-up after latest KIP-161 changes - compare KAFKA-5958 Author: Matthias J. Sax Reviewers: Damian Guy Closes #3986 from mjsax/kafka-5949-exceptions-user-callbacks-KIP-161-follow-up --- .../apache/kafka/streams/KafkaStreams.java | 30 +++++++++++++++++-- .../internals/CompositeRestoreListener.java | 30 ++----------------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 2f5ce4bd379..928d0e9e8e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -562,21 +562,45 @@ public class KafkaStreams { @Override public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) { if (globalStateRestoreListener != null) { - globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); + try { + globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); + } catch (final Exception fatalUserException) { + throw new StreamsException( + String.format("Fatal user code error in store restore listener for store %s, partition %s.", + storeName, + topicPartition), + fatalUserException); + } } } @Override public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) { if (globalStateRestoreListener != null) { - globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); + try { + globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); + } catch (final Exception fatalUserException) { + throw new StreamsException( + String.format("Fatal user code error in store restore listener for store %s, partition %s.", + storeName, + topicPartition), + fatalUserException); + } } } @Override public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) { if (globalStateRestoreListener != null) { - globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); + try { + globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); + } catch (final Exception fatalUserException) { + throw new StreamsException( + String.format("Fatal user code error in store restore listener for store %s, partition %s.", + storeName, + topicPartition), + fatalUserException); + } } } }; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java index a1c2f7f23f4..01ba4572bf6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java @@ -55,15 +55,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S final String storeName, final long startingOffset, final long endingOffset) { - try { - userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); - } catch (final Exception fatalUserException) { - throw new StreamsException( - String.format("Fatal user code error in store restore listener for store %s, partition %s.", - storeName, - topicPartition), - fatalUserException); - } + userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); } @@ -76,15 +68,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S final String storeName, final long batchEndOffset, final long numRestored) { - try { - userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); - } catch (final Exception fatalUserException) { - throw new StreamsException( - String.format("Fatal user code error in store restore listener for store %s, partition %s.", - storeName, - topicPartition), - fatalUserException); - } + userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); } @@ -96,15 +80,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) { - try { - userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); - } catch (final Exception fatalUserException) { - throw new StreamsException( - String.format("Fatal user code error in store restore listener for store %s, partition %s.", - storeName, - topicPartition), - fatalUserException); - } + userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); }