diff --git a/models/node/node.py b/models/node/node.py
index 8ebda86..83e73cc 100644
--- a/models/node/node.py
+++ b/models/node/node.py
@@ -13,7 +13,6 @@
 from models.framework_data import FrameworkData
 
 
-# TODO Isolate nodes into separate threads. Each node should run in its own thread
 class Node:
     _MODULE_NAME: Final[str] = 'models.node'
     """Abstract base class for processing pipeline execution on this framework.
@@ -46,6 +45,7 @@ def __init__(self, parameters=None) -> None:
         self.thread = None
         self.new_data_available = False
         self.condition = threading.Condition()
+        self.is_running_main_process = False
 
     def _build_graph_inputs(self):
         return f"""
@@ -255,19 +255,25 @@ def _call_children(self):
     def _thread_runner(self):
         while self.running:
             with self.condition:
-                while not self.new_data_available and self.running:
+                while (not self.new_data_available) and self.running:
                     self.condition.wait()
+                if not self.running:
+                    break
+
                 while not self.local_storage.empty():
                     input_name, data = self.local_storage.get()
                     try:
+                        self.is_running_main_process = True
                         self._run(data, input_name)
                     except Exception as e:
                         self.print(f'Error: {e}', exception=e)
                         raise e
+                    finally:
+                        self.is_running_main_process = False
                     if self._is_next_node_call_enabled():
                         self._call_children()
+
+                self.new_data_available = False
 
     def run(self, data: FrameworkData = None, input_name: str = None) -> None:
diff --git a/models/node/processing/processing_node.py b/models/node/processing/processing_node.py
index 6401e54..92b34b1 100644
--- a/models/node/processing/processing_node.py
+++ b/models/node/processing/processing_node.py
@@ -1,4 +1,5 @@
 import abc
+import copy
 from typing import List, Dict, Final
 
 from models.exception.missing_parameter import MissingParameterError
@@ -102,7 +103,7 @@ def _process_input_buffer(self):
         if not self._is_processing_condition_satisfied():
             return
         self.print('Starting processing of input buffer')
-        processed_data = self._process(self._input_buffer)
+        processed_data = self._process(copy.deepcopy(self._input_buffer))
         if self._clear_input_buffer_after_process:
             self._clear_input_buffer()
         if self._clear_output_buffer_after_process:
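
Note on the two files above: the `_thread_runner` changes follow the standard condition-variable consumer pattern (re-check the shutdown flag after `wait()` returns, set a busy flag inside `try/finally`, clear the data flag only after the queue is drained), and `_process_input_buffer` now hands `_process` a `deepcopy` so the worker cannot mutate the shared buffer mid-flight. A minimal standalone sketch of the same loop; all names here are illustrative, not framework API:

```python
import threading
import queue

class Worker:
    """Illustrative consumer loop mirroring the patched Node._thread_runner."""

    def __init__(self):
        self.running = True
        self.new_data_available = False
        self.is_busy = False
        self.condition = threading.Condition()
        self.storage = queue.Queue()

    def runner(self):
        while self.running:
            with self.condition:
                # Sleep until a producer signals new data (or shutdown).
                while not self.new_data_available and self.running:
                    self.condition.wait()
                if not self.running:
                    break  # re-check after wakeup: the signal may have been a shutdown
                while not self.storage.empty():
                    item = self.storage.get()
                    try:
                        self.is_busy = True   # visible to pollers, like is_running_main_process
                        print('processing', item)
                    finally:
                        self.is_busy = False  # cleared even if processing raises
                self.new_data_available = False  # consume the flag only after draining

    def submit(self, item):
        with self.condition:
            self.storage.put(item)
            self.new_data_available = True
            self.condition.notify()

w = Worker()
threading.Thread(target=w.runner, daemon=True).start()
w.submit('sample')
```
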
diff --git a/models/node/processing/synchronize.py b/models/node/processing/synchronize.py
index 9b593bb..1a3e61e 100644
--- a/models/node/processing/synchronize.py
+++ b/models/node/processing/synchronize.py
@@ -1,6 +1,8 @@
 import statistics
 import copy
+from bisect import bisect_left
 from typing import List, Dict, Final
+
 from models.exception.invalid_parameter_value import InvalidParameterValue
 from models.exception.missing_parameter import MissingParameterError
 from models.framework_data import FrameworkData
@@ -22,7 +24,7 @@ class Synchronize(ProcessingNode):
     FILL_TYPE_SAMPLE_AND_HOLD: Final[str] = 'sample_and_hold'
 
     def _validate_parameters(self, parameters: dict):
-        parameters['buffer_options']['clear_input_buffer_after_process'] = False
+        parameters['buffer_options']['clear_input_buffer_after_process'] = True
         super()._validate_parameters(parameters)
         if 'slave_filling' not in parameters:
             raise MissingParameterError(module=self._MODULE_NAME, name=self.name,
@@ -43,86 +45,98 @@ def _initialize_parameter_fields(self, parameters: dict):
         self._zero_fill = parameters['slave_filling'] == self.FILL_TYPE_ZEROFILL
         self._sample_and_hold = parameters['slave_filling'] == self.FILL_TYPE_SAMPLE_AND_HOLD
         self._sync_errors: List[float] = []
+        self._exec_index = 0
+        self._last_valid_data = None
+        self._initialize_sync_buffer()
 
     def _is_next_node_call_enabled(self) -> bool:
         return True
 
+    def _initialize_sync_buffer(self):
+        """Sets the sync buffer to a new, empty object for each input name."""
+        self._sync_buffer = {}
+        for input_name in self._get_inputs():
+            self._sync_buffer[input_name] = FrameworkData()
+
+    def _insert_data_in_sync_buffer(self, data: Dict[str, FrameworkData]):
+        input_data = copy.deepcopy(data)
+        for input_name in self._get_inputs():
+            self._sync_buffer[input_name].extend(input_data[input_name])
+
+    def _check_for_timestamp_intersection(self, slave_timestamp_data: List[float],
+                                          master_timestamp_data: List[float]) -> bool:
+        slave_start = slave_timestamp_data[0]
+        slave_end = slave_timestamp_data[-1]
+        master_start = master_timestamp_data[0]
+        master_end = master_timestamp_data[-1]
+        return slave_start < master_end and slave_end > master_start
+
+    def _move_input_buffer_to_sync_buffer(self):
+        slave_main_length = self._input_buffer[self.INPUT_SLAVE_MAIN].get_data_count()
+        slave_timestamp_length = self._input_buffer[self.INPUT_SLAVE_TIMESTAMP].get_data_count()
+        master_main_length = self._input_buffer[self.INPUT_MASTER_MAIN].get_data_count()
+        master_timestamp_length = self._input_buffer[self.INPUT_MASTER_TIMESTAMP].get_data_count()
+        slave_main = self._input_buffer[self.INPUT_SLAVE_MAIN]
+        slave_timestamp = self._input_buffer[self.INPUT_SLAVE_TIMESTAMP]
+        master_main = self._input_buffer[self.INPUT_MASTER_MAIN]
+        master_timestamp = self._input_buffer[self.INPUT_MASTER_TIMESTAMP]
+        self._sync_buffer[self.INPUT_SLAVE_MAIN].extend(slave_main.splice(0, min(slave_main_length, slave_timestamp_length)))
+        self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].extend(slave_timestamp.splice(0, min(slave_main_length, slave_timestamp_length)))
+        self._sync_buffer[self.INPUT_MASTER_MAIN].extend(master_main.splice(0, min(master_main_length, master_timestamp_length)))
+        self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].extend(master_timestamp.splice(0, min(master_main_length, master_timestamp_length)))
+
+    def _process_input_buffer(self):
+        self._move_input_buffer_to_sync_buffer()
+
+        if not self._is_processing_condition_satisfied():
+            return
+
+        processed_data = self._process(copy.deepcopy(self._sync_buffer))
+
+        if self._clear_output_buffer_after_process:
+            self._clear_output_buffer()
+        self.print('Outputting data')
+        for output_name in self._get_outputs():
+            self._insert_new_output_data(processed_data[output_name], output_name)
+
     def _is_processing_condition_satisfied(self) -> bool:
-        return self._input_buffer[self.INPUT_SLAVE_TIMESTAMP].get_data_count() > 1 \
-               and self._input_buffer[self.INPUT_MASTER_TIMESTAMP].get_data_count() > 0 \
-               and self._input_buffer[self.INPUT_SLAVE_MAIN].get_data_count() > 0 \
-               and self._input_buffer[self.INPUT_MASTER_MAIN].get_data_count() > 0 \
-               and (
-                       self._input_buffer[self.INPUT_MASTER_TIMESTAMP].get_data_single_channel()[-1]
-                       >= self._input_buffer[self.INPUT_SLAVE_TIMESTAMP].get_data_single_channel()[-1]
-               )
+        input_data = self._sync_buffer
+        slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
+        master_timestamp = input_data[self.INPUT_MASTER_TIMESTAMP]
+        slave_main = input_data[self.INPUT_SLAVE_MAIN]
+        master_main = input_data[self.INPUT_MASTER_MAIN]
+
+        return (slave_timestamp.get_data_count() > 2
+                and master_timestamp.get_data_count() > 2
+                and slave_main.get_data_count() == slave_timestamp.get_data_count()
+                and master_main.get_data_count() == master_timestamp.get_data_count()
+                and self._check_for_timestamp_intersection(slave_timestamp.get_data_single_channel(), master_timestamp.get_data_single_channel()))
 
     def _process(self, input_data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:
-        data = copy.deepcopy(input_data)
-        master_timestamp_data = data[self.INPUT_MASTER_TIMESTAMP].get_data_single_channel()
-        slave_main = data[self.INPUT_SLAVE_MAIN]
-        slave_timestamp = data[self.INPUT_SLAVE_TIMESTAMP]
+        self._exec_index += 1
+        if self._exec_index == 1:
+            input_data = self._trim_start(input_data)
+        input_data = self._trim_end(input_data)
+        master_timestamp_data = input_data[self.INPUT_MASTER_TIMESTAMP].get_data_single_channel()
+        slave_main = input_data[self.INPUT_SLAVE_MAIN]
+        slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
         slave_timestamp_data = slave_timestamp.get_data_single_channel()
-        master_sampling_frequency = data[self.INPUT_MASTER_TIMESTAMP].sampling_frequency
-        master_max_index = len(master_timestamp_data) - 1
-        master_timestamp_avg_increment = (master_timestamp_data[-1] - master_timestamp_data[0]) / master_max_index
-
-        new_slave_data = FrameworkData(sampling_frequency_hz=master_sampling_frequency,
-                                       channels=slave_main.channels)
-        max_slave_index = len(slave_timestamp_data) - 1
-        last_closest_index = 0
-        last_slave_index = 0
-        for slave_timestamp_index, slave_timestamp_value in enumerate(slave_timestamp_data):
-            # stop processing if slave timestamp is greater than master timestamp, as we can't be sure if there's more master data incoming to sync
-            if slave_timestamp_value > master_timestamp_data[master_max_index]:
-                break
-
-            # stop processing if it's the last slave timestamp, as we can't be sure if there's more slave data incoming to sync
-            if slave_timestamp_index == max_slave_index:
-                break
-
-            closest_point_start = self._get_closest_timestamp_index_in_master(
-                master_timestamp_data,
-                slave_timestamp_value,
-                master_timestamp_avg_increment,
-                master_max_index
-            )
-            if last_closest_index == closest_point_start and last_closest_index == 0:
-                new_slave_data.splice(0, 1)
-
-            value_index = slave_timestamp_index
-            if slave_timestamp_index - 1 >= 0:
-                value_index = slave_timestamp_index - 1
-            new_slave_data.extend(
-                self._fill(
-                    last_closest_index,
-                    closest_point_start,
-                    slave_main,
-                    value_index,
-                    master_sampling_frequency
-                )
-            )
-            self._statistics(abs(master_timestamp_data[closest_point_start] - slave_timestamp_value) * 1000000)
-            last_closest_index = closest_point_start
-            last_slave_index = slave_timestamp_index
-
-        self.print(f'------------------------------------------------------------------------------------------------------------------'+
-                   f'\nOriginal MASTER {master_timestamp_data[0]} - {master_timestamp_data[master_max_index]}'
-                   f'\nRemoving MASTER {master_timestamp_data[0]} - {master_timestamp_data[last_closest_index]}'
-                   f'\nOriginal SLAVE {slave_timestamp_data[0]} - {slave_timestamp_data[max_slave_index]}'
-                   f'\nRemoving SLAVE {slave_timestamp_data[0]} - {slave_timestamp_data[last_slave_index]}'
-                   )
-
-        self._input_buffer[self.INPUT_SLAVE_TIMESTAMP].splice(0, last_slave_index+1)
-        self._input_buffer[self.INPUT_SLAVE_MAIN].splice(0, last_slave_index+1)
-        output_timestamp = self._input_buffer[self.INPUT_MASTER_TIMESTAMP].splice(0, last_closest_index+1)
-        output_master = self._input_buffer[self.INPUT_MASTER_MAIN].splice(0, last_closest_index+1)
+        if slave_timestamp.get_data_count() < 1 or len(master_timestamp_data) < 1:
+            return {
+                self.OUTPUT_SYNCHRONIZED_SLAVE: slave_main,
+                self.OUTPUT_SYNCHRONIZED_MASTER: input_data[self.INPUT_MASTER_MAIN],
+                self.OUTPUT_SYNCHRONIZED_TIMESTAMP: input_data[self.INPUT_MASTER_TIMESTAMP]
+            }
+
+        filled_slave_data = self._fill(master_timestamp_data, slave_timestamp_data, slave_main,
+                                       input_data[self.INPUT_MASTER_TIMESTAMP].sampling_frequency)
         return {
-            self.OUTPUT_SYNCHRONIZED_SLAVE: new_slave_data,
-            self.OUTPUT_SYNCHRONIZED_MASTER: output_master,
-            self.OUTPUT_SYNCHRONIZED_TIMESTAMP: output_timestamp
+            self.OUTPUT_SYNCHRONIZED_SLAVE: filled_slave_data,
+            self.OUTPUT_SYNCHRONIZED_MASTER: input_data[self.INPUT_MASTER_MAIN],
+            self.OUTPUT_SYNCHRONIZED_TIMESTAMP: input_data[self.INPUT_MASTER_TIMESTAMP]
        }
 
     def _statistics(self, sync_error_microseconds: float):
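
Aside on the hunk above: `_check_for_timestamp_intersection` is the usual interval-overlap test, in which two sorted ranges intersect iff each one starts before the other ends. A plain-list restatement with its edge cases; this is a standalone sketch, not framework code, and like the patch it uses strict comparisons, so ranges that merely touch do not count as overlapping:

```python
def intervals_overlap(a_start: float, a_end: float, b_start: float, b_end: float) -> bool:
    # Strict comparisons, as in the patch: touching endpoints do not overlap.
    return a_start < b_end and a_end > b_start

# Toy timestamp streams (seconds): slave leads master slightly.
slave = [0.0, 0.5, 1.0, 1.5]
master = [1.2, 1.3, 1.4, 1.6]
assert intervals_overlap(slave[0], slave[-1], master[0], master[-1])  # 1.2..1.5 is shared
assert not intervals_overlap(0.0, 1.0, 1.0, 2.0)                      # touching endpoints only
```
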
@@ -148,8 +162,8 @@ def _get_outputs(self) -> List[str]:
             self.OUTPUT_SYNCHRONIZED_TIMESTAMP
         ]
 
-    @staticmethod
     def _get_closest_timestamp_index_in_master(
+            self,
             master_timestamp: List[float],
             slave_timestamp: float,
             master_timestamp_avg_increment: float,
@@ -159,11 +173,20 @@ def _get_closest_timestamp_index_in_master(
             return 0
 
         estimated_index = int((slave_timestamp - master_timestamp[0]) / master_timestamp_avg_increment)
-
+        bisect_index = bisect_left(master_timestamp, slave_timestamp)
         if estimated_index < 0:
             return 0
-        while 0 < estimated_index < master_max_index - 1:
+        if estimated_index < 0 and slave_timestamp <= master_timestamp[0]:
+            return 0
+        elif estimated_index > master_max_index and slave_timestamp >= master_timestamp[master_max_index]:
+            return master_max_index
+        elif estimated_index == 0 and slave_timestamp < master_timestamp[1]:
+            return estimated_index
+        elif estimated_index == master_max_index and (slave_timestamp > master_timestamp[master_max_index - 1]):
+            return estimated_index
+
+        while 0 < estimated_index < master_max_index:
             if master_timestamp[estimated_index - 1] <= slave_timestamp <= master_timestamp[estimated_index + 1]:
                 return estimated_index
             elif slave_timestamp < master_timestamp[estimated_index]:
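
The hunk above introduces `bisect_index` alongside the increment-based estimate. Worth noting: `bisect_left` returns an insertion point (the first element greater than or equal to the probe), which is not always the nearest timestamp; the nearest one is the closer of the insertion point and its left neighbor. A standalone helper illustrating the difference; this is a hypothetical sketch, not part of the patch:

```python
from bisect import bisect_left
from typing import List

def closest_index(sorted_ts: List[float], t: float) -> int:
    """Index of the timestamp in sorted_ts nearest to t (ties go left)."""
    i = bisect_left(sorted_ts, t)
    if i == 0:
        return 0
    if i >= len(sorted_ts):
        return len(sorted_ts) - 1
    # bisect_left gives the first element >= t; the previous one may be closer.
    return i if sorted_ts[i] - t < t - sorted_ts[i - 1] else i - 1

master = [0.0, 0.1, 0.2, 0.3]
assert bisect_left(master, 0.24) == 3     # insertion point: first element >= 0.24
assert closest_index(master, 0.24) == 2   # but 0.2 is actually the nearest sample
```
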
@@ -172,37 +195,206 @@ def _get_closest_timestamp_index_in_master(
                 estimated_index += 1
 
         closest_point: int = min(range(len(master_timestamp)),
-                                key=lambda i: abs(master_timestamp[i] - slave_timestamp))
+                                 key=lambda i: abs(master_timestamp[i] - slave_timestamp))
         return closest_point
 
-    def _fill(self,
-              start_index: int,
-              end_index: int,
-              slave_main: FrameworkData,
-              slave_main_value_index: int,
-              master_sampling_frequency: float) -> FrameworkData:
+    def _trim_start(self, input_data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:
+        index = 0
+        slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
+        slave_main = input_data[self.INPUT_SLAVE_MAIN]
+        master_timestamp = input_data[self.INPUT_MASTER_TIMESTAMP]
+        master_main = input_data[self.INPUT_MASTER_MAIN]
+        slave_timestamp_data = slave_timestamp.get_data_single_channel()
+        master_timestamp_data = master_timestamp.get_data_single_channel()
+
+        slave_avg_increment = (slave_timestamp_data[-1] - slave_timestamp_data[0]) / len(slave_timestamp_data)
+        master_avg_increment = (master_timestamp_data[-1] - master_timestamp_data[0]) / len(master_timestamp_data)
+
+        should_trim_slave = slave_timestamp_data[index] < master_timestamp_data[index]
+        should_trim_master = master_timestamp_data[index] < slave_timestamp_data[index]
+
+        if should_trim_slave:
+            slave_index = self._get_closest_timestamp_index_in_master(
+                slave_timestamp_data,
+                master_timestamp_data[index],
+                slave_avg_increment,
+                len(slave_timestamp_data) - 1
+            )
+            start_index = 0
+            remove_count = slave_index
+            slave_timestamp.splice(start_index, remove_count)
+            slave_main.splice(start_index, remove_count)
+            self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].splice(0, slave_index)
+            self._sync_buffer[self.INPUT_SLAVE_MAIN].splice(0, slave_index)
+
+        elif should_trim_master:
+            master_index = self._get_closest_timestamp_index_in_master(
+                master_timestamp_data,
+                slave_timestamp_data[index],
+                master_avg_increment,
+                len(master_timestamp_data) - 1
+            )
+            start_index = 0
+            remove_count = master_index
+            master_timestamp.splice(start_index, remove_count)
+            master_main.splice(start_index, remove_count)
+            self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].splice(0, master_index)
+            self._sync_buffer[self.INPUT_MASTER_MAIN].splice(0, master_index)
+
+        return {
+            self.INPUT_MASTER_MAIN: master_main,
+            self.INPUT_MASTER_TIMESTAMP: master_timestamp,
+            self.INPUT_SLAVE_MAIN: slave_main,
+            self.INPUT_SLAVE_TIMESTAMP: slave_timestamp
+        }
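
`_trim_start` reuses `_get_closest_timestamp_index_in_master`, which predicts a start index from the average timestamp increment and then walks toward the probe until its neighbors bracket it. A simplified standalone sketch of that estimate-then-refine strategy, assuming a sorted and roughly uniformly spaced list; unlike the patched method it returns a bracketing index directly, without the linear `min` fallback:

```python
from typing import List

def estimate_then_refine(master_ts: List[float], t: float) -> int:
    """Predict an index from the average spacing, then walk into the neighborhood of t.
    Assumes master_ts is sorted and roughly uniformly spaced (illustrative sketch)."""
    n = len(master_ts)
    if n == 1 or t <= master_ts[0]:
        return 0
    if t >= master_ts[-1]:
        return n - 1
    avg = (master_ts[-1] - master_ts[0]) / (n - 1)
    i = min(max(int((t - master_ts[0]) / avg), 1), n - 2)  # initial guess, clamped
    while 0 < i < n - 1:
        if master_ts[i - 1] <= t <= master_ts[i + 1]:
            return i  # neighbors bracket t: close enough
        i += -1 if t < master_ts[i] else 1  # walk toward t
    return i

assert estimate_then_refine([0.0, 1.0, 2.0, 3.0], 2.5) == 2
```

On uniform data the guess is already correct and the walk is O(1); the walk only pays when the spacing drifts from the average.
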
 
+    def _trim_end(self, input_data: Dict[str, FrameworkData]) -> Dict[str, FrameworkData]:
+        index = -1
+        slave_timestamp = input_data[self.INPUT_SLAVE_TIMESTAMP]
+        slave_main = input_data[self.INPUT_SLAVE_MAIN]
+        master_timestamp = input_data[self.INPUT_MASTER_TIMESTAMP]
+        master_main = input_data[self.INPUT_MASTER_MAIN]
+        slave_timestamp_data = slave_timestamp.get_data_single_channel()
+        master_timestamp_data = master_timestamp.get_data_single_channel()
+        slave_avg_increment = (slave_timestamp_data[-1] - slave_timestamp_data[0]) / len(slave_timestamp_data)
+        master_avg_increment = (master_timestamp_data[-1] - master_timestamp_data[0]) / len(master_timestamp_data)
+
+        should_trim_slave = slave_timestamp_data[index] > master_timestamp_data[index]
+        should_trim_master = master_timestamp_data[index] > slave_timestamp_data[index]
+
+        if should_trim_slave:
+            while int((slave_timestamp_data[index] - master_timestamp_data[0]) / master_avg_increment) > len(master_timestamp_data) - 1:
+                index -= 1
+                if index < -len(slave_timestamp_data):
+                    # return empty data
+                    return {
+                        self.INPUT_MASTER_MAIN: FrameworkData(sampling_frequency_hz=master_main.sampling_frequency,
+                                                              channels=master_main.channels),
+                        self.INPUT_MASTER_TIMESTAMP: FrameworkData(
+                            sampling_frequency_hz=master_timestamp.sampling_frequency,
+                            channels=master_timestamp.channels),
+                        self.INPUT_SLAVE_MAIN: FrameworkData(sampling_frequency_hz=slave_main.sampling_frequency,
+                                                             channels=slave_main.channels),
+                        self.INPUT_SLAVE_TIMESTAMP: FrameworkData(
+                            sampling_frequency_hz=slave_timestamp.sampling_frequency, channels=slave_timestamp.channels)
+                    }
+            master_index = self._get_closest_timestamp_index_in_master(
+                master_timestamp_data,
+                slave_timestamp_data[index],
+                master_avg_increment,
+                len(master_timestamp_data) - 1
+            )
+            # keep slave data from index to end in sync buffer and remove the rest
+            self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].splice(0, len(slave_timestamp_data) + index + 1)
+            self._sync_buffer[self.INPUT_SLAVE_MAIN].splice(0, len(slave_timestamp_data) + index + 1)
+            # process slave data from 0 to index
+            slave_main.splice(len(slave_timestamp_data) + index + 1, -index)
+            slave_timestamp.splice(len(slave_timestamp_data) + index + 1, -index)
+            # keep master data from master_index to end in sync buffer and remove the rest
+            self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].splice(0, master_index + 1)
+            self._sync_buffer[self.INPUT_MASTER_MAIN].splice(0, master_index + 1)
+            # process master data from 0 to master_index
+            master_main.splice(master_index + 1, len(master_timestamp_data) - master_index)
+            master_timestamp.splice(master_index + 1, len(master_timestamp_data) - master_index)
+
+        elif should_trim_master:
+            while int((master_timestamp_data[index] - slave_timestamp_data[0]) / slave_avg_increment) > len(slave_timestamp_data) - 1:
+                index -= 1
+                if index < -len(master_timestamp_data):
+                    # return empty data
+                    return {
+                        self.INPUT_MASTER_MAIN: FrameworkData(sampling_frequency_hz=master_main.sampling_frequency,
+                                                              channels=master_main.channels),
+                        self.INPUT_MASTER_TIMESTAMP: FrameworkData(
+                            sampling_frequency_hz=master_timestamp.sampling_frequency,
+                            channels=master_timestamp.channels),
+                        self.INPUT_SLAVE_MAIN: FrameworkData(sampling_frequency_hz=slave_main.sampling_frequency,
+                                                             channels=slave_main.channels),
+                        self.INPUT_SLAVE_TIMESTAMP: FrameworkData(
+                            sampling_frequency_hz=slave_timestamp.sampling_frequency, channels=slave_timestamp.channels)
+                    }
+
+            slave_index = self._get_closest_timestamp_index_in_master(
+                slave_timestamp_data,
+                master_timestamp_data[index],
+                slave_avg_increment,
+                len(slave_timestamp_data) - 1
+            )
+            # keep master data from index to end in sync buffer and remove the rest
+            self._sync_buffer[self.INPUT_MASTER_TIMESTAMP].splice(0, len(master_timestamp_data) + index + 1)
+            self._sync_buffer[self.INPUT_MASTER_MAIN].splice(0, len(master_timestamp_data) + index + 1)
+            # process master data from 0 to index
+            master_main.splice(len(master_timestamp_data) + index + 1, -index)
+            master_timestamp.splice(len(master_timestamp_data) + index + 1, -index)
+            # keep slave data from slave_index to end in sync buffer and remove the rest
+            self._sync_buffer[self.INPUT_SLAVE_TIMESTAMP].splice(0, slave_index + 1)
+            self._sync_buffer[self.INPUT_SLAVE_MAIN].splice(0, slave_index + 1)
+            # process slave data from 0 to slave_index
+            slave_main.splice(slave_index + 1, len(master_timestamp_data) - slave_index)
+            slave_timestamp.splice(slave_index + 1, len(master_timestamp_data) - slave_index)
+
+        return {
+            self.INPUT_MASTER_MAIN: master_main,
+            self.INPUT_MASTER_TIMESTAMP: master_timestamp,
+            self.INPUT_SLAVE_MAIN: slave_main,
+            self.INPUT_SLAVE_TIMESTAMP: slave_timestamp
+        }
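
Taken together, `_trim_start` and `_trim_end` reduce both streams to the window where their timestamps overlap. On plain sorted lists, the same cut can be expressed with two binary searches per stream; a standalone sketch under that assumption (the patched code must additionally keep its `_sync_buffer` copies consistent, which this omits):

```python
from bisect import bisect_left, bisect_right
from typing import List, Tuple

def trim_to_overlap(ts_a: List[float], ts_b: List[float]) -> Tuple[slice, slice]:
    """Return (slice_a, slice_b) covering only the overlapping time window.
    Assumes both lists are sorted; returns empty slices if they do not intersect."""
    start = max(ts_a[0], ts_b[0])
    end = min(ts_a[-1], ts_b[-1])
    if start > end:
        return slice(0, 0), slice(0, 0)
    a = slice(bisect_left(ts_a, start), bisect_right(ts_a, end))
    b = slice(bisect_left(ts_b, start), bisect_right(ts_b, end))
    return a, b

master = [1.0, 2.0, 3.0, 4.0, 5.0]
slave = [2.5, 3.0, 3.5, 4.5, 6.0]
sa, sb = trim_to_overlap(master, slave)
assert master[sa] == [3.0, 4.0, 5.0]       # master samples inside 2.5..5.0
assert slave[sb] == [2.5, 3.0, 3.5, 4.5]   # slave samples inside the same window
```
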
 
+    def _fill(self, master_timestamp: List[float], slave_timestamp: List[float],
+              slave_main: FrameworkData, master_sampling_frequency: float) -> FrameworkData:
+        """Fills slave data to align with master timestamps, using sample-and-hold or zero-fill."""
+
+        # Create FrameworkData to store filled data
         fill_data = FrameworkData(master_sampling_frequency, slave_main.channels)
-        fill_size = (end_index - start_index)
-        if fill_size > 0:
-            if self._zero_fill:
-                channel_data = [0] * fill_size
-                input_data = slave_main.get_data_at_index(slave_main_value_index)
-                for channel in input_data:
-                    channel_data = [0] * fill_size
-                    channel_data[0] = input_data[channel]
-                    fill_data.input_data_on_channel(channel_data, channel)
-                input_data = [channel_data] * len(slave_main.channels)
-                fill_data.input_2d_data(input_data)
-            elif self._sample_and_hold:
-                input_data = slave_main.get_data_at_index(slave_main_value_index)
-                for channel in input_data:
-                    channel_data = [input_data[channel]] * fill_size
-                    fill_data.input_data_on_channel(channel_data, channel)
-            else:
-                raise InvalidParameterValue(module=self._MODULE_NAME, name=self.name,
-                                            parameter='slave_filling',
-                                            cause='not_set')
-        if fill_data.get_data_count() == 0 and start_index == end_index:
-            for channel in fill_data.channels:
-                fill_data.input_data_on_channel([slave_main.get_data_on_channel(channel)[0]], channel)
+
+        # Track positions on the master and slave timelines
+        max_slave_index = len(slave_timestamp) - 1
+        max_master_index = len(master_timestamp) - 1
+        previous_master_index = -1  # Start before the first index
+        # Iterate over slave timestamps
+        for current_slave_index, slave_time in enumerate(slave_timestamp):
+
+            # Ignore repeated slave timestamps
+            if current_slave_index > 0 and slave_time == slave_timestamp[current_slave_index - 1]:
+                continue
+
+            # Use binary search to find the closest master timestamp index
+            master_index = bisect_left(master_timestamp, slave_time)
+
+            # Ensure master_index does not exceed the valid range
+            if master_index > max_master_index:
+                master_index = max_master_index
+
+            # Calculate how many master timestamps need to be filled
+            fill_size = master_index - previous_master_index - 1
+
+            # If there's a gap, fill it using the last valid data or 0
+            if fill_size > 0 and self._last_valid_data is not None:
+                for channel in slave_main.channels:
+                    fill_content = [self._last_valid_data[channel]] * fill_size if self._sample_and_hold else [0] * fill_size
+                    fill_data.input_data_on_channel(fill_content, channel)
+
+            # If aligning slave data, or it's the first valid data point
+            if (self._last_valid_data is None or master_index >= previous_master_index) and current_slave_index <= max_slave_index:
+                # Update last valid data with current slave data
+                self._last_valid_data = slave_main.get_data_at_index(current_slave_index)
+
+                # Store slave data for all channels at the aligned master timestamp
+                for channel in slave_main.channels:
+                    fill_data.input_data_on_channel([self._last_valid_data[channel]], channel)
+
+            # Update the previous master index for the next iteration
+            previous_master_index = master_index
+
+        # Fill any remaining master timestamps with sample-and-hold or 0
+        remaining_fill_size = max_master_index - previous_master_index
+        if remaining_fill_size > 0 and self._last_valid_data is not None:
+            for channel in slave_main.channels:
+                fill_content = [self._last_valid_data[channel]] * remaining_fill_size if self._sample_and_hold else [0] * remaining_fill_size
+                fill_data.input_data_on_channel(fill_content, channel)
+
         return fill_data
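
Taken as a whole, the rewritten `_fill` is a sample-and-hold (or zero-fill) resampler onto the master grid: each slave sample lands at the master slot found by `bisect_left`, and the gaps between landings are padded with the last held value or zeros. A single-channel standalone sketch of the same idea; it does not handle slave streams denser than the master, and all names here are illustrative:

```python
from bisect import bisect_left
from typing import List

def fill_to_master_grid(master_ts: List[float], slave_ts: List[float],
                        slave_values: List[float], sample_and_hold: bool = True) -> List[float]:
    """Re-grid one slave channel onto master_ts (sorted). Gaps before the first
    slave sample are skipped; later gaps are held (or zero-filled)."""
    out: List[float] = []
    last = None
    prev_idx = -1
    for t, v in zip(slave_ts, slave_values):
        idx = min(bisect_left(master_ts, t), len(master_ts) - 1)  # aligned master slot
        gap = idx - prev_idx - 1
        if gap > 0 and last is not None:
            out.extend([last if sample_and_hold else 0] * gap)    # pad the gap
        last = v
        out.append(v)                                             # place the sample
        prev_idx = idx
    tail = (len(master_ts) - 1) - prev_idx
    if tail > 0 and last is not None:
        out.extend([last if sample_and_hold else 0] * tail)       # pad to the grid end
    return out

master = [0.0, 0.1, 0.2, 0.3, 0.4]   # fast stream, e.g. 10 Hz
slave_t = [0.0, 0.2]                 # slow stream, e.g. 5 Hz
aligned = fill_to_master_grid(master, slave_t, [10, 20])
assert aligned == [10, 10, 20, 20, 20]   # one output sample per master timestamp
```

The hold state (`last` here, `_last_valid_data` in the patch) persisting across calls is what lets consecutive buffers join without a seam.
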