Browse Source

Synchronize message sending

Issue: SPR-12516
(backport for commit b796c1)
pull/710/head
Rossen Stoyanchev 10 years ago
parent
commit
9cb1569e67
  1. 72
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java
  2. 7
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java
  3. 18
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/handler/HttpSendingTransportHandlerTests.java
  4. 4
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java

72
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractHttpSockJsSession.java

@ -70,7 +70,7 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
private final Object responseLock = new Object(); private final Object responseLock = new Object();
private volatile boolean requestInitialized; private volatile boolean readyToSend;
private final Queue<String> messageCache; private final Queue<String> messageCache;
@ -195,18 +195,24 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
this.localAddress = request.getLocalAddress(); this.localAddress = request.getLocalAddress();
this.remoteAddress = request.getRemoteAddress(); this.remoteAddress = request.getRemoteAddress();
this.response = response;
this.frameFormat = frameFormat;
this.asyncRequestControl = request.getAsyncRequestControl(response);
synchronized (this.responseLock) { synchronized (this.responseLock) {
try { try {
this.response = response;
this.frameFormat = frameFormat;
this.asyncRequestControl = request.getAsyncRequestControl(response);
this.asyncRequestControl.start(-1);
// Let "our" handler know before sending the open frame to the remote handler // Let "our" handler know before sending the open frame to the remote handler
delegateConnectionEstablished(); delegateConnectionEstablished();
writePrelude(request, response);
writeFrame(SockJsFrame.openFrame()); if (isStreaming()) {
if (isStreaming() && !isClosed()) { writePrelude(request, response);
startAsyncRequest(); writeFrame(SockJsFrame.openFrame());
flushCache();
this.readyToSend = true;
}
else {
writeFrame(SockJsFrame.openFrame());
} }
} }
catch (Throwable ex) { catch (Throwable ex) {
@ -219,17 +225,6 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException { protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
} }
private void startAsyncRequest() {
this.asyncRequestControl.start(-1);
if (this.messageCache.size() > 0) {
flushCache();
}
else {
scheduleHeartbeat();
}
this.requestInitialized = true;
}
/** /**
* Handle all requests, except the first one, to receive messages on a SockJS * Handle all requests, except the first one, to receive messages on a SockJS
* HTTP transport based session. * HTTP transport based session.
@ -249,12 +244,27 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
try { try {
if (isClosed()) { if (isClosed()) {
response.getBody().write(SockJsFrame.closeFrameGoAway().getContentBytes()); response.getBody().write(SockJsFrame.closeFrameGoAway().getContentBytes());
return;
} }
this.response = response; this.response = response;
this.frameFormat = frameFormat; this.frameFormat = frameFormat;
this.asyncRequestControl = request.getAsyncRequestControl(response); this.asyncRequestControl = request.getAsyncRequestControl(response);
writePrelude(request, response); this.asyncRequestControl.start(-1);
startAsyncRequest();
if (isStreaming()) {
writePrelude(request, response);
flushCache();
this.readyToSend = true;
}
else {
if (this.messageCache.isEmpty()) {
scheduleHeartbeat();
this.readyToSend = true;
}
else {
flushCache();
}
}
} }
catch (Throwable ex) { catch (Throwable ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR); tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
@ -266,32 +276,24 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
@Override @Override
protected final void sendMessageInternal(String message) throws SockJsTransportFailureException { protected final void sendMessageInternal(String message) throws SockJsTransportFailureException {
this.messageCache.add(message);
tryFlushCache();
}
private boolean tryFlushCache() throws SockJsTransportFailureException {
synchronized (this.responseLock) { synchronized (this.responseLock) {
if (this.messageCache.isEmpty()) { this.messageCache.add(message);
logger.trace("Nothing to flush in session=" + this.getId());
return false;
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(this.messageCache.size() + " message(s) to flush in session " + this.getId()); logger.trace(this.messageCache.size() + " message(s) to flush in session " + this.getId());
} }
if (isActive() && this.requestInitialized) { if (isActive() && this.readyToSend) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Session is active, ready to flush."); logger.trace("Session is active, ready to flush.");
} }
cancelHeartbeat(); cancelHeartbeat();
flushCache(); flushCache();
return true; return;
} }
else { else {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Session is not active, not ready to flush."); logger.trace("Session is not active, not ready to flush.");
} }
return false; return;
} }
} }
} }
@ -312,7 +314,7 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {
ServerHttpAsyncRequestControl control = this.asyncRequestControl; ServerHttpAsyncRequestControl control = this.asyncRequestControl;
this.asyncRequestControl = null; this.asyncRequestControl = null;
this.requestInitialized = false; this.readyToSend = false;
this.response = null; this.response = null;
updateLastActiveTime(); updateLastActiveTime();

7
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/StreamingSockJsSession.java

@ -18,13 +18,9 @@ package org.springframework.web.socket.sockjs.transport.session;
import java.util.Map; import java.util.Map;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.SockJsException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException; import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame; import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec; import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig; import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;
@ -53,7 +49,7 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession {
@Override @Override
protected void flushCache() throws SockJsTransportFailureException { protected void flushCache() throws SockJsTransportFailureException {
do { while (!getMessageCache().isEmpty()) {
String message = getMessageCache().poll(); String message = getMessageCache().poll();
SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec(); SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec();
SockJsFrame frame = SockJsFrame.messageFrame(messageCodec, message); SockJsFrame frame = SockJsFrame.messageFrame(messageCodec, message);
@ -73,7 +69,6 @@ public class StreamingSockJsSession extends AbstractHttpSockJsSession {
break; break;
} }
} }
while (!getMessageCache().isEmpty());
scheduleHeartbeat(); scheduleHeartbeat();
} }

18
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/handler/HttpSendingTransportHandlerTests.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2013 the original author or authors. * Copyright 2002-2014 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -16,25 +16,27 @@
package org.springframework.web.socket.sockjs.transport.handler; package org.springframework.web.socket.sockjs.transport.handler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.sql.Date; import java.sql.Date;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.socket.AbstractHttpRequestTests; import org.springframework.web.socket.AbstractHttpRequestTests;
import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
import org.springframework.web.socket.sockjs.frame.SockJsFrame; import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
import org.springframework.web.socket.sockjs.transport.session.AbstractSockJsSession; import org.springframework.web.socket.sockjs.transport.session.AbstractSockJsSession;
import org.springframework.web.socket.sockjs.transport.session.PollingSockJsSession; import org.springframework.web.socket.sockjs.transport.session.PollingSockJsSession;
import org.springframework.web.socket.sockjs.transport.session.StreamingSockJsSession; import org.springframework.web.socket.sockjs.transport.session.StreamingSockJsSession;
import org.springframework.web.socket.sockjs.transport.session.StubSockJsServiceConfig; import org.springframework.web.socket.sockjs.transport.session.StubSockJsServiceConfig;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
/** /**
* Test fixture for {@link AbstractHttpSendingTransportHandler} and sub-classes. * Test fixture for {@link AbstractHttpSendingTransportHandler} and sub-classes.
* *
@ -75,7 +77,7 @@ public class HttpSendingTransportHandlerTests extends AbstractHttpRequestTests
assertFalse("Polling request should complete after open frame", this.servletRequest.isAsyncStarted()); assertFalse("Polling request should complete after open frame", this.servletRequest.isAsyncStarted());
verify(this.webSocketHandler).afterConnectionEstablished(session); verify(this.webSocketHandler).afterConnectionEstablished(session);
resetResponse(); resetRequestAndResponse();
transportHandler.handleRequest(this.request, this.response, this.webSocketHandler, session); transportHandler.handleRequest(this.request, this.response, this.webSocketHandler, session);
assertTrue("Polling request should remain open", this.servletRequest.isAsyncStarted()); assertTrue("Polling request should remain open", this.servletRequest.isAsyncStarted());

4
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/HttpSockJsSessionTests.java

@ -82,7 +82,7 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests<TestAbstr
this.session.handleInitialRequest(this.request, this.response, this.frameFormat); this.session.handleInitialRequest(this.request, this.response, this.frameFormat);
assertEquals("hhh\no", this.servletResponse.getContentAsString()); assertEquals("hhh\no", this.servletResponse.getContentAsString());
assertFalse(this.servletRequest.isAsyncStarted()); assertTrue(this.servletRequest.isAsyncStarted());
verify(this.webSocketHandler).afterConnectionEstablished(this.session); verify(this.webSocketHandler).afterConnectionEstablished(this.session);
} }
@ -119,7 +119,7 @@ public class HttpSockJsSessionTests extends AbstractSockJsSessionTests<TestAbstr
@Override @Override
protected boolean isStreaming() { protected boolean isStreaming() {
return false; return true;
} }
@Override @Override

Loading…
Cancel
Save