From dd22b8fd3971c7b0cdc1976d45c81ba12d5da604 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 27 Aug 2019 21:42:07 +0300 Subject: [PATCH] Fix race condition with onCompletion/onError Closes gh-23096 --- .../AbstractListenerReadPublisher.java | 35 +++++++++++-------- .../AbstractListenerWriteFlushProcessor.java | 32 ++++++++++------- .../AbstractListenerWriteProcessor.java | 10 +++++- 3 files changed, 50 insertions(+), 27 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 7e849cf1a7..401e6c867a 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -283,19 +283,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { publisher.subscriber = subscriber; subscriber.onSubscribe(subscription); publisher.changeState(SUBSCRIBING, NO_DEMAND); - // Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND - String logPrefix = publisher.getLogPrefix(); - if (publisher.completionBeforeDemand) { - rsReadLogger.trace(logPrefix + "Completed before demand"); - publisher.state.get().onAllDataRead(publisher); - } - Throwable ex = publisher.errorBeforeDemand; - if (ex != null) { - if (rsReadLogger.isTraceEnabled()) { - rsReadLogger.trace(logPrefix + "Completed with error before demand: " + ex); - } - publisher.state.get().onError(publisher, ex); - } + handleCompletionOrErrorBeforeDemand(publisher); } else { throw new IllegalStateException("Failed to transition to SUBSCRIBING, " + @@ -306,11 +294,30 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override void onAllDataRead(AbstractListenerReadPublisher publisher) { publisher.completionBeforeDemand = true; + handleCompletionOrErrorBeforeDemand(publisher); } @Override void onError(AbstractListenerReadPublisher publisher, Throwable ex) { publisher.errorBeforeDemand = ex; + handleCompletionOrErrorBeforeDemand(publisher); + } + + private void handleCompletionOrErrorBeforeDemand(AbstractListenerReadPublisher publisher) { + if (publisher.state.get().equals(NO_DEMAND)) { + if (publisher.completionBeforeDemand) { + rsReadLogger.trace(publisher.getLogPrefix() + "Completed before demand"); + publisher.state.get().onAllDataRead(publisher); + } + Throwable ex = publisher.errorBeforeDemand; + if (ex != null) { + if (rsReadLogger.isTraceEnabled()) { + String prefix = publisher.getLogPrefix(); + rsReadLogger.trace(prefix + "Completed with error before demand: " + ex); + } + publisher.state.get().onError(publisher, ex); + } + } } }, diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 1c6475f797..329cd3dbca 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -282,17 +282,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } if (processor.changeState(this, REQUESTED)) { if (processor.subscriberCompleted) { - if (processor.isFlushPending()) { - // Ensure the final flush - processor.changeState(REQUESTED, FLUSHING); - processor.flushIfPossible(); - } - else if (processor.changeState(REQUESTED, COMPLETED)) { - processor.resultPublisher.publishComplete(); - } - else { - processor.state.get().onComplete(processor); - } + handleSubscriberCompleted(processor); } else { Assert.state(processor.subscription != null, "No subscription"); @@ -303,6 +293,24 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo @Override public void onComplete(AbstractListenerWriteFlushProcessor processor) { processor.subscriberCompleted = true; + // A competing write might have completed very quickly + if (processor.state.get().equals(State.REQUESTED)) { + handleSubscriberCompleted(processor); + } + } + + private void handleSubscriberCompleted(AbstractListenerWriteFlushProcessor processor) { + if (processor.isFlushPending()) { + // Ensure the final flush + processor.changeState(State.REQUESTED, State.FLUSHING); + processor.flushIfPossible(); + } + else if (processor.changeState(State.REQUESTED, State.COMPLETED)) { + processor.resultPublisher.publishComplete(); + } + else { + processor.state.get().onComplete(processor); + } } }, diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index 83d73b8555..509076d02b 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -376,6 +376,10 @@ public abstract class AbstractListenerWriteProcessor implements Processor void onComplete(AbstractListenerWriteProcessor processor) { processor.subscriberCompleted = true; + // A competing write might have completed very quickly + if (processor.state.get().equals(State.REQUESTED)) { + processor.changeStateToComplete(State.REQUESTED); + } } }, @@ -383,6 +387,10 @@ public abstract class AbstractListenerWriteProcessor implements Processor void onComplete(AbstractListenerWriteProcessor processor) { processor.subscriberCompleted = true; + // A competing write might have completed very quickly + if (processor.state.get().equals(State.REQUESTED)) { + processor.changeStateToComplete(State.REQUESTED); + } } },