diff --git a/build.gradle b/build.gradle index 62b8abf36c..dfc389ecd4 100644 --- a/build.gradle +++ b/build.gradle @@ -25,8 +25,8 @@ configure(allprojects) { project -> imports { mavenBom "com.fasterxml.jackson:jackson-bom:2.11.0" mavenBom "io.netty:netty-bom:4.1.50.Final" - mavenBom "io.projectreactor:reactor-bom:Dysprosium-SR7" - mavenBom "io.rsocket:rsocket-bom:1.0.0" + mavenBom "io.projectreactor:reactor-bom:2020.0.0-SNAPSHOT" + mavenBom "io.rsocket:rsocket-bom:1.0.1-SNAPSHOT" mavenBom "org.eclipse.jetty:jetty-bom:9.4.29.v20200521" mavenBom "org.jetbrains.kotlin:kotlin-bom:1.3.72" mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.3.5" @@ -281,6 +281,8 @@ configure(allprojects) { project -> repositories { mavenCentral() maven { url "https://repo.spring.io/libs-spring-framework-build" } + maven { url "https://repo.spring.io/snapshot" } // Reactor + maven { url "https://oss.jfrog.org/artifactory/oss-snapshot-local" } // RSocket } } configurations.all { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 8e327bcd81..f7f303f399 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -19,7 +19,6 @@ package org.springframework.messaging.tcp.reactor; import java.time.Duration; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -34,7 +33,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import reactor.core.publisher.DirectProcessor; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Scheduler; @@ -46,6 +44,7 @@ import reactor.netty.NettyOutbound; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; import reactor.netty.tcp.TcpClient; +import reactor.util.retry.Retry; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -103,14 +102,13 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * @param codec for encoding and decoding the input/output byte streams * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec */ - @SuppressWarnings("deprecation") public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec

codec) { Assert.notNull(host, "host is required"); Assert.notNull(codec, "ReactorNettyCodec is required"); this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); this.loopResources = LoopResources.create("tcp-client-loop"); - this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000); + this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000); this.codec = codec; this.tcpClient = TcpClient.create(this.poolResources) @@ -129,13 +127,12 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * @since 5.1.3 * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec */ - @SuppressWarnings("deprecation") public ReactorNettyTcpClient(Function clientConfigurer, ReactorNettyCodec

codec) { Assert.notNull(codec, "ReactorNettyCodec is required"); this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); this.loopResources = LoopResources.create("tcp-client-loop"); - this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000); + this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000); this.codec = codec; this.tcpClient = clientConfigurer.apply(TcpClient @@ -199,7 +196,6 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ } @Override - @SuppressWarnings("deprecation") public ListenableFuture connect(TcpConnectionHandler

handler, ReconnectStrategy strategy) { Assert.notNull(handler, "TcpConnectionHandler is required"); Assert.notNull(strategy, "ReconnectStrategy is required"); @@ -218,8 +214,12 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ .doOnError(updateConnectMono(connectMono)) .doOnError(handler::afterConnectFailure) // report all connect failures to the handler .flatMap(Connection::onDispose) // post-connect issues - .retryWhen(reconnectFunction(strategy)) - .repeatWhen(reconnectFunction(strategy)) + .retryWhen(Retry.from(signals -> signals + .map(retrySignal -> (int) retrySignal.totalRetriesInARow()) + .flatMap(attempt -> reconnect(attempt, strategy)))) + .repeatWhen(flux -> flux + .scan(1, (count, element) -> count++) + .flatMap(attempt -> reconnect(attempt, strategy))) .subscribe(); return new MonoToListenableFutureAdapter<>(connectMono); @@ -244,12 +244,9 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ }; } - private Function, Publisher> reconnectFunction(ReconnectStrategy reconnectStrategy) { - return flux -> flux - .scan(1, (count, element) -> count++) - .flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt)) - .map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler)) - .orElse(Mono.empty())); + private Publisher reconnect(Integer attempt, ReconnectStrategy reconnectStrategy) { + Long time = reconnectStrategy.getTimeToNextAttempt(attempt); + return (time != null ? Mono.delay(Duration.ofMillis(time), this.scheduler) : Mono.empty()); } @Override @@ -342,7 +339,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ private final ReactorNettyCodec

codec; - public StompMessageDecoder(ReactorNettyCodec

codec) { + StompMessageDecoder(ReactorNettyCodec

codec) { this.codec = codec; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index dfed717aaf..9665057011 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,13 +64,11 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ } @Override - @SuppressWarnings("deprecation") public void onReadInactivity(Runnable runnable, long inactivityDuration) { this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable)); } @Override - @SuppressWarnings("deprecation") public void onWriteInactivity(Runnable runnable, long inactivityDuration) { this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable)); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java index f695228117..a622d878ce 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java @@ -52,7 +52,6 @@ import org.springframework.util.ReflectionUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -75,7 +74,7 @@ public class DefaultRSocketRequesterBuilderTests { @BeforeEach public void setup() { this.transport = mock(ClientTransport.class); - given(this.transport.connect(anyInt())).willReturn(Mono.just(this.connection)); + given(this.transport.connect()).willReturn(Mono.just(this.connection)); } @@ -106,7 +105,7 @@ public class DefaultRSocketRequesterBuilderTests { // RSocketStrategies and RSocketConnector configurers should have been called - verify(this.transport).connect(anyInt()); + verify(this.transport).connect(); verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class)); verify(factoryConfigurer).configure(any(io.rsocket.RSocketFactory.ClientRSocketFactory.class)); assertThat(this.connectorConfigurer.connector()).isNotNull(); diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 98dd9dd8ba..3e2e93adbe 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -82,7 +82,7 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { LoopResources resources = resourceFactory.getLoopResources(); Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?"); Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?"); - return HttpClient.create(provider).tcpConfiguration(tcpClient -> tcpClient.runOn(resources)); + return HttpClient.create(provider).runOn(resources); } /** diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java index 87af75f09d..556427435f 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java @@ -47,8 +47,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean @Nullable private Consumer globalResourcesConsumer; - @SuppressWarnings("deprecation") - private Supplier connectionProviderSupplier = () -> ConnectionProvider.fixed("webflux", 500); + private Supplier connectionProviderSupplier = () -> ConnectionProvider.create("webflux", 500); @Nullable private ConnectionProvider connectionProvider; diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorHttpServer.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorHttpServer.java index 4df1070f61..d9a12fc4c3 100644 --- a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorHttpServer.java +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorHttpServer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.web.testfixture.http.server.reactive.bootstrap; +import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicReference; import reactor.netty.DisposableServer; @@ -38,8 +39,7 @@ public class ReactorHttpServer extends AbstractHttpServer { protected void initServer() { this.reactorHandler = createHttpHandlerAdapter(); this.reactorServer = reactor.netty.http.server.HttpServer.create() - .tcpConfiguration(server -> server.host(getHost())) - .port(getPort()); + .host(getHost()).port(getPort()); } private ReactorHttpHandlerAdapter createHttpHandlerAdapter() { @@ -49,7 +49,7 @@ public class ReactorHttpServer extends AbstractHttpServer { @Override protected void startInternal() { DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block(); - setPort(server.address().getPort()); + setPort(((InetSocketAddress) server.address()).getPort()); this.serverRef.set(server); } diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorHttpsServer.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorHttpsServer.java index 004e27832a..ad6c90a689 100644 --- a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorHttpsServer.java +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/ReactorHttpsServer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.web.testfixture.http.server.reactive.bootstrap; +import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicReference; import io.netty.handler.ssl.SslContextBuilder; @@ -57,7 +58,7 @@ public class ReactorHttpsServer extends AbstractHttpServer { @Override protected void startInternal() { DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block(); - setPort(server.address().getPort()); + setPort(((InetSocketAddress) server.address()).getPort()); this.serverRef.set(server); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java index 0fa80f70dc..f7ba51ed10 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java @@ -228,8 +228,8 @@ final class DefaultWebClientBuilder implements WebClient.Builder { return this; } - @SuppressWarnings("deprecation") @Override + @SuppressWarnings("deprecation") public WebClient.Builder exchangeStrategies(Consumer configurer) { if (this.strategiesConfigurers == null) { this.strategiesConfigurers = new ArrayList<>(4); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java index e794c6f6a5..3d96ff0b57 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java @@ -70,8 +70,8 @@ public class CssLinkResourceTransformer extends ResourceTransformerSupport { } - @SuppressWarnings("deprecation") @Override + @SuppressWarnings("deprecation") public Mono transform(ServerWebExchange exchange, Resource inputResource, ResourceTransformerChain transformerChain) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java index 5b9453297f..392af3e894 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java @@ -17,15 +17,18 @@ package org.springframework.web.reactive.socket.client; import java.net.URI; +import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.WebsocketClientSpec; import reactor.netty.http.websocket.WebsocketInbound; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import org.springframework.web.reactive.socket.HandshakeInfo; @@ -47,9 +50,13 @@ public class ReactorNettyWebSocketClient implements WebSocketClient { private final HttpClient httpClient; - private int maxFramePayloadLength = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE; + private final Supplier specBuilderSupplier; - private boolean handlePing; + @Nullable + private Integer maxFramePayloadLength = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE; + + @Nullable + private Boolean handlePing; /** @@ -60,12 +67,25 @@ public class ReactorNettyWebSocketClient implements WebSocketClient { } /** - * Constructor that accepts an existing {@link HttpClient} builder. + * Constructor that accepts an existing {@link HttpClient}. * @since 5.1 */ public ReactorNettyWebSocketClient(HttpClient httpClient) { + this(httpClient, WebsocketClientSpec.builder()); + } + + /** + * Constructor with an {@link HttpClient} and a supplier for the + * {@link WebsocketClientSpec.Builder} to use. + * @since 5.3 + */ + public ReactorNettyWebSocketClient( + HttpClient httpClient, Supplier builderSupplier) { + Assert.notNull(httpClient, "HttpClient is required"); + Assert.notNull(builderSupplier, "WebsocketClientSpec.Builder is required"); this.httpClient = httpClient; + this.specBuilderSupplier = builderSupplier; } @@ -76,6 +96,31 @@ public class ReactorNettyWebSocketClient implements WebSocketClient { return this.httpClient; } + /** + * Build an instance of {@code WebsocketClientSpec} that reflects the current + * configuration. This can be used to check the configured parameters except + * for sub-protocols which depend on the {@link WebSocketHandler} that is used + * for a given upgrade. + * @since 5.3 + */ + public WebsocketClientSpec getWebsocketClientSpec() { + return buildSpec(null); + } + + private WebsocketClientSpec buildSpec(@Nullable String protocols) { + WebsocketClientSpec.Builder builder = this.specBuilderSupplier.get(); + if (StringUtils.hasText(protocols)) { + builder.protocols(protocols); + } + if (this.maxFramePayloadLength != null) { + builder.maxFramePayloadLength(this.maxFramePayloadLength); + } + if (this.handlePing != null) { + builder.handlePing(this.handlePing); + } + return builder.build(); + } + /** * Configure the maximum allowable frame payload length. Setting this value * to your application's requirement may reduce denial of service attacks @@ -96,7 +141,7 @@ public class ReactorNettyWebSocketClient implements WebSocketClient { * @since 5.2 */ public int getMaxFramePayloadLength() { - return this.maxFramePayloadLength; + return getWebsocketClientSpec().maxFramePayloadLength(); } /** @@ -119,7 +164,7 @@ public class ReactorNettyWebSocketClient implements WebSocketClient { * @since 5.2.4 */ public boolean getHandlePing() { - return this.handlePing; + return getWebsocketClientSpec().handlePing(); } @Override @@ -128,12 +173,11 @@ public class ReactorNettyWebSocketClient implements WebSocketClient { } @Override - @SuppressWarnings("deprecation") public Mono execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) { String protocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()); return getHttpClient() .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders)) - .websocket(protocols, getMaxFramePayloadLength(), this.handlePing) + .websocket(buildSpec(protocols)) .uri(url.toString()) .handle((inbound, outbound) -> { HttpHeaders responseHeaders = toHttpHeaders(inbound); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java index 4ac711ca79..a50fe279ec 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java @@ -68,7 +68,6 @@ public interface RequestUpgradeStrategy { * WebSocket session handling. * @since 5.1 */ - @SuppressWarnings("deprecation") default Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler, @Nullable String subProtocol, Supplier handshakeInfoFactory) { diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java index 5dd289cba6..450198d08d 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientDataBufferAllocatingTests.java @@ -87,8 +87,8 @@ class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTes if (super.bufferFactory instanceof NettyDataBufferFactory) { ByteBufAllocator allocator = ((NettyDataBufferFactory) super.bufferFactory).getByteBufAllocator(); - return new ReactorClientHttpConnector(this.factory, httpClient -> - httpClient.tcpConfiguration(tcpClient -> tcpClient.option(ChannelOption.ALLOCATOR, allocator))); + return new ReactorClientHttpConnector(this.factory, + client -> client.option(ChannelOption.ALLOCATOR, allocator)); } else { return new ReactorClientHttpConnector();