Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time/Space stats #31

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions emulators/AsyncEmulator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import random
import sys
import threading
import time
from threading import Lock
Expand All @@ -16,6 +17,9 @@ def __init__(self, number_of_devices: int, kind):
self._terminated = 0
self._messages = {}
self._messages_sent = 0
self._time_started = time.perf_counter_ns()
self._data_space = 0 # data usage in bytes
self.s_ns = 1e9 # seconds to nanoseconds

def run(self):
self._progress.acquire()
Expand All @@ -40,9 +44,9 @@ def queue(self, message: MessageStub):
print(f'\tSend {message}')
if message.destination not in self._messages:
self._messages[message.destination] = []
self._messages[message.destination].append(copy.deepcopy(message)) # avoid accidental memory sharing
random.shuffle(self._messages[message.destination]) # shuffle to emulate changes in order
time.sleep(random.uniform(0.01, 0.1)) # try to obfuscate delays and emulate network delays
self._messages[message.destination].append(copy.deepcopy(message)) # avoid accidental memory sharing
random.shuffle(self._messages[message.destination]) # shuffle to emulate changes in order
time.sleep(random.uniform(0.01, 0.1)) # try to obfuscate delays and emulate network delays
self._progress.release()

def dequeue(self, index: int) -> Optional[MessageStub]:
Expand All @@ -57,18 +61,43 @@ def dequeue(self, index: int) -> Optional[MessageStub]:
m = self._messages[index].pop()
print(f'\tRecieve {m}')
self._progress.release()
self._data_space += _get_real_size(m)
return m

def done(self, index: int):
time.sleep(random.uniform(0.01, 0.1)) # try to obfuscate delays and emulate network delays
time.sleep(random.uniform(0.01, 0.1)) # try to obfuscate delays and emulate network delays
return


def print_statistics(self):
print(f'\tTotal {self._messages_sent} messages')
print(f'\tAverage {self._messages_sent/len(self._devices)} messages/device')
print(f'\tAverage {self._messages_sent / len(self._devices)} messages/device')
print(f'\tFull time elapsed: {round((time.perf_counter_ns() - self._time_started) / self.s_ns, 4)} seconds')
print(f'\tData transferred: {self._data_space} bytes')

def terminated(self, index:int):
def terminated(self, index: int):
self._progress.acquire()
self._terminated += 1
self._progress.release()
self._progress.release()


def _get_real_size(obj, seen=None):
"""
python offers no built-in to fully measure the size of an object, this function will achieve that
"""
size = sys.getsizeof(obj)
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return 0
# Important mark as seen *before* entering recursion to gracefully handle
# self-referential objects
seen.add(obj_id)
if isinstance(obj, dict):
size += sum([_get_real_size(v, seen) for v in obj.values()])
size += sum([_get_real_size(k, seen) for k in obj.keys()])
elif hasattr(obj, '__dict__'):
size += _get_real_size(obj.__dict__, seen)
elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
size += sum([_get_real_size(i, seen) for i in obj])
return size
41 changes: 36 additions & 5 deletions emulators/SyncEmulator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import copy
import random
import sys
import threading
import time
from typing import Optional

from emulators.EmulatorStub import EmulatorStub
Expand All @@ -18,6 +20,9 @@ def __init__(self, number_of_devices: int, kind):
self._current_round_messages = {}
self._messages_sent = 0
self._rounds = 0
self._time_started = time.perf_counter_ns()
self._data_space = 0 # data usage in bytes
self.s_ns = 1e9 # seconds to nanoseconds

def reset_done(self):
self._done = [False for _ in self.ids()]
Expand Down Expand Up @@ -52,7 +57,7 @@ def run(self):
self._current_round_messages = {}
self.reset_done()
self._rounds += 1
ids = [x for x in self.ids()] # convert to list to make it shuffleable
ids = [x for x in self.ids()] # convert to list to make it shuffleable
random.shuffle(ids)
for index in ids:
if self._awaits[index].locked():
Expand All @@ -68,7 +73,8 @@ def queue(self, message: MessageStub):
print(f'\tSend {message}')
if message.destination not in self._current_round_messages:
self._current_round_messages[message.destination] = []
self._current_round_messages[message.destination].append(copy.deepcopy(message)) # avoid accidental memory sharing
self._current_round_messages[message.destination].append(
copy.deepcopy(message)) # avoid accidental memory sharing
self._progress.release()

def dequeue(self, index: int) -> Optional[MessageStub]:
Expand All @@ -83,6 +89,7 @@ def dequeue(self, index: int) -> Optional[MessageStub]:
m = self._last_round_messages[index].pop()
print(f'\tReceive {m}')
self._progress.release()
self._data_space += _get_real_size(m)
return m

def done(self, index: int):
Expand All @@ -99,17 +106,41 @@ def done(self, index: int):
self._progress.release()
self._awaits[index].acquire()


def print_statistics(self):
print(f'\tTotal {self._messages_sent} messages')
print(f'\tAverage {self._messages_sent/len(self._devices)} messages/device')
print(f'\tAverage {self._messages_sent / len(self._devices)} messages/device')
print(f'\tTotal {self._rounds} rounds')
print(f'\tFull time elapsed: {round((time.perf_counter_ns() - self._time_started) / self.s_ns, 4)} seconds')
print(f'\tData transferred: {self._data_space} bytes')

def terminated(self, index:int):
def terminated(self, index: int):
self._progress.acquire()
self._done[index] = True
if all([self._done[x] or not self._threads[x].is_alive()
for x in self.ids()]):
if self._round_lock.locked():
self._round_lock.release()
self._progress.release()


def _get_real_size(obj, seen=None):
"""
python offers no built-in to fully measure the size of an object, this function will achieve that
"""
size = sys.getsizeof(obj)
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return 0
# Important mark as seen *before* entering recursion to gracefully handle
# self-referential objects
seen.add(obj_id)
if isinstance(obj, dict):
size += sum([_get_real_size(v, seen) for v in obj.values()])
size += sum([_get_real_size(k, seen) for k in obj.keys()])
elif hasattr(obj, '__dict__'):
size += _get_real_size(obj.__dict__, seen)
elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
size += sum([_get_real_size(i, seen) for i in obj])
return size