Browse Source

Interrupt listener invoker threads on shutdown (after initial wait step)

Issue: SPR-16536
pull/1708/head
Juergen Hoeller 7 years ago
parent
commit
95aad9cdc2
  1. 49
      spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java

49
spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -563,21 +563,32 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -563,21 +563,32 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
logger.debug("Waiting for shutdown of message listener invokers");
try {
synchronized (this.lifecycleMonitor) {
// Waiting for AsyncMessageListenerInvokers to deactivate themselves...
long receiveTimeout = getReceiveTimeout();
long waitStartTime = System.currentTimeMillis();
int waitCount = 0;
while (this.activeInvokerCount > 0) {
if (waitCount > 0 && !isAcceptMessagesWhileStopping() &&
System.currentTimeMillis() - waitStartTime >= receiveTimeout) {
// Unexpectedly some invokers are still active after the receive timeout period
// -> interrupt remaining receive attempts since we'd reject the messages anyway
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
scheduledInvoker.interruptIfNecessary();
}
}
if (logger.isDebugEnabled()) {
logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
" message listener invokers");
" message listener invokers (iteration " + waitCount + ")");
}
long timeout = getReceiveTimeout();
if (timeout > 0) {
this.lifecycleMonitor.wait(timeout);
// Wait for AsyncMessageListenerInvokers to deactivate themselves...
if (receiveTimeout > 0) {
this.lifecycleMonitor.wait(receiveTimeout);
}
else {
this.lifecycleMonitor.wait();
}
waitCount++;
}
// Clear remaining scheduled invokers, possibly left over as paused tasks...
// Clear remaining scheduled invokers, possibly left over as paused tasks
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
scheduledInvoker.clearResources();
}
@ -1050,6 +1061,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1050,6 +1061,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
private volatile boolean idle = true;
@Nullable
private volatile Thread currentReceiveThread;
@Override
public void run() {
synchronized (lifecycleMonitor) {
@ -1169,10 +1183,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1169,10 +1183,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
private boolean invokeListener() throws JMSException {
initResourcesIfNecessary();
boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
this.lastMessageSucceeded = true;
return messageReceived;
this.currentReceiveThread = Thread.currentThread();
try {
initResourcesIfNecessary();
boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
this.lastMessageSucceeded = true;
return messageReceived;
}
finally {
this.currentReceiveThread = null;
}
}
private void decreaseActiveInvokerCount() {
@ -1207,6 +1227,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe @@ -1207,6 +1227,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
}
private void interruptIfNecessary() {
Thread currentReceiveThread = this.currentReceiveThread;
if (currentReceiveThread != null && !currentReceiveThread.isInterrupted()) {
currentReceiveThread.interrupt();
}
}
private void clearResources() {
if (sharedConnectionEnabled()) {
synchronized (sharedConnectionMonitor) {

Loading…
Cancel
Save