15 changes: 15 additions & 0 deletions README.md
@@ -60,6 +60,7 @@ results = workrb.evaluate( # Returns BenchmarkResults (Pydantic model)
model,
tasks,
output_folder="results/my_model",
save_rankings=False, # Optional: store full per-target score arrays for ranking tasks
)
print(results) # Benchmark/Per-task/Per-language metrics
```
@@ -208,6 +209,20 @@ results/my_model/
└── config.yaml # Final benchmark configuration dump
```

If you pass `save_rankings=True` to `evaluate`, WorkRB also writes per-task,
per-dataset ranking score artifacts under a model-scoped subdirectory:

```
results/my_model/
└── rankings/
└── <model_name>/
└── <task_name>__<dataset_id>.json
```

Each JSON contains `model_name`, `task_name`, `dataset_id`, `num_queries`,
`num_targets`, and `scores_by_target` (a mapping from target text to its
score across all queries).
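
For example, an artifact can be inspected with nothing but the standard library. The path below is illustrative; actual file names depend on your model, task, and dataset IDs:

```python
import json

# Hypothetical artifact path; see the layout above.
path = "results/my_model/rankings/my_model/my_task__en.json"
with open(path) as f:
    artifact = json.load(f)

print(artifact["task_name"], artifact["num_queries"], artifact["num_targets"])
# Each value in scores_by_target is a list with one score per query.
for target_text, scores in artifact["scores_by_target"].items():
    print(target_text, max(scores))
```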

To load & parse results from a run:

```python
# ... (snippet collapsed in the diff view)
```
42 changes: 42 additions & 0 deletions src/workrb/config.py
@@ -7,6 +7,7 @@

import json
import logging
import re
import time
from collections.abc import Sequence
from dataclasses import asdict, dataclass, field
@@ -134,6 +135,47 @@ def get_results_path(self) -> Path:
"""Get the path where final results should be saved."""
return self.get_output_path() / "results.json"

def get_rankings_dir(self) -> Path:
"""Get the directory where per-dataset ranking artifacts are saved.

Rankings are nested under a sanitized model-name directory so that
running multiple models into the same ``output_folder`` cannot clobber
each other's ranking files.
"""
safe_model_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", self.model_name).strip("_")
return self.get_output_path() / "rankings" / safe_model_name

def get_task_rankings_path(self, task_name: str, dataset_id: str) -> Path:
"""Get the output path for one task/dataset ranking artifact."""
safe_task_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", task_name).strip("_")
safe_dataset_id = re.sub(r"[^A-Za-z0-9_.-]+", "_", dataset_id).strip("_")
filename = f"{safe_task_name}__{safe_dataset_id}.json"
return self.get_rankings_dir() / filename

def save_rankings_artifact(
self,
task_name: str,
dataset_id: str,
scores_by_target: dict[str, list[float]],
num_queries: int,
num_targets: int,
) -> Path:
"""Save full ranking scores for one dataset as a JSON artifact."""
rankings_path = self.get_task_rankings_path(task_name=task_name, dataset_id=dataset_id)
rankings_path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"model_name": self.model_name,
"task_name": task_name,
"dataset_id": dataset_id,
"num_queries": num_queries,
"num_targets": num_targets,
"scores_by_target": scores_by_target,
}
with open(rankings_path, "w") as f:
json.dump(payload, f, indent=2)
logger.debug(f"Ranking artifact saved to {rankings_path}")
return rankings_path

def has_checkpoint(self) -> bool:
"""Check if a checkpoint exists."""
return self.get_checkpoint_path().exists()
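For reference, a quick sketch of how this sanitization maps arbitrary names to file-system-safe path components. The `_sanitize` helper and the example inputs are illustrative, not part of the library:

```python
import re

def _sanitize(name: str) -> str:
    # Same pattern as config.py: collapse runs of disallowed characters
    # to a single "_" and trim leading/trailing underscores.
    return re.sub(r"[^A-Za-z0-9_.-]+", "_", name).strip("_")

print(_sanitize("org/model:v1.2"))                        # org_model_v1.2
print(f"{_sanitize('My Task')}__{_sanitize('en')}.json")  # My_Task__en.json
```

Collapsing each run of disallowed characters into a single underscore keeps names readable while guaranteeing that no path separators leak into the file name.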
47 changes: 44 additions & 3 deletions src/workrb/run.py
@@ -23,6 +23,7 @@
TaskResults,
)
from workrb.tasks.abstract.base import Task
from workrb.tasks.abstract.ranking_base import RankingTask
from workrb.types import ExecutionMode, LanguageAggregationMode, get_language_grouping_key

logger = logging.getLogger(__name__)
@@ -36,6 +37,7 @@ def evaluate(
metrics: dict[str, list[str]] | None = None,
description: str = "",
force_restart: bool = False,
save_rankings: bool = False,
language_aggregation_mode: LanguageAggregationMode = LanguageAggregationMode.MONOLINGUAL_ONLY,
execution_mode: ExecutionMode = ExecutionMode.LAZY,
) -> BenchmarkResults:
@@ -49,6 +51,10 @@
metrics: Optional dict mapping task names to custom metrics lists
description: Description for the benchmark run
force_restart: If True, ignore checkpoints and restart from beginning
save_rankings: If True, save per-target ranking score arrays for each
ranking task dataset under
``<output_folder>/rankings/<model_name>/`` as JSON artifacts.
Has no effect for non-ranking tasks. Defaults to False.
language_aggregation_mode: How per-language results should be grouped
when calling ``get_summary_metrics()`` on the returned results.
When ``execution_mode`` is ``LAZY``, datasets that are
@@ -111,6 +117,7 @@
results=results,
model=model,
metrics=metrics,
save_rankings=save_rankings,
total_evaluations=total_evaluations,
)
if results.metadata.resumed_from_checkpoint:
@@ -429,6 +436,7 @@ def _run_pending_work(
results: BenchmarkResults,
model: ModelInterface,
metrics: dict[str, list[str]] | None,
save_rankings: bool,
total_evaluations: int,
) -> BenchmarkResults:
"""Run pending evaluations.
@@ -439,6 +447,7 @@
results: BenchmarkResults object to store results.
model: ModelInterface object to evaluate.
metrics: Dictionary of task names to their custom metrics.
save_rankings: If True, save full ranking score artifacts for ranking tasks.
total_evaluations: Total number of compatible evaluations (for progress display).
"""
# Run pending evaluations
@@ -476,9 +485,30 @@

try:
start_time_eval = time.time()
dataset_results: dict[str, float] = task.evaluate(
model=model, metrics=task_metrics, dataset_id=dataset_id
)
if save_rankings and isinstance(task, RankingTask):
prediction_matrix = task.compute_prediction_matrix(
model=model, dataset_id=dataset_id
)
dataset_results = task.compute_metrics_from_prediction_matrix(
prediction_matrix=prediction_matrix,
dataset_id=dataset_id,
metrics=task_metrics,
)
rankings_path = config.save_rankings_artifact(
task_name=task.name,
dataset_id=dataset_id,
scores_by_target=_build_scores_by_target(
target_space=task.datasets[dataset_id].target_space,
prediction_matrix=prediction_matrix,
),
num_queries=prediction_matrix.shape[0],
num_targets=prediction_matrix.shape[1],
)
logger.info(f"\tSaved ranking scores to: {rankings_path}")
else:
dataset_results: dict[str, float] = task.evaluate(
model=model, metrics=task_metrics, dataset_id=dataset_id
)
evaluation_time = time.time() - start_time_eval

# Store results
Expand Down Expand Up @@ -508,3 +538,14 @@ def _run_pending_work(

logger.info(f"Completed {run_idx} / {total_evaluations} evaluations. ")
return results


def _build_scores_by_target(
target_space: list[str],
prediction_matrix: Any,
) -> dict[str, list[float]]:
"""Build a mapping from target text to its scores across all queries."""
return {
target_text: prediction_matrix[:, idx].tolist()
for idx, target_text in enumerate(target_space)
}
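
To make the column-to-target mapping concrete, here is a small worked example with made-up scores (two queries, three targets), calling the function defined above:

```python
import numpy as np

prediction_matrix = np.array([[0.9, 0.1, 0.4],
                              [0.2, 0.8, 0.5]])  # shape: (num_queries, num_targets)
target_space = ["target a", "target b", "target c"]

print(_build_scores_by_target(target_space, prediction_matrix))
# {'target a': [0.9, 0.2], 'target b': [0.1, 0.8], 'target c': [0.4, 0.5]}
```

Each dict value is exactly one column of the matrix, i.e. one target's score across all queries, which is the payload stored under `scores_by_target` in the JSON artifact.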
47 changes: 30 additions & 17 deletions src/workrb/tasks/abstract/ranking_base.py
@@ -8,6 +8,7 @@
from enum import Enum
from typing import TYPE_CHECKING

import numpy as np
import torch

from workrb.metrics.ranking import calculate_ranking_metrics
@@ -306,30 +307,42 @@ def evaluate(
dict[str, float]
Dictionary containing metric scores and evaluation metadata.
"""
if metrics is None:
metrics = self.default_metrics
prediction_matrix = self.compute_prediction_matrix(model=model, dataset_id=dataset_id)
return self.compute_metrics_from_prediction_matrix(
prediction_matrix=prediction_matrix,
dataset_id=dataset_id,
metrics=metrics,
)

# Retrieve dataset by ID
def compute_prediction_matrix(
self,
model: ModelInterface,
dataset_id: str = "en",
) -> np.ndarray:
"""Compute the ranking score matrix for a dataset."""
dataset = self.datasets[dataset_id]
queries = dataset.query_texts
targets = dataset.target_space
labels = dataset.target_indices

# Get model predictions (similarity matrix)
prediction_matrix = model.compute_rankings(
queries=queries,
targets=targets,
queries=dataset.query_texts,
targets=dataset.target_space,
query_input_type=self.query_input_type,
target_input_type=self.target_input_type,
)

# Convert to numpy if needed
if isinstance(prediction_matrix, torch.Tensor):
prediction_matrix = prediction_matrix.cpu().float().numpy()
return prediction_matrix

# Calculate metrics
metric_results = calculate_ranking_metrics(
prediction_matrix=prediction_matrix, pos_label_idxs=labels, metrics=metrics
def compute_metrics_from_prediction_matrix(
self,
prediction_matrix: np.ndarray,
dataset_id: str = "en",
metrics: list[str] | None = None,
) -> dict[str, float]:
"""Compute ranking metrics from a precomputed prediction matrix."""
if metrics is None:
metrics = self.default_metrics
dataset = self.datasets[dataset_id]
return calculate_ranking_metrics(
prediction_matrix=prediction_matrix,
pos_label_idxs=dataset.target_indices,
metrics=metrics,
)

return metric_results
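
Splitting `evaluate` into these two steps lets callers compute the expensive prediction matrix once and reuse it, which is how `run.py` above derives both the metrics and the ranking artifact from a single scoring pass. A minimal usage sketch (the `task` and `model` objects are assumed to be set up elsewhere; `metrics=None` falls back to the task's defaults):

```python
# One-off scoring pass over the dataset...
matrix = task.compute_prediction_matrix(model=model, dataset_id="en")
# ...reused for metric computation (and, optionally, ranking artifacts).
results = task.compute_metrics_from_prediction_matrix(
    prediction_matrix=matrix, dataset_id="en", metrics=None
)
```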