diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java index 0136a8b611..608d4023e1 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java @@ -70,7 +70,7 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { private final Object responseLock = new Object(); - private volatile boolean requestInitialized; + private volatile boolean readyToSend; private final Queue messageCache; @@ -195,18 +195,24 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { this.localAddress = request.getLocalAddress(); this.remoteAddress = request.getRemoteAddress(); - this.response = response; - this.frameFormat = frameFormat; - this.asyncRequestControl = request.getAsyncRequestControl(response); - synchronized (this.responseLock) { try { + this.response = response; + this.frameFormat = frameFormat; + this.asyncRequestControl = request.getAsyncRequestControl(response); + this.asyncRequestControl.start(-1); + // Let "our" handler know before sending the open frame to the remote handler delegateConnectionEstablished(); - writePrelude(request, response); - writeFrame(SockJsFrame.openFrame()); - if (isStreaming() && !isClosed()) { - startAsyncRequest(); + + if (isStreaming()) { + writePrelude(request, response); + writeFrame(SockJsFrame.openFrame()); + flushCache(); + this.readyToSend = true; + } + else { + writeFrame(SockJsFrame.openFrame()); } } catch (Throwable ex) { @@ -219,17 +225,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException { } - private void startAsyncRequest() { - this.asyncRequestControl.start(-1); - if (this.messageCache.size() > 0) { - flushCache(); - } - else { - scheduleHeartbeat(); - } - this.requestInitialized = true; - } - /** * Handle all requests, except the first one, to receive messages on a SockJS * HTTP transport based session. @@ -249,12 +244,27 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { try { if (isClosed()) { response.getBody().write(SockJsFrame.closeFrameGoAway().getContentBytes()); + return; } this.response = response; this.frameFormat = frameFormat; this.asyncRequestControl = request.getAsyncRequestControl(response); - writePrelude(request, response); - startAsyncRequest(); + this.asyncRequestControl.start(-1); + + if (isStreaming()) { + writePrelude(request, response); + flushCache(); + this.readyToSend = true; + } + else { + if (this.messageCache.isEmpty()) { + scheduleHeartbeat(); + this.readyToSend = true; + } + else { + flushCache(); + } + } } catch (Throwable ex) { tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); @@ -266,32 +276,24 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { @Override protected final void sendMessageInternal(String message) throws SockJsTransportFailureException { - this.messageCache.add(message); - tryFlushCache(); - } - - private boolean tryFlushCache() throws SockJsTransportFailureException { synchronized (this.responseLock) { - if (this.messageCache.isEmpty()) { - logger.trace("Nothing to flush in session=" + this.getId()); - return false; - } + this.messageCache.add(message); if (logger.isTraceEnabled()) { logger.trace(this.messageCache.size() + " message(s) to flush in session " + this.getId()); } - if (isActive() && this.requestInitialized) { + if (isActive() && this.readyToSend) { if (logger.isTraceEnabled()) { logger.trace("Session is active, ready to flush."); } cancelHeartbeat(); flushCache(); - return true; + return; } else { if (logger.isTraceEnabled()) { logger.trace("Session is not active, not ready to flush."); } - return false; + return; } } } @@ -312,7 +314,7 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession { ServerHttpAsyncRequestControl control = this.asyncRequestControl; this.asyncRequestControl = null; - this.requestInitialized = false; + this.readyToSend = false; this.response = null; updateLastActiveTime(); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java index 7f733e62c9..06caafe088 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java @@ -18,13 +18,9 @@ package org.springframework.web.socket.sockjs.transport.session; import java.util.Map; -import org.springframework.http.server.ServerHttpRequest; -import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.sockjs.SockJsException; import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.frame.SockJsFrame; -import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat; import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec; import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig; @@ -53,7 +49,7 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession { @Override protected void flushCache() throws SockJsTransportFailureException { - do { + while (!getMessageCache().isEmpty()) { String message = getMessageCache().poll(); SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec(); SockJsFrame frame = SockJsFrame.messageFrame(messageCodec, message); @@ -73,7 +69,6 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession { break; } } - while (!getMessageCache().isEmpty()); scheduleHeartbeat(); } diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/handler/HttpSendingTransportHandlerTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/handler/HttpSendingTransportHandlerTests.java index 9504ae1268..d95677d16a 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/handler/HttpSendingTransportHandlerTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/handler/HttpSendingTransportHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,25 +16,27 @@ package org.springframework.web.socket.sockjs.transport.handler; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + import java.sql.Date; import org.junit.Before; import org.junit.Test; - import org.springframework.scheduling.TaskScheduler; import org.springframework.web.socket.AbstractHttpRequestTests; import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat; import org.springframework.web.socket.sockjs.frame.SockJsFrame; +import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat; import org.springframework.web.socket.sockjs.transport.session.AbstractSockJsSession; import org.springframework.web.socket.sockjs.transport.session.PollingSockJsSession; import org.springframework.web.socket.sockjs.transport.session.StreamingSockJsSession; import org.springframework.web.socket.sockjs.transport.session.StubSockJsServiceConfig; -import static org.junit.Assert.*; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; - /** * Test fixture for {@link AbstractHttpSendingTransportHandler} and sub-classes. * @@ -75,7 +77,7 @@ public class HttpSendingTransportHandlerTests extends AbstractHttpRequestTests assertFalse("Polling request should complete after open frame", this.servletRequest.isAsyncStarted()); verify(this.webSocketHandler).afterConnectionEstablished(session); - resetResponse(); + resetRequestAndResponse(); transportHandler.handleRequest(this.request, this.response, this.webSocketHandler, session); assertTrue("Polling request should remain open", this.servletRequest.isAsyncStarted()); diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java index b2ec62daea..ca8c582a05 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java @@ -82,7 +82,7 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests