diff --git a/build.gradle b/build.gradle index ade0090147..f7ef16f223 100644 --- a/build.gradle +++ b/build.gradle @@ -29,7 +29,7 @@ configure(allprojects) { project -> imports { mavenBom "com.fasterxml.jackson:jackson-bom:2.13.3" mavenBom "io.netty:netty-bom:4.1.77.Final" - mavenBom "io.projectreactor:reactor-bom:2022.0.0-M2" + mavenBom "io.projectreactor:reactor-bom:2022.0.0-SNAPSHOT" mavenBom "io.r2dbc:r2dbc-bom:Borca-SR1" mavenBom "io.rsocket:rsocket-bom:1.1.2" mavenBom "org.eclipse.jetty:jetty-bom:11.0.9" @@ -252,7 +252,7 @@ configure(allprojects) { project -> repositories { mavenCentral() maven { url "https://repo.spring.io/libs-spring-framework-build" } - maven { url "https://repo.spring.io/milestone"} + maven { url "https://repo.spring.io/snapshot"} // Reactor } } configurations.all { diff --git a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java index a4b292f86a..1db06c829e 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; @@ -240,12 +241,13 @@ public class MultipartHttpMessageWriterTests extends AbstractLeakCheckingTests { @Test // SPR-16402 public void singleSubscriberWithStrings() { - @SuppressWarnings("deprecation") - reactor.core.publisher.UnicastProcessor processor = reactor.core.publisher.UnicastProcessor.create(); - Flux.just("foo", "bar", "baz").subscribe(processor); + AtomicBoolean subscribed = new AtomicBoolean(); + Flux publisher = Flux.just("foo", "bar", "baz") + .doOnSubscribe(subscription -> + assertThat(subscribed.compareAndSet(false, true)).isTrue()); MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder(); - bodyBuilder.asyncPart("name", processor, String.class); + bodyBuilder.asyncPart("name", publisher, String.class); Mono>> result = Mono.just(bodyBuilder.build()); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index efab572402..486bda528f 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,10 +66,6 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Nullable private final Sinks.Empty handlerCompletionSink; - @Nullable - @SuppressWarnings("deprecation") - private final reactor.core.publisher.MonoProcessor handlerCompletionMono; - private final WebSocketReceivePublisher receivePublisher; @Nullable @@ -90,7 +86,7 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc public AbstractListenerWebSocketSession( T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) { - this(delegate, id, info, bufferFactory, (Sinks.Empty) null); + this(delegate, id, info, bufferFactory, null); } /** @@ -105,25 +101,6 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc super(delegate, id, info, bufferFactory); this.receivePublisher = new WebSocketReceivePublisher(); this.handlerCompletionSink = handlerCompletionSink; - this.handlerCompletionMono = null; - } - - /** - * Alternative constructor with completion MonoProcessor to use to signal - * when the handling of the session is complete, with success or error. - *

Primarily for use with {@code WebSocketClient} to be able to - * communicate the end of handling. - * @deprecated as of 5.3 in favor of - * {@link #AbstractListenerWebSocketSession(Object, String, HandshakeInfo, DataBufferFactory, Sinks.Empty)} - */ - @Deprecated - public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, - DataBufferFactory bufferFactory, @Nullable reactor.core.publisher.MonoProcessor handlerCompletion) { - - super(delegate, id, info, bufferFactory); - this.receivePublisher = new WebSocketReceivePublisher(); - this.handlerCompletionMono = handlerCompletion; - this.handlerCompletionSink = null; } @@ -244,9 +221,6 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc // Ignore result: can't overflow, ok if not first or no one listens this.handlerCompletionSink.tryEmitError(ex); } - if (this.handlerCompletionMono != null) { - this.handlerCompletionMono.onError(ex); - } close(CloseStatus.SERVER_ERROR.withReason(ex.getMessage())); } @@ -256,9 +230,6 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc // Ignore result: can't overflow, ok if not first or no one listens this.handlerCompletionSink.tryEmitEmpty(); } - if (this.handlerCompletionMono != null) { - this.handlerCompletionMono.onComplete(); - } close(); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index a8a9240a61..5baa7971a0 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,7 +50,7 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession) null); + this(session, info, factory, null); } public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, @@ -61,13 +61,6 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession completionMono) { - - super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono); - } - @Override protected boolean canSuspendReceiving() { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java index b08e4aca2c..c9eddd8b6c 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,7 +46,7 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class StandardWebSocketSession extends AbstractListenerWebSocketSession { public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { - this(session, info, factory, (Sinks.Empty) null); + this(session, info, factory, null); } public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, @@ -55,13 +55,6 @@ public class StandardWebSocketSession extends AbstractListenerWebSocketSession completionMono) { - - super(session, session.getId(), info, factory, completionMono); - } - @Override protected boolean canSuspendReceiving() { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index f23bcd42d2..057fef4e5c 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,14 +53,6 @@ public class TomcatWebSocketSession extends StandardWebSocketSession { suspendReceiving(); } - @Deprecated - public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, - reactor.core.publisher.MonoProcessor completionMono) { - - super(session, info, factory, completionMono); - suspendReceiving(); - } - @Override protected boolean canSuspendReceiving() { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index 3d4e99ab85..dfe12a6213 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class UndertowWebSocketSession extends AbstractListenerWebSocketSession { public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, DataBufferFactory factory) { - this(channel, info, factory, (Sinks.Empty) null); + this(channel, info, factory, null); } public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, @@ -58,14 +58,6 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession completionMono) { - - super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionMono); - suspendReceiving(); - } - @Override protected boolean canSuspendReceiving() {