Browse Source

Switch to Reactor 2022.0.0 snapshots

See gh-28766
pull/28771/head
rstoyanchev 2 years ago
parent
commit
0938909cd9
  1. 4
      build.gradle
  2. 12
      spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java
  3. 33
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  4. 11
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java
  5. 11
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java
  6. 10
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
  7. 12
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

4
build.gradle

@ -29,7 +29,7 @@ configure(allprojects) { project ->
imports { imports {
mavenBom "com.fasterxml.jackson:jackson-bom:2.13.3" mavenBom "com.fasterxml.jackson:jackson-bom:2.13.3"
mavenBom "io.netty:netty-bom:4.1.77.Final" mavenBom "io.netty:netty-bom:4.1.77.Final"
mavenBom "io.projectreactor:reactor-bom:2022.0.0-M2" mavenBom "io.projectreactor:reactor-bom:2022.0.0-SNAPSHOT"
mavenBom "io.r2dbc:r2dbc-bom:Borca-SR1" mavenBom "io.r2dbc:r2dbc-bom:Borca-SR1"
mavenBom "io.rsocket:rsocket-bom:1.1.2" mavenBom "io.rsocket:rsocket-bom:1.1.2"
mavenBom "org.eclipse.jetty:jetty-bom:11.0.9" mavenBom "org.eclipse.jetty:jetty-bom:11.0.9"
@ -252,7 +252,7 @@ configure(allprojects) { project ->
repositories { repositories {
mavenCentral() mavenCentral()
maven { url "https://repo.spring.io/libs-spring-framework-build" } maven { url "https://repo.spring.io/libs-spring-framework-build" }
maven { url "https://repo.spring.io/milestone"} maven { url "https://repo.spring.io/snapshot"} // Reactor
} }
} }
configurations.all { configurations.all {

12
spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2021 the original author or authors. * Copyright 2002-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -22,6 +22,7 @@ import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -240,12 +241,13 @@ public class MultipartHttpMessageWriterTests extends AbstractLeakCheckingTests {
@Test // SPR-16402 @Test // SPR-16402
public void singleSubscriberWithStrings() { public void singleSubscriberWithStrings() {
@SuppressWarnings("deprecation") AtomicBoolean subscribed = new AtomicBoolean();
reactor.core.publisher.UnicastProcessor<String> processor = reactor.core.publisher.UnicastProcessor.create(); Flux<String> publisher = Flux.just("foo", "bar", "baz")
Flux.just("foo", "bar", "baz").subscribe(processor); .doOnSubscribe(subscription ->
assertThat(subscribed.compareAndSet(false, true)).isTrue());
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder(); MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
bodyBuilder.asyncPart("name", processor, String.class); bodyBuilder.asyncPart("name", publisher, String.class);
Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build()); Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build());

33
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -66,10 +66,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Nullable @Nullable
private final Sinks.Empty<Void> handlerCompletionSink; private final Sinks.Empty<Void> handlerCompletionSink;
@Nullable
@SuppressWarnings("deprecation")
private final reactor.core.publisher.MonoProcessor<Void> handlerCompletionMono;
private final WebSocketReceivePublisher receivePublisher; private final WebSocketReceivePublisher receivePublisher;
@Nullable @Nullable
@ -90,7 +86,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
public AbstractListenerWebSocketSession( public AbstractListenerWebSocketSession(
T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) { T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) {
this(delegate, id, info, bufferFactory, (Sinks.Empty<Void>) null); this(delegate, id, info, bufferFactory, null);
} }
/** /**
@ -105,25 +101,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
super(delegate, id, info, bufferFactory); super(delegate, id, info, bufferFactory);
this.receivePublisher = new WebSocketReceivePublisher(); this.receivePublisher = new WebSocketReceivePublisher();
this.handlerCompletionSink = handlerCompletionSink; this.handlerCompletionSink = handlerCompletionSink;
this.handlerCompletionMono = null;
}
/**
* Alternative constructor with completion MonoProcessor to use to signal
* when the handling of the session is complete, with success or error.
* <p>Primarily for use with {@code WebSocketClient} to be able to
* communicate the end of handling.
* @deprecated as of 5.3 in favor of
* {@link #AbstractListenerWebSocketSession(Object, String, HandshakeInfo, DataBufferFactory, Sinks.Empty)}
*/
@Deprecated
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info,
DataBufferFactory bufferFactory, @Nullable reactor.core.publisher.MonoProcessor<Void> handlerCompletion) {
super(delegate, id, info, bufferFactory);
this.receivePublisher = new WebSocketReceivePublisher();
this.handlerCompletionMono = handlerCompletion;
this.handlerCompletionSink = null;
} }
@ -244,9 +221,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
// Ignore result: can't overflow, ok if not first or no one listens // Ignore result: can't overflow, ok if not first or no one listens
this.handlerCompletionSink.tryEmitError(ex); this.handlerCompletionSink.tryEmitError(ex);
} }
if (this.handlerCompletionMono != null) {
this.handlerCompletionMono.onError(ex);
}
close(CloseStatus.SERVER_ERROR.withReason(ex.getMessage())); close(CloseStatus.SERVER_ERROR.withReason(ex.getMessage()));
} }
@ -256,9 +230,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
// Ignore result: can't overflow, ok if not first or no one listens // Ignore result: can't overflow, ok if not first or no one listens
this.handlerCompletionSink.tryEmitEmpty(); this.handlerCompletionSink.tryEmitEmpty();
} }
if (this.handlerCompletionMono != null) {
this.handlerCompletionMono.onComplete();
}
close(); close();
} }

