Skip to content

Commit

Permalink
added basic untested mimo
Browse files Browse the repository at this point in the history
  • Loading branch information
Liam Childs committed Sep 12, 2016
1 parent d933972 commit 543e381
Show file tree
Hide file tree
Showing 17 changed files with 335 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,5 @@ ENV/

# Rope project settings
.ropeproject

.idea
1 change: 1 addition & 0 deletions mimo/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .stream import Stream
2 changes: 2 additions & 0 deletions mimo/connection/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .connection import Connection
from .connection_set import ConnectionSet
44 changes: 44 additions & 0 deletions mimo/connection/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from collections import deque


class Connection:
def __init__(self, name, threshold=10):
self.entities = deque()
self.connections = set()

self.name = name
self.threshold = threshold

def peek(self):
return self.entities[0]

def pop(self):
return self.entities.popleft()

def push(self, entity):
"""
Add an entity to the end of the connection. Return if connection can still be pushed to.
:param entity:
:return:
"""
self.entities.append(entity)
return len(self.entities) < self.threshold

def extend(self, entities):
self.entities.extend(entities)
return len(self.entities) < self.threshold

def is_full(self):
return len(self.entities) >= self.threshold

def join(self, connection):
self.connections.add(connection)

def drain(self):
entities = self.entities
if len(entities) == 0 or any(connection.is_full() for connection in self.connections):
return False
for connection in self.connections:
connection.extend(entities)
entities.clear()
return True
29 changes: 29 additions & 0 deletions mimo/connection/connection_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
class ConnectionSet:
def __init__(self, connections):
self.connections = {connection.name: connection for connection in connections}
self.streams = {connection.name: set() for connection in connections}

def __iter__(self):
return iter(self.connections.values())

def __len__(self):
return len(self.connections)

def __getattr__(self, key):
if key in self.connections:
return self.connections[key]
return self.__getattribute__(key)

def __getitem__(self, key):
return self.connections[key]

def drain(self):
"""
Drain all connections and return the streams that received updates.
:return: Set of updated streams
"""
drained = set()
for connection in self.connections.values():
if connection.drain():
drained.add(connection.name)
return drained
1 change: 1 addition & 0 deletions mimo/stream/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .stream import Stream
17 changes: 17 additions & 0 deletions mimo/stream/apply_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from mimo.stream import Stream


class ApplyStream(Stream):

IN = ['entity']
OUT = ['entity']

def __init__(self, fn):
super().__init__()
self.fn = fn

def run(self, ins, outs):
fn = self.fn
while len(ins.entity) > 0 and outs.entity.push(fn(ins.entity.pop())):
continue
return len(ins.entity) > 0
9 changes: 9 additions & 0 deletions mimo/stream/converter_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from mimo.stream import Stream


class ConverterStream(Stream):

IN = ['entity']
OUT = ['entity']

def __init__(self, ):
21 changes: 21 additions & 0 deletions mimo/stream/extractor_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from mimo.stream import Stream


class ExtractorStream(Stream):

IN = ['entity']
OUT = ['entity']

def __init__(self, path, ins, outs):
super().__init__(ins, outs)
self.path = path

def run(self, ins, outs):
path = self.path
while len(ins.entity) > 0:
entity = ins.entity.pop()
for step in path:
entity = step(entity)
if not outs.entity.push(entity):
break
return len(ins.entity) > 0
17 changes: 17 additions & 0 deletions mimo/stream/iterator_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from mimo.stream import Stream


class IteratorStream(Stream):

IN = []
OUT = ['line']

def __init__(self, iterator):
super().__init__()
self.iterator = iterator

def run(self, ins, outs):
for line in self.iterator:
if not outs.line.push(line):
return True
return False
84 changes: 84 additions & 0 deletions mimo/stream/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from mimo.connection import Connection, ConnectionSet


class Stream:

IN = []
OUT = []

def __init__(self, ins=None, outs=None, name=None):
self.paused = False

ins = self.IN if ins is None else ins
outs = self.OUT if outs is None else outs
self.ins = ConnectionSet(Connection(name) for name in ins)
self.outs = ConnectionSet(Connection(name) for name in outs)
self.children = {out: set() for out in outs}
self.name = type(self).__name__ if name is None else name

def run(self, ins, outs):
"""
The main method to over-ride when implementing custom streams.
:param ins: connection set of input connections
:type ins: ConnectionSet
:param outs: connection set of output connections
:type outs: ConnectionSet
:return: True if stream is paused
"""
raise NotImplementedError

