Skip to content

Commit

Permalink
completed version 1
Browse files Browse the repository at this point in the history
  • Loading branch information
Liam Childs committed Sep 14, 2016
1 parent 543e381 commit b3c13de
Show file tree
Hide file tree
Showing 22 changed files with 577 additions and 186 deletions.
50 changes: 50 additions & 0 deletions examples/join.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from mimo import Workflow, Stream


def main():
workflow = Workflow(10)
step1 = workflow.add_stream(Stream(outs=['a'], fn=stream1))
step2 = workflow.add_stream(Stream(outs=['b'], fn=stream2))
step3 = workflow.add_stream(Stream(['c', 'd'], fn=stream3))

step1.pipe(step3, input='c')
step2.pipe(step3, input='d')

print(str(workflow))
workflow.run()


def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True


def stream2(ins, outs, state):
"""
Generates integers from 99 to 0.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(99, -1, -1))
iterator = state['iterator']
for item in iterator:
if not outs.b.push(item):
return True


def stream3(ins, outs, state):
"""
Divide incoming entities by 10 and print to stdout
"""
while len(ins.c) > 0 and len(ins.d) > 0:
sys.stdout.write('{}\n'.format(ins.c.pop() + ins.d.pop()))

if __name__ == '__main__':
import sys
sys.exit(main())
48 changes: 48 additions & 0 deletions examples/linear.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from mimo import Workflow, Stream


def main():
workflow = Workflow(10)
step1 = workflow.add_stream(Stream(outs=['a'], fn=stream1))
step2 = workflow.add_stream(Stream(['b'], ['c'], fn=stream2))
step3 = workflow.add_stream(Stream(['d'], fn=stream3))

step1.pipe(step2).pipe(step3)

print(str(workflow))
workflow.run()


def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True


def stream2(ins, outs, state):
"""
Multiplies the integers by 2.
"""
while len(ins.b) > 0:
item = ins.b.pop()
if not outs.c.push(item * 2):
break
return len(ins.b) > 0


def stream3(ins, outs, state):
"""
Print incoming entities to stdout
"""
while len(ins.d) > 0:
print(ins.d.pop())

if __name__ == '__main__':
import sys
sys.exit(main())
46 changes: 46 additions & 0 deletions examples/multi_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from mimo import Workflow, Stream


def main():
workflow = Workflow(10)
step1 = workflow.add_stream(Stream(outs=['a'], fn=stream1))
step2 = workflow.add_stream(Stream(['b'], fn=stream2))
step3 = workflow.add_stream(Stream(['c'], fn=stream3))

step1.pipe(step2)
step1.pipe(step3)

print(str(workflow))
workflow.run()


def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True


def stream2(ins, outs, state):
"""
Multiply incoming entities by 2 and print to stdout
"""
while len(ins.b) > 0:
sys.stdout.write('{}\n'.format(2 * ins.b.pop()))


def stream3(ins, outs, state):
"""
Divide incoming entities by 10 and print to stdout
"""
while len(ins.c) > 0:
sys.stdout.write('{}\n'.format(ins.c.pop() / 10))

if __name__ == '__main__':
import sys
sys.exit(main())
46 changes: 46 additions & 0 deletions examples/split_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from mimo import Workflow, Stream


def main():
workflow = Workflow(10)
step1 = workflow.add_stream(Stream(outs=['a'], fn=stream1))
step2 = workflow.add_stream(Stream(['b'], fn=stream2))
step3 = workflow.add_stream(Stream(['c'], fn=stream3))

step1.pipe(step2)
step1.pipe(step3)

print(str(workflow))
workflow.run()


def stream1(ins, outs, state):
"""
Generates integers from 0 to 99.
"""
if 'iterator' not in state:
state['iterator'] = iter(range(100))
iterator = state['iterator']
for item in iterator:
if not outs.a.push(item):
return True


def stream2(ins, outs, state):
"""
Multiply incoming entities by 2 and print to stdout
"""
while len(ins.b) > 0:
sys.stdout.write('{}\n'.format(2 * ins.b.pop()))


def stream3(ins, outs, state):
"""
Divide incoming entities by 10 and print to stdout
"""
while len(ins.c) > 0:
sys.stdout.write('{}\n'.format(ins.c.pop() / 10))

if __name__ == '__main__':
import sys
sys.exit(main())
1 change: 1 addition & 0 deletions mimo/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .stream import Stream
from .workflow import Workflow
2 changes: 0 additions & 2 deletions mimo/connection/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
from .connection import Connection
from .connection_set import ConnectionSet
33 changes: 2 additions & 31 deletions mimo/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,12 @@
class Connection:
def __init__(self, name, threshold=10):
self.entities = deque()
self.connections = set()

self.name = name
self.threshold = threshold

def peek(self):
return self.entities[0]

def pop(self):
return self.entities.popleft()

def push(self, entity):
"""
Add an entity to the end of the connection. Return if connection can still be pushed to.
:param entity:
:return:
"""
self.entities.append(entity)
return len(self.entities) < self.threshold

def extend(self, entities):
self.entities.extend(entities)
return len(self.entities) < self.threshold
def __len__(self):
return len(self.entities)

def is_full(self):
return len(self.entities) >= self.threshold

def join(self, connection):
self.connections.add(connection)

def drain(self):
entities = self.entities
if len(entities) == 0 or any(connection.is_full() for connection in self.connections):
return False
for connection in self.connections:
connection.extend(entities)
entities.clear()
return True
11 changes: 0 additions & 11 deletions mimo/connection/connection_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,3 @@ def __getattr__(self, key):

def __getitem__(self, key):
return self.connections[key]

def drain(self):
"""
Drain all connections and return the streams that received updates.
:return: Set of updated streams
"""
drained = set()
for connection in self.connections.values():
if connection.drain():
drained.add(connection.name)
return drained
13 changes: 13 additions & 0 deletions mimo/connection/input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .connection import Connection


class Input(Connection):
def peek(self):
return self.entities[0]

def pop(self):
return self.entities.popleft()

def extend(self, entities):
self.entities.extend(entities)
return len(self.entities) < self.threshold
12 changes: 12 additions & 0 deletions mimo/connection/output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from .connection import Connection


class Output(Connection):
def push(self, entity):
"""
Add an entity to the end of the connection. Return if connection can still be pushed to.
:param entity:
:return:
"""
self.entities.append(entity)
return len(self.entities) < self.threshold
96 changes: 29 additions & 67 deletions mimo/stream/stream.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,46 @@
from mimo.connection import Connection, ConnectionSet
from mimo.connection.connection_set import ConnectionSet


class Stream:

__slots__ = ('state', 'ins', 'outs', 'name', 'fn')

IN = []
OUT = []

def __init__(self, ins=None, outs=None, name=None):
self.paused = False
def __init__(self, ins=None, outs=None, fn=None, name=None):
"""
Initialise a stream. Streams can be sub-classed to alter the behaviour or customised directly.
If sub-classing a stream, the class members `IN` and `OUT` define the names of the input and output entities.
Overriding the `run` function will determine what the stream does and the name of the class determines the name
of the stream.
If creating a stream directly, the parameters `ins` and `outs` define the names of the input and output
entities. The `fn` parameter is a function that will determine what the stream does. This function takes a set
of inputs, a set of outputs and the state of the stream as a dictionary. The `name` parameter determines the
name of the stream.
:param ins: names of input entities
:param outs: names of output entities
:param name: name of the stream
:param fn: run function
"""
self.state = {}

ins = self.IN if ins is None else ins
outs = self.OUT if outs is None else outs
self.ins = ConnectionSet(Connection(name) for name in ins)
self.outs = ConnectionSet(Connection(name) for name in outs)
self.children = {out: set() for out in outs}
self.ins = self.IN if ins is None else ins
self.outs = self.OUT if outs is None else outs
self.fn = fn
self.name = type(self).__name__ if name is None else name

def run(self, ins, outs):
"""
The main method to over-ride when implementing custom streams.
The main method to over-ride when implementing custom streams. This can also be over-ridden by providing the
'fn' parameter when creating a new stream.
:param ins: connection set of input connections
:type ins: ConnectionSet
:param outs: connection set of output connections
:type outs: ConnectionSet
:return: True if stream is paused
"""
raise NotImplementedError

def activate(self):
"""
Run a step and propogate the output entities to any connected child streams.
:return:
:return: True if stream is did not finish running (eg. was suspended because output was full)
:rtype: bool
"""
run = self.run
ins = self.ins
outs = self.outs
children = self.children

paused = True
while paused:
paused = run(ins, outs)
updated_streams = set()
for streams in children.values():
updated_streams.update(stream for stream in streams if stream.paused)
for out in outs.drain():
updated_streams.update(children[out])
for updated_stream in updated_streams:
updated_stream.activate()
self.paused = paused

def pipe(self, stream, output=None, input=None):
"""
Pipe the output of one stream to the input of another. If there are more than one outputs or inputs, the
specific output/input must be specified.
:param stream: stream to connect to
:param output: name of the output connection (default: None)
:param input: name of the input connection (default: None)
:return: stream connected to
"""
if len(self.outs) == 0:
raise ValueError('{} has no output to pipe from'.format(self.name))
elif len(stream.ins) == 0:
raise ValueError('{} has no input to pipe to'.format(stream.name))

if output is None:
if len(self.outs) == 1:
from_connection = next(iter(self.outs))
else:
raise ValueError('{} has multiple output and none chosen to pipe from'.format(self.name))
else:
from_connection = self.outs[output]

if input is None:
if len(stream.ins) == 1:
to_connection = next(iter(stream.ins))
else:
raise ValueError('{} has multiple output and none chosen to pipe from'.format(stream.name))
else:
to_connection = stream.ins[input]

from_connection.join(to_connection)
self.children[from_connection.name].add(stream)
return stream
return self.fn(ins, outs, self.state)
Loading

0 comments on commit b3c13de

Please sign in to comment.