-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdemo.py
120 lines (91 loc) · 2.71 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import random
import time
from pyturbo import Job, ReorderStage, Stage, System, Task, Options
class Stage1(Stage):

    '''
    Expand a task carrying (x, y) into y sub-tasks whose results are
    x, x + 1, ..., x + y - 1.
    '''

    def process(self, task):
        '''
        Yield one finished sub-task per offset in range(y).

        Every sub-task is created with meta {'i': offset} and the incoming
        task as its parent; its result is x + offset.
        '''
        base, count = task.content
        for offset in range(count):
            pending = Task(meta={'i': offset}, parent_task=task)
            child = pending.start(self)
            time.sleep(0.02)  # Fake process time
            child.finish(base + offset)
            yield child
class Stage2(Stage):

    '''
    Multiply the task content by 7: x -> x * 7
    '''

    def allocate_resource(self, resources, *, worker_per_cpu=1):
        '''
        Split the resources into as many parts as there are 'cpu' entries and
        multiply the split result by worker_per_cpu.

        NOTE(review): the multiplication presumably repeats a list of
        per-worker resources - confirm against the return type of split().
        '''
        cpu_count = len(resources.get('cpu'))
        per_cpu = resources.split(cpu_count)
        return per_cpu * worker_per_cpu

    def process(self, task):
        '''
        Start the task, finish it with content * 7 and return it.
        '''
        task.start(self)
        time.sleep(0.005)  # Fake process time
        task.finish(task.content * 7)
        return task
class Stage3(Stage):

    '''
    Undo the multiplication by 7: x -> int(x / 7)
    '''

    def allocate_resource(self, resources, *, worker_per_cpu=4):
        '''
        Split the resources into as many parts as there are 'cpu' entries and
        multiply the split result by worker_per_cpu (4 unless overridden).

        NOTE(review): the multiplication presumably repeats a list of
        per-worker resources - confirm against the return type of split().
        '''
        cpu_count = len(resources.get('cpu'))
        per_cpu = resources.split(cpu_count)
        return per_cpu * worker_per_cpu

    def process(self, task):
        '''
        Start the task, finish it with int(content / 7) and return it.
        '''
        task.start(self)
        time.sleep(0.01)  # Fake process time
        quotient = task.content / 7
        task.finish(int(quotient))
        return task
class Stage4(ReorderStage):

    '''
    Negate the task content: x -> -x
    '''

    def get_sequence_id(self, task):
        '''
        Return the 'i' entry of the task meta as the task's sequence id.

        NOTE(review): presumably ReorderStage uses this id to order tasks -
        confirm against the pyturbo documentation.
        '''
        sequence_id = task.meta['i']
        return sequence_id

    def process(self, task):
        '''
        Start the task, finish it with the negated content and return it.
        '''
        task.start(self)
        time.sleep(0.01)  # Fake process time
        task.finish(-task.content)
        return task
class ToySystem(System):

    '''
    Chain Stage1..Stage4 so that (x, y) -> [*range(-x, -x - y, -1)]
    '''

    def get_num_pipeline(self, resources):
        '''
        Return one pipeline per five 'cpu' entries (integer division).
        '''
        cpus = resources.get('cpu')
        return len(cpus) // 5

    def get_stages(self, resources):
        '''
        Build the four stages in order, each on its own resource selection.
        '''
        # Fake resource allocation
        expand = Stage1(resources.select(cpu=(0, 1), gpu=False))
        multiply = Stage2(resources.select(cpu=(1, 3)))
        divide = Stage3(resources.select(cpu=(0.6, 0.9), gpu=True))
        negate = Stage4(resources.select(cpu=(-0.2, None)))
        return [expand, multiply, divide, negate]
def main(n_job=9):
    '''
    Run n_job random jobs through a ToySystem and verify every result.

    Each job carries a random pair (x, y) with 0 <= x <= 9000 and
    200 <= y <= 400; its results are expected to equal
    [*range(-x, -x - y, -1)].

    Raises whatever exception occurs while waiting for or checking the jobs
    (including AssertionError on a wrong result) after terminating the
    system, so a failing run is never reported as a silent success.
    '''
    system = ToySystem()
    system.start()
    jobs = []
    for _ in range(n_job):
        x = random.randint(0, 9000)
        y = random.randint(200, 400)
        task = Task((x, y), {'x': x, 'y': y})
        name = '%d_%d' % (x, y)
        jobs.append(Job(name, task, y))
    system.add_jobs(jobs)
    try:
        for job in system.wait_jobs(n_job):
            x, y = job.task.content
            expected = [*range(-x, -x - y, -1)]
            assert job.results.results == expected, \
                'unexpected results for job %d_%d' % (x, y)
        system.end()
    except BaseException:
        # Tear the system down on any failure or interrupt, then propagate
        # the error instead of swallowing it.
        system.terminate()
        raise
if __name__ == "__main__":
    # Run the demo twice: once with the options as imported, then once more
    # after enabling Options.single_sync_pipeline.
    for enable_single_sync in (False, True):
        if enable_single_sync:
            Options.single_sync_pipeline = True
        main()