diff --git a/.gitignore b/.gitignore index 838460f..c1584a1 100644 --- a/.gitignore +++ b/.gitignore @@ -98,3 +98,4 @@ docs/.vitepress/cache/ docs/public/models/ CLAUDE.md +*.mjb \ No newline at end of file diff --git a/README.md b/README.md index 88a4644..69a5039 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,11 @@ A Python client library for interacting with [I2RT](https://i2rt.com/) products - Plug-and-play Python interface for YAM arms and Flow Base - Real-time robot control via CAN bus (DM series motors) -- MuJoCo gravity compensation, simulation, and URDF/MJCF models +- MuJoCo gravity + Coulomb friction compensation, with sim-side parity +- Threaded MuJoCo control loop with self-collision detection (`examples/control_with_mujoco/`) - Gripper force control and auto-calibration - Bimanual teleoperation and trajectory record & replay +- URDF / MJCF models for every arm and gripper - Policy-deployment ready — works with standard robot learning pipelines ## Installation @@ -98,6 +100,15 @@ python scripts/run_yam_leader.py --channel $CAN_CHANNEL python examples/minimum_gello/minimum_gello.py --mode visualizer_local ``` +### Gravity & friction compensation + +YAM ships with hand-tuned per-arm YAML knobs in `i2rt/robots/config/.yml`: +`gravity_comp_factor`, `grav_comp_kd` (motor-side idle damping), and +`coulomb_friction` (applied as `friction · sign(q_dot)`). The same control law +runs in `SimRobot` via a background physics thread, so sim and hardware feel +the same. See [`docs/guides/gravity-compensation.md`](./docs/guides/gravity-compensation.md) +for the tuning workflow. + ## Gripper Types | Gripper | Motor | Notes | diff --git a/docs/.vitepress/config.mjs b/docs/.vitepress/config.mjs index c197eab..3948d6f 100644 --- a/docs/.vitepress/config.mjs +++ b/docs/.vitepress/config.mjs @@ -25,8 +25,9 @@ export default defineConfig({ { text: 'Products', link: '/products/' }, { text: 'Get Started', link: '/getting-started/installation' }, { text: 'SDK', link: '/sdk/' }, + { text: 'Guides', link: '/guides/gravity-compensation' }, { text: 'Examples', link: '/examples/' }, - { text: 'Releases', link: '/releases/v1.1' }, + { text: 'Releases', link: '/releases/v1.1.1' }, { text: 'i2rt.com', link: 'https://i2rt.com', target: '_blank' }, ], @@ -102,8 +103,17 @@ export default defineConfig({ { text: 'Release Notes', items: [ - { text: 'v1.1', link: '/releases/v1.1' }, - { text: 'v1.0', link: '/releases/v1.0' }, + { text: 'v1.1.1', link: '/releases/v1.1.1' }, + { text: 'v1.1.0', link: '/releases/v1.1.0' }, + { text: 'v1.0.0', link: '/releases/v1.0.0' }, + ], + }, + ], + '/guides/': [ + { + text: 'Guides', + items: [ + { text: 'Gravity & Friction Compensation', link: '/guides/gravity-compensation' }, ], }, ], diff --git a/docs/examples/control-with-mujoco.md b/docs/examples/control-with-mujoco.md index ce17f7b..c4d2597 100644 --- a/docs/examples/control-with-mujoco.md +++ b/docs/examples/control-with-mujoco.md @@ -4,7 +4,7 @@ Interactive MuJoCo viewer for i2rt robots. Visualises the robot in real time and lets you move it by dragging a target marker via inverse kinematics — no hardware required in simulation mode. -The control loop runs in a background thread, performs self-collision detection (commands that would self-collide are blocked), and applies gravity compensation when running on real hardware. Pass `--log` to print joint state and torques every loop iteration. +The sliders / mocap → IK → self-collision → `command_joint_pos` pipeline runs on a daemon control thread at `control_dt = 5 ms` (200 Hz), guarded by short `viewer.lock()` sections so viewer rendering never stalls commands and vice versa. Commands that would self-collide are blocked. On real hardware the arm holds itself under gravity compensation; in simulation the same loop drives a `SimRobot` physics thread that mirrors the hardware behaviour. Pass `--log` to print joint state and torques every loop iteration. ## Hardware Required @@ -17,12 +17,16 @@ The interface has two modes toggled with **SPACE**: ``` VIS mode (default): - Robot joint states ──► MuJoCo viewer (gravity comp active on real hw) + Robot joint states ──► MuJoCo viewer + (real hw: gravity comp idle sim: physics thread on, grav-comp active) CONTROL mode (press SPACE): Drag target marker ──► IK solver ──► Command arm + (real hw: PD tracks target sim: physics paused; arm teleports) ``` +In `--sim`, SPACE additionally calls `SimRobot.enable_gravity_comp()` / `disable_gravity_comp()` so the simulated arm settles under gravity in VIS and follows the IK target instantly in CONTROL — matching the hardware feel. Returning to VIS calls `MotorChainRobot.enter_gravity_comp_idle()` (real hw) or re-enables the physics thread (sim) so the arm is left in a clean float, not holding the last PD target. + ## Running ### Simulation @@ -82,3 +86,8 @@ All YAM-family arm × gripper combinations are supported: | `yam_ultra` | same as yam | | `big_yam` | same as yam | | `no_arm` | any gripper except `no_gripper` (gripper-only test rig) | + +## See Also + +- [Gravity & Friction Compensation](/guides/gravity-compensation) — what SPACE actually toggles, and how to tune it. +- [YAM Arm SDK reference](/sdk/yam-arm) — `get_yam_robot()` arguments and `MotorChainRobot` API. diff --git a/docs/guides/gravity-compensation.md b/docs/guides/gravity-compensation.md new file mode 100644 index 0000000..7682b01 --- /dev/null +++ b/docs/guides/gravity-compensation.md @@ -0,0 +1,131 @@ +# Gravity & Friction Compensation + +YAM arms compute gravity torques from their MuJoCo model and add them to every motor command, so the arm "floats" against gravity. From v1.1.1 the same control law also includes per-joint **Coulomb friction compensation** and per-joint **motor-side idle damping (`grav_comp_kd`)**. Together they make the arm easy to backdrive while keeping idle behaviour stable. + +This page covers what each knob does, how to tune them, and how the sim mirrors the hardware. + +## How the torque command is built + +In gravity-comp idle (`zero_gravity_mode=True`, no active control command), each motor receives: + +``` +τ = g(q) · gravity_comp_factor + coulomb_friction · sign(q_dot) +``` + +plus motor-side MIT-mode `kd` damping equal to `grav_comp_kd[i]` running at kHz on the motor's onboard loop. + +Under an active PD command (`command_joint_pos` / `command_joint_state`): + +``` +τ = τ_PD + g(q) · gravity_comp_factor + coulomb_friction · sign(q_dot) +``` + +`grav_comp_kd` is **not** applied during PD control — `self._kd` is used instead, so PD tracking gains are unchanged. The relevant code lives in `i2rt/robots/motor_chain_robot.py:340-355`. + +## Per-arm YAML configuration + +Tuning lives in `i2rt/robots/config/.yml`. The current YAM defaults: + +```yaml +gravity_comp_factor: [1.0, 1.1, 1.1, 1.2, 1.0, 1.0] +grav_comp_kd: [0.1, 0.1, 0.1, 0.3, 0.05, 0.05] +coulomb_friction: [0.3, 0.3, 0.3, 0.06, 0.06, 0.06] +``` + +| Field | Units | Purpose | +|-------|-------|---------| +| `gravity_comp_factor` | unitless multiplier on `g(q)` | Compensates per-joint underdrive (gear loss, harness drag). Six elements, one per arm joint. | +| `grav_comp_kd` | MIT-mode kd (motor units) | Passive idle damping on the motor's onboard loop. Six elements; only active when the robot is in gravity-comp idle. | +| `coulomb_friction` | Nm | Magnitude of `friction · sign(q_dot)` injection. Six elements; cancels static stiction so light pushes move the joint. | + +The gripper slot is automatically padded with `0.0` in [`i2rt/robots/get_robot.py:206-207`](https://github.com/i2rt-robotics/i2rt/blob/main/i2rt/robots/get_robot.py) — gripper joints are position-commanded and need no passive compensation. + +::: tip Per-arm defaults +`yam_pro.yml`, `yam_ultra.yml`, and `big_yam.yml` ship sensible starting values, but the wrist joints in particular vary with payload — expect to retune `grav_comp_kd[3]` (J4) and `coulomb_friction[3]` on a custom build. +::: + +## Runtime override + +Only `gravity_comp_factor` can be overridden per-call: + +```python +import numpy as np +from i2rt.robots.get_robot import get_yam_robot + +robot = get_yam_robot( + channel="can0", + gravity_comp_factor=np.array([1.0, 1.15, 1.10, 1.25, 1.0, 1.0]), +) +``` + +`grav_comp_kd` and `coulomb_friction` are YAML-only — they're hardware-tuning constants tied to the specific arm build, so they live with the arm config. + +## Mode transitions: `enter_gravity_comp_idle()` + +After issuing PD commands you can return the arm to a clean gravity-comp idle: + +```python +robot.command_joint_pos(target) # active PD +# ... do work ... +robot.enter_gravity_comp_idle() # back to floating, with grav_comp_kd damping +``` + +This sets `self._commands` to zeros with `kd = grav_comp_kd`, so the arm stops actively tracking the last target and resumes the float-and-damp behaviour. The MuJoCo viewer (`examples/control_with_mujoco/`) calls this automatically on every CONTROL → VIS toggle. + +::: warning Why this matters +Before v1.1.1 the viewer used `update_kp_kd(zeros, zeros)` on toggle. That only rewrote member caches and left the stale PD target in `_commands`, so the motors kept holding the last CONTROL pose — felt as per-joint friction after returning to VIS. If you call `update_kp_kd` directly, follow it with `enter_gravity_comp_idle()` to clear the stale target. +::: + +## Simulation parity + +`SimRobot` runs a daemon **physics thread** (`_physics_loop` in `i2rt/robots/sim_robot.py`) that applies model-based gravity comp at the same `gravity_comp_factor` schedule as hardware. The MuJoCo viewer toggles it via: + +```python +robot.enable_gravity_comp() # VIS mode +robot.disable_gravity_comp() # CONTROL mode (teleport on; physics paused) +``` + +`get_yam_robot(sim=True)` passes a unit factor (`np.ones(...)`) so sim uses the unscaled MuJoCo gravity — there's no harness-loss or gear-loss to compensate for. `coulomb_friction` and `grav_comp_kd` flow through but have no observable effect in sim, because the MuJoCo model has no Coulomb friction and no MIT-mode motor model. + +## Tuning workflow + +Start from the per-arm defaults and adjust in this order: + +1. **Set `gravity_comp_factor`.** Lift the arm to a horizontal pose and release. Increase `gravity_comp_factor[i]` on whichever joint sags first; decrease on whichever drifts up. Tune one joint at a time, shoulder-to-wrist. +2. **Set `coulomb_friction`.** Once the arm holds pose, give each joint a small push. If a joint resists at first then "snaps free", raise its `coulomb_friction` slightly. Stop just before joints start drifting on their own. +3. **Set `grav_comp_kd` only if needed.** If a joint chatters or limit-cycles after release, raise its `grav_comp_kd`. Most joints don't need it (defaults near 0.05–0.1); only J4 typically needs more on bigger wrists. + +::: tip Quick test loop +Run the arm in gravity-comp idle from any of these entry points — all three accept `--arm`, `--gripper`, and `--channel`: + +```bash +# Headless: just brings the arm up in grav-comp idle (add --log for a live joint/torque table) +python i2rt/robots/motor_chain_robot.py --arm yam --gripper linear_4310 --channel can0 + +# Browser viewer (Viser) — mirror, IK drag, per-joint sliders +python examples/control_with_viser/control_with_viser.py --arm yam --gripper linear_4310 --channel can0 + +# MuJoCo viewer — VIS/CONTROL toggle, auto-calls enter_gravity_comp_idle() on toggle +python examples/control_with_mujoco/control_with_mujoco.py --arm yam --gripper linear_4310 --channel can0 +``` + +Edit the YAML, re-run, repeat. No code changes needed. +::: + +## Troubleshooting + +| Symptom | Likely cause | Fix | +|---------|------|-----| +| Joint sags slowly under gravity | `gravity_comp_factor` too low | Raise the per-joint factor by 0.05–0.10 increments | +| Joint drifts up against gravity | `gravity_comp_factor` too high | Lower the per-joint factor | +| Joint chatters / limit-cycles when released | Stiction interacting with under-compensation; or `grav_comp_kd` too low | First, recheck `gravity_comp_factor`; if clean, raise `grav_comp_kd` | +| Joint feels "sticky" — needs a push to start moving, then moves freely | `coulomb_friction` too low | Raise the per-joint friction value | +| Joint walks on its own after release | `coulomb_friction` too high | Lower the per-joint friction value | +| Arm "holds" the previous PD target after CONTROL → VIS | `enter_gravity_comp_idle()` was not called | Use the v1.1.1 viewer (auto-calls), or call it yourself after `command_joint_pos` | +| Sim arm flops to the floor | `enable_gravity_comp()` was not called or `start_server()` not invoked | Use `mujoco_control_interface` (calls both for you) | + +## See also + +- [YAM Arm SDK reference](/sdk/yam-arm) — `get_yam_robot()` arguments and the `MotorChainRobot` API. +- [MuJoCo Control example](/examples/control-with-mujoco) — interactive grav-comp testbed. +- [v1.1.1 release notes](/releases/v1.1.1) — the PR that introduced these knobs. diff --git a/docs/releases/v1.0.md b/docs/releases/v1.0.0.md similarity index 99% rename from docs/releases/v1.0.md rename to docs/releases/v1.0.0.md index 05201a8..660fa51 100644 --- a/docs/releases/v1.0.md +++ b/docs/releases/v1.0.0.md @@ -1,4 +1,4 @@ -# v1.0 Release Notes +# v1.0.0 Release Notes **Date:** 2026-03-03 diff --git a/docs/releases/v1.1.md b/docs/releases/v1.1.0.md similarity index 99% rename from docs/releases/v1.1.md rename to docs/releases/v1.1.0.md index 5116a67..9b999b7 100644 --- a/docs/releases/v1.1.md +++ b/docs/releases/v1.1.0.md @@ -1,4 +1,4 @@ -# v1.1 Release Notes +# v1.1.0 Release Notes **Date:** 2026-05-05 diff --git a/docs/releases/v1.1.1.md b/docs/releases/v1.1.1.md new file mode 100644 index 0000000..82db064 --- /dev/null +++ b/docs/releases/v1.1.1.md @@ -0,0 +1,53 @@ +# v1.1.1 Release Notes + +**Date:** 2026-05-06 + +## Highlights + +A patch release on top of v1.1 focused on stability of the gravity-compensation idle and on real-hardware feel. Adds **Coulomb friction compensation**, **motor-side idle damping (`grav_comp_kd`)**, **gravity compensation in simulation**, and threads the **MuJoCo control loop** so the viewer never stalls commands. + +See the new [Gravity & Friction Compensation guide](/guides/gravity-compensation) for the physics, YAML knobs, and tuning workflow. + +## Gravity-Comp Idle Damping (`grav_comp_kd`) + +- New per-arm YAML field in `i2rt/robots/config/.yml` — one value per arm joint (gripper slot is auto-padded with `0.0` in `get_robot.py`). +- Implemented as **motor-side MIT-mode kd**, so the damping closes on the motor's onboard kHz loop with no CAN round-trip. This replaces an earlier host-side `-damping * qvel` approach that introduced one-sample delay and amplified CAN velocity quantization noise — together producing a stiction limit cycle. +- Active only while the arm is in gravity-comp idle (`zero_gravity_mode=True`); `command_joint_pos` / `command_joint_state` continue to use `self._kd` as before, so PD tracking is unchanged. +- Hand-tuned shipping defaults (`yam.yml`): + ```yaml + grav_comp_kd: [0.1, 0.1, 0.1, 0.3, 0.05, 0.05] + ``` + +## Coulomb Friction Compensation (`coulomb_friction`) + +- New per-arm YAML field. Added to gravity torque in `MotorChainRobot.update()` as `coulomb_friction * sign(q_dot)`. +- Cancels the velocity-direction-dependent stiction torque so small joint perturbations move freely instead of needing to overcome static friction first. +- Sim path: friction comp is harmless but has no observable effect, since `SimRobot` has no joint friction in MuJoCo by default. +- Shipping defaults (`yam.yml`): + ```yaml + coulomb_friction: [0.3, 0.3, 0.3, 0.06, 0.06, 0.06] + ``` + +## `MotorChainRobot.enter_gravity_comp_idle()` + +- New public method that resets `self._commands` to zeros with `kd = grav_comp_kd`, mirroring the `zero_gravity_mode=True` initialisation branch. Leaves `self._kp` / `self._kd` untouched so subsequent control commands keep their gains. +- The MuJoCo viewer (`mujoco_control_interface.py`) now calls this on every CONTROL → VIS toggle. The previous `update_kp_kd(zeros, zeros)` call only rewrote member caches and left a stale PD target in `_commands`, so motors kept holding the last CONTROL pose — felt as per-joint friction after toggling back to VIS. + +## Sim-Side Gravity Compensation in `SimRobot` + +- `SimRobot.start_server()` now spawns a daemon **physics thread** (`_physics_loop`) that steps MuJoCo with model-based gravity-comp torques applied at the same `gravity_comp_factor` schedule as hardware. `get_yam_robot(sim=True)` passes `np.ones(...)` so sim uses the unscaled model gravity. +- New `enable_gravity_comp()` / `disable_gravity_comp()` methods toggle the loop. The MuJoCo viewer drives both methods on SPACE in `--sim`, mirroring the hardware VIS ↔ CONTROL behaviour. +- Clean shutdown: `close()` joins the physics thread. + +## Threaded MuJoCo Control Loop + +- `i2rt/utils/mujoco_control_interface.py` moves sliders / mocap → IK → self-collision check → `command_joint_pos` into a daemon thread at `control_dt = 5 ms` (200 Hz). Commands are no longer driven by the viewer's render loop. +- Shared `MjData` is guarded by `viewer.lock()`; locks are held only for short snapshot / write-back sections so viewer rendering never stalls commands and vice versa. +- In sim mode, the interface defaults to **VIS** on launch (so the arm settles under gravity comp before any teleport) and toggles between sim grav-comp on (VIS) / off (CONTROL) on SPACE. + +## Tuning Notes + +- `gravity_comp_factor` was retuned across arms once friction comp became active — JN values were lowered slightly because static friction had previously been masking under-compensation. +- `big_yam` ships a higher `grav_comp_kd` on the wrist J4 (0.3 vs 0.15) to match its larger moment arm. + +See [`/guides/gravity-compensation`](/guides/gravity-compensation) for the recommended workflow when retuning a custom build. diff --git a/docs/sdk/yam-arm.md b/docs/sdk/yam-arm.md index 1c7f150..749c5f3 100644 --- a/docs/sdk/yam-arm.md +++ b/docs/sdk/yam-arm.md @@ -136,6 +136,18 @@ Update the PD gains at runtime. robot.update_kp_kd(kp=new_kp_array, kd=new_kd_array) ``` +### `enter_gravity_comp_idle() → None` + +Reset the command buffer to zeros with `kd = grav_comp_kd` (from the arm's YAML), returning the arm to gravity-comp idle. `self._kp` and `self._kd` are left untouched, so a later `command_joint_pos` keeps its tracking gains. + +```python +robot.command_joint_pos(target) +# ... do work ... +robot.enter_gravity_comp_idle() # arm floats again, with motor-side idle damping +``` + +The MuJoCo viewer (`examples/control_with_mujoco/`) calls this automatically on every CONTROL → VIS toggle. See [Gravity & Friction Compensation](/guides/gravity-compensation) for details. + ### `close() → None` Safely shuts down the robot: stops the control thread and closes the CAN interface. @@ -246,8 +258,26 @@ Higher `--bilateral-kp` = leader arm feels heavier (more force feedback from fol --- +## Per-Arm YAML Configuration + +Hardware tuning lives in `i2rt/robots/config/.yml` — one file per arm variant. The fields most users adjust: + +| Field | Type | Purpose | +|-------|------|---------| +| `kp` / `kd` | 6-element list | Per-joint PD gains used by `command_joint_pos` / `command_joint_state`. | +| `gravity_comp_factor` | 6-element list | Per-joint multiplier on the model gravity torque. May be overridden per-call via `get_yam_robot(gravity_comp_factor=...)`. | +| `grav_comp_kd` | 6-element list | Motor-side MIT-mode kd applied **only** in gravity-comp idle. Tunes how stiff the arm feels when released. YAML-only. | +| `coulomb_friction` | 6-element list, Nm | Magnitude of `friction · sign(q_dot)` injected alongside gravity comp. Cancels static stiction. YAML-only. | + +The gripper slot is automatically padded with `0.0` for `grav_comp_kd` and `coulomb_friction` in `i2rt/robots/get_robot.py` — gripper joints don't need passive compensation. + +See [Gravity & Friction Compensation](/guides/gravity-compensation) for the full physics, tuning workflow, and troubleshooting. + +--- + ## See Also - [Quick Start](/getting-started/quick-start) - [Grippers](/sdk/grippers) +- [Gravity & Friction Compensation](/guides/gravity-compensation) - [Bimanual Teleoperation Example](/examples/bimanual-teleoperation) diff --git a/examples/control_with_mujoco/control_with_mujoco.py b/examples/control_with_mujoco/control_with_mujoco.py index 0d7e7d5..2163483 100644 --- a/examples/control_with_mujoco/control_with_mujoco.py +++ b/examples/control_with_mujoco/control_with_mujoco.py @@ -1,9 +1,21 @@ -"""MuJoCo control interface for i2rt robots. +"""MuJoCo control interface for i2rt robots (multiprocess). -Starts in VIS mode, mirroring the robot's joint state (gravity-comp active -on real hardware). Press SPACE to toggle into CONTROL mode, then double-click -the target marker, ctrl+right-drag to translate, ctrl+left-drag to rotate. -Commands are blocked when a self-collision is detected. +The robot controller and the MuJoCo viewer each run in their own +subprocess so neither steals GIL time from the other. The main +process is just glue: it spawns both workers, hosts the shared +memory + command queue that they speak through, and shuts them +down together. + +Starts in VIS mode, mirroring the robot's joint state (gravity-comp +active on real hardware). Press SPACE in the viewer to toggle into +CONTROL mode; then use the per-joint sliders or double-click the +mocap marker + ctrl+drag for IK. Commands are blocked on self- +collision. + +With ``--log`` the table header shows ``[loop XX.X Hz]``, which is +the motor chain's measured ``comm_freq`` (the same number +``dm_driver.py`` reports) instead of the viewer's data-read +cadence. Usage: python examples/control_with_mujoco/control_with_mujoco.py --sim @@ -13,29 +25,383 @@ python examples/control_with_mujoco/control_with_mujoco.py --channel can0 """ +import argparse +import logging +import multiprocessing as mp import os +import queue as _queue import signal import sys +import time +from multiprocessing import shared_memory from pathlib import Path +from types import SimpleNamespace +from typing import Any, Dict, Optional -sys.path.insert(0, str(Path(__file__).resolve().parents[2])) +import numpy as np -import argparse -import logging +_REPO_ROOT = str(Path(__file__).resolve().parents[2]) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) logging.basicConfig(level=logging.INFO) -from i2rt.robots.get_robot import get_yam_robot -from i2rt.robots.utils import ArmType, GripperType -from i2rt.utils.mujoco_control_interface import MujocoControlInterface -if __name__ == "__main__": - arm_choices = [a.value for a in ArmType] - gripper_choices = [g.value for g in GripperType] +# --------------------------------------------------------------------------- +# Shared memory layout +# --------------------------------------------------------------------------- + +_FIXED_FIELDS = ( + ("gripper_pos", 1), + ("button_states", 2), + ("loop_hz", 1), + ("seq", 1), +) +_PER_DOF_FIELDS = ("joint_pos", "joint_vel", "joint_eff", "temp_mos", "temp_rotor") + + +def _build_layout(n_dofs: int) -> tuple[Dict[str, slice], int]: + layout: Dict[str, slice] = {} + offset = 0 + for name in _PER_DOF_FIELDS: + layout[name] = slice(offset, offset + n_dofs) + offset += n_dofs + for name, length in _FIXED_FIELDS: + layout[name] = slice(offset, offset + length) + offset += length + return layout, offset + + +class SharedState: + """Numpy views over an ``mp.shared_memory.SharedMemory`` block.""" + + def __init__(self, n_dofs: int, shm_name: Optional[str] = None): + layout, total_floats = _build_layout(n_dofs) + self._layout = layout + self._n_dofs = n_dofs + nbytes = total_floats * np.dtype(np.float64).itemsize + if shm_name is None: + self._shm = shared_memory.SharedMemory(create=True, size=nbytes) + self._owner = True + else: + self._shm = shared_memory.SharedMemory(name=shm_name) + self._owner = False + self._buf = np.ndarray((total_floats,), dtype=np.float64, buffer=self._shm.buf) + if self._owner: + self._buf[:] = np.nan + + @property + def name(self) -> str: + return self._shm.name + + def view(self, field: str) -> np.ndarray: + return self._buf[self._layout[field]] + + def close(self) -> None: + try: + self._shm.close() + except Exception: + pass + + def unlink(self) -> None: + try: + self._shm.unlink() + except FileNotFoundError: + pass + + +# --------------------------------------------------------------------------- +# Proxy robot (used inside the viewer subprocess) +# --------------------------------------------------------------------------- + + +class _ProxyJointState: + """Duck-types the bits of ``MotorChainRobot._joint_state`` that the + viewer reads (``temp_mos``, ``temp_rotor``).""" + + def __init__(self, shared: SharedState): + self._shared = shared + + @property + def temp_mos(self) -> np.ndarray: + return self._shared.view("temp_mos").copy() + + @property + def temp_rotor(self) -> np.ndarray: + return self._shared.view("temp_rotor").copy() + - parser = argparse.ArgumentParser( - description="MuJoCo control interface for i2rt robots", +class _ProxyMotorChain: + """Duck-types ``DMChainCanInterface`` for viewer-side reads.""" + + def __init__(self, shared: SharedState, has_teaching_handle: bool): + self._shared = shared + self._has_teaching_handle = has_teaching_handle + self.same_bus_device_driver = object() if has_teaching_handle else None + + @property + def comm_freq(self) -> float: + val = float(self._shared.view("loop_hz")[0]) + return 0.0 if np.isnan(val) else val + + def get_same_bus_device_states(self) -> Optional[list[SimpleNamespace]]: + if not self._has_teaching_handle: + return None + buttons = self._shared.view("button_states") + if np.isnan(buttons[0]): + return None + return [SimpleNamespace(io_inputs=[bool(buttons[0]), bool(buttons[1])])] + + +class ProxyRobot: + """The viewer subprocess sees this in place of the real robot. + + It implements the subset of the ``Robot`` surface that + :class:`MujocoControlInterface` consumes: state reads come from shared + memory, commands go onto a multiprocessing queue that the robot + subprocess drains. + """ + + def __init__(self, shared: SharedState, cmd_queue: mp.Queue, meta: Dict[str, Any]): + self._shared = shared + self._cmd_queue = cmd_queue + self._meta = meta + self.xml_path = meta["xml_path"] + self._gripper_index = meta.get("gripper_index") + self._n_dofs = int(meta["n_dofs"]) + self.motor_chain = _ProxyMotorChain(shared, bool(meta.get("has_teaching_handle", False))) + self._joint_state = _ProxyJointState(shared) + + def num_dofs(self) -> int: + return self._n_dofs + + def get_robot_info(self) -> Dict[str, Any]: + return { + "gripper_index": self._gripper_index, + "gripper_limits": self._meta.get("gripper_limits"), + "sim": bool(self._meta.get("is_sim", False)), + } + + def get_joint_pos(self) -> np.ndarray: + return self._shared.view("joint_pos")[: self._n_dofs].copy() + + def get_observations(self) -> Dict[str, np.ndarray]: + n = self._n_dofs + pos = self._shared.view("joint_pos")[:n].copy() + vel = self._shared.view("joint_vel")[:n].copy() + eff = self._shared.view("joint_eff")[:n].copy() + gi = self._gripper_index + if gi is None: + return {"joint_pos": pos, "joint_vel": vel, "joint_eff": eff} + return { + "joint_pos": pos[:gi], + "joint_vel": vel[:gi], + "joint_eff": eff[:gi], + "gripper_pos": np.array([pos[gi]]), + "gripper_vel": np.array([vel[gi]]), + "gripper_eff": np.array([eff[gi]]), + } + + def command_joint_pos(self, joint_pos: np.ndarray) -> None: + self._cmd_queue.put(("joint_pos", np.asarray(joint_pos, dtype=np.float64).copy())) + + def enter_gravity_comp_idle(self) -> None: + self._cmd_queue.put(("enter_gravity_comp_idle",)) + + def enable_gravity_comp(self) -> None: + self._cmd_queue.put(("enable_gravity_comp",)) + + def disable_gravity_comp(self) -> None: + self._cmd_queue.put(("disable_gravity_comp",)) + + def start_server(self) -> None: + return None + + +# --------------------------------------------------------------------------- +# Robot subprocess +# --------------------------------------------------------------------------- + + +def _apply_command(robot: Any, cmd: tuple) -> None: + kind = cmd[0] + if kind == "joint_pos": + robot.command_joint_pos(cmd[1]) + elif kind == "enter_gravity_comp_idle" and hasattr(robot, "enter_gravity_comp_idle"): + robot.enter_gravity_comp_idle() + elif kind == "enable_gravity_comp" and hasattr(robot, "enable_gravity_comp"): + robot.enable_gravity_comp() + elif kind == "disable_gravity_comp" and hasattr(robot, "disable_gravity_comp"): + robot.disable_gravity_comp() + + +def _robot_worker( + arm_value: str, + gripper_value: str, + channel: str, + sim: bool, + meta_queue: mp.Queue, + cmd_queue: mp.Queue, + stop_event: Any, +) -> None: + """Subprocess entry point: owns the real/sim robot and publishes state.""" + from i2rt.robots.get_robot import get_yam_robot + from i2rt.robots.utils import ArmType, GripperType + from i2rt.utils.utils import RateRecorder + + arm = ArmType.from_string_name(arm_value) + gripper = GripperType.from_string_name(gripper_value) + + robot = get_yam_robot(channel=channel, arm_type=arm, gripper_type=gripper, sim=sim) + if sim and hasattr(robot, "start_server"): + robot.start_server() + + info = robot.get_robot_info() if hasattr(robot, "get_robot_info") else {} + chain = getattr(robot, "motor_chain", None) + has_teaching_handle = ( + chain is not None + and hasattr(chain, "get_same_bus_device_states") + and hasattr(chain, "same_bus_device_driver") + and chain.same_bus_device_driver is not None ) + + n_dofs = int(robot.num_dofs()) + shared = SharedState(n_dofs=n_dofs) # owner — auto-generated name + meta: Dict[str, Any] = { + "n_dofs": n_dofs, + "xml_path": robot.xml_path, + "gripper_index": info.get("gripper_index"), + "gripper_limits": info.get("gripper_limits"), + "is_sim": sim, + "has_teaching_handle": has_teaching_handle, + "shm_name": shared.name, + } + meta_queue.put(meta) + + rate = RateRecorder(name="robot_publish", report_interval=1.0) + target_dt = 0.004 # 250 Hz publish cadence, matching CAN + + try: + with rate: + next_tick = time.perf_counter() + seq = 0.0 + while not stop_event.is_set(): + obs = robot.get_observations() + joint_pos = obs["joint_pos"] + joint_vel = obs["joint_vel"] + joint_eff = obs["joint_eff"] + + pos_buf = shared.view("joint_pos") + vel_buf = shared.view("joint_vel") + eff_buf = shared.view("joint_eff") + n_arm = min(len(joint_pos), n_dofs) + pos_buf[:n_arm] = joint_pos[:n_arm] + vel_buf[:n_arm] = joint_vel[:n_arm] + eff_buf[:n_arm] = joint_eff[:n_arm] + + gi = meta["gripper_index"] + if gi is not None and "gripper_pos" in obs: + pos_buf[gi] = float(obs["gripper_pos"][0]) + vel_buf[gi] = float(obs.get("gripper_vel", np.zeros(1))[0]) + eff_buf[gi] = float(obs.get("gripper_eff", np.zeros(1))[0]) + shared.view("gripper_pos")[0] = pos_buf[gi] + + js = getattr(robot, "_joint_state", None) + if js is not None: + tm = np.asarray(js.temp_mos) + tr = np.asarray(js.temp_rotor) + n_t = min(len(tm), n_dofs) + shared.view("temp_mos")[:n_t] = tm[:n_t] + shared.view("temp_rotor")[:n_t] = tr[:n_t] + + if has_teaching_handle: + states = chain.get_same_bus_device_states() + if states: + io_inputs = states[0].io_inputs + shared.view("button_states")[0] = float(bool(io_inputs[0])) + shared.view("button_states")[1] = float(bool(io_inputs[1])) + + chain_freq = float(chain.comm_freq) if chain is not None and hasattr(chain, "comm_freq") else 0.0 + shared.view("loop_hz")[0] = chain_freq if chain_freq > 0 else float(rate.last_rate) + + seq += 1.0 + shared.view("seq")[0] = seq + + while True: + try: + cmd = cmd_queue.get_nowait() + except _queue.Empty: + break + _apply_command(robot, cmd) + + rate.track() + + next_tick += target_dt + sleep_for = next_tick - time.perf_counter() + if sleep_for > 0: + time.sleep(sleep_for) + else: + next_tick = time.perf_counter() + except KeyboardInterrupt: + pass + finally: + try: + if hasattr(robot, "close"): + robot.close() + except Exception as e: + logging.warning("robot.close() failed: %s", e) + shared.close() + + +# --------------------------------------------------------------------------- +# Viewer subprocess +# --------------------------------------------------------------------------- + + +def _viewer_worker( + gripper_value: str, + site_arg: Optional[str], + dt: float, + log: bool, + cmd_queue: mp.Queue, + meta: Dict[str, Any], + stop_event: Any, +) -> None: + """Subprocess entry point: runs the MuJoCo viewer against a ProxyRobot.""" + from i2rt.robots.utils import GripperType + from i2rt.utils.mujoco_control_interface import MujocoControlInterface + + shared = SharedState(n_dofs=int(meta["n_dofs"]), shm_name=meta["shm_name"]) + proxy = ProxyRobot(shared=shared, cmd_queue=cmd_queue, meta=meta) + + if site_arg is not None: + site = site_arg + elif gripper_value == GripperType.YAM_TEACHING_HANDLE.value: + site = "tcp_site" + else: + site = "grasp_site" + + iface = MujocoControlInterface.from_robot(proxy, ee_site=site, dt=dt, log=log) + try: + iface.run() + except KeyboardInterrupt: + pass + finally: + stop_event.set() + shared.close() + + +# --------------------------------------------------------------------------- +# Main (orchestrator) +# --------------------------------------------------------------------------- + + +def _parse_args() -> argparse.Namespace: + from i2rt.robots.utils import ArmType, GripperType + + arm_choices = [a.value for a in ArmType] + gripper_choices = [g.value for g in GripperType] + parser = argparse.ArgumentParser(description="MuJoCo control interface for i2rt robots") parser.add_argument("--arm", type=str, default="yam", choices=arm_choices) parser.add_argument("--gripper", type=str, default="linear_4310", choices=gripper_choices) parser.add_argument("--channel", type=str, default="can0", help="CAN channel") @@ -43,38 +409,93 @@ parser.add_argument("--dt", type=float, default=0.02, help="Loop timestep (s)") parser.add_argument("--site", type=str, default=None, help="EE site name (auto-detected if omitted)") parser.add_argument("--log", action="store_true", help="Log joint state and torques each loop iteration") - args = parser.parse_args() + return parser.parse_args() + + +def _wait_for_meta(meta_queue: mp.Queue, robot_proc: mp.Process, timeout: float = 60.0) -> Dict[str, Any]: + """Block until the robot worker publishes its meta dict, or dies.""" + deadline = time.time() + timeout + while True: + try: + return meta_queue.get(timeout=0.5) + except _queue.Empty: + if not robot_proc.is_alive(): + raise RuntimeError( + f"Robot worker exited (code={robot_proc.exitcode}) before publishing meta" + ) from None + if time.time() > deadline: + raise RuntimeError(f"Timed out after {timeout:.0f}s waiting for robot meta") from None + + +def main() -> None: + args = _parse_args() + from i2rt.robots.utils import ArmType, GripperType arm = ArmType.from_string_name(args.arm) gripper = GripperType.from_string_name(args.gripper) - if arm == ArmType.NO_ARM and gripper == GripperType.NO_GRIPPER: - parser.error("--gripper cannot be 'no_gripper' when --arm is 'no_arm'") + raise SystemExit("--gripper cannot be 'no_gripper' when --arm is 'no_arm'") - robot = get_yam_robot( - channel=args.channel, - arm_type=arm, - gripper_type=gripper, - sim=args.sim, + mp.set_start_method("spawn", force=True) + + cmd_queue: mp.Queue = mp.Queue() + meta_queue: mp.Queue = mp.Queue(maxsize=1) + stop_event = mp.Event() + + robot_proc = mp.Process( + target=_robot_worker, + name="robot_worker", + args=(args.arm, args.gripper, args.channel, args.sim, meta_queue, cmd_queue, stop_event), ) + robot_proc.start() - # Teaching handle only has tcp_site; all grippers with fingers have grasp_site. - if args.site is not None: - site = args.site - elif gripper == GripperType.YAM_TEACHING_HANDLE: - site = "tcp_site" - else: - site = "grasp_site" + meta: Optional[Dict[str, Any]] = None + try: + meta = _wait_for_meta(meta_queue, robot_proc) + except Exception as e: + print(f"[control] {e}") + robot_proc.join(timeout=2.0) + if robot_proc.is_alive(): + robot_proc.terminate() + raise SystemExit(1) from e - iface = MujocoControlInterface.from_robot(robot, ee_site=site, dt=args.dt, log=args.log) + viewer_proc = mp.Process( + target=_viewer_worker, + name="viewer_worker", + args=(args.gripper, args.site, args.dt, args.log, cmd_queue, meta, stop_event), + ) + viewer_proc.start() try: - iface.run() + mp.connection.wait([robot_proc.sentinel, viewer_proc.sentinel]) except KeyboardInterrupt: pass finally: - # iface.run() already caught the first Ctrl+C, but the MuJoCo viewer - # process can hang (gray/unresponsive window). Force-kill ourselves so - # the user doesn't have to manually terminate. + stop_event.set() + for p in (robot_proc, viewer_proc): + if p.is_alive(): + try: + os.kill(p.pid, signal.SIGINT) + except ProcessLookupError: + pass + for p in (robot_proc, viewer_proc): + p.join(timeout=2.0) + if p.is_alive(): + p.terminate() + p.join(timeout=1.0) + if p.is_alive(): + p.kill() + + try: + shm = shared_memory.SharedMemory(name=meta["shm_name"]) + shm.close() + shm.unlink() + except FileNotFoundError: + pass + print("[control] Force killing process to close MuJoCo window") os.kill(os.getpid(), signal.SIGKILL) + + +if __name__ == "__main__": + main() diff --git a/i2rt/robot_models/arm/big_yam/big_yam.xml b/i2rt/robot_models/arm/big_yam/big_yam.xml index c98dded..d078c7f 100644 --- a/i2rt/robot_models/arm/big_yam/big_yam.xml +++ b/i2rt/robot_models/arm/big_yam/big_yam.xml @@ -10,27 +10,27 @@ - + - + - + - + - + - + diff --git a/i2rt/robot_models/gripper/crank_4310/crank_4310.xml b/i2rt/robot_models/gripper/crank_4310/crank_4310.xml index de231ef..f2c4d07 100644 --- a/i2rt/robot_models/gripper/crank_4310/crank_4310.xml +++ b/i2rt/robot_models/gripper/crank_4310/crank_4310.xml @@ -7,7 +7,7 @@ - + diff --git a/i2rt/robots/config/big_yam.yml b/i2rt/robots/config/big_yam.yml index 2fcce8c..e404c31 100644 --- a/i2rt/robots/config/big_yam.yml +++ b/i2rt/robots/config/big_yam.yml @@ -11,4 +11,8 @@ directions: [-1, -1, 1, 1, -1, 1] kp: [80.0, 80.0, 80.0, 40.0, 40.0, 10.0] kd: [5.0, 5.0, 5.0, 3.0, 1.5, 1.5] -gravity_comp_factor: [1.0, 1.1, 1.0, 1.0, 1.0, 1.0] +gravity_comp_factor: [1.0, 1.1, 1.0, 1.1, 1.0, 1.0] + +grav_comp_kd: [0.5, 0.5, 0.5, 0.3, 0.1, 0.1] + +coulomb_friction: [0.1, 0.1, 0.3, 0.3, 0.06, 0.06] diff --git a/i2rt/robots/config/crank_4310.yml b/i2rt/robots/config/crank_4310.yml index 7cd08c5..3a03640 100644 --- a/i2rt/robots/config/crank_4310.yml +++ b/i2rt/robots/config/crank_4310.yml @@ -1,5 +1,3 @@ -# Last-joint mounting geometry (set on the deepest arm link at runtime) -# Per-arm transforms account for different wrist geometries. last_joint_mount: yam: pos: "0.0 -0.0398 0.0412" diff --git a/i2rt/robots/config/flexible_4310.yml b/i2rt/robots/config/flexible_4310.yml index c00c9dc..03c9492 100644 --- a/i2rt/robots/config/flexible_4310.yml +++ b/i2rt/robots/config/flexible_4310.yml @@ -1,5 +1,3 @@ -# Last-joint mounting geometry (set on the deepest arm link at runtime) -# Per-arm transforms account for different wrist geometries. last_joint_mount: yam: pos: "0 -0.0398 0.0412" diff --git a/i2rt/robots/config/linear_3507.yml b/i2rt/robots/config/linear_3507.yml index 96f17ff..3ea449f 100644 --- a/i2rt/robots/config/linear_3507.yml +++ b/i2rt/robots/config/linear_3507.yml @@ -1,5 +1,3 @@ -# Last-joint mounting geometry (set on the deepest arm link at runtime) -# Per-arm transforms account for different wrist geometries. last_joint_mount: yam: pos: "2.39858e-07 -0.0419481 0.0404996" diff --git a/i2rt/robots/config/linear_4310.yml b/i2rt/robots/config/linear_4310.yml index c75b62d..015f1c7 100644 --- a/i2rt/robots/config/linear_4310.yml +++ b/i2rt/robots/config/linear_4310.yml @@ -1,5 +1,3 @@ -# Last-joint mounting geometry (set on the deepest arm link at runtime) -# Per-arm transforms account for different wrist geometries. last_joint_mount: yam: pos: "2.39858e-07 -0.0419481 0.0404996" diff --git a/i2rt/robots/config/no_gripper.yml b/i2rt/robots/config/no_gripper.yml index 4ebb600..6bcee48 100644 --- a/i2rt/robots/config/no_gripper.yml +++ b/i2rt/robots/config/no_gripper.yml @@ -1,5 +1,3 @@ -# Last-joint mounting geometry (set on the deepest arm link at runtime) -# Per-arm transforms account for different wrist geometries. last_joint_mount: yam: pos: "2.39858e-07 -0.0419481 0.0404996" diff --git a/i2rt/robots/config/yam.yml b/i2rt/robots/config/yam.yml index 8ecce91..3bedf33 100644 --- a/i2rt/robots/config/yam.yml +++ b/i2rt/robots/config/yam.yml @@ -11,4 +11,8 @@ directions: [1, 1, 1, 1, 1, 1] kp: [80.0, 80.0, 80.0, 40.0, 10.0, 10.0] kd: [5.0, 5.0, 5.0, 1.5, 1.5, 1.5] -gravity_comp_factor: [1.3, 1.1, 1.35, 1.3, 1.3, 1.3] +gravity_comp_factor: [1.0, 1.1, 1.1, 1.2, 1.0, 1.0] + +grav_comp_kd: [0.1, 0.1, 0.1, 0.3, 0.05, 0.05] + +coulomb_friction: [0.3, 0.3, 0.3, 0.06, 0.06, 0.06] diff --git a/i2rt/robots/config/yam_pro.yml b/i2rt/robots/config/yam_pro.yml index d29ef3d..3bedf33 100644 --- a/i2rt/robots/config/yam_pro.yml +++ b/i2rt/robots/config/yam_pro.yml @@ -11,4 +11,8 @@ directions: [1, 1, 1, 1, 1, 1] kp: [80.0, 80.0, 80.0, 40.0, 10.0, 10.0] kd: [5.0, 5.0, 5.0, 1.5, 1.5, 1.5] -gravity_comp_factor: [1.3, 1.3, 1.3, 1.1, 1.3, 1.3] +gravity_comp_factor: [1.0, 1.1, 1.1, 1.2, 1.0, 1.0] + +grav_comp_kd: [0.1, 0.1, 0.1, 0.3, 0.05, 0.05] + +coulomb_friction: [0.3, 0.3, 0.3, 0.06, 0.06, 0.06] diff --git a/i2rt/robots/config/yam_teaching_handle.yml b/i2rt/robots/config/yam_teaching_handle.yml index 044ff0d..ac69867 100644 --- a/i2rt/robots/config/yam_teaching_handle.yml +++ b/i2rt/robots/config/yam_teaching_handle.yml @@ -1,5 +1,3 @@ -# Last-joint mounting geometry (set on the deepest arm link at runtime) -# Per-arm transforms account for different wrist geometries. last_joint_mount: yam: pos: "0.00358881 -0.03891574 0.04880005" diff --git a/i2rt/robots/config/yam_ultra.yml b/i2rt/robots/config/yam_ultra.yml index c8a7e6e..3bedf33 100644 --- a/i2rt/robots/config/yam_ultra.yml +++ b/i2rt/robots/config/yam_ultra.yml @@ -11,4 +11,8 @@ directions: [1, 1, 1, 1, 1, 1] kp: [80.0, 80.0, 80.0, 40.0, 10.0, 10.0] kd: [5.0, 5.0, 5.0, 1.5, 1.5, 1.5] -gravity_comp_factor: [1.3, 1.3, 1.3, 1.3, 1.3, 1.3] +gravity_comp_factor: [1.0, 1.1, 1.1, 1.2, 1.0, 1.0] + +grav_comp_kd: [0.1, 0.1, 0.1, 0.3, 0.05, 0.05] + +coulomb_friction: [0.3, 0.3, 0.3, 0.06, 0.06, 0.06] diff --git a/i2rt/robots/get_robot.py b/i2rt/robots/get_robot.py index 3e72815..352de47 100644 --- a/i2rt/robots/get_robot.py +++ b/i2rt/robots/get_robot.py @@ -188,6 +188,8 @@ def get_yam_robot( directions = list(hw.directions) kp = hw.kp.copy() kd = hw.kd.copy() + grav_comp_kd = hw.grav_comp_kd.copy() + coulomb_friction = hw.coulomb_friction.copy() motor_offsets = [0.0] * len(motor_list) if with_gripper: @@ -201,6 +203,8 @@ def get_yam_robot( directions.append(gripper_type.get_motor_direction(arm_type)) kp = np.append(kp, _gripper_kp) kd = np.append(kd, _gripper_kd) + grav_comp_kd = np.append(grav_comp_kd, 0.0) + coulomb_friction = np.append(coulomb_friction, 0.0) if gripper_limits_override is not None and with_gripper: gripper_limits = np.asarray(gripper_limits_override) @@ -217,12 +221,15 @@ def get_yam_robot( if with_gripper and sim_gripper_limits is None: sim_gripper_limits = np.array([0.0, 1.0]) + sim_grav_comp = np.ones(len(motor_list)) + return SimRobot( xml_path=model_path, n_dofs=len(motor_list), joint_limits=joint_limits, gripper_index=n_arm_joints if with_gripper else None, gripper_limits=sim_gripper_limits, + gravity_comp_factor=sim_grav_comp, ) # --- Real hardware path --------------------------------------------------- @@ -266,6 +273,8 @@ def get_yam_robot( joint_limits=joint_limits, kp=kp, kd=kd, + grav_comp_kd=grav_comp_kd, + coulomb_friction=coulomb_friction, zero_gravity_mode=zero_gravity_mode, joint_state_saver_factory=joint_state_saver_factory, set_realtime_and_pin_callback=set_realtime_and_pin_callback, diff --git a/i2rt/robots/motor_chain_robot.py b/i2rt/robots/motor_chain_robot.py index ba3cd8d..7ffeca8 100644 --- a/i2rt/robots/motor_chain_robot.py +++ b/i2rt/robots/motor_chain_robot.py @@ -72,6 +72,10 @@ def __init__( gripper_index: Optional[int] = None, # Zero starting index: if you have a 6 dof arm and last one is gripper: 6 kp: Union[float, List[float]] = 10.0, kd: Union[float, List[float]] = 1.0, + grav_comp_kd: Optional[np.ndarray] = None, # per-joint MIT-mode kd, active only in grav-comp idle + coulomb_friction: Optional[ + np.ndarray + ] = None, # per-joint Coulomb friction (Nm); applied as coulomb_friction * sign(q_dot) joint_limits: Optional[np.ndarray] = None, # if provided, override the mujoco xml joint limits gripper_limits: Optional[np.ndarray] = None, # [closed, open] limit_gripper_force: float = -1, # whether to limit the gripper effort when it is blocked. -1 means no limit. @@ -178,6 +182,18 @@ def __init__( if isinstance(kd, float) else np.array(kd) ) + self._grav_comp_kd = ( + np.array(grav_comp_kd, dtype=float) if grav_comp_kd is not None else np.zeros(len(motor_chain)) + ) + assert len(self._grav_comp_kd) == len(motor_chain), ( + f"grav_comp_kd length {len(self._grav_comp_kd)} != motor_chain length {len(motor_chain)}" + ) + self._coulomb_friction = ( + np.array(coulomb_friction, dtype=float) if coulomb_friction is not None else np.zeros(len(motor_chain)) + ) + assert len(self._coulomb_friction) == len(motor_chain), ( + f"coulomb_friction length {len(self._coulomb_friction)} != motor_chain length {len(motor_chain)}" + ) self._joint_limits: Optional[np.ndarray] = None if xml_path is not None: @@ -211,6 +227,8 @@ def __init__( time.sleep(0.05) self._joint_state = self._motor_state_to_joint_state(self.motor_chain.read_states()) self._commands = JointCommands.init_all_zero(len(motor_chain)) + if zero_gravity_mode: + self._commands.kd = self._grav_comp_kd.copy() # For SWE-454, check if the current qpos is in the joint limits self._check_current_qpos_in_joint_limits() @@ -279,6 +297,8 @@ def get_robot_info(self) -> Dict[str, Any]: "gripper_type": self._gripper_type, "kp": self._kp, "kd": self._kd, + "grav_comp_kd": self._grav_comp_kd, + "coulomb_friction": self._coulomb_friction, "joint_limits": self._joint_limits, "gripper_limits": self._gripper_limits, "gravity_comp_factor": self.gravity_comp_factor, @@ -327,7 +347,8 @@ def update(self) -> None: joint_commands = copy.deepcopy(self._commands) with self._state_lock: g = self._compute_gravity_compensation(self._joint_state) - motor_torques = joint_commands.torques + g * self.gravity_comp_factor + friction_comp = self._coulomb_friction * np.sign(self._joint_state.vel) + motor_torques = joint_commands.torques + g * self.gravity_comp_factor + friction_comp motor_torques = np.clip(motor_torques, -self._clip_motor_torque, self._clip_motor_torque) self._last_motor_torques = motor_torques.copy() @@ -605,6 +626,20 @@ def update_kp_kd(self, kp: np.ndarray, kd: np.ndarray) -> None: self._kp = kp self._kd = kd + def enter_gravity_comp_idle(self) -> None: + """Reset active commands to gravity-comp idle. + + Sets ``self._commands`` to zeros with ``kd = self._grav_comp_kd``, + mirroring the ``zero_gravity_mode=True`` branch in ``__init__``. Use + this after running PD control (e.g. ``command_joint_pos``) to re-enter + grav-comp idle without leaving the previous target pose/gains active. + Leaves ``self._kp`` / ``self._kd`` unchanged so subsequent control + commands still use the configured control gains. + """ + with self._command_lock: + self._commands = JointCommands.init_all_zero(len(self.motor_chain)) + self._commands.kd = self._grav_comp_kd.copy() + def start_recording(self, save_dir: str) -> bool: """Start recording joint state data asynchronously.""" if self._joint_state_saver is None: diff --git a/i2rt/robots/sim_robot.py b/i2rt/robots/sim_robot.py index 7ea6151..470e3f1 100644 --- a/i2rt/robots/sim_robot.py +++ b/i2rt/robots/sim_robot.py @@ -49,6 +49,8 @@ def __init__( gripper_index: Optional[int] = None, gripper_limits: Optional[np.ndarray] = None, initial_qpos: Optional[np.ndarray] = None, + gravity_comp_factor: Optional[np.ndarray] = None, + control_freq: float = 200.0, ) -> None: self.xml_path = xml_path self._n_dofs = n_dofs @@ -85,6 +87,13 @@ def __init__( self._joint_state: Optional[_SimJointState] = None self._update_joint_state() + self._gravity_comp_factor = gravity_comp_factor if gravity_comp_factor is not None else np.ones(n_dofs) + self._control_freq = control_freq + self._physics_active = False + self._grav_comp_enabled = True + self._stop_event = threading.Event() + self._physics_thread: Optional[threading.Thread] = None + def _cmd_to_mj_qpos(self, cmd: np.ndarray) -> np.ndarray: """Map command-space positions to MuJoCo qpos. @@ -97,6 +106,19 @@ def _cmd_to_mj_qpos(self, cmd: np.ndarray) -> np.ndarray: mj[self._gripper_index] = lo + cmd[self._gripper_index] * (hi - lo) return mj + def _mj_to_cmd_qpos(self, mj: np.ndarray) -> np.ndarray: + """Map MuJoCo qpos back to command-space positions. + + Reverse of ``_cmd_to_mj_qpos``. For the gripper joint the physical + range is mapped back to [0, 1]. + """ + cmd = mj.copy() + if self._gripper_qpos_range is not None and self._gripper_index is not None: + lo, hi = self._gripper_qpos_range + if hi > lo: + cmd[self._gripper_index] = (mj[self._gripper_index] - lo) / (hi - lo) + return cmd + # ---- simulated feedback -------------------------------------------------- def _compute_gravity_torques(self) -> np.ndarray: @@ -135,6 +157,67 @@ def _update_joint_state(self) -> None: timestamp=time.time(), ) + # ---- physics simulation -------------------------------------------------- + + def start_server(self) -> None: + """Start the gravity-compensation physics loop. + + Once running, :meth:`_physics_loop` continuously applies gravity + compensation torques and steps MuJoCo physics. Call + :meth:`enable_gravity_comp` / :meth:`disable_gravity_comp` to + toggle the physics on and off (e.g. when switching between VIS and + CONTROL modes in the viewer). + """ + if self._physics_active: + return + self._physics_active = True + self._stop_event.clear() + self._physics_thread = threading.Thread( + target=self._physics_loop, + daemon=True, + name="sim_physics", + ) + self._physics_thread.start() + + def _physics_loop(self) -> None: + control_dt = 1.0 / self._control_freq + n_substeps = max(1, round(control_dt / self._model.opt.timestep)) + nq = min(self._n_dofs, self._model.nq) + nq_arm = self._gripper_index if self._gripper_index is not None else nq + + next_time = time.time() + control_dt + while not self._stop_event.is_set(): + with self._lock: + if self._grav_comp_enabled: + grav = self._compute_gravity_torques() + grav[:nq_arm] *= self._gravity_comp_factor[:nq_arm] + + self._data.qfrc_applied[:] = 0.0 + self._data.qfrc_applied[:nq_arm] = grav[:nq_arm] + + for _ in range(n_substeps): + mujoco.mj_step(self._model, self._data) + + raw = self._data.qpos[:nq].copy() + self._qpos[:nq] = self._mj_to_cmd_qpos(raw)[:nq] + self._qvel[:nq] = self._data.qvel[:nq] + + self._update_joint_state() + sleep_time = next_time - time.time() + if sleep_time > 0: + time.sleep(sleep_time) + next_time += control_dt + + def enable_gravity_comp(self) -> None: + """Resume gravity-compensation physics from the current pose.""" + with self._lock: + self._grav_comp_enabled = True + + def disable_gravity_comp(self) -> None: + """Pause gravity-compensation physics (CONTROL mode uses teleport).""" + with self._lock: + self._grav_comp_enabled = False + # ---- Robot protocol ------------------------------------------------------ def num_dofs(self) -> int: @@ -149,6 +232,14 @@ def get_joint_state(self) -> Dict[str, np.ndarray]: return {"pos": self._qpos.copy(), "vel": self._qvel.copy()} def command_joint_pos(self, joint_pos: np.ndarray) -> None: + """Command the sim robot to a target joint position. + + When physics is active (sim mode), this disables gravity comp and + teleports ``data.qpos`` directly. Gravity comp stays off for the + duration of control mode and is re-enabled when the caller (e.g. + ``MujocoControlInterface._on_key``) switches back to VIS mode via + ``enable_gravity_comp()``. + """ pos = np.array(joint_pos, dtype=float) if self._joint_limits is not None: arm_end = self._gripper_index if self._gripper_index is not None else len(pos) @@ -164,10 +255,13 @@ def command_joint_pos(self, joint_pos: np.ndarray) -> None: max(self._gripper_limits), ) with self._lock: + if self._physics_active: + self._grav_comp_enabled = False self._qpos = pos n = min(len(pos), self._model.nq) mj_qpos = self._cmd_to_mj_qpos(pos) self._data.qpos[:n] = mj_qpos[:n] + self._data.qvel[:] = 0.0 mujoco.mj_forward(self._model, self._data) self._update_joint_state() @@ -212,7 +306,12 @@ def get_robot_info(self) -> Dict[str, Any]: "gripper_limits": self._gripper_limits, "gripper_index": self._gripper_index, "sim": True, + "gravity_comp_factor": self._gravity_comp_factor, } def close(self) -> None: - pass + if self._physics_active: + self._stop_event.set() + if self._physics_thread is not None: + self._physics_thread.join(timeout=2.0) + self._physics_active = False diff --git a/i2rt/robots/utils.py b/i2rt/robots/utils.py index 6824890..6112b63 100644 --- a/i2rt/robots/utils.py +++ b/i2rt/robots/utils.py @@ -107,6 +107,8 @@ class _ArmHWConfig: kp: np.ndarray # position gain, one per arm joint kd: np.ndarray # damping gain, one per arm joint gravity_comp_factor: np.ndarray # per-joint factor, one per arm joint (6 elements) + grav_comp_kd: np.ndarray # motor MIT-mode kd used only in grav-comp idle mode (6 elements) + coulomb_friction: np.ndarray # per-joint Coulomb friction magnitude (Nm), one per arm joint (6 elements) @lru_cache(maxsize=None) @@ -122,12 +124,19 @@ def _load_arm_config(arm_type: "ArmType") -> _ArmHWConfig: kp = np.array(raw["kp"], dtype=float) kd = np.array(raw["kd"], dtype=float) gravity_comp_factor = np.array(raw["gravity_comp_factor"], dtype=float) + grav_comp_kd = np.array(raw["grav_comp_kd"], dtype=float) + coulomb_friction = np.array(raw["coulomb_friction"], dtype=float) + assert (coulomb_friction >= 0).all(), ( + f"coulomb_friction values must be non-negative in {config_path}: {coulomb_friction}" + ) logger.info(f" motor_list: {motor_list}") logger.info(f" directions: {directions}") logger.info(f" kp: {kp}") logger.info(f" kd: {kd}") logger.info(f" gravity_comp_factor: {gravity_comp_factor}") + logger.info(f" grav_comp_kd: {grav_comp_kd}") + logger.info(f" coulomb_friction: {coulomb_friction}") return _ArmHWConfig( motor_list=motor_list, @@ -135,6 +144,8 @@ def _load_arm_config(arm_type: "ArmType") -> _ArmHWConfig: kp=kp, kd=kd, gravity_comp_factor=gravity_comp_factor, + grav_comp_kd=grav_comp_kd, + coulomb_friction=coulomb_friction, ) diff --git a/i2rt/utils/mujoco_control_interface.py b/i2rt/utils/mujoco_control_interface.py index 19309b3..2bb7f1a 100644 --- a/i2rt/utils/mujoco_control_interface.py +++ b/i2rt/utils/mujoco_control_interface.py @@ -18,6 +18,7 @@ import logging import os import tempfile +import threading import time import xml.etree.ElementTree as ET from enum import Enum, auto @@ -162,13 +163,18 @@ def __init__( ee_site: str = "grasp_site", dt: float = 0.02, log: bool = False, + control_dt: float = 0.005, ): self._robot = robot self._ee_site = ee_site self._dt = dt + self._control_dt = control_dt self._mode = Mode.VIS self._logging = log + self._viewer: Optional[mujoco.viewer.Handle] = None + self._stop_event = threading.Event() + n_dofs = robot.num_dofs() robot_info = robot.get_robot_info() if hasattr(robot, "get_robot_info") else {} self._gripper_index: Optional[int] = robot_info.get("gripper_index") @@ -220,8 +226,9 @@ def from_robot( ee_site: str = "grasp_site", dt: float = 0.02, log: bool = False, + control_dt: float = 0.005, ) -> "MujocoControlInterface": - return cls(robot, robot.xml_path, ee_site, dt, log=log) + return cls(robot, robot.xml_path, ee_site, dt, log=log, control_dt=control_dt) # ---- model construction --------------------------------------------------- @@ -580,106 +587,178 @@ def _log(self) -> None: def _on_key(self, key: int) -> None: if key != 32: # SPACE return - if self._is_sim: - # Sim mode: always stay in CONTROL mode - return - if self._mode is Mode.VIS: - self._mode = Mode.CONTROL - self._sync_mocap_to_ee() - self._set_marker_color(self._CTRL_RGBA) - # Seed sliders with current robot joint positions - self._sync_sliders_to_robot() - print("[control] CONTROL mode — use sliders for per-joint control,") - print("[control] or double-click target + ctrl+drag for IK") - else: - self._mode = Mode.VIS - self._set_marker_color(self._VIS_RGBA) - if isinstance(self._robot, MotorChainRobot): - n = self._robot.num_dofs() - self._robot.update_kp_kd(np.zeros(n), np.zeros(n)) - print("[control] VIS mode — gravity comp, mirroring robot") + viewer = self._viewer + if viewer is None: + return # Fired before run() finished setting up — ignore. + with viewer.lock(): + if self._mode is Mode.VIS: + self._mode = Mode.CONTROL + self._set_marker_color(self._CTRL_RGBA) + if self._is_sim and hasattr(self._robot, "disable_gravity_comp"): + self._robot.disable_gravity_comp() + self._sync_mocap_to_ee() + # Seed sliders with current robot joint positions + self._sync_sliders_to_robot() + print("[control] CONTROL mode — use sliders for per-joint control,") + print("[control] or double-click target + ctrl+drag for IK") + else: + self._mode = Mode.VIS + self._set_marker_color(self._VIS_RGBA) + if self._is_sim and hasattr(self._robot, "enable_gravity_comp"): + self._robot.enable_gravity_comp() + elif isinstance(self._robot, MotorChainRobot): + self._robot.enter_gravity_comp_idle() + print("[control] VIS mode — gravity comp, mirroring robot") # ---- main loop ------------------------------------------------------------ def run(self) -> None: - """Open the viewer and run the vis / control loop.""" + """Open the viewer and run the vis / control loop. + + The viewer runs on the main thread (MuJoCo requirement). A dedicated + daemon thread (:meth:`_control_loop`) owns the user-intent → robot + command pipeline — slider / mocap change detection, IK, self-collision + checks, and ``robot.command_joint_pos(...)`` — so a slow render or a + blocking UI event in ``viewer.sync()`` does not stall commands to the + robot. + """ if self._is_sim: - self._mode = Mode.CONTROL - print("[control] Starting in CONTROL mode (sim)") + if hasattr(self._robot, "start_server"): + self._robot.start_server() + self._mode = Mode.VIS + print("[control] Starting in VIS (gravity comp) mode (sim)") + print("[control] Press SPACE to toggle CONTROL mode") else: print("[control] Starting in VIS (gravity comp) mode") print("[control] Press SPACE to toggle CONTROL mode") + self._stop_event.clear() with mujoco.viewer.launch_passive( self._model, self._data, key_callback=self._on_key, ) as viewer: + self._viewer = viewer viewer.opt.frame = mujoco.mjtFrame.mjFRAME_SITE if self._is_sim: - self._set_marker_color(self._CTRL_RGBA) + self._set_marker_color(self._VIS_RGBA) self._mirror_robot() self._sync_mocap_to_ee() self._sync_sliders_to_robot() + + control_thread = threading.Thread( + target=self._control_loop, + name="mujoco_control", + daemon=True, + ) + control_thread.start() try: while viewer.is_running(): - self._mirror_robot() + with viewer.lock(): + self._mirror_robot() + self._update_button_indicators() + if self._mode is Mode.VIS: + self._sync_mocap_to_ee() if self._logging: self._log() - self._update_button_indicators() - - if self._mode is Mode.VIS: - self._sync_mocap_to_ee() - else: - sliders_moved = self._sliders_changed() - mocap_moved = self._mocap_changed() - - if sliders_moved: - # User moved a slider — command joints directly from slider values - cmd = self._cmd_from_sliders() - n = min(len(cmd), self._nq) - if self._has_self_collision(cmd, n): - if not self._in_collision: - print("[control] Collision detected — command blocked") - self._in_collision = True - else: - self._robot.command_joint_pos(cmd) - logger.info(f"joint command: {cmd}") - if self._in_collision: - print("[control] Collision cleared — commands resumed") - self._in_collision = False - # Update mocap target to follow the new slider FK - self._sync_mocap_to_sliders() - elif mocap_moved: - # User dragged the mocap target — solve IK - target = self._mocap_pose_4x4() - init_q = self._data.qpos[: self._nq].copy() - ok, ik_q = self._kin.ik(target, self._ee_site, init_q=init_q) - if ok: - cmd = self._robot.get_joint_pos().copy() - cmd[: self._n_arm] = ik_q[: self._n_arm] - n = min(len(cmd), self._nq) - if self._has_self_collision(cmd, n): - if not self._in_collision: - print("[control] Collision detected — command blocked") - self._in_collision = True - else: - self._robot.command_joint_pos(cmd) - logger.info(f"IK joint command: {cmd}") - # Update sliders to reflect the IK solution - self._sync_sliders_to_ik(ik_q) - if self._in_collision: - print("[control] Collision cleared — commands resumed") - self._in_collision = False - - # Snapshot for next iteration's change detection - self._prev_ctrl[:] = self._data.ctrl - self._prev_mocap_pos[:] = self._data.mocap_pos[self._mocap_id] - self._prev_mocap_quat[:] = self._data.mocap_quat[self._mocap_id] - viewer.sync() time.sleep(self._dt) except KeyboardInterrupt: pass + finally: + self._stop_event.set() + control_thread.join(timeout=1.0) + self._viewer = None print("[control] Viewer closed") + + def _control_loop(self) -> None: + """Background loop: slider/mocap → IK → collision → command_joint_pos. + + Runs at ``self._control_dt`` cadence, independent of the viewer's + ``self._dt``. Acquires ``viewer.lock()`` only for brief snapshot and + write-back sections; the expensive FK/IK/collision work runs outside + the lock so the viewer thread is never blocked on CPU. + """ + viewer = self._viewer + assert viewer is not None, "_control_loop started before viewer was set" + + while not self._stop_event.is_set() and viewer.is_running(): + if self._mode is not Mode.CONTROL: + time.sleep(self._control_dt) + continue + + with viewer.lock(): + ctrl_snap = np.array(self._data.ctrl, copy=True) + mocap_pos_snap = np.array(self._data.mocap_pos[self._mocap_id], copy=True) + mocap_quat_snap = np.array(self._data.mocap_quat[self._mocap_id], copy=True) + init_q = self._data.qpos[: self._nq].copy() + prev_ctrl = self._prev_ctrl.copy() + prev_mocap_pos = self._prev_mocap_pos.copy() + prev_mocap_quat = self._prev_mocap_quat.copy() + + sliders_moved = not np.allclose(ctrl_snap, prev_ctrl, atol=1e-6) + mocap_moved = not np.allclose(mocap_pos_snap, prev_mocap_pos, atol=1e-6) or not np.allclose( + mocap_quat_snap, prev_mocap_quat, atol=1e-6 + ) + + ik_solution: Optional[np.ndarray] = None # arm-joint IK result + new_mocap_pos: Optional[np.ndarray] = None + new_mocap_quat: Optional[np.ndarray] = None + + if sliders_moved: + cmd = self._robot.get_joint_pos().copy() + nu = min(len(cmd), self._model.nu) + cmd[:nu] = ctrl_snap[:nu] + n = min(len(cmd), self._nq) + if self._has_self_collision(cmd, n): + if not self._in_collision: + print("[control] Collision detected — command blocked") + self._in_collision = True + else: + self._robot.command_joint_pos(cmd) + if self._in_collision: + print("[control] Collision cleared — commands resumed") + self._in_collision = False + q = self._robot_cmd_to_qpos(cmd) + pose = self._kin.fk(q, self._ee_site) + new_mocap_pos = pose[:3, 3].copy() + new_mocap_quat = np.empty(4) + mujoco.mju_mat2Quat(new_mocap_quat, pose[:3, :3].flatten()) + elif mocap_moved: + target = np.eye(4) + target[:3, 3] = mocap_pos_snap + rot = np.empty(9) + mujoco.mju_quat2Mat(rot, mocap_quat_snap) + target[:3, :3] = rot.reshape(3, 3) + ok, ik_q = self._kin.ik(target, self._ee_site, init_q=init_q) + if ok: + cmd = self._robot.get_joint_pos().copy() + cmd[: self._n_arm] = ik_q[: self._n_arm] + n = min(len(cmd), self._nq) + if self._has_self_collision(cmd, n): + if not self._in_collision: + print("[control] Collision detected — command blocked") + self._in_collision = True + else: + self._robot.command_joint_pos(cmd) + if self._in_collision: + print("[control] Collision cleared — commands resumed") + self._in_collision = False + ik_solution = ik_q + + with viewer.lock(): + if ik_solution is not None: + nu = min(len(ik_solution), self._model.nu) + for i in range(nu): + if i == self._gripper_index: + continue + self._data.ctrl[i] = ik_solution[i] + if new_mocap_pos is not None and new_mocap_quat is not None: + self._data.mocap_pos[self._mocap_id] = new_mocap_pos + self._data.mocap_quat[self._mocap_id] = new_mocap_quat + self._prev_ctrl[:] = self._data.ctrl + self._prev_mocap_pos[:] = self._data.mocap_pos[self._mocap_id] + self._prev_mocap_quat[:] = self._data.mocap_quat[self._mocap_id] + + time.sleep(self._control_dt) diff --git a/i2rt/utils/viser_control_interface.py b/i2rt/utils/viser_control_interface.py index eb1d37b..35ce4eb 100644 --- a/i2rt/utils/viser_control_interface.py +++ b/i2rt/utils/viser_control_interface.py @@ -81,6 +81,9 @@ def __init__( self._mesh_local_verts: Dict[int, np.ndarray] = {} self._mesh_local_faces: Dict[int, np.ndarray] = {} + self._check_data = mujoco.MjData(self._model) + self._in_collision = False + @classmethod def from_robot( cls, @@ -103,6 +106,9 @@ def _mirror_robot(self) -> None: mujoco.mj_forward(self._model, self._data) def _denormalize_slide_joints(self, n_set: int) -> None: + self._denormalize_slide_joints_on(self._data, n_set) + + def _denormalize_slide_joints_on(self, data: mujoco.MjData, n_set: int) -> None: """Scale normalised [0,1] slide-joint values to physical range (metres).""" for j in range(self._model.njnt): adr = self._model.jnt_qposadr[j] @@ -110,9 +116,12 @@ def _denormalize_slide_joints(self, n_set: int) -> None: continue if self._model.jnt_type[j] == mujoco.mjtJoint.mjJNT_SLIDE: lo, hi = self._model.jnt_range[j] - self._data.qpos[adr] = lo + self._data.qpos[adr] * (hi - lo) + data.qpos[adr] = lo + data.qpos[adr] * (hi - lo) def _enforce_eq_constraints(self) -> None: + self._enforce_eq_constraints_on(self._data) + + def _enforce_eq_constraints_on(self, data: mujoco.MjData) -> None: """Project qpos to satisfy joint equality constraints (e.g. coupled fingers).""" for i in range(self._model.neq): if self._model.eq_type[i] != mujoco.mjtEq.mjEQ_JOINT: @@ -120,7 +129,49 @@ def _enforce_eq_constraints(self) -> None: adr1 = self._model.jnt_qposadr[self._model.eq_obj1id[i]] adr2 = self._model.jnt_qposadr[self._model.eq_obj2id[i]] coef = self._model.eq_data[i, :5] - self._data.qpos[adr2] = np.polyval(coef[::-1], self._data.qpos[adr1]) + data.qpos[adr2] = np.polyval(coef[::-1], data.qpos[adr1]) + + def _has_self_collision(self, target_q: np.ndarray, n: int) -> bool: + """Return True if *target_q* would cause self-collision. + + Uses a scratch ``MjData`` so the render state is not corrupted. + Contacts involving the ground plane or adjacent (parent-child) bodies + are ignored — only unexpected link-link penetrations count. + """ + self._check_data.qpos[:n] = target_q[:n] + self._denormalize_slide_joints_on(self._check_data, n) + self._enforce_eq_constraints_on(self._check_data) + mujoco.mj_forward(self._model, self._check_data) + for i in range(self._check_data.ncon): + c = self._check_data.contact[i] + if c.dist >= -1e-3: + continue + if ( + self._model.geom_type[c.geom1] == mujoco.mjtGeom.mjGEOM_PLANE + or self._model.geom_type[c.geom2] == mujoco.mjtGeom.mjGEOM_PLANE + ): + continue + b1 = self._model.geom_bodyid[c.geom1] + b2 = self._model.geom_bodyid[c.geom2] + if self._model.body_parentid[b1] == b2 or self._model.body_parentid[b2] == b1: + continue + return True + return False + + def _enter_vis_grav_comp(self) -> None: + """Restore grav-comp on returning to VIS — sim resumes physics, real + clears any lingering PD command.""" + if hasattr(self._robot, "enable_gravity_comp"): + self._robot.enable_gravity_comp() + elif hasattr(self._robot, "enter_gravity_comp_idle"): + self._robot.enter_gravity_comp_idle() + + def _enter_control_grav_comp(self) -> None: + """Pause sim grav-comp so CONTROL mode can teleport. On real, the next + ``command_joint_pos`` implicitly switches to PD — no call needed here.""" + if hasattr(self._robot, "disable_gravity_comp"): + self._robot.disable_gravity_comp() + self._in_collision = False @staticmethod def _mat3_to_wxyz(mat3: np.ndarray) -> np.ndarray: @@ -418,6 +469,7 @@ def _(_: object) -> None: print(f"[viser] Gains applied: kp={new_kp.tolist()}, kd={new_kd.tolist()}") # ---- Main loop ------------------------------------------------------- + prev_controlled = False try: while True: self._mirror_robot() @@ -446,6 +498,15 @@ def _(_: object) -> None: if handle_grip_slider is not None and handle_state is not None: handle_grip_slider.value = float(np.clip(1.0 - float(handle_state.position), 0.0, 1.0)) + mode = state["mode"] + controlled = state["enabled"] and mode in ("ik", "joint") + if controlled != prev_controlled: + if controlled: + self._enter_control_grav_comp() + else: + self._enter_vis_grav_comp() + prev_controlled = controlled + if not state["enabled"]: # Read-only: update sliders to reflect live robot state q = self._robot.get_joint_pos() @@ -455,42 +516,57 @@ def _(_: object) -> None: if gripper_slider is not None and self._gripper_index is not None: gripper_slider.value = float(q[self._gripper_index]) - else: - mode = state["mode"] - - if mode == "vis": - # Mirror only — no commands - q = self._robot.get_joint_pos() - for i, s in enumerate(joint_sliders): - if i < len(q): - s.value = float(np.degrees(q[i])) - - elif mode == "ik": - # Build target from user-dragged transform control - target = np.eye(4) - target[:3, 3] = np.asarray(ik_ctrl.position) - target[:3, :3] = self._wxyz_to_mat3(np.asarray(ik_ctrl.wxyz)) - init_q = self._data.qpos[: self._nq].copy() - _, ik_q = self._kin.ik(target, self._ee_site, init_q=init_q) - cmd = self._robot.get_joint_pos().copy() - cmd[: self._n_arm] = ik_q[: self._n_arm] - if gripper_slider is not None and self._gripper_index is not None: - cmd[self._gripper_index] = float(gripper_slider.value) + elif mode == "vis": + # Mirror only — no commands + q = self._robot.get_joint_pos() + for i, s in enumerate(joint_sliders): + if i < len(q): + s.value = float(np.degrees(q[i])) + + elif mode == "ik": + # Build target from user-dragged transform control + target = np.eye(4) + target[:3, 3] = np.asarray(ik_ctrl.position) + target[:3, :3] = self._wxyz_to_mat3(np.asarray(ik_ctrl.wxyz)) + init_q = self._data.qpos[: self._nq].copy() + _, ik_q = self._kin.ik(target, self._ee_site, init_q=init_q) + cmd = self._robot.get_joint_pos().copy() + cmd[: self._n_arm] = ik_q[: self._n_arm] + if gripper_slider is not None and self._gripper_index is not None: + cmd[self._gripper_index] = float(gripper_slider.value) + n = min(len(cmd), self._nq) + if self._has_self_collision(cmd, n): + if not self._in_collision: + print("[viser] Collision detected — command blocked") + self._in_collision = True + else: self._robot.command_joint_pos(cmd) - # Reflect solved angles in sliders - for i, s in enumerate(joint_sliders): - if i < self._n_arm: - s.value = float(np.degrees(ik_q[i])) - - elif mode == "joint": - # Build command from slider values - cmd = self._robot.get_joint_pos().copy() - for i, s in enumerate(joint_sliders): - if i < self._n_arm: - cmd[i] = float(np.radians(s.value)) - if gripper_slider is not None and self._gripper_index is not None: - cmd[self._gripper_index] = float(gripper_slider.value) + if self._in_collision: + print("[viser] Collision cleared — commands resumed") + self._in_collision = False + # Reflect solved angles in sliders + for i, s in enumerate(joint_sliders): + if i < self._n_arm: + s.value = float(np.degrees(ik_q[i])) + + elif mode == "joint": + # Build command from slider values + cmd = self._robot.get_joint_pos().copy() + for i, s in enumerate(joint_sliders): + if i < self._n_arm: + cmd[i] = float(np.radians(s.value)) + if gripper_slider is not None and self._gripper_index is not None: + cmd[self._gripper_index] = float(gripper_slider.value) + n = min(len(cmd), self._nq) + if self._has_self_collision(cmd, n): + if not self._in_collision: + print("[viser] Collision detected — command blocked") + self._in_collision = True + else: self._robot.command_joint_pos(cmd) + if self._in_collision: + print("[viser] Collision cleared — commands resumed") + self._in_collision = False time.sleep(self._dt)