|
|
|
@ -1159,23 +1159,22 @@ public abstract class DataBufferUtils {
@@ -1159,23 +1159,22 @@ public abstract class DataBufferUtils {
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void completed(Integer written, Attachment attachment) { |
|
|
|
|
this.writing.set(false); |
|
|
|
|
attachment.iterator().close(); |
|
|
|
|
DataBuffer.ByteBufferIterator iterator = attachment.iterator(); |
|
|
|
|
iterator.close(); |
|
|
|
|
|
|
|
|
|
long pos = this.position.addAndGet(written); |
|
|
|
|
ByteBuffer byteBuffer = attachment.byteBuffer(); |
|
|
|
|
DataBuffer.ByteBufferIterator iterator = attachment.iterator(); |
|
|
|
|
|
|
|
|
|
if (byteBuffer.hasRemaining()) { |
|
|
|
|
this.writing.set(true); |
|
|
|
|
this.channel.write(byteBuffer, pos, attachment, this); |
|
|
|
|
} |
|
|
|
|
else if (iterator.hasNext()) { |
|
|
|
|
ByteBuffer next = iterator.next(); |
|
|
|
|
this.writing.set(true); |
|
|
|
|
this.channel.write(next, pos, attachment, this); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
sinkDataBuffer(attachment.dataBuffer()); |
|
|
|
|
this.sink.next(attachment.dataBuffer()); |
|
|
|
|
this.writing.set(false); |
|
|
|
|
|
|
|
|
|
Throwable throwable = this.error.get(); |
|
|
|
|
if (throwable != null) { |
|
|
|
@ -1192,15 +1191,12 @@ public abstract class DataBufferUtils {
@@ -1192,15 +1191,12 @@ public abstract class DataBufferUtils {
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void failed(Throwable exc, Attachment attachment) { |
|
|
|
|
this.writing.set(false); |
|
|
|
|
attachment.iterator().close(); |
|
|
|
|
|
|
|
|
|
sinkDataBuffer(attachment.dataBuffer()); |
|
|
|
|
this.sink.error(exc); |
|
|
|
|
} |
|
|
|
|
this.sink.next(attachment.dataBuffer()); |
|
|
|
|
this.writing.set(false); |
|
|
|
|
|
|
|
|
|
private void sinkDataBuffer(DataBuffer dataBuffer) { |
|
|
|
|
this.sink.next(dataBuffer); |
|
|
|
|
this.sink.error(exc); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|