Browse Source

Ensure Environment.shutdown() in Reactor2TcpClient

Issue: SPR-14229
pull/1047/head
Rossen Stoyanchev 9 years ago
parent
commit
bd40a93604
  1. 15
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java

15
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java

@ -84,6 +84,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { @@ -84,6 +84,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
private final EventLoopGroup eventLoopGroup;
private final Environment environment;
private final TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory;
private final List<TcpClient<Message<P>, Message<P>>> tcpClients =
@ -108,12 +110,13 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { @@ -108,12 +110,13 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
// Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup
final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup();
this.eventLoopGroup = nioEventLoopGroup;
this.environment = new Environment(new SynchronousDispatcherConfigReader());
this.tcpClientSpecFactory = new TcpClientFactory<Message<P>, Message<P>>() {
@Override
public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Message<P>> spec) {
return spec
.env(new Environment(new SynchronousDispatcherConfigReader()))
.env(environment)
.codec(codec)
.connect(host, port)
.options(createClientSocketOptions());
@ -139,6 +142,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { @@ -139,6 +142,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
this.tcpClientSpecFactory = tcpClientSpecFactory;
this.eventLoopGroup = null;
this.environment = null;
}
@ -269,6 +273,15 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { @@ -269,6 +273,15 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
promise = eventLoopPromise;
}
if (this.environment != null) {
promise.onComplete(new Consumer<Promise<Void>>() {
@Override
public void accept(Promise<Void> voidPromise) {
environment.shutdown();
}
});
}
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
}

Loading…
Cancel
Save