From a473d46e1c472309807e7376255aef5d60ca174e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 10 Mar 2014 16:19:13 -0400 Subject: [PATCH] Tighten connection management in STOMP broker relay Fix NPE exception when closing TcpConnection Ensure a ConnectionHandler is cleared when a TcpConnection is closed (at the same time), logging an exception if the closing fails. Improve error messages. Issue: SPR-11531 --- .../stomp/StompBrokerRelayMessageHandler.java | 114 ++++++++++++------ ...erRelayMessageHandlerIntegrationTests.java | 3 +- 2 files changed, 75 insertions(+), 42 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 70ad6b439c..4ce89dde73 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ package org.springframework.messaging.simp.stomp; import java.util.Collection; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import org.springframework.messaging.Message; @@ -36,7 +35,6 @@ import org.springframework.messaging.tcp.TcpOperations; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; -import org.springframework.util.concurrent.ListenableFutureTask; /** * A {@link org.springframework.messaging.MessageHandler} that handles messages by forwarding them to a STOMP broker. @@ -354,10 +352,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler for (StompConnectionHandler handler : this.connectionHandlers.values()) { try { - handler.resetTcpConnection(); + handler.clearConnection(); } catch (Throwable t) { - logger.error("Failed to close STOMP connection " + t.getMessage()); + logger.error("Failed to close connection in session " + handler.getSessionId() + ": " + t.getMessage()); } } try { @@ -410,7 +408,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.tcpClient.connect(handler); } else if (SimpMessageType.DISCONNECT.equals(messageType)) { - StompConnectionHandler handler = removeConnectionHandler(sessionId); + StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { if (logger.isTraceEnabled()) { logger.trace("Connection already removed for sessionId=" + sessionId); @@ -422,18 +420,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler else { StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { - logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message"); + if (logger.isWarnEnabled()) { + logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message"); + } return; } handler.forward(message); } } - private StompConnectionHandler removeConnectionHandler(String sessionId) { - return SystemStompConnectionHandler.SESSION_ID.equals(sessionId) - ? null : this.connectionHandlers.remove(sessionId); - } - private class StompConnectionHandler implements TcpConnectionHandler { @@ -489,20 +484,28 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (logger.isErrorEnabled()) { logger.error(errorMessage + ", sessionId=" + this.sessionId, ex); } - resetTcpConnection(); - sendStompErrorToClient(errorMessage); + try { + sendStompErrorToClient(errorMessage); + } + finally { + try { + clearConnection(); + } + catch (Throwable t) { + if (logger.isErrorEnabled()) { + logger.error("Failed to close connection: " + t.getMessage()); + } + } + } } private void sendStompErrorToClient(String errorText) { if (this.isRemoteClientSession) { - StompConnectionHandler removed = removeConnectionHandler(this.sessionId); - if (removed != null) { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); - headers.setSessionId(this.sessionId); - headers.setMessage(errorText); - Message errorMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(headers).build(); - sendMessageToClient(errorMessage); - } + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); + headers.setSessionId(this.sessionId); + headers.setMessage(errorText); + Message errorMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(headers).build(); + sendMessageToClient(errorMessage); } } @@ -587,21 +590,39 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void afterConnectionClosed() { - sendStompErrorToClient("Connection to broker closed"); + if (this.tcpConnection == null) { + return; + } + try { + sendStompErrorToClient("Connection to broker closed"); + } + finally { + try { + clearConnection(); + } + catch (Throwable t) { + if (logger.isErrorEnabled()) { + // Ignore + } + } + } } public ListenableFuture forward(final Message message) { if (!this.isStompConnected) { - if (logger.isWarnEnabled()) { - logger.warn("Connection to broker inactive or not ready. Ignoring message"); + if (this.isRemoteClientSession) { + // Should never happen + throw new IllegalStateException("Unexpected client message " + message + + (this.tcpConnection != null ? + "before STOMP CONNECTED frame" : "after TCP connection closed")); + } + else { + throw new IllegalStateException("Cannot forward messages on system connection " + + (this.tcpConnection != null ? "before STOMP CONNECTED frame" : "while inactive") + + ". Try listening for BrokerAvailabilityEvent ApplicationContext events."); + } - return new ListenableFutureTask(new Callable() { - @Override - public Void call() throws Exception { - return null; - } - }); } if (logger.isDebugEnabled()) { @@ -622,7 +643,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler public void onSuccess(Void result) { StompCommand command = StompHeaderAccessor.wrap(message).getCommand(); if (command == StompCommand.DISCONNECT) { - resetTcpConnection(); + clearConnection(); } } @Override @@ -634,16 +655,29 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return future; } - public void resetTcpConnection() { - TcpConnection conn = this.tcpConnection; + /** + * Close the TCP connection to the broker and release the connection reference, + * Any exception arising from closing the connection is propagated. The caller + * must handle and log the exception accordingly. + * + *

If the connection belongs to a client session, the connection handler + * for the session (basically the current instance) is also released from the + * {@link org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler}. + */ + public void clearConnection() { + this.isStompConnected = false; - this.tcpConnection = null; - if (conn != null) { - try { - this.tcpConnection.close(); + + try { + TcpConnection conn = this.tcpConnection; + this.tcpConnection = null; + if (conn != null) { + conn.close(); } - catch (Throwable t) { - // ignore + } + finally { + if (this.isRemoteClientSession) { + StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId); } } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index f317ba12bf..3f7750739f 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.After;