|
|
|
@ -524,13 +524,9 @@ public abstract class DataBufferUtils {
@@ -524,13 +524,9 @@ public abstract class DataBufferUtils {
|
|
|
|
|
long pos = this.position.addAndGet(read); |
|
|
|
|
dataBuffer.writePosition(read); |
|
|
|
|
this.sink.next(dataBuffer); |
|
|
|
|
// It's possible for cancellation to happen right before the push into the sink
|
|
|
|
|
// onNext may have led to onCancel (e.g. downstream takeUntil)
|
|
|
|
|
if (this.disposed.get()) { |
|
|
|
|
// TODO:
|
|
|
|
|
// This is not ideal since we already passed the buffer into the sink and
|
|
|
|
|
// releasing may cause something reading to fail. Maybe we won't have to
|
|
|
|
|
// do this after https://github.com/reactor/reactor-core/issues/1634
|
|
|
|
|
complete(dataBuffer); |
|
|
|
|
complete(); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); |
|
|
|
@ -539,12 +535,12 @@ public abstract class DataBufferUtils {
@@ -539,12 +535,12 @@ public abstract class DataBufferUtils {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
complete(dataBuffer); |
|
|
|
|
release(dataBuffer); |
|
|
|
|
complete(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void complete(DataBuffer dataBuffer) { |
|
|
|
|
release(dataBuffer); |
|
|
|
|
private void complete() { |
|
|
|
|
this.sink.complete(); |
|
|
|
|
closeChannel(this.channel); |
|
|
|
|
} |
|
|
|
|