Skip to content

Commit

Permalink
Add services and intro notebooks
Browse files Browse the repository at this point in the history
  • Loading branch information
grimadas committed Mar 18, 2020
1 parent a8dd838 commit f9a0251
Show file tree
Hide file tree
Showing 43 changed files with 2,297 additions and 5,299 deletions.
1,359 changes: 1,359 additions & 0 deletions 02_Services.ipynb

Large diffs are not rendered by default.

5,606 changes: 626 additions & 4,980 deletions Intro.ipynb

Large diffs are not rendered by default.

File renamed without changes.
File renamed without changes.
File renamed without changes.
Binary file added p2psimpy/__pycache__/config.cpython-37.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/config.pypy36.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/consts.cpython-37.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/defaults.cpython-37.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/logger.cpython-37.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/logger.pypy36.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/messages.cpython-37.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/messages.pypy36.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/network.cpython-37.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/network.pypy36.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/peer.cpython-37.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/peer.pypy36.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/peer_factory.cpython-37.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/peer_factory.pypy36.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/simulation.cpython-37.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/simulation.pypy36.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/storage.cpython-37.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/storage.pypy36.pyc
Binary file not shown.
Binary file added p2psimpy/__pycache__/utils.cpython-37.pyc
Binary file not shown.
Binary file not shown.
69 changes: 24 additions & 45 deletions p2psimpy/config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import inspect
import yaml

import scipy.stats
from ast import literal_eval as make_tuple
from collections import namedtuple
from random import choices

import scipy.stats
import yaml

PeerType = namedtuple('PeerType', ('config', 'service_map'), defaults=(None, {}))


class Dist:

Expand All @@ -13,15 +16,12 @@ def __init__(self, name: str, params):
self.params = params

def to_repr(self):
return {'name': self.name, 'params': str(self.params)}
return {self.__class__.__name__: {'name': self.name, 'params': str(self.params)}}

@classmethod
def from_repr(cls, yaml_dict):
return cls(**yaml_dict)

# def __str__(self):
# return str({'Dist': {'name': self.name, 'params': str(self.params)}})