11
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -50,7 +50,7 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) {
this(session, info, factory, (Sinks.Empty<Void>) null); this(session, info, factory, null);
} }
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@ -61,13 +61,6 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
// suspendReceiving(); // suspendReceiving();
} }
@Deprecated
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@Nullable reactor.core.publisher.MonoProcessor<Void> completionMono) {
super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
}
@Override @Override
protected boolean canSuspendReceiving() { protected boolean canSuspendReceiving() {

11
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -46,7 +46,7 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class StandardWebSocketSession extends AbstractListenerWebSocketSession<Session> { public class StandardWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) {
this(session, info, factory, (Sinks.Empty<Void>) null); this(session, info, factory, null);
} }
public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@ -55,13 +55,6 @@ public class StandardWebSocketSession extends AbstractListenerWebSocketSession<S
super(session, session.getId(), info, factory, completionSink); super(session, session.getId(), info, factory, completionSink);
} }
@Deprecated
public StandardWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@Nullable reactor.core.publisher.MonoProcessor<Void> completionMono) {
super(session, session.getId(), info, factory, completionMono);
}
@Override @Override
protected boolean canSuspendReceiving() { protected boolean canSuspendReceiving() {

10
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -53,14 +53,6 @@ public class TomcatWebSocketSession extends StandardWebSocketSession {
suspendReceiving(); suspendReceiving();
} }
@Deprecated
public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
reactor.core.publisher.MonoProcessor<Void> completionMono) {
super(session, info, factory, completionMono);
suspendReceiving();
}
@Override @Override
protected boolean canSuspendReceiving() { protected boolean canSuspendReceiving() {

12
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -48,7 +48,7 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> { public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> {
public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, DataBufferFactory factory) { public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, DataBufferFactory factory) {
this(channel, info, factory, (Sinks.Empty<Void>) null); this(channel, info, factory, null);
} }
public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info, public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info,
@ -58,14 +58,6 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
suspendReceiving(); suspendReceiving();
} }
@Deprecated
public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo info,
DataBufferFactory factory, @Nullable reactor.core.publisher.MonoProcessor<Void> completionMono) {
super(channel, ObjectUtils.getIdentityHexString(channel), info, factory, completionMono);
suspendReceiving();
}
@Override @Override
protected boolean canSuspendReceiving() { protected boolean canSuspendReceiving() {

Loading…
Cancel
Save