p2psim: Add visualizer
sonhv0212 committed Dec 24, 2024
1 parent f22279e commit bc138c9
Showing 2 changed files with 334 additions and 1 deletion.
11 changes: 10 additions & 1 deletion p2p/simulations/discovery/README.md
@@ -45,4 +45,13 @@ After running the simulation, some files will be generated in the `results_dir`
- `$test_name.log`: Log of all nodes in the simulation network
- `stats_$test_name.csv`: Statistics of the simulation, including the number of peers, distribution of nodes in the DHT, ...
- `peers_$test_name.log`: List of peers of each node in the network
- `dht_$test_name.log`: List of nodes in the DHT of each node

### Visualization

To visualize the generated data, use the `discovery.py` script (see the example invocations below).
Supported types:
- `dht_peer`: Number of outbound peers of each node versus how many of those peers are also in its DHT
- `PeerCount`: Number of peers of each node
- `DHTBuckets`: Size of each node's DHT (total entries across buckets)
- Additional types are listed in the `type` column of the stats file
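
For example, with the files described above in `results_dir` (the exact paths, test name, and option values below are illustrative), the script can be invoked like this:

```sh
# Plot the P90 peer count over the first hour of the simulation
python discovery.py --type PeerCount --P 90 \
    --stats_file results_dir/stats_$test_name.csv --time_range 0,3600 --result_dir ./plots

# Plot each node's outbound peer count against how many of those peers are in its DHT
python discovery.py --type dht_peer \
    --dht_file results_dir/dht_$test_name.log --peers_file results_dir/peers_$test_name.log --result_dir ./plots
```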
324 changes: 324 additions & 0 deletions p2p/simulations/discovery/discovery.py
@@ -0,0 +1,324 @@
import argparse
import os
import pandas as pd
from matplotlib import pyplot as plt

'''
Check if the node_id has a valid prefix
@param node_id: the node_id to check
@param prefixes: the prefixes to check
@return True if the node_id has a valid prefix
'''
def node_valid_with_prefixes(node_id, prefixes):
if prefixes is None:
return True
for prefix in prefixes:
if node_id.startswith(prefix):
return True
return False

'''
Visualize the stats file generated by the simulation
'''
class StatsPlotter:
'''
@param stats_file: the stats file generated by the simulation
@param result_dir: the directory to save the result
@param metric: the metric to plot (e.g., "PeerCount", "DHTBuckets")
@param P: Pth percentile to plot (e.g., 50, 90, 99; -1 means average)
    @param time_range: the time range (measured from the start of the simulation) to plot
@param prefixes: the prefixes to filter the nodes
'''
def __init__(self, stats_file, result_dir, metric, P, time_range=[-1,-1], prefixes=["node", "bootnode", "outbound"]):
self.stats_file = stats_file
self.result_dir = result_dir
self.prefixes = prefixes
self.metric = metric
self.P = P
self.time_range = time_range
os.makedirs(result_dir, exist_ok=True)

'''
Parse the stats file
    @return: data_nodes is a dictionary mapping each node to a list of its metric
    values, one entry per timestamp in the selected range, ordered by timestamp
'''
def parse_stats(self):
df = pd.read_csv(self.stats_file)
timestamps = df['timestamp'].unique()
timestamps.sort()
self.nodes = df['node'].unique()

        # record the simulation start time and filter timestamps by the configured time range
self.start_time = timestamps[0]
self.timestamps = []
for ts in timestamps:
if self.time_range[0] == -1 or (self.time_range[0] <= ts - self.start_time and ts - self.start_time <= self.time_range[1]):
self.timestamps.append(ts)

self.data_nodes = dict()
df = df[df['type'] == self.metric]
for node in self.nodes:
# filter node by prefixes
if not node_valid_with_prefixes(node, self.prefixes):
continue

df_node = df[df['node'] == node]
if node not in self.data_nodes:
self.data_nodes[node] = []

for ts in self.timestamps:
value_str = df_node[df_node['timestamp'] == ts]['value']
if value_str.empty:
self.data_nodes[node].append(0)
else:
                    # try to convert the value to an int; otherwise treat it as an array and sum it
                    value = 0
                    try:
                        value = int(value_str.values[0])
                    except (ValueError, TypeError):
                        try:
                            value = sum([int(v) for v in value_str.values[0][1:-1].split(" ")])
                        except (ValueError, TypeError):
                            print("Error: ", value_str.values[0])
self.data_nodes[node].append(value)

return self.data_nodes
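
    # Expected layout of the stats CSV consumed by parse_stats (the column names come
    # from the parsing above; the rows below are illustrative, not real data):
    #   timestamp,node,type,value
    #   1734890389,node-1734890389-17,PeerCount,12
    #   1734890389,node-1734890389-17,DHTBuckets,[3 7 16 16]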

'''
    Group nodes by the timestamp at which their batch started rolling out
(e.g., node-1734890389-17 -> 1734890389, outbound-1734887683-49 -> 1734887683)
@return: results is a dictionary where each key is a timestamp, and its
value is a list of nodes that rolled out at that timestamp
'''
def node_group_by_timestamp(self):
node_groups = dict()
for node in self.data_nodes:
ts = int(node.split("-")[1])
if ts not in node_groups:
node_groups[ts] = []
node_groups[ts].append(node)
return node_groups

'''
Calculate the average of the nodes in each group
@param node_groups: the node groups to calculate the average
    @return: results is a dictionary where each key is the name of a group, and
    its value is the average of the data of the nodes in that group over the time range
'''
def calc_avg_by_group(self, node_groups):
results = dict()
for group in node_groups:
results[group] = []
nodes = node_groups[group]
for i, ts in enumerate(self.timestamps):
data = [self.data_nodes[node][i] for node in nodes]
results[group].append(sum(data) / len(data))
return results

'''
Calculate the Pth percentile of the nodes in each group
@param node_groups: the node groups to calculate the Pth percentile
@param P: the Pth percentile to calculate
    @return: results is a dictionary where each key is the name of a group, and
    its value is the Pth percentile of the data of the nodes in that group over the time range
'''
def calc_p_by_group(self, node_groups, P, reverse=False):
results = dict()
for group in node_groups:
results[group] = []
nodes = node_groups[group]
for i, ts in enumerate(self.timestamps):
data = [self.data_nodes[node][i] for node in nodes]
data.sort(reverse=reverse)
                # clamp the index so that P=100 does not run past the end of the sorted list
                idx = min(int(len(data) * P / 100), len(data) - 1)
                results[group].append(data[idx])
return results

def plot(self):
self.parse_stats()
node_groups = self.node_group_by_timestamp()
results = self.calc_avg_by_group(node_groups) if self.P == -1 else self.calc_p_by_group(node_groups, self.P, reverse=True)
normalized_timestamps = [ts - self.start_time for ts in self.timestamps]

plt.figure(figsize=(10, 10))
sorted_groups = list(results.keys())
sorted_groups.sort()
for i, group in enumerate(sorted_groups):
plt.plot(normalized_timestamps, results[group], label=f"batch {i}" if i > 0 else "bootnode")
plt.xlabel("Time (s)")
plt.ylabel(self.metric)
        if self.P == -1:
            plt.title(f"Average {self.metric}")
        else:
            plt.title(f"P{self.P} {self.metric}")
plt.legend()
plt.savefig(f"{self.result_dir}/{self.metric}_P{self.P}.png")

'''
Visualize the DHT and peer info
'''
class PeerDHTPlotter:
def __init__(self, dht_log_file, peer_log_file, result_dir, prefixes=["node", "bootnode", "outbound"]):
self.dht_log_file = dht_log_file
self.peer_log_file = peer_log_file
self.result_dir = result_dir
self.prefixes = prefixes
os.makedirs(result_dir, exist_ok=True)

'''
Parse the DHT log file
@return: dht_data is a dictionary where each key is a node_id, and its
value is a set of peers in the DHT
'''
def parse_dht_log(self):
dht_data = dict()
with open(self.dht_log_file, "r") as f:
for line in f:
line = line.strip()
if line == '':
continue

# node_id: [peer1, peer2, ...], [peer3, peer4, ...], ...
node_id, peer_list = line.strip().split(':')
if not node_valid_with_prefixes(node_id, self.prefixes):
continue

buckets = peer_list.strip().split(',')
for bucket in buckets:
if bucket == '[]':
continue
peer_list = bucket.strip()[1:-1].split(' ')
for peer in peer_list:
if peer == '':
continue
if node_id not in dht_data:
dht_data[node_id] = set()
dht_data[node_id].add(peer)
return dht_data

'''
Parse the peers log file
    @return: peer_data is a dictionary where each key is a node_id, and its
    value is the list of peers connected (outbound) to the node
'''
def parse_peers_log(self, only_outbound=True):
peer_data = dict()
with open(self.peer_log_file, 'r') as f:
for line in f:
line = line.strip()
if line == '':
continue

# node_id: (peer1 inbound), (peer2 inbound), ...
node_id, peer_list = line.strip().split(':')
if not node_valid_with_prefixes(node_id, self.prefixes):
continue

for ts in peer_list.strip().split(','):
if ts == '':
continue
peer, inbound = ts.strip()[1:-1].split(' ')
if only_outbound and inbound == 'true':
continue
if node_id not in peer_data:
peer_data[node_id] = []
peer_data[node_id].append(peer)
return peer_data

def plot(self):
dht_data = self.parse_dht_log()
peer_data = self.parse_peers_log()

x = []
y = []

for node_id, peers in peer_data.items():
count_peer_in_dht = 0
for peer in peers:
                # some nodes may have no DHT entries recorded, so fall back to an empty set
                if peer in dht_data.get(node_id, set()):
count_peer_in_dht += 1
x.append(count_peer_in_dht)
y.append(len(peers))

hb = plt.hexbin(x, y, gridsize=30, cmap='Reds', mincnt=1)
plt.colorbar(hb, label='Frequency')
plt.xlabel('Number of peers in DHT')
plt.ylabel('Number of peers (outbound)')
plt.title('DHT vs. Outbound Peers')
plt.savefig(f"{self.result_dir}/dht_vs_outbound.png")

def parse_arguments():
parser = argparse.ArgumentParser(description="Plotting utility for various metrics.")

parser.add_argument(
"--type",
required=True,
help="Type of metrics to process and plot (dht_peer, PeerCount, DHTBuckets, ... see more in stats file)."
)
parser.add_argument(
"--P",
type=int,
default=-1,
help="Percentile for statistical calculations (e.g., 50, 90, 99; -1 means average)."
)
parser.add_argument(
"--stats_file",
type=str,
help="Path to the stats CSV file.",
default=None
)
parser.add_argument(
"--dht_file",
type=str,
help="Path to the DHT log file.",
default=None
)
parser.add_argument(
"--peers_file",
type=str,
help="Path to the peers log file.",
default=None
)
parser.add_argument(
"--result_dir",
type=str,
default="./results",
help="Directory where the resulting plots will be saved."
)
parser.add_argument(
"--time_range",
type=str,
default="-1,-1",
help="Time range to plot (from start of the simulation)."
)
parser.add_argument(
"--prefixes",
type=str,
default="node,bootnode,outbound",
help="Comma-separated list of prefixes to filter nodes"
)

return parser.parse_args()

def main():
args = parse_arguments()

if args.type == "dht_peer":
if args.dht_file is None or args.peers_file is None:
print("Error: DHT log file and peers log file are required for dht_peer type.")
return
plotter = PeerDHTPlotter(args.dht_file, args.peers_file, args.result_dir, args.prefixes.split(","))
plotter.plot()
else:
try:
time_range = [int(x) for x in args.time_range.split(",")]
        except ValueError:
            time_range = [-1, -1]
            print("Error: Invalid time range format. Using the default time range.")
        if args.stats_file is None:
            print("Error: Stats file is required for this type.")
            return
plotter = StatsPlotter(args.stats_file, args.result_dir, args.type, args.P, time_range, args.prefixes.split(","))
plotter.plot()

if __name__ == "__main__":
main()
