diff --git a/mason.py b/mason.py
index faae04ff9..9759918bf 100644
--- a/mason.py
+++ b/mason.py
@@ -32,6 +32,7 @@ def parse_beaker_dataset(dataset_str):
     "ai2/saturn-cirrascale",
     "ai2/neptune-cirrascale",
     "ai2/allennlp-elara-cirrascale",
+    "ai2/ceres-cirrascale",
 ]
 
 GCP_CLUSTERS = [
diff --git a/open_instruct/dpo_tune.py b/open_instruct/dpo_tune.py
index ef44c6776..f8d767ee7 100644
--- a/open_instruct/dpo_tune.py
+++ b/open_instruct/dpo_tune.py
@@ -1120,7 +1120,12 @@ def load_model():
         if accelerator.is_local_main_process:
             clean_last_n_checkpoints(args.output_dir, keep_last_n_checkpoints=0)
 
-        if args.try_auto_save_to_beaker and accelerator.is_main_process and len(beaker_config.beaker_dataset_id_urls) > 0 and args.output_dir != "/output":
+        if (
+            args.try_auto_save_to_beaker
+            and accelerator.is_main_process
+            and len(beaker_config.beaker_dataset_id_urls) > 0
+            and args.output_dir != "/output"
+        ):
             shutil.copytree(args.output_dir, "/output", dirs_exist_ok=True)
 
     if is_beaker_job() and accelerator.is_main_process:
@@ -1160,7 +1165,7 @@ def load_model():
         if args.try_launch_beaker_eval_jobs:
             command = f"""\
             python mason.py \
-                --cluster ai2/allennlp-cirrascale ai2/pluto-cirrascale ai2/neptune-cirrascale ai2/saturn-cirrascale ai2/jupiter-cirrascale-2 \
+                --cluster ai2/ganymede-cirrascale ai2/ceres-cirrascale ai2/neptune-cirrascale ai2/saturn-cirrascale ai2/jupiter-cirrascale-2 \
                 --priority low \
                 --preemptible \
                 --budget ai2/allennlp \
@@ -1170,7 +1175,8 @@ def load_model():
                 --gpus 0 -- python scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py \
                 --beaker_workload_id {beaker_config.beaker_workload_id} \
                 --upload_to_hf {args.hf_metadata_dataset} \
-                --model_name {args.run_name}
+                --model_name {args.run_name} \
+                --run_id {wandb_tracker.run.get_url()}
             """
             process = subprocess.Popen(["bash", "-c", command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             stdout, stderr = process.communicate()
diff --git a/open_instruct/dpo_tune_cache.py b/open_instruct/dpo_tune_cache.py
index 1a04b6f11..4b92dd1a9 100644
--- a/open_instruct/dpo_tune_cache.py
+++ b/open_instruct/dpo_tune_cache.py
@@ -1140,7 +1140,12 @@ def load_model():
         if accelerator.is_local_main_process:
             clean_last_n_checkpoints(args.output_dir, keep_last_n_checkpoints=0)
 
-        if args.try_auto_save_to_beaker and accelerator.is_main_process and len(beaker_config.beaker_dataset_id_urls) > 0 and args.output_dir != "/output":
+        if (
+            args.try_auto_save_to_beaker
+            and accelerator.is_main_process
+            and len(beaker_config.beaker_dataset_id_urls) > 0
+            and args.output_dir != "/output"
+        ):
             shutil.copytree(args.output_dir, "/output", dirs_exist_ok=True)
 
     if is_beaker_job() and accelerator.is_main_process:
@@ -1180,7 +1185,7 @@ def load_model():
         if args.try_launch_beaker_eval_jobs:
             command = f"""\
             python mason.py \
-                --cluster ai2/allennlp-cirrascale ai2/general-cirrascale-a5000 ai2/general-cirrascale-a5000 ai2/s2-cirrascale ai2/general-cirrascale \
+                --cluster ai2/ganymede-cirrascale ai2/ceres-cirrascale ai2/neptune-cirrascale ai2/saturn-cirrascale ai2/jupiter-cirrascale-2 \
                 --priority low \
                 --preemptible \
                 --budget ai2/allennlp \
@@ -1190,7 +1195,8 @@ def load_model():
                 --gpus 0 -- python scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py \
                 --beaker_workload_id {beaker_config.beaker_workload_id} \
                 --upload_to_hf {args.hf_metadata_dataset} \
-                --model_name {args.run_name}
+                --model_name {args.run_name} \
+                --run_id {wandb_tracker.run.get_url()}
             """
             process = subprocess.Popen(["bash", "-c", command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             stdout, stderr = process.communicate()
diff --git a/open_instruct/finetune.py b/open_instruct/finetune.py
index 101325f18..3e688b3f4 100644
--- a/open_instruct/finetune.py
+++ b/open_instruct/finetune.py
@@ -1034,7 +1034,12 @@ def main(args: FlatArguments):
         if accelerator.is_local_main_process:
             clean_last_n_checkpoints(args.output_dir, keep_last_n_checkpoints=0)
 
-        if args.try_auto_save_to_beaker and accelerator.is_main_process and len(beaker_config.beaker_dataset_id_urls) > 0 and args.output_dir != "/output":
+        if (
+            args.try_auto_save_to_beaker
+            and accelerator.is_main_process
+            and len(beaker_config.beaker_dataset_id_urls) > 0
+            and args.output_dir != "/output"
+        ):
             shutil.copytree(args.output_dir, "/output", dirs_exist_ok=True)
 
     if is_beaker_job() and accelerator.is_main_process:
@@ -1074,7 +1079,7 @@ def main(args: FlatArguments):
         if args.try_launch_beaker_eval_jobs:
             command = f"""\
             python mason.py \
-                --cluster ai2/allennlp-cirrascale ai2/pluto-cirrascale ai2/neptune-cirrascale ai2/saturn-cirrascale ai2/jupiter-cirrascale-2 \
+                --cluster ai2/ganymede-cirrascale ai2/ceres-cirrascale ai2/neptune-cirrascale ai2/saturn-cirrascale ai2/jupiter-cirrascale-2 \
                 --priority low \
                 --preemptible \
                 --budget ai2/allennlp \
@@ -1084,7 +1089,8 @@ def main(args: FlatArguments):
                 --gpus 0 -- python scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py \
                 --beaker_workload_id {beaker_config.beaker_workload_id} \
                 --upload_to_hf {args.hf_metadata_dataset} \
-                --model_name {args.run_name}
+                --model_name {args.run_name} \
+                --run_id {wandb_tracker.run.get_url()}
             """
             process = subprocess.Popen(["bash", "-c", command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             stdout, stderr = process.communicate()
diff --git a/open_instruct/model_utils.py b/open_instruct/model_utils.py
index f0a2d48bf..de15d55b6 100644
--- a/open_instruct/model_utils.py
+++ b/open_instruct/model_utils.py
@@ -38,8 +38,12 @@
 from torch.nn.parallel.distributed import DistributedDataParallel
 from transformers import PreTrainedModel, PreTrainedTokenizer
 
+from open_instruct.ground_truth_utils import (
+    verify_gsm8k_sample,
+    verify_ifeval_sample,
+    verify_math_sample,
+)
 from open_instruct.utils import retry_on_exception
-from open_instruct.ground_truth_utils import verify_gsm8k_sample, verify_math_sample, verify_ifeval_sample
 
 
 @dataclass
@@ -208,7 +212,13 @@ def get_reward(
 
 
 def apply_verifiable_reward(
-    query_responses: torch.Tensor, tokenizer, ground_truths: List[str], datasets: List[str], verify_reward : int = 10, answer_extraction_model: Optional[torch.nn.Module] = None, answer_extraction_tokenizer: Optional[PreTrainedTokenizer] = None
+    query_responses: torch.Tensor,
+    tokenizer,
+    ground_truths: List[str],
+    datasets: List[str],
+    verify_reward: int = 10,
+    answer_extraction_model: Optional[torch.nn.Module] = None,
+    answer_extraction_tokenizer: Optional[PreTrainedTokenizer] = None,
 ):
     # decode the responses
     decoded_responses = tokenizer.batch_decode(query_responses, skip_special_tokens=True)
@@ -218,10 +228,14 @@ def apply_verifiable_reward(
         # add the prompt to the responses
         decoded_responses = [f"{response} {prompt}" for response in decoded_responses]
         # extract the answer
-        answer_extraction_inputs = answer_extraction_tokenizer(decoded_responses, return_tensors="pt", padding=True, truncation=True)
+        answer_extraction_inputs = answer_extraction_tokenizer(
+            decoded_responses, return_tensors="pt", padding=True, truncation=True
+        )
         answer_extraction_outputs = answer_extraction_model(**answer_extraction_inputs)
         # get the predicted answer
-        decoded_responses = answer_extraction_tokenizer.batch_decode(answer_extraction_outputs.logits.argmax(-1), skip_special_tokens=True)
+        decoded_responses = answer_extraction_tokenizer.batch_decode(
+            answer_extraction_outputs.logits.argmax(-1), skip_special_tokens=True
+        )
     # compare with ground truth.
     # use same logic as in gsm8k evaluation
     rewards = []
@@ -230,11 +244,11 @@ def apply_verifiable_reward(
         if ground_truth is None:
             rewards.append(0)
             continue
-        if dataset.lower() == 'gsm8k':
+        if dataset.lower() == "gsm8k":
             verified = verify_gsm8k_sample(prediction, ground_truth)
-        elif dataset.lower() == 'math':
+        elif dataset.lower() == "math":
             verified = verify_math_sample(prediction, ground_truth)
-        elif dataset.lower() == 'ifeval':
+        elif dataset.lower() == "ifeval":
             verified = verify_ifeval_sample(prediction, ground_truth)
         # if verified, give reward
         if verified:
diff --git a/open_instruct/utils.py b/open_instruct/utils.py
index 853c845d3..5bedc9d7e 100644
--- a/open_instruct/utils.py
+++ b/open_instruct/utils.py
@@ -874,46 +874,6 @@ def maybe_use_ai2_hf_entity() -> Optional[str]:
     return None
 
 
-def submit_beaker_eval_jobs(
-    model_name: str,
-    location: str,
-    hf_repo_revision: str = "",
-    workspace: str = "tulu-3-results",
-    beaker_image: str = "nathanl/open_instruct_auto",
-    upload_to_hf: str = "allenai/tulu-3-evals",
-    run_oe_eval_experiments: bool = False,
-    run_safety_evaluations: bool = False,
-    skip_oi_evals: bool = False,
-) -> None:
-    command = f"""
-    python scripts/submit_eval_jobs.py \
-        --model_name {model_name} \
-        --location {location} \
-        --is_tuned \
-        --workspace {workspace} \
-        --preemptible \
-        --use_hf_tokenizer_template \
-        --beaker_image {beaker_image} \
-    """
-    if len(hf_repo_revision) > 0:
-        command += f" --hf_revision {hf_repo_revision}"
-    if len(upload_to_hf) > 0:
-        command += f" --upload_to_hf {upload_to_hf}"
-    if run_oe_eval_experiments:
-        command += " --run_oe_eval_experiments"
-    if run_safety_evaluations:
-        command += " --run_safety_evaluations"
-    if skip_oi_evals:
-        command += " --skip_oi_evals"
-
-    process = subprocess.Popen(["bash", "-c", command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    stdout, stderr = process.communicate()
-
-    print(f"Beaker evaluation jobs: Stdout:\n{stdout.decode()}")
-    print(f"Beaker evaluation jobs: Stderr:\n{stderr.decode()}")
-    print(f"Beaker evaluation jobs: process return code: {process.returncode}")
-
-
 @retry_on_exception()
 def upload_metadata_to_hf(
     metadata_dict,
diff --git a/scripts/eval/oe-eval.sh b/scripts/eval/oe-eval.sh
index 1bd1a3282..23ea1eb3c 100755
--- a/scripts/eval/oe-eval.sh
+++ b/scripts/eval/oe-eval.sh
@@ -65,6 +65,8 @@ while [[ "$#" -gt 0 ]]; do
         --priority) PRIORITY="$2"; shift ;;
         --tasks) CUSTOM_TASKS="$2"; shift ;;
         --evaluate_on_weka) EVALUATE_ON_WEKA="true" ;;
+        --step) STEP="$2"; shift ;;
+        --run-id) RUN_ID="$2"; shift ;;
         *) echo "Unknown parameter passed: $1"; usage ;;
     esac
     shift
@@ -88,6 +90,15 @@ MAX_LENGTH="${MAX_LENGTH:-4096}"
 UNSEEN_EVALS="${UNSEEN_EVALS:-false}"
 PRIORITY="${PRIORITY:normal}"
 EVALUATE_ON_WEKA="${EVALUATE_ON_WEKA:-false}"
+RUN_ID="${RUN_ID:-}"
+STEP="${STEP:-}"
+DATALAKE_ARGS=""
+if [[ -n "$RUN_ID" ]]; then
+    DATALAKE_ARGS+="run_id=$RUN_ID"
+fi
+if [[ -n "$STEP" ]]; then
+    DATALAKE_ARGS+=",step=$STEP"
+fi
 
 # if UNSEEN_EVALS, save results to a different directory
 if [ "$UNSEEN_EVALS" == "true" ]; then
@@ -169,12 +180,14 @@ for TASK in "${TASKS[@]}"; do
            --model-args "{\"model_path\":\"${MODEL_LOCATION}\", \"max_length\": ${MAX_LENGTH}}" \
            ${HF_UPLOAD_ARG} \
            --gpus "$GPU_COUNT" \
-           --beaker-image "costah/oe-eval-olmo1124-12022024" \
+           --beaker-image "01JH9RAAZQXKNNDHYTN2DPNPKN" \
            --gantry-args '{"env-secret": "OPENAI_API_KEY=openai_api_key", "weka": "oe-adapt-default:/weka/oe-adapt-default"}' \
            ${REVISION_ARG} \
            --cluster ai2/neptune-cirrascale,ai2/saturn-cirrascale,ai2/jupiter-cirrascale-2 \
            --beaker-retries 2 \
-           --beaker-priority "$PRIORITY"
+           --beaker-priority "$PRIORITY" \
+           --push-datalake \
+           --datalake-tags "$DATALAKE_ARGS"
     else
         python oe-eval-internal/oe_eval/launch.py \
            --model "$MODEL_NAME" \
@@ -186,10 +199,12 @@ for TASK in "${TASKS[@]}"; do
            --model-args "{\"model_path\":\"${MODEL_LOCATION}\", \"max_length\": ${MAX_LENGTH}}" \
            ${HF_UPLOAD_ARG} \
            --gpus "$GPU_COUNT" \
-           --beaker-image "costah/oe-eval-olmo1124-12022024" \
+           --beaker-image "01JH9RAAZQXKNNDHYTN2DPNPKN" \
            --gantry-args '{"env-secret": "OPENAI_API_KEY=openai_api_key"}' \
            ${REVISION_ARG} \
            --beaker-retries 2 \
-           --beaker-priority "$PRIORITY"
+           --beaker-priority "$PRIORITY" \
+           --push-datalake \
+           --datalake-tags "$DATALAKE_ARGS"
     fi
 done
diff --git a/scripts/submit_eval_jobs.py b/scripts/submit_eval_jobs.py
index 0c6f7cd18..ef231c7ab 100755
--- a/scripts/submit_eval_jobs.py
+++ b/scripts/submit_eval_jobs.py
@@ -65,25 +65,12 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
 ########################################
 # Launcher
-NFS_CLUSTERS = [
-    "ai2/allennlp-cirrascale",
-    "ai2/aristo-cirrascale",
-    "ai2/climate-cirrascale",
-    "ai2/general-cirrascale",
-    "ai2/general-cirrascale-a5000",
-    "ai2/mosaic-cirrascale",
-    "ai2/mosaic-cirrascale-a100",
-    "ai2/pluto-cirrascale",
-    "ai2/prior-cirrascale",
-    "ai2/s2-cirrascale",
-    "ai2/s2-cirrascale-l40",
-]
-
 WEKA_CLUSTERS = [
     "ai2/jupiter-cirrascale-2",
     "ai2/saturn-cirrascale",
     "ai2/neptune-cirrascale",
     "ai2/allennlp-elara-cirrascale",
+    "ai2/ceres-cirrascale"
 ]
 GCP_CLUSTERS = [
     "ai2/augusta-google-1"
 ]
@@ -130,6 +117,8 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
 parser.add_argument("--use_alternate_safety_image", type=str, default=None, help="Use a different image for safety eval.")
 parser.add_argument("--evaluate_on_weka", action="store_true", help="Evaluate OE eval on Beaker.")
 parser.add_argument("--oe_eval_tasks", type=str, default=None, help="Evaluate OE eval on Beaker.")
+parser.add_argument("--step", type=int, default=None, help="Step number for postgresql logging.")
+parser.add_argument("--run_id", type=str, default=None, help="A unique run ID for postgresql logging.")
 
 args = parser.parse_args()
 
@@ -149,17 +138,8 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
     d1['tasks'][0]['resources']['gpuCount'] = 1
 
 # remove nfs if asked or jupiter in cluster list.
-nfs_available = False
 weka_available = False
-if all(c in NFS_CLUSTERS for c in cluster):
-    d1['tasks'][0]['datasets'].append({
-        'mountPath': "/net/nfs.cirrascale",
-        "source": {
-            "hostPath": "/net/nfs.cirrascale"
-        }
-    })
-    nfs_available = True
-elif all(c in WEKA_CLUSTERS for c in cluster):
+if all(c in WEKA_CLUSTERS for c in cluster):
     d1['tasks'][0]['datasets'].append({
         'mountPath': "/weka/oe-adapt-default",
         "source": {
@@ -504,7 +484,7 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
             task_spec['arguments'] = [task_spec['arguments'][0].replace("--model_name_or_path /model", f"--model_name_or_path {model_info[1]} --hf_revision {args.hf_revision}")]
             task_spec['arguments'] = [task_spec['arguments'][0].replace("--tokenizer_name_or_path /model", f"--tokenizer_name_or_path {model_info[1]}")]
         elif model_info[1].startswith("/"): # if it's a local model, load it from the local directory
-            assert nfs_available or weka_available, "NFS / Weka is required for path-based models." # to be safe.
+            assert weka_available, "NFS / Weka is required for path-based models." # to be safe.
             task_spec['arguments'] = [task_spec['arguments'][0].replace("--model_name_or_path /model", f"--model_name_or_path {model_info[1]}")]
             task_spec['arguments'] = [task_spec['arguments'][0].replace("--tokenizer_name_or_path /model", f"--tokenizer_name_or_path {model_info[1]}")]
         else: # if it's a beaker model, mount the beaker dataset to `/model`
@@ -635,6 +615,10 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
         oe_eval_cmd += " --evaluate_on_weka"
     if args.oe_eval_tasks:
         oe_eval_cmd += f" --tasks {args.oe_eval_tasks}"
+    if args.run_id:
+        oe_eval_cmd += f" --run-id {args.run_id}"
+    if args.step:
+        oe_eval_cmd += f" --step {args.step}"
     # add string with number of gpus
     num_gpus = task_spec['resources']['gpuCount']
     # if num_gpus > 1, double it again for oe-eval configs
@@ -678,7 +662,7 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
             task_spec['arguments'] = [task_spec['arguments'][0].replace("--model_name_or_path /model", f"--model_name_or_path {model_info[1]} --hf_revision {args.hf_revision}")]
             task_spec['arguments'] = [task_spec['arguments'][0].replace("--tokenizer_name_or_path /model", f"--tokenizer_name_or_path {model_info[1]}")]
         elif model_info[1].startswith("/"): # if it's a local model, load it from the local directory
-            assert nfs_available or weka_available, "NFS / Weka is required for path-based models." # to be safe.
+            assert weka_available, "NFS / Weka is required for path-based models." # to be safe.
             task_spec['arguments'] = [task_spec['arguments'][0].replace("--model_name_or_path /model", "--model_name_or_path "+model_info[1])]
             task_spec['arguments'] = [task_spec['arguments'][0].replace("--tokenizer_name_or_path /model", "--tokenizer_name_or_path "+model_info[1])]
         else: # if it's a beaker model, mount the beaker dataset to `/model`
diff --git a/scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py b/scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py
index d1fe58c0e..de7c55b20 100644
--- a/scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py
+++ b/scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py
@@ -1,13 +1,14 @@
+import subprocess
 import sys
 import time
 from dataclasses import dataclass
+from typing import Optional
 
 from open_instruct.utils import (
     ArgumentParserPlus,
     BeakerRuntimeConfig,
     beaker_experiment_succeeded,
     get_beaker_dataset_ids,
-    submit_beaker_eval_jobs,
 )
 
 """
@@ -22,6 +23,7 @@ class Args:
    max_wait_time_for_beaker_dataset_upload_seconds: int = 60 * 30  # 30 minutes
     check_interval_seconds: int = 60
     upload_to_hf: str = "allenai/tulu-3-evals"
+    run_id: Optional[str] = None
 
 
 def main(args: Args, beaker_runtime_config: BeakerRuntimeConfig):
@@ -35,14 +37,31 @@ def main(args: Args, beaker_runtime_config: BeakerRuntimeConfig):
             # I have checked a couple of beaker jobs and found the first dataset is the model
             # but we should check this assumption
             beaker_dataset_ids = get_beaker_dataset_ids(beaker_runtime_config.beaker_workload_id, sort=True)
-            submit_beaker_eval_jobs(
-                model_name=args.model_name,
-                location=beaker_dataset_ids[-1],
-                run_oe_eval_experiments=True,
-                run_safety_evaluations=True,
-                skip_oi_evals=True,
-                upload_to_hf=args.upload_to_hf,
-            )
+            command = f"""
+            python scripts/submit_eval_jobs.py \
+                --model_name {args.model_name} \
+                --location {beaker_dataset_ids[-1]} \
+                --is_tuned \
+                --workspace tulu-3-results \
+                --preemptible \
+                --use_hf_tokenizer_template \
+                --beaker_image nathanl/open_instruct_auto \
+                --skip_oi_evals \
+                --run_safety_evaluations \
+                --run_oe_eval_experiments \
+                --upload_to_hf {args.upload_to_hf}
+            """
+            if args.run_id:
+                command += f" --run_id {args.run_id}"
+
+            process = subprocess.Popen(["bash", "-c", command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            stdout, stderr = process.communicate()
+
+            print(f"Beaker evaluation jobs: Stdout:\n{stdout.decode()}")
+            print(f"Beaker evaluation jobs: Stderr:\n{stderr.decode()}")
+            print(f"Beaker evaluation jobs: process return code: {process.returncode}")
+
+            return
 
         time.sleep(args.check_interval_seconds)
 
     # If we reach here, the experiment failed
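
Note on the new datalake plumbing in scripts/eval/oe-eval.sh: the --run-id and --step flags are folded into one comma-separated tag string that is passed to the oe-eval launcher via --datalake-tags. A minimal sketch of that logic in isolation, with hypothetical placeholder values (only the variable names and the if/+= structure come from the patch):

    RUN_ID="https://wandb.ai/org/project/runs/abc123"  # placeholder wandb run URL
    STEP="2000"                                        # placeholder training step
    DATALAKE_ARGS=""
    if [[ -n "$RUN_ID" ]]; then
        DATALAKE_ARGS+="run_id=$RUN_ID"
    fi
    if [[ -n "$STEP" ]]; then
        DATALAKE_ARGS+=",step=$STEP"
    fi
    # prints: run_id=https://wandb.ai/org/project/runs/abc123,step=2000
    echo "$DATALAKE_ARGS"

(If STEP were set without RUN_ID, the tag string would start with a leading comma; the patch keeps that behavior.)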
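For context, the run identifier now travels through three launch layers. A hedged sketch of that chain with placeholder values; the flag names (--run_id, --run-id, --step) are the ones the patch adds, while the IDs, model name, and elided flags (marked "...") are purely illustrative:

    # 1. the training job (finetune.py / dpo_tune*.py) passes its wandb URL to the waiter
    python scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py \
        --beaker_workload_id <workload-id> \
        --model_name my-sft-run \
        --upload_to_hf allenai/tulu-3-evals \
        --run_id https://wandb.ai/org/project/runs/abc123

    # 2. once the model dataset has uploaded, the waiter shells out to the submit script
    python scripts/submit_eval_jobs.py ... --run_id https://wandb.ai/org/project/runs/abc123

    # 3. which forwards it to oe-eval.sh as --run-id (and optionally --step),
    #    where it ends up in the --datalake-tags of each eval launch
    bash scripts/eval/oe-eval.sh ... --run-id https://wandb.ai/org/project/runs/abc123 --step 2000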