Implemented a basic CLI #160


Merged
merged 1 commit into from Apr 29, 2025
1 change: 0 additions & 1 deletion packages/child-lab-cli/hello.py

This file was deleted.

29 changes: 28 additions & 1 deletion packages/child-lab-cli/pyproject.toml
@@ -4,4 +4,31 @@ version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12.7"
dependencies = []
dependencies = [
    "click>=8.1.8",
    "pyserde>=0.23.0",
    "torch>=2.6.0",
    "tqdm>=4.67.1",
    "viser>=0.2.23",
    "child-lab-visualization",
    "child-lab-procedures",
    "depth-estimation",
    "marker-detection",
    "transformation-buffer",
    "video-io",
]

[tool.uv.sources]
child-lab-procedures = { workspace = true }
child-lab-visualization = { workspace = true }
depth-estimation = { workspace = true }
marker-detection = { workspace = true }
transformation-buffer = { workspace = true }
video-io = { workspace = true }

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project.scripts]
child-lab = "child_lab_cli:child_lab"
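The [project.scripts] entry registers a child-lab console script that resolves to the child_lab function in child_lab_cli/__init__.py (shown below). As a rough sketch, assuming the package is installed into the active environment, the generated script behaves roughly like this (not the literal script body):

# Sketch of what the child-lab console script does after installation.
from child_lab_cli import child_lab

if __name__ == '__main__':
    child_lab()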
28 changes: 28 additions & 0 deletions packages/child-lab-cli/src/child_lab_cli/__init__.py
@@ -0,0 +1,28 @@
import click

from .commands import (
    calibrate_camera,
    estimate_transformations,
    generate_pointcloud,
    process,
    video,
    visualize,
    workspace,
)


@click.group('child-lab')
def cli() -> None: ...


cli.add_command(calibrate_camera)
cli.add_command(estimate_transformations)
cli.add_command(generate_pointcloud)
cli.add_command(process)
cli.add_command(video)
cli.add_command(visualize)
cli.add_command(workspace)


def child_lab() -> None:
    cli(max_content_width=120)
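A quick in-process smoke test of the assembled group is possible with click's test runner; a minimal sketch using only click's public testing API:

from click.testing import CliRunner

from child_lab_cli import cli

# Invoke the group in-process and check that the subcommands are registered.
runner = CliRunner()
result = runner.invoke(cli, ['--help'])
assert result.exit_code == 0
assert 'calibrate-camera' in result.output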
17 changes: 17 additions & 0 deletions packages/child-lab-cli/src/child_lab_cli/commands/__init__.py
@@ -0,0 +1,17 @@
from .calibrate_camera import calibrate_camera
from .estimate_transformations import estimate_transformations
from .generate_pointcloud import generate_pointcloud
from .process import process
from .video import video
from .visualize import visualize
from .workspace import workspace

__all__ = [
    'calibrate_camera',
    'estimate_transformations',
    'generate_pointcloud',
    'process',
    'video',
    'visualize',
    'workspace',
]
110 changes: 110 additions & 0 deletions packages/child-lab-cli/src/child_lab_cli/commands/calibrate_camera.py
@@ -0,0 +1,110 @@
from pathlib import Path

import click
from child_lab_procedures.calibrate_camera import Configuration, Procedure, VideoIoContext
from marker_detection.chessboard import BoardProperties, VisualizationContext
from serde.yaml import to_yaml
from tqdm import trange
from video_io import Reader, Visualizer, Writer

from child_lab_cli.workspace.model import Workspace


@click.command('calibrate-camera', options_metavar='<options>')
@click.argument('workspace-root', type=Path, metavar='<workspace>')
@click.argument('video-name', type=str, metavar='<video>')
@click.option(
    '--square-size',
    type=float,
    required=True,
    help='Board square size in centimeters',
    metavar='<square-size>',
)
@click.option(
    '--inner-board-corners',
    nargs=2,
    type=int,
    required=True,
    help="Number of the chessboard's inner corners in rows and columns",
    metavar='<inner-shape>',
)
@click.option(
    '--max-samples',
    type=int,
    required=False,
    help='Maximum number of board samples to collect',
)
@click.option(
    '--max-speed',
    type=float,
    default=float('inf'),
    required=False,
    help='Maximum speed at which the board may move and still be captured, in pixels per second',
)
@click.option(
    '--min-distance',
    type=float,
    default=0.3,
    required=False,
    help='Minimum distance between a new observation and the previous observations required to capture it',
)
def calibrate_camera(
    workspace_root: Path,
    video_name: str,
    square_size: float,
    inner_board_corners: tuple[int, int],
    max_samples: int | None,
    max_speed: float,
    min_distance: float,
) -> None:
    """
    Calibrate the camera using <video> from <workspace> by detecting
    the inner corners of a chessboard with <inner-shape> corners and <square-size> squares.
    """

    workspace = Workspace.in_directory(workspace_root)

    video_output = workspace.output / 'calibration'
    video_output.mkdir(exist_ok=True)
    video_destination = (video_output / video_name).with_suffix('.mp4')

    calibration_destination = (workspace.calibration / video_name).with_suffix('.yml')

    video = next((v for v in workspace.videos() if v.name == video_name), None)
    if video is None:
        raise click.ClickException(
            f'Input video {video_name} not found in {workspace.input}'
        )

    reader = Reader(video.location)
    writer = Writer(
        video_destination,
        reader.metadata,
        Visualizer[VisualizationContext]({'chessboard_draw_corners': True}),
    )
    video_io_context = VideoIoContext(video_name, reader, writer)

    configuration = Configuration(
        BoardProperties(square_size, *inner_board_corners),
        max_samples,
        max_speed,
        min_distance,
    )

    procedure = Procedure(configuration, video_io_context)

    progress_bar = trange(
        procedure.length_estimate(),
        desc='Gathering samples for calibration...',
    )

    samples = procedure.run(lambda: progress_bar.update())
    match samples:
        case None:
            raise click.ClickException('Procedure has diverged')

        case samples:
            click.echo('Computing calibration...')
            result = samples.calibrate()
            calibration_destination.touch()
            calibration_destination.write_text(to_yaml(result.calibration))
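For reference, a hedged invocation sketch of the new command via click's test runner; the workspace path, video name, and board parameters below are placeholders rather than values from this PR, and a real run needs an actual workspace on disk:

from click.testing import CliRunner

from child_lab_cli import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    [
        'calibrate-camera',
        'workspaces/example',  # hypothetical <workspace> root
        'camera_left',  # hypothetical <video> name
        '--square-size', '2.5',
        '--inner-board-corners', '7', '10',
        '--max-samples', '50',
    ],
)
print(result.exit_code, result.output)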
167 changes: 167 additions & 0 deletions packages/child-lab-cli/src/child_lab_cli/commands/estimate_transformations.py
@@ -0,0 +1,167 @@
from pathlib import Path

import click
from child_lab_procedures.estimate_transformations import (
    Configuration,
    Procedure,
    VideoIoContext,
)
from marker_detection.aruco import (
    Dictionary,
    RigidModel,
    VisualizationContext,
)
from serde.json import to_json
from serde.yaml import from_yaml
from tqdm import trange
from transformation_buffer.rigid_model import Cube
from video_io.calibration import Calibration
from video_io.reader import Reader
from video_io.visualizer import Visualizer
from video_io.writer import Writer

from child_lab_cli.workspace.model import Workspace


@click.command('estimate-transformations')
@click.argument('workspace-root', type=Path, metavar='<workspace>')
@click.argument('video-names', type=str, nargs=-1, metavar='<videos>')
@click.option(
    '--marker-dictionary',
    type=str,
    help='Dictionary to detect markers from',
    metavar='<dictionary>',
)
@click.option(
    '--marker-size',
    type=float,
    help='Marker size in centimeters',
    metavar='<size>',
)
@click.option(
    '--visualize',
    type=bool,
    is_flag=True,
    default=False,
    help='Produce videos with visualizations',
)
@click.option(
    '--device',
    type=str,
    required=False,
    help='Torch device to use for tensor computations',
)
@click.option(
    '--checkpoint',
    type=Path,
    required=False,
    help='File containing a serialized transformation buffer to load and place new transformations in',
)
@click.option(
    '--skip',
    type=int,
    required=False,
    help='Seconds of videos to skip at the beginning',
)
def estimate_transformations(
    workspace_root: Path,
    video_names: list[str],
    marker_dictionary: str,
    marker_size: float,
    visualize: bool,
    device: str | None,
    checkpoint: Path | None,
    skip: int | None,
) -> None:
    """
    Estimate the mutual poses of cameras using <videos> from <workspace>
    by detecting ArUco markers of <size> from <dictionary>,
    and save them as a JSON-serialized transformation buffer.
    """

    workspace = Workspace.in_directory(workspace_root)

    buffer_destination = workspace.transformation / 'buffer.json'

    video_output = workspace.output / 'transformation'
    video_output.mkdir(exist_ok=True)

    video_io_contexts: list[VideoIoContext] = []
    calibrated_videos = workspace.calibrated_videos()

    for video_name in video_names:
        video = next((v for v in calibrated_videos if v.name == video_name), None)

        if video is None:
            raise click.ClickException(
                f'Input video {video_name} not found in {workspace.input}'
            )

        assert video.calibration.is_file()

        calibration = from_yaml(Calibration, video.calibration.read_text())

        reader = Reader(video.location)

        writer = (
            Writer(
                (video_output / video.name).with_suffix('.mp4'),
                reader.metadata,
                Visualizer[VisualizationContext](
                    {
                        'intrinsics': calibration.intrinsics_matrix().numpy(),
                        'marker_draw_masks': True,
                        'marker_draw_ids': True,
                        'marker_draw_axes': True,
                        'marker_draw_angles': True,
                        'marker_mask_color': (0.0, 1.0, 0.0, 1.0),
                        'marker_axis_length': 100,
                        'marker_axis_thickness': 1,
                    }
                ),
            )
            if visualize
            else None
        )

        context = VideoIoContext(
            video.name,
            calibration,
            reader,
            writer,
        )
        video_io_contexts.append(context)

    dictionary = Dictionary.parse(marker_dictionary)
    assert dictionary is not None

    configuration = Configuration(
        RigidModel(marker_size, 1.0),
        dictionary,
        arudice=DEFAULT_ARUDICE,
    )

    procedure = Procedure(configuration, video_io_contexts)
    progress_bar = trange(
        procedure.length_estimate(),
        desc='Estimating transformations...',
    )

    result = procedure.run(lambda: progress_bar.update())

    match result:
        case None:
            raise click.ClickException('Procedure has diverged')

        case buffer:
            buffer_destination.touch()
            buffer_destination.write_text(to_json(buffer))
            click.echo('Done!')


DEFAULT_ARUDICE = [
    Cube[str](
        50.0,
        ('marker_42', 'marker_43', 'marker_44', 'marker_45', 'marker_46', 'marker_47'),
    )
]
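An analogous invocation sketch for this command; the workspace path, video names, and dictionary identifier are assumptions for illustration, and a real run needs calibrated videos present in the workspace:

from click.testing import CliRunner

from child_lab_cli import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    [
        'estimate-transformations',
        'workspaces/example',  # hypothetical <workspace> root
        'camera_left',
        'camera_right',  # hypothetical calibrated <videos>
        '--marker-dictionary', 'DICT_4X4_50',  # assumed dictionary name
        '--marker-size', '5.0',
        '--visualize',
        '--device', 'cpu',
    ],
)
print(result.exit_code, result.output)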