Browse Source

Add Lifecycle to SockJsClient and Transport types

Issue: SPR-10797
pull/588/merge
Rossen Stoyanchev 11 years ago
parent
commit
2ebc921545
  1. 32
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/JettyXhrTransport.java
  2. 36
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/SockJsClient.java
  3. 40
      spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/WebSocketTransport.java
  4. 52
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/AbstractSockJsIntegrationTests.java
  5. 45
      spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/JettySockJsIntegrationTests.java

32
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/JettyXhrTransport.java

@ -23,6 +23,7 @@ import org.eclipse.jetty.client.api.Response; @@ -23,6 +23,7 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.springframework.context.Lifecycle;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@ -59,7 +60,7 @@ import java.util.Enumeration; @@ -59,7 +60,7 @@ import java.util.Enumeration;
* @author Rossen Stoyanchev
* @since 4.1
*/
public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransport {
public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransport, Lifecycle {
private final HttpClient httpClient;
@ -74,6 +75,35 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp @@ -74,6 +75,35 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp
return this.httpClient;
}
@Override
public void start() {
try {
if (!this.httpClient.isRunning()) {
this.httpClient.start();
}
}
catch (Exception e) {
throw new SockJsException("Failed to start " + this, e);
}
}
@Override
public void stop() {
try {
if (this.httpClient.isRunning()) {
this.httpClient.stop();
}
}
catch (Exception e) {
throw new SockJsException("Failed to stop " + this, e);
}
}
@Override
public boolean isRunning() {
return this.httpClient.isRunning();
}
@Override
protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl) {
return executeRequest(infoUrl, HttpMethod.GET, getRequestHeaders(), null);

36
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/SockJsClient.java

@ -18,6 +18,7 @@ package org.springframework.web.socket.sockjs.client; @@ -18,6 +18,7 @@ package org.springframework.web.socket.sockjs.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.Lifecycle;
import org.springframework.http.HttpHeaders;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
@ -50,7 +51,7 @@ import java.util.concurrent.ConcurrentHashMap; @@ -50,7 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @see <a href="http://sockjs.org">http://sockjs.org</a>
* @see org.springframework.web.socket.sockjs.client.Transport
*/
public class SockJsClient extends AbstractWebSocketClient {
public class SockJsClient extends AbstractWebSocketClient implements Lifecycle {
private static final boolean jackson2Present = ClassUtils.isPresent(
"com.fasterxml.jackson.databind.ObjectMapper", SockJsClient.class.getClassLoader());
@ -68,6 +69,8 @@ public class SockJsClient extends AbstractWebSocketClient { @@ -68,6 +69,8 @@ public class SockJsClient extends AbstractWebSocketClient {
private final Map<URI, ServerInfo> infoCache = new ConcurrentHashMap<URI, ServerInfo>();
private volatile boolean running = false;
/**
* Create a {@code SockJsClient} with the given transports.
@ -143,6 +146,37 @@ public class SockJsClient extends AbstractWebSocketClient { @@ -143,6 +146,37 @@ public class SockJsClient extends AbstractWebSocketClient {
this.taskScheduler = taskScheduler;
}
@Override
public void start() {
if (!isRunning()) {
for (Transport transport : this.transports) {
if (transport instanceof Lifecycle) {
if (!((Lifecycle) transport).isRunning()) {
((Lifecycle) transport).start();
}
}
}
}
}
@Override
public void stop() {
if (!isRunning()) {
for (Transport transport : this.transports) {
if (transport instanceof Lifecycle) {
if (((Lifecycle) transport).isRunning()) {
((Lifecycle) transport).stop();
}
}
}
}
}
@Override
public boolean isRunning() {
return this.running;
}
public void clearServerInfoCache() {
this.infoCache.clear();
}

40
spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/WebSocketTransport.java

@ -18,6 +18,7 @@ package org.springframework.web.socket.sockjs.client; @@ -18,6 +18,7 @@ package org.springframework.web.socket.sockjs.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.Lifecycle;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@ -40,12 +41,14 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -40,12 +41,14 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author Rossen Stoyanchev
* @since 4.1
*/
public class WebSocketTransport implements Transport {
public class WebSocketTransport implements Transport, Lifecycle {
private static Log logger = LogFactory.getLog(WebSocketTransport.class);
private final WebSocketClient webSocketClient;
private volatile boolean running = false;
public WebSocketTransport(WebSocketClient webSocketClient) {
Assert.notNull(webSocketClient, "'webSocketClient' is required");
@ -60,6 +63,41 @@ public class WebSocketTransport implements Transport { @@ -60,6 +63,41 @@ public class WebSocketTransport implements Transport {
return this.webSocketClient;
}
@Override
public void start() {
if (!isRunning()) {
if (this.webSocketClient instanceof Lifecycle) {
((Lifecycle) this.webSocketClient).start();
}
else {
this.running = true;
}
}
}
@Override
public void stop() {
if (isRunning()) {
if (this.webSocketClient instanceof Lifecycle) {
((Lifecycle) this.webSocketClient).stop();
}
else {
this.running = false;
}
}
}
@Override
public boolean isRunning() {
if (this.webSocketClient instanceof Lifecycle) {
return ((Lifecycle) this.webSocketClient).isRunning();
}
else {
return this.running;
}
}
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
final SettableListenableFuture<WebSocketSession> future = new SettableListenableFuture<WebSocketSession>();

52
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/AbstractSockJsIntegrationTests.java

@ -82,6 +82,9 @@ public abstract class AbstractSockJsIntegrationTests { @@ -82,6 +82,9 @@ public abstract class AbstractSockJsIntegrationTests {
protected Log logger = LogFactory.getLog(getClass());
private SockJsClient sockJsClient;
private WebSocketTestServer server;
private AnnotationConfigWebApplicationContext wac;
@ -106,6 +109,12 @@ public abstract class AbstractSockJsIntegrationTests { @@ -106,6 +109,12 @@ public abstract class AbstractSockJsIntegrationTests {
@After
public void teardown() throws Exception {
try {
this.sockJsClient.stop();
}
catch (Throwable ex) {
logger.error("Failed to stop SockJsClient", ex);
}
try {
this.server.undeployConfig();
}
@ -120,48 +129,50 @@ public abstract class AbstractSockJsIntegrationTests { @@ -120,48 +129,50 @@ public abstract class AbstractSockJsIntegrationTests {
}
}
protected abstract WebSocketTestServer createWebSocketTestServer();
protected abstract Class<?> upgradeStrategyConfigClass();
protected abstract Transport getWebSocketTransport();
protected abstract WebSocketTestServer createWebSocketTestServer();
protected abstract AbstractXhrTransport getXhrTransport();
protected abstract Transport createWebSocketTransport();
protected SockJsClient createSockJsClient(Transport... transports) {
return new SockJsClient(Arrays.<Transport>asList(transports));
protected abstract AbstractXhrTransport createXhrTransport();
protected void initSockJsClient(Transport... transports) {
this.sockJsClient = new SockJsClient(Arrays.asList(transports));
this.sockJsClient.start();
}
@Test
public void echoWebSocket() throws Exception {
testEcho(100, getWebSocketTransport());
testEcho(100, createWebSocketTransport());
}
@Test
public void echoXhrStreaming() throws Exception {
testEcho(100, getXhrTransport());
testEcho(100, createXhrTransport());
}
@Test
public void echoXhr() throws Exception {
AbstractXhrTransport xhrTransport = getXhrTransport();
AbstractXhrTransport xhrTransport = createXhrTransport();
xhrTransport.setXhrStreamingDisabled(true);
testEcho(100, xhrTransport);
}
@Test
public void closeAfterOneMessageWebSocket() throws Exception {
testCloseAfterOneMessage(getWebSocketTransport());
testCloseAfterOneMessage(createWebSocketTransport());
}
@Test
public void closeAfterOneMessageXhrStreaming() throws Exception {
testCloseAfterOneMessage(getXhrTransport());
testCloseAfterOneMessage(createXhrTransport());
}
@Test
public void closeAfterOneMessageXhr() throws Exception {
AbstractXhrTransport xhrTransport = getXhrTransport();
AbstractXhrTransport xhrTransport = createXhrTransport();
xhrTransport.setXhrStreamingDisabled(true);
testCloseAfterOneMessage(xhrTransport);
}
@ -171,7 +182,8 @@ public abstract class AbstractSockJsIntegrationTests { @@ -171,7 +182,8 @@ public abstract class AbstractSockJsIntegrationTests {
TestClientHandler handler = new TestClientHandler();
this.errorFilter.responseStatusMap.put("/info", 500);
CountDownLatch latch = new CountDownLatch(1);
createSockJsClient(getWebSocketTransport()).doHandshake(handler, this.baseUrl + "/echo").addCallback(
initSockJsClient(createWebSocketTransport());
this.sockJsClient.doHandshake(handler, this.baseUrl + "/echo").addCallback(
new ListenableFutureCallback<WebSocketSession>() {
@Override
public void onSuccess(WebSocketSession result) {
@ -191,8 +203,8 @@ public abstract class AbstractSockJsIntegrationTests { @@ -191,8 +203,8 @@ public abstract class AbstractSockJsIntegrationTests {
this.errorFilter.responseStatusMap.put("/websocket", 200);
this.errorFilter.responseStatusMap.put("/xhr_streaming", 500);
TestClientHandler handler = new TestClientHandler();
Transport[] transports = { getWebSocketTransport(), getXhrTransport() };
WebSocketSession session = createSockJsClient(transports).doHandshake(handler, this.baseUrl + "/echo").get();
initSockJsClient(createWebSocketTransport(), createXhrTransport());
WebSocketSession session = this.sockJsClient.doHandshake(handler, this.baseUrl + "/echo").get();
assertEquals("Fallback didn't occur", XhrClientSockJsSession.class, session.getClass());
TextMessage message = new TextMessage("message1");
session.sendMessage(message);
@ -204,8 +216,8 @@ public abstract class AbstractSockJsIntegrationTests { @@ -204,8 +216,8 @@ public abstract class AbstractSockJsIntegrationTests {
TestClientHandler clientHandler = new TestClientHandler();
this.errorFilter.sleepDelayMap.put("/xhr_streaming", 10000L);
this.errorFilter.responseStatusMap.put("/xhr_streaming", 503);
SockJsClient sockJsClient = createSockJsClient(getXhrTransport());
sockJsClient.setTaskScheduler(this.wac.getBean(ThreadPoolTaskScheduler.class));
initSockJsClient(createXhrTransport());
this.sockJsClient.setTaskScheduler(this.wac.getBean(ThreadPoolTaskScheduler.class));
WebSocketSession clientSession = sockJsClient.doHandshake(clientHandler, this.baseUrl + "/echo").get();
assertEquals("Fallback didn't occur", XhrClientSockJsSession.class, clientSession.getClass());
TextMessage message = new TextMessage("message1");
@ -221,7 +233,8 @@ public abstract class AbstractSockJsIntegrationTests { @@ -221,7 +233,8 @@ public abstract class AbstractSockJsIntegrationTests {
messages.add(new TextMessage("m" + i));
}
TestClientHandler handler = new TestClientHandler();
WebSocketSession session = createSockJsClient(transport).doHandshake(handler, this.baseUrl + "/echo").get();
initSockJsClient(transport);
WebSocketSession session = this.sockJsClient.doHandshake(handler, this.baseUrl + "/echo").get();
for (TextMessage message : messages) {
session.sendMessage(message);
}
@ -235,7 +248,8 @@ public abstract class AbstractSockJsIntegrationTests { @@ -235,7 +248,8 @@ public abstract class AbstractSockJsIntegrationTests {
private void testCloseAfterOneMessage(Transport transport) throws Exception {
TestClientHandler clientHandler = new TestClientHandler();
createSockJsClient(transport).doHandshake(clientHandler, this.baseUrl + "/test").get();
initSockJsClient(transport);
this.sockJsClient.doHandshake(clientHandler, this.baseUrl + "/test").get();
TestServerHandler serverHandler = this.wac.getBean(TestServerHandler.class);
assertNotNull("afterConnectionEstablished should have been called", clientHandler.session);

45
spring-websocket/src/test/java/org/springframework/web/socket/sockjs/client/JettySockJsIntegrationTests.java

@ -34,35 +34,10 @@ import org.springframework.web.socket.server.jetty.JettyRequestUpgradeStrategy; @@ -34,35 +34,10 @@ import org.springframework.web.socket.server.jetty.JettyRequestUpgradeStrategy;
*/
public class JettySockJsIntegrationTests extends AbstractSockJsIntegrationTests {
private WebSocketClient webSocketClient;
private HttpClient httpClient;
@Before
public void setup() throws Exception {
super.setup();
this.webSocketClient = new WebSocketClient();
this.webSocketClient.start();
this.httpClient = new HttpClient();
this.httpClient.start();
}
@After
public void teardown() throws Exception {
super.teardown();
try {
this.webSocketClient.stop();
}
catch (Throwable ex) {
logger.error("Failed to stop Jetty WebSocketClient", ex);
}
try {
this.httpClient.stop();
}
catch (Throwable ex) {
logger.error("Failed to stop Jetty HttpClient", ex);
}
@Override
protected Class<?> upgradeStrategyConfigClass() {
return JettyTestConfig.class;
}
@Override
@ -71,24 +46,18 @@ public class JettySockJsIntegrationTests extends AbstractSockJsIntegrationTests @@ -71,24 +46,18 @@ public class JettySockJsIntegrationTests extends AbstractSockJsIntegrationTests
}
@Override
protected Class<?> upgradeStrategyConfigClass() {
return JettyTestConfig.class;
}
@Override
protected Transport getWebSocketTransport() {
return new WebSocketTransport(new JettyWebSocketClient(this.webSocketClient));
protected Transport createWebSocketTransport() {
return new WebSocketTransport(new JettyWebSocketClient());
}
@Override
protected AbstractXhrTransport getXhrTransport() {
return new JettyXhrTransport(this.httpClient);
protected AbstractXhrTransport createXhrTransport() {
return new JettyXhrTransport(new HttpClient());
}
@Configuration
static class JettyTestConfig {
@Bean
public RequestUpgradeStrategy upgradeStrategy() {
return new JettyRequestUpgradeStrategy();

Loading…
Cancel
Save