Skip to content

Commit

Permalink
fix on synchronize: handle async buffer inputs, keep trimmed data for…
Browse files Browse the repository at this point in the history
… next execution
  • Loading branch information
arthurhauer committed Oct 14, 2024
1 parent 036b6c3 commit 75a9ac1
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 106 deletions.
10 changes: 8 additions & 2 deletions models/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from models.framework_data import FrameworkData


# TODO Isolamento de nodes em threads separadas. Cada nó deve ser executado em uma thread
class Node:
_MODULE_NAME: Final[str] = 'models.node'
"""Abstract base class for processing pipeline execution on this framework.
Expand Down Expand Up @@ -46,6 +45,7 @@ def __init__(self, parameters=None) -> None:
self.thread = None
self.new_data_available = False
self.condition = threading.Condition()
self.is_running_main_process = False

def _build_graph_inputs(self):
return f"""
Expand Down Expand Up @@ -255,19 +255,25 @@ def _call_children(self):
def _thread_runner(self):
while self.running:
with self.condition:
while not self.new_data_available and self.running:
while (not self.new_data_available) and self.running:
self.condition.wait()

if not self.running:
break

while not self.local_storage.empty():
input_name, data = self.local_storage.get()
try:
self.is_running_main_process = True
self._run(data, input_name)
except Exception as e:
self.print(f'Error: {e}', exception=e)
raise e
finally:
self.is_running_main_process = False
if self._is_next_node_call_enabled():
self._call_children()

self.new_data_available = False

def run(self, data: FrameworkData = None, input_name: str = None) -> None:
Expand Down
3 changes: 2 additions & 1 deletion models/node/processing/processing_node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
import copy
from typing import List, Dict, Final

from models.exception.missing_parameter import MissingParameterError
Expand Down Expand Up @@ -102,7 +103,7 @@ def _process_input_buffer(self):
if not self._is_processing_condition_satisfied():
return
self.print('Starting processing of input buffer')
processed_data = self._process(self._input_buffer)
processed_data = self._process(copy.deepcopy(self._input_buffer))
if self._clear_input_buffer_after_process:
self._clear_input_buffer()
if self._clear_output_buffer_after_process:
Expand Down
Loading

0 comments on commit 75a9ac1

Please sign in to comment.