Skip to content

Commit

Permalink
New path planning approach
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackack committed Mar 7, 2024
1 parent d9a006a commit 0568646
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 131 deletions.
12 changes: 8 additions & 4 deletions rb_ws/src/buggy/scripts/auton/autonsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self,
self_name + "/debug/is_high_covariance", Bool, queue_size=1
)
self.steer_publisher = rospy.Publisher(
self_name + "/input/steering", Float64, queue_size=1
self_name + "/buggy/input/steering", Float64, queue_size=1
)
self.brake_publisher = rospy.Publisher(
self_name + "/input/brake", Float64, queue_size=1
Expand Down Expand Up @@ -157,11 +157,13 @@ def tick_caller(self):
else:
self.planner_tick()

self.cur_traj.current_traj_index = 0

self.local_controller_tick()

self.ticks += 1

if self.ticks >= 20:
if self.ticks >= 10:
self.ticks = 0

self.rosrate.sleep()
Expand Down Expand Up @@ -195,11 +197,13 @@ def local_controller_tick(self):

def planner_tick(self):
with self.lock:
self_pose, _ = self.get_world_pose_and_speed(self.self_odom_msg)
other_pose, other_speed = self.get_world_pose_and_speed(self.other_odom_msg)

# update local trajectory via path planner
self.cur_traj = self.path_planner.compute_traj(
other_pose,
other_speed)
self_pose,
other_pose)
if __name__ == "__main__":
rospy.init_node("auton_system")
parser = argparse.ArgumentParser()
Expand Down
200 changes: 73 additions & 127 deletions rb_ws/src/buggy/scripts/auton/path_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@
from trajectory import Trajectory
from world import World
class PathPlanner():
LOOKAHEAD_TIME = 2.0 #s
RESOLUTION = 10 #samples/s

# move the curb towards the center of the course by CURB_MARGIN meters
# for a margin of safety
CURB_MARGIN = 1 #m
# the number of meters behind NAND before we start morphing the trajectory
REAR_MARGIN = 10 #m

# in meters, the number of meters in front of NAND,
# before the morphed trajectory rejoins the nominal trajectory
# WARNING: set this value to be greater than 15m/s * lookahead time (15 m/s is the upper limit
# of NAND speed) Failure to do so can result violent u-turns in the new trajectory.
FRONT_MARGIN = 35 #m
# the offset is calculated as a mirrored sigmoid function of distance
OFFSET_SCALE_CROSS_TRACK = 3 #m
OFFSET_SHIFT_ALONG_TRACK = 3 #m

# number of meters ahead of the buggy to generate local trajectory for
LOCAL_TRAJ_LEN = 20#m

# start generating local trajectory this many meters ahead of current position
LOOKAHEAD = 2#m

# number of points to sample along the nominal trajectory
RESOLUTION = 20

def __init__(self, nominal_traj:Trajectory, left_curb:Trajectory) -> None:
self.occupancy_grid = OccupancyGrid()
Expand All @@ -50,26 +52,20 @@ def __init__(self, nominal_traj:Trajectory, left_curb:Trajectory) -> None:
# TODO: estimate this value based on the curvature of NAND's recent positions
self.other_steering_angle = 0

def offset_func(self, dist) -> float:
return self.OFFSET_SCALE_CROSS_TRACK / (1 + np.exp(-(-0.2 * dist + self.OFFSET_SHIFT_ALONG_TRACK)))

def compute_traj(
self,
self_pose: Pose,
other_pose: Pose, #Currently NAND's location -- To be Changed
other_speed: float) -> Trajectory:
) -> Trajectory:
"""
draw trajectory, such that the section of the
trajectory near NAND is replaced by a new segment:
#1. the nominal trajectory, until REAR_MARGIN behind NAND
#2. passing targets
#3. nominal trajectory, starting at FRONT_MARGIN ahead of NAND
To generate the passing targets, we take NAND's future positions,
shifted along normal of nominal trajectory by PASSING_OFFSET
Since they are shifted by a fixed distance, this approach assumes NAND's
trajectory is similar to that of ego-buggy (short-circuit).
TODO: shift the future positions while taking into account smoothness of the path,
as well as curbs, such that even if NAND's trajectory is different from ego-buggy,
the generated passing targets will be safe.
draw trajectory starting at the current pose and ending at a fixed distance
ahead. For each trajectory point, calculate the required offset from the nominal
trajectory. Most of the time, this deviation will be zero. The offset is a function of
distance to the obstacle, defined in offset_func. The upperbound of the offset is
the distance from the nominal trajectory to the curb.
Args:
other_pose (Pose): Pose containing NAND's easting (x),
Expand All @@ -83,130 +79,80 @@ def compute_traj(
Returns:
Trajectory: new trajectory object for ego-buggy to follow.
"""
# grab slice of nominal trajectory
nominal_idx = self.nominal_traj.get_closest_index_on_path(self_pose.x, self_pose.y)
nominal_dist_along = self.nominal_traj.get_distance_from_index(nominal_idx)

