Browse Source

Add ExecutorChannelInterceptor

Issue: SPR-11968
pull/588/merge
Rossen Stoyanchev 11 years ago
parent
commit
2f371e5aeb
  1. 47
      spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java
  2. 131
      spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java
  3. 127
      spring-messaging/src/test/java/org/springframework/messaging/support/ExecutorSubscribableChannelTests.java

47
spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java

@ -0,0 +1,47 @@ @@ -0,0 +1,47 @@
package org.springframework.messaging.support;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* An extension of {@link ChannelInterceptor} with callbacks to intercept the
* asynchronous sending of a {@link org.springframework.messaging.Message} to a
* specific subscriber through an {@link java.util.concurrent.Executor}.
* Supported on {@link org.springframework.messaging.MessageChannel}
* implementations that can be configured with an Executor.
*
* @author Rossen Stoyanchev
* @since 4.1
*/
public interface ExecutorChannelInterceptor extends ChannelInterceptor {
/**
* Invoked inside the {@link Runnable} submitted to the Executor just before
* calling the target MessageHandler to handle the message. Allows for
* modification of the Message if necessary or when {@code null} is returned
* the MessageHandler is not invoked.
*
* @param message the message to be handled
* @param channel the channel on which the message was sent to
* @param handler the target handler to handle the message
* @return the input message, or a new instance, or {@code null}
*/
Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler);
/**
* Invoked inside the {@link Runnable} submitted to the Executor after calling
* the target MessageHandler regardless of the outcome (i.e. Exception raised
* or not) thus allowing for proper resource cleanup.
*
* <p>Note that this will be invoked only if beforeHandle successfully completed
* and returned a Message, i.e. it did not return {@code null}.
*
* @param message the message handled
* @param channel the channel on which the message was sent to
* @param handler the target handler that handled the message
* @param ex any exception that may been raised by the handler
*/
void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex);
}

131
spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorSubscribableChannel.java

@ -16,10 +16,15 @@ @@ -16,10 +16,15 @@
package org.springframework.messaging.support;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
/**
@ -33,6 +38,8 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { @@ -33,6 +38,8 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
private final Executor executor;
private final List<ExecutorChannelInterceptor> executorInterceptors = new ArrayList<ExecutorChannelInterceptor>(4);
/**
* Create a new {@link ExecutorSubscribableChannel} instance where messages will be sent
@ -57,22 +64,130 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { @@ -57,22 +64,130 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
return this.executor;
}
@Override
public void setInterceptors(List<ChannelInterceptor> interceptors) {
super.setInterceptors(interceptors);
this.executorInterceptors.clear();
for (ChannelInterceptor interceptor : interceptors) {
if (interceptor instanceof ExecutorChannelInterceptor) {
this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
}
}
}
@Override
public void addInterceptor(ChannelInterceptor interceptor) {
super.addInterceptor(interceptor);
if (interceptor instanceof ExecutorChannelInterceptor) {
this.executorInterceptors.add((ExecutorChannelInterceptor) interceptor);
}
}
@Override
public boolean sendInternal(final Message<?> message, long timeout) {
for (final MessageHandler handler : getSubscribers()) {
for (MessageHandler subscriber : getSubscribers()) {
ExecutorChannelInterceptorChain chain = new ExecutorChannelInterceptorChain();
SendTask sendTask = new SendTask(message, this, subscriber, chain);
if (this.executor == null) {
handler.handleMessage(message);
sendTask.run();
}
else {
this.executor.execute(new Runnable() {
@Override
public void run() {
handler.handleMessage(message);
}
});
this.executor.execute(sendTask);
}
}
return true;
}
/**
* Helps with the invocation of the target MessageHandler and interceptors.
*/
private static class SendTask implements Runnable {
private final Message<?> inputMessage;
private final MessageChannel channel;
private final MessageHandler handler;
private final ExecutorChannelInterceptorChain chain;
public SendTask(Message<?> message, MessageChannel channel, MessageHandler handler,
ExecutorChannelInterceptorChain chain) {
this.inputMessage = message;
this.channel = channel;
this.handler = handler;
this.chain = chain;
}
@Override
public void run() {
Message<?> message = this.inputMessage;
try {
message = chain.applyBeforeHandle(message, this.channel, this.handler);
if (message == null) {
return;
}
this.handler.handleMessage(message);
this.chain.triggerAfterMessageHandled(message, this.channel, this.handler, null);
}
catch (Exception ex) {
this.chain.triggerAfterMessageHandled(message, this.channel, this.handler, ex);
if (ex instanceof MessagingException) {
throw (MessagingException) ex;
}
throw new MessageDeliveryException(message,
"Failed to handle message to " + this.channel + " in " + this.handler, ex);
}
catch (Error ex) {
this.chain.triggerAfterMessageHandled(message, this.channel, this.handler,
new MessageDeliveryException(message,
"Failed to handle message to " + this.channel + " in " + this.handler, ex));
throw ex;
}
}
}
/**
* Helps with the invocation of configured executor channel interceptors.
*/
private class ExecutorChannelInterceptorChain {
private int interceptorIndex = -1;
public Message<?> applyBeforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
for (ExecutorChannelInterceptor interceptor : executorInterceptors) {
message = interceptor.beforeHandle(message, channel, handler);
if (message == null) {
String name = interceptor.getClass().getSimpleName();
if (logger.isDebugEnabled()) {
logger.debug(name + " returned null from beforeHandle, i.e. precluding the send.");
}
triggerAfterMessageHandled(message, channel, handler, null);
return null;
}
this.interceptorIndex++;
}
return message;
}
public void triggerAfterMessageHandled(Message<?> message, MessageChannel channel,
MessageHandler handler, Exception ex) {
for (int i = this.interceptorIndex; i >= 0; i--) {
ExecutorChannelInterceptor interceptor = executorInterceptors.get(i);
try {
interceptor.afterMessageHandled(message, channel, handler, ex);
}
catch (Throwable ex2) {
logger.error("Exception from afterMessageHandled in " + interceptor, ex2);
}
}
}
}
}

