Browse Source

Fix concurrency issues in SockJS session impls

This change ensures the server "WebSocketHandler" is notified of the
opening of a session before writing the open frame to the remote
handler. Any messages sent by the server "WebSocketHandler" while
getting notified of the opening get cached and flushed after the open
frame has been written.

This change introduces locking in AbtractHttpSockJsSession to guard
access to the HTTP response. The goal is to prevent contention between
client requests to receive messages (i.e. long polling) and
the application trying to write.

Issue: SPR-11916
pull/568/head
Rossen Stoyanchev 11 years ago
parent
commit
fcf6ae8328
  1. 229
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java
  2. 9
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java
  3. 23
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/PollingSockJsSession.java
  4. 32
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java
  5. 53
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSession.java
  6. 3
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/JettySockJsIntegrationTests.java
  7. 8
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java
  8. 5
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/TestHttpSockJsSession.java
  9. 22
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSessionTests.java

229
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java

@ -30,7 +30,6 @@ import org.springframework.http.HttpHeaders; @@ -30,7 +30,6 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.server.ServerHttpAsyncRequestControl;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketExtension;
import org.springframework.web.socket.WebSocketHandler;
@ -41,7 +40,7 @@ import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat; @@ -41,7 +40,7 @@ import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
/**
* An abstract base class for use with HTTP transport based SockJS sessions.
* An abstract base class for use with HTTP transport SockJS sessions.
*
* @author Rossen Stoyanchev
* @since 4.0
@ -64,9 +63,12 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { @@ -64,9 +63,12 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
private volatile ServerHttpResponse response;
private volatile SockJsFrameFormat frameFormat;
private volatile ServerHttpAsyncRequestControl asyncRequestControl;
private volatile SockJsFrameFormat frameFormat;
private final Object responseLock = new Object();
private volatile boolean requestInitialized;
@ -124,18 +126,10 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { @@ -124,18 +126,10 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
return this.acceptedProtocol;
}
/**
* Return response for the current request, or {@code null} if between requests.
*/
protected ServerHttpResponse getResponse() {
return this.response;
}
/**
* Return the SockJS buffer for messages stored transparently between polling
* requests. If the polling request takes longer than 5 seconds, the session
* will be closed.
*
* is closed.
* @see org.springframework.web.socket.sockjs.transport.TransportHandlingSockJsService
*/
protected Queue<String> getMessageCache() {
@ -173,123 +167,104 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { @@ -173,123 +167,104 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
return Collections.emptyList();
}
/**
* Whether this HTTP transport streams message frames vs closing the response
* after each frame written (long polling).
*/
protected abstract boolean isStreaming();
/**
* Handle the first HTTP request, i.e. the one that starts a SockJS session.
* Write a prelude to the response (if needed), send the SockJS "open" frame
* to indicate to the client the session is opened, and invoke the
* delegate WebSocketHandler to provide it with the newly opened session.
* <p>
* The "xhr" and "jsonp" (polling-based) transports completes the initial request
* as soon as the open frame is sent. Following that the client should start a
* successive polling request within the same SockJS session.
* <p>
* The "xhr_streaming", "eventsource", and "htmlfile" transports are streaming
* based and will leave the initial request open in order to stream one or
* more messages. However, even streaming based transports eventually recycle
* the long running request, after a certain number of bytes have been streamed
* (128K by default), and allow the client to start a successive request within
* the same SockJS session.
* Handle the first request for receiving messages on a SockJS HTTP transport
* based session.
*
* <p>Long polling-based transports (e.g. "xhr", "jsonp") complete the request
* after writing the open frame. Streaming-based transports ("xhr_streaming",
* "eventsource", and "htmlfile") leave the response open longer for further
* streaming of message frames but will also close it eventually after some
* amount of data has been sent.
*
* @param request the current request
* @param response the current response
* @param frameFormat the transport-specific SocksJS frame format to use
*
* @see #handleSuccessiveRequest(org.springframework.http.server.ServerHttpRequest, org.springframework.http.server.ServerHttpResponse, org.springframework.web.socket.sockjs.frame.SockJsFrameFormat)
*/
public void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response,
SockJsFrameFormat frameFormat) throws SockJsException {
initRequest(request, response, frameFormat);
this.uri = request.getURI();
this.handshakeHeaders = request.getHeaders();
this.principal = request.getPrincipal();
this.localAddress = request.getLocalAddress();
this.remoteAddress = request.getRemoteAddress();
this.response = response;
this.frameFormat = frameFormat;
this.asyncRequestControl = request.getAsyncRequestControl(response);
try {
// Let "our" handler know before sending the open frame to the remote handler
delegateConnectionEstablished();
writePrelude(request, response);
writeFrame(SockJsFrame.openFrame());
if (isStreaming() && !isClosed()) {
startAsyncRequest();
}
}
catch (Throwable ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex);
}
try {
this.requestInitialized = true;
delegateConnectionEstablished();
}
catch (Throwable ex) {
throw new SockJsException("Unhandled exception from WebSocketHandler", getId(), ex);
throw new SockJsTransportFailureException("Failed to open session", getId(), ex);
}
}
private void initRequest(ServerHttpRequest request, ServerHttpResponse response,
SockJsFrameFormat frameFormat) {
Assert.notNull(request, "Request must not be null");
Assert.notNull(response, "Response must not be null");
Assert.notNull(frameFormat, "SockJsFrameFormat must not be null");
this.response = response;
this.frameFormat = frameFormat;
this.asyncRequestControl = request.getAsyncRequestControl(response);
protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
}
protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
private void startAsyncRequest() {
this.asyncRequestControl.start(-1);
if (this.messageCache.size() > 0) {
flushCache();
}
else {
scheduleHeartbeat();
}
this.requestInitialized = true;
}
/**
* Handle all HTTP requests part of the same SockJS session except for the very
* first, initial request. Write a prelude (if needed) and keep the request
* open and ready to send a message from the server to the client.
* <p>
* The "xhr" and "jsonp" (polling-based) transports completes the request when
* the next message is sent, which could be an array of messages cached during
* the time between successive requests, or it could be a heartbeat message
* sent if no other messages were sent (by default within 25 seconds).
* <p>
* The "xhr_streaming", "eventsource", and "htmlfile" transports are streaming
* based and will leave the request open longer in order to stream messages over
* a period of time. However, even streaming based transports eventually recycle
* the long running request, after a certain number of bytes have been streamed
* (128K by default), and allow the client to start a successive request within
* the same SockJS session.
* Handle all requests, except the first one, to receive messages on a SockJS
* HTTP transport based session.
*
* <p>Long polling-based transports (e.g. "xhr", "jsonp") complete the request
* after writing any buffered message frames (or the next one). Streaming-based
* transports ("xhr_streaming", "eventsource", and "htmlfile") leave the
* response open longer for further streaming of message frames but will also
* close it eventually after some amount of data has been sent.
*
* @param request the current request
* @param response the current response
* @param frameFormat the transport-specific SocksJS frame format to use
*
* @see #handleInitialRequest(org.springframework.http.server.ServerHttpRequest, org.springframework.http.server.ServerHttpResponse, org.springframework.web.socket.sockjs.frame.SockJsFrameFormat)
*/
public void handleSuccessiveRequest(ServerHttpRequest request,
ServerHttpResponse response, SockJsFrameFormat frameFormat) throws SockJsException {
initRequest(request, response, frameFormat);
try {
writePrelude(request, response);
}
catch (Throwable ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
throw new SockJsTransportFailureException("Failed to send \"open\" frame", getId(), ex);
synchronized (this.responseLock) {
try {
if (isClosed()) {
response.getBody().write(SockJsFrame.closeFrameGoAway().getContentBytes());
}
this.response = response;
this.frameFormat = frameFormat;
this.asyncRequestControl = request.getAsyncRequestControl(response);
writePrelude(request, response);
startAsyncRequest();
}
catch (Throwable ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
throw new SockJsTransportFailureException("Failed to handle SockJS receive request", getId(), ex);
}
}
startAsyncRequest();
}
protected void startAsyncRequest() throws SockJsException {
try {
this.asyncRequestControl.start(-1);
this.requestInitialized = true;
scheduleHeartbeat();
tryFlushCache();
}
catch (Throwable ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
throw new SockJsTransportFailureException("Failed to flush messages", getId(), ex);
}
}
@Override
protected final void sendMessageInternal(String message) throws SockJsTransportFailureException {
@ -297,27 +272,35 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { @@ -297,27 +272,35 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
tryFlushCache();
}
private void tryFlushCache() throws SockJsTransportFailureException {
if (this.messageCache.isEmpty()) {
logger.trace("Nothing to flush");
return;
}
if (logger.isTraceEnabled()) {
logger.trace(this.messageCache.size() + " message(s) to flush");
}
if (isActive() && this.requestInitialized) {
logger.trace("Flushing messages");
flushCache();
}
else {
private boolean tryFlushCache() throws SockJsTransportFailureException {
synchronized (this.responseLock) {
if (this.messageCache.isEmpty()) {
logger.trace("Nothing to flush in session=" + this.getId());
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("Not ready to flush");
logger.trace(this.messageCache.size() + " message(s) to flush in session " + this.getId());
}
if (isActive() && this.requestInitialized) {
if (logger.isTraceEnabled()) {
logger.trace("Session is active, ready to flush.");
}
cancelHeartbeat();
flushCache();
return true;
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Session is not active, not ready to flush.");
}
return false;
}
}
}
/**
* Only called if the connection is currently active
* Called when the connection is active and ready to write to the response.
* Sub-classes should implement but never call this method directly.
*/
protected abstract void flushCache() throws SockJsTransportFailureException;
@ -327,25 +310,27 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { @@ -327,25 +310,27 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
}
protected void resetRequest() {
synchronized (this.responseLock) {
this.requestInitialized = false;
updateLastActiveTime();
if (isActive()) {
ServerHttpAsyncRequestControl control = this.asyncRequestControl;
if (control.isStarted()) {
try {
logger.debug("Completing asynchronous request");
control.complete();
}
catch (Throwable ex) {
logger.error("Failed to complete request: " + ex.getMessage());
this.asyncRequestControl = null;
this.requestInitialized = false;
this.response = null;
updateLastActiveTime();
if (control != null && !control.isCompleted()) {
if (control.isStarted()) {
try {
logger.debug("Completing asynchronous request");
control.complete();
}
catch (Throwable ex) {
logger.error("Failed to complete request: " + ex.getMessage());
}
}
}
}
this.response = null;
this.asyncRequestControl = null;
}
@Override
@ -353,9 +338,15 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { @@ -353,9 +338,15 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
if (isActive()) {
String formattedFrame = this.frameFormat.format(frame);
if (logger.isTraceEnabled()) {
logger.trace("Writing " + formattedFrame);
logger.trace("Writing to HTTP response: " + formattedFrame);
}
this.response.getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET));
if (isStreaming()) {
this.response.flush();
}
else {
resetRequest();
}
getResponse().getBody().write(formattedFrame.getBytes(SockJsFrame.CHARSET));
}
}

9
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java

@ -40,6 +40,7 @@ import org.springframework.web.socket.WebSocketMessage; @@ -40,6 +40,7 @@ import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
import org.springframework.web.socket.sockjs.transport.SockJsSession;
@ -142,6 +143,10 @@ public abstract class AbstractSockJsSession implements SockJsSession { @@ -142,6 +143,10 @@ public abstract class AbstractSockJsSession implements SockJsSession {
return this.id;
}
protected SockJsMessageCodec getMessageCodec() {
return this.config.getMessageCodec();
}
public SockJsServiceConfig getSockJsServiceConfig() {
return this.config;
}
@ -420,7 +425,9 @@ public abstract class AbstractSockJsSession implements SockJsSession { @@ -420,7 +425,9 @@ public abstract class AbstractSockJsSession implements SockJsSession {
@Override
public String toString() {
return "SockJS session id=" + this.id;
long currentTime = System.currentTimeMillis();
return "SockJsSession[id=" + this.id + ", state=" + this.state + ", sinceCreated=" +
(currentTime - this.timeCreated) + ", sinceLastActive=" + (currentTime - this.timeLastActive) + "]";
}

23
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/PollingSockJsSession.java

@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
package org.springframework.web.socket.sockjs.transport.session;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@ -33,6 +35,7 @@ import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig; @@ -33,6 +35,7 @@ import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
*/
public class PollingSockJsSession extends AbstractHttpSockJsSession {
public PollingSockJsSession(String sessionId, SockJsServiceConfig config,
WebSocketHandler wsHandler, Map<String, Object> attributes) {
@ -41,22 +44,20 @@ public class PollingSockJsSession extends AbstractHttpSockJsSession { @@ -41,22 +44,20 @@ public class PollingSockJsSession extends AbstractHttpSockJsSession {
@Override
protected void flushCache() throws SockJsTransportFailureException {
cancelHeartbeat();
Queue<String> messageCache = getMessageCache();
String[] messages = messageCache.toArray(new String[messageCache.size()]);
messageCache.clear();
protected boolean isStreaming() {
return false;
}
@Override
protected void flushCache() throws SockJsTransportFailureException {
String[] messages = new String[getMessageCache().size()];
for (int i = 0; i < messages.length; i++) {
messages[i] = getMessageCache().poll();
}
SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec();
SockJsFrame frame = SockJsFrame.messageFrame(messageCodec, messages);
writeFrame(frame);
}
@Override
protected void writeFrame(SockJsFrame frame) throws SockJsTransportFailureException {
super.writeFrame(frame);
resetRequest();
}
}

32
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
package org.springframework.web.socket.sockjs.transport.session;
import java.io.IOException;
import java.util.Map;
import org.springframework.http.server.ServerHttpRequest;
@ -48,21 +47,12 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession { @@ -48,21 +47,12 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession {
@Override
public void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse response,
SockJsFrameFormat frameFormat) throws SockJsException {
super.handleInitialRequest(request, response, frameFormat);
// the WebSocketHandler delegate may have closed the session
if (!isClosed()) {
super.startAsyncRequest();
}
protected boolean isStreaming() {
return true;
}
@Override
protected void flushCache() throws SockJsTransportFailureException {
cancelHeartbeat();
do {
String message = getMessageCache().poll();
SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec();
@ -79,26 +69,12 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession { @@ -79,26 +69,12 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession {
logger.trace("Streamed bytes limit reached. Recycling current request");
}
resetRequest();
this.byteCount = 0;
break;
}
} while (!getMessageCache().isEmpty());
scheduleHeartbeat();
}
@Override
protected void resetRequest() {
super.resetRequest();
this.byteCount = 0;
}
@Override
protected void writeFrameInternal(SockJsFrame frame) throws IOException {
if (isActive()) {
super.writeFrameInternal(frame);
getResponse().flush();
}
}
}

53
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSession.java

@ -22,6 +22,10 @@ import java.net.URI; @@ -22,6 +22,10 @@ import java.net.URI;
import java.security.Principal;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
@ -34,7 +38,6 @@ import org.springframework.web.socket.WebSocketSession; @@ -34,7 +38,6 @@ import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.NativeWebSocketSession;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
/**
@ -47,6 +50,12 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen @@ -47,6 +50,12 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen
private WebSocketSession webSocketSession;
private volatile boolean openFrameSent;
private final Queue<String> initSessionCache = new LinkedBlockingDeque<String>();
private final Lock initSessionLock = new ReentrantLock();
public WebSocketServerSockJsSession(String id, SockJsServiceConfig config,
WebSocketHandler handler, Map<String, Object> attributes) {
@ -143,15 +152,23 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen @@ -143,15 +152,23 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen
public void initializeDelegateSession(WebSocketSession session) {
this.webSocketSession = session;
try {
TextMessage message = new TextMessage(SockJsFrame.openFrame().getContent());
this.webSocketSession.sendMessage(message);
scheduleHeartbeat();
delegateConnectionEstablished();
}
catch (Exception ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
synchronized (this.initSessionLock) {
this.webSocketSession = session;
try {
// Let "our" handler know before sending the open frame to the remote handler
delegateConnectionEstablished();
this.webSocketSession.sendMessage(new TextMessage(SockJsFrame.openFrame().getContent()));
// Flush any messages cached in the mean time
while (!this.initSessionCache.isEmpty()) {
writeFrame(SockJsFrame.messageFrame(getMessageCodec(), this.initSessionCache.poll()));
}
scheduleHeartbeat();
this.openFrameSent = true;
}
catch (Exception ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
}
}
}
@ -180,10 +197,20 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen @@ -180,10 +197,20 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen
@Override
public void sendMessageInternal(String message) throws SockJsTransportFailureException {
// Open frame not sent yet?
// If in the session initialization thread, then cache, otherwise wait.
if (!this.openFrameSent) {
synchronized (this.initSessionLock) {
if (!this.openFrameSent) {
this.initSessionCache.add(message);
return;
}
}
}
cancelHeartbeat();
SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec();
SockJsFrame frame = SockJsFrame.messageFrame(messageCodec, message);
writeFrame(frame);
writeFrame(SockJsFrame.messageFrame(getMessageCodec(), message));
scheduleHeartbeat();
}

3
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/JettySockJsIntegrationTests.java

@ -27,9 +27,6 @@ import org.springframework.web.socket.client.jetty.JettyWebSocketClient; @@ -27,9 +27,6 @@ import org.springframework.web.socket.client.jetty.JettyWebSocketClient;
import org.springframework.web.socket.server.RequestUpgradeStrategy;
import org.springframework.web.socket.server.jetty.JettyRequestUpgradeStrategy;
import java.util.ArrayList;
import java.util.List;
/**
* SockJS integration tests using Jetty for client and server.
*

8
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java

@ -94,10 +94,8 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests<TestAbstr @@ -94,10 +94,8 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests<TestAbstr
this.session.handleSuccessiveRequest(this.request, this.response, this.frameFormat);
assertTrue(this.servletRequest.isAsyncStarted());
assertTrue(this.session.wasHeartbeatScheduled());
assertTrue(this.session.wasCacheFlushed());
assertEquals("hhh\n", this.servletResponse.getContentAsString());
verifyNoMoreInteractions(this.webSocketHandler);
@ -119,6 +117,11 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests<TestAbstr @@ -119,6 +117,11 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests<TestAbstr
super("1", config, handler, attributes);
}
@Override
protected boolean isStreaming() {
return false;
}
@Override
protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
response.getBody().write("hhh\n".getBytes());
@ -139,6 +142,7 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests<TestAbstr @@ -139,6 +142,7 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests<TestAbstr
@Override
protected void flushCache() {
this.cacheFlushed = true;
scheduleHeartbeat();
}
@Override

5
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/TestHttpSockJsSession.java

@ -53,6 +53,11 @@ public class TestHttpSockJsSession extends AbstractHttpSockJsSession { @@ -53,6 +53,11 @@ public class TestHttpSockJsSession extends AbstractHttpSockJsSession {
super(sessionId, config, wsHandler, attributes);
}
@Override
protected boolean isStreaming() {
return true;
}
@Override
public String getAcceptedProtocol() {
return this.subProtocol;

22
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/WebSocketServerSockJsSessionTests.java

@ -28,6 +28,8 @@ import org.junit.Test; @@ -28,6 +28,8 @@ import org.junit.Test;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
import org.springframework.web.socket.sockjs.transport.session.WebSocketServerSockJsSessionTests.TestWebSocketServerSockJsSession;
import org.springframework.web.socket.handler.TestWebSocketSession;
@ -71,17 +73,27 @@ public class WebSocketServerSockJsSessionTests extends AbstractSockJsSessionTest @@ -71,17 +73,27 @@ public class WebSocketServerSockJsSessionTests extends AbstractSockJsSessionTest
@Test
public void afterSessionInitialized() throws Exception {
this.session.initializeDelegateSession(this.webSocketSession);
assertEquals("Open frame not sent",
Collections.singletonList(new TextMessage("o")), this.webSocketSession.getSentMessages());
assertEquals(Collections.singletonList(new TextMessage("o")), this.webSocketSession.getSentMessages());
assertEquals(Arrays.asList("schedule"), this.session.heartbeatSchedulingEvents);
verify(this.webSocketHandler).afterConnectionEstablished(this.session);
verifyNoMoreInteractions(this.taskScheduler, this.webSocketHandler);
}
@Test
public void afterSessionInitializedOpenFrameFirst() throws Exception {
TextWebSocketHandler handler = new TextWebSocketHandler() {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
session.sendMessage(new TextMessage("go go"));
}
};
TestWebSocketServerSockJsSession session = new TestWebSocketServerSockJsSession(this.sockJsConfig, handler, null);
session.initializeDelegateSession(this.webSocketSession);
List<TextMessage> expected = Arrays.asList(new TextMessage("o"), new TextMessage("a[\"go go\"]"));
assertEquals(expected, this.webSocketSession.getSentMessages());
}
@Test
public void handleMessageEmptyPayload() throws Exception {
this.session.handleMessage(new TextMessage(""), this.webSocketSession);

Loading…
Cancel
Save