diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index d65e94fbfa..0f2449ad80 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -132,7 +132,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { */ private boolean readAndPublish() throws IOException { long r; - while ((r = demand) > 0) { + while ((r = demand) > 0 && !publisherCompleted) { T data = read(); if (data != null) { if (r != Long.MAX_VALUE) { @@ -292,27 +292,45 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); + if (publisher.changeState(NO_DEMAND, DEMAND)) { + publisher.checkOnDataAvailable(); + } } } @Override void onDataAvailable(AbstractListenerReadPublisher publisher) { + for (;;) { + if (!read(publisher)) { + return; + } + long r = publisher.demand; + if (r == 0 || publisher.changeState(NO_DEMAND, this)) { + break; + } + } + } + + boolean read(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, READING)) { try { boolean demandAvailable = publisher.readAndPublish(); if (demandAvailable) { - publisher.changeState(READING, DEMAND); - publisher.checkOnDataAvailable(); + if (publisher.changeState(READING, DEMAND)) { + publisher.checkOnDataAvailable(); + return false; + } } - else { - publisher.changeState(READING, NO_DEMAND); + else if (publisher.changeState(READING, NO_DEMAND)) { publisher.suspendReading(); } } catch (IOException ex) { publisher.onError(ex); } + return true; } + return false; } }, @@ -321,6 +339,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); + if (publisher.changeState(NO_DEMAND, DEMAND)) { + publisher.checkOnDataAvailable(); + } } } }, @@ -356,7 +377,10 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } void cancel(AbstractListenerReadPublisher publisher) { - if (!publisher.changeState(this, COMPLETED)) { + if (publisher.changeState(this, COMPLETED)) { + publisher.publisherCompleted = true; + } + else { publisher.state.get().cancel(publisher); } } @@ -367,6 +391,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { void onAllDataRead(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, COMPLETED)) { + publisher.publisherCompleted = true; if (publisher.subscriber != null) { publisher.subscriber.onComplete(); }