Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
### 4.17.0

* Added `AsyncSeq.exists2` β€” asynchronously tests whether any corresponding pair of elements in two async sequences satisfies the predicate. Evaluates pairwise up to the shorter sequence; short-circuits on first match. Mirrors `Seq.exists2`.
* Added `AsyncSeq.exists2Async` β€” asynchronous-predicate variant of `exists2`.
* Added `AsyncSeq.forall2` β€” asynchronously tests whether all corresponding pairs of elements in two async sequences satisfy the predicate. Evaluates pairwise up to the shorter sequence; short-circuits on first failure. Mirrors `Seq.forall2`.
* Added `AsyncSeq.forall2Async` β€” asynchronous-predicate variant of `forall2`.

### 4.16.0

* Performance: Replaced `ref` cells with `mutable` locals in the `ofSeq`, `tryWith`, and `tryFinally` enumerator state machines. Each call to `ofSeq` (or any async CE block using `try...with` / `try...finally` / `use`) previously heap-allocated a `Ref<T>` wrapper object per enumerator; it now uses a direct mutable field in the generated class, reducing GC pressure. The change is equivalent to the `mutable`-for-`ref` improvement introduced in 4.11.0 for other enumerators.
Expand Down
50 changes: 50 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,56 @@
let forallAsync f (source : AsyncSeq<'T>) =
source |> existsAsync (fun v -> async { let! b = f v in return not b }) |> Async.map not

let exists2Async (predicate: 'T1 -> 'T2 -> Async<bool>) (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) : Async<bool> = async {
use ie1 = source1.GetEnumerator()
use ie2 = source2.GetEnumerator()
let! m1 = ie1.MoveNext()
let! m2 = ie2.MoveNext()
let mutable b1 = m1
let mutable b2 = m2
let mutable result = false
let mutable isDone = false
while not isDone do
match b1, b2 with
| None, _ | _, None -> isDone <- true
| Some v1, Some v2 ->
let! ok = predicate v1 v2
if ok then result <- true; isDone <- true
else
let! n1 = ie1.MoveNext()
let! n2 = ie2.MoveNext()
b1 <- n1
b2 <- n2
return result }

let exists2 (predicate: 'T1 -> 'T2 -> bool) (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) : Async<bool> =
exists2Async (fun a b -> async.Return (predicate a b)) source1 source2

let forall2Async (predicate: 'T1 -> 'T2 -> Async<bool>) (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) : Async<bool> = async {
use ie1 = source1.GetEnumerator()
use ie2 = source2.GetEnumerator()
let! m1 = ie1.MoveNext()
let! m2 = ie2.MoveNext()
let mutable b1 = m1
let mutable b2 = m2
let mutable result = true
let mutable isDone = false
while not isDone do
match b1, b2 with
| None, _ | _, None -> isDone <- true
| Some v1, Some v2 ->
let! ok = predicate v1 v2
if not ok then result <- false; isDone <- true
else
let! n1 = ie1.MoveNext()
let! n2 = ie2.MoveNext()
b1 <- n1
b2 <- n2
return result }

let forall2 (predicate: 'T1 -> 'T2 -> bool) (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) : Async<bool> =
forall2Async (fun a b -> async.Return (predicate a b)) source1 source2

let compareWithAsync (comparer: 'T -> 'T -> Async<int>) (source1: AsyncSeq<'T>) (source2: AsyncSeq<'T>) : Async<int> = async {
use ie1 = source1.GetEnumerator()
use ie2 = source2.GetEnumerator()
Expand Down Expand Up @@ -2729,7 +2779,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2782 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2782 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
18 changes: 18 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,24 @@ module AsyncSeq =
/// Asynchronously determine if the async predicate returns true for all values in the sequence
val forallAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<bool>

/// Asynchronously determine if any corresponding pair of elements in two async sequences satisfies
/// the predicate. Evaluates pairwise up to the shorter of the two sequences; short-circuits on first match.
/// Mirrors <c>Seq.exists2</c>.
val exists2 : predicate:('T1 -> 'T2 -> bool) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> Async<bool>

/// Asynchronously determine if any corresponding pair of elements in two async sequences satisfies
/// the async predicate. Evaluates pairwise up to the shorter of the two sequences; short-circuits on first match.
val exists2Async : predicate:('T1 -> 'T2 -> Async<bool>) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> Async<bool>

/// Asynchronously determine if all corresponding pairs of elements in two async sequences satisfy
/// the predicate. Evaluates pairwise up to the shorter of the two sequences; short-circuits on first failure.
/// Mirrors <c>Seq.forall2</c>.
val forall2 : predicate:('T1 -> 'T2 -> bool) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> Async<bool>

/// Asynchronously determine if all corresponding pairs of elements in two async sequences satisfy
/// the async predicate. Evaluates pairwise up to the shorter of the two sequences; short-circuits on first failure.
val forall2Async : predicate:('T1 -> 'T2 -> Async<bool>) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> Async<bool>

/// Compares two async sequences lexicographically using the given synchronous comparison function.
/// Returns a negative integer if source1 < source2, 0 if equal, and a positive integer if source1 > source2.
val compareWith : comparer:('T -> 'T -> int) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'T> -> Async<int>
Expand Down
2 changes: 1 addition & 1 deletion src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Update="FSharp.Core" Version="4.7.2" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.6" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.7" />
<PackageReference Include="System.Threading.Channels" Version="*" />
<Content Include="*.fsproj; **\*.fs; **\*.fsi;" PackagePath="fable\" />
</ItemGroup>
Expand Down
104 changes: 104 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2097 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2103,7 +2103,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2106 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -4490,7 +4490,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4493 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4503,7 +4503,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4506 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4512,7 +4512,7 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4515 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand Down Expand Up @@ -4622,3 +4622,107 @@
with _ -> yield 42 }
let result = s |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 42 |], result)

// ===== exists2 / exists2Async =====

[<Test>]
let ``AsyncSeq.exists2 returns true when a matching pair exists`` () =
let result =
AsyncSeq.exists2 (=) (AsyncSeq.ofSeq [1;2;3]) (AsyncSeq.ofSeq [0;2;0])
|> Async.RunSynchronously
Assert.IsTrue(result)

[<Test>]
let ``AsyncSeq.exists2 returns false when no matching pair exists`` () =
let result =
AsyncSeq.exists2 (=) (AsyncSeq.ofSeq [1;2;3]) (AsyncSeq.ofSeq [4;5;6])
|> Async.RunSynchronously
Assert.IsFalse(result)

[<Test>]
let ``AsyncSeq.exists2 on empty sequences returns false`` () =
let result =
AsyncSeq.exists2 (=) AsyncSeq.empty<int> AsyncSeq.empty<int>
|> Async.RunSynchronously
Assert.IsFalse(result)

[<Test>]
let ``AsyncSeq.exists2 short-circuits on first match`` () =
let count = ref 0
let result =
AsyncSeq.exists2
(fun a b -> incr count; a = b)
(AsyncSeq.ofSeq [1;2;3;4;5])
(AsyncSeq.ofSeq [0;2;0;0;0])
|> Async.RunSynchronously
Assert.IsTrue(result)
Assert.AreEqual(2, count.Value) // stopped after second pair

[<Test>]
let ``AsyncSeq.exists2 stops at shorter sequence`` () =
let result =
AsyncSeq.exists2 (=) (AsyncSeq.ofSeq [1;2]) (AsyncSeq.ofSeq [3;4;1])
|> Async.RunSynchronously
Assert.IsFalse(result) // shorter seq ends before (1,1) can match

[<Test>]
let ``AsyncSeq.exists2Async returns true with async predicate`` () =
let result =
AsyncSeq.exists2Async
(fun a b -> async { return a = b })
(AsyncSeq.ofSeq [1;2;3])
(AsyncSeq.ofSeq [0;2;0])
|> Async.RunSynchronously
Assert.IsTrue(result)

// ===== forall2 / forall2Async =====

[<Test>]
let ``AsyncSeq.forall2 returns true when all pairs satisfy the predicate`` () =
let result =
AsyncSeq.forall2 (=) (AsyncSeq.ofSeq [1;2;3]) (AsyncSeq.ofSeq [1;2;3])
|> Async.RunSynchronously
Assert.IsTrue(result)

[<Test>]
let ``AsyncSeq.forall2 returns false when a pair fails`` () =
let result =
AsyncSeq.forall2 (=) (AsyncSeq.ofSeq [1;2;3]) (AsyncSeq.ofSeq [1;9;3])
|> Async.RunSynchronously
Assert.IsFalse(result)

[<Test>]
let ``AsyncSeq.forall2 on empty sequences returns true`` () =
let result =
AsyncSeq.forall2 (=) AsyncSeq.empty<int> AsyncSeq.empty<int>
|> Async.RunSynchronously
Assert.IsTrue(result)

[<Test>]
let ``AsyncSeq.forall2 short-circuits on first failure`` () =
let count = ref 0
let result =
AsyncSeq.forall2
(fun a b -> incr count; a = b)
(AsyncSeq.ofSeq [1;9;3;4;5])
(AsyncSeq.ofSeq [1;2;3;4;5])
|> Async.RunSynchronously
Assert.IsFalse(result)
Assert.AreEqual(2, count.Value) // stopped after second pair

[<Test>]
let ``AsyncSeq.forall2 stops at shorter sequence`` () =
let result =
AsyncSeq.forall2 (=) (AsyncSeq.ofSeq [1;2]) (AsyncSeq.ofSeq [1;2;99])
|> Async.RunSynchronously
Assert.IsTrue(result) // stops when shorter seq ends; all checked pairs passed

[<Test>]
let ``AsyncSeq.forall2Async returns true with async predicate`` () =
let result =
AsyncSeq.forall2Async
(fun a b -> async { return a = b })
(AsyncSeq.ofSeq [1;2;3])
(AsyncSeq.ofSeq [1;2;3])
|> Async.RunSynchronously
Assert.IsTrue(result)
Loading