Skip to content

Commit 8afac93

Browse files
committed
Ensure async streams can be consumed from a separate process
1 parent b725b8c commit 8afac93

File tree

4 files changed

+39
-17
lines changed

4 files changed

+39
-17
lines changed

lib/elixir/lib/task.ex

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -657,12 +657,14 @@ defmodule Task do
657657
end
658658

659659
defp build_stream(enumerable, fun, options) do
660-
owner = self()
660+
fn acc, acc_fun ->
661+
owner = get_owner(self())
661662

662-
&Task.Supervised.stream(enumerable, &1, &2, get_callers(owner), fun, options, fn ->
663-
{:ok, pid} = Task.Supervised.start_link(get_owner(owner), :nomonitor)
664-
{:ok, :link, pid}
665-
end)
663+
Task.Supervised.stream(enumerable, acc, acc_fun, get_callers(self()), fun, options, fn ->
664+
{:ok, pid} = Task.Supervised.start_link(owner, :nomonitor)
665+
{:ok, :link, pid}
666+
end)
667+
end
666668
end
667669

668670
# Returns a tuple with the node where this is executed and either the

lib/elixir/lib/task/supervisor.ex

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -484,20 +484,22 @@ defmodule Task.Supervisor do
484484
end
485485

486486
defp build_stream(supervisor, link_type, enumerable, fun, options) do
487-
shutdown = options[:shutdown]
488-
owner = self()
487+
fn acc, acc_fun ->
488+
shutdown = options[:shutdown]
489+
owner = get_owner(self())
489490

490-
&Task.Supervised.stream(enumerable, &1, &2, get_callers(owner), fun, options, fn ->
491-
args = [get_owner(owner), :monitor]
491+
Task.Supervised.stream(enumerable, acc, acc_fun, get_callers(self()), fun, options, fn ->
492+
args = [owner, :monitor]
492493

493-
case start_child_with_spec(supervisor, args, :temporary, shutdown) do
494-
{:ok, pid} ->
495-
if link_type == :link, do: Process.link(pid)
496-
{:ok, link_type, pid}
494+
case start_child_with_spec(supervisor, args, :temporary, shutdown) do
495+
{:ok, pid} ->
496+
if link_type == :link, do: Process.link(pid)
497+
{:ok, link_type, pid}
497498

498-
{:error, :max_children} ->
499-
{:error, :max_children}
500-
end
501-
end)
499+
{:error, :max_children} ->
500+
{:error, :max_children}
501+
end
502+
end)
503+
end
502504
end
503505
end

lib/elixir/test/elixir/task/supervisor_test.exs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,15 @@ defmodule Task.SupervisorTest do
450450

451451
assert_receive :done
452452
end
453+
454+
test "consuming from another process", config do
455+
parent = self()
456+
stream = Task.Supervisor.async_stream(config[:supervisor], [1, 2, 3], &send(parent, &1))
457+
Task.start(Stream, :run, [stream])
458+
assert_receive 1
459+
assert_receive 2
460+
assert_receive 3
461+
end
453462
end
454463

455464
describe "async_stream_nolink" do

lib/elixir/test/elixir/task_test.exs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,15 @@ defmodule TaskTest do
829829

830830
assert_receive :done
831831
end
832+
833+
test "consuming from another process" do
834+
parent = self()
835+
stream = Task.async_stream([1, 2, 3], &send(parent, &1))
836+
Task.start(Stream, :run, [stream])
837+
assert_receive 1
838+
assert_receive 2
839+
assert_receive 3
840+
end
832841
end
833842

834843
for {desc, concurrency} <- [==: 4, <: 2, >: 8] do

0 commit comments

Comments
 (0)