diff --git a/devices/uart_handler.py b/devices/uart_handler.py new file mode 100644 index 0000000..e946261 --- /dev/null +++ b/devices/uart_handler.py @@ -0,0 +1,35 @@ +from pyControl.hardware import IO_object, assign_ID, interrupt_queue, fw, sm + + +class UART_handler(IO_object): + """ + This class is used to generate a framework event when a UART message is received. + """ + + def __init__(self, event_name, first_char_interrupt=True): + self.event_name = event_name + self.last_interrupt_time = 0 + self.first_char_interrupt = first_char_interrupt + if self.first_char_interrupt: + self.msg_starting = False + assign_ID(self) + + def _initialise(self): + self.event_ID = sm.events[self.event_name] if self.event_name in sm.events else False + + def ISR(self, _): + if self.event_ID: + # - pyboard v1.1 (and other STM32F4 boards): IRQ_RXIDLE interrupt is triggered after the first character + # AND at the end when the RX is idle. + # - pyboard D-series board (STM32F7): IRQ_RXIDLE interrupt is triggered ONLY at the end when the RX is idle + # - see Micropytyhon UART docs for more info: https://docs.micropython.org/en/latest/library/machine.UART.html + if self.first_char_interrupt: + self.msg_starting = not self.msg_starting + if self.msg_starting: + # ignore the first interrupt after the message starts (when using pyboard v1.1) + return + self.timestamp = fw.current_time + interrupt_queue.put(self.ID) + + def _process_interrupt(self): + fw.event_queue.put(fw.Datatuple(self.timestamp, fw.EVENT_TYP, "i", self.event_ID)) diff --git a/source/pyControl/hardware.py b/source/pyControl/hardware.py index 9e03afd..98ab573 100644 --- a/source/pyControl/hardware.py +++ b/source/pyControl/hardware.py @@ -287,9 +287,9 @@ class Analog_channel(IO_object): def __init__(self, name, sampling_rate, data_type, plot=True): assert data_type in ("b", "B", "h", "H", "i", "I"), "Invalid data_type." - assert not any( - [name == io.name for io in IO_dict.values() if isinstance(io, Analog_channel)] - ), "Analog signals must have unique names." + assert not any([name == io.name for io in IO_dict.values() if isinstance(io, Analog_channel)]), ( + "Analog signals must have unique names." + ) self.name = name assign_ID(self) self.sampling_rate = sampling_rate @@ -348,9 +348,9 @@ class Analog_threshold(IO_object): # Generates framework events when an analog signal goes above or below specified threshold. def __init__(self, threshold=None, rising_event=None, falling_event=None): - assert isinstance( - threshold, int - ), "Integer threshold must be specified if rising or falling events are defined." + assert isinstance(threshold, int), ( + "Integer threshold must be specified if rising or falling events are defined." + ) self.threshold = threshold self.rising_event = rising_event self.falling_event = falling_event @@ -509,3 +509,4 @@ def _timer_callback(self): fw.data_output_queue.put(fw.Datatuple(fw.current_time, fw.EVENT_TYP, "s", self.event_ID)) self.state = not self.state self.sync_pin.value(self.state) +