Browse Source

Tighten connection management in STOMP broker relay

Fix NPE exception when closing TcpConnection

Ensure a ConnectionHandler is cleared when a TcpConnection is closed
(at the same time), logging an exception if the closing fails.

Improve error messages.

Issue: SPR-11531
pull/448/merge
Rossen Stoyanchev 11 years ago
parent
commit
a473d46e1c
  1. 114
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  2. 3
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

114
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,7 +18,6 @@ package org.springframework.messaging.simp.stomp; @@ -18,7 +18,6 @@ package org.springframework.messaging.simp.stomp;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.messaging.Message;
@ -36,7 +35,6 @@ import org.springframework.messaging.tcp.TcpOperations; @@ -36,7 +35,6 @@ import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureTask;
/**
* A {@link org.springframework.messaging.MessageHandler} that handles messages by forwarding them to a STOMP broker.
@ -354,10 +352,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -354,10 +352,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
for (StompConnectionHandler handler : this.connectionHandlers.values()) {
try {
handler.resetTcpConnection();
handler.clearConnection();
}
catch (Throwable t) {
logger.error("Failed to close STOMP connection " + t.getMessage());
logger.error("Failed to close connection in session " + handler.getSessionId() + ": " + t.getMessage());
}
}
try {
@ -410,7 +408,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -410,7 +408,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.tcpClient.connect(handler);
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
StompConnectionHandler handler = removeConnectionHandler(sessionId);
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
if (logger.isTraceEnabled()) {
logger.trace("Connection already removed for sessionId=" + sessionId);
@ -422,18 +420,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -422,18 +420,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
else {
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message");
if (logger.isWarnEnabled()) {
logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message");
}
return;
}
handler.forward(message);
}
}
private StompConnectionHandler removeConnectionHandler(String sessionId) {
return SystemStompConnectionHandler.SESSION_ID.equals(sessionId)
? null : this.connectionHandlers.remove(sessionId);
}
private class StompConnectionHandler implements TcpConnectionHandler<byte[]> {
@ -489,20 +484,28 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -489,20 +484,28 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (logger.isErrorEnabled()) {
logger.error(errorMessage + ", sessionId=" + this.sessionId, ex);
}
resetTcpConnection();
sendStompErrorToClient(errorMessage);
try {
sendStompErrorToClient(errorMessage);
}
finally {
try {
clearConnection();
}
catch (Throwable t) {
if (logger.isErrorEnabled()) {
logger.error("Failed to close connection: " + t.getMessage());
}
}
}
}
private void sendStompErrorToClient(String errorText) {
if (this.isRemoteClientSession) {
StompConnectionHandler removed = removeConnectionHandler(this.sessionId);
if (removed != null) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
headers.setSessionId(this.sessionId);
headers.setMessage(errorText);
Message<?> errorMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(headers).build();
sendMessageToClient(errorMessage);
}
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
headers.setSessionId(this.sessionId);
headers.setMessage(errorText);
Message<?> errorMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(headers).build();
sendMessageToClient(errorMessage);
}
}
@ -587,21 +590,39 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -587,21 +590,39 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void afterConnectionClosed() {
sendStompErrorToClient("Connection to broker closed");
if (this.tcpConnection == null) {
return;
}
try {
sendStompErrorToClient("Connection to broker closed");
}
finally {
try {
clearConnection();
}
catch (Throwable t) {
if (logger.isErrorEnabled()) {
// Ignore
}
}
}
}
public ListenableFuture<Void> forward(final Message<?> message) {
if (!this.isStompConnected) {
if (logger.isWarnEnabled()) {
logger.warn("Connection to broker inactive or not ready. Ignoring message");
if (this.isRemoteClientSession) {
// Should never happen
throw new IllegalStateException("Unexpected client message " + message +
(this.tcpConnection != null ?
"before STOMP CONNECTED frame" : "after TCP connection closed"));
}
else {
throw new IllegalStateException("Cannot forward messages on system connection " +
(this.tcpConnection != null ? "before STOMP CONNECTED frame" : "while inactive") +
". Try listening for BrokerAvailabilityEvent ApplicationContext events.");
}
return new ListenableFutureTask<Void>(new Callable<Void>() {
@Override
public Void call() throws Exception {
return null;
}
});
}
if (logger.isDebugEnabled()) {
@ -622,7 +643,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -622,7 +643,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public void onSuccess(Void result) {
StompCommand command = StompHeaderAccessor.wrap(message).getCommand();
if (command == StompCommand.DISCONNECT) {
resetTcpConnection();
clearConnection();
}
}
@Override
@ -634,16 +655,29 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -634,16 +655,29 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return future;
}
public void resetTcpConnection() {
TcpConnection<byte[]> conn = this.tcpConnection;
/**
* Close the TCP connection to the broker and release the connection reference,
* Any exception arising from closing the connection is propagated. The caller
* must handle and log the exception accordingly.
*
* <p>If the connection belongs to a client session, the connection handler
* for the session (basically the current instance) is also released from the
* {@link org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler}.
*/
public void clearConnection() {
this.isStompConnected = false;
this.tcpConnection = null;
if (conn != null) {
try {
this.tcpConnection.close();
try {
TcpConnection<byte[]> conn = this.tcpConnection;
this.tcpConnection = null;
if (conn != null) {
conn.close();
}
catch (Throwable t) {
// ignore
}
finally {
if (this.isRemoteClientSession) {
StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
}
}
}

3
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLatch; @@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;

Loading…
Cancel
Save