-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathline_follower.py
127 lines (98 loc) · 4.91 KB
/
line_follower.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
import rospy
import numpy as np
import tf.transformations as tf
from sensor_msgs.msg import Image
from geometry_msgs.msg import Point, TwistStamped, Vector3, PoseStamped
from std_msgs.msg import Float64, Bool, String
from mavros_msgs.msg import PositionTarget
class LineFollower:
def __init__(self, debug=False, desired_drone_position=0.0, desired_angle = 0.0, default_velocity=0.25) -> None:
#attributes
self.debug = debug
self.y_error = Float64()
self.angle_error = Float64()
self.line_detected = Bool()
self.desired_drone_position = Float64()
self.desired_angle = Float64()
self.setpoint = PositionTarget()
self.velocity_setpoint = Vector3()
self.default_velocity = default_velocity
self.current_yaw = 0.0
self.desired_drone_position.data = desired_drone_position
self.desired_angle.data = desired_angle
# ROS node
rospy.init_node('line_follower')
if self.debug:
rospy.loginfo("Line follower node started")
# Subscribers
rospy.Subscriber('/sky_vision/down_cam/line/pose', Point, self.line_pose_callback)
rospy.Subscriber('/drone/control_effort', Float64, self.drone_ajustment_callback)
rospy.Subscriber('/angle/control_effort', Float64, self.angle_ajustment_callback)
rospy.Subscriber('/mavros/local_position/pose', PoseStamped, self.drone_pose_callback)
if self.debug:
rospy.loginfo("Subscribers initialized")
# Publishers
self.setpoint_drone_pub = rospy.Publisher('/drone/setpoint', Float64, queue_size=10)
self.current_drone_pub = rospy.Publisher('/drone/state', Float64, queue_size=10)
self.setpoint_angle_pub = rospy.Publisher('/angle/setpoint', Float64, queue_size=10)
self.current_angle_pub = rospy.Publisher('/angle/state', Float64, queue_size=10)
self.setpoint_pub = rospy.Publisher('/mavros/setpoint_raw/local', PositionTarget, queue_size=10)
self.type_pub = rospy.Publisher('/sky_vision/down_cam/type', String, queue_size=10)
if self.debug:
rospy.loginfo("Publishers initialized")
def drone_pose_callback(self, msg) -> None:
_,_,self.current_yaw = tf.euler_from_quaternion([msg.pose.orientation.x, msg.pose.orientation.y, msg.pose.orientation.z, msg.pose.orientation.w])
if self.debug:
rospy.loginfo(f"yaw: {self.current_yaw}")
def line_pose_callback(self, msg) -> None:
self.drone_error = msg.x
self.angle_error = msg.y
self.line_detected = msg.z
if self.debug:
rospy.loginfo(f"Line detected: {self.line_detected}, Y error: {self.y_error}, Angle error: {self.angle_error}")
# send setpoint
self.setpoint_drone_pub.publish(self.desired_drone_position)
self.setpoint_angle_pub.publish(self.desired_angle)
# send current state
self.current_drone_pub.publish(self.drone_error)
self.current_angle_pub.publish(self.angle_error)
def drone_ajustment_callback(self, msg) -> None:
if self.debug:
rospy.loginfo("Drone ajustment callback")
self.velocity_setpoint.x = 0
self.velocity_setpoint.y = 0
self.velocity_setpoint.z = 0
self.setpoint.velocity = self.velocity_setpoint
def angle_ajustment_callback(self, msg) -> None:
if self.debug:
rospy.loginfo(f"Angle ajustment callback, angle: {msg.data}")
self.setpoint.yaw = (-msg.data + self.current_yaw) #angle in radians
#rospy.loginfo(f"curr_yar:{self.current_yaw} | yaw:{self.setpoint.yaw} | msg.data:{msg.data}")
#if abs(msg.data ) > 0.002:
self.publish_setpoint()
def publish_setpoint(self) -> None:
self.setpoint.header.stamp = rospy.Time.now()
self.setpoint.header.frame_id = "base_footprint"
self.setpoint.type_mask = PositionTarget.IGNORE_PX | PositionTarget.IGNORE_PY | PositionTarget.IGNORE_PZ | PositionTarget.IGNORE_AFX | PositionTarget.IGNORE_AFY | PositionTarget.IGNORE_AFZ | PositionTarget.IGNORE_YAW_RATE
self.setpoint.coordinate_frame = PositionTarget.FRAME_BODY_NED
if self.debug:
rospy.logwarn("Publishing setpoint")
rospy.loginfo(f"velocity: x: {self.setpoint.velocity.x}, y ={self.setpoint.velocity.y} | yaw: {self.setpoint.yaw * 180 / np.pi} degrees")
self.setpoint_pub.publish(self.setpoint)
def start_following(self, val) -> None:
msg = String()
msg.data = "line" if val else " "
self.type_pub.publish(msg)
def main():
follower = LineFollower(debug=True)
rate = rospy.Rate(0.0001)
while not rospy.is_shutdown():
try:
follower.start_following(True)
rate.sleep()
except rospy.ROSInterruptException:
pass
#rospy.spin()
if __name__ == '__main__':
main()