Skip to content

Commit f4b071e

Browse files
Shulyakadmulcahey
authored andcommitted
xbee: update for zigpy changes (#193)
* xbee: reduced code duplication * fix outgoing serial data * fix deserialization * fix errors and formatting
1 parent 928e925 commit f4b071e

File tree

3 files changed

+488
-739
lines changed

3 files changed

+488
-739
lines changed

zhaquirks/xbee/__init__.py

Lines changed: 310 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,310 @@
1-
"""Module for xbee devices as remote sensors/switches."""
1+
"""Module for xbee devices as remote sensors/switches.
2+
3+
Allows for direct control of an xbee3's digital pins.
4+
5+
Reading pins should work with any coordinator (Untested)
6+
writing pins will only work with an xbee as the coordinator as
7+
it requires zigpy_xbee.
8+
9+
The xbee must be configured via XCTU to send samples to the coordinator,
10+
DH and DL to the coordiator's address (0). and each pin must be configured
11+
to act as a digital input.
12+
13+
Either configure reporting on state change by setting the appropriate bit
14+
mask on IC or set IR to a value greater than zero to send perodic reports
15+
every x milliseconds, I recommend the later, since this will ensure
16+
the xbee stays alive in Home Assistant.
17+
"""
18+
19+
import logging
20+
import struct
21+
22+
from zigpy.quirks import CustomCluster, CustomDevice
23+
import zigpy.types as t
24+
from zigpy.zcl import foundation
25+
from zigpy.zcl.clusters.general import BinaryInput, LevelControl, OnOff
26+
27+
from ..const import ENDPOINTS, INPUT_CLUSTERS, OUTPUT_CLUSTERS
28+
from .. import EventableCluster, LocalDataCluster
29+
30+
_LOGGER = logging.getLogger(__name__)
31+
32+
DATA_IN_CMD = 0x0000
33+
DIO_APPLY_CHANGES = 0x02
34+
DIO_PIN_HIGH = 0x05
35+
DIO_PIN_LOW = 0x04
36+
ON_OFF_CMD = 0x0000
37+
XBEE_DATA_CLUSTER = 0x11
38+
XBEE_DST_ENDPOINT = 0xE8
39+
XBEE_IO_CLUSTER = 0x92
40+
XBEE_PROFILE_ID = 0xC105
41+
XBEE_REMOTE_AT = 0x17
42+
XBEE_SRC_ENDPOINT = 0xE8
43+
44+
45+
class IOSample(bytes):
46+
"""Parse an XBee IO sample report."""
47+
48+
# pylint: disable=R0201
49+
def serialize(self):
50+
"""Serialize an IO Sample Report, Not implemented."""
51+
_LOGGER.debug("Serialize not implemented.")
52+
53+
@classmethod
54+
def deserialize(cls, data):
55+
"""Deserialize an xbee IO sample report.
56+
57+
xbee digital sample format
58+
Digital mask byte 0,1
59+
Analog mask byte 3
60+
Digital samples byte 4, 5
61+
Analog Sample, 2 bytes per
62+
"""
63+
digital_mask = data[0:2]
64+
analog_mask = data[2:3]
65+
digital_sample = data[3:5]
66+
num_bits = 13
67+
digital_pins = [
68+
(int.from_bytes(digital_mask, byteorder="big") >> bit) & 1
69+
for bit in range(num_bits - 1, -1, -1)
70+
]
71+
digital_pins = list(reversed(digital_pins))
72+
analog_pins = [
73+
(int.from_bytes(analog_mask, byteorder="big") >> bit) & 1
74+
for bit in range(8 - 1, -1, -1)
75+
]
76+
analog_pins = list(reversed(analog_pins))
77+
digital_samples = [
78+
(int.from_bytes(digital_sample, byteorder="big") >> bit) & 1
79+
for bit in range(num_bits - 1, -1, -1)
80+
]
81+
digital_samples = list(reversed(digital_samples))
82+
sample_index = 0
83+
analog_samples = []
84+
for apin in analog_pins:
85+
if apin == 1:
86+
analog_samples.append(
87+
int.from_bytes(
88+
data[5 + sample_index : 7 + sample_index], byteorder="big"
89+
)
90+
)
91+
sample_index += 1
92+
else:
93+
analog_samples.append(0)
94+
95+
return (
96+
{
97+
"digital_pins": digital_pins,
98+
"analog_pins": analog_pins,
99+
"digital_samples": digital_samples,
100+
"analog_samples": analog_samples,
101+
},
102+
b"",
103+
)
104+
105+
106+
# 4 AO lines
107+
# 10 digital
108+
# Discovered endpoint information: <SimpleDescriptor endpoint=232 profile=49413
109+
# device_type=1 device_version=0 input_clusters=[] output_clusters=[]>
110+
111+
112+
ENDPOINT_MAP = {
113+
0: 0xD0,
114+
1: 0xD1,
115+
2: 0xD2,
116+
3: 0xD3,
117+
4: 0xD4,
118+
5: 0xD5,
119+
8: 0xD8,
120+
9: 0xD9,
121+
10: 0xDA,
122+
11: 0xDB,
123+
12: 0xDC,
124+
}
125+
126+
127+
class XBeeOnOff(CustomCluster, OnOff):
128+
"""XBee on/off cluster."""
129+
130+
ep_id_2_pin = {
131+
0xD0: "D0",
132+
0xD1: "D1",
133+
0xD2: "D2",
134+
0xD3: "D3",
135+
0xD4: "D4",
136+
0xD5: "D5",
137+
0xD8: "D8",
138+
0xD9: "D9",
139+
0xDA: "P0",
140+
0xDB: "P1",
141+
0xDC: "P2",
142+
}
143+
144+
async def command(self, command, *args, manufacturer=None, expect_reply=True):
145+
"""Xbee change pin state command, requires zigpy_xbee."""
146+
pin_name = self.ep_id_2_pin.get(self._endpoint.endpoint_id)
147+
if command not in [0, 1] or pin_name is None:
148+
return super().command(command, *args)
149+
if command == 0:
150+
pin_cmd = DIO_PIN_LOW
151+
else:
152+
pin_cmd = DIO_PIN_HIGH
153+
await self._endpoint.device.remote_at(pin_name, pin_cmd)
154+
return 0, foundation.Status.SUCCESS
155+
156+
157+
class XBeeCommon(CustomDevice):
158+
"""XBee common class."""
159+
160+
def remote_at(self, command, *args, **kwargs):
161+
"""Remote at command."""
162+
if hasattr(self._application, "remote_at_command"):
163+
return self._application.remote_at_command(
164+
self.nwk, command, *args, apply_changes=True, encryption=True, **kwargs
165+
)
166+
_LOGGER.warning("Remote At Command not supported by this coordinator")
167+
168+
class DigitalIOCluster(CustomCluster, BinaryInput):
169+
"""Digital IO Cluster for the XBee."""
170+
171+
cluster_id = XBEE_IO_CLUSTER
172+
173+
def handle_cluster_request(self, tsn, command_id, args):
174+
"""Handle the cluster request.
175+
176+
Update the digital pin states
177+
"""
178+
if command_id == ON_OFF_CMD:
179+
values = args[0]
180+
if "digital_pins" in values and "digital_samples" in values:
181+
# Update digital inputs
182+
active_pins = [
183+
i for i, x in enumerate(values["digital_pins"]) if x == 1
184+
]
185+
for pin in active_pins:
186+
# pylint: disable=W0212
187+
self._endpoint.device.__getitem__(
188+
ENDPOINT_MAP[pin]
189+
).__getattr__(OnOff.ep_attribute)._update_attribute(
190+
ON_OFF_CMD, values["digital_samples"][pin]
191+
)
192+
else:
193+
super().handle_cluster_request(tsn, command_id, args)
194+
195+
def deserialize(self, data):
196+
"""Deserialize."""
197+
hdr, data = foundation.ZCLHeader.deserialize(data)
198+
self.debug("ZCL deserialize: %s", hdr)
199+
if hdr.frame_control.frame_type == foundation.FrameType.CLUSTER_COMMAND:
200+
# Cluster command
201+
if hdr.is_reply:
202+
commands = self.client_commands
203+
else:
204+
commands = self.server_commands
205+
206+
try:
207+
schema = commands[hdr.command_id][1]
208+
hdr.frame_control.is_reply = commands[hdr.command_id][2]
209+
except KeyError:
210+
data = (
211+
struct.pack(">i", hdr.tsn)[-1:]
212+
+ struct.pack(">i", hdr.command_id)[-1:]
213+
+ data
214+
)
215+
hdr.command_id = ON_OFF_CMD
216+
try:
217+
schema = commands[hdr.command_id][1]
218+
hdr.frame_control.is_reply = commands[hdr.command_id][2]
219+
except KeyError:
220+
self.warn("Unknown cluster-specific command %s", hdr.command_id)
221+
return hdr, data
222+
value, data = t.deserialize(data, schema)
223+
return hdr, value
224+
else:
225+
# General command
226+
try:
227+
schema = foundation.COMMANDS[hdr.command_id][0]
228+
hdr.frame_control.is_reply = foundation.COMMANDS[hdr.command_id][1]
229+
except KeyError:
230+
self.warn("Unknown foundation command %s", hdr.command_id)
231+
return hdr, data
232+
233+
value, data = t.deserialize(data, schema)
234+
if data != b"":
235+
_LOGGER.warning("Data remains after deserializing ZCL frame")
236+
return hdr, value
237+
238+
attributes = {0x0055: ("present_value", t.Bool)}
239+
client_commands = {0x0000: ("io_sample", (IOSample,), False)}
240+
server_commands = {0x0000: ("io_sample", (IOSample,), False)}
241+
242+
class EventRelayCluster(EventableCluster, LevelControl):
243+
"""A cluster with cluster_id which is allowed to send events."""
244+
245+
attributes = {}
246+
client_commands = {}
247+
server_commands = {0x0000: ("receive_data", (str,), None)}
248+
249+
class SerialDataCluster(LocalDataCluster):
250+
"""Serial Data Cluster for the XBee."""
251+
252+
cluster_id = XBEE_DATA_CLUSTER
253+
254+
def command(self, command, *args, manufacturer=None, expect_reply=False):
255+
"""Handle outgoing data."""
256+
data = bytes("".join(args), encoding="latin1")
257+
return self._endpoint.device.application.request(
258+
self._endpoint.device,
259+
XBEE_PROFILE_ID,
260+
XBEE_DATA_CLUSTER,
261+
XBEE_SRC_ENDPOINT,
262+
XBEE_DST_ENDPOINT,
263+
self._endpoint.device.application.get_sequence(),
264+
data,
265+
expect_reply=False,
266+
)
267+
268+
def handle_cluster_request(self, tsn, command_id, args):
269+
"""Handle incoming data."""
270+
if command_id == DATA_IN_CMD:
271+
self._endpoint.out_clusters[
272+
LevelControl.cluster_id
273+
].handle_cluster_request(tsn, command_id, str(args, encoding="latin1"))
274+
else:
275+
super().handle_cluster_request(tsn, command_id, args)
276+
277+
attributes = {}
278+
client_commands = {0x0000: ("send_data", (bytes,), None)}
279+
server_commands = {0x0000: ("receive_data", (bytes,), None)}
280+
281+
def deserialize(self, endpoint_id, cluster_id, data):
282+
"""Pretends to be parsing incoming data."""
283+
if cluster_id != XBEE_DATA_CLUSTER:
284+
return super().deserialize(endpoint_id, cluster_id, data)
285+
286+
tsn = self._application.get_sequence()
287+
command_id = DATA_IN_CMD
288+
is_reply = False
289+
290+
class Hdr(foundation.ZCLHeader):
291+
"""Trivial serialization class."""
292+
293+
def __init__(self, tsn, command_id, is_reply):
294+
frc = foundation.FrameControl()
295+
frc.is_reply = is_reply
296+
frc.frame_type = foundation.FrameType.CLUSTER_COMMAND
297+
super().__init__(frame_control=frc, tsn=tsn, command_id=command_id)
298+
299+
return Hdr(tsn, command_id, is_reply), data
300+
301+
replacement = {
302+
ENDPOINTS: {
303+
232: {
304+
"manufacturer": "XBEE",
305+
"model": "xbee.io",
306+
INPUT_CLUSTERS: [DigitalIOCluster, SerialDataCluster],
307+
OUTPUT_CLUSTERS: [SerialDataCluster, EventRelayCluster],
308+
}
309+
}
310+
}

0 commit comments

Comments
 (0)