@ -24,6 +24,7 @@ import java.util.Collections;
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.List ;
import java.util.Map ;
import java.util.function.IntPredicate ;
import java.util.stream.Collectors ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
@ -40,8 +41,6 @@ import org.springframework.http.MediaType;
@@ -40,8 +41,6 @@ import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage ;
import org.springframework.lang.Nullable ;
import static java.util.stream.Collectors.joining ;
/ * *
* Reader that supports a stream of { @link ServerSentEvent } s and also plain
* { @link Object } s which is the same as an { @link ServerSentEvent } with data only .
@ -120,9 +119,10 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
@@ -120,9 +119,10 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
} )
. bufferUntil ( line - > line . equals ( "\n" ) )
. concatMap ( rawLines - > {
String [ ] lines = rawLines . stream ( ) . collect ( joining ( ) ) . split ( "\\r?\\n" ) ;
ServerSentEvent < Object > event = buildEvent ( lines , valueType , hints ) ;
return ( shouldWrap ? Mono . just ( event ) : Mono . justOrEmpty ( event . data ( ) ) ) ;
String [ ] lines = rawLines . stream ( ) . collect ( Collectors . joining ( ) ) . split ( "\\r?\\n" ) ;
return buildEvent ( lines , valueType , hints )
. filter ( event - > shouldWrap | | event . data ( ) ! = null )
. map ( event - > shouldWrap ? event : event . data ( ) ) ;
} )
. cast ( Object . class ) ;
}
@ -144,12 +144,12 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
@@ -144,12 +144,12 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
return Flux . fromIterable ( results ) ;
}
private ServerSentEvent < Object > buildEvent ( String [ ] lines , ResolvableType valueType ,
private Mono < ServerSentEvent < Object > > buildEvent ( String [ ] lines , ResolvableType valueType ,
Map < String , Object > hints ) {
ServerSentEvent . Builder < Object > sseBuilder = ServerSentEvent . builder ( ) ;
StringBuilder mutableD ata = new Stri ngB ui lder ( ) ;
StringBuilder mutableC omment = new Stri ngB ui lder ( ) ;
StringBuilder d ata = null ;
StringBuilder c omment = null ;
for ( String line : lines ) {
if ( line . startsWith ( "id:" ) ) {
@ -159,42 +159,43 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
@@ -159,42 +159,43 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
sseBuilder . event ( line . substring ( 6 ) ) ;
}
else if ( line . startsWith ( "data:" ) ) {
mutableData . append ( line . substring ( 5 ) ) . append ( "\n" ) ;
data = ( data ! = null ? data : new StringBuilder ( ) ) ;
data . append ( line . substring ( 5 ) ) . append ( "\n" ) ;
}
else if ( line . startsWith ( "retry:" ) ) {
sseBuilder . retry ( Duration . ofMillis ( Long . valueOf ( line . substring ( 6 ) ) ) ) ;
}
else if ( line . startsWith ( ":" ) ) {
mutableComment . append ( line . substring ( 1 ) ) . append ( "\n" ) ;
comment = ( comment ! = null ? comment : new StringBuilder ( ) ) ;
comment . append ( line . substring ( 1 ) ) . append ( "\n" ) ;
}
}
if ( mutableData . length ( ) > 0 ) {
String data = mutableData . toString ( ) ;
sseBuilder . data ( decodeData ( data , valueType , hints ) ) ;
if ( comment ! = null ) {
sseBuilder . comment ( comment . toString ( ) . substring ( 0 , comment . length ( ) - 1 ) ) ;
}
if ( data ! = null ) {
return decodeData ( data . toString ( ) , valueType , hints ) . map ( decodedData - > {
sseBuilder . data ( decodedData ) ;
return sseBuilder . build ( ) ;
} ) ;
}
if ( mutableComment . length ( ) > 0 ) {
String comment = mutableComment . toString ( ) ;
sseBuilder . comment ( comment . substring ( 0 , comment . length ( ) - 1 ) ) ;
else {
return Mono . just ( sseBuilder . build ( ) ) ;
}
return sseBuilder . build ( ) ;
}
@Nullable
private Object decodeData ( String data , ResolvableType dataType , Map < String , Object > hints ) {
private Mono < ? > decodeData ( String data , ResolvableType dataType , Map < String , Object > hints ) {
if ( String . class = = dataType . resolve ( ) ) {
return data . substring ( 0 , data . length ( ) - 1 ) ;
return Mono . just ( data . substring ( 0 , data . length ( ) - 1 ) ) ;
}
if ( this . decoder = = null ) {
return Flux . error ( new CodecException ( "No SSE decoder configured and the data is not String." ) ) ;
return Mono . error ( new CodecException ( "No SSE decoder configured and the data is not String." ) ) ;
}
byte [ ] bytes = data . getBytes ( StandardCharsets . UTF_8 ) ;
Mono < DataBuffer > input = Mono . just ( bufferFactory . wrap ( bytes ) ) ;
return this . decoder
. decodeToMono ( input , dataType , MediaType . TEXT_EVENT_STREAM , hints )
. block ( Duration . ZERO ) ;
return this . decoder . decodeToMono ( input , dataType , MediaType . TEXT_EVENT_STREAM , hints ) ;
}
@Override