From 125b8e741805cc678c427abb248859bd084f0989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Deleuze?= Date: Wed, 6 Sep 2023 12:56:01 +0200 Subject: [PATCH] Make ReactorResourceFactory lifecycle-aware With this commit, ReactorResourceFactory now implements Lifecycle which allows supporting JVM Checkpoint Restore in Spring Boot with Reactor Netty server, and helps to support Reactor Netty client as well. Closes gh-31178 --- .../reactive/ReactorResourceFactory.java | 121 ++++++++++++------ .../reactive/ReactorResourceFactoryTests.java | 66 ++++++++++ 2 files changed, 145 insertions(+), 42 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java index 46d65551b1..e5737b2535 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import reactor.netty.resources.LoopResources; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.Lifecycle; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -34,14 +35,16 @@ import org.springframework.util.Assert; * event loop threads, and {@link ConnectionProvider} for the connection pool, * within the lifecycle of a Spring {@code ApplicationContext}. * - *

This factory implements {@link InitializingBean} and {@link DisposableBean} - * and is expected typically to be declared as a Spring-managed bean. + *

This factory implements {@link InitializingBean}, {@link DisposableBean} + * and {@link Lifecycle} and is expected typically to be declared as a + * Spring-managed bean. * * @author Rossen Stoyanchev * @author Brian Clozel + * @author Sebastien Deleuze * @since 5.1 */ -public class ReactorResourceFactory implements InitializingBean, DisposableBean { +public class ReactorResourceFactory implements InitializingBean, DisposableBean, Lifecycle { private boolean useGlobalResources = true; @@ -66,6 +69,10 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean private Duration shutdownTimeout = Duration.ofSeconds(LoopResources.DEFAULT_SHUTDOWN_TIMEOUT); + private volatile boolean running; + + private final Object lifecycleMonitor = new Object(); + /** * Whether to use global Reactor Netty resources via {@link HttpResources}. @@ -196,54 +203,84 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean @Override public void afterPropertiesSet() { - if (this.useGlobalResources) { - Assert.isTrue(this.loopResources == null && this.connectionProvider == null, - "'useGlobalResources' is mutually exclusive with explicitly configured resources"); - HttpResources httpResources = HttpResources.get(); - if (this.globalResourcesConsumer != null) { - this.globalResourcesConsumer.accept(httpResources); - } - this.connectionProvider = httpResources; - this.loopResources = httpResources; - } - else { - if (this.loopResources == null) { - this.manageLoopResources = true; - this.loopResources = this.loopResourcesSupplier.get(); - } - if (this.connectionProvider == null) { - this.manageConnectionProvider = true; - this.connectionProvider = this.connectionProviderSupplier.get(); - } - } + start(); } @Override public void destroy() { - if (this.useGlobalResources) { - HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); - } - else { - try { - ConnectionProvider provider = this.connectionProvider; - if (provider != null && this.manageConnectionProvider) { - provider.disposeLater().block(); + stop(); + } + + @Override + public void start() { + synchronized (this.lifecycleMonitor) { + if (!isRunning()) { + if (this.useGlobalResources) { + Assert.isTrue(this.loopResources == null && this.connectionProvider == null, + "'useGlobalResources' is mutually exclusive with explicitly configured resources"); + HttpResources httpResources = HttpResources.get(); + if (this.globalResourcesConsumer != null) { + this.globalResourcesConsumer.accept(httpResources); + } + this.connectionProvider = httpResources; + this.loopResources = httpResources; } + else { + if (this.loopResources == null) { + this.manageLoopResources = true; + this.loopResources = this.loopResourcesSupplier.get(); + } + if (this.connectionProvider == null) { + this.manageConnectionProvider = true; + this.connectionProvider = this.connectionProviderSupplier.get(); + } + } + this.running = true; } - catch (Throwable ex) { - // ignore - } + } - try { - LoopResources resources = this.loopResources; - if (resources != null && this.manageLoopResources) { - resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); + } + + @Override + public void stop() { + synchronized (this.lifecycleMonitor) { + if (isRunning()) { + if (this.useGlobalResources) { + HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); + this.connectionProvider = null; + this.loopResources = null; } - } - catch (Throwable ex) { - // ignore + else { + try { + ConnectionProvider provider = this.connectionProvider; + if (provider != null && this.manageConnectionProvider) { + this.connectionProvider = null; + provider.disposeLater().block(); + } + } + catch (Throwable ex) { + // ignore + } + + try { + LoopResources resources = this.loopResources; + if (resources != null && this.manageLoopResources) { + this.loopResources = null; + resources.disposeLater(this.shutdownQuietPeriod, this.shutdownTimeout).block(); + } + } + catch (Throwable ex) { + // ignore + } + } + this.running = false; } } } + @Override + public boolean isRunning() { + return this.running; + } + } diff --git a/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java b/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java index be0dc5096b..fe97eea116 100644 --- a/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java +++ b/spring-web/src/test/java/org/springframework/http/client/reactive/ReactorResourceFactoryTests.java @@ -157,4 +157,70 @@ public class ReactorResourceFactoryTests { verifyNoMoreInteractions(this.connectionProvider, this.loopResources); } + @Test + void stopThenStartWithGlobalResources() { + + this.resourceFactory.setUseGlobalResources(true); + this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.stop(); + this.resourceFactory.start(); + + HttpResources globalResources = HttpResources.get(); + assertThat(this.resourceFactory.getConnectionProvider()).isSameAs(globalResources); + assertThat(this.resourceFactory.getLoopResources()).isSameAs(globalResources); + assertThat(globalResources.isDisposed()).isFalse(); + + this.resourceFactory.destroy(); + + assertThat(globalResources.isDisposed()).isTrue(); + } + + @Test + void stopThenStartWithLocalResources() { + + this.resourceFactory.setUseGlobalResources(false); + this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.stop(); + this.resourceFactory.start(); + + ConnectionProvider connectionProvider = this.resourceFactory.getConnectionProvider(); + LoopResources loopResources = this.resourceFactory.getLoopResources(); + + assertThat(connectionProvider).isNotSameAs(HttpResources.get()); + assertThat(loopResources).isNotSameAs(HttpResources.get()); + + // The below does not work since ConnectionPoolProvider simply checks if pool is empty. + // assertFalse(connectionProvider.isDisposed()); + assertThat(loopResources.isDisposed()).isFalse(); + + this.resourceFactory.destroy(); + + assertThat(connectionProvider.isDisposed()).isTrue(); + assertThat(loopResources.isDisposed()).isTrue(); + } + + @Test + void stopThenStartWithExternalResources() { + + this.resourceFactory.setUseGlobalResources(false); + this.resourceFactory.setConnectionProvider(this.connectionProvider); + this.resourceFactory.setLoopResources(this.loopResources); + this.resourceFactory.afterPropertiesSet(); + this.resourceFactory.stop(); + this.resourceFactory.start(); + + ConnectionProvider connectionProvider = this.resourceFactory.getConnectionProvider(); + LoopResources loopResources = this.resourceFactory.getLoopResources(); + + assertThat(connectionProvider).isSameAs(this.connectionProvider); + assertThat(loopResources).isSameAs(this.loopResources); + + verifyNoMoreInteractions(this.connectionProvider, this.loopResources); + + this.resourceFactory.destroy(); + + // Not managed (destroy has no impact)... + verifyNoMoreInteractions(this.connectionProvider, this.loopResources); + } + }