-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtrain.py
More file actions
81 lines (60 loc) · 3.09 KB
/
train.py
File metadata and controls
81 lines (60 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import ray
from slime.ray.placement_group import create_actor_group, create_placement_groups, create_rollout_group
from slime.utils.arguments import parse_args
def train(args):
    """Run the end-to-end rollout/train loop on a Ray cluster.

    Orchestrates two actor groups — the training ("actor") group and the
    rollout generator (with sglang inference engines inside) — by:
    placement-group allocation, synchronized initialization, wiring the
    weight-update channel, then alternating generate -> train ->
    update-weights until ``args.num_rollout`` rollouts are done.

    Args:
        args: parsed CLI namespace from ``parse_args()``. Mutated in place:
            ``args.num_rollout`` is derived from ``args.num_epoch`` when
            unset, and ``args.start_rollout_id`` is filled from the loaded
            checkpoint when unset.

    Returns:
        None. All effects are side effects on the Ray actors (training
        steps, checkpoints, eval runs, data-buffer state on disk).

    Note:
        Every ``ray.get`` below is a deliberate synchronization barrier;
        the relative order of these calls is part of the protocol between
        the two groups (see the weight-update comments inline).
    """
    # allocate the GPUs
    pgs = create_placement_groups(args)
    actor_model = create_actor_group(args, pgs["actor"])
    # create the rollout generator, with sglang engines inside.
    rollout_generator = create_rollout_group(args, pgs["rollout"])
    # calculate num_rollout from num_epoch
    # (only when --num-rollout was not given explicitly; the per-epoch count
    # comes from the rollout group's data buffer, i.e. the dataset size)
    num_rollout_per_epoch = None
    if args.num_rollout is None:
        num_rollout_per_epoch = ray.get(rollout_generator.data_buffer.get_num_rollout_per_epoch.remote())
        args.num_rollout = num_rollout_per_epoch * args.num_epoch
    assert args.num_rollout > 0
    # sync the initialization (model initialization, load checkpoint, etc.)
    # a reference model is only needed when a KL term is actually used
    start_rollout_ids = ray.get(
        actor_model.async_init(args, role="actor", with_ref=args.kl_coef != 0 or args.use_kl_loss)
    )
    # all ranks must agree on the rollout id restored from the checkpoint
    assert len(set(start_rollout_ids)) == 1
    if args.start_rollout_id is None:
        args.start_rollout_id = start_rollout_ids[0]
    if args.rollout_global_dataset:
        # restore the data buffer to the state just before the resume point
        ray.get(rollout_generator.data_buffer.load.remote(args.start_rollout_id - 1))
    # initialize the connection for weight update during training
    ray.get(actor_model.async_init_weight_update_connections(rollout_generator))
    if args.offload:
        # bring the rollout engines back onto GPU before they receive weights
        ray.get(rollout_generator.async_onload())
    # always update weight first so that sglang has the loaded weights from training.
    ray.get(actor_model.async_update_weights())
    # train loop.
    # note that for async training, one can change the position of the sync operation(ray.get).
    for rollout_id in range(args.start_rollout_id, args.num_rollout):
        # one-off evaluation before any training, only on a fresh run
        # (rollout_id == 0, i.e. not when resuming mid-training)
        if args.eval_interval is not None and rollout_id == 0:
            ray.get(rollout_generator.async_generate(rollout_id, evaluation=True))
            ray.get(actor_model.async_eval(rollout_id))
        ray.get(rollout_generator.async_generate(rollout_id))
        if args.offload:
            # free rollout GPUs while the actor group trains
            ray.get(rollout_generator.async_offload())
        ray.get(actor_model.async_train(rollout_id))
        # save on the configured interval, and additionally at every epoch
        # boundary when num_rollout was derived from num_epoch
        if args.save_interval is not None and (
            (rollout_id + 1) % args.save_interval == 0
            or (num_rollout_per_epoch is not None and (rollout_id + 1) % num_rollout_per_epoch == 0)
        ):
            ray.get(actor_model.async_save_model(rollout_id))
            if args.rollout_global_dataset:
                # keep the data-buffer state in sync with the model checkpoint
                ray.get(rollout_generator.data_buffer.save.remote(rollout_id))
        if args.offload:
            # swap: actor off GPU, rollout engines back on, before weight sync
            ray.get(actor_model.async_offload())
            ray.get(rollout_generator.async_onload())
        # push the freshly trained weights to the rollout engines so the next
        # generation (and any eval below) uses the up-to-date policy
        ray.get(actor_model.async_update_weights())
        # periodic evaluation, also triggered at epoch boundaries
        if args.eval_interval is not None and (
            (rollout_id + 1) % args.eval_interval == 0
            or (num_rollout_per_epoch is not None and (rollout_id + 1) % num_rollout_per_epoch == 0)
        ):
            ray.get(rollout_generator.async_generate(rollout_id, evaluation=True))
            ray.get(actor_model.async_eval(rollout_id))
if __name__ == "__main__":
    # Script entry point: parse CLI arguments and launch the training loop.
    train(parse_args())