127
spring-messaging/src/test/java/org/springframework/messaging/support/ExecutorSubscribableChannelTests.java

@ -26,11 +26,15 @@ import org.mockito.Mock; @@ -26,11 +26,15 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.mockito.BDDMockito.*;
/**
@ -70,21 +74,29 @@ public class ExecutorSubscribableChannelTests { @@ -70,21 +74,29 @@ public class ExecutorSubscribableChannelTests {
@Test
public void sendWithoutExecutor() {
BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
this.channel.addInterceptor(interceptor);
this.channel.subscribe(this.handler);
this.channel.send(this.message);
verify(this.handler).handleMessage(this.message);
assertEquals(1, interceptor.getCounter().get());
assertTrue(interceptor.wasAfterHandledInvoked());
}
@Test
public void sendWithExecutor() throws Exception {
BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
TaskExecutor executor = mock(TaskExecutor.class);
this.channel = new ExecutorSubscribableChannel(executor);
this.channel.subscribe(this.handler);
this.channel.send(this.message);
ExecutorSubscribableChannel testChannel = new ExecutorSubscribableChannel(executor);
testChannel.addInterceptor(interceptor);
testChannel.subscribe(this.handler);
testChannel.send(this.message);
verify(executor).execute(this.runnableCaptor.capture());
verify(this.handler, never()).handleMessage(this.message);
this.runnableCaptor.getValue().run();
verify(this.handler).handleMessage(this.message);
assertEquals(1, interceptor.getCounter().get());
assertTrue(interceptor.wasAfterHandledInvoked());
}
@Test
@ -128,4 +140,113 @@ public class ExecutorSubscribableChannelTests { @@ -128,4 +140,113 @@ public class ExecutorSubscribableChannelTests {
verify(this.handler).handleMessage(this.message);
}
@Test
public void interceptorWithModifiedMessage() {
Message<?> expected = mock(Message.class);
BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
interceptor.setMessageToReturn(expected);
this.channel.addInterceptor(interceptor);
this.channel.subscribe(this.handler);
this.channel.send(this.message);
verify(this.handler).handleMessage(expected);
assertEquals(1, interceptor.getCounter().get());
assertTrue(interceptor.wasAfterHandledInvoked());
}
@Test
public void interceptorWithNull() {
BeforeHandleInterceptor interceptor1 = new BeforeHandleInterceptor();
NullReturningBeforeHandleInterceptor interceptor2 = new NullReturningBeforeHandleInterceptor();
this.channel.addInterceptor(interceptor1);
this.channel.addInterceptor(interceptor2);
this.channel.subscribe(this.handler);
this.channel.send(this.message);
verifyNoMoreInteractions(this.handler);
assertEquals(1, interceptor1.getCounter().get());
assertEquals(1, interceptor2.getCounter().get());
assertTrue(interceptor1.wasAfterHandledInvoked());
}
@Test
public void interceptorWithException() {
IllegalStateException expected = new IllegalStateException("Fake exception");
doThrow(expected).when(this.handler).handleMessage(this.message);
BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
this.channel.addInterceptor(interceptor);
this.channel.subscribe(this.handler);
try {
this.channel.send(this.message);
}
catch (MessageDeliveryException actual) {
assertSame(expected, actual.getCause());
}
verify(this.handler).handleMessage(this.message);
assertEquals(1, interceptor.getCounter().get());
assertTrue(interceptor.wasAfterHandledInvoked());
}
private abstract static class AbstractTestInterceptor extends ChannelInterceptorAdapter
implements ExecutorChannelInterceptor {
private AtomicInteger counter = new AtomicInteger();
private volatile boolean afterHandledInvoked;
public AtomicInteger getCounter() {
return this.counter;
}
public boolean wasAfterHandledInvoked() {
return this.afterHandledInvoked;
}
@Override
public Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
assertNotNull(message);
counter.incrementAndGet();
return message;
}
@Override
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
this.afterHandledInvoked = true;
}
}
private static class BeforeHandleInterceptor extends AbstractTestInterceptor {
private Message<?> messageToReturn;
private RuntimeException exceptionToRaise;
public void setMessageToReturn(Message<?> messageToReturn) {
this.messageToReturn = messageToReturn;
}
public void setExceptionToRaise(RuntimeException exception) {
this.exceptionToRaise = exception;
}
@Override
public Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
super.beforeHandle(message, channel, handler);
if (this.exceptionToRaise != null) {
throw this.exceptionToRaise;
}
return (this.messageToReturn != null ? this.messageToReturn : message);
}
}
private static class NullReturningBeforeHandleInterceptor extends AbstractTestInterceptor {
@Override
public Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
super.beforeHandle(message, channel, handler);
return null;
}
}
}

Loading…
Cancel
Save