From e69ab278b70f15aa591c2759a8c69d2c6b28db44 Mon Sep 17 00:00:00 2001
From: Kevin Liu
Date: Sun, 2 Mar 2025 18:21:32 -0800
Subject: [PATCH] [nanoeval] sync (#50)

* [nanoeval] sync

* Update alcatraz_computer_interface.py

* x
---
 project/nanoeval/nanoeval/eval.py             | 34 +++++--
 project/nanoeval/nanoeval/evaluation.py       |  5 +-
 project/nanoeval/nanoeval/evaluation_test.py  | 91 ++++++++++++-------
 project/nanoeval/nanoeval/library_config.py   | 82 ++++++++++++++++-
 project/nanoeval/nanoeval/metrics/standard.py | 11 ++-
 project/nanoeval/nanoeval/recorders.py        |  6 ++
 project/nanoeval/nanoeval/setup.py            | 70 +-------------
 .../code_execution_interface.py               |  2 +-
 .../solvers/computer_tasks/demo/__init__.py   |  0
 .../solvers/computer_tasks/demo/_demo_task.py | 27 ++++++
 .../solvers/computer_tasks/demo/agent.py      | 25 +++++
 .../{demo.py => demo/runtime.py}              |  0
 .../nanoeval/solvers/computer_tasks/task.py   |  2 +
 .../alcatraz_computer_interface.py            | 12 +++
 14 files changed, 246 insertions(+), 121 deletions(-)
 create mode 100644 project/nanoeval/nanoeval/recorders.py
 create mode 100644 project/nanoeval/nanoeval/solvers/computer_tasks/demo/__init__.py
 create mode 100644 project/nanoeval/nanoeval/solvers/computer_tasks/demo/_demo_task.py
 create mode 100644 project/nanoeval/nanoeval/solvers/computer_tasks/demo/agent.py
 rename project/nanoeval/nanoeval/solvers/computer_tasks/{demo.py => demo/runtime.py} (100%)

diff --git a/project/nanoeval/nanoeval/eval.py b/project/nanoeval/nanoeval/eval.py
index c7a692d..82c8159 100644
--- a/project/nanoeval/nanoeval/eval.py
+++ b/project/nanoeval/nanoeval/eval.py
@@ -6,6 +6,7 @@
 
 import inspect
 import logging
+import os
 from pprint import pformat
 from typing import (
     Any,
@@ -23,7 +24,6 @@
 from chz.factories import function
 from nanoeval._multiprocessing_utils import check_multiprocess_safe
 from nanoeval.asyncio_utils import HasAsyncContextManager
-from nanoeval.recorder import dummy_recorder
 from nanoeval.recorder_protocol import RecorderConfig
 
 
@@ -31,6 +31,7 @@ class Task(BaseModel):
     """
     All nanoeval Tasks must inherit from this class.
    """
+
     question_id: str
     attempt_id: int = 1
     retry_idx: int = 0
@@ -129,10 +130,10 @@ async def get_summary(self, results: list[tuple[TTask, TResult]]) -> dict[str, A
         is to compute accuracy (this will underweight instances with fewer rollouts). Instead,
         you should compute accuracy by instance, and then average over instances.
 
-        Notably, this function is called on an interval before the eval is completed, so it should be able to handle partial
-        results and partial results are very likely to be ragged.
+        Notably, this function is called on an interval, so it should be able to handle partial
+        results, which are very likely to be ragged.
 
-        This function is used by the default implementation of `eval.get_full_summary()`.
+        This function is called by `eval.get_full_summary()`.
         """
         raise NotImplementedError
 
@@ -198,7 +199,10 @@ async def self_test(self) -> None:
 @chz.chz
 class RunnerArgs:
     # Runner options.
-    concurrency: int = 4096
+    concurrency: int | None = chz.field(
+        default=4096,
+        doc="Per-eval concurrency. If None, concurrency is not limited.",
+    )
     # If enabled, use multiprocessing. This can be useful for CPU-bound tasks, and uses
     # multiprocessing as the outer loop, and asyncio concurrency as the inner loop.
     # We split tasks into groups of size `concurrency`. A subprocess processes one group
@@ -215,10 +219,10 @@ class RunnerArgs:
         doc="Limit the number of tasks run. The limit is the first N tasks selected before shuffling.",
     )
     run_set_id: str | None = None
-    recorder: RecorderConfig = chz.field(
-        meta_factory=function(default_module="nanoeval.recorders"),
-        default_factory=dummy_recorder,
-        doc="Recorder configuration used to create a recorder for the eval.",
+    recorder: RecorderConfig | None = chz.field(
+        meta_factory=function(),
+        default=None,
+        doc="Recorder configuration used to create a recorder for the eval. If None, the default recorder is used (as determined by `library_config().get_default_recorder()`).",
     )
     enable_slackbot: bool = True
     slack_name: str | None = None
@@ -227,7 +231,7 @@ class RunnerArgs:
     summary_interval: float | None = None
     use_monitor: bool = chz.field(
         default=False,
-        doc="If enabled, starts a streamlit server on port 8501 to monitor the eval. You can also run it manually by running `streamlit run project/nanoeval/monitor.py`.",
+        doc="If enabled, starts a streamlit server on port 8501 to monitor the eval. You can also run it manually by running `python3 -m nanoeval.bin.mon`.",
     )
     max_retries: int = chz.field(
         default=16,
@@ -255,6 +259,16 @@ def _validate_multiprocessing_options(self) -> None:
     def _numerical_limits(self) -> None:
         assert self.n_tasks is None or self.n_tasks > 0
 
+    @chz.validate
+    def _validate_concurrency(self) -> None:
+        if self.concurrency is not None and self.concurrency <= 0:
+            # concurrency=0 is permitted only as an escape hatch for tests that need a hanging eval.
+            if self.concurrency == 0 and os.environ.get("NANOEVAL_ALLOW_ZERO_CONCURRENCY"):
+                return
+            raise ValueError(
+                "concurrency must be > 0 or None unless NANOEVAL_ALLOW_ZERO_CONCURRENCY is set."
+            )
+
 
 @chz.chz
 class EvalSpec:
diff --git a/project/nanoeval/nanoeval/evaluation.py b/project/nanoeval/nanoeval/evaluation.py
index 59f3a60..068b900 100644
--- a/project/nanoeval/nanoeval/evaluation.py
+++ b/project/nanoeval/nanoeval/evaluation.py
@@ -346,7 +346,8 @@ async def run(spec: EvalSpec) -> dict[str, Any]:  # type: ignore
     )
 
     # Build the recorder!
-    recorder = await spec.runner.recorder.factory(spec, len(tasks))
+    recorder_config = spec.runner.recorder or get_library_config().get_default_recorder()
+    recorder = await recorder_config.factory(spec, len(tasks))
 
     # Load all tasks into the database
     with db.conn() as conn:
@@ -358,7 +359,7 @@ async def run(spec: EvalSpec) -> dict[str, Any]:  # type: ignore
             datetime.now(),
             dill.dumps(spec),
             dill.dumps(recorder),
-            spec.runner.concurrency,
+            spec.runner.concurrency if spec.runner.concurrency is not None else 999999,
         ),
     )
 
diff --git a/project/nanoeval/nanoeval/evaluation_test.py b/project/nanoeval/nanoeval/evaluation_test.py
index c3abc24..986ad73 100644
--- a/project/nanoeval/nanoeval/evaluation_test.py
+++ b/project/nanoeval/nanoeval/evaluation_test.py
@@ -1,7 +1,9 @@
 from __future__ import annotations
 
 import asyncio
+import os
 from typing import Any
+from unittest.mock import patch
 
 import numpy as np
 import pytest
@@ -24,45 +26,51 @@ async def test_concurrency_blocking_edge_case() -> None:
 
     If this test fails, it will hang forever.
     """
-    async with global_exit_stack, db.open_run_set_db(backup=False), asyncio.TaskGroup() as tg:
-        # This eval will never finish and get queued in the executor forever.
-        # Note concurrency=0.
-        background = tg.create_task(
-            nanoeval.validate(
-                EvalSpec(
-                    eval=GPQAEval(solver=MockSolver()),
-                    runner=RunnerArgs(n_tasks=1, concurrency=0),
-                )
-            )
-        )
-        # Wait for the task to get registered
-        while True:
-            with db.conn() as c:
-                (count,) = c.execute("SELECT COUNT(*) FROM eval").fetchone()
-                if count > 0:
-                    break
-            await asyncio.sleep(0.5)
-
-        print("made it to the point where the first one got picked up by the executor")
-
-        for f in asyncio.as_completed(
-            [
-                # Will never finish - and that's ok!
-                background,
-                # We expect this one to actually finish.
+    # In general, setting concurrency to 0 is not allowed, since it would just make the eval
+    # hang; however, tests can bypass the check to create an eval that intentionally hangs.
+    with patch.dict(os.environ, {"NANOEVAL_ALLOW_ZERO_CONCURRENCY": "1"}, clear=False):
+        async with global_exit_stack, db.open_run_set_db(backup=False), asyncio.TaskGroup() as tg:
+            # This eval will never finish and get queued in the executor forever.
+            background = tg.create_task(
                 nanoeval.validate(
                     EvalSpec(
                         eval=GPQAEval(solver=MockSolver()),
-                        runner=RunnerArgs(n_tasks=1, concurrency=1),
+                        runner=RunnerArgs(
+                            n_tasks=1,
+                            concurrency=0,  # Forces the eval to hang.
+                        ),
                     )
-                ),
-            ]
-        ):
-            await f
-            print("We did it! one of the evals finished!")
-            await cancel_task(background)
-            break
+                )
+            )
+
+            # Wait for the task to get registered
+            while True:
+                with db.conn() as c:
+                    (count,) = c.execute("SELECT COUNT(*) FROM eval").fetchone()
+                    if count > 0:
+                        break
+                await asyncio.sleep(0.5)
+
+            print("made it to the point where the first one got picked up by the executor")
+
+            for f in asyncio.as_completed(
+                [
+                    # Will never finish - and that's ok!
+                    background,
+                    # We expect this one to actually finish.
+                    nanoeval.validate(
+                        EvalSpec(
+                            eval=GPQAEval(solver=MockSolver()),
+                            runner=RunnerArgs(n_tasks=1, concurrency=1),
+                        )
+                    ),
+                ]
+            ):
+                await f
+                print("We did it! one of the evals finished!")
+                await cancel_task(background)
+                break
 
 
 @pytest.mark.asyncio
@@ -346,3 +354,18 @@ async def solve(self, task: MCQTask) -> Answer:
     )
 
     assert first_eval_succeeded
+
+
+@pytest.mark.asyncio
+async def test_concurrency_none() -> None:
+    async with global_exit_stack, db.open_run_set_db(backup=False):
+        report = await nanoeval.validate(
+            EvalSpec(
+                eval=GPQAEval(solver=MockSolver()),
+                runner=RunnerArgs(
+                    n_tasks=1,
+                    concurrency=None,
+                ),
+            )
+        )
+        assert "accuracy" in report
diff --git a/project/nanoeval/nanoeval/library_config.py b/project/nanoeval/nanoeval/library_config.py
index dcb637d..09b5f42 100644
--- a/project/nanoeval/nanoeval/library_config.py
+++ b/project/nanoeval/nanoeval/library_config.py
@@ -1,14 +1,18 @@
 from __future__ import annotations
 
 import functools
+import logging
+import os
 import tempfile
 from contextlib import contextmanager
 from dataclasses import dataclass
+from functools import partial
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Generator, Literal, Self
 
 import pandas as pd
 import structlog
+from structlog.typing import EventDict
 
 import chz
 from nanoeval.recorder_protocol import BasicRunSpec, RecorderConfig, RecorderProtocol
@@ -17,6 +21,9 @@
     from nanoeval.eval import EvalSpec
 
 
+ENV_NANOEVAL_LOG_ALL = "NANOEVAL_LOG_ALL"
+
+
 @functools.cache
 def root_dir() -> Path:
     return Path(tempfile.gettempdir()) / "nanoeval"
@@ -91,6 +98,38 @@ async def factory(self, spec: EvalSpec, num_tasks: int) -> RecorderProtocol:
         return _DefaultDummyRecorder(run_spec=self._make_default_run_spec(spec))
 
 
+def _rename_field(
+    old: str, new: str, logger: logging.Logger, name: str, event_dict: EventDict
+) -> EventDict:
+    del logger, name
+
+    # Check for presence rather than truthiness so falsy values are renamed too.
+    if old in event_dict:
+        event_dict[new] = event_dict.pop(old)
+    return event_dict
+
+
+def _remove_all_fields_except(
+    to_keep: list[str], logger: logging.Logger, name: str, event_dict: EventDict
+) -> EventDict:
+    del logger, name
+
+    for key in list(event_dict.keys()):
+        if key not in to_keep:
+            del event_dict[key]
+    return event_dict
+
+
+class PrintOrWarningFilter(logging.Filter):
+    def filter(self, record: logging.LogRecord) -> bool:
+        if record.levelno > logging.INFO or (
+            os.environ.get(ENV_NANOEVAL_LOG_ALL) and record.levelno == logging.INFO
+        ):
+            return True
+
+        return isinstance(record.msg, dict) and record.msg.get("_print", False)
+
+
 class LibraryConfig:
     """
     Hooks to configure parts of the nanoeval library. Shared across all runs in the process.
@@ -112,7 +151,7 @@ async def send_user_notification(self, message: str, extra: str | None = None) -
             extra=extra,
         )
 
-    def on_logging_setup(self) -> None:
+    def setup_logging(self) -> None:
         # Set up structlog according to https://www.structlog.org/en/stable/standard-library.html
         # Basically, we convert structlogs to logging-style record and then process them using
         # structlog formatters into json for humio, and console for stdout
@@ -124,6 +163,42 @@ def on_logging_setup(self) -> None:
             logger_factory=structlog.stdlib.LoggerFactory(),
         )
 
+        # Remove all StreamHandlers from the root logger. Iterate over a copy:
+        # removing from the live handler list while iterating over it skips entries.
+        for handler in list(logging.getLogger().handlers):
+            if isinstance(handler, logging.StreamHandler):
+                logging.getLogger().removeHandler(handler)
+
+        handler = logging.StreamHandler()
+        # Use OUR `ProcessorFormatter` to format all `logging` entries to stdout.
+ handler.setFormatter( + structlog.stdlib.ProcessorFormatter( + processors=[ + structlog.stdlib.ProcessorFormatter.remove_processors_meta, + structlog.contextvars.merge_contextvars, + structlog.processors.add_log_level, + structlog.processors.MaybeTimeStamper(fmt="iso"), + functools.partial( + _remove_all_fields_except, + ["timestamp", "level", "event", "component", "exc_info"], + ), + structlog.dev.ConsoleRenderer(), + ], + # logger -> structlog transforms + foreign_pre_chain=[ + structlog.stdlib.add_logger_name, + partial(_rename_field, "logger", "component"), + partial(_rename_field, "logger_name", "component"), + partial(_rename_field, "log", "event"), + structlog.stdlib.ExtraAdder(), + ], + ) + ) + + handler.addFilter(PrintOrWarningFilter()) + root_logger = logging.getLogger() + root_logger.addHandler(handler) + root_logger.setLevel(logging.INFO) + @contextmanager def set_recorder_context( self, rec: RecorderProtocol, sample_id: str, group_id: str @@ -133,6 +208,11 @@ def set_recorder_context( def get_dummy_recorder(self, log: bool) -> RecorderConfig: return _DummyRecorderConfig() + def get_default_recorder(self) -> RecorderConfig: + from nanoeval.json_recorder import json_recorder + + return json_recorder() + def writable_root_dir(self) -> str: return str(root_dir()) diff --git a/project/nanoeval/nanoeval/metrics/standard.py b/project/nanoeval/nanoeval/metrics/standard.py index dcbe914..1ca46ab 100644 --- a/project/nanoeval/nanoeval/metrics/standard.py +++ b/project/nanoeval/nanoeval/metrics/standard.py @@ -79,25 +79,28 @@ def compute_default_metrics_on_correctness_without_answer_groups( samples_df["answer_group_id"] = samples_df["is_correct"].astype(int) del samples_df["is_correct"] - # Create two answer groups: 0 is wrong, 1 is right. + # Create two answer groups: 0 is wrong, 1 is right. Ensure no instance-level duplicates. 
     answer_group_correctness_df = pd.DataFrame(
         flatten(
             [
                 {
-                    "instance": sample.instance,
+                    "instance": instance,
                     "answer_group_id": 0,
                     "is_correct": False,
                 },
                 {
-                    "instance": sample.instance,
+                    "instance": instance,
                     "answer_group_id": 1,
                     "is_correct": True,
                 },
             ]
-            for sample in samples_df.itertuples()
+            for instance in samples_df["instance"].unique()
         )
     )
 
+    # Each instance should contribute exactly one answer group for False and one for True.
+    assert len(answer_group_correctness_df) == 2 * len(samples_df["instance"].unique())
+
     return compute_default_metrics(samples_df, answer_group_correctness_df)
diff --git a/project/nanoeval/nanoeval/recorders.py b/project/nanoeval/nanoeval/recorders.py
new file mode 100644
index 0000000..d89959a
--- /dev/null
+++ b/project/nanoeval/nanoeval/recorders.py
@@ -0,0 +1,6 @@
+"""This module exports all bundled recorders for nanoeval."""
+
+from nanoeval.json_recorder import json_recorder
+from nanoeval.recorder import dummy_recorder
+
+__all__ = ["json_recorder", "dummy_recorder"]
diff --git a/project/nanoeval/nanoeval/setup.py b/project/nanoeval/nanoeval/setup.py
index 75b8597..65c8aa6 100644
--- a/project/nanoeval/nanoeval/setup.py
+++ b/project/nanoeval/nanoeval/setup.py
@@ -6,12 +6,10 @@
 import resource
 from concurrent.futures import ThreadPoolExecutor
 from contextlib import AsyncExitStack, contextmanager
-from functools import partial
 from typing import Any, Coroutine, Generator
 
 import structlog
 from structlog.contextvars import bound_contextvars
-from structlog.types import EventDict
 
 from nanoeval._aiomonitor import start_aiomonitor
 from nanoeval.library_config import get_library_config
@@ -26,75 +24,9 @@
 """
 
 
-def _rename_field(
-    old: str, new: str, logger: logging.Logger, name: str, event_dict: EventDict
-) -> EventDict:
-    del logger, name
-
-    if value := event_dict.get(old):
-        event_dict[new] = value
-        del event_dict[old]
-    return event_dict
-
-
-def _remove_all_fields_except(
-    to_keep: list[str], logger: logging.Logger, name: str, event_dict: EventDict
-) -> EventDict:
-    del logger, name
-
-    for key in list(event_dict.keys()):
-        if key not in to_keep:
-            del event_dict[key]
-    return event_dict
-
-
-class PrintOrWarningFilter(logging.Filter):
-    def filter(self, record: logging.LogRecord) -> bool:
-        if record.levelno > logging.INFO or (
-            os.environ.get("NANOEVAL_LOG_ALL") and record.levelno == logging.INFO
-        ):
-            return True
-
-        return isinstance(record.msg, dict) and record.msg.get("_print", False)
-
-
 def nanoeval_logging() -> None:
     logging.captureWarnings(True)
-
-    # Remove all StreamHandlers from the root logger
-    for handler in logging.getLogger().handlers:
-        if isinstance(handler, logging.StreamHandler):
-            logging.getLogger().removeHandler(handler)
-
-    handler = logging.StreamHandler()
-    # Use OUR `ProcessorFormatter` to format all `logging` entries to stdout.
- handler.setFormatter( - structlog.stdlib.ProcessorFormatter( - processors=[ - structlog.stdlib.ProcessorFormatter.remove_processors_meta, - structlog.contextvars.merge_contextvars, - structlog.processors.add_log_level, - structlog.processors.MaybeTimeStamper(fmt="iso"), - partial( - _remove_all_fields_except, - ["timestamp", "level", "event", "component", "exc_info"], - ), - structlog.dev.ConsoleRenderer(), - ], - # logger -> structlog transforms - foreign_pre_chain=[ - structlog.stdlib.add_logger_name, - partial(_rename_field, "logger", "component"), - partial(_rename_field, "logger_name", "component"), - structlog.stdlib.ExtraAdder(), - ], - ) - ) - - handler.addFilter(PrintOrWarningFilter()) - root_logger = logging.getLogger() - root_logger.addHandler(handler) - root_logger.setLevel(logging.INFO) + get_library_config().setup_logging() @contextmanager diff --git a/project/nanoeval/nanoeval/solvers/computer_tasks/code_execution_interface.py b/project/nanoeval/nanoeval/solvers/computer_tasks/code_execution_interface.py index c8b5027..2903d7a 100644 --- a/project/nanoeval/nanoeval/solvers/computer_tasks/code_execution_interface.py +++ b/project/nanoeval/nanoeval/solvers/computer_tasks/code_execution_interface.py @@ -75,7 +75,7 @@ async def send_shell_command(self, cmd: str) -> ExecutionResult: async def check_shell_command(self, cmd: str) -> ExecutionResult: res = await self.send_shell_command(cmd) assert res.exit_code == 0, ( - f"Command failed with {res.exit_code=}\n\n{res.output.decode(errors='ignore')}" + f"Command failed with {res.exit_code=}\n\n{cmd=}\n\n{res.output.decode(errors='ignore')}" ) return res diff --git a/project/nanoeval/nanoeval/solvers/computer_tasks/demo/__init__.py b/project/nanoeval/nanoeval/solvers/computer_tasks/demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/project/nanoeval/nanoeval/solvers/computer_tasks/demo/_demo_task.py b/project/nanoeval/nanoeval/solvers/computer_tasks/demo/_demo_task.py new file mode 100644 index 0000000..45afd71 --- /dev/null +++ b/project/nanoeval/nanoeval/solvers/computer_tasks/demo/_demo_task.py @@ -0,0 +1,27 @@ +from openai.types.chat import ChatCompletionMessageParam +from pydantic import Field + +from nanoeval.solvers.computer_tasks.code_execution_interface import ComputerInterface +from nanoeval.solvers.computer_tasks.task import ComputerTask, Grade + + +class DemoTask(ComputerTask): + question_id: str = "test" + docker_image: str | None = "ubuntu" + prompt: list[ChatCompletionMessageParam] = Field( + default_factory=lambda: [ + {"role": "user", "content": "What is 7+7? 
Write your answer to /root/ANSWER.txt"}
+        ]
+    )
+
+    async def _setup(self, computer: ComputerInterface) -> None:
+        pass
+
+    async def grade(self, computer: ComputerInterface) -> Grade:
+        res = await computer.check_shell_command(
+            "if [ -e /root/ANSWER.txt ]; then cat /root/ANSWER.txt; fi"
+        )
+        return Grade(
+            score=1.0 if res.output.decode("utf-8", errors="ignore").strip() == "14" else 0.0,
+            grader_log=f"Answer was {res}.",
+        )
diff --git a/project/nanoeval/nanoeval/solvers/computer_tasks/demo/agent.py b/project/nanoeval/nanoeval/solvers/computer_tasks/demo/agent.py
new file mode 100644
index 0000000..6426194
--- /dev/null
+++ b/project/nanoeval/nanoeval/solvers/computer_tasks/demo/agent.py
@@ -0,0 +1,25 @@
+import os
+
+import structlog.stdlib
+
+import chz
+from nanoeval.setup import nanoeval_entrypoint
+from nanoeval.solvers.computer_tasks.demo._demo_task import DemoTask
+from nanoeval.solvers.computer_tasks.solver import PythonCodingSolver
+
+logger = structlog.stdlib.get_logger(component=__name__, _print=True)
+
+
+async def demo(solver: PythonCodingSolver) -> None:
+    """
+    Demos how to run a basic PythonCodingSolver.
+    """
+
+    async with solver:
+        async for step in solver.run(task=DemoTask()):
+            print("STEP:", step)
+
+
+if __name__ == "__main__":
+    os.environ["NANOEVAL_LOG_ALL"] = "1"
+    nanoeval_entrypoint(chz.entrypoint(demo))
diff --git a/project/nanoeval/nanoeval/solvers/computer_tasks/demo.py b/project/nanoeval/nanoeval/solvers/computer_tasks/demo/runtime.py
similarity index 100%
rename from project/nanoeval/nanoeval/solvers/computer_tasks/demo.py
rename to project/nanoeval/nanoeval/solvers/computer_tasks/demo/runtime.py
diff --git a/project/nanoeval/nanoeval/solvers/computer_tasks/task.py b/project/nanoeval/nanoeval/solvers/computer_tasks/task.py
index e446707..1fba53b 100644
--- a/project/nanoeval/nanoeval/solvers/computer_tasks/task.py
+++ b/project/nanoeval/nanoeval/solvers/computer_tasks/task.py
@@ -24,6 +24,7 @@
 class Grade(BaseModel):
     score: float
     grader_log: str
+    is_continuous: bool = False
 
 
 class ComputerTask(ABC, ComputerConfiguration, Task, SerializableBaseModel):
@@ -165,4 +166,5 @@ async def grade(self, computer: ComputerInterface) -> Grade:
         return Grade(
             score=score,
             grader_log=res.output,
+            # TODO: should this be `is_continuous=True`?
         )
diff --git a/project/nanoeval_alcatraz/nanoeval_alcatraz/alcatraz_computer_interface.py b/project/nanoeval_alcatraz/nanoeval_alcatraz/alcatraz_computer_interface.py
index 65d11af..690dcaa 100644
--- a/project/nanoeval_alcatraz/nanoeval_alcatraz/alcatraz_computer_interface.py
+++ b/project/nanoeval_alcatraz/nanoeval_alcatraz/alcatraz_computer_interface.py
@@ -21,8 +21,10 @@
 from nanoeval_alcatraz.task_to_alcatraz_config import task_to_alcatraz_config
 
 logger = structlog.stdlib.get_logger(component=__name__)
+
 ALCATRAZ_TIMEOUT = int(os.getenv("ALCATRAZ_TIMEOUT", 120))
+
 
 class Python3ExceptionDict(BaseModel):
     """A pydantic model for serializing a Python 3.x exception.
@@ -49,6 +51,16 @@ async def disable_internet(self) -> None:
         logger.info("Disabling internet...")
         await self.cluster.add_weak_network_block_via_ip_tables()
 
+        # Verify that the network block actually took effect.
+        logger.info("Verifying post-setup network access is disabled")
+        try:
+            from alcatraz.utils.network import assert_internet_disabled  # type: ignore
+
+            await assert_internet_disabled(self.cluster)
+            logger.info("Verified network access successfully disabled")
+        except ImportError:
+            pass
+
     async def upload(self, file: bytes, destination: str) -> None:
         return await self.cluster.upload(file, destination)
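
Usage sketch (not part of the patch): `test_concurrency_none` above exercises the two new
`RunnerArgs` behaviors this PR introduces. `concurrency=None` now means "do not limit
concurrency", and leaving `recorder` unset makes `evaluation.run()` fall back to
`library_config().get_default_recorder()`, which now returns `json_recorder()`. The standalone
sketch below mirrors that test. The import path for `GPQAEval` and `MockSolver` is an assumption
(they are the test doubles used in `evaluation_test.py`), and `nanoeval_entrypoint` is assumed to
accept any coroutine, as its use in `demo/agent.py` suggests; substitute your own eval and solver.

    import nanoeval
    from nanoeval import db
    from nanoeval.eval import EvalSpec, RunnerArgs
    from nanoeval.setup import nanoeval_entrypoint

    # Hypothetical import path for the test doubles from evaluation_test.py.
    from nanoeval.evaluation_test import GPQAEval, MockSolver


    async def main() -> None:
        # backup=False opens a scratch run-set database, as the tests do. The tests
        # also enter nanoeval's global_exit_stack; add it if your setup requires it.
        async with db.open_run_set_db(backup=False):
            report = await nanoeval.validate(
                EvalSpec(
                    eval=GPQAEval(solver=MockSolver()),
                    # concurrency=None: unlimited concurrency; recorder omitted, so the
                    # default recorder from library_config() is used.
                    runner=RunnerArgs(n_tasks=1, concurrency=None),
                )
            )
            print(report)


    if __name__ == "__main__":
        nanoeval_entrypoint(main())

Running it through `nanoeval_entrypoint` (see `nanoeval/setup.py`) also installs the logging
setup that `LibraryConfig.setup_logging()` now owns, so `_print`-tagged structlog events reach
stdout.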