From 3801cc695e2e7da15a508e418153f0b4b8cd6589 Mon Sep 17 00:00:00 2001 From: Vincent Berenz Date: Wed, 30 Nov 2022 00:56:30 +0100 Subject: [PATCH 1/2] added support to o80_robot_ball files (package o80_pam) --- python/context/ball_trajectories.py | 208 +++++++++++++++++++++++++++- tests/unit_tests.py | 185 +++++++++++++++++++++++-- 2 files changed, 382 insertions(+), 11 deletions(-) diff --git a/python/context/ball_trajectories.py b/python/context/ball_trajectories.py index e88ca64..7e65be7 100644 --- a/python/context/ball_trajectories.py +++ b/python/context/ball_trajectories.py @@ -23,7 +23,13 @@ assert int(npt.__version__[0]) >= 2, "Need nptyping >=2." # 3: 3d position , Any: nb of points in trajectory -Trajectory = npt.NDArray[npt.Shape["*, 3"], npt.Float32] +BallTrajectory = npt.NDArray[npt.Shape["*, 3"], npt.Float32] + +# 7: 3d position of ball, and 4d position of robot +BallRobotTrajectory = npt.NDArray[npt.Shape["*, 7"], npt.Float32] + +# we support both BallTrajectory and BallRobotTrajectory +Trajectory = typing.Union[BallTrajectory, BallRobotTrajectory] # List of time stamps, in microseconds TimeStamps = npt.NDArray[ @@ -274,6 +280,206 @@ def overwrite( traj_group.create_dataset(self._TIME_STAMPS, data=time_stamps) traj_group.create_dataset(self._TRAJECTORY, data=positions) + def add_ball_robot_trajectories( + self, group_name: str, o80_robot_ball_path: pathlib.Path + ) -> int: + """ + It is assumed that o80_robot_ball_path is a directory hosting + a collection of files named o80_robot_ball_* that have been generated + by the executable o80_robot_ball_logger (package o80_pam). This function + will parse all these files and add them to the hdf5 under the specified + group name. + + Returns + ------- + The number of trajectories added to the file. + """ + + def _read_trajectory(o80_robot_ball_file: pathlib.Path) -> StampedTrajectory: + + Vector3d = typing.Tuple[float, float, float] + Vector4d = typing.Tuple[float, float, float, float] + + class _Ball: + """ + Information regarding the ball at a given step. + A negative ball_id means the ball was not detected + during this step. + """ + + def __init__( + self, ball_id: int, position: Vector3d, velocity: Vector3d + ): + self.ball_id = ball_id + self.position = position + self.velocity = velocity + + class _Robot: + """ + Information regarding the robot at a given step. + Position in radian and velocity in radian per second. + """ + + def __init__(self, position: Vector4d, velocity: Vector4d): + self.position = position + self.velocity = velocity + + Step = typing.Tuple[int, _Ball, _Robot] + """ + represent the observation at a given step, + the first value being the timestamp (in nanoseconds) + """ + + def _readline(line: str) -> Step: + """ + Parse a line of the file, and returns the corresponding + observation. + """ + entries = eval(line) + timestamp = entries[1][0] + ball_id = entries[0][0] + ball_position = entries[0][2] + ball_velocity = entries[0][3] + ball = _Ball(ball_id, ball_position, ball_velocity) + robot_position = entries[1][1] + robot_velocity = entries[1][2] + robot = _Robot(robot_position, robot_velocity) + return timestamp, ball, robot + + def _interpolation( + nb_steps: int, start: Vector3d, end: Vector3d + ) -> typing.List[Vector3d]: + """ + Returns a list of length nb_steps which + interpolates between start and end + """ + r: typing.List[Vector3d] = [] + one_step = [(e - s) / nb_steps for s, e in zip(start, end)] + for step in range(nb_steps): + position = [s + os * step for s, os in zip(start, one_step)] + r.append(tuple(position)) # type: ignore + return r + + def _trim(steps: typing.List[Step]) -> typing.List[Step]: + """ + Remove the first steps during which the ball has not been + detected (if any). Raise a ValueError if the ball + has not been detected at all. + """ + detection_start = -1 + for index, step in enumerate(steps): + if step[1].ball_id >= 0: + detection_start = index + break + if detection_start < 0: + raise ValueError( + "the ball was not detected at all " + "(ball id negative for all steps)" + ) + return steps[detection_start:] + + def _fix_undetected_ball(steps: typing.List[Step]) -> typing.List[Step]: + """ + For some step, the ball_id is -1, indicating that + the ball was not detected. For these steps, the + position of the ball is incorrect. We replace it + by a value obtained by linearly interpolating between the + previous and next valid steps. + (all first steps with no ball detections are trimmed) + """ + # if the ball is not detected at the start, + # removing the corresponding steps + steps = _trim(steps) + + # if the ball is not detected at the end, + # also removing the corresponding steps + steps.reverse() + steps = _trim(steps) + steps.reverse() + + # computing missing position (via + # linear interpolation) + last_detection = 0 + detecting = True + for index, step in enumerate(steps): + ball = step[1] + if ball.ball_id >= 0: + if not detecting: + values = _interpolation( + index - last_detection, + steps[last_detection][1].position, + ball.position, + ) + for index_, position in enumerate(values): + steps[index_ + last_detection][1].position = position + last_detection = index + detecting = True + else: + detecting = False + + return steps + + # reading the file + with open(o80_robot_ball_file, "r") as f: + lines = f.readlines() + + # parsing the content of the file + steps: typing.List[Step] + steps = [_readline(line) for line in lines] + + # fixing the steps for which the ball + # was not detected (ball_id < 0) + steps = _fix_undetected_ball(steps) + + # casting to stamped trajectory + start_time = steps[0][0] + time_stamps = np.array([(step[0] - start_time) * 1e-3 for step in steps]) + positions = np.array( + [step[1].position + step[2].position for step in steps] + ) + return time_stamps, positions + + def _read_folder(tennicam_path: pathlib.Path) -> StampedTrajectories: + """ + List all the file in tennicam_path that have the tennicam_ prefix, + parse them and returns the corresponding list of stamped trajectories. + """ + files = _list_files(o80_robot_ball_path, prefix="o80_robot_ball_") + stamped_trajectories = [ + _read_trajectory(o80_robot_ball_path / f) for f in files + ] + return stamped_trajectories + + def _save_trajectory( + group: h5py._hl.group.Group, + index: int, + stamped_trajectory: StampedTrajectory, + ): + """ + Create in the group a new subgroup named according to the index + and add to it 2 datasets, "time_stamps" (list of microseconds + time stamps) and "trajectory" (list of corresponding 3d positions) + """ + # creating a new group for this trajectory + traj_group = group.create_group(str(index)) + # adding 2 datasets: time_stamps and positions + time_stamps = stamped_trajectory[0] + positions = stamped_trajectory[1] + traj_group.create_dataset(self._TIME_STAMPS, data=time_stamps) + traj_group.create_dataset(self._TRAJECTORY, data=positions) + + # reading all trajectories present in the directory + stamped_trajectories = _read_folder(o80_robot_ball_path) + + # adding the new group to the hdf5 file + group = self._f.create_group(group_name) + + # adding all trajectories as datasets to this group + for index, stamped_trajectory in enumerate(stamped_trajectories): + _save_trajectory(group, index, stamped_trajectory) + + return len(stamped_trajectories) + def add_tennicam_trajectories( self, group_name: str, tennicam_path: pathlib.Path ) -> int: diff --git a/tests/unit_tests.py b/tests/unit_tests.py index 5999bdc..9893e4b 100644 --- a/tests/unit_tests.py +++ b/tests/unit_tests.py @@ -1,4 +1,6 @@ import h5py +import typing +import tempfile import pathlib import pytest import numpy as np @@ -18,6 +20,9 @@ _NB_TENNICAMS = 2 _TENNICAM_FILES = ["tennicam_{}".format(i) for i in range(_NB_TENNICAMS)] _TENNICAM_GROUP = "tennicam" +_NB_BALL_ROBOTS = 2 +_BALL_ROBOT_FILES = ["o80_robot_ball_{}".format(i) for i in range(_NB_TENNICAMS)] +_BALL_ROBOT_GROUP = "o80_robot_ball" _HDF5 = "test.hdf5" @@ -64,13 +69,47 @@ def tennicam_trajectory(duration_trajectory: bt.DurationTrajectory) -> str: ball_ids, time_stamps, positions, velocities ) ] + return "\n".join([repr(e) for e in entries]) + +@pytest.fixture +def ball_robot_trajectory(duration_trajectory: bt.DurationTrajectory) -> str: + """ + create the string representation of a ball / robot trajectory + in the format of files generated by o80_pam/o80_robot_ball_logger. + """ + + size = len(duration_trajectory[0]) + ball_ids = [i for i in range(size)] + stamped_trajectory = bt.to_stamped_trajectory(duration_trajectory) + time_stamps = list(stamped_trajectory[0]) + ball_positions = list([list(d) for d in duration_trajectory[1]]) + ball_velocities = list([list(d) for d in duration_trajectory[1]]) + robot_positions = [bp + [0.0] for bp in ball_positions] + robot_velocities = [bv + [0.0] for bv in ball_velocities] + entries = [] + for index in range(len(ball_ids)): + ball_id = ball_ids[index] + time_stamp = time_stamps[index] * 1e3 + ball_position = list(ball_positions[index]) + ball_velocity = list(ball_velocities[index]) + robot_position = list(robot_positions[index]) + robot_velocity = list(robot_velocities[index]) + entries.append( + ( + (ball_id, time_stamp, ball_position, ball_velocity), + (time_stamp, robot_position, robot_velocity), + ) + ) return "\n".join([repr(e) for e in entries]) @pytest.fixture def working_directory( - tmp_path: pathlib.Path, json_trajectory: str, tennicam_trajectory: str + tmp_path: pathlib.Path, + json_trajectory: str, + tennicam_trajectory: str, + ball_robot_trajectory: str, ) -> pathlib.Path: """ Generate some json and tennicam files, as well as an @@ -81,6 +120,8 @@ def working_directory( tennicams = [tmp_path / tf for tf in _TENNICAM_FILES] + ball_robots = [tmp_path / br for br in _BALL_ROBOT_FILES] + for jf in jsons: with open(jf, "w") as f: f.write(json_trajectory) @@ -89,6 +130,10 @@ def working_directory( with open(tf, "w") as f: f.write(tennicam_trajectory) + for br in ball_robots: + with open(br, "w") as f: + f.write(ball_robot_trajectory) + hdf5_file = tmp_path / _HDF5 with h5py.File(hdf5_file, "w"): pass @@ -109,6 +154,7 @@ def loaded_hdf5(working_directory) -> pathlib.Path: with bt.MutableRecordedBallTrajectories(path=hdf5_file) as rbt: rbt.add_json_trajectories(_JSON_GROUP, working_directory, _SAMPLING_RATE * 1e6) rbt.add_tennicam_trajectories(_TENNICAM_GROUP, working_directory) + rbt.add_ball_robot_trajectories(_BALL_ROBOT_GROUP, working_directory) return hdf5_file @@ -185,18 +231,13 @@ def test_add_trajectories( ): """ Test a hdf5 file can be updated via an instance of RecordedBallTrajectories - with trajectories stored in json files or tennicam files. + with trajectories stored in json files or tennicam files or o80_robot_ball files. """ stamped_trajectory = bt.to_stamped_trajectory(duration_trajectory) ref_time_stamps = stamped_trajectory[0] ref_trajectory_size = len(ref_time_stamps) - print() - print(duration_trajectory[0][:10]) - print(ref_time_stamps[:10]) - print() - hdf5_path = working_directory / _HDF5 group_name = formatting @@ -207,6 +248,9 @@ def test_add_trajectories( group_name, working_directory, _SAMPLING_RATE ) expected_size = _NB_JSONS + elif formatting == _BALL_ROBOT_GROUP: + nb_added = rbt.add_ball_robot_trajectories(group_name, working_directory) + expected_size = _NB_BALL_ROBOTS else: nb_added = rbt.add_tennicam_trajectories(group_name, working_directory) expected_size = _NB_TENNICAMS @@ -220,12 +264,17 @@ def test_add_trajectories( time_stamps = trajectory[0] positions = trajectory[1] assert time_stamps.shape == (ref_trajectory_size,) - assert positions.shape == (ref_trajectory_size, 3) + assert positions.shape == (ref_trajectory_size, 3) or positions.shape == ( + ref_trajectory_size, + 7, + ) assert list(time_stamps) == pytest.approx(list(ref_time_stamps), abs=1) np.testing.assert_almost_equal(trajectory[1], stamped_trajectory[1]) -@pytest.mark.parametrize("formatting", [_JSON_GROUP, _TENNICAM_GROUP]) +@pytest.mark.parametrize( + "formatting", [_JSON_GROUP, _TENNICAM_GROUP, _BALL_ROBOT_GROUP] +) def test_ball_trajectories( loaded_hdf5: pathlib.Path, duration_trajectory: bt.DurationTrajectory, @@ -238,6 +287,9 @@ def test_ball_trajectories( if formatting == _JSON_GROUP: ball_trajectories = bt.BallTrajectories(_JSON_GROUP, loaded_hdf5) expected_size = _NB_JSONS + elif formatting == _BALL_ROBOT_GROUP: + ball_trajectories = bt.BallTrajectories(_BALL_ROBOT_GROUP, loaded_hdf5) + expected_size = _NB_BALL_ROBOTS else: ball_trajectories = bt.BallTrajectories(_TENNICAM_GROUP, loaded_hdf5) expected_size = _NB_TENNICAMS @@ -256,4 +308,117 @@ def test_ball_trajectories( time_stamps = stamped_trajectory[0] positions = stamped_trajectory[1] assert time_stamps.shape == (len(duration_trajectory[0]),) - assert positions.shape == (len(duration_trajectory[0]), 3) + assert positions.shape == (len(duration_trajectory[0]), 3) or positions.shape == ( + len(duration_trajectory[0]), + 7, + ) + + +def test_ball_undetected_fix(): + """ + Testing BallTrajectories manages correctly steps with + undetected ball + """ + + robot_position = (0.0,) * 4 + robot_velocity = (0.0,) * 4 + ball_velocity = (0.1,) * 4 + unknown = (-100.0,) * 3 + + entries = [] + + entries.append(((-1, 1e3, unknown, unknown), (1e3, robot_position, robot_velocity))) + entries.append(((-1, 2e3, unknown, unknown), (2e3, robot_position, robot_velocity))) + entries.append(((-1, 3e3, unknown, unknown), (3e3, robot_position, robot_velocity))) + entries.append(((-1, 4e3, unknown, unknown), (4e3, robot_position, robot_velocity))) + entries.append( + ( + (1, 4e3, (0.0, 0.9, 0.0), ball_velocity), + (4e3, robot_position, robot_velocity), + ) + ) + entries.append( + ( + (1, 5e3, (0.0, 1.0, 0.0), ball_velocity), + (5e3, robot_position, robot_velocity), + ) + ) + entries.append(((-1, 6e3, unknown, unknown), (6e3, robot_position, robot_velocity))) + entries.append(((-1, 7e3, unknown, unknown), (7e3, robot_position, robot_velocity))) + entries.append(((-1, 8e3, unknown, unknown), (8e3, robot_position, robot_velocity))) + entries.append(((-1, 9e3, unknown, unknown), (9e3, robot_position, robot_velocity))) + entries.append( + ( + (1, 10e3, (0.0, 1.5, 0.0), ball_velocity), + (10e3, robot_position, robot_velocity), + ) + ) + entries.append( + ((-1, 11e3, unknown, unknown), (11e3, robot_position, robot_velocity)) + ) + entries.append( + ((-1, 12e3, unknown, unknown), (12e3, robot_position, robot_velocity)) + ) + entries.append( + ((-1, 13e3, unknown, unknown), (13e3, robot_position, robot_velocity)) + ) + entries.append( + ((-1, 14e3, unknown, unknown), (14e3, robot_position, robot_velocity)) + ) + entries.append( + ( + (1, 15e3, (0.0, 1.5, 2.0), ball_velocity), + (15e3, robot_position, robot_velocity), + ) + ) + entries.append( + ( + (1, 16e3, (0.0, 1.5, 2.2), ball_velocity), + (16e3, robot_position, robot_velocity), + ) + ) + entries.append( + ( + (1, 17e3, (0.0, 1.5, 2.4), ball_velocity), + (17e3, robot_position, robot_velocity), + ) + ) + entries.append( + ((-1, 18e3, unknown, unknown), (18e3, robot_position, robot_velocity)) + ) + entries.append( + ((-1, 19e3, unknown, unknown), (19e3, robot_position, robot_velocity)) + ) + + file_content = "\n".join([repr(e) for e in entries]) + + with tempfile.TemporaryDirectory() as tmp_dir_: + + tmp_dir = pathlib.Path(tmp_dir_) + + hdf5file = tmp_dir / "records.hdf5" + with h5py.File(hdf5file, "w"): + pass + + filename = "o80_robot_ball_001" + filepath = tmp_dir / filename + with open(filepath, "w") as f: + f.write(file_content) + + group_name = "robot_ball" + + records = bt.MutableRecordedBallTrajectories(hdf5file) + nb_records = records.add_ball_robot_trajectories(group_name, tmp_dir) + assert nb_records == 1 + + trajectory = records.get_stamped_trajectory(group_name, 0) + # -6: first 4 and last 2 entries of entries are undetected ball + # and should have been trimmed + assert trajectory[1].shape == (len(entries) - 6, 7) + + def assert_almost_equal(t1: typing.Iterable, t2: typing.Iterable, thres=1e-4): + for v1, v2 in zip(t1, t2): + v1 == pytest.approx(v2, thres) + + assert_almost_equal(trajectory[1][3], (0.0, 1.2, 0.0, 0.0, 0.0, 0.0, 0.0)) + assert_almost_equal(trajectory[1][9], (0.0, 1.5, 1.2, 0.0, 0.0, 0.0, 0.0)) From 44a30bf4ac172d9444973f6a40d943d67a8f8a9e Mon Sep 17 00:00:00 2001 From: Vincent Berenz Date: Fri, 23 Dec 2022 10:32:16 +0100 Subject: [PATCH 2/2] adding ball/robot trajectories --- bin/pam_ball_trajectories.py | 28 ++++++++++++-- python/context/ball_trajectories.py | 57 ++++++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 6 deletions(-) diff --git a/bin/pam_ball_trajectories.py b/bin/pam_ball_trajectories.py index 50e1154..f7088dd 100755 --- a/bin/pam_ball_trajectories.py +++ b/bin/pam_ball_trajectories.py @@ -61,6 +61,15 @@ def _add_json(hdf5_path: pathlib.Path, group_name: str, sampling: int): logging.info("added {} trajectories".format(nb_added)) +def _add_ball_robot(hdf5_path: pathlib.Path, group_name: str): + logging.info("recording trajectories in {}".format(hdf5_path)) + with bt.MutableRecordedBallTrajectories(path=hdf5_path) as rbt: + if group_name in rbt.get_groups(): + raise ValueError("group {} already present in the file") + nb_added = rbt.add_ball_robot_trajectories(group_name, pathlib.Path.cwd()) + logging.info("added {} trajectories".format(nb_added)) + + def _add_tennicam(hdf5_path: pathlib.Path, group_name: str): logging.info("recording trajectories in {}".format(hdf5_path)) with bt.MutableRecordedBallTrajectories(path=hdf5_path) as rbt: @@ -131,6 +140,18 @@ def run(): help="record sampling rate, in microseconds (int)", ) + # for adding the json files of the current folder + # to a hdf5 trajectory file + add_ball_robot = subparser.add_parser( + "add-robot-ball", + help="""for saving in a new group all o80_robot_ball_* trajectories present in the current + directory + """, + ) + add_ball_robot.add_argument( + "--group", type=str, required=True, help="the group of trajectories" + ) + # for adding the tennicam files of the current folder # to a hdf5 trajectory file add_tennicam = subparser.add_parser( @@ -181,6 +202,9 @@ def run(): elif args.command == "add-json": _add_json(hdf5_path, args.group, args.sampling_rate_us) + elif args.command == "add-robot-ball": + _add_ball_robot(hdf5_path, args.group) + elif args.command == "add-tennicam": _add_tennicam(hdf5_path, args.group) @@ -195,10 +219,6 @@ def run(): logging.basicConfig(level=logging.INFO) - # try: run() - # except Exception as e: - # logging.error("failed with error: {}".format(e)) - # sys.exit(1) sys.exit(0) diff --git a/python/context/ball_trajectories.py b/python/context/ball_trajectories.py index 7e65be7..45c7295 100644 --- a/python/context/ball_trajectories.py +++ b/python/context/ball_trajectories.py @@ -10,6 +10,7 @@ import nptyping as npt import random +import copy import math import pathlib import h5py @@ -89,6 +90,9 @@ def to_stamped_trajectory(input: DurationTrajectory) -> StampedTrajectory: return stamps, positions +def _p(l): + return " ".join([f"{abs(v):.2f}" for v in l]) + def to_duration_trajectory(input: StampedTrajectory) -> DurationTrajectory: """ Converts a StampedTrajectories to a DurationTrajectory. @@ -222,6 +226,17 @@ def get_stamped_trajectories( for index in indexes } + def get_attributes( + self, group: str, index: int + )->typing.Dict[str,typing.Union[str,int,float]]: + """ + Returns all the attributes attached to the trajectory + """ + r = {} + for key,value in self._f[group].attrs.items(): + r[key]=value + return r + def close(self): """ Close the hdf5 file @@ -265,6 +280,17 @@ def rm_group(self, group: str) -> None: raise KeyError("No such group: {}".format(group)) del self._f[group] + def add_attribute( + self, + group: str, index: int, + key: str, value: typing.Union[str,int,float] + )->None: + """ + Add an attribute to the trajectory + """ + group: h5py._hl.group.Group = self._f[group] + group.attrs[key]=value + def overwrite( self, group: str, index: int, stamped_trajectory: StampedTrajectory ) -> None: @@ -378,6 +404,19 @@ def _trim(steps: typing.List[Step]) -> typing.List[Step]: ) return steps[detection_start:] + def _recompute_ball_velocity(steps: typing.List[Step])->None: + for current,following in zip(steps,steps[1:]): + time_diff = (following[0]-current[0])*1e-9 + following[1].velocity = [ + (fp-cp)/time_diff for cp,fp + in zip(current[1].position,following[1].position) + ] + + def _ball_id_duplicates(steps: typing.List[Step]) -> None: + for s1,s2 in zip(steps,steps[1:]): + if s1[1].ball_id == s2[1].ball_id: + s2[1].ball_id = -1 + def _fix_undetected_ball(steps: typing.List[Step]) -> typing.List[Step]: """ For some step, the ball_id is -1, indicating that @@ -391,12 +430,19 @@ def _fix_undetected_ball(steps: typing.List[Step]) -> typing.List[Step]: # removing the corresponding steps steps = _trim(steps) + # sometimes there is the same ball id for two steps in a row, + # i.e. new ball information did not come up yet during recording + # setting these ball id to -1 + _ball_id_duplicates(steps) + # if the ball is not detected at the end, # also removing the corresponding steps steps.reverse() steps = _trim(steps) steps.reverse() + init_steps = copy.deepcopy(steps) + # computing missing position (via # linear interpolation) last_detection = 0 @@ -416,7 +462,10 @@ def _fix_undetected_ball(steps: typing.List[Step]) -> typing.List[Step]: detecting = True else: detecting = False - + + # after computing the position, fixing the velocity + _recompute_ball_velocity(steps) + return steps # reading the file @@ -429,14 +478,18 @@ def _fix_undetected_ball(steps: typing.List[Step]) -> typing.List[Step]: # fixing the steps for which the ball # was not detected (ball_id < 0) + # (fixing: performing linear interpolation to fill the gaps) steps = _fix_undetected_ball(steps) # casting to stamped trajectory start_time = steps[0][0] time_stamps = np.array([(step[0] - start_time) * 1e-3 for step in steps]) positions = np.array( - [step[1].position + step[2].position for step in steps] + [list(step[1].position) + list(step[2].position) for step in steps] ) + + to_duration_trajectory((time_stamps, positions)) + return time_stamps, positions def _read_folder(tennicam_path: pathlib.Path) -> StampedTrajectories: