-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathabb.py
executable file
·357 lines (307 loc) · 12.2 KB
/
abb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
'''
Michael Dawson-Haggerty
abb.py: contains classes and support functions which interact with an ABB Robot running our software stack (RAPID code module SERVER)
For functions which require targets (XYZ positions with quaternion orientation),
targets can be passed as [[XYZ], [Quats]] OR [XYZ, Quats]
'''
import socket
import json
import time
import inspect
from threading import Thread
from collections import deque
import logging
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
class Robot:
def __init__(self,
ip = '192.168.125.1',
port_motion = 5000,
port_logger = 5001):
self.delay = .08
self.connect_motion((ip, port_motion))
#log_thread = Thread(target = self.get_net,
# args = ((ip, port_logger))).start()
self.set_units('millimeters', 'degrees')
self.set_tool()
self.set_workobject()
self.set_speed()
self.set_zone()
def connect_motion(self, remote):
log.info('Attempting to connect to robot motion server at %s', str(remote))
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(2.5)
self.sock.connect(remote)
self.sock.settimeout(None)
log.info('Connected to robot motion server at %s', str(remote))
def connect_logger(self, remote, maxlen=None):
self.pose = deque(maxlen=maxlen)
self.joints = deque(maxlen=maxlen)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(remote)
s.setblocking(1)
try:
while True:
data = map(float, s.recv(4096).split())
#result =
if int(data[1]) == 0:
self.pose.append([data[2:5], data[5:]])
#elif int(data[1]) == 1: self.joints.append([a[2:5], a[5:]])
finally:
s.shutdown(socket.SHUT_RDWR)
def set_units(self, linear, angular):
units_l = {'millimeters': 1.0,
'meters' : 1000.0,
'inches' : 25.4}
units_a = {'degrees' : 1.0,
'radians' : 57.2957795}
self.scale_linear = units_l[linear]
self.scale_angle = units_a[angular]
def set_cartesian(self, pose):
'''
Executes a move immediately from the current pose,
to 'pose', with units of millimeters.
'''
msg = "01 " + self.format_pose(pose)
return self.send(msg)
def set_joints(self, joints):
'''
Executes a move immediately, from current joint angles,
to 'joints', in degrees.
'''
if len(joints) <> 6: return False
msg = "02 "
for joint in joints: msg += format(joint*self.scale_angle, "+08.2f") + " "
msg += "#"
return self.send(msg)
def get_cartesian(self):
'''
Returns the current pose of the robot, in millimeters
'''
msg = "03 #"
data = self.send(msg).split()
r = [float(s) for s in data]
return [r[2:5], r[5:9]]
def get_joints(self):
'''
Returns the current angles of the robots joints, in degrees.
'''
msg = "04 #"
data = self.send(msg).split()
return [float(s) / self.scale_angle for s in data[2:8]]
def get_external_axis(self):
'''
If you have an external axis connected to your robot controller
(such as a FlexLifter 600, google it), this returns the joint angles
'''
msg = "05 #"
data = self.send(msg).split()
return [float(s) for s in data[2:8]]
def get_robotinfo(self):
'''
Returns a robot- unique string, with things such as the
robot's model number.
Example output from and IRB 2400:
['24-53243', 'ROBOTWARE_5.12.1021.01', '2400/16 Type B']
'''
msg = "98 #"
data = str(self.send(msg))[5:].split('*')
log.debug('get_robotinfo result: %s', str(data))
return data
def set_tool(self, tool=[[0,0,0], [1,0,0,0]]):
'''
Sets the tool centerpoint (TCP) of the robot.
When you command a cartesian move,
it aligns the TCP frame with the requested frame.
Offsets are from tool0, which is defined at the intersection of the
tool flange center axis and the flange face.
'''
msg = "06 " + self.format_pose(tool)
self.send(msg)
self.tool = tool
def load_json_tool(self, file_obj):
if file_obj.__class__.__name__ == 'str':
file_obj = open(filename, 'rb');
tool = check_coordinates(json.load(file_obj))
self.set_tool(tool)
def get_tool(self):
log.debug('get_tool returning: %s', str(self.tool))
return self.tool
def set_workobject(self, work_obj=[[0,0,0],[1,0,0,0]]):
'''
The workobject is a local coordinate frame you can define on the robot,
then subsequent cartesian moves will be in this coordinate frame.
'''
msg = "07 " + self.format_pose(work_obj)
self.send(msg)
def set_speed(self, speed=[100,50,50,50]):
'''
speed: [robot TCP linear speed (mm/s), TCP orientation speed (deg/s),
external axis linear, external axis orientation]
'''
if len(speed) <> 4: return False
msg = "08 "
msg += format(speed[0], "+08.1f") + " "
msg += format(speed[1], "+08.2f") + " "
msg += format(speed[2], "+08.1f") + " "
msg += format(speed[3], "+08.2f") + " #"
self.send(msg)
def set_zone(self,
zone_key = 'z1',
point_motion = False,
manual_zone = []):
zone_dict = {'z0' : [.3,.3,.03],
'z1' : [1,1,.1],
'z5' : [5,8,.8],
'z10' : [10,15,1.5],
'z15' : [15,23,2.3],
'z20' : [20,30,3],
'z30' : [30,45,4.5],
'z50' : [50,75,7.5],
'z100': [100,150,15],
'z200': [200,300,30]}
'''
Sets the motion zone of the robot. This can also be thought of as
the flyby zone, AKA if the robot is going from point A -> B -> C,
how close do we have to pass by B to get to C
zone_key: uses values from RAPID handbook (stored here in zone_dict)
with keys 'z*', you should probably use these
point_motion: go to point exactly, and stop briefly before moving on
manual_zone = [pzone_tcp, pzone_ori, zone_ori]
pzone_tcp: mm, radius from goal where robot tool centerpoint
is not rigidly constrained
pzone_ori: mm, radius from goal where robot tool orientation
is not rigidly constrained
zone_ori: degrees, zone size for the tool reorientation
'''
if point_motion:
zone = [0,0,0]
elif len(manual_zone) == 3:
zone = manual_zone
elif zone_key in zone_dict.keys():
zone = zone_dict[zone_key]
else: return False
msg = "09 "
msg += str(int(point_motion)) + " "
msg += format(zone[0], "+08.4f") + " "
msg += format(zone[1], "+08.4f") + " "
msg += format(zone[2], "+08.4f") + " #"
self.send(msg)
def buffer_add(self, pose):
'''
Appends single pose to the remote buffer
Move will execute at current speed (which you can change between buffer_add calls)
'''
msg = "30 " + self.format_pose(pose)
self.send(msg)
def buffer_set(self, pose_list):
'''
Adds every pose in pose_list to the remote buffer
'''
self.clear_buffer()
for pose in pose_list:
self.buffer_add(pose)
if self.buffer_len() == len(pose_list):
log.debug('Successfully added %i poses to remote buffer',
len(pose_list))
return True
else:
log.warn('Failed to add poses to remote buffer!')
self.clear_buffer()
return False
def clear_buffer(self):
msg = "31 #"
data = self.send(msg)
if self.buffer_len() <> 0:
log.warn('clear_buffer failed! buffer_len: %i', self.buffer_len())
raise NameError('clear_buffer failed!')
return data
def buffer_len(self):
'''
Returns the length (number of poses stored) of the remote buffer
'''
msg = "32 #"
data = self.send(msg).split()
return int(float(data[2]))
def buffer_execute(self):
'''
Immediately execute linear moves to every pose in the remote buffer.
'''
msg = "33 #"
return self.send(msg)
def set_external_axis(self, axis_unscaled=[-550,0,0,0,0,0]):
if len(axis_values) <> 6: return False
msg = "34 "
for axis in axis_values:
msg += format(axis, "+08.2f") + " "
msg += "#"
return self.send(msg)
def move_circular(self, pose_onarc, pose_end):
'''
Executes a movement in a circular path from current position,
through pose_onarc, to pose_end
'''
msg_0 = "35 " + self.format_pose(pose_onarc)
msg_1 = "36 " + self.format_pose(pose_end)
data = self.send(msg_0).split()
if data[1] <> '1':
log.warn('move_circular incorrect response, bailing!')
return False
return self.send(msg_1)
def set_dio(self, value, id=0):
'''
A function to set a physical DIO line on the robot.
For this to work you're going to need to edit the RAPID function
and fill in the DIO you want this to switch.
'''
msg = '97 ' + str(int(bool(value))) + ' #'
return
#return self.send(msg)
def send(self, message, wait_for_response=True):
'''
Send a formatted message to the robot socket.
if wait_for_response, we wait for the response and return it
'''
caller = inspect.stack()[1][3]
log.debug('%-14s sending: %s', caller, message)
self.sock.send(message)
time.sleep(self.delay)
if not wait_for_response: return
data = self.sock.recv(4096)
log.debug('%-14s recieved: %s', caller, data)
return data
def format_pose(self, pose):
pose = check_coordinates(pose)
msg = ''
for cartesian in pose[0]:
msg += format(cartesian * self.scale_linear, "+08.1f") + " "
for quaternion in pose[1]:
msg += format(quaternion, "+08.5f") + " "
msg += "#"
return msg
def close(self):
self.send("99 #", False)
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
log.info('Disconnected from ABB robot.')
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def check_coordinates(coordinates):
if ((len(coordinates) == 2) and
(len(coordinates[0]) == 3) and
(len(coordinates[1]) == 4)):
return coordinates
elif (len(coordinates) == 7):
return [coordinates[0:3], coordinates[3:7]]
log.warn('Recieved malformed coordinate: %s', str(coordinates))
raise NameError('Malformed coordinate!')
if __name__ == '__main__':
formatter = logging.Formatter("[%(asctime)s] %(levelname)-7s (%(filename)s:%(lineno)3s) %(message)s", "%Y-%m-%d %H:%M:%S")
handler_stream = logging.StreamHandler()
handler_stream.setFormatter(formatter)
handler_stream.setLevel(logging.DEBUG)
log = logging.getLogger('abb')
log.setLevel(logging.DEBUG)
log.addHandler(handler_stream)