Browse Source

Add SubProtocolErrorHandler

Issue: SPR-12732
pull/800/head
Rossen Stoyanchev 10 years ago
parent
commit
696a010e81
  1. 10
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  2. 75
      spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandler.java
  3. 91
      spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java
  4. 56
      spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolErrorHandler.java
  5. 74
      spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandlerTests.java

10
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -572,7 +572,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -572,7 +572,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void afterConnectFailure(Throwable ex) {
handleTcpConnectionFailure("failed to establish TCP connection in session " + this.sessionId, ex);
handleTcpConnectionFailure("Failed to connect: " + ex.getMessage(), ex);
}
/**
@ -675,8 +675,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -675,8 +675,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public void onSuccess(Void result) {
}
public void onFailure(Throwable ex) {
String error = "failed to forward heartbeat in \"system\" session.";
handleTcpConnectionFailure(error, ex);
handleTcpConnectionFailure(
"Failed to forward heartbeat: " + ex.getMessage(), ex);
}
});
}
@ -688,7 +688,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -688,7 +688,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
this.tcpConnection.onReadInactivity(new Runnable() {
@Override
public void run() {
handleTcpConnectionFailure("no messages received for more than " + interval + " ms.", null);
handleTcpConnectionFailure("No messages received in " + interval + " ms.", null);
}
}, interval);
}
@ -697,7 +697,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -697,7 +697,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void handleFailure(Throwable ex) {
if (this.tcpConnection != null) {
handleTcpConnectionFailure("transport failure.", ex);
handleTcpConnectionFailure("Transport failure: " + ex.getMessage(), ex);
}
else if (logger.isErrorEnabled()) {
logger.error("Transport failure: " + ex);

75
spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandler.java

@ -0,0 +1,75 @@ @@ -0,0 +1,75 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.messaging;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.Assert;
/**
* A {@link SubProtocolErrorHandler} for use with STOMP.
*
* @author Rossen Stoyanchev
* @since 4.2
*/
public class StompSubProtocolErrorHandler implements SubProtocolErrorHandler<byte[]> {
private static byte[] EMPTY_PAYLOAD = new byte[0];
@Override
public Message<byte[]> handleClientMessageProcessingError(Message<byte[]> clientMessage, Throwable ex) {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.setMessage(ex.getMessage());
accessor.setLeaveMutable(true);
StompHeaderAccessor clientHeaderAccessor = null;
if (clientMessage != null) {
clientHeaderAccessor = MessageHeaderAccessor.getAccessor(clientMessage, StompHeaderAccessor.class);
String receiptId = clientHeaderAccessor.getReceipt();
if (receiptId != null) {
accessor.setReceiptId(receiptId);
}
}
return handleInternal(accessor, EMPTY_PAYLOAD, ex, clientHeaderAccessor);
}
@Override
public Message<byte[]> handleErrorMessageToClient(Message<byte[]> errorMessage) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class);
Assert.notNull(accessor, "Expected STOMP headers.");
if (!accessor.isMutable()) {
accessor = StompHeaderAccessor.wrap(errorMessage);
}
return handleInternal(accessor, errorMessage.getPayload(), null, null);
}
@SuppressWarnings("unused")
protected Message<byte[]> handleInternal(StompHeaderAccessor errorHeaderAccessor,
byte[] errorPayload, Throwable cause, StompHeaderAccessor clientHeaderAccessor) {
return MessageBuilder.createMessage(errorPayload, errorHeaderAccessor.getMessageHeaders());
}
}

91
spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java

