From 95aad9cdc2eb3a9811d117a97bb1e96cf9787017 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Wed, 28 Feb 2018 00:06:46 +0100 Subject: [PATCH] Interrupt listener invoker threads on shutdown (after initial wait step) Issue: SPR-16536 --- .../DefaultMessageListenerContainer.java | 49 ++++++++++++++----- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java index 77a03a8f3d..17066f24bb 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -563,21 +563,32 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe logger.debug("Waiting for shutdown of message listener invokers"); try { synchronized (this.lifecycleMonitor) { - // Waiting for AsyncMessageListenerInvokers to deactivate themselves... + long receiveTimeout = getReceiveTimeout(); + long waitStartTime = System.currentTimeMillis(); + int waitCount = 0; while (this.activeInvokerCount > 0) { + if (waitCount > 0 && !isAcceptMessagesWhileStopping() && + System.currentTimeMillis() - waitStartTime >= receiveTimeout) { + // Unexpectedly some invokers are still active after the receive timeout period + // -> interrupt remaining receive attempts since we'd reject the messages anyway + for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { + scheduledInvoker.interruptIfNecessary(); + } + } if (logger.isDebugEnabled()) { logger.debug("Still waiting for shutdown of " + this.activeInvokerCount + - " message listener invokers"); + " message listener invokers (iteration " + waitCount + ")"); } - long timeout = getReceiveTimeout(); - if (timeout > 0) { - this.lifecycleMonitor.wait(timeout); + // Wait for AsyncMessageListenerInvokers to deactivate themselves... + if (receiveTimeout > 0) { + this.lifecycleMonitor.wait(receiveTimeout); } else { this.lifecycleMonitor.wait(); } + waitCount++; } - // Clear remaining scheduled invokers, possibly left over as paused tasks... + // Clear remaining scheduled invokers, possibly left over as paused tasks for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { scheduledInvoker.clearResources(); } @@ -1050,6 +1061,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe private volatile boolean idle = true; + @Nullable + private volatile Thread currentReceiveThread; + @Override public void run() { synchronized (lifecycleMonitor) { @@ -1169,10 +1183,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } private boolean invokeListener() throws JMSException { - initResourcesIfNecessary(); - boolean messageReceived = receiveAndExecute(this, this.session, this.consumer); - this.lastMessageSucceeded = true; - return messageReceived; + this.currentReceiveThread = Thread.currentThread(); + try { + initResourcesIfNecessary(); + boolean messageReceived = receiveAndExecute(this, this.session, this.consumer); + this.lastMessageSucceeded = true; + return messageReceived; + } + finally { + this.currentReceiveThread = null; + } } private void decreaseActiveInvokerCount() { @@ -1207,6 +1227,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } } + private void interruptIfNecessary() { + Thread currentReceiveThread = this.currentReceiveThread; + if (currentReceiveThread != null && !currentReceiveThread.isInterrupted()) { + currentReceiveThread.interrupt(); + } + } + private void clearResources() { if (sharedConnectionEnabled()) { synchronized (sharedConnectionMonitor) {