From 696a010e81c678f449320c3573cc26a0558baa07 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 15 May 2015 18:18:05 -0400 Subject: [PATCH] Add SubProtocolErrorHandler Issue: SPR-12732 --- .../stomp/StompBrokerRelayMessageHandler.java | 10 +- .../StompSubProtocolErrorHandler.java | 75 +++++++++++++++ .../messaging/StompSubProtocolHandler.java | 91 +++++++++++++++---- .../messaging/SubProtocolErrorHandler.java | 56 ++++++++++++ .../StompSubProtocolErrorHandlerTests.java | 74 +++++++++++++++ 5 files changed, 284 insertions(+), 22 deletions(-) create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandler.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolErrorHandler.java create mode 100644 spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandlerTests.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 0766267375..1eb2e49669 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -572,7 +572,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void afterConnectFailure(Throwable ex) { - handleTcpConnectionFailure("failed to establish TCP connection in session " + this.sessionId, ex); + handleTcpConnectionFailure("Failed to connect: " + ex.getMessage(), ex); } /** @@ -675,8 +675,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler public void onSuccess(Void result) { } public void onFailure(Throwable ex) { - String error = "failed to forward heartbeat in \"system\" session."; - handleTcpConnectionFailure(error, ex); + handleTcpConnectionFailure( + "Failed to forward heartbeat: " + ex.getMessage(), ex); } }); } @@ -688,7 +688,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.tcpConnection.onReadInactivity(new Runnable() { @Override public void run() { - handleTcpConnectionFailure("no messages received for more than " + interval + " ms.", null); + handleTcpConnectionFailure("No messages received in " + interval + " ms.", null); } }, interval); } @@ -697,7 +697,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void handleFailure(Throwable ex) { if (this.tcpConnection != null) { - handleTcpConnectionFailure("transport failure.", ex); + handleTcpConnectionFailure("Transport failure: " + ex.getMessage(), ex); } else if (logger.isErrorEnabled()) { logger.error("Transport failure: " + ex); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandler.java new file mode 100644 index 0000000000..7ce097e3ea --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandler.java @@ -0,0 +1,75 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.socket.messaging; + +import org.springframework.messaging.Message; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.support.MessageHeaderAccessor; +import org.springframework.util.Assert; + +/** + * A {@link SubProtocolErrorHandler} for use with STOMP. + * + * @author Rossen Stoyanchev + * @since 4.2 + */ +public class StompSubProtocolErrorHandler implements SubProtocolErrorHandler { + + private static byte[] EMPTY_PAYLOAD = new byte[0]; + + + @Override + public Message handleClientMessageProcessingError(Message clientMessage, Throwable ex) { + + StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); + accessor.setMessage(ex.getMessage()); + accessor.setLeaveMutable(true); + + StompHeaderAccessor clientHeaderAccessor = null; + if (clientMessage != null) { + clientHeaderAccessor = MessageHeaderAccessor.getAccessor(clientMessage, StompHeaderAccessor.class); + String receiptId = clientHeaderAccessor.getReceipt(); + if (receiptId != null) { + accessor.setReceiptId(receiptId); + } + } + + return handleInternal(accessor, EMPTY_PAYLOAD, ex, clientHeaderAccessor); + } + + @Override + public Message handleErrorMessageToClient(Message errorMessage) { + + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class); + Assert.notNull(accessor, "Expected STOMP headers."); + + if (!accessor.isMutable()) { + accessor = StompHeaderAccessor.wrap(errorMessage); + } + + return handleInternal(accessor, errorMessage.getPayload(), null, null); + } + + @SuppressWarnings("unused") + protected Message handleInternal(StompHeaderAccessor errorHeaderAccessor, + byte[] errorPayload, Throwable cause, StompHeaderAccessor clientHeaderAccessor) { + + return MessageBuilder.createMessage(errorPayload, errorHeaderAccessor.getMessageHeaders()); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java index 9b4ad470bd..e9baeca42b 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java @@ -89,6 +89,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE private static final byte[] EMPTY_PAYLOAD = new byte[0]; + private StompSubProtocolErrorHandler errorHandler; + private int messageSizeLimit = 64 * 1024; private final StompEncoder stompEncoder = new StompEncoder(); @@ -106,6 +108,24 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE private final Stats stats = new Stats(); + /** + * Configure a handler for error messages sent to clients which allows + * customizing the error messages or preventing them from being sent. + *

By default this isn't configured in which case an ERROR frame is sent + * with a message header reflecting the error. + * @param errorHandler the error handler + */ + public void setErrorHandler(StompSubProtocolErrorHandler errorHandler) { + this.errorHandler = errorHandler; + } + + /** + * Return the configured error handler. + */ + public StompSubProtocolErrorHandler getErrorHandler() { + return this.errorHandler; + } + /** * Configure the maximum size allowed for an incoming STOMP message. * Since a STOMP message can be received in multiple WebSocket messages, @@ -205,7 +225,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE logger.error("Failed to parse " + webSocketMessage + " in session " + session.getId() + ". Sending STOMP ERROR to client.", ex); } - sendErrorMessage(session, ex); + handleError(session, ex, null); return; } @@ -257,12 +277,48 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE catch (Throwable ex) { logger.error("Failed to send client message to application via MessageChannel" + " in session " + session.getId() + ". Sending STOMP ERROR to client.", ex); - sendErrorMessage(session, ex); - + handleError(session, ex, message); } } } + @SuppressWarnings("deprecation") + private void handleError(WebSocketSession session, Throwable ex, Message clientMessage) { + if (getErrorHandler() == null) { + sendErrorMessage(session, ex); + return; + } + Message message = getErrorHandler().handleClientMessageProcessingError(clientMessage, ex); + if (message == null) { + return; + } + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + Assert.notNull(accessor, "Expected STOMP headers."); + sendToClient(session, accessor, message.getPayload()); + } + + /** + * Invoked when no + * {@link #setErrorHandler(StompSubProtocolErrorHandler) errorHandler} is + * configured to send an ERROR frame to the client. + * @deprecated as of 4.2 this method is deprecated in favor of + * {@link #setErrorHandler(StompSubProtocolErrorHandler) configuring} a + * {@code StompSubProtocolErrorHandler}. + */ + @Deprecated + protected void sendErrorMessage(WebSocketSession session, Throwable error) { + StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR); + headerAccessor.setMessage(error.getMessage()); + byte[] bytes = this.stompEncoder.encode(headerAccessor.getMessageHeaders(), EMPTY_PAYLOAD); + try { + session.sendMessage(new TextMessage(bytes)); + } + catch (Throwable ex) { + // Could be part of normal workflow (e.g. browser tab closed) + logger.debug("Failed to send STOMP ERROR to client.", ex); + } + } + private boolean detectImmutableMessageInterceptor(MessageChannel channel) { if (this.immutableMessageInterceptorPresent != null) { return this.immutableMessageInterceptorPresent; @@ -288,19 +344,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE } } - protected void sendErrorMessage(WebSocketSession session, Throwable error) { - StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR); - headerAccessor.setMessage(error.getMessage()); - byte[] bytes = this.stompEncoder.encode(headerAccessor.getMessageHeaders(), EMPTY_PAYLOAD); - try { - session.sendMessage(new TextMessage(bytes)); - } - catch (Throwable ex) { - // Could be part of normal workflow (e.g. browser tab closed) - logger.debug("Failed to send STOMP ERROR to client.", ex); - } - } - /** * Handle STOMP messages going back out to WebSocket clients. */ @@ -339,8 +382,22 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE } } } + + byte[] payload = (byte[]) message.getPayload(); + + if (StompCommand.ERROR.equals(command) && getErrorHandler() != null) { + Message errorMessage = getErrorHandler().handleErrorMessageToClient((Message) message); + stompAccessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class); + Assert.notNull(stompAccessor, "Expected STOMP headers."); + payload = errorMessage.getPayload(); + } + + sendToClient(session, stompAccessor, payload); + } + + private void sendToClient(WebSocketSession session, StompHeaderAccessor stompAccessor, byte[] payload) { + StompCommand command = stompAccessor.getCommand(); try { - byte[] payload = (byte[]) message.getPayload(); byte[] bytes = this.stompEncoder.encode(stompAccessor.getMessageHeaders(), payload); boolean useBinary = (payload.length > 0 && !(session instanceof SockJsSession) && diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolErrorHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolErrorHandler.java new file mode 100644 index 0000000000..9f0d28e27f --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolErrorHandler.java @@ -0,0 +1,56 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.socket.messaging; + +import org.springframework.messaging.Message; + +/** + * A contract for handling sub-protocol errors sent to clients. + * + * @author Rossen Stoyanchev + * @since 4.2 + */ +public interface SubProtocolErrorHandler

{ + + /** + * Handle errors thrown while processing client messages providing an + * opportunity to prepare the error message or to prevent one from being sent. + * + *

Note that the STOMP protocol requires a server to close the connection + * after sending an ERROR frame. To prevent that, a handler could return + * {@code null} and send a message through the broker instead, e.g. via a + * user destination targeting the user. + * + * @param clientMessage the client message related to the error, possibly + * {@code null} if error occurred while parsing a WebSocket message + * @param ex the cause for the error, never {@code null} + * @return the error message to send to the client, or {@code null} in which + * case no message will be sent. + */ + Message

handleClientMessageProcessingError(Message

clientMessage, Throwable ex); + + /** + * Handle errors sent from the server side to clients, e.g. errors from the + * {@link org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler + * "broke relay"} because connectivity failed or the external broker sent an + * error message, etc. + * @param errorMessage the error message, never {@code null} + * @return the error message to send to the client, or {@code null} in which + * case no message will be sent. + */ + Message

handleErrorMessageToClient(Message

errorMessage); + +} diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandlerTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandlerTests.java new file mode 100644 index 0000000000..fe931c8d03 --- /dev/null +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandlerTests.java @@ -0,0 +1,74 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.socket.messaging; + +import static org.junit.Assert.*; + +import org.junit.Before; +import org.junit.Test; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.support.MessageHeaderAccessor; + +/** + * Unit tests for {@link StompSubProtocolErrorHandler}. + * @author Rossen Stoyanchev + */ +public class StompSubProtocolErrorHandlerTests { + + private StompSubProtocolErrorHandler handler; + + + @Before + public void setUp() throws Exception { + this.handler = new StompSubProtocolErrorHandler(); + } + + + @Test + public void handleClientMessageProcessingError() throws Exception { + + Exception ex = new Exception("fake exception"); + Message actual = this.handler.handleClientMessageProcessingError(null, ex); + + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(actual, StompHeaderAccessor.class); + assertNotNull(accessor); + assertEquals(StompCommand.ERROR, accessor.getCommand()); + assertEquals(ex.getMessage(), accessor.getMessage()); + assertArrayEquals(new byte[0], actual.getPayload()); + } + + @Test + public void handleClientMessageProcessingErrorWithReceipt() throws Exception { + + String receiptId = "123"; + StompHeaderAccessor clientHeaderAccessor = StompHeaderAccessor.create(StompCommand.SEND); + clientHeaderAccessor.setReceipt(receiptId); + MessageHeaders clientHeaders = clientHeaderAccessor.getMessageHeaders(); + Message clientMessage = MessageBuilder.createMessage(new byte[0], clientHeaders); + Message actual = this.handler.handleClientMessageProcessingError(clientMessage, new Exception()); + + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(actual, StompHeaderAccessor.class); + assertNotNull(accessor); + assertEquals(receiptId, accessor.getReceiptId()); + } + + +}