@ -89,6 +89,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @@ -89,6 +89,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private StompSubProtocolErrorHandler errorHandler;
private int messageSizeLimit = 64 * 1024;
private final StompEncoder stompEncoder = new StompEncoder();
@ -106,6 +108,24 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @@ -106,6 +108,24 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
private final Stats stats = new Stats();
/**
* Configure a handler for error messages sent to clients which allows
* customizing the error messages or preventing them from being sent.
* <p>By default this isn't configured in which case an ERROR frame is sent
* with a message header reflecting the error.
* @param errorHandler the error handler
*/
public void setErrorHandler(StompSubProtocolErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
/**
* Return the configured error handler.
*/
public StompSubProtocolErrorHandler getErrorHandler() {
return this.errorHandler;
}
/**
* Configure the maximum size allowed for an incoming STOMP message.
* Since a STOMP message can be received in multiple WebSocket messages,
@ -205,7 +225,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @@ -205,7 +225,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
logger.error("Failed to parse " + webSocketMessage +
" in session " + session.getId() + ". Sending STOMP ERROR to client.", ex);
}
sendErrorMessage(session, ex);
handleError(session, ex, null);
return;
}
@ -257,12 +277,48 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @@ -257,12 +277,48 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
catch (Throwable ex) {
logger.error("Failed to send client message to application via MessageChannel" +
" in session " + session.getId() + ". Sending STOMP ERROR to client.", ex);
sendErrorMessage(session, ex);
handleError(session, ex, message);
}
}
}
@SuppressWarnings("deprecation")
private void handleError(WebSocketSession session, Throwable ex, Message<byte[]> clientMessage) {
if (getErrorHandler() == null) {
sendErrorMessage(session, ex);
return;
}
Message<byte[]> message = getErrorHandler().handleClientMessageProcessingError(clientMessage, ex);
if (message == null) {
return;
}
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
Assert.notNull(accessor, "Expected STOMP headers.");
sendToClient(session, accessor, message.getPayload());
}
/**
* Invoked when no
* {@link #setErrorHandler(StompSubProtocolErrorHandler) errorHandler} is
* configured to send an ERROR frame to the client.
* @deprecated as of 4.2 this method is deprecated in favor of
* {@link #setErrorHandler(StompSubProtocolErrorHandler) configuring} a
* {@code StompSubProtocolErrorHandler}.
*/
@Deprecated
protected void sendErrorMessage(WebSocketSession session, Throwable error) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
headerAccessor.setMessage(error.getMessage());
byte[] bytes = this.stompEncoder.encode(headerAccessor.getMessageHeaders(), EMPTY_PAYLOAD);
try {
session.sendMessage(new TextMessage(bytes));
}
catch (Throwable ex) {
// Could be part of normal workflow (e.g. browser tab closed)
logger.debug("Failed to send STOMP ERROR to client.", ex);
}
}
private boolean detectImmutableMessageInterceptor(MessageChannel channel) {
if (this.immutableMessageInterceptorPresent != null) {
return this.immutableMessageInterceptorPresent;
@ -288,19 +344,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @@ -288,19 +344,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
}
}
protected void sendErrorMessage(WebSocketSession session, Throwable error) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
headerAccessor.setMessage(error.getMessage());
byte[] bytes = this.stompEncoder.encode(headerAccessor.getMessageHeaders(), EMPTY_PAYLOAD);
try {
session.sendMessage(new TextMessage(bytes));
}
catch (Throwable ex) {
// Could be part of normal workflow (e.g. browser tab closed)
logger.debug("Failed to send STOMP ERROR to client.", ex);
}
}
/**
* Handle STOMP messages going back out to WebSocket clients.
*/
@ -339,8 +382,22 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE @@ -339,8 +382,22 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
}
}
}
byte[] payload = (byte[]) message.getPayload();
if (StompCommand.ERROR.equals(command) && getErrorHandler() != null) {
Message<byte[]> errorMessage = getErrorHandler().handleErrorMessageToClient((Message<byte[]>) message);
stompAccessor = MessageHeaderAccessor.getAccessor(errorMessage, StompHeaderAccessor.class);
Assert.notNull(stompAccessor, "Expected STOMP headers.");
payload = errorMessage.getPayload();
}
sendToClient(session, stompAccessor, payload);
}
private void sendToClient(WebSocketSession session, StompHeaderAccessor stompAccessor, byte[] payload) {
StompCommand command = stompAccessor.getCommand();
try {
byte[] payload = (byte[]) message.getPayload();
byte[] bytes = this.stompEncoder.encode(stompAccessor.getMessageHeaders(), payload);
boolean useBinary = (payload.length > 0 && !(session instanceof SockJsSession) &&

56
spring-websocket/src/main/java/org/springframework/web/socket/messaging/SubProtocolErrorHandler.java

@ -0,0 +1,56 @@ @@ -0,0 +1,56 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.messaging;
import org.springframework.messaging.Message;
/**
* A contract for handling sub-protocol errors sent to clients.
*
* @author Rossen Stoyanchev
* @since 4.2
*/
public interface SubProtocolErrorHandler<P> {
/**
* Handle errors thrown while processing client messages providing an
* opportunity to prepare the error message or to prevent one from being sent.
*
* <p>Note that the STOMP protocol requires a server to close the connection
* after sending an ERROR frame. To prevent that, a handler could return
* {@code null} and send a message through the broker instead, e.g. via a
* user destination targeting the user.
*
* @param clientMessage the client message related to the error, possibly
* {@code null} if error occurred while parsing a WebSocket message
* @param ex the cause for the error, never {@code null}
* @return the error message to send to the client, or {@code null} in which
* case no message will be sent.
*/
Message<P> handleClientMessageProcessingError(Message<P> clientMessage, Throwable ex);
/**
* Handle errors sent from the server side to clients, e.g. errors from the
* {@link org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler
* "broke relay"} because connectivity failed or the external broker sent an
* error message, etc.
* @param errorMessage the error message, never {@code null}
* @return the error message to send to the client, or {@code null} in which
* case no message will be sent.
*/
Message<P> handleErrorMessageToClient(Message<P> errorMessage);
}

74
spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolErrorHandlerTests.java

@ -0,0 +1,74 @@ @@ -0,0 +1,74 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.messaging;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
/**
* Unit tests for {@link StompSubProtocolErrorHandler}.
* @author Rossen Stoyanchev
*/
public class StompSubProtocolErrorHandlerTests {
private StompSubProtocolErrorHandler handler;
@Before
public void setUp() throws Exception {
this.handler = new StompSubProtocolErrorHandler();
}
@Test
public void handleClientMessageProcessingError() throws Exception {
Exception ex = new Exception("fake exception");
Message<byte[]> actual = this.handler.handleClientMessageProcessingError(null, ex);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(actual, StompHeaderAccessor.class);
assertNotNull(accessor);
assertEquals(StompCommand.ERROR, accessor.getCommand());
assertEquals(ex.getMessage(), accessor.getMessage());
assertArrayEquals(new byte[0], actual.getPayload());
}
@Test
public void handleClientMessageProcessingErrorWithReceipt() throws Exception {
String receiptId = "123";
StompHeaderAccessor clientHeaderAccessor = StompHeaderAccessor.create(StompCommand.SEND);
clientHeaderAccessor.setReceipt(receiptId);
MessageHeaders clientHeaders = clientHeaderAccessor.getMessageHeaders();
Message<byte[]> clientMessage = MessageBuilder.createMessage(new byte[0], clientHeaders);
Message<byte[]> actual = this.handler.handleClientMessageProcessingError(clientMessage, new Exception());
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(actual, StompHeaderAccessor.class);
assertNotNull(accessor);
assertEquals(receiptId, accessor.getReceiptId());
}
}
Loading…
Cancel
Save