From b81487521135e0bfd12fa3ea1f98bf32e344313f Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 21 Nov 2017 12:13:41 +0200 Subject: [PATCH] Fix race condition in transition from UNSUBSCRIBED->COMPLETED - Ensure completion signal (normal/exception) will be delivered to the subscriber when transition from UNSUBSCRIBED->COMPLETED - According to the specification "Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber" so ensure onComplete/onError signals will be called AFTER onSubscribe signal. Issue: SPR-16207 --- .../AbstractListenerReadPublisher.java | 46 ++++++++++++++++++- .../server/reactive/WriteResultPublisher.java | 40 +++++++++------- 2 files changed, 68 insertions(+), 18 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index bcdc2538e5..d65e94fbfa 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -52,6 +52,11 @@ public abstract class AbstractListenerReadPublisher implements Publisher { private volatile long demand; + private volatile boolean publisherCompleted; + + @Nullable + private volatile Throwable publisherError; + @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater DEMAND_FIELD_UPDATER = AtomicLongFieldUpdater.newUpdater(AbstractListenerReadPublisher.class, "demand"); @@ -208,15 +213,54 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void subscribe(AbstractListenerReadPublisher publisher, Subscriber subscriber) { Assert.notNull(publisher, "Publisher must not be null"); Assert.notNull(subscriber, "Subscriber must not be null"); - if (publisher.changeState(this, NO_DEMAND)) { + if (publisher.changeState(this, SUBSCRIBING)) { Subscription subscription = new ReadSubscription(publisher); publisher.subscriber = subscriber; subscriber.onSubscribe(subscription); + publisher.changeState(SUBSCRIBING, NO_DEMAND); + if (publisher.publisherCompleted) { + publisher.onAllDataRead(); + } + Throwable publisherError = publisher.publisherError; + if (publisherError != null) { + publisher.onError(publisherError); + } } else { throw new IllegalStateException(toString()); } } + + @Override + void onAllDataRead(AbstractListenerReadPublisher publisher) { + publisher.publisherCompleted = true; + } + + @Override + void onError(AbstractListenerReadPublisher publisher, Throwable t) { + publisher.publisherError = t; + } + }, + + SUBSCRIBING { + void request(AbstractListenerReadPublisher publisher, long n) { + if (Operators.validate(n)) { + Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); + if (publisher.changeState(this, DEMAND)) { + publisher.checkOnDataAvailable(); + } + } + } + + @Override + void onAllDataRead(AbstractListenerReadPublisher publisher) { + publisher.publisherCompleted = true; + } + + @Override + void onError(AbstractListenerReadPublisher publisher, Throwable t) { + publisher.publisherError = t; + } }, /** diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java index fe4d8b53a8..acd92f5ac1 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java @@ -119,18 +119,17 @@ class WriteResultPublisher implements Publisher { @Override void subscribe(WriteResultPublisher publisher, Subscriber subscriber) { Assert.notNull(subscriber, "Subscriber must not be null"); - publisher.subscriber = subscriber; - if (publisher.changeState(this, SUBSCRIBED)) { + if (publisher.changeState(this, SUBSCRIBING)) { Subscription subscription = new ResponseBodyWriteResultSubscription(publisher); + publisher.subscriber = subscriber; subscriber.onSubscribe(subscription); + publisher.changeState(SUBSCRIBING, SUBSCRIBED); if (publisher.publisherCompleted) { publisher.publishComplete(); } - else { - Throwable publisherError = publisher.publisherError; - if (publisherError != null) { - publisher.publishError(publisherError); - } + Throwable publisherError = publisher.publisherError; + if (publisherError != null) { + publisher.publishError(publisherError); } } else { @@ -147,6 +146,21 @@ class WriteResultPublisher implements Publisher { } }, + SUBSCRIBING { + @Override + void request(WriteResultPublisher publisher, long n) { + Operators.validate(n); + } + @Override + void publishComplete(WriteResultPublisher publisher) { + publisher.publisherCompleted = true; + } + @Override + void publishError(WriteResultPublisher publisher, Throwable t) { + publisher.publisherError = t; + } + }, + SUBSCRIBED { @Override void request(WriteResultPublisher publisher, long n) { @@ -183,14 +197,6 @@ class WriteResultPublisher implements Publisher { void cancel(WriteResultPublisher publisher) { // ignore } - @Override - void publishComplete(WriteResultPublisher publisher) { - // ignore - } - @Override - void publishError(WriteResultPublisher publisher, Throwable t) { - // ignore - } }; void subscribe(WriteResultPublisher publisher, Subscriber subscriber) { @@ -208,11 +214,11 @@ class WriteResultPublisher implements Publisher { } void publishComplete(WriteResultPublisher publisher) { - throw new IllegalStateException(toString()); + // ignore } void publishError(WriteResultPublisher publisher, Throwable t) { - throw new IllegalStateException(toString()); + // ignore } }