def generate(self, n=1):
"""
Generate 'n' random values with given distribution
Expand All @@ -40,6 +40,12 @@ def generate(self, n=1):
res = dist.rvs(*param[:-2], loc=param[-2], scale=param[-1], size=n)
return res if n != 1 else res[0]

def get(self):
return self.generate(1)


class DistAttr(Dist):

def __get__(self, inst, obj):
return self.generate(1)

Expand All @@ -65,7 +71,7 @@ def generate(self, n=1):
return self._wrap.generate(n)


class ConfigWrap:
class ConfigAttr:
def __init__(self, cls):
self.cls = cls

Expand All @@ -80,6 +86,11 @@ def get(self):


class Config:
@classmethod
def save_to_yaml(cls, yaml_file):
with open(yaml_file, 'w') as s:
yaml.dump(cls.repr(), s)

@classmethod
def _serialize(cls, val):
if isinstance(val, (tuple, list, dict)):
Expand All @@ -88,7 +99,7 @@ def _serialize(cls, val):
else:
return list(cls._serialize(k) for k in val)
else:
if type(val) == Wrap or type(val) == ConfigWrap:
if type(val) == Dist or type(val) == ConfigAttr:
return val.to_repr()
else:
return val
Expand All @@ -97,7 +108,7 @@ def _serialize(cls, val):
def _deserialize(cls, val):
if type(val) == dict:
if 'Dist' in val:
return Wrap(Dist(**val['Dist']))
return Dist(**val['Dist'])
else:
return {k: cls._deserialize(v) for k, v in val.items()}
else:
Expand All @@ -124,13 +135,13 @@ def _get(cls, val):
return {k: cls._get(v) for k, v in val.items()}
elif isinstance(val, list):
return [cls._get(v) for v in val]
if type(val) == Wrap or type(val) == ConfigWrap:
if type(val) == Dist or type(val) == ConfigAttr:
return val.get()
else:
return val

@classmethod
def get(cls, n=1):
def get(cls):
full_dict = dict()
for i in inspect.getmembers(cls):
if not i[0].startswith('_') and not callable(i[1]):
Expand All @@ -145,43 +156,11 @@ def from_repr(cls, cls_name, yaml_dict):
setattr(cls, k, v)


class PeerNameGenerator:

def __init__(self):
self.peer_indexes = dict() # type -> number map

def generate_name(self, peer_type: str):
if peer_type not in self.peer_indexes:
# New peer type => Init
self.peer_indexes[peer_type] = 0
self.peer_indexes[peer_type] += 1
return peer_type + "_" + str(self.peer_indexes[peer_type])


class ConfigLoader:

@staticmethod
def load_services():
with open('p2psimpy/input/config.yml') as s:
return yaml.safe_load(s)

@staticmethod
def load_latencies():
with open('p2psimpy/input/locations.yml') as s:
return list(yaml.safe_load_all(s))[1]


def load_config_from_yaml(yaml_file):
class NewConfig(Config): pass

with open(yaml_file) as s:
raw = yaml.load(s)
raw = yaml.safe_load(s)
cls_name = list(raw.keys())[0]
NewConfig.from_repr(cls_name, raw[cls_name])
return NewConfig


def from_config(cls, name_gen: PeerNameGenerator, peer_type: str, config: dict):
params = [name_gen.generate_name(peer_type)]
params.extend([load_config_parameter(config[field]) for field in ('location', 'bandwidth_ul', 'bandwidth_dl')])
return cls(*params)
19 changes: 19 additions & 0 deletions p2psimpy/defaults.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from p2psimpy.config import *
from p2psimpy.consts import MBit

from p2psimpy.services.connection_manager import P2PConnectionManager, BaseConnectionManager


class BootstrapPeerConfig(Config):
bandwidth_ul = 100 * MBit
bandwidth_dl = 100 * MBit


class ConnectionConfig(Config):
max_peers = 100000


def get_default_bootstrap_type(locations, active_p2p=False):
BootstrapPeerConfig.location = Dist('sample', locations)
services = {P2PConnectionManager: ConnectionConfig} if active_p2p else (BaseConnectionManager,)
return PeerType(BootstrapPeerConfig, services)
4 changes: 2 additions & 2 deletions p2psimpy/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
formatter = logging.Formatter('%(name)s %(levelname)s %(message)s')


def setup_logger(name, log_file, level=logging.INFO):
def setup_logger(name, log_file, level=logging.INFO, mode='w'):
"""To setup as many loggers as you want"""

handler = logging.FileHandler(log_file)
handler = logging.FileHandler(log_file, mode=mode)
handler.setFormatter(formatter)

logger = logging.getLogger(name)
Expand Down
8 changes: 7 additions & 1 deletion p2psimpy/messages.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
class BaseMessage(object):
__slots__ = ('sender', 'data')
base_size = 20

def __init__(self, sender, data=None):
Expand All @@ -10,7 +11,9 @@ def size(self):
return self.base_size + len(repr(self.data))

def __repr__(self):
return '<%s>' % self.__class__.__name__
msg_type = '%s:' % self.__class__.__name__
data = self.data if self.data else ""
return msg_type + data


########## Messages ###############
Expand All @@ -36,6 +39,9 @@ class PeerList(BaseMessage):
def __init__(self, sender, peers):
super().__init__(sender, set(peers))

def __repr__(self):
return 'PeerList'


class Hello(BaseMessage):
"""Offer a peer to connect"""
Expand Down
10 changes: 6 additions & 4 deletions p2psimpy/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ def __init__(self, sender, receiver):
:param locations: Map that contains the latencies between locations
"""
self.env = sender.env
self.get_latency = self.sender.sim.get_latency_delay

self.sender = sender
self.receiver = receiver

self.get_latency = self.sender.sim.get_latency_delay
self.start_time = self.env.now

def __repr__(self):
return '<Connection %r -> %r>' % (self.sender, self.receiver)

@property
def latency(self):
return self.get_latency(self.sender.location, self.receiver.location)
return max(self.get_latency(self.sender.location, self.receiver.location), 0)

@property
def bandwidth(self):
Expand All @@ -32,11 +33,12 @@ def send(self, msg, connect=False):

def _transfer():
bytes = msg.size
sender = msg.sender
delay = bytes / self.sender.bandwidth_ul
delay += bytes / self.receiver.bandwidth_dl
delay += self.latency / 2
yield self.env.timeout(delay)
if self.receiver.is_connected(msg.sender) or connect:
if self.receiver.is_connected(sender) or connect:
self.receiver.msg_queue.put(msg)

self.env.process(_transfer())
62 changes: 39 additions & 23 deletions p2psimpy/peer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import os
import random

Expand All @@ -12,7 +11,8 @@

class Peer:

