diff --git a/areal/api/cli_args.py b/areal/api/cli_args.py index ea55f557a8..c1aa5d630b 100644 --- a/areal/api/cli_args.py +++ b/areal/api/cli_args.py @@ -2294,6 +2294,52 @@ def __post_init__(self): ) +@dataclass +class WandBSystemMetricsConfig: + """Worker-side W&B system metrics collection. + + The controller W&B client already records system metrics for the controller + process. Enable this config to attach GPU worker processes to the same W&B + run so W&B can sample system metrics from the GPU nodes too. + """ + + enabled: bool = field( + default=False, + metadata={ + "help": "Start non-primary W&B clients in worker processes to collect " + "worker system metrics. Requires wandb.mode='shared'.", + }, + ) + roles: list[str] | None = field( + default=("actor", "rollout", "critic", "ref", "teacher"), + metadata={ + "help": "Worker roles that should start W&B system metrics clients. " + "Set to null to enable every configured worker role.", + }, + ) + gpu_device_ids: list[int] | None = field( + default=None, + metadata={ + "help": "Optional GPU device ids passed to W&B's system metrics " + "collector. Leave unset to let W&B use the worker's visible devices.", + }, + ) + + def __post_init__(self): + if self.roles is not None: + if not self.roles: + raise ValueError( + "stats_logger.wandb.system_metrics.roles must be null or a non-empty list." + ) + self.roles = list(self.roles) + if self.gpu_device_ids is not None: + self.gpu_device_ids = list(self.gpu_device_ids) + if any(i < 0 for i in self.gpu_device_ids): + raise ValueError( + "stats_logger.wandb.system_metrics.gpu_device_ids must contain non-negative integers." + ) + + @dataclass class WandBConfig: """Configuration for Weights & Biases experiment tracking.""" @@ -2316,6 +2362,12 @@ class WandBConfig: tags: list[str] | None = None config: dict | None = None id_suffix: str | None = "train" + system_metrics: WandBSystemMetricsConfig = field( + default_factory=WandBSystemMetricsConfig, + metadata={ + "help": "Worker-side W&B system metrics configuration.", + }, + ) def __post_init__(self): """Validate WandB configuration.""" @@ -2324,6 +2376,11 @@ def __post_init__(self): raise ValueError( f"Invalid wandb mode: '{self.mode}'. Must be one of: {', '.join(valid_modes)}." ) + if self.system_metrics.enabled and self.mode != "shared": + raise ValueError( + "stats_logger.wandb.system_metrics.enabled requires " + "stats_logger.wandb.mode='shared'." + ) @dataclass diff --git a/areal/infra/rpc/ray_rpc_server.py b/areal/infra/rpc/ray_rpc_server.py index ce6d9ed892..ce4dfe99e3 100644 --- a/areal/infra/rpc/ray_rpc_server.py +++ b/areal/infra/rpc/ray_rpc_server.py @@ -17,6 +17,10 @@ ) from areal.utils.dynamic_import import import_from_string from areal.utils.network import find_free_ports +from areal.utils.wandb_system_metrics import ( + finish_worker_wandb_system_metrics, + init_worker_wandb_system_metrics, +) @ray.remote @@ -79,6 +83,7 @@ def alloc_ports(self, count: int): def configure(self, config: BaseExperimentConfig, role: str, rank: int) -> None: name_resolve.reconfigure(config.cluster.name_resolve) + init_worker_wandb_system_metrics(config, role=role, rank=rank) # Set seed for any TrainEngine instances for engine in self._engines.values(): if isinstance(engine, TrainEngine): @@ -219,4 +224,5 @@ def destroy(self) -> None: ) self._engines.clear() self._default_engine_name = None + finish_worker_wandb_system_metrics() ray.actor.exit_actor() diff --git a/areal/infra/rpc/rpc_server.py b/areal/infra/rpc/rpc_server.py index 20be4267bb..924e35fe42 100644 --- a/areal/infra/rpc/rpc_server.py +++ b/areal/infra/rpc/rpc_server.py @@ -26,6 +26,9 @@ from areal.infra.rpc.guard.data_blueprint import data_bp from areal.infra.rpc.guard.engine_blueprint import engine_bp, register_engine_hooks from areal.utils import logging, perf_tracer +from areal.utils.wandb_system_metrics import ( + register_worker_wandb_system_metrics_hooks, +) logger = logging.getLogger("SyncRPCServer") @@ -54,6 +57,7 @@ def main(): app.register_blueprint(data_bp) app.register_blueprint(engine_bp) register_engine_hooks(state) + register_worker_wandb_system_metrics_hooks(state) state.register_cleanup_hook(lambda: perf_tracer.save(force=True)) diff --git a/areal/infra/scheduler/local.py b/areal/infra/scheduler/local.py index 4522decbcb..f40cc5133e 100644 --- a/areal/infra/scheduler/local.py +++ b/areal/infra/scheduler/local.py @@ -53,6 +53,7 @@ gethostip, ) from areal.utils.offload import get_tms_env_vars +from areal.utils.wandb_system_metrics import prepare_wandb_run_identity logger = logging.getLogger("LocalScheduler") @@ -114,6 +115,7 @@ def __init__( exp_config: BaseExperimentConfig | None = None, ): self.gpu_devices = gpu_devices or self._detect_gpus() + prepare_wandb_run_identity(exp_config) # Resolve experiment/trial names (exp_config overwrites direct params) self.experiment_name = experiment_name diff --git a/areal/infra/scheduler/ray.py b/areal/infra/scheduler/ray.py index d8b18a425e..f5d6122de1 100644 --- a/areal/infra/scheduler/ray.py +++ b/areal/infra/scheduler/ray.py @@ -40,6 +40,7 @@ ) from areal.utils import logging from areal.utils.offload import get_tms_env_vars +from areal.utils.wandb_system_metrics import prepare_wandb_run_identity logger = logging.getLogger("RayScheduler") @@ -63,6 +64,8 @@ def __init__( exp_config: BaseExperimentConfig | None = None, n_gpus_per_node: int = 8, ): + prepare_wandb_run_identity(exp_config) + self.exp_config = exp_config self._n_gpus_per_node = n_gpus_per_node self.startup_timeout = startup_timeout diff --git a/areal/infra/scheduler/slurm.py b/areal/infra/scheduler/slurm.py index 54696ccd07..4160c5fcb3 100644 --- a/areal/infra/scheduler/slurm.py +++ b/areal/infra/scheduler/slurm.py @@ -52,6 +52,7 @@ from areal.utils.fs import validate_shared_path from areal.utils.network import format_hostport, split_hostport from areal.utils.offload import get_tms_env_vars +from areal.utils.wandb_system_metrics import prepare_wandb_run_identity logger = logging.getLogger("SlurmScheduler") @@ -88,6 +89,8 @@ def __init__( etcd3_addr: str = "localhost:2379", exp_config: BaseExperimentConfig | None = None, ): + prepare_wandb_run_identity(exp_config) + # Get n_gpus_per_node from parameter or config self._n_gpus_per_node = n_gpus_per_node if exp_config is not None: diff --git a/areal/trainer/dpo_trainer.py b/areal/trainer/dpo_trainer.py index 8e79261a26..de03927689 100644 --- a/areal/trainer/dpo_trainer.py +++ b/areal/trainer/dpo_trainer.py @@ -143,6 +143,9 @@ def __init__( train_batch_size=config.train_dataset.batch_size, ) + # Initialize W&B primary before worker configuration in shared mode. + self.stats_logger = StatsLogger(config, ft_spec) + self.actor.initialize(addr=None, ft_spec=ft_spec, role="actor") self.ref.initialize(addr=None, ft_spec=ft_spec, role="ref") @@ -174,9 +177,6 @@ def __init__( self.saver = Saver(config.saver, ft_spec) self.recover_handler = RecoverHandler(config.recover, ft_spec) - # Set up statistics logging (wandb, tensorboard, etc.) - self.stats_logger = StatsLogger(config, ft_spec) - # Set up checkpointing for recover self.recover_info = self.recover_handler.load( self.actor, diff --git a/areal/trainer/rl_trainer.py b/areal/trainer/rl_trainer.py index 083e352e27..ef79ad4703 100644 --- a/areal/trainer/rl_trainer.py +++ b/areal/trainer/rl_trainer.py @@ -272,6 +272,10 @@ def __init__( train_batch_size=config.train_dataset.batch_size, ) + # Initialize the controller W&B client before workers are configured so + # worker-side shared-mode clients can attach to an existing primary run. + self.stats_logger = StatsLogger(config, ft_spec) + # Initialize engines first — the scheduler must know about roles # before the data controller can colocate with them. engine_init_kwargs = {"addr": None, "ft_spec": ft_spec} @@ -354,9 +358,6 @@ def __init__( self.saver = Saver(config.saver, ft_spec) self.recover_handler = RecoverHandler(config.recover, ft_spec) - # Set up statistics logging (wandb, tensoboard, etc.) - self.stats_logger = StatsLogger(config, ft_spec) - # Set up checkpointing for recover self.recover_info = self.recover_handler.load( self.actor, diff --git a/areal/trainer/rw_trainer.py b/areal/trainer/rw_trainer.py index 5f37da863e..55bebbfe01 100644 --- a/areal/trainer/rw_trainer.py +++ b/areal/trainer/rw_trainer.py @@ -134,6 +134,8 @@ def __init__( train_batch_size=config.train_dataset.batch_size, ) + self.stats_logger = StatsLogger(config, ft_spec) + self.actor.initialize(addr=None, ft_spec=ft_spec, role="actor") self.valid_dataloader: StatefulDataLoader | None = None @@ -164,9 +166,6 @@ def __init__( self.saver = Saver(config.saver, ft_spec) self.recover_handler = RecoverHandler(config.recover, ft_spec) - # Set up statistics logging (wandb, tensorboard, etc.) - self.stats_logger = StatsLogger(config, ft_spec) - # Set up checkpointing for recover self.recover_info = self.recover_handler.load( self.actor, diff --git a/areal/trainer/sft_trainer.py b/areal/trainer/sft_trainer.py index 7669a00181..d01643fcb1 100644 --- a/areal/trainer/sft_trainer.py +++ b/areal/trainer/sft_trainer.py @@ -109,6 +109,8 @@ def __init__( train_batch_size=config.train_dataset.batch_size, ) + self.stats_logger = StatsLogger(config, ft_spec) + self.actor.initialize(addr=None, ft_spec=ft_spec, role="actor") self.valid_dataloader: StatefulDataLoader | None = None @@ -135,7 +137,6 @@ def __init__( self.evaluator = Evaluator(config.evaluator, ft_spec) self.saver = Saver(config.saver, ft_spec) self.recover_handler = RecoverHandler(config.recover, ft_spec) - self.stats_logger = StatsLogger(config, ft_spec) self.recover_info = self.recover_handler.load( self.actor, self.saver, diff --git a/areal/utils/logging.py b/areal/utils/logging.py index 94e5839a6e..6c19392662 100644 --- a/areal/utils/logging.py +++ b/areal/utils/logging.py @@ -57,6 +57,7 @@ "StatsLogger": "light_green", "StatsTracker": "light_green", "PerfTracer": "light_green", + "WandBSystemMetrics": "light_green", # RPC servers - white "SyncRPCServer": "white", "RayRPCServer": "white", diff --git a/areal/utils/stats_logger.py b/areal/utils/stats_logger.py index 220e6d3920..f6d90ef89a 100644 --- a/areal/utils/stats_logger.py +++ b/areal/utils/stats_logger.py @@ -1,6 +1,5 @@ # SPDX-License-Identifier: Apache-2.0 -import getpass import os import time from dataclasses import asdict @@ -15,6 +14,10 @@ from areal.api.cli_args import BaseExperimentConfig, StatsLoggerConfig from areal.utils import logging from areal.utils.printing import tabulate_stats +from areal.utils.wandb_system_metrics import ( + resolve_wandb_run_id, + stats_logger_log_path, +) from areal.version import version_info logger = logging.getLogger("StatsLogger", "system") @@ -48,10 +51,6 @@ def init(self): if self.config.wandb.mode != "disabled": wandb.login() - suffix = self.config.wandb.id_suffix - if suffix == "timestamp": - suffix = time.strftime("%Y_%m_%d_%H_%M_%S") - exp_config_dict = asdict(self.exp_config) exp_config_dict["version_info"] = { "commit_id": version_info.commit, @@ -60,7 +59,15 @@ def init(self): "version": version_info.full_version_with_dirty_description, } - wandb.init( + wandb_settings = None + if self.config.wandb.mode == "shared": + wandb_settings = wandb.Settings( + mode="shared", + x_primary=True, + x_label="controller", + ) + + wandb_init_kwargs = dict( mode=self.config.wandb.mode, entity=self.config.wandb.entity, project=self.config.wandb.project or self.config.experiment_name, @@ -73,9 +80,12 @@ def init(self): config=exp_config_dict, # save all experiment config to wandb dir=self.get_log_path(self.config), force=True, - id=f"{self.config.experiment_name}_{self.config.trial_name}_{suffix}", + id=resolve_wandb_run_id(self.config), resume="allow", ) + if wandb_settings is not None: + wandb_init_kwargs["settings"] = wandb_settings + wandb.init(**wandb_init_kwargs) swanlab_config = self.config.swanlab if swanlab_config.mode != "disabled": @@ -176,6 +186,10 @@ def get_log_path( raise ValueError( "fileroot, experiment_name, and trial_name must be provided." ) - path = f"{fileroot}/logs/{getpass.getuser()}/{experiment_name}/{trial_name}" - os.makedirs(path, exist_ok=True) - return path + return stats_logger_log_path( + StatsLoggerConfig( + experiment_name=experiment_name, + trial_name=trial_name, + fileroot=fileroot, + ) + ) diff --git a/areal/utils/wandb_system_metrics.py b/areal/utils/wandb_system_metrics.py new file mode 100644 index 0000000000..f22ecbd930 --- /dev/null +++ b/areal/utils/wandb_system_metrics.py @@ -0,0 +1,176 @@ +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import getpass +import os +import threading +import time +from typing import Any + +from areal.api.cli_args import BaseExperimentConfig, StatsLoggerConfig +from areal.infra.rpc.serialization import deserialize_value +from areal.utils import logging + +logger = logging.getLogger("WandBSystemMetrics", "system") + +_worker_wandb_run: Any | None = None +_worker_wandb_lock = threading.Lock() + + +def stats_logger_log_path(config: StatsLoggerConfig) -> str: + if not config.fileroot or not config.experiment_name or not config.trial_name: + raise ValueError("fileroot, experiment_name, and trial_name must be provided.") + path = ( + f"{config.fileroot}/logs/{getpass.getuser()}/" + f"{config.experiment_name}/{config.trial_name}" + ) + os.makedirs(path, exist_ok=True) + return path + + +def resolve_wandb_id_suffix(config: StatsLoggerConfig) -> str | None: + suffix = config.wandb.id_suffix + if suffix == "timestamp": + suffix = time.strftime("%Y_%m_%d_%H_%M_%S") + config.wandb.id_suffix = suffix + return suffix + + +def resolve_wandb_run_id(config: StatsLoggerConfig) -> str: + suffix = resolve_wandb_id_suffix(config) + return f"{config.experiment_name}_{config.trial_name}_{suffix}" + + +def prepare_wandb_run_identity(config: BaseExperimentConfig | None) -> None: + if config is None: + return + if not config.stats_logger.wandb.system_metrics.enabled: + return + resolve_wandb_run_id(config.stats_logger) + + +def worker_system_metrics_enabled( + config: BaseExperimentConfig, + role: str | None, +) -> bool: + wandb_config = config.stats_logger.wandb + system_metrics_config = wandb_config.system_metrics + if not system_metrics_config.enabled: + return False + if wandb_config.mode == "disabled": + return False + if wandb_config.mode != "shared": + raise ValueError( + "stats_logger.wandb.system_metrics.enabled requires " + "stats_logger.wandb.mode='shared'." + ) + roles = system_metrics_config.roles + return roles is None or role in roles + + +def init_worker_wandb_system_metrics( + config: BaseExperimentConfig, + role: str | None, + rank: int, +) -> bool: + global _worker_wandb_run + + if not worker_system_metrics_enabled(config, role): + return False + + with _worker_wandb_lock: + if _worker_wandb_run is not None: + return False + + import wandb + + stats_config = config.stats_logger + wandb_config = stats_config.wandb + system_metrics_config = wandb_config.system_metrics + + if wandb_config.wandb_base_url: + os.environ["WANDB_BASE_URL"] = wandb_config.wandb_base_url + if wandb_config.wandb_api_key: + os.environ["WANDB_API_KEY"] = wandb_config.wandb_api_key + + settings_kwargs: dict[str, Any] = { + "mode": "shared", + "x_primary": False, + "x_label": f"{role or 'worker'}-{rank}", + "x_update_finish_state": False, + } + if system_metrics_config.gpu_device_ids is not None: + settings_kwargs["x_stats_gpu_device_ids"] = list( + system_metrics_config.gpu_device_ids + ) + + try: + _worker_wandb_run = wandb.init( + mode=wandb_config.mode, + entity=wandb_config.entity, + project=wandb_config.project or stats_config.experiment_name, + name=wandb_config.name or stats_config.trial_name, + job_type=wandb_config.job_type, + group=wandb_config.group + or f"{stats_config.experiment_name}_{stats_config.trial_name}", + notes=wandb_config.notes, + tags=wandb_config.tags, + dir=stats_logger_log_path(stats_config), + force=True, + id=resolve_wandb_run_id(stats_config), + resume="allow", + settings=wandb.Settings(**settings_kwargs), + ) + except Exception as exc: # noqa: BLE001 — worker telemetry must not crash training + _worker_wandb_run = None + logger.warning( + "Failed to start worker W&B system metrics client " + "(role=%s rank=%s): %s", + role, + rank, + exc, + ) + return False + + logger.info( + "Initialized worker W&B system metrics client for role=%s rank=%s.", + role, + rank, + ) + return True + + +def finish_worker_wandb_system_metrics() -> None: + global _worker_wandb_run + with _worker_wandb_lock: + if _worker_wandb_run is None: + return + + run = _worker_wandb_run + try: + run.finish() + except Exception as exc: # noqa: BLE001 + logger.warning("Worker W&B run finish failed: %s", exc) + finally: + _worker_wandb_run = None + + +def configure_worker_wandb_system_metrics(data: dict) -> dict[str, Any]: + config_data = data.get("config") + if config_data is None: + raise ValueError("Missing 'config' field in request") + + rank = data.get("rank") + if rank is None: + raise ValueError("Missing 'rank' field in request") + + config = deserialize_value(config_data) + role = data.get("role") + enabled = init_worker_wandb_system_metrics(config, role=role, rank=rank) + return {"wandb_system_metrics": "enabled" if enabled else "skipped"} + + +def register_worker_wandb_system_metrics_hooks(state) -> None: + state.register_configure_hook(configure_worker_wandb_system_metrics) + state.register_cleanup_hook(finish_worker_wandb_system_metrics) diff --git a/docs/en/cli_reference.md b/docs/en/cli_reference.md index 0b217a2673..66973d2d67 100644 --- a/docs/en/cli_reference.md +++ b/docs/en/cli_reference.md @@ -87,6 +87,7 @@ For detailed examples, see the experiment configurations in the `examples/` dire - [SchedulingStrategy](section-scheduling-strategy) - [SessionTracer Configuration](section-session-tracer) - [Teacher Configuration](section-teacher) +- [WandBSystemMetrics Configuration](section-wand-b-system-metrics) ______________________________________________________________________ @@ -848,20 +849,21 @@ See: https://github.com/gradio-app/trackio Configuration for Weights & Biases experiment tracking. -| Parameter | Type | Default | Description | -| ---------------- | ---------------------- | ------------ | -------------------------------------------------------------------------------------------------------------------------- | -| `mode` | string | `"disabled"` | Tracking mode. One of 'online', 'offline', 'disabled', or 'shared'. **Choices:** `online`, `offline`, `disabled`, `shared` | -| `wandb_base_url` | string | `""` | - | -| `wandb_api_key` | string | `""` | - | -| `entity` | string \| None | `None` | - | -| `project` | string \| None | `None` | - | -| `name` | string \| None | `None` | - | -| `job_type` | string \| None | `None` | - | -| `group` | string \| None | `None` | - | -| `notes` | string \| None | `None` | - | -| `tags` | list of string \| None | `None` | - | -| `config` | `dict` \| None | `None` | - | -| `id_suffix` | string \| None | `"train"` | - | +| Parameter | Type | Default | Description | +| ---------------- | ----------------------------------------------------------- | ------------ | -------------------------------------------------------------------------------------------------------------------------- | +| `mode` | string | `"disabled"` | Tracking mode. One of 'online', 'offline', 'disabled', or 'shared'. **Choices:** `online`, `offline`, `disabled`, `shared` | +| `wandb_base_url` | string | `""` | - | +| `wandb_api_key` | string | `""` | - | +| `entity` | string \| None | `None` | - | +| `project` | string \| None | `None` | - | +| `name` | string \| None | `None` | - | +| `job_type` | string \| None | `None` | - | +| `group` | string \| None | `None` | - | +| `notes` | string \| None | `None` | - | +| `tags` | list of string \| None | `None` | - | +| `config` | `dict` \| None | `None` | - | +| `id_suffix` | string \| None | `"train"` | - | +| `system_metrics` | [`WandBSystemMetricsConfig`](section-wand-b-system-metrics) | **Required** | Worker-side W&B system metrics configuration. | (section-agent)= @@ -1296,3 +1298,19 @@ Configuration class: TeacherConfig | `max_new_tokens` | integer | `1024` | Maximum number of new tokens to generate | | `rl_loss_weight` | float | `1.0` | RL loss weight | | `distill_loss_weight` | float | `0.005` | Distillation loss weight | + +(section-wand-b-system-metrics)= + +## WandBSystemMetrics Configuration + +Worker-side W&B system metrics collection. + +The controller W&B client already records system metrics for the controller process. +Enable this config to attach GPU worker processes to the same W&B run so W&B can sample +system metrics from the GPU nodes too. + +| Parameter | Type | Default | Description | +| ---------------- | ----------------------- | -------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- | +| `enabled` | boolean | `False` | Start non-primary W&B clients in worker processes to collect worker system metrics. Requires wandb.mode='shared'. | +| `roles` | list of string \| None | `('actor', 'rollout', 'critic', 'ref', 'teacher')` | Worker roles that should start W&B system metrics clients. Set to null to enable every configured worker role. | +| `gpu_device_ids` | list of integer \| None | `None` | Optional GPU device ids passed to W&B's system metrics collector. Leave unset to let W&B use the worker's visible devices. | diff --git a/docs/en/reference/metrics_tracking.md b/docs/en/reference/metrics_tracking.md index a83fe253aa..68d7240779 100644 --- a/docs/en/reference/metrics_tracking.md +++ b/docs/en/reference/metrics_tracking.md @@ -304,7 +304,7 @@ stats_logger: fileroot: "/path/to/logs" wandb: - mode: "online" # "online", "offline", or "disabled" + mode: "online" # "online", "offline", "shared", or "disabled" project: "my-project" entity: "my-team" @@ -316,6 +316,33 @@ stats_logger: path: "/path/to/tensorboard/logs" # null to disable ``` +### W&B GPU System Metrics + +W&B collects system metrics from the process that calls `wandb.init()`. In +single-controller training, the controller may run on a CPU node while actor and rollout +workers run on GPU nodes. In that setup, ordinary `wandb.mode: "online"` only reports +controller process metrics. + +To collect GPU utilization and memory from worker nodes, use W&B shared mode and enable +worker system metrics: + +```yaml +stats_logger: + wandb: + mode: "shared" + project: "my-project" + entity: "my-team" + system_metrics: + enabled: true + roles: ["actor", "rollout"] # null enables every worker role + gpu_device_ids: null # let W&B use the worker's visible GPUs +``` + +The controller initializes the primary W&B run. Each selected worker initializes a +non-primary W&B client with `x_primary=False`, a role/rank label, and +`x_update_finish_state=False`, so workers contribute only their system metrics and do +not finish the run. + ## Best Practices 1. **Choose the right paradigm**: Use `scalar()` for scalars, `stat()` with denominators diff --git a/docs/zh/cli_reference.md b/docs/zh/cli_reference.md index e9e6f11180..170ca31395 100644 --- a/docs/zh/cli_reference.md +++ b/docs/zh/cli_reference.md @@ -85,6 +85,7 @@ python3 train.py --config path/to/config.yaml actor.lr=1e-4 seed=42 - [SchedulingStrategy](section-scheduling-strategy) - [SessionTracer Configuration](section-session-tracer) - [Teacher Configuration](section-teacher) +- [WandBSystemMetrics Configuration](section-wand-b-system-metrics) ______________________________________________________________________ @@ -846,20 +847,21 @@ See: https://github.com/gradio-app/trackio Configuration for Weights & Biases experiment tracking. -| Parameter | Type | Default | Description | -| ---------------- | ---------------------- | ------------ | -------------------------------------------------------------------------------------------------------------------------- | -| `mode` | string | `"disabled"` | Tracking mode. One of 'online', 'offline', 'disabled', or 'shared'. **Choices:** `online`, `offline`, `disabled`, `shared` | -| `wandb_base_url` | string | `""` | - | -| `wandb_api_key` | string | `""` | - | -| `entity` | string \| None | `None` | - | -| `project` | string \| None | `None` | - | -| `name` | string \| None | `None` | - | -| `job_type` | string \| None | `None` | - | -| `group` | string \| None | `None` | - | -| `notes` | string \| None | `None` | - | -| `tags` | list of string \| None | `None` | - | -| `config` | `dict` \| None | `None` | - | -| `id_suffix` | string \| None | `"train"` | - | +| Parameter | Type | Default | Description | +| ---------------- | ----------------------------------------------------------- | ------------ | -------------------------------------------------------------------------------------------------------------------------- | +| `mode` | string | `"disabled"` | Tracking mode. One of 'online', 'offline', 'disabled', or 'shared'. **Choices:** `online`, `offline`, `disabled`, `shared` | +| `wandb_base_url` | string | `""` | - | +| `wandb_api_key` | string | `""` | - | +| `entity` | string \| None | `None` | - | +| `project` | string \| None | `None` | - | +| `name` | string \| None | `None` | - | +| `job_type` | string \| None | `None` | - | +| `group` | string \| None | `None` | - | +| `notes` | string \| None | `None` | - | +| `tags` | list of string \| None | `None` | - | +| `config` | `dict` \| None | `None` | - | +| `id_suffix` | string \| None | `"train"` | - | +| `system_metrics` | [`WandBSystemMetricsConfig`](section-wand-b-system-metrics) | **Required** | Worker-side W&B system metrics configuration. | (section-agent)= @@ -1294,3 +1296,19 @@ Configuration class: TeacherConfig | `max_new_tokens` | integer | `1024` | Maximum number of new tokens to generate | | `rl_loss_weight` | float | `1.0` | RL loss weight | | `distill_loss_weight` | float | `0.005` | Distillation loss weight | + +(section-wand-b-system-metrics)= + +## WandBSystemMetrics Configuration + +Worker-side W&B system metrics collection. + +The controller W&B client already records system metrics for the controller process. +Enable this config to attach GPU worker processes to the same W&B run so W&B can sample +system metrics from the GPU nodes too. + +| Parameter | Type | Default | Description | +| ---------------- | ----------------------- | -------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- | +| `enabled` | boolean | `False` | Start non-primary W&B clients in worker processes to collect worker system metrics. Requires wandb.mode='shared'. | +| `roles` | list of string \| None | `('actor', 'rollout', 'critic', 'ref', 'teacher')` | Worker roles that should start W&B system metrics clients. Set to null to enable every configured worker role. | +| `gpu_device_ids` | list of integer \| None | `None` | Optional GPU device ids passed to W&B's system metrics collector. Leave unset to let W&B use the worker's visible devices. | diff --git a/docs/zh/reference/metrics_tracking.md b/docs/zh/reference/metrics_tracking.md index 2508ab7ce7..3273491464 100644 --- a/docs/zh/reference/metrics_tracking.md +++ b/docs/zh/reference/metrics_tracking.md @@ -295,7 +295,7 @@ stats_logger: fileroot: "/path/to/logs" wandb: - mode: "online" # "online"、"offline" 或 "disabled" + mode: "online" # "online"、"offline"、"shared" 或 "disabled" project: "my-project" entity: "my-team" @@ -307,6 +307,28 @@ stats_logger: path: "/path/to/tensorboard/logs" # null 禁用 ``` +### W&B GPU 系统指标 + +W&B 会从调用 `wandb.init()` 的进程采集系统指标。在 single-controller 训练中,控制器可能运行在 CPU 节点,而 actor 和 +rollout worker 运行在 GPU 节点上。此时普通的 `wandb.mode: "online"` 只会报告控制器进程指标。 + +要采集 worker 节点上的 GPU 利用率和显存指标,请使用 W&B shared mode 并启用 worker 系统指标: + +```yaml +stats_logger: + wandb: + mode: "shared" + project: "my-project" + entity: "my-team" + system_metrics: + enabled: true + roles: ["actor", "rollout"] # null 表示启用所有 worker role + gpu_device_ids: null # 让 W&B 使用 worker 可见的 GPU +``` + +控制器会初始化 primary W&B run。每个被选中的 worker 会以 `x_primary=False`、 role/rank 标签和 +`x_update_finish_state=False` 初始化 non-primary W&B 客户端, 因此 worker 只贡献系统指标,不会结束该 run。 + ## 最佳实践 1. **选择正确的范式**:对标量使用 `scalar()`,对批量 PyTorch 张量(通常是训练指标)使用带分母的 `stat()`。 diff --git a/tests/test_wandb_system_metrics.py b/tests/test_wandb_system_metrics.py new file mode 100644 index 0000000000..8d20042408 --- /dev/null +++ b/tests/test_wandb_system_metrics.py @@ -0,0 +1,145 @@ +import sys +from types import SimpleNamespace + +import pytest + +from areal.api.cli_args import ( + BaseExperimentConfig, + WandBConfig, + WandBSystemMetricsConfig, +) +from areal.utils.wandb_system_metrics import ( + finish_worker_wandb_system_metrics, + init_worker_wandb_system_metrics, + resolve_wandb_run_id, + worker_system_metrics_enabled, +) + + +def _make_config(tmp_path, *, roles=None, gpu_device_ids=None): + config = BaseExperimentConfig( + experiment_name="exp", + trial_name="trial", + total_train_epochs=1, + ) + config.stats_logger.experiment_name = "exp" + config.stats_logger.trial_name = "trial" + config.stats_logger.fileroot = str(tmp_path) + config.stats_logger.wandb = WandBConfig( + mode="shared", + project="proj", + entity="entity", + id_suffix="timestamp", + system_metrics=WandBSystemMetricsConfig( + enabled=True, + roles=roles, + gpu_device_ids=gpu_device_ids, + ), + ) + return config + + +def test_worker_system_metrics_requires_shared_mode(): + with pytest.raises(ValueError, match="requires stats_logger.wandb.mode='shared'"): + WandBConfig( + mode="online", + system_metrics=WandBSystemMetricsConfig(enabled=True), + ) + + +def test_worker_system_metrics_rejects_empty_roles(): + with pytest.raises(ValueError, match="must be null or a non-empty list"): + WandBSystemMetricsConfig(roles=[]) + + +def test_worker_system_metrics_rejects_negative_gpu_ids(): + with pytest.raises(ValueError, match="must contain non-negative integers"): + WandBSystemMetricsConfig(gpu_device_ids=[0, -1]) + + +def test_worker_system_metrics_normalizes_iterables(): + cfg = WandBSystemMetricsConfig(roles=("actor", "rollout"), gpu_device_ids=(0, 1)) + assert cfg.roles == ["actor", "rollout"] + assert cfg.gpu_device_ids == [0, 1] + + +def test_timestamp_run_id_is_resolved_once(monkeypatch, tmp_path): + config = _make_config(tmp_path) + timestamps = iter(["2026_05_14_00_00_01", "2026_05_14_00_00_02"]) + monkeypatch.setattr( + "areal.utils.wandb_system_metrics.time.strftime", + lambda _: next(timestamps), + ) + + run_id = resolve_wandb_run_id(config.stats_logger) + assert run_id == "exp_trial_2026_05_14_00_00_01" + assert resolve_wandb_run_id(config.stats_logger) == run_id + + +def test_worker_system_metrics_respects_role_filter(tmp_path): + config = _make_config(tmp_path, roles=["actor"]) + + assert worker_system_metrics_enabled(config, "actor") + assert not worker_system_metrics_enabled(config, "rollout") + + +def test_worker_wandb_init_uses_non_primary_shared_settings(monkeypatch, tmp_path): + config = _make_config(tmp_path, roles=["actor"], gpu_device_ids=[0, 1]) + config.stats_logger.wandb.id_suffix = "fixed" + calls = [] + run_finishes = [] + + class FakeSettings: + def __init__(self, **kwargs): + self.kwargs = kwargs + + class FakeRun: + def finish(self): + run_finishes.append(True) + + def fail_global_finish(): + raise AssertionError("worker cleanup must finish the owned run handle") + + fake_wandb = SimpleNamespace( + Settings=FakeSettings, + init=lambda **kwargs: calls.append(kwargs) or FakeRun(), + finish=fail_global_finish, + ) + monkeypatch.setitem(sys.modules, "wandb", fake_wandb) + + assert init_worker_wandb_system_metrics(config, role="actor", rank=3) + assert len(calls) == 1 + + call = calls[0] + assert call["mode"] == "shared" + assert call["id"] == "exp_trial_fixed" + assert call["settings"].kwargs == { + "mode": "shared", + "x_primary": False, + "x_label": "actor-3", + "x_update_finish_state": False, + "x_stats_gpu_device_ids": [0, 1], + } + + finish_worker_wandb_system_metrics() + assert run_finishes == [True] + + +def test_worker_wandb_init_failure_does_not_crash(monkeypatch, tmp_path): + config = _make_config(tmp_path, roles=["actor"]) + config.stats_logger.wandb.id_suffix = "fixed" + + class FakeSettings: + def __init__(self, **kwargs): + self.kwargs = kwargs + + def boom(**_kwargs): + raise RuntimeError("wandb backend unavailable") + + fake_wandb = SimpleNamespace(Settings=FakeSettings, init=boom, finish=lambda: None) + monkeypatch.setitem(sys.modules, "wandb", fake_wandb) + + # Worker telemetry must not propagate exceptions from wandb.init(). + assert not init_worker_wandb_system_metrics(config, role="actor", rank=0) + # finish() should be a no-op when init failed (no run was created). + finish_worker_wandb_system_metrics()