other_idx:float = self.nominal_traj.get_closest_index_on_path(
other_pose.x,
other_pose.y)
nominal_slice = np.empty((self.RESOLUTION, 2))

#other is just NAND, for general purposes consider it other
# grab slice of curb
curb_idx = self.left_curb.get_closest_index_on_path(self_pose.x, self_pose.y)
curb_dist_along = self.left_curb.get_distance_from_index(curb_idx)

left_curb_idx = self.left_curb.get_closest_index_on_path(
other_pose.x,
other_pose.y)
curb_slice = np.empty((self.RESOLUTION, 2))

left_curb_end_idx = self.left_curb.get_index_from_distance(
self.nominal_traj.get_distance_from_index(left_curb_idx) + self.FRONT_MARGIN
)
# get index of the other buggy along the trajetory and convert to distance
other_idx = self.nominal_traj.get_closest_index_on_path(other_pose.x, other_pose.y)
other_dist = self.nominal_traj.get_distance_from_index(other_idx)

new_segment_start_idx:float = self.nominal_traj.get_index_from_distance(
self.nominal_traj.get_distance_from_index(other_idx) - self.REAR_MARGIN
) #Where new path index starts, CONSTANT delta from NAND
for i in range(self.RESOLUTION):
nominal_slice[i, :] = np.array(self.nominal_traj.get_position_by_distance(
nominal_dist_along + i * self.LOCAL_TRAJ_LEN / self.RESOLUTION + self.LOOKAHEAD
))

new_segment_end_idx:float = self.nominal_traj.get_index_from_distance(
self.nominal_traj.get_distance_from_index(other_idx) + self.FRONT_MARGIN
)
curb_slice[i, :] = np.array(self.left_curb.get_position_by_distance(
curb_dist_along + i * self.LOCAL_TRAJ_LEN / self.RESOLUTION + self.LOOKAHEAD
))