def __init__(self, sim, name: str, location: str, bandwidth_ul: float, bandwidth_dl: float):
def __init__(self, sim, peer_id: int, peer_type: str,
location: str, bandwidth_ul: float, bandwidth_dl: float, **kwargs):
"""
Physical representation of a Peer
:param sim: Simulation environment
Expand All @@ -21,14 +21,17 @@ def __init__(self, sim, name: str, location: str, bandwidth_ul: float, bandwidth
"""
self.sim = sim
self.env = sim.env
self.name = name
self.peer_type = peer_type
self.peer_id = peer_id
self.name = str(peer_id) + ":" + str(peer_type)
self.location = location
self.bandwidth_ul = bandwidth_ul
self.bandwidth_dl = bandwidth_dl
self.__dict__.update(kwargs)

peer_repr = repr(self)
self.log_name = os.path.join(sim.sim_dir, peer_repr+".log")
self.logger = setup_logger(peer_repr, self.log_name )
self.log_name = os.path.join(sim.sim_dir, peer_repr + ".log")
self.logger = setup_logger(peer_repr, self.log_name)

# Message queue for the received messages
self.msg_queue = Store(self.env)
Expand All @@ -52,7 +55,7 @@ def __init__(self, sim, name: str, location: str, bandwidth_ul: float, bandwidth
self.env.process(self.run())

def __repr__(self):
return '%s %s' % (self.__class__.__name__, self.name)
return '%s_%s' % (self.__class__.__name__, self.name)

def __lt__(self, other):
return self.name < other.name
Expand Down Expand Up @@ -81,7 +84,7 @@ def connect(self, other):
:param other: peer object
"""
if not self.is_connected(other):
self.logger.info("Connecting to %s", repr(other))
self.logger.info("%s: Connecting to %s", self.env.now, repr(other))
self.connections[other] = Connection(self, other)
# We create bilateral connection
if not other.is_connected(self):
Expand All @@ -106,12 +109,14 @@ def receive(self, msg):
:param msg:
:return:
"""
assert isinstance(msg, BaseMessage) # Make sure the message is known
self.logger.info("%s: Received message %s", self.env.now, type(msg))
if type(msg) not in self.mh_map:
self.logger.error("No handler for the message %s", type(msg))
raise Exception("No handler for the message %s", type(msg))
for service_id in self.mh_map[type(msg)]:
msg_type = type(msg)
msg_sender = msg.sender
self.logger.info("%s: Received message <%s> from %s", self.env.now, repr(msg), msg_sender)

if msg_type not in self.mh_map:
self.logger.error("No handler for the message %s", msg_type)
raise Exception("No handler for the message %s at peer %s", msg_type, repr(self))
for service_id in self.mh_map[msg_type]:
self.handlers[service_id].handle_message(msg)

def send(self, receiver, msg):
Expand All @@ -120,29 +125,40 @@ def send(self, receiver, msg):
If receiver is not connected will raise and exception
"""
# fire and forget
assert msg.sender == self, "Sending peer should be same %s %s" % (msg.sender, self)
if receiver not in self.connections:
self.logger.error("%s: Sending message to a not connected peer %s",
self.env.now, repr(receiver))
raise Exception("Not connected")
self.logger.info("%s: Sending message <%s> to %s", self.env.now, repr(msg), receiver)
self.connections[receiver].send(msg)

def gossip(self, msg, f, except_set: set = None):
def _get_connections(self, exclude_bootstrap=True, except_set: set = None, except_type: set = None):
if except_set is None:
except_set = set()
if except_type is None:
except_type = set()
if exclude_bootstrap:
except_type.add('bootstrap')
conn_set = set(self.connections.keys()) - except_set
return (p for p in conn_set if p.peer_type not in except_type)

def gossip(self, msg, f, exclude_bootstrap=True, except_peers: set = None, except_type: set = None):
"""
Send to f neighbours selected randomly
:param msg: Message
:param f: the fanout parameter (number of peers to gossip to)
:type except_set: connected peers to exclude from gossip
:param exclude_bootstrap: Exclude bootstrap from gossip
:param except_peers: connected peers to exclude from gossip
:param except_type: exclude from gossip type of peers
"""
if except_set is None:
except_set = set()
gossip_set = set(self.connections.keys()) - except_set
for other in random.sample(list(gossip_set), min(f, len(gossip_set))):
gossip_set = list(self._get_connections(exclude_bootstrap, except_peers, except_type))

for other in random.sample(gossip_set, min(f, len(gossip_set))):
self.send(other, msg)

def broadcast(self, msg):
"""Send to all connected peers """
for other in self.connections:
def broadcast(self, msg, exclude_bootstrap=True, except_set: set = None, except_type: set = None):
"""Send to all connected peers except given """
for other in self._get_connections(exclude_bootstrap, except_set, except_type):
self.send(other, msg)

def add_service(self, service):
Expand Down
Loading

0 comments on commit f9a0251

Please sign in to comment.