Browse Source

KAFKA-5541: Follow-up; Try to clean uncleanly upon clean close failure before throwing the exception

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4046 from mjsax/kafka-5541-minor-follow-up
pull/3750/head
Matthias J. Sax 7 years ago committed by Guozhang Wang
parent
commit
c22c1775a5
  1. 34
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java

34
streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java

@ -507,12 +507,17 @@ class AssignedTasks implements RestoringTasks { @@ -507,12 +507,17 @@ class AssignedTasks implements RestoringTasks {
} catch (final TaskMigratedException e) {
firstException.compareAndSet(null, closeZombieTask(task));
} catch (final RuntimeException t) {
firstException.compareAndSet(null, t);
log.error("Failed while closing {} {} due to the following error:",
task.getClass().getSimpleName(),
task.id(),
t);
firstException.compareAndSet(null, closeUncleanIfRequired(task, clean));
if (clean) {
if (!closeUnclean(task)) {
firstException.compareAndSet(null, t);
}
} else {
firstException.compareAndSet(null, t);
}
}
}
@ -524,21 +529,18 @@ class AssignedTasks implements RestoringTasks { @@ -524,21 +529,18 @@ class AssignedTasks implements RestoringTasks {
}
}
private RuntimeException closeUncleanIfRequired(final Task task,
final boolean triedToCloseCleanlyPreviously) {
if (triedToCloseCleanlyPreviously) {
log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
try {
task.close(false, false);
} catch (final RuntimeException fatalException) {
log.error("Failed while closing {} {} due to the following error:",
task.getClass().getSimpleName(),
task.id(),
fatalException);
return fatalException;
}
private boolean closeUnclean(final Task task) {
log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
try {
task.close(false, false);
} catch (final RuntimeException fatalException) {
log.error("Failed while closing {} {} due to the following error:",
task.getClass().getSimpleName(),
task.id(),
fatalException);
return false;
}
return null;
return true;
}
}

Loading…
Cancel
Save