-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcozmo_ar.fsm
112 lines (91 loc) · 3.79 KB
/
cozmo_ar.fsm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from cozmo_fsm import *
import requests
import time
import math
BRIDGE_SERVER = "http://localhost:8000"
"""
Node that publishes the pose of the robot. The calling FSM is responsible
for ensuring this node is not called too often (minimum 100-200ms delay recommended)
"""
class RobotPublisherNode(StateNode):
def start(self, event=None):
super().start(event)
x = str(float(pf.pose[0]) / 1000)
y = str(float(pf.pose[1]) / 1000)
theta = str(math.degrees(pf.pose[2]))
visible = "1"
# Send the data
requests.get(f"{BRIDGE_SERVER}/update_obj/cozmo/{x},{y},{theta},{visible}")
"""
Node that publishes the pose of the a worldmap object. The calling FSM is responsible
for ensuring this node is not called too often (minimum 100-200ms delay recommended)
"""
class ObjPublisherNode(StateNode):
def __init__(self, obj, obj_name):
super().__init__()
self.obj = obj
self.obj_name = obj_name
def start(self, event=None):
super().start(event)
try:
obj = self.obj.wm_obj
except:
return # Object not in worldmap yet
x = str(float(obj.x) / 1000)
y = str(float(obj.y) / 1000)
theta = str(math.degrees(obj.theta))
visible = "1" if obj.is_visible else "0"
# Send the data
#print(self.obj_name, x, y, theta, visible)
requests.get(f"{BRIDGE_SERVER}/update_obj/{self.obj_name}/{x},{y},{theta},{visible}")
"""
Node that checks if there is a new waypoint command and sends a data transition if so,
otherwise sends a failure transition
"""
class WaypointCheckerNode(StateNode):
def start(self, event=None):
super().start(event)
res = requests.get(f"{BRIDGE_SERVER}/get_waypoint").text.strip()
if res == "NONE":
self.post_failure()
else:
x = float(res.split(",")[0]) * 1000
y = float(res.split(",")[1]) * 1000
self.post_data(Pose(x, y, 0, angle_z=degrees(math.nan)))
"""
Inform the AR application that the waypoint navigation has completed
"""
class WaypointAcknowledgeNode(StateNode):
def start(self, event=None):
super().start(event)
requests.get(f"{BRIDGE_SERVER}/reset_waypoint")
class cozmo_ar(StateMachineProgram):
$setup {
launcher: StateNode() =N=> {publisher, spinner, waypoint_checker_init}
publisher: RobotPublisherNode() =N=>
ObjPublisherNode(cube1, "cube1") =N=>
ObjPublisherNode(cube2, "cube2") =N=>
ObjPublisherNode(cube3, "cube3") =T(0.1)=> publisher
waypoint_checker_init: WaypointAcknowledgeNode() =N=> waypoint_checker
waypoint_checker: WaypointCheckerNode()
waypoint_checker =F=> waypoint_checker
waypoint_checker =D=> PilotToPose() =C=> WaypointAcknowledgeNode() =N=> waypoint_checker
# Placeholder for real application logic code (for when AR visualization is being used alongside a more complex behavior)
spinner: StateNode() =N=> spinner
}
def __init__(self):
global pf
landmarks = {
'Aruco-1': Pose(35, 0, 0, angle_z=degrees(180)),
'Aruco-2': Pose(35, 100, 0, angle_z=degrees(180)),
#'Aruco-5': Pose(200, 250, 0, angle_z=degrees(180)),
#'Aruco-6': Pose(300, 250, 0, angle_z=degrees(180)),
}
pf = ParticleFilter(robot,
landmarks = landmarks,
num_particles = 1000,
motion_model = DefaultMotionModel(robot, sigma_trans=0.4, sigma_rot=0.06),
sensor_model = ArucoCombinedSensorModel(robot, distance_variance=20)
)
super().__init__(aruco_marker_size=50,
particle_filter=pf)