# project other buggy path
# TODO: put other buggy command
other_future_poses:list = self.path_projector.project(
other_pose,
self.other_steering_angle,
other_speed,
self.LOOKAHEAD_TIME,
# compute the distance along nominal trajectory between samples and the obstacle
nominal_slice_dists = np.linspace(
nominal_dist_along + self.LOOKAHEAD,
nominal_dist_along + self.LOOKAHEAD + self.LOCAL_TRAJ_LEN,
self.RESOLUTION)

other_poses_idx_along_nominal = np.empty((len(other_future_poses), ))

# indexes of points along the left curb that are closest NAND's future poses
left_curb_idxes = np.empty((len(other_future_poses), ))
nominal_slice_to_other_dist = np.abs(nominal_slice_dists - other_dist)

# TODO: optimize this lookup -- how tho
for i in range(len(other_future_poses)):
other_poses_idx_along_nominal[i] = self.nominal_traj.get_closest_index_on_path(
other_future_poses[i][0],
other_future_poses[i][1],
start_index=other_idx)
# compute distances from the sample points to the curb
nominal_slice_to_curb_dist = np.linalg.norm(curb_slice - nominal_slice)
passing_offsets = self.offset_func(nominal_slice_to_other_dist)

left_curb_idxes[i] = self.left_curb.get_closest_index_on_path(
other_future_poses[i][0],
other_future_poses[i][1],
start_index=left_curb_idx,
end_index=left_curb_end_idx)
# compute signed cross-track distance between NAND and nominal
nominal_to_other = np.array((other_pose.x, other_pose.y)) - \
np.array(self.nominal_traj.get_position_by_index(other_idx))

nominal_traj_unit_normal:np.typing.NDArray = self.nominal_traj.get_unit_normal_by_index(
other_poses_idx_along_nominal
)

nominal_traj_slice = self.nominal_traj.get_position_by_index(
other_poses_idx_along_nominal
)
# dot product with the unit normal to produce the left-positive signed distance
other_normal = self.nominal_traj.get_unit_normal_by_index(np.array(other_idx.ravel()))
other_cross_track_dist = np.sum(
nominal_to_other * other_normal, axis=1)

# here, use passing offsets to weight NAND's cross track signed distance:
# if the sample point is far from SC, the cross track distance doesn't matter
# if the sample point is near, we add cross track distance to the offset,
# such that the resulting offset is adjusted by position of NAND

left_curb_unit_normal = self.left_curb.get_unit_normal_by_index(left_curb_idxes)
passing_offsets = passing_offsets + np.minimum(1, passing_offsets) * other_cross_track_dist

# grab points on the left curb that run next to future trajectory of NAND
left_curb_slice = self.left_curb.get_position_by_index(left_curb_idxes)
# the left curb slice is shifted further "into" the course along the normal of the curb
# to provide a margin of safety
left_curb_slice = left_curb_slice - left_curb_unit_normal * self.CURB_MARGIN
# clamp passing offset distances to distance to the curb
passing_offsets = np.minimum(passing_offsets, nominal_slice_to_curb_dist)

# generate passing targets by taking the midpoint between NAND and the left curb trajectories
# the passing target's offset from the nominal is capped at the distance between nominal
# trajectory and the curb
passing_targets = (left_curb_slice + other_future_poses) / 2

# bound the passing targets by the left curb:
# for curb point C and passing target P, vector P->C dotted with the unit normal of the curb at C
# should be positive. (the unit normal is left-hand). This ensures the passing points are to the right
# of the curb
passing_targets_to_curb = left_curb_slice - passing_targets
# elementwise multiply, then sum along rows = rowise dot product
curb_mask = (np.sum(passing_targets_to_curb * left_curb_unit_normal, axis=1) > 0).reshape(-1, 1)

# to match dimension of passing offsets mask, repeat along row
# in row (x, y) of passing_targets, if (x, y) is to the left of the curb, the corresponding row
# in curb_mask = (false, false). Else, the row is (true, true)
curb_mask = np.repeat(curb_mask, 2, axis=1)

# use this mask to select the curb, if the original passing target is to the left of the curb.
passing_targets = np.where(curb_mask, passing_targets, left_curb_slice)

# bound the passing targets by the nominal trajectory. This means NAND shouldn't "pull"
# short circuit to the right of the nominal trajectory
passing_targets_to_nominal = nominal_traj_slice - passing_targets
nominal_mask = (np.sum(passing_targets_to_nominal * nominal_traj_unit_normal, axis=1) < 0).reshape(-1, 1)
nominal_mask = np.repeat(nominal_mask, 2, axis=1)
passing_targets = np.where(nominal_mask, passing_targets, nominal_traj_slice)
# shift the nominal slice by passing offsets
nominal_normals = self.nominal_traj.get_unit_normal_by_index(
self.nominal_traj.get_index_from_distance(nominal_slice_dists)
)

print(nominal_slice.shape, passing_offsets.shape, nominal_normals.shape)
positions = nominal_slice + (passing_offsets[:, None] * nominal_normals)

pre_slice = self.nominal_traj.positions[:int(new_segment_start_idx), :]
post_slice = self.nominal_traj.positions[int(new_segment_end_idx):, :]
new_path = np.vstack((pre_slice, passing_targets, post_slice))
# prepend current pose
positions = np.vstack((np.array([self_pose.x, self_pose.y]), positions))

# publish passing targets for debugging
for i in range(len(passing_targets)):
for i in range(len(positions)):
reference_navsat = NavSatFix()
ref_gps = World.world_to_gps(passing_targets[i, 0], passing_targets[i, 1])
ref_gps = World.world_to_gps(positions[i, 0], positions[i, 1])
reference_navsat.latitude = ref_gps[0]
reference_navsat.longitude = ref_gps[1]
self.debug_passing_traj_publisher.publish(reference_navsat)

# for debugging:
# publish the first and last point of the part of the original trajectory
# that got spliced out
# reference_navsat = NavSatFix()
# ref_gps = World.world_to_gps(*self.nominal_traj.get_position_by_index(int(new_segment_start_idx)))
# reference_navsat.latitude = ref_gps[0]
# reference_navsat.longitude = ref_gps[1]
# self.debug_splice_pt_publisher.publish(reference_navsat)

# ref_gps = World.world_to_gps(*self.nominal_traj.get_position_by_index(int(new_segment_end_idx)))
# reference_navsat.latitude = ref_gps[0]
# reference_navsat.longitude = ref_gps[1]
# self.debug_splice_pt_publisher.publish(reference_navsat)




# generate new path
return Trajectory(json_filepath=None, positions=new_path)
return Trajectory(json_filepath=None, positions=positions)

0 comments on commit 0568646

Please sign in to comment.