Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
### 4.16.0

* Performance: Replaced `ref` cells with `mutable` locals in the `ofSeq`, `tryWith`, and `tryFinally` enumerator state machines. Each call to `ofSeq` (or any async CE block using `try...with` / `try...finally` / `use`) previously heap-allocated a `Ref<T>` wrapper object per enumerator; it now uses a direct mutable field in the generated class, reducing GC pressure. The change is equivalent to the `mutable`-for-`ref` improvement introduced in 4.11.0 for other enumerators.
* Added `AsyncSeq.insertManyAt` — inserts multiple values before the element at the specified index; mirrors `Seq.insertManyAt` / `List.insertManyAt`.
* Added `AsyncSeq.removeManyAt` — removes a range of elements starting at the specified index; mirrors `Seq.removeManyAt` / `List.removeManyAt`.
* Added `AsyncSeq.splitInto` — splits the sequence into at most N chunks of as-equal-as-possible size; mirrors `Seq.splitInto` / `Array.splitInto`.

### 4.15.0

Expand Down
37 changes: 37 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1731,6 +1731,27 @@
elif i < index then
invalidArg "index" "The index is outside the range of elements in the collection." }

let insertManyAt (index : int) (values : seq<'T>) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
if index < 0 then invalidArg "index" "must be non-negative"
let mutable i = 0
for x in source do
if i = index then yield! values
yield x
i <- i + 1
if i = index then yield! values
elif i < index then
invalidArg "index" "The index is outside the range of elements in the collection." }

let removeManyAt (index : int) (count : int) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
if index < 0 then invalidArg "index" "must be non-negative"
if count < 0 then invalidArg "count" "must be non-negative"
let mutable i = 0
for x in source do
if i < index || i >= index + count then yield x
i <- i + 1
if count > 0 && i < index + count then
invalidArg "index" "The index or count is outside the range of elements in the collection." }

#if !FABLE_COMPILER
let iterAsyncParallel (f:'a -> Async<unit>) (s:AsyncSeq<'a>) : Async<unit> = async {
use mb = MailboxProcessor.Start (ignore >> async.Return)
Expand Down Expand Up @@ -2215,6 +2236,22 @@
let toArraySynchronously (source:AsyncSeq<'T>) = toArrayAsync source |> Async.RunSynchronously
#endif

let splitInto (count : int) (source : AsyncSeq<'T>) : Async<'T[] array> = async {
if count < 1 then invalidArg "count" "must be positive"
let! arr = toArrayAsync source
let total = arr.Length
let result =
if total = 0 then [||]
else
let n = Operators.min count total
let minSize = total / n
let extras = total % n
Array.init n (fun i ->
let chunkStart = i * minSize + Operators.min i extras
let chunkSize = minSize + (if i < extras then 1 else 0)
Array.sub arr chunkStart chunkSize)
return result }

let cycle (source: AsyncSeq<'T>) : AsyncSeq<'T> =
asyncSeq {
let! arr = source |> toArrayAsync
Expand Down Expand Up @@ -2729,7 +2766,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2769 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2769 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
14 changes: 14 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,20 @@ module AsyncSeq =
/// Raises ArgumentException if index is negative or greater than the sequence length. Mirrors Seq.insertAt.
val insertAt : index:int -> value:'T -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns a new asynchronous sequence with the given values inserted before the element at the specified index.
/// An index equal to the length of the sequence appends the values at the end.
/// Raises ArgumentException if index is negative or greater than the sequence length. Mirrors Seq.insertManyAt.
val insertManyAt : index:int -> values:seq<'T> -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns a new asynchronous sequence with the given number of elements removed starting at the specified index.
/// Raises ArgumentException if index is negative, count is negative, or index + count exceeds the sequence length. Mirrors Seq.removeManyAt.
val removeManyAt : index:int -> count:int -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Splits the input asynchronous sequence into at most <c>count</c> chunks of as-equal-as-possible size.
/// The first (length mod count) chunks have one extra element. Materialises the source sequence into memory.
/// Raises ArgumentException if count is not positive. Mirrors Seq.splitInto.
val splitInto : count:int -> source:AsyncSeq<'T> -> Async<'T[] array>

/// Creates an asynchronous sequence that lazily takes element from an
/// input synchronous sequence and returns them one-by-one.
val ofSeq : source:seq<'T> -> AsyncSeq<'T>
Expand Down
2 changes: 1 addition & 1 deletion src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Update="FSharp.Core" Version="4.7.2" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.6" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.7" />
<PackageReference Include="System.Threading.Channels" Version="*" />
<Content Include="*.fsproj; **\*.fs; **\*.fsi;" PackagePath="fable\" />
</ItemGroup>
Expand Down
190 changes: 190 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2097 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2103,7 +2103,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2106 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3914,6 +3914,196 @@
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== insertManyAt =====

[<Test>]
let ``AsyncSeq.insertManyAt inserts multiple elements at specified index`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt 1 [ 10; 20 ]
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 10; 20; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.insertManyAt prepends when index is 0`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt 0 [ 10; 20 ]
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 10; 20; 1; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.insertManyAt appends when index equals sequence length`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt 3 [ 10; 20 ]
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3; 10; 20 |], result)

[<Test>]
let ``AsyncSeq.insertManyAt with empty values returns original sequence`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt 1 []
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.insertManyAt raises ArgumentException for negative index`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt -1 [ 10 ]
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.insertManyAt raises ArgumentException when index exceeds length`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertManyAt 5 [ 10 ]
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== removeManyAt =====

[<Test>]
let ``AsyncSeq.removeManyAt removes elements at specified index and count`` () =
let result =
AsyncSeq.ofSeq [ 0; 1; 2; 3; 4 ]
|> AsyncSeq.removeManyAt 1 2
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 0; 3; 4 |], result)

[<Test>]
let ``AsyncSeq.removeManyAt removes from beginning`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3; 4 ]
|> AsyncSeq.removeManyAt 0 2
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 3; 4 |], result)

[<Test>]
let ``AsyncSeq.removeManyAt removes from end`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3; 4 ]
|> AsyncSeq.removeManyAt 2 2
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2 |], result)

