diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java index eb6c46aa8d..6323e0fa78 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java @@ -188,6 +188,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe private int maxMessagesPerTask = Integer.MIN_VALUE; + private int idleReceivesPerTaskLimit = Integer.MIN_VALUE; + private int idleConsumerLimit = 1; private int idleTaskExecutionLimit = 1; @@ -439,6 +441,44 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } } + /** + * Return the maximum number of subsequent idle (or null) messages to receive in a single task. + */ + public int getIdleReceivesPerTaskLimit() { + synchronized (this.lifecycleMonitor) { + return idleReceivesPerTaskLimit; + } + } + + /** + * Marks the consumer as 'idle' after the specified number of idle receives + * have been reached. An idle receive is counted from the moment a null message + * is returned by the receiver after the potential {@link #setReceiveTimeout(long)} + * elapsed. This gives the opportunity to check if the idle task count exceeds + * {@link #setIdleTaskExecutionLimit(int)} and based on that decide if the task needs + * to be re-scheduled or not, saving resources that would otherwise be held. + *

This setting differs from {@link #setMaxMessagesPerTask(int)} where the task + * is released and re-scheduled after this limit is reached, no matter if the the received + * messages were null or non-null messages. This setting alone can be inflexible if one + * desires to have a large enough batch for each task but requires a quick(er) release + * from the moment there are no more messages to process.

This setting differs from + * {@link #setIdleTaskExecutionLimit(int)} where this limit decides after how many iterations + * of being marked as idle, a task is released.

For example; if + * {@link #setMaxMessagesPerTask(int)} is set to '500' and + * {@link #setIdleReceivesPerTaskLimit(int)} is set to '60' and {@link #setReceiveTimeout(long)} + * is set to '1000' and {@link #setIdleTaskExecutionLimit(int)} is set to '1', then 500 messages + * per task would be processed unless there is a subsequent number of 60 idle messages received, + * the task would be marked as idle and released. This also means that after the last message was + * processed, the task would be released after 60seconds as long as no new messages appear. + * @param idleReceivesPerTaskLimit {@link Integer#MIN_VALUE} to disable, any value > 0 to enable releasing the + */ + public void setIdleReceivesPerTaskLimit(int idleReceivesPerTaskLimit) { + Assert.isTrue(idleReceivesPerTaskLimit != 0, "'idleReceivesPerTaskLimit' must not be 0)"); + synchronized (this.lifecycleMonitor) { + this.idleReceivesPerTaskLimit = idleReceivesPerTaskLimit; + } + } + /** * Specify the limit for the number of consumers that are allowed to be idle * at any given time. @@ -1072,13 +1112,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } boolean messageReceived = false; try { - if (maxMessagesPerTask < 0) { + if (maxMessagesPerTask < 0 && idleReceivesPerTaskLimit < 0) { messageReceived = executeOngoingLoop(); } else { + int idleMessagesReceived = 0; int messageCount = 0; - while (isRunning() && messageCount < maxMessagesPerTask) { - messageReceived = (invokeListener() || messageReceived); + while (isRunning() && (messageCount < maxMessagesPerTask) && (idleMessagesReceived < idleReceivesPerTaskLimit)) { + boolean messageReceivedThisInvocation = invokeListener(); + idleMessagesReceived = messageReceivedThisInvocation ? 0 : idleMessagesReceived + 1; + messageReceived |= messageReceivedThisInvocation; messageCount++; } }