Browse Source

KAFKA-4786; Wait for heartbeat thread to terminate in consumer close

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #2586 from rajinisivaram/KAFKA-4786
pull/2563/merge
Rajini Sivaram 8 years ago committed by Jason Gustafson
parent
commit
5916ef0227
  1. 46
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

46
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

@ -315,6 +315,19 @@ public abstract class AbstractCoordinator implements Closeable { @@ -315,6 +315,19 @@ public abstract class AbstractCoordinator implements Closeable {
heartbeatThread.disable();
}
private void closeHeartbeatThread() {
if (heartbeatThread != null) {
heartbeatThread.close();
try {
heartbeatThread.join();
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for consumer heartbeat thread to close");
throw new InterruptException(e);
}
}
}
// visible for testing. Joins the group without starting the heartbeat thread.
void joinGroupIfNeeded() {
while (needRejoin() || rejoinIncomplete()) {
@ -652,19 +665,26 @@ public abstract class AbstractCoordinator implements Closeable { @@ -652,19 +665,26 @@ public abstract class AbstractCoordinator implements Closeable {
close(0);
}
protected synchronized void close(long timeoutMs) {
if (heartbeatThread != null)
heartbeatThread.close();
maybeLeaveGroup();
// At this point, there may be pending commits (async commits or sync commits that were
// interrupted using wakeup) and the leave group request which have been queued, but not
// yet sent to the broker. Wait up to close timeout for these pending requests to be processed.
// If coordinator is not known, requests are aborted.
Node coordinator = coordinator();
if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs))
log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.",
client.pendingRequestCount(coordinator), groupId);
protected void close(long timeoutMs) {
try {
closeHeartbeatThread();
} finally {
// Synchronize after closing the heartbeat thread since heartbeat thread
// needs this lock to complete and terminate after close flag is set.
synchronized (this) {
maybeLeaveGroup();
// At this point, there may be pending commits (async commits or sync commits that were
// interrupted using wakeup) and the leave group request which have been queued, but not
// yet sent to the broker. Wait up to close timeout for these pending requests to be processed.
// If coordinator is not known, requests are aborted.
Node coordinator = coordinator();
if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs))
log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.",
client.pendingRequestCount(coordinator), groupId);
}
}
}
/**

Loading…
Cancel
Save