Push evaluation results into the datalake (#514)
* Push evaluation results into the datalake

* quick change

* push changes
vwxyzjn authored Jan 13, 2025
1 parent 45e0bde commit 6de607b
Showing 9 changed files with 106 additions and 95 deletions.
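Taken together, the diffs below thread a Weights & Biases run URL (and, optionally, a training step) from the training scripts through the eval-launching tooling, so that oe-eval results can be tagged and pushed into the datalake. A rough sketch of the intended flow is shown below; all values are placeholders, and the assumption that scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py forwards --run_id onward is mine (its diff is not loaded on this page).

# 1. A training script (e.g. open_instruct/finetune.py) launches the eval waiter via mason.py,
#    now passing the W&B run URL (placeholder values throughout):
python mason.py --cluster ai2/ceres-cirrascale --priority low --preemptible --budget ai2/allennlp --gpus 0 -- \
    python scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py \
    --beaker_workload_id 01EXAMPLEWORKLOAD \
    --upload_to_hf allenai/tulu-3-evals \
    --model_name my-dpo-run \
    --run_id https://wandb.ai/org/project/runs/abc123
# 2. scripts/submit_eval_jobs.py now accepts --run_id / --step and appends them to the oe-eval command it builds:
python scripts/submit_eval_jobs.py --model_name my-dpo-run --location 01EXAMPLEDATASET \
    --run_oe_eval_experiments --upload_to_hf allenai/tulu-3-evals \
    --run_id https://wandb.ai/org/project/runs/abc123 --step 2000
# 3. scripts/eval/oe-eval.sh turns them into datalake tags on every oe-eval launch:
#    --push-datalake --datalake-tags "run_id=https://wandb.ai/org/project/runs/abc123,step=2000"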
1 change: 1 addition & 0 deletions mason.py
@@ -32,6 +32,7 @@ def parse_beaker_dataset(dataset_str):
"ai2/saturn-cirrascale",
"ai2/neptune-cirrascale",
"ai2/allennlp-elara-cirrascale",
"ai2/ceres-cirrascale",

]
GCP_CLUSTERS = [
12 changes: 9 additions & 3 deletions open_instruct/dpo_tune.py
@@ -1120,7 +1120,12 @@ def load_model():
if accelerator.is_local_main_process:
clean_last_n_checkpoints(args.output_dir, keep_last_n_checkpoints=0)

- if args.try_auto_save_to_beaker and accelerator.is_main_process and len(beaker_config.beaker_dataset_id_urls) > 0 and args.output_dir != "/output":
+ if (
+ args.try_auto_save_to_beaker
+ and accelerator.is_main_process
+ and len(beaker_config.beaker_dataset_id_urls) > 0
+ and args.output_dir != "/output"
+ ):
shutil.copytree(args.output_dir, "/output", dirs_exist_ok=True)

if is_beaker_job() and accelerator.is_main_process:
@@ -1160,7 +1165,7 @@ def load_model():
if args.try_launch_beaker_eval_jobs:
command = f"""\
python mason.py \
- --cluster ai2/allennlp-cirrascale ai2/pluto-cirrascale ai2/neptune-cirrascale ai2/saturn-cirrascale ai2/jupiter-cirrascale-2 \
+ --cluster ai2/ganymede-cirrascale ai2/ceres-cirrascale ai2/neptune-cirrascale ai2/saturn-cirrascale ai2/jupiter-cirrascale-2 \
--priority low \
--preemptible \
--budget ai2/allennlp \
@@ -1170,7 +1175,8 @@ def load_model():
--gpus 0 -- python scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py \
--beaker_workload_id {beaker_config.beaker_workload_id} \
--upload_to_hf {args.hf_metadata_dataset} \
- --model_name {args.run_name}
+ --model_name {args.run_name} \
+ --run_id {wandb_tracker.run.get_url()}
"""
process = subprocess.Popen(["bash", "-c", command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
12 changes: 9 additions & 3 deletions open_instruct/dpo_tune_cache.py
@@ -1140,7 +1140,12 @@ def load_model():
if accelerator.is_local_main_process:
clean_last_n_checkpoints(args.output_dir, keep_last_n_checkpoints=0)

- if args.try_auto_save_to_beaker and accelerator.is_main_process and len(beaker_config.beaker_dataset_id_urls) > 0 and args.output_dir != "/output":
+ if (
+ args.try_auto_save_to_beaker
+ and accelerator.is_main_process
+ and len(beaker_config.beaker_dataset_id_urls) > 0
+ and args.output_dir != "/output"
+ ):
shutil.copytree(args.output_dir, "/output", dirs_exist_ok=True)

if is_beaker_job() and accelerator.is_main_process:
@@ -1180,7 +1185,7 @@ def load_model():
if args.try_launch_beaker_eval_jobs:
command = f"""\
python mason.py \
- --cluster ai2/allennlp-cirrascale ai2/general-cirrascale-a5000 ai2/general-cirrascale-a5000 ai2/s2-cirrascale ai2/general-cirrascale \
+ --cluster ai2/ganymede-cirrascale ai2/ceres-cirrascale ai2/neptune-cirrascale ai2/saturn-cirrascale ai2/jupiter-cirrascale-2 \
--priority low \
--preemptible \
--budget ai2/allennlp \
@@ -1190,7 +1195,8 @@ def load_model():
--gpus 0 -- python scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py \
--beaker_workload_id {beaker_config.beaker_workload_id} \
--upload_to_hf {args.hf_metadata_dataset} \
- --model_name {args.run_name}
+ --model_name {args.run_name} \
+ --run_id {wandb_tracker.run.get_url()}
"""
process = subprocess.Popen(["bash", "-c", command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
12 changes: 9 additions & 3 deletions open_instruct/finetune.py
@@ -1034,7 +1034,12 @@ def main(args: FlatArguments):
if accelerator.is_local_main_process:
clean_last_n_checkpoints(args.output_dir, keep_last_n_checkpoints=0)

- if args.try_auto_save_to_beaker and accelerator.is_main_process and len(beaker_config.beaker_dataset_id_urls) > 0 and args.output_dir != "/output":
+ if (
+ args.try_auto_save_to_beaker
+ and accelerator.is_main_process
+ and len(beaker_config.beaker_dataset_id_urls) > 0
+ and args.output_dir != "/output"
+ ):
shutil.copytree(args.output_dir, "/output", dirs_exist_ok=True)

if is_beaker_job() and accelerator.is_main_process:
@@ -1074,7 +1079,7 @@ def main(args: FlatArguments):
if args.try_launch_beaker_eval_jobs:
command = f"""\
python mason.py \
- --cluster ai2/allennlp-cirrascale ai2/pluto-cirrascale ai2/neptune-cirrascale ai2/saturn-cirrascale ai2/jupiter-cirrascale-2 \
+ --cluster ai2/ganymede-cirrascale ai2/ceres-cirrascale ai2/neptune-cirrascale ai2/saturn-cirrascale ai2/jupiter-cirrascale-2 \
--priority low \
--preemptible \
--budget ai2/allennlp \
@@ -1084,7 +1089,8 @@ def main(args: FlatArguments):
--gpus 0 -- python scripts/wait_beaker_dataset_model_upload_then_evaluate_model.py \
--beaker_workload_id {beaker_config.beaker_workload_id} \
--upload_to_hf {args.hf_metadata_dataset} \
- --model_name {args.run_name}
+ --model_name {args.run_name} \
+ --run_id {wandb_tracker.run.get_url()}
"""
process = subprocess.Popen(["bash", "-c", command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
28 changes: 21 additions & 7 deletions open_instruct/model_utils.py
@@ -38,8 +38,12 @@
from torch.nn.parallel.distributed import DistributedDataParallel
from transformers import PreTrainedModel, PreTrainedTokenizer

+ from open_instruct.ground_truth_utils import (
+ verify_gsm8k_sample,
+ verify_ifeval_sample,
+ verify_math_sample,
+ )
from open_instruct.utils import retry_on_exception
- from open_instruct.ground_truth_utils import verify_gsm8k_sample, verify_math_sample, verify_ifeval_sample


@dataclass
@@ -208,7 +212,13 @@ def get_reward(


def apply_verifiable_reward(
- query_responses: torch.Tensor, tokenizer, ground_truths: List[str], datasets: List[str], verify_reward : int = 10, answer_extraction_model: Optional[torch.nn.Module] = None, answer_extraction_tokenizer: Optional[PreTrainedTokenizer] = None
+ query_responses: torch.Tensor,
+ tokenizer,
+ ground_truths: List[str],
+ datasets: List[str],
+ verify_reward: int = 10,
+ answer_extraction_model: Optional[torch.nn.Module] = None,
+ answer_extraction_tokenizer: Optional[PreTrainedTokenizer] = None,
):
# decode the responses
decoded_responses = tokenizer.batch_decode(query_responses, skip_special_tokens=True)
@@ -218,10 +228,14 @@ def apply_verifiable_reward(
# add the prompt to the responses
decoded_responses = [f"{response} {prompt}" for response in decoded_responses]
# extract the answer
- answer_extraction_inputs = answer_extraction_tokenizer(decoded_responses, return_tensors="pt", padding=True, truncation=True)
+ answer_extraction_inputs = answer_extraction_tokenizer(
+ decoded_responses, return_tensors="pt", padding=True, truncation=True
+ )
answer_extraction_outputs = answer_extraction_model(**answer_extraction_inputs)
# get the predicted answer
- decoded_responses = answer_extraction_tokenizer.batch_decode(answer_extraction_outputs.logits.argmax(-1), skip_special_tokens=True)
+ decoded_responses = answer_extraction_tokenizer.batch_decode(
+ answer_extraction_outputs.logits.argmax(-1), skip_special_tokens=True
+ )
# compare with ground truth.
# use same logic as in gsm8k evaluation
rewards = []
Expand All @@ -230,11 +244,11 @@ def apply_verifiable_reward(
if ground_truth is None:
rewards.append(0)
continue
- if dataset.lower() == 'gsm8k':
+ if dataset.lower() == "gsm8k":
verified = verify_gsm8k_sample(prediction, ground_truth)
- elif dataset.lower() == 'math':
+ elif dataset.lower() == "math":
verified = verify_math_sample(prediction, ground_truth)
- elif dataset.lower() == 'ifeval':
+ elif dataset.lower() == "ifeval":
verified = verify_ifeval_sample(prediction, ground_truth)
# if verified, give reward
if verified:
40 changes: 0 additions & 40 deletions open_instruct/utils.py
@@ -874,46 +874,6 @@ def maybe_use_ai2_hf_entity() -> Optional[str]:
return None


- def submit_beaker_eval_jobs(
- model_name: str,
- location: str,
- hf_repo_revision: str = "",
- workspace: str = "tulu-3-results",
- beaker_image: str = "nathanl/open_instruct_auto",
- upload_to_hf: str = "allenai/tulu-3-evals",
- run_oe_eval_experiments: bool = False,
- run_safety_evaluations: bool = False,
- skip_oi_evals: bool = False,
- ) -> None:
- command = f"""
- python scripts/submit_eval_jobs.py \
- --model_name {model_name} \
- --location {location} \
- --is_tuned \
- --workspace {workspace} \
- --preemptible \
- --use_hf_tokenizer_template \
- --beaker_image {beaker_image} \
- """
- if len(hf_repo_revision) > 0:
- command += f" --hf_revision {hf_repo_revision}"
- if len(upload_to_hf) > 0:
- command += f" --upload_to_hf {upload_to_hf}"
- if run_oe_eval_experiments:
- command += " --run_oe_eval_experiments"
- if run_safety_evaluations:
- command += " --run_safety_evaluations"
- if skip_oi_evals:
- command += " --skip_oi_evals"
-
- process = subprocess.Popen(["bash", "-c", command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = process.communicate()
-
- print(f"Beaker evaluation jobs: Stdout:\n{stdout.decode()}")
- print(f"Beaker evaluation jobs: Stderr:\n{stderr.decode()}")
- print(f"Beaker evaluation jobs: process return code: {process.returncode}")
-
-
@retry_on_exception()
def upload_metadata_to_hf(
metadata_dict,
23 changes: 19 additions & 4 deletions scripts/eval/oe-eval.sh
@@ -65,6 +65,8 @@ while [[ "$#" -gt 0 ]]; do
--priority) PRIORITY="$2"; shift ;;
--tasks) CUSTOM_TASKS="$2"; shift ;;
--evaluate_on_weka) EVALUATE_ON_WEKA="true" ;;
+ --step) STEP="$2"; shift ;;
+ --run-id) RUN_ID="$2"; shift ;;
*) echo "Unknown parameter passed: $1"; usage ;;
esac
shift
@@ -88,6 +90,15 @@ MAX_LENGTH="${MAX_LENGTH:-4096}"
UNSEEN_EVALS="${UNSEEN_EVALS:-false}"
PRIORITY="${PRIORITY:normal}"
EVALUATE_ON_WEKA="${EVALUATE_ON_WEKA:-false}"
+ RUN_ID="${RUN_ID:-}"
+ STEP="${STEP:-}"
+ DATALAKE_ARGS=""
+ if [[ -n "$RUN_ID" ]]; then
+ DATALAKE_ARGS+="run_id=$RUN_ID"
+ fi
+ if [[ -n "$STEP" ]]; then
+ DATALAKE_ARGS+=",step=$STEP"
+ fi

# if UNSEEN_EVALS, save results to a different directory
if [ "$UNSEEN_EVALS" == "true" ]; then
@@ -169,12 +180,14 @@ for TASK in "${TASKS[@]}"; do
--model-args "{\"model_path\":\"${MODEL_LOCATION}\", \"max_length\": ${MAX_LENGTH}}" \
${HF_UPLOAD_ARG} \
--gpus "$GPU_COUNT" \
--beaker-image "costah/oe-eval-olmo1124-12022024" \
--beaker-image "01JH9RAAZQXKNNDHYTN2DPNPKN" \
--gantry-args '{"env-secret": "OPENAI_API_KEY=openai_api_key", "weka": "oe-adapt-default:/weka/oe-adapt-default"}' \
${REVISION_ARG} \
--cluster ai2/neptune-cirrascale,ai2/saturn-cirrascale,ai2/jupiter-cirrascale-2 \
--beaker-retries 2 \
--beaker-priority "$PRIORITY"
--beaker-priority "$PRIORITY" \
--push-datalake \
--datalake-tags "$DATALAKE_ARGS"
else
python oe-eval-internal/oe_eval/launch.py \
--model "$MODEL_NAME" \
Expand All @@ -186,10 +199,12 @@ for TASK in "${TASKS[@]}"; do
--model-args "{\"model_path\":\"${MODEL_LOCATION}\", \"max_length\": ${MAX_LENGTH}}" \
${HF_UPLOAD_ARG} \
--gpus "$GPU_COUNT" \
--beaker-image "costah/oe-eval-olmo1124-12022024" \
--beaker-image "01JH9RAAZQXKNNDHYTN2DPNPKN" \
--gantry-args '{"env-secret": "OPENAI_API_KEY=openai_api_key"}' \
${REVISION_ARG} \
--beaker-retries 2 \
--beaker-priority "$PRIORITY"
--beaker-priority "$PRIORITY" \
--push-datalake \
--datalake-tags "$DATALAKE_ARGS"
fi
done
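For oe-eval.sh specifically, a hand-run invocation with the new flags might look like the sketch below. The values are placeholders, and the --model-name / --model-location flag names and the beaker:// location form are assumed from the script's existing interface, which is not part of this diff. Note that the step fragment is always appended with a leading comma, so the tags string assumes --run-id is set whenever --step is.

# Hypothetical manual invocation (placeholder values):
bash scripts/eval/oe-eval.sh \
    --model-name my-dpo-run \
    --model-location beaker://01EXAMPLEDATASET \
    --run-id https://wandb.ai/org/project/runs/abc123 \
    --step 2000
# DATALAKE_ARGS then expands to:
#   run_id=https://wandb.ai/org/project/runs/abc123,step=2000
# and each oe-eval launch gets: --push-datalake --datalake-tags "$DATALAKE_ARGS"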
36 changes: 10 additions & 26 deletions scripts/submit_eval_jobs.py
@@ -65,25 +65,12 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
########################################
# Launcher

- NFS_CLUSTERS = [
- "ai2/allennlp-cirrascale",
- "ai2/aristo-cirrascale",
- "ai2/climate-cirrascale",
- "ai2/general-cirrascale",
- "ai2/general-cirrascale-a5000",
- "ai2/mosaic-cirrascale",
- "ai2/mosaic-cirrascale-a100",
- "ai2/pluto-cirrascale",
- "ai2/prior-cirrascale",
- "ai2/s2-cirrascale",
- "ai2/s2-cirrascale-l40",
- ]
-
WEKA_CLUSTERS = [
"ai2/jupiter-cirrascale-2",
"ai2/saturn-cirrascale",
"ai2/neptune-cirrascale",
"ai2/allennlp-elara-cirrascale",
"ai2/ceres-cirrascale"
]
GCP_CLUSTERS = [
"ai2/augusta-google-1"
@@ -130,6 +117,8 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
parser.add_argument("--use_alternate_safety_image", type=str, default=None, help="Use a different image for safety eval.")
parser.add_argument("--evaluate_on_weka", action="store_true", help="Evaluate OE eval on Beaker.")
parser.add_argument("--oe_eval_tasks", type=str, default=None, help="Evaluate OE eval on Beaker.")
parser.add_argument("--step", type=int, default=None, help="Step number for postgresql logging.")
parser.add_argument("--run_id", type=str, default=None, help="A unique run ID for postgresql logging.")
args = parser.parse_args()


@@ -149,17 +138,8 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
d1['tasks'][0]['resources']['gpuCount'] = 1

# remove nfs if asked or jupiter in cluster list.
- nfs_available = False
weka_available = False
- if all(c in NFS_CLUSTERS for c in cluster):
- d1['tasks'][0]['datasets'].append({
- 'mountPath': "/net/nfs.cirrascale",
- "source": {
- "hostPath": "/net/nfs.cirrascale"
- }
- })
- nfs_available = True
- elif all(c in WEKA_CLUSTERS for c in cluster):
+ if all(c in WEKA_CLUSTERS for c in cluster):
d1['tasks'][0]['datasets'].append({
'mountPath': "/weka/oe-adapt-default",
"source": {
@@ -504,7 +484,7 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
task_spec['arguments'] = [task_spec['arguments'][0].replace("--model_name_or_path /model", f"--model_name_or_path {model_info[1]} --hf_revision {args.hf_revision}")]
task_spec['arguments'] = [task_spec['arguments'][0].replace("--tokenizer_name_or_path /model", f"--tokenizer_name_or_path {model_info[1]}")]
elif model_info[1].startswith("/"): # if it's a local model, load it from the local directory
- assert nfs_available or weka_available, "NFS / Weka is required for path-based models." # to be safe.
+ assert weka_available, "NFS / Weka is required for path-based models." # to be safe.
task_spec['arguments'] = [task_spec['arguments'][0].replace("--model_name_or_path /model", f"--model_name_or_path {model_info[1]}")]
task_spec['arguments'] = [task_spec['arguments'][0].replace("--tokenizer_name_or_path /model", f"--tokenizer_name_or_path {model_info[1]}")]
else: # if it's a beaker model, mount the beaker dataset to `/model`
@@ -635,6 +615,10 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
oe_eval_cmd += " --evaluate_on_weka"
if args.oe_eval_tasks:
oe_eval_cmd += f" --tasks {args.oe_eval_tasks}"
+ if args.run_id:
+ oe_eval_cmd += f" --run-id {args.run_id}"
+ if args.step:
+ oe_eval_cmd += f" --step {args.step}"
# add string with number of gpus
num_gpus = task_spec['resources']['gpuCount']
# if num_gpus > 1, double it again for oe-eval configs
@@ -678,7 +662,7 @@ def adjust_gpus(task_spec, experiment_group, model_name, gpu_multiplier):
task_spec['arguments'] = [task_spec['arguments'][0].replace("--model_name_or_path /model", f"--model_name_or_path {model_info[1]} --hf_revision {args.hf_revision}")]
task_spec['arguments'] = [task_spec['arguments'][0].replace("--tokenizer_name_or_path /model", f"--tokenizer_name_or_path {model_info[1]}")]
elif model_info[1].startswith("/"): # if it's a local model, load it from the local directory
- assert nfs_available or weka_available, "NFS / Weka is required for path-based models." # to be safe.
+ assert weka_available, "NFS / Weka is required for path-based models." # to be safe.
task_spec['arguments'] = [task_spec['arguments'][0].replace("--model_name_or_path /model", "--model_name_or_path "+model_info[1])]
task_spec['arguments'] = [task_spec['arguments'][0].replace("--tokenizer_name_or_path /model", "--tokenizer_name_or_path "+model_info[1])]
else: # if it's a beaker model, mount the beaker dataset to `/model`
(Diff for 1 additional changed file was not loaded on this page.)
