Skip to content

Commit cedc118

Browse files
authored
Merge pull request #29 from TuringLang/task_exception
Catch task exception
2 parents a1ab5a6 + 57ee7fe commit cedc118

File tree

8 files changed

+161
-12
lines changed

8 files changed

+161
-12
lines changed

README.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ function f_ct()
2121
end
2222

2323
t = CTask(f_ct)
24-
# or t = Task(f_ct) |> enable_stack_copying
2524

2625
consume(t) == 0
2726
consume(t) == 1
@@ -42,8 +41,6 @@ function f_ct2()
4241
end
4342

4443
t = CTask(f_ct2)
45-
# or t = Task(f_ct2) |> enable_stack_copying
46-
4744

4845
consume(t) == 0
4946
consume(t) == 1
@@ -66,7 +63,7 @@ function f_cta()
6663
end
6764
end
6865

69-
t = Task(f_cta) |> enable_stack_copying
66+
t = CTask(f_cta)
7067

7168
consume(t) == 0
7269
consume(t) == 1

src/Libtask.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module Libtask
22

3-
export enable_stack_copying, CTask, consume, produce, TArray, get, tzeros, tfill, TRef
3+
export CTask, consume, produce, TArray, get, tzeros, tfill, TRef
44

55
include("../deps/deps.jl"); check_deps();
66
include("taskcopy.jl")

src/taskcopy.jl

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,42 @@ function enable_stack_copying(t::Task)
2222
return ccall((:jl_enable_stack_copying, libtask), Any, (Any,), t)::Task
2323
end
2424

25-
CTask(func) = Task(func) |> enable_stack_copying
25+
"""
26+
27+
task_wrapper()
28+
29+
`task_wrapper` is a wordaround for set the result/exception to the
30+
correct task which maybe copied/forked from another one(the original
31+
one). Without this, the result/exception is always sent to the
32+
original task. That is done in `JULIA_PROJECT/src/task.c`, the
33+
function `start_task` and `finish_task`.
34+
35+
This workaround is not the proper way to do the work it does. The
36+
proper way is refreshing the `current_task` (the variable `t`) in
37+
`start_task` after the call to `jl_apply` returns.
38+
39+
"""
40+
function task_wrapper(func)
41+
() ->
42+
try
43+
res = func()
44+
ct = current_task()
45+
ct.result = res
46+
isa(ct.storage, Nothing) && (ct.storage = IdDict())
47+
ct.storage[:_libtask_state] = :done
48+
wait()
49+
catch ex
50+
ct = current_task()
51+
ct.exception = ex
52+
ct.result = ex
53+
ct.backtrace = catch_backtrace()
54+
isa(ct.storage, Nothing) && (ct.storage = IdDict())
55+
ct.storage[:_libtask_state] = :failed
56+
wait()
57+
end
58+
end
59+
60+
CTask(func) = Task(task_wrapper(func)) |> enable_stack_copying
2661

2762
function Base.copy(t::Task)
2863
t.state != :runnable && t.state != :done &&
@@ -72,7 +107,10 @@ produce(v) = begin
72107
wait()
73108
end
74109

75-
t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable"))
110+
if !(t.state in [:runnable, :queued])
111+
throw(AssertionError("producer.consumer.state in [:runnable, :queued]"))
112+
end
113+
if t.state == :queued yield() end
76114
if empty
77115
Base.schedule_and_wait(t, v)
78116
ct = current_task() # When a task is copied, ct should be updated to new task ID.
@@ -129,5 +167,16 @@ consume(p::Task, values...) = begin
129167
push!(p.storage[:consumers].waitq, ct)
130168
end
131169

132-
p.state == :runnable ? Base.schedule_and_wait(p) : wait() # don't attempt to queue it twice
170+
if p.state == :runnable
171+
Base.schedule(p)
172+
yield()
173+
174+
isa(p.storage, IdDict) && haskey(p.storage, :_libtask_state) &&
175+
(p.state = p.storage[:_libtask_state])
176+
177+
if p.exception != nothing
178+
throw(p.exception)
179+
end
180+
end
181+
wait()
133182
end

test/brokentask.jl

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
using Libtask
2+
using Test
3+
4+
r = @testset "Broken Functions Tests" begin
5+
6+
@testset "Error Test" begin
7+
function ftest()
8+
x = 1
9+
while true
10+
error("error test")
11+
produce(x)
12+
x += 1
13+
end
14+
end
15+
16+
t = CTask(ftest)
17+
try
18+
consume(t)
19+
catch ex
20+
@test isa(ex, ErrorException)
21+
end
22+
@test isa(t.exception, ErrorException)
23+
end
24+
25+
@testset "OutOfBounds Test Before" begin
26+
function ftest()
27+
x = zeros(2)
28+
while true
29+
x[1] = 1
30+
x[2] = 2
31+
x[3] = 3
32+
produce(x[1])
33+
end
34+
end
35+
36+
t = CTask(ftest)
37+
try
38+
consume(t)
39+
catch ex
40+
@test isa(ex, BoundsError)
41+
end
42+
@test isa(t.exception, BoundsError)
43+
end
44+
45+
@testset "OutOfBounds Test After `produce`" begin
46+
function ftest()
47+
x = zeros(2)
48+
while true
49+
x[1] = 1
50+
x[2] = 2
51+
produce(x[2])
52+
x[3] = 3
53+
end
54+
end
55+
56+
t = CTask(ftest)
57+
@test consume(t) == 2
58+
try
59+
consume(t)
60+
catch ex
61+
@test isa(ex, BoundsError)
62+
end
63+
@test isa(t.exception, BoundsError)
64+
end
65+
66+
@testset "OutOfBounds Test After `copy`" begin
67+
function ftest()
68+
x = zeros(2)
69+
while true
70+
x[1] = 1
71+
x[2] = 2
72+
produce(x[2])
73+
x[3] = 3
74+
end
75+
end
76+
77+
t = CTask(ftest)
78+
@test consume(t) == 2
79+
t_copy = copy(t)
80+
try
81+
consume(t_copy)
82+
catch ex
83+
@test isa(ex, BoundsError)
84+
end
85+
@test isa(t_copy.exception, BoundsError)
86+
end
87+
end
88+
Test.print_test_results(r)

test/clonetask.jl

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ function f_ct()
1212
end
1313
end
1414

15-
t = Task(f_ct) |> enable_stack_copying
15+
t = CTask(f_ct)
1616

1717
@test consume(t) == 0
1818
@test consume(t) == 1
@@ -43,3 +43,17 @@ a = copy(t);
4343
@test consume(t) == 5
4444
@test consume(a) == 6
4545
@test consume(a) == 7
46+
47+
48+
# Breaking test
49+
function g_break()
50+
t = 0
51+
while true
52+
t[3] = 1
53+
produce(t)
54+
t = t + 1
55+
end
56+
end
57+
58+
t = CTask(g_break)
59+
@test_throws MethodError consume(t)

test/runtests.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
include("clonetask.jl")
2+
include("brokentask.jl")
23
include("tarray.jl")
34
include("tarray2.jl")

test/tarray.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ function f_cta()
1313
end
1414
end
1515

16-
t = Task(f_cta) |> enable_stack_copying
16+
t = CTask(f_cta)
1717

1818
consume(t); consume(t)
1919
a = copy(t);

test/tref.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ function f_cta()
1212
end
1313
end
1414

15-
t = Task(f_cta) |> enable_stack_copying
15+
t = CTask(f_cta)
1616

1717
consume(t); consume(t)
1818
a = copy(t);
@@ -32,7 +32,7 @@ function dict_test()
3232
end
3333
end
3434

35-
t = Task(dict_test) |> enable_stack_copying
35+
t = CTask(dict_test)
3636

3737
consume(t); consume(t)
3838
a = copy(t);

0 commit comments

Comments
 (0)