Skip to content

Commit b90e3cc

Browse files
authored
Add Kyo's Stream to ZIO's ZStream interops (#1461)
1 parent f19b91c commit b90e3cc

3 files changed

Lines changed: 310 additions & 6 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ jmh-result.json
1414
*.json
1515
*.gpg
1616
test-output
17-
.DS_Store
17+
.DS_Store
18+
.jvmopts

kyo-zio/shared/src/main/scala/kyo/ZStreams.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import ZIOs.toExit
44
import kyo.Result.*
55
import scala.reflect.ClassTag
66
import zio.Cause
7+
import zio.Chunk as ZChunk
78
import zio.Exit
89
import zio.FiberId
910
import zio.Runtime
1011
import zio.Scope as ZScope
1112
import zio.StackTrace
1213
import zio.Trace
1314
import zio.Unsafe
15+
import zio.ZIO
1416
import zio.stream.ZStream
1517

1618
object ZStreams:
@@ -43,4 +45,29 @@ object ZStreams:
4345
}
4446
end get
4547

48+
/** Interprets a Kyo's to ZIO's ZStream.
49+
* @param stream
50+
* The Kyo stream
51+
* @return
52+
* A zio.ZStream that, when consume, will consume the input stream
53+
*/
54+
def run[E, A](stream: => Stream[A, Abort[E] & Async])(using
55+
Frame,
56+
Trace,
57+
Tag[Emit[Chunk[A]]],
58+
ClassTag[A]
59+
): ZStream[Any, E, A] =
60+
type EmitType = Unit < (Emit[Chunk[A]] & Abort[E] & Async)
61+
62+
def peel(emit: EmitType): ZIO[Any, E, Option[(ZChunk[A], EmitType)]] =
63+
ZIO.uninterruptibleMask: restore =>
64+
restore(ZIOs.run(Emit.runFirst(emit))).map: (maybeChunk, contFn) =>
65+
maybeChunk
66+
.map: chunk =>
67+
ZChunk.fromArray(chunk.toArray) -> contFn()
68+
.toOption
69+
70+
ZStream.unfoldChunkZIO(stream.emit)(peel)
71+
end run
72+
4673
end ZStreams

kyo-zio/shared/src/test/scala/kyo/ZStreamsTest.scala

Lines changed: 281 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ import zio.stream.ZStream
1212

1313
class ZStreamsTest extends Test:
1414

15-
def runZIO[T](v: Task[T]): T =
15+
def runZIO(v: Task[Assertion]): Future[Assertion] =
1616
zio.Unsafe.unsafe(implicit u =>
17-
zio.Runtime.default.unsafe.run(v).getOrThrow()
17+
zio.Runtime.default.unsafe.runToFuture(v)
1818
)
1919

2020
def runKyo(v: => Assertion < (Abort[Throwable] & Async)): Future[Assertion] =
@@ -27,7 +27,21 @@ class ZStreamsTest extends Test:
2727
case object Error extends RuntimeException("error")
2828

2929
".get" - {
30-
"infinite" in runKyo {
30+
"finite stream" in runKyo {
31+
val zioStream = ZStream.fromIterable(List(1, 2, 3, 4, 5))
32+
val kyoStream = ZStreams.get(zioStream)
33+
kyoStream.run.map { chunk =>
34+
assert(chunk == Chunk(1, 2, 3, 4, 5))
35+
}
36+
}
37+
"empty stream" in runKyo {
38+
val zioStream = ZStream.empty
39+
val kyoStream = ZStreams.get[Nothing, Int](zioStream)
40+
kyoStream.run.map { chunk =>
41+
assert(chunk.isEmpty)
42+
}
43+
}
44+
"infinite stream with take" in runKyo {
3145
val zioStream = ZStream.iterate(0)(_ + 1)
3246
val kyoStream = ZStreams.get(zioStream)
3347
kyoStream.take(1024).run.map(v => assert(v == Chunk.range(0, 1024)))
@@ -37,7 +51,7 @@ class ZStreamsTest extends Test:
3751
val kyoStream = ZStreams.get(zioStream)
3852
kyoStream.take(10_000).discard.andThen(succeed)
3953
}
40-
"failing" in runKyo {
54+
"failing stream" in runKyo {
4155
val zioStream = ZStream.fromIterable(List.tabulate(5)(identity)) ++
4256
ZStream.fail(Error) ++
4357
ZStream.iterate(0)(_ + 1)
@@ -46,7 +60,16 @@ class ZStreamsTest extends Test:
4660
assert(result == Result.fail(Error))
4761
}
4862
}
49-
"parallel + async" in runKyo {
63+
"stream with async effects" in runKyo {
64+
val zioStream = ZStream.fromIterable(List(1, 2, 3, 4, 5)).mapZIO { v =>
65+
ZIO.sleep(1.milli.toJava) *> ZIO.succeed(v * 2)
66+
}
67+
val kyoStream = ZStreams.get(zioStream)
68+
kyoStream.run.map { chunk =>
69+
assert(chunk == Chunk(2, 4, 6, 8, 10))
70+
}
71+
}
72+
"parallel processing" in runKyo {
5073
val zioStream =
5174
ZStream
5275
.fromIterable(List.tabulate(20)(identity))
@@ -59,6 +82,259 @@ class ZStreamsTest extends Test:
5982
assert(v.sorted == Chunk.range(0, 20))
6083
}
6184
}
85+
"interruption propagates to zio stream" in runKyo {
86+
import java.util.concurrent.atomic.AtomicBoolean
87+
88+
// State flag that should be set when ZIO stream is interrupted/completed
89+
val streamFinalized = new AtomicBoolean(false)
90+
91+
val zioStream = ZStream.unfoldZIO(0) { n =>
92+
ZIO.sleep(5.millis.toJava) *> ZIO.succeed(Some((n, n + 1)))
93+
}.ensuring(ZIO.succeed(streamFinalized.set(true)))
94+
95+
val kyoStream = ZStreams.get(zioStream)
96+
97+
// Verify initial state is false
98+
assert(!streamFinalized.get())
99+
100+
Scope.run {
101+
Fiber.init(kyoStream.take(5).run).map { fiber =>
102+
Async.sleep(15.millis).andThen {
103+
Abort.run[Interrupted](fiber.interrupt).map { _ =>
104+
Async.sleep(50.millis).andThen {
105+
// Verify interruption was received
106+
assert(streamFinalized.get())
107+
}
108+
}
109+
}
110+
}
111+
}
112+
}
113+
"concurrent stream consumption" in runKyo {
114+
val zioStream = ZStream.fromIterable(List.range(0, 100))
115+
val kyoStream = ZStreams.get(zioStream)
116+
117+
Async.zip(
118+
kyoStream.run,
119+
kyoStream.run,
120+
kyoStream.run
121+
).map { case (r1, r2, r3) =>
122+
// Each should get the full stream
123+
assert(r1 == Chunk.from(List.range(0, 100)))
124+
assert(r2 == Chunk.from(List.range(0, 100)))
125+
assert(r3 == Chunk.from(List.range(0, 100)))
126+
}
127+
}
128+
"concurrent kyo streams racing on shared zio stream with mutable state" in runKyo {
129+
import java.util.concurrent.atomic.AtomicInteger
130+
131+
// ZIO stream with internal mutable counter that multiple Kyo streams will race on
132+
val counter = new AtomicInteger(0)
133+
val zioStream = ZStream.unfoldChunkZIO(()) { _ =>
134+
ZIO.succeed {
135+
val value = counter.getAndIncrement()
136+
if value < 100 then
137+
Some((zio.Chunk.single(value), ()))
138+
else
139+
None
140+
end if
141+
}
142+
}
143+
144+
val sharedKyoStream = ZStreams.get(zioStream)
145+
146+
// 4 Kyo streams racing to consume from the same ZIO stream
147+
Async.zip(
148+
sharedKyoStream.run,
149+
sharedKyoStream.run,
150+
sharedKyoStream.run,
151+
sharedKyoStream.run
152+
).map { case (r1, r2, r3, r4) =>
153+
// Combine all results
154+
val allValues = (r1.toSeq ++ r2.toSeq ++ r3.toSeq ++ r4.toSeq).toList
155+
val uniqueValues = allValues.distinct.sorted
156+
157+
// Verify total data is maintained:
158+
// 1. All values from 0 to 99 should be present exactly once
159+
assert(uniqueValues == List.range(0, 100))
160+
// 2. No duplicates - each value consumed by exactly one stream
161+
assert(allValues.length == uniqueValues.length)
162+
// 3. Total count should be 100
163+
assert(allValues.length == 100)
164+
}
165+
}
166+
}
167+
168+
".run" - {
169+
"finite stream" in runZIO {
170+
val kyoStream = Stream.init(List(1, 2, 3, 4, 5))
171+
val zioStream = ZStreams.run(kyoStream)
172+
zioStream.runCollect.map { chunk =>
173+
assert(chunk.toList == List(1, 2, 3, 4, 5))
174+
}
175+
}
176+
"empty stream" in runZIO {
177+
val kyoStream = Stream.empty[Int]
178+
val zioStream = ZStreams.run(kyoStream)
179+
zioStream.runCollect.map { chunk =>
180+
assert(chunk.isEmpty)
181+
}
182+
}
183+
"infinite stream with take" in runZIO {
184+
val kyoStream = Stream.unfold(0)(n => Maybe((n, n + 1)))
185+
val zioStream = ZStreams.run(kyoStream)
186+
zioStream.take(1024).runCollect.map { chunk =>
187+
assert(chunk.toList == List.range(0, 1024))
188+
}
189+
}
190+
"stack safety" in runZIO {
191+
val kyoStream = Stream.init(List.fill(10_000)(1))
192+
val zioStream = ZStreams.run(kyoStream)
193+
zioStream.runCount.map { count =>
194+
assert(count == 10_000)
195+
}
196+
}
197+
"failing stream" in runZIO {
198+
val kyoStream: Stream[Int, Abort[RuntimeException] & Async] =
199+
Stream.init(List(1, 2, 3)).map(v => Abort.get(Right(v))).concat(
200+
Stream(Abort.fail(Error).map(_ => Emit.value(Chunk.empty[Int])))
201+
)
202+
val zioStream = ZStreams.run(kyoStream)
203+
zioStream.runCollect.either.map { result =>
204+
assert(result == Left(Error))
205+
}
206+
}
207+
"stream with async effects" in runZIO {
208+
val kyoStream = Stream.init(List(1, 2, 3, 4, 5)).map { v =>
209+
Async.sleep(1.milli).andThen(v * 2)
210+
}
211+
val zioStream = ZStreams.run(kyoStream)
212+
zioStream.runCollect.map { chunk =>
213+
assert(chunk.toList == List(2, 4, 6, 8, 10))
214+
}
215+
}
216+
"round trip: get then run" in runZIO {
217+
val original = ZStream.fromIterable(List(1, 2, 3, 4, 5))
218+
val kyoStream = ZStreams.get(original)
219+
val zioStream = ZStreams.run(kyoStream)
220+
zioStream.runCollect.map { chunk =>
221+
assert(chunk.toList == List(1, 2, 3, 4, 5))
222+
}
223+
}
224+
"round trip: run then get" in runKyo {
225+
val original = Stream.init(List(1, 2, 3, 4, 5))
226+
val zioStream = ZStreams.run(original)
227+
val kyoStream = ZStreams.get(zioStream)
228+
kyoStream.run.map { chunk =>
229+
assert(chunk == Chunk(1, 2, 3, 4, 5))
230+
}
231+
}
232+
"parallel processing" in runZIO {
233+
val kyoStream = Stream.init(List.range(0, 20)).map { v =>
234+
Async.sleep(1.milli).andThen(v)
235+
}
236+
val zioStream = ZStreams.run(kyoStream)
237+
zioStream.mapZIOParUnordered(4) { v =>
238+
zio.Random.nextIntBounded(10)
239+
.flatMap(t => ZIO.sleep(t.millis.toJava)) *> ZIO.succeed(v * 2)
240+
}.runCollect.map { chunk =>
241+
assert(chunk.toList.sorted == List.range(0, 20).map(_ * 2))
242+
}
243+
}
244+
"interruption propagates to kyo stream" in runZIO {
245+
import java.util.concurrent.atomic.AtomicBoolean
246+
247+
// State flag that should be set when Kyo stream is interrupted/completed
248+
val streamFinalized = new AtomicBoolean(false)
249+
250+
val kyoStream: Stream[Int, Abort[Nothing] & Async] = Stream {
251+
Scope.run {
252+
Scope.ensure {
253+
streamFinalized.set(true)
254+
}.andThen {
255+
Stream.unfold(0) { n =>
256+
Async.sleep(5.millis).andThen(Maybe((n, n + 1)))
257+
}.emit
258+
}
259+
}
260+
}
261+
262+
val zioStream = ZStreams.run(kyoStream)
263+
264+
for
265+
fiber <- zioStream.take(5).runCollect.fork
266+
// Verify initial state is false
267+
_ = assert(!streamFinalized.get())
268+
_ <- ZIO.sleep(15.millis.toJava)
269+
_ <- fiber.interrupt
270+
result <- fiber.await
271+
_ <- ZIO.sleep(50.millis.toJava) // Give time for cleanup to propagate
272+
yield
273+
// Verify ZIO interruption was received
274+
assert(result.isInterrupted)
275+
// Verify Kyo stream received the interruption signal and finalized
276+
assert(streamFinalized.get())
277+
end for
278+
}
279+
"concurrent stream consumption" in runZIO {
280+
val kyoStream = Stream.init(List.range(0, 100))
281+
val zioStream = ZStreams.run(kyoStream)
282+
for
283+
fiber1 <- zioStream.runCollect.fork
284+
fiber2 <- zioStream.runCollect.fork
285+
fiber3 <- zioStream.runCollect.fork
286+
r1 <- fiber1.join
287+
r2 <- fiber2.join
288+
r3 <- fiber3.join
289+
yield
290+
// Each fiber should get the full stream
291+
assert(r1.toList == List.range(0, 100))
292+
assert(r2.toList == List.range(0, 100))
293+
assert(r3.toList == List.range(0, 100))
294+
end for
295+
}
296+
"concurrent zio streams racing on shared kyo stream with mutable state" in runZIO {
297+
import java.util.concurrent.atomic.AtomicInteger
298+
import scala.collection.concurrent.TrieMap
299+
300+
// Kyo stream with internal mutable counter that multiple ZIO streams will race on
301+
val counter = new AtomicInteger(0)
302+
val kyoStream = Stream.unfold((), chunkSize = 1) { _ =>
303+
val value = counter.getAndIncrement()
304+
if value < 100 then
305+
// Small delay to encourage interleaving
306+
Async.sleep(1.milli).andThen(Maybe((value, ())))
307+
else
308+
Maybe.empty
309+
end if
310+
}
311+
312+
val sharedZioStream = ZStreams.run(kyoStream)
313+
314+
// 4 ZIO streams racing to consume from the same Kyo stream
315+
for
316+
fiber1 <- sharedZioStream.runCollect.fork
317+
fiber2 <- sharedZioStream.runCollect.fork
318+
fiber3 <- sharedZioStream.runCollect.fork
319+
fiber4 <- sharedZioStream.runCollect.fork
320+
r1 <- fiber1.join
321+
r2 <- fiber2.join
322+
r3 <- fiber3.join
323+
r4 <- fiber4.join
324+
yield
325+
// Combine all results
326+
val allValues = (r1 ++ r2 ++ r3 ++ r4).toList
327+
val uniqueValues = allValues.distinct.sorted
328+
329+
// Verify total data is maintained:
330+
// 1. All values from 0 to 99 should be present exactly once
331+
assert(uniqueValues == List.range(0, 100))
332+
// 2. No duplicates - each value consumed by exactly one stream
333+
assert(allValues.length == uniqueValues.length)
334+
// 3. Total count should be 100
335+
assert(allValues.length == 100)
336+
end for
337+
}
62338
}
63339

64340
end ZStreamsTest

0 commit comments

Comments
 (0)