Browse Source

KAFKA-5949; Follow-up after latest KIP-161 changes

- compare KAFKA-5958

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3986 from mjsax/kafka-5949-exceptions-user-callbacks-KIP-161-follow-up
pull/3943/merge
Matthias J. Sax 7 years ago committed by Damian Guy
parent
commit
b79b179716
  1. 30
      streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
  2. 30
      streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java

30
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@ -562,21 +562,45 @@ public class KafkaStreams {
@Override @Override
public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) { public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
if (globalStateRestoreListener != null) { if (globalStateRestoreListener != null) {
globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); try {
globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
} catch (final Exception fatalUserException) {
throw new StreamsException(
String.format("Fatal user code error in store restore listener for store %s, partition %s.",
storeName,
topicPartition),
fatalUserException);
}
} }
} }
@Override @Override
public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) { public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) {
if (globalStateRestoreListener != null) { if (globalStateRestoreListener != null) {
globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); try {
globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
} catch (final Exception fatalUserException) {
throw new StreamsException(
String.format("Fatal user code error in store restore listener for store %s, partition %s.",
storeName,
topicPartition),
fatalUserException);
}
} }
} }
@Override @Override
public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) { public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
if (globalStateRestoreListener != null) { if (globalStateRestoreListener != null) {
globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); try {
globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
} catch (final Exception fatalUserException) {
throw new StreamsException(
String.format("Fatal user code error in store restore listener for store %s, partition %s.",
storeName,
topicPartition),
fatalUserException);
}
} }
} }
}; };

30
streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java

@ -55,15 +55,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
final String storeName, final String storeName,
final long startingOffset, final long startingOffset,
final long endingOffset) { final long endingOffset) {
try { userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
} catch (final Exception fatalUserException) {
throw new StreamsException(
String.format("Fatal user code error in store restore listener for store %s, partition %s.",
storeName,
topicPartition),
fatalUserException);
}
storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset); storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
} }
@ -76,15 +68,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
final String storeName, final String storeName,
final long batchEndOffset, final long batchEndOffset,
final long numRestored) { final long numRestored) {
try { userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
} catch (final Exception fatalUserException) {
throw new StreamsException(
String.format("Fatal user code error in store restore listener for store %s, partition %s.",
storeName,
topicPartition),
fatalUserException);
}
storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored); storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
} }
@ -96,15 +80,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
public void onRestoreEnd(final TopicPartition topicPartition, public void onRestoreEnd(final TopicPartition topicPartition,
final String storeName, final String storeName,
final long totalRestored) { final long totalRestored) {
try { userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
} catch (final Exception fatalUserException) {
throw new StreamsException(
String.format("Fatal user code error in store restore listener for store %s, partition %s.",
storeName,
topicPartition),
fatalUserException);
}
storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored); storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
} }

Loading…
Cancel
Save