From 2a481c541122e39c1736bc6b71da796b2c7a6e27 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 21 Nov 2017 14:46:33 +0200 Subject: [PATCH] Fix race conditions in AbstractListenerReadPublisher Transition from DEMAND->NO_DEMAND: Two concurrent threads enter DEMAND.request and DEMAND.onDataAvailable. And DEMAND.onDataAvailable finishes before DEMAND.request to be able to update the demand field then a request for reading will be lost. Transition from READING->NO_DEMAND: readAndPublish() returns false because there is no demand but before switching the states READING.request is invoked again a request for reading will be lost. Changing READING->DEMAND/NO_DEMAND is made conditional so that the operations will be executed only if changing states succeeds. When in READING state detect completion before each next item in order to exit sooner, if completed. Issue: SPR-16207 --- .../AbstractListenerReadPublisher.java | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index d65e94fbfa..0f2449ad80 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -132,7 +132,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { */ private boolean readAndPublish() throws IOException { long r; - while ((r = demand) > 0) { + while ((r = demand) > 0 && !publisherCompleted) { T data = read(); if (data != null) { if (r != Long.MAX_VALUE) { @@ -292,27 +292,45 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); + if (publisher.changeState(NO_DEMAND, DEMAND)) { + publisher.checkOnDataAvailable(); + } } } @Override void onDataAvailable(AbstractListenerReadPublisher publisher) { + for (;;) { + if (!read(publisher)) { + return; + } + long r = publisher.demand; + if (r == 0 || publisher.changeState(NO_DEMAND, this)) { + break; + } + } + } + + boolean read(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, READING)) { try { boolean demandAvailable = publisher.readAndPublish(); if (demandAvailable) { - publisher.changeState(READING, DEMAND); - publisher.checkOnDataAvailable(); + if (publisher.changeState(READING, DEMAND)) { + publisher.checkOnDataAvailable(); + return false; + } } - else { - publisher.changeState(READING, NO_DEMAND); + else if (publisher.changeState(READING, NO_DEMAND)) { publisher.suspendReading(); } } catch (IOException ex) { publisher.onError(ex); } + return true; } + return false; } }, @@ -321,6 +339,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); + if (publisher.changeState(NO_DEMAND, DEMAND)) { + publisher.checkOnDataAvailable(); + } } } }, @@ -356,7 +377,10 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } void cancel(AbstractListenerReadPublisher publisher) { - if (!publisher.changeState(this, COMPLETED)) { + if (publisher.changeState(this, COMPLETED)) { + publisher.publisherCompleted = true; + } + else { publisher.state.get().cancel(publisher); } } @@ -367,6 +391,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void onAllDataRead(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, COMPLETED)) { + publisher.publisherCompleted = true; if (publisher.subscriber != null) { publisher.subscriber.onComplete(); }