def activate(self):
"""
Run a step and propogate the output entities to any connected child streams.
:return:
"""
run = self.run
ins = self.ins
outs = self.outs
children = self.children

paused = True
while paused:
paused = run(ins, outs)
updated_streams = set()
for streams in children.values():
updated_streams.update(stream for stream in streams if stream.paused)
for out in outs.drain():
updated_streams.update(children[out])
for updated_stream in updated_streams:
updated_stream.activate()
self.paused = paused

def pipe(self, stream, output=None, input=None):
"""
Pipe the output of one stream to the input of another. If there are more than one outputs or inputs, the
specific output/input must be specified.
:param stream: stream to connect to
:param output: name of the output connection (default: None)
:param input: name of the input connection (default: None)
:return: stream connected to
"""
if len(self.outs) == 0:
raise ValueError('{} has no output to pipe from'.format(self.name))
elif len(stream.ins) == 0:
raise ValueError('{} has no input to pipe to'.format(stream.name))

if output is None:
if len(self.outs) == 1:
from_connection = next(iter(self.outs))
else:
raise ValueError('{} has multiple output and none chosen to pipe from'.format(self.name))
else:
from_connection = self.outs[output]

if input is None:
if len(stream.ins) == 1:
to_connection = next(iter(stream.ins))
else:
raise ValueError('{} has multiple output and none chosen to pipe from'.format(stream.name))
else:
to_connection = stream.ins[input]

from_connection.join(to_connection)
self.children[from_connection.name].add(stream)
return stream
Empty file added tests/__init__.py
Empty file.
Empty file.
60 changes: 60 additions & 0 deletions tests/test_connection/test_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import unittest

from mimo.connection import Connection
from mimo.stream import Stream


class TestConnection(unittest.TestCase):
def test_peek(self):
connection = Connection('a')

self.assertRaises(IndexError, connection.peek)
connection.entities.append(1)
self.assertEqual(1, connection.peek())

def test_pop(self):
connection = Connection('a')

self.assertRaises(IndexError, connection.pop)
connection.entities.extend((1, 2, 3))
self.assertEqual(1, connection.pop())
self.assertEqual([2, 3], list(connection.entities))

def test_push(self):
connection = Connection('a', 3)

self.assertTrue(connection.push(1))
self.assertEqual([1], list(connection.entities))
self.assertTrue(connection.push(2))
self.assertEqual([1, 2], list(connection.entities))
self.assertFalse(connection.push(3))
self.assertEqual([1, 2, 3], list(connection.entities))

def test_extend(self):
connection = Connection('a', 3)

self.assertFalse(connection.extend((1, 2, 3, 4)))
self.assertEqual([1, 2, 3, 4], list(connection.entities))

def test_connect_to_input(self):
connection = Connection('a')
stream = Stream(['input'], ['output'])

connection.join(stream, 'input')

self.assertEqual(stream, next(iter(connection.streams)))
self.assertEqual(stream.ins['input'], next(iter(connection.connections)))

def test_drain(self):
connection = Connection('a')
stream = Stream(['input', 'output'])

connection.join(stream, 'input')
connection.entities.extend([1, 2, 3, 4, 5])
connection.drain()

self.assertEqual([1, 2, 3, 4, 5], list(stream.ins.input.entities))


if __name__ == '__main__':
unittest.main()
30 changes: 30 additions & 0 deletions tests/test_connection/test_connection_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import unittest

from mimo.connection import Connection, ConnectionSet


class TestConnectionSet(unittest.TestCase):
def test_items_and_attributes(self):
a = Connection('a')
b = Connection('b')
connections = ConnectionSet([a, b])

self.assertEqual(a, connections.a)
self.assertEqual(b, connections.b)
self.assertEqual(a, connections['a'])
self.assertEqual(b, connections['b'])

def test_drain(self):
connection = Connection('a')
connection.entities.extend((1, 2, 3, 4))

self.assertTrue(connection.drain())

def test_drain_empty(self):
connection = Connection('a')

self.assertFalse(connection.drain())


if __name__ == '__main__':
unittest.main()
Empty file added tests/test_stream/__init__.py
Empty file.
18 changes: 18 additions & 0 deletions tests/test_stream/test_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import unittest

from mimo import Stream


class TestStream(unittest.TestCase):
def test_pipe(self):
stream1 = Stream(['a'], ['b'])
stream2 = Stream(['c'], ['d'])

stream1.pipe(stream2)

self.assertEqual({stream2}, stream1.children['b'])
self.assertEqual({stream2.ins['c']}, stream1.outs['b'].connections)


if __name__ == '__main__':
unittest.main()

0 comments on commit 543e381

Please sign in to comment.