@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
/ *
* Copyright 2002 - 2014 the original author or authors .
* Copyright 2002 - 2015 the original author or authors .
*
* Licensed under the Apache License , Version 2 . 0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
@ -65,6 +65,7 @@ import org.springframework.web.socket.sockjs.frame.SockJsFrame;
@@ -65,6 +65,7 @@ import org.springframework.web.socket.sockjs.frame.SockJsFrame;
/ * *
* An XHR transport based on Undertow ' s { @link io . undertow . client . UndertowClient } .
* Compatible with Undertow 1 . 0 , 1 . 1 , 1 . 2 .
*
* < p > When used for testing purposes ( e . g . load testing ) or for specific use cases
* ( like HTTPS configuration ) , a custom OptionMap should be provided :
@ -88,13 +89,15 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -88,13 +89,15 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
private static final AttachmentKey < String > RESPONSE_BODY = AttachmentKey . create ( String . class ) ;
private final Pool < ByteBuffer > bufferPool ;
private final UndertowClient httpClient ;
private final OptionMap optionMap ;
private final XnioWorker worker ;
private final UndertowClient httpClient ;
private final Pool < ByteBuffer > bufferPool ;
public UndertowXhrTransport ( ) throws IOException {
this ( OptionMap . builder ( ) . parse ( Options . WORKER_NAME , "SockJSClient" ) . getMap ( ) ) ;
@ -102,19 +105,20 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -102,19 +105,20 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
public UndertowXhrTransport ( OptionMap optionMap ) throws IOException {
Assert . notNull ( optionMap , "'optionMap' is required" ) ;
this . bufferPool = new ByteBufferSlicePool ( 1048 , 1048 ) ;
this . httpClient = UndertowClient . getInstance ( ) ;
this . optionMap = optionMap ;
this . worker = Xnio . getInstance ( ) . createWorker ( optionMap ) ;
this . httpClient = UndertowClient . getInstance ( ) ;
this . bufferPool = new ByteBufferSlicePool ( 1048 , 1048 ) ;
}
private static HttpHeaders toHttpHeaders ( HeaderMap headerMap ) {
HttpHeaders responseHeaders = new HttpHeaders ( ) ;
Iterator < HttpString > names = headerMap . getHeaderNames ( ) . iterator ( ) ;
while ( names . hasNext ( ) ) {
while ( names . hasNext ( ) ) {
HttpString name = names . next ( ) ;
Iterator < String > values = headerMap . get ( name ) . iterator ( ) ;
while ( values . hasNext ( ) ) {
while ( values . hasNext ( ) ) {
responseHeaders . add ( name . toString ( ) , values . next ( ) ) ;
}
}
@ -130,21 +134,24 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -130,21 +134,24 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
}
}
/ * *
* Return Undertow ' s native HTTP client
* /
public UndertowClient getHttpClient ( ) {
return httpClient ;
return this . httpClient ;
}
/ * *
* Return the { @link org . xnio . XnioWorker } backing the I / O operations for Undertow ' s HTTP client
* Return the { @link org . xnio . XnioWorker } backing the I / O operations
* for Undertow ' s HTTP client .
* @see org . xnio . Xnio
* /
public XnioWorker getWorker ( ) {
return this . worker ;
}
@Override
protected ResponseEntity < String > executeInfoRequestInternal ( URI infoUrl ) {
return executeRequest ( infoUrl , Methods . GET , getRequestHeaders ( ) , null ) ;
@ -156,23 +163,23 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -156,23 +163,23 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
}
protected ResponseEntity < String > executeRequest ( URI url , HttpString method , HttpHeaders headers , String body ) {
CountDownLatch latch = new CountDownLatch ( 1 ) ;
List < ClientResponse > responses = new CopyOnWriteArrayList < ClientResponse > ( ) ;
final CountDownLatch latch = new CountDownLatch ( 1 ) ;
final List < ClientResponse > responses = new CopyOnWriteArrayList < ClientResponse > ( ) ;
try {
final ClientConnection connection = this . httpClient . connect ( url , this . worker ,
this . bufferPool , this . optionMap ) . get ( ) ;
ClientConnection connection = this . httpClient . connect (
url , this . worker , this . bufferPool , this . optionMap ) . get ( ) ;
try {
final ClientRequest request = new ClientRequest ( ) . setMethod ( method ) . setPath ( url . getPath ( ) ) ;
ClientRequest request = new ClientRequest ( ) . setMethod ( method ) . setPath ( url . getPath ( ) ) ;
request . getRequestHeaders ( ) . add ( HttpString . tryFromString ( HttpHeaders . HOST ) , url . getHost ( ) ) ;
if ( body ! = null & & ! body . isEmpty ( ) ) {
if ( body ! = null & & ! body . isEmpty ( ) ) {
request . getRequestHeaders ( ) . add ( HttpString . tryFromString ( HttpHeaders . CONTENT_LENGTH ) , body . length ( ) ) ;
}
addHttpHeaders ( request , headers ) ;
connection . sendRequest ( request , createRequestCallback ( body , responses , latch ) ) ;
latch . await ( ) ;
final ClientResponse response = responses . iterator ( ) . next ( ) ;
ClientResponse response = responses . iterator ( ) . next ( ) ;
HttpStatus status = HttpStatus . valueOf ( response . getResponseCode ( ) ) ;
HttpHeaders responseHeaders = toHttpHeaders ( response . getResponseHeaders ( ) ) ;
String responseBody = response . getAttachment ( RESPONSE_BODY ) ;
@ -185,10 +192,10 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -185,10 +192,10 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
}
}
catch ( IOException ex ) {
throw new SockJsTransportFailureException ( "Failed to execute request to " + url , null , ex ) ;
throw new SockJsTransportFailureException ( "Failed to execute request to " + url , ex ) ;
}
catch ( InterruptedException ex ) {
throw new SockJsTransportFailureException ( "Failed to execute request to " + url , nul l , ex ) ;
catch ( InterruptedException ex ) {
throw new SockJsTransportFailureException ( "Interrupted while processing request to " + url , ex ) ;
}
}
@ -203,21 +210,18 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -203,21 +210,18 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@Override
public void completed ( final ClientExchange result ) {
responses . add ( result . getResponse ( ) ) ;
new StringReadChannelListener ( result . getConnection ( ) . getBufferPool ( ) ) {
@Override
protected void stringDone ( String string ) {
result . getResponse ( ) . putAttachment ( RESPONSE_BODY , string ) ;
latch . countDown ( ) ;
}
@Override
protected void error ( IOException ex ) {
onFailure ( latch , ex ) ;
}
} . setup ( result . getResponseChannel ( ) ) ;
}
@Override
public void failed ( IOException ex ) {
onFailure ( latch , ex ) ;
@ -238,28 +242,28 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -238,28 +242,28 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
onFailure ( latch , ex ) ;
}
}
@Override
public void failed ( IOException ex ) {
onFailure ( latch , ex ) ;
}
private void onFailure ( final CountDownLatch latch , IOException ex ) {
private void onFailure ( CountDownLatch latch , IOException ex ) {
latch . countDown ( ) ;
throw new SockJsTransportFailureException ( "Failed to execute request" , null , ex ) ;
throw new SockJsTransportFailureException ( "Failed to execute request" , ex ) ;
}
} ;
}
@Override
protected void connectInternal ( TransportRequest request , WebSocketHandler handler , URI receiveUrl ,
HttpHeaders handshakeHeaders , XhrClientSockJsSession session , SettableListenableFuture < WebSocketSession > connectFuture ) {
HttpHeaders handshakeHeaders , XhrClientSockJsSession session ,
SettableListenableFuture < WebSocketSession > connectFuture ) {
executeReceiveRequest ( receiveUrl , handshakeHeaders , session , connectFuture ) ;
}
private void executeReceiveRequest ( final URI url , final HttpHeaders headers , final XhrClientSockJsSession session ,
final SettableListenableFuture < WebSocketSession > connectFuture ) {
if ( logger . isTraceEnabled ( ) ) {
logger . trace ( "Starting XHR receive request, url=" + url ) ;
}
@ -273,10 +277,9 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -273,10 +277,9 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
addHttpHeaders ( httpRequest , headers ) ;
result . sendRequest ( httpRequest , createConnectCallback ( url , getRequestHeaders ( ) , session , connectFuture ) ) ;
}
@Override
public void failed ( IOException ex ) {
throw new SockJsTransportFailureException ( "Failed to execute request to " + url , null , ex ) ;
throw new SockJsTransportFailureException ( "Failed to execute request to " + url , ex ) ;
}
} ,
url , this . worker , this . bufferPool , this . optionMap ) ;
@ -289,11 +292,9 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -289,11 +292,9 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
return new ClientCallback < ClientExchange > ( ) {
@Override
public void completed ( final ClientExchange result ) {
result . setResponseListener ( new ClientCallback < ClientExchange > ( ) {
@Override
public void completed ( final ClientExchange result ) {
public void completed ( ClientExchange result ) {
ClientResponse response = result . getResponse ( ) ;
if ( response . getResponseCode ( ) ! = 200 ) {
HttpStatus status = HttpStatus . valueOf ( response . getResponseCode ( ) ) ;
@ -320,9 +321,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -320,9 +321,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
IoUtils . safeClose ( result . getConnection ( ) ) ;
onFailure ( exc ) ;
}
}
@Override
public void failed ( IOException exc ) {
IoUtils . safeClose ( result . getConnection ( ) ) ;
@ -330,12 +329,10 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -330,12 +329,10 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
}
} ) ;
}
@Override
public void failed ( IOException exc ) {
onFailure ( exc ) ;
}
private void onFailure ( Throwable failure ) {
if ( connectFuture . setException ( failure ) ) {
return ;
@ -349,21 +346,26 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -349,21 +346,26 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
}
}
} ;
}
public class SockJsResponseListener implements ChannelListener < StreamSourceChannel > {
private final ClientConnection connection ;
private final URI url ;
private final HttpHeaders headers ;
private final XhrClientSockJsSession session ;
private final SettableListenableFuture < WebSocketSession > connectFuture ;
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream ( ) ;
public SockJsResponseListener ( ClientConnection connection , URI url , HttpHeaders headers ,
XhrClientSockJsSession sockJsSession , SettableListenableFuture < WebSocketSession > connectFuture ) {
this . connection = connection ;
this . url = url ;
this . headers = headers ;
@ -371,7 +373,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -371,7 +373,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
this . connectFuture = connectFuture ;
}
public void setup ( final StreamSourceChannel channel ) {
public void setup ( StreamSourceChannel channel ) {
channel . suspendReads ( ) ;
channel . getReadSetter ( ) . set ( this ) ;
channel . resumeReads ( ) ;
@ -388,7 +390,6 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -388,7 +390,6 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
}
Pooled < ByteBuffer > pooled = this . connection . getBufferPool ( ) . allocate ( ) ;
try {
int r ;
do {
@ -403,7 +404,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -403,7 +404,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
onSuccess ( ) ;
}
else {
while ( buffer . hasRemaining ( ) ) {
while ( buffer . hasRemaining ( ) ) {
int b = buffer . get ( ) ;
if ( b = = '\n' ) {
handleFrame ( ) ;
@ -413,8 +414,8 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
@@ -413,8 +414,8 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
}
}
}
} while ( r > 0 ) ;
}
while ( r > 0 ) ;
}
catch ( IOException exc ) {
onFailure ( exc ) ;