@ -15,6 +15,9 @@
@@ -15,6 +15,9 @@
* /
package org.springframework.http.client.reactive ;
import java.util.function.Consumer ;
import java.util.function.Supplier ;
import reactor.netty.http.HttpResources ;
import reactor.netty.resources.ConnectionProvider ;
import reactor.netty.resources.LoopResources ;
@ -37,7 +40,16 @@ import org.springframework.util.Assert;
@@ -37,7 +40,16 @@ import org.springframework.util.Assert;
* /
public class ReactorResourceFactory implements InitializingBean , DisposableBean {
private boolean globalResources = true ;
private boolean useGlobalResources = true ;
@Nullable
private Consumer < HttpResources > globalResourcesConsumer ;
private Supplier < ConnectionProvider > connectionProviderSupplier = ( ) - > ConnectionProvider . elastic ( "http" ) ;
private Supplier < LoopResources > loopResourcesSupplier = ( ) - > LoopResources . create ( "reactor-http" ) ;
@Nullable
private ConnectionProvider connectionProvider ;
@ -45,52 +57,78 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
@@ -45,52 +57,78 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
@Nullable
private LoopResources loopResources ;
private String threadPrefix = "reactor-http" ;
private boolean manageConnectionProvider = false ;
private boolean manageLoopResources = false ;
/ * *
* Whether to expose and manage the global Reactor Netty resources from the
* { @link HttpResources } holder .
* < p > Default is "true" in which case this factory helps to configure and
* shut down the global Reactor Netty resources within the lifecycle of a
* Spring { @code ApplicationContext } .
* < p > If set to "false" then the factory creates and manages its own
* { @link LoopResources } and { @link ConnectionProvider } , independent of the
* global ones in the { @link HttpResources } holder .
* @param globalResources whether to expose and manage the global resources
* Whether to use global Reactor Netty resources via { @link HttpResources } .
* < p > Default is "true" in which case this factory initializes and stops the
* global Reactor Netty resources within Spring ' s { @code ApplicationContext }
* lifecycle . If set to "false" the factory manages its resources independent
* of the global ones .
* @param useGlobalResources whether to expose and manage the global resources
* @see # addGlobalResourcesConsumer ( Consumer )
* /
public void setGlobalResources ( boolean g lobalResources) {
this . globalResources = g lobalResources;
public void setUseGlobalResources ( boolean useGlobalResources ) {
this . useGlobalResources = useG lobalResources;
}
/ * *
* Configure the { @link ConnectionProvider } to use .
* < p > By default , initialized with { @link ConnectionProvider # elastic ( String ) } .
* @param connectionProvider the connection provider to use
* Add a Consumer for configuring the global Reactor Netty resources on
* startup . When this option is used , { @link # setUseGlobalResources } is also
* enabled .
* @param consumer the consumer to apply
* @see # setUseGlobalResources ( boolean )
* /
public void setConnectionProvider ( @Nullable ConnectionProvider connectionProvider ) {
this . connectionProvider = connectionProvider ;
public void addGlobalResourcesConsumer ( Consumer < HttpResources > consumer ) {
this . useGlobalResources = true ;
this . globalResourcesConsumer = this . globalResourcesConsumer ! = null ?
this . globalResourcesConsumer . andThen ( consumer ) : consumer ;
}
/ * *
* Configure the { @link LoopResources } to use .
* < p > By default , initialized with { @link LoopResources # create ( String ) } .
* @param loopResources the loop resources to use
* Use this option when you don ' t want to participate in global resources and
* you want to customize the creation of the managed { @code ConnectionProvider } .
* < p > By default , { @code ConnectionProvider . elastic ( "http" ) } is used .
* < p > Note that this option is ignored if { @code userGlobalResources = false } or
* { @link # setConnectionProvider ( ConnectionProvider ) } is set .
* @param supplier the supplier to use
* /
public void setLoopResources ( @Nullable LoopResources loopResources ) {
this . loopResources = loopResources ;
public void setConnectionProviderSupplier ( @Nullable Supplier < ConnectionProvider > supplier ) {
this . connectionProviderSupplier = supplier ;
}
/ * *
* Use this option when you don ' t want to participate in global resources and
* you want to customize the creation of the managed { @code LoopResources } .
* < p > By default , { @code LoopResources . create ( "reactor-http" ) } is used .
* < p > Note that this option is ignored if { @code userGlobalResources = false } or
* { @link # setLoopResources ( LoopResources ) } is set .
* @param supplier the supplier to use
* /
public void setLoopResourcesSupplier ( @Nullable Supplier < LoopResources > supplier ) {
this . loopResourcesSupplier = supplier ;
}
/ * *
* Use this option when you want to provide an externally managed
* { @link ConnectionProvider } instance .
* @param connectionProvider the connection provider to use as is
* /
public void setConnectionProvider ( @Nullable ConnectionProvider connectionProvider ) {
this . connectionProvider = connectionProvider ;
}
/ * *
* Configure the thread prefix to initialize { @link LoopResources } with . This
* is used only when a { @link LoopResources } instance isn ' t
* { @link # setLoopResources ( LoopResources ) provided } .
* < p > By default set to "reactor-http" .
* @param threadPrefix the thread prefix to use
* Use this option when you want to provide an externally managed
* { @link LoopResources } instance .
* @param loopResources the loop resources to use as is
* /
public void setThreadPrefix ( String threadPrefix ) {
Assert . notNull ( threadPrefix , "Thread prefix is required" ) ;
this . threadPrefix = threadPrefix ;
public void setLoopResources ( @Nullable LoopResources loopResources ) {
this . loopResources = loopResources ;
}
@ -98,57 +136,58 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
@@ -98,57 +136,58 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
* Whether this factory exposes the global
* { @link reactor . netty . http . HttpResources HttpResources } holder .
* /
public boolean isGlobalResources ( ) {
return this . g lobalResources;
public boolean isUse GlobalResources ( ) {
return this . useG lobalResources;
}
/ * *
* Return the configured { @link ConnectionProvider } .
* /
@Nullable
public ConnectionProvider getConnectionProvider ( ) {
Assert . notNull ( this . connectionProvider , "ConnectionProvider not initialized yet via InitializingBean." ) ;
return this . connectionProvider ;
}
/ * *
* Return the configured { @link LoopResources } .
* /
@Nullable
public LoopResources getLoopResources ( ) {
Assert . notNull ( this . loopResources , "LoopResources not initialized yet via InitializingBean." ) ;
return this . loopResources ;
}
/ * *
* Return the configured prefix for event loop threads .
* /
public String getThreadPrefix ( ) {
return this . threadPrefix ;
}
@Override
public void afterPropertiesSet ( ) throws Exception {
if ( this . loopResources = = null ) {
this . loopResources = LoopResources . create ( this . threadPrefix ) ;
}
if ( this . connectionProvider = = null ) {
this . connectionProvider = ConnectionProvider . elastic ( "http" ) ;
if ( this . useGlobalResources ) {
Assert . isTrue ( this . loopResources = = null & & this . connectionProvider = = null ,
"'useGlobalResources' is mutually exclusive with explicitly configured resources." ) ;
HttpResources httpResources = HttpResources . get ( ) ;
if ( this . globalResourcesConsumer ! = null ) {
this . globalResourcesConsumer . accept ( httpResources ) ;
}
}
if ( this . globalResources ) {
HttpResources . set ( this . loopResources ) ;
HttpResources . set ( this . connectionProvider ) ;
else {
if ( this . loopResources = = null ) {
this . manageLoopResources = true ;
this . loopResources = this . loopResourcesSupplier . get ( ) ;
}
if ( this . connectionProvider = = null ) {
this . manageConnectionProvider = true ;
this . connectionProvider = this . connectionProviderSupplier . get ( ) ;
}
}
}
@Override
public void destroy ( ) {
if ( this . globalResources ) {
if ( this . useG lobalResources) {
HttpResources . disposeLoopsAndConnections ( ) ;
}
else {
try {
ConnectionProvider provider = this . connectionProvider ;
if ( provider ! = null ) {
if ( provider ! = null & & this . manageConnectionProvider ) {
provider . dispose ( ) ;
}
}
@ -158,7 +197,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
@@ -158,7 +197,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
try {
LoopResources resources = this . loopResources ;
if ( resources ! = null ) {
if ( resources ! = null & & this . manageLoopResources ) {
resources . dispose ( ) ;
}
}