[<Test>]
let ``AsyncSeq.removeManyAt with count 0 returns original sequence`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeManyAt 1 0
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.removeManyAt removes all elements`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeManyAt 0 3
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.removeManyAt raises ArgumentException for negative index`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeManyAt -1 1
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.removeManyAt raises ArgumentException for negative count`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeManyAt 0 -1
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.removeManyAt raises ArgumentException when range exceeds sequence`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeManyAt 2 2
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== splitInto =====

[<Test>]
let ``AsyncSeq.splitInto splits sequence into equal chunks`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3; 4; 5; 6 ]
|> AsyncSeq.splitInto 3
|> Async.RunSynchronously
Assert.AreEqual(3, result.Length)
Assert.AreEqual([| 1; 2 |], result.[0])
Assert.AreEqual([| 3; 4 |], result.[1])
Assert.AreEqual([| 5; 6 |], result.[2])

[<Test>]
let ``AsyncSeq.splitInto distributes remainder to first chunks`` () =
let result =
AsyncSeq.ofSeq [ 1 .. 7 ]
|> AsyncSeq.splitInto 3
|> Async.RunSynchronously
Assert.AreEqual(3, result.Length)
Assert.AreEqual([| 1; 2; 3 |], result.[0])
Assert.AreEqual([| 4; 5 |], result.[1])
Assert.AreEqual([| 6; 7 |], result.[2])

[<Test>]
let ``AsyncSeq.splitInto with count 1 returns single chunk`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.splitInto 1
|> Async.RunSynchronously
Assert.AreEqual(1, result.Length)
Assert.AreEqual([| 1; 2; 3 |], result.[0])

[<Test>]
let ``AsyncSeq.splitInto with count greater than length returns one chunk per element`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.splitInto 10
|> Async.RunSynchronously
Assert.AreEqual(3, result.Length)
Assert.AreEqual([| 1 |], result.[0])
Assert.AreEqual([| 2 |], result.[1])
Assert.AreEqual([| 3 |], result.[2])

[<Test>]
let ``AsyncSeq.splitInto with empty sequence returns empty array`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.splitInto 3
|> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.splitInto raises ArgumentException when count is zero`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.splitInto 0
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.take more than length returns all elements`` () =
let result =
Expand Down Expand Up @@ -4490,7 +4680,7 @@
let ``AsyncSeq.groupByAsync groups elements by async projection`` () =
let result =
AsyncSeq.ofSeq [1..6]
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4683 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand All @@ -4503,7 +4693,7 @@
let ``AsyncSeq.groupByAsync on empty sequence returns empty`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.groupByAsync (fun x -> async { return x % 2 })

Check warning on line 4696 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)
Expand All @@ -4512,7 +4702,7 @@
let ``AsyncSeq.groupByAsync with all-same key produces single group`` () =
let result =
AsyncSeq.ofSeq [1; 2; 3]
|> AsyncSeq.groupByAsync (fun _ -> async { return "same" })

Check warning on line 4705 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (fun (key, grp) -> async {
let! items = AsyncSeq.toArrayAsync grp
return key, Array.sort items })
Expand Down
Loading