From 28a95e89f35600199ee1254dee9e8ed53f958bcc Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 11 Feb 2020 12:51:35 +0000 Subject: [PATCH] Upgrade to Dysprosium SR5 snapshots See gh-24355 --- build.gradle | 3 +- .../core/codec/StringDecoder.java | 70 +++++-------------- .../core/codec/StringDecoderTests.java | 9 ++- .../tcp/reactor/ReactorNettyTcpClient.java | 8 ++- .../reactive/ReactorResourceFactory.java | 1 + .../ErrorHandlerIntegrationTests.java | 8 +-- .../ReactorNettyRequestUpgradeStrategy.java | 1 + 7 files changed, 35 insertions(+), 65 deletions(-) diff --git a/build.gradle b/build.gradle index bdc9543026..201220ee29 100644 --- a/build.gradle +++ b/build.gradle @@ -37,7 +37,7 @@ configure(allprojects) { project -> imports { mavenBom "com.fasterxml.jackson:jackson-bom:2.10.2" mavenBom "io.netty:netty-bom:4.1.45.Final" - mavenBom "io.projectreactor:reactor-bom:Dysprosium-SR4" + mavenBom "io.projectreactor:reactor-bom:Dysprosium-BUILD-SNAPSHOT" mavenBom "io.rsocket:rsocket-bom:1.0.0-RC6" mavenBom "org.eclipse.jetty:jetty-bom:9.4.26.v20200117" mavenBom "org.jetbrains.kotlin:kotlin-bom:1.3.61" @@ -289,6 +289,7 @@ configure(allprojects) { project -> repositories { mavenCentral() maven { url "https://repo.spring.io/libs-spring-framework-build" } + maven { url "https://repo.spring.io/snapshot" } // Reactor Dysprosium snapshots } } configurations.all { diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index abbe66a3b3..e36345de02 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -32,7 +32,6 @@ import reactor.core.publisher.Flux; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferLimitException; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DataBufferWrapper; import org.springframework.core.io.buffer.DefaultDataBufferFactory; @@ -96,42 +95,25 @@ public final class StringDecoder extends AbstractDataBufferDecoder { Flux inputFlux = Flux.defer(() -> { DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); - if (getMaxInMemorySize() != -1) { - - // Passing limiter into endFrameAfterDelimiter helps to ensure that in case of one DataBuffer - // containing multiple lines, the limit is checked and raised immediately without accumulating - // subsequent lines. This is necessary because concatMapIterable doesn't respect doOnDiscard. - // When reactor-core#1925 is resolved, we could replace bufferUntil with: - - // .windowUntil(buffer -> buffer instanceof EndFrameBuffer) - // .concatMap(fluxes -> fluxes.collect(() -> new LimitedDataBufferList(getMaxInMemorySize()), LimitedDataBufferList::add)) - LimitedDataBufferList limiter = new LimitedDataBufferList(getMaxInMemorySize()); + Flux buffers = Flux.from(input) + .concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher)); - return Flux.from(input) - .concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher, limiter)) - .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) - .map(buffers -> joinAndStrip(buffers, this.stripDelimiter)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + Flux> delimitedBuffers; + if (getMaxInMemorySize() != -1) { + delimitedBuffers = buffers + .windowUntil(buffer -> buffer instanceof EndFrameBuffer) + .concatMap(window -> window.collect( + () -> new LimitedDataBufferList(getMaxInMemorySize()), + LimitedDataBufferList::add)); } else { - - // When the decoder is unlimited (-1), concatMapIterable will cache buffers that may not - // be released if cancel is signalled before they are turned into String lines - // (see test maxInMemoryLimitReleasesUnprocessedLinesWhenUnlimited). - // When reactor-core#1925 is resolved, the workaround can be removed and the entire - // else clause possibly dropped. - - ConcatMapIterableDiscardWorkaroundCache cache = new ConcatMapIterableDiscardWorkaroundCache(); - - return Flux.from(input) - .concatMapIterable(buffer -> cache.addAll(endFrameAfterDelimiter(buffer, matcher, null))) - .doOnNext(cache) - .doOnCancel(cache) - .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) - .map(buffers -> joinAndStrip(buffers, this.stripDelimiter)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + delimitedBuffers = buffers.bufferUntil(buffer -> buffer instanceof EndFrameBuffer); } + + return delimitedBuffers + .map(list -> joinAndStrip(list, this.stripDelimiter)) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); }); return super.decode(inputFlux, elementType, mimeType, hints); @@ -176,14 +158,11 @@ public final class StringDecoder extends AbstractDataBufferDecoder { * * @param dataBuffer the buffer to find delimiters in * @param matcher used to find the first delimiters - * @param limiter to enforce maxInMemorySize with * @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was * found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable) * results in memory leaks due to pre-fetching. */ - private static List endFrameAfterDelimiter( - DataBuffer dataBuffer, DataBufferUtils.Matcher matcher, @Nullable LimitedDataBufferList limiter) { - + private static List endFrameAfterDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher) { List result = new ArrayList<>(); try { do { @@ -195,27 +174,14 @@ public final class StringDecoder extends AbstractDataBufferDecoder { result.add(slice); result.add(new EndFrameBuffer(matcher.delimiter())); dataBuffer.readPosition(endIdx + 1); - if (limiter != null) { - limiter.add(slice); // enforce the limit - limiter.clear(); - } } else { result.add(DataBufferUtils.retain(dataBuffer)); - if (limiter != null) { - limiter.add(dataBuffer); - } break; } } while (dataBuffer.readableByteCount() > 0); } - catch (DataBufferLimitException ex) { - if (limiter != null) { - limiter.releaseAndClear(); - } - throw ex; - } finally { DataBufferUtils.release(dataBuffer); } @@ -230,9 +196,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { * @param stripDelimiter whether to strip the delimiter * @return the joined buffer */ - private static DataBuffer joinAndStrip(List dataBuffers, - boolean stripDelimiter) { - + private static DataBuffer joinAndStrip(List dataBuffers, boolean stripDelimiter) { Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty"); byte[] matchingDelimiter = null; @@ -241,7 +205,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { DataBuffer lastBuffer = dataBuffers.get(lastIdx); if (lastBuffer instanceof EndFrameBuffer) { matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter(); - dataBuffers.remove(lastIdx); + dataBuffers = dataBuffers.subList(0, lastIdx); } DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers); diff --git a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java index fad7a1ec10..7057c52865 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -75,7 +75,14 @@ class StringDecoderTests extends AbstractDecoderTests { String s = String.format("%s\n%s\n%s", u, e, o); Flux input = toDataBuffers(s, 1, UTF_8); - testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); + // TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty + // see https://github.com/reactor/reactor-core/issues/2041 + + testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); + testDecodeCancel(input, TYPE, null, null); + testDecodeEmpty(TYPE, null, null); + + // testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); } @Test diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 078ec12c1e..d7489158fa 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,13 +103,14 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * @param codec for encoding and decoding the input/output byte streams * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec */ + @SuppressWarnings("deprecation") public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec

codec) { Assert.notNull(host, "host is required"); Assert.notNull(codec, "ReactorNettyCodec is required"); this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); this.loopResources = LoopResources.create("tcp-client-loop"); - this.poolResources = ConnectionProvider.elastic("tcp-client-pool"); + this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000); this.codec = codec; this.tcpClient = TcpClient.create(this.poolResources) @@ -128,12 +129,13 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * @since 5.1.3 * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec */ + @SuppressWarnings("deprecation") public ReactorNettyTcpClient(Function clientConfigurer, ReactorNettyCodec

codec) { Assert.notNull(codec, "ReactorNettyCodec is required"); this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); this.loopResources = LoopResources.create("tcp-client-loop"); - this.poolResources = ConnectionProvider.elastic("tcp-client-pool"); + this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000); this.codec = codec; this.tcpClient = clientConfigurer.apply(TcpClient diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java index 8e696504b4..46d1f57c60 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java @@ -46,6 +46,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean @Nullable private Consumer globalResourcesConsumer; + @SuppressWarnings("deprecation") private Supplier connectionProviderSupplier = () -> ConnectionProvider.fixed("webflux", 500); private Supplier loopResourcesSupplier = () -> LoopResources.create("webflux-http"); diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java index c7a925b472..44bbd00a29 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ package org.springframework.http.server.reactive; import java.net.URI; -import org.junit.jupiter.api.Assumptions; import reactor.core.publisher.Mono; import org.springframework.http.HttpStatus; @@ -28,7 +27,6 @@ import org.springframework.web.client.ResponseErrorHandler; import org.springframework.web.client.RestTemplate; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; -import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import static org.assertj.core.api.Assertions.assertThat; @@ -74,10 +72,6 @@ class ErrorHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests { @ParameterizedHttpServerTest // SPR-15560 void emptyPathSegments(HttpServer httpServer) throws Exception { - - /* Temporarily necessary for https://github.com/reactor/reactor-netty/issues/948 */ - Assumptions.assumeFalse(httpServer instanceof ReactorHttpServer); - startServer(httpServer); RestTemplate restTemplate = new RestTemplate(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java index afd8315f6c..f7d6575b10 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java @@ -95,6 +95,7 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg @Override + @SuppressWarnings("deprecation") public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler, @Nullable String subProtocol, Supplier handshakeInfoFactory) {