diff --git a/mimo/workflow/node.py b/mimo/workflow/node.py index 5d0baca..ae65797 100644 --- a/mimo/workflow/node.py +++ b/mimo/workflow/node.py @@ -42,3 +42,15 @@ def pipe(self, step, output=None, input=None): self.workflow.graph.add_edge(output_id, input_id) self.workflow.outputs[output_id].pipe(self.workflow.inputs[input_id]) return step + + def push(self, item, input=None): + if input is None: + if len(self.input_ids) == 1: + input_id = next(iter(self.input_ids.values())) + else: + msg = '{} has multiple input and none chosen to push to' + raise ValueError(msg.format(self.workflow.streams[self.stream_id])) + else: + input_id = self.input_ids[input] + + self.workflow.inputs[input_id].put_nowait(item) diff --git a/setup.py b/setup.py index 7518732..b30355e 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name='mimo', - version='1.0.9', + version='1.0.10', author='Liam H. Childs', author_email='liam.h.childs@gmail.com', packages=find_packages(exclude=['tests']), diff --git a/tests/test_workflow/test_node.py b/tests/test_workflow/test_node.py index 1d9ed38..83a3140 100644 --- a/tests/test_workflow/test_node.py +++ b/tests/test_workflow/test_node.py @@ -14,6 +14,14 @@ def test_pipe(self): self.assertEqual(2, len(workflow.streams)) self.assertIn(step2.input_ids['c'], workflow.graph.graph.adjacency[step1.output_ids['b']].children) + def test_push(self): + workflow = Workflow() + step = workflow.add_stream(Stream(['a'], ['b'])) + + step.push(0) + + self.assertEqual(0, workflow.inputs[step.input_ids['a']].get_nowait()) + if __name__ == '__main__': unittest.main()