Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion flagscale/train/megatron/gpt_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# NOTE: Loading `megatron.legacy.model` earlier fails due to circular import


def gpt_builder(args, pre_process, post_process, vp_stage=None, config=None, pg_collection=None):
def gpt_builder(args, pre_process, post_process, vp_stage=None, config=None, pg_collection=None, dualpipev_stage=None):
print_rank_0('building GPT model ...')
if config is None:
if args.yaml_cfg is not None:
Expand Down Expand Up @@ -68,6 +68,7 @@ def gpt_builder(args, pre_process, post_process, vp_stage=None, config=None, pg_
normalization=args.normalization,
qk_l2_norm=args.qk_l2_norm,
vp_stage=vp_stage,
dualpipev_stage=dualpipev_stage,
)
elif args.heterogeneous_layers_config_path is not None:
assert not (config.transformer_impl == "inference_optimized")
Expand Down Expand Up @@ -97,6 +98,7 @@ def gpt_builder(args, pre_process, post_process, vp_stage=None, config=None, pg_
transformer_layer_spec_for_mtp,
use_transformer_engine=use_te,
vp_stage=vp_stage,
dualpipev_stage=dualpipev_stage,
)

model = GPTModel(
Expand All @@ -116,6 +118,7 @@ def gpt_builder(args, pre_process, post_process, vp_stage=None, config=None, pg_
mtp_block_spec=mtp_block_spec,
vp_stage=vp_stage,
pg_collection=pg_collection,
dualpipev_stage=dualpipev_stage,
)

return model
Expand Down
6 changes: 4 additions & 2 deletions flagscale/train/megatron/model_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@


def model_provider(
model_builder: Callable, pre_process=True, post_process=True, vp_stage: Optional[int] = None, config=None, pg_collection=None,
model_builder: Callable, pre_process=True, post_process=True, vp_stage: Optional[int] = None, config=None, pg_collection=None, dualpipev_stage: Optional[int] = None
) -> Union[GPTModel, megatron.legacy.model.GPTModel, MambaModel]:
"""Builds the model.

Expand Down Expand Up @@ -59,8 +59,10 @@ def oom_observer(device, alloc, device_alloc, device_free):
if has_nvidia_modelopt and getattr(args, 'modelopt_enabled', False):
# [ModelOpt]: Use custom builder + spec when modelopt is enabled
model_builder = modelopt_gpt_mamba_builder
if args.use_dualpipev:
raise ValueError("")

return model_builder(args, pre_process, post_process, vp_stage, config=config, pg_collection=pg_collection)
return model_builder(args, pre_process, post_process, vp_stage, config=config, pg_collection=pg_collection, dualpipev_stage=dualpipev_stage)


def count_parameters_in_layer(model, layer_name):
Expand Down
9 changes: 5 additions & 4 deletions flagscale/train/megatron/training/arguments_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,11 @@ def _parse_recompute_refined_config(recom_config, recom_config_name):
"DualPipeV can only be used for pipeline scheduling in MoE models, "
"thus requiring both pipeline parallelism and expert parallelism."
)
assert args.expert_model_parallel_size > 1, (
"DualPipeV can only be used for pipeline scheduling in MoE models, "
"thus requiring both pipeline parallelism and expert parallelism."
)
# assert args.expert_model_parallel_size > 1, (
# "DualPipeV can only be used for pipeline scheduling in MoE models, "
# "thus requiring both pipeline parallelism and expert parallelism."
# )
args.dualpipev_pipeline_model_parallel_size = 2

middle_stage_layers = args.num_layers
num_middle_stages = args.pipeline_model_parallel_size
Expand Down
57 changes: 36 additions & 21 deletions flagscale/train/megatron/training/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ def set_startup_timestamps(program_start=None, main_entry=None):
is_pp_last_stage,
is_vp_first_stage,
is_vp_last_stage,
is_dualpipev_first_stage,
is_dualpipev_last_stage,
)
from megatron.core.optimizer import get_mup_config_overrides, get_standard_config_overrides
from megatron.training.checkpointing import load_checkpoint
Expand Down Expand Up @@ -1180,6 +1182,17 @@ def pretrain(
train_data_iterator.append(iterators[0])
valid_data_iterator.append(iterators[1])
test_data_iterator.append(iterators[2])
elif args.use_dualpipev:
train_data_iterator = []
valid_data_iterator = []
test_data_iterator = []
for _ in range(2):
iterators = build_train_valid_test_data_iterators(
train_valid_test_dataset_provider
)
train_data_iterator.append(iterators[0])
valid_data_iterator.append(iterators[1])
test_data_iterator.append(iterators[2])
else:
train_data_iterator, valid_data_iterator, test_data_iterator = (
build_train_valid_test_data_iterators(train_valid_test_dataset_provider)
Expand Down Expand Up @@ -1441,26 +1454,23 @@ def build_model():
model.append(this_model)
elif args.use_dualpipev:
model = []

pre_process, post_process = False, False
if mpu.is_pipeline_first_stage():
pre_process = True

first_model = model_provider_func(
pre_process=pre_process,
post_process=post_process,
is_dualpipev_first_chunk=True,
)
first_model.model_type = model_type
model.append(first_model)

second_model = model_provider_func(
pre_process=post_process,
post_process=pre_process,
is_dualpipev_first_chunk=False,
)
second_model.model_type = model_type
model.append(second_model)
for i in range(2):
pre_process = is_pp_first_stage(pg_collection.pp) and is_dualpipev_first_stage(
dualpipev_stage=i, dualpipev_size=2
)
post_process = is_pp_first_stage(pg_collection.pp) and is_dualpipev_last_stage(
dualpipev_stage=i, dualpipev_size=2
)
this_model = model_provider_func(
pre_process=pre_process,
post_process=post_process,
config=config,
pg_collection=pg_collection,
dualpipev_stage=i,
)
this_model.model_type = model_type
this_model.dualpipev_stage = i
model.append(this_model)
else:
pre_process = is_pp_first_stage(pg_collection.pp)
post_process = is_pp_last_stage(pg_collection.pp)
Expand All @@ -1480,6 +1490,8 @@ def build_model():
else:
model = build_model()

print(f"{model=}")

if not isinstance(model, list):
model = [model]

Expand Down Expand Up @@ -2077,7 +2089,10 @@ def train_step(forward_step_func, data_iterator, model, optimizer, opt_param_sch
if args.empty_unused_memory_level >= 2:
cur_platform.empty_cache()

if mpu.is_pipeline_last_stage(ignore_virtual=True):
is_last_stage = mpu.is_pipeline_last_stage(ignore_virtual=True)
if args.use_dualpipev:
is_last_stage = mpu.is_pipeline_first_stage(ignore_virtual=True)
if is_last_stage:
# Average loss across microbatches.
loss_reduced = {}

Expand Down
9 changes: 6 additions & 3 deletions flagscale/train/megatron/training/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,15 +528,18 @@ def is_hybrid_model(args):
return args.hybrid_layer_pattern is not None


def is_first_or_last_pipeline_stage(vp_stage, dualpipev_stage=None):
    """Return True if on first or last pipeline stage, taking into account virtual
    pipeline parallelism and DualPipeV stages.

    Args:
        vp_stage: Virtual-pipeline stage index, or None. When None, the mpu
            checks are called with ``ignore_virtual=True``.
        dualpipev_stage: DualPipeV stage index, or None (default). When None,
            the mpu checks are called with ``ignore_dualpipev=True``.

    Returns:
        The result of ``mpu.is_pipeline_first_stage(...)`` if it is truthy,
        otherwise the result of ``mpu.is_pipeline_last_stage(...)``.
    """
    # A stage index of None means that dimension is not in play, so the
    # corresponding mpu check is told to ignore it.
    ignore_virtual = vp_stage is None
    ignore_dualpipev = dualpipev_stage is None

    # Both checks receive exactly the same keyword arguments.
    stage_kwargs = dict(
        ignore_virtual=ignore_virtual,
        vp_stage=vp_stage,
        ignore_dualpipev=ignore_dualpipev,
        dualpipev_stage=dualpipev_stage,
    )
    return mpu.is_pipeline_first_stage(**stage_kwargs) or mpu.is_pipeline_last_stage(
        **stage_kwargs
    )


Expand Down
Loading