Skip to content

Commit ec2af2f

Browse files
author
Sergey Mashkov
committed
IO: don't resume writers if they are waiting for joining completion
1 parent 4d01f7e commit ec2af2f

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ internal class ByteBufferChannel(
135135
}
136136

137137
if (avr >= minReadSize) resumeReadOp()
138-
if (avw >= minWriteSize) resumeWriteOp()
138+
val joining = joining
139+
if (avw >= minWriteSize && (joining == null || state === ReadWriteBufferState.Terminated)) resumeWriteOp()
139140
}
140141

141142
override fun flush() {
@@ -1215,7 +1216,7 @@ internal class ByteBufferChannel(
12151216

12161217
var partSize = 0
12171218

1218-
val rc = src.reading { srcState ->
1219+
src.reading { srcState ->
12191220
val srcBuffer = this
12201221

12211222
val rem = minOf(srcBuffer.remaining().toLong(), dstBuffer.remaining().toLong(), limit - copied).toInt()
@@ -1236,7 +1237,7 @@ internal class ByteBufferChannel(
12361237
true
12371238
}
12381239

1239-
if (rc) {
1240+
if (partSize > 0) {
12401241
dstBuffer.bytesWritten(state, partSize)
12411242
copied += partSize
12421243

@@ -1266,7 +1267,7 @@ internal class ByteBufferChannel(
12661267
}
12671268

12681269
if (joining != null) {
1269-
yield()
1270+
tryWriteSuspend(1)
12701271
}
12711272
}
12721273

@@ -2038,16 +2039,17 @@ internal class ByteBufferChannel(
20382039
}
20392040

20402041
private fun resumeWriteOp() {
2041-
WriteOp.getAndSet(this, null)?.apply {
2042+
while (true) {
2043+
val writeOp = writeOp ?: return
20422044
val closed = closed
2043-
if (closed == null) resume(Unit) else resumeWithException(closed.sendException)
2045+
if (closed == null && joining != null && state !== ReadWriteBufferState.Terminated) return
2046+
if (WriteOp.compareAndSet(this, writeOp, null)) {
2047+
if (closed == null) writeOp.resume(Unit) else writeOp.resumeWithException(closed.sendException)
2048+
return
2049+
}
20442050
}
20452051
}
20462052

2047-
private fun resumeWriteOp(cause: Throwable) {
2048-
WriteOp.getAndSet(this, null)?.resumeWithException(cause)
2049-
}
2050-
20512053
private fun resumeClosed(cause: Throwable?) {
20522054
ReadOp.getAndSet(this, null)?.let { c ->
20532055
if (cause != null)

0 commit comments

Comments
 (0)