Skip to content

Commit

Permalink
Merge pull request #5 from childsish/node-push
Browse files Browse the repository at this point in the history
Node push
  • Loading branch information
childsish authored Sep 26, 2016
2 parents 39fd6b6 + 5057605 commit 17f3514
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
12 changes: 12 additions & 0 deletions mimo/workflow/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,15 @@ def pipe(self, step, output=None, input=None):
self.workflow.graph.add_edge(output_id, input_id)
self.workflow.outputs[output_id].pipe(self.workflow.inputs[input_id])
return step

def push(self, item, input=None):
if input is None:
if len(self.input_ids) == 1:
input_id = next(iter(self.input_ids.values()))
else:
msg = '{} has multiple input and none chosen to push to'
raise ValueError(msg.format(self.workflow.streams[self.stream_id]))
else:
input_id = self.input_ids[input]

self.workflow.inputs[input_id].put_nowait(item)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setup(
name='mimo',
version='1.0.9',
version='1.0.10',
author='Liam H. Childs',
author_email='[email protected]',
packages=find_packages(exclude=['tests']),
Expand Down
8 changes: 8 additions & 0 deletions tests/test_workflow/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ def test_pipe(self):
self.assertEqual(2, len(workflow.streams))
self.assertIn(step2.input_ids['c'], workflow.graph.graph.adjacency[step1.output_ids['b']].children)

def test_push(self):
workflow = Workflow()
step = workflow.add_stream(Stream(['a'], ['b']))

step.push(0)

self.assertEqual(0, workflow.inputs[step.input_ids['a']].get_nowait())


if __name__ == '__main__':
unittest.main()

0 comments on commit 17f3514

Please sign in to comment.