-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample_community.py
322 lines (276 loc) · 11.6 KB
/
example_community.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
"""An example community (FloodCommunity) to both show
how Dispersy works and how VisualDispersy works.
Credit for the original tutorial Community goes to Boudewijn Schoon.
"""
import logging
import struct
import sys
import time
import string
import os.path
# Void all Dispersy log messages
logging.basicConfig(level=logging.CRITICAL)
logging.getLogger().propagate = False
from M2Crypto import EC, BIO
from twisted.internet import reactor, threads
from dispersy.authentication import MemberAuthentication
from dispersy.community import Community
from dispersy.conversion import DefaultConversion, BinaryConversion
from dispersy.destination import CommunityDestination
from dispersy.dispersy import Dispersy
from dispersy.distribution import FullSyncDistribution
from dispersy.endpoint import StandaloneEndpoint
from dispersy.member import DummyMember
from dispersy.message import Message, DropPacket, DropMessage, BatchConfiguration
from dispersy.payload import Payload
from dispersy.resolution import PublicResolution
from dispersyviz.visualdispersy import VisualDispersy, VisualCommunity
class FloodCommunity(VisualCommunity):

    """Example community in which every peer floods messages to the others.

    Received messages are counted and the running total is reported to
    VisualDispersy. The attributes `total_message_count` and `peerid`
    are not set by the constructor; they have to be assigned on the
    instance before create_flood() or on_flood() are used.
    """

    def __init__(self, dispersy, master_member, my_member):
        """Initialize the community and reset the received-message counter.

        The signature is dictated by Dispersy and must not be changed.
        """
        super(FloodCommunity, self).__init__(dispersy, master_member, my_member)
        self.message_received = 0

    def initiate_conversions(self):
        """Return the wire conversion handlers this community offers.
        """
        return [DefaultConversion(self), FloodConversion(self)]

    @property
    def dispersy_auto_download_master_member(self):
        """Never download the (bogus) master member automatically.
        """
        return False

    @property
    def dispersy_enable_fast_candidate_walker(self):
        """Switch on the fast candidate walker.
        """
        return True

    def initiate_meta_messages(self):
        """Return the inherited meta messages plus our own "flood" type.
        """
        messages = super(FloodCommunity, self).initiate_meta_messages()

        # Members are identified by their sha1 hash
        authentication = MemberAuthentication(encoding="sha1")
        # Every member is allowed to create flood messages
        resolution = PublicResolution()
        # No sequence numbers; sync the lowest (Lamport) global time
        # first, at the highest priority
        distribution = FullSyncDistribution(
            enable_sequence_number=False,
            synchronization_direction=u"ASC",
            priority=255)
        # Initially push to at most 10 other nodes
        destination = CommunityDestination(node_count=10)
        # The container that carries the message contents
        payload = FloodPayload()

        flood_meta = Message(
            self,
            u"flood",  # Unique identifier of the message type
            authentication,
            resolution,
            distribution,
            destination,
            payload,
            self.check_flood,  # Validates received messages
            self.on_flood,  # Handles validated messages
            # Seconds to collect messages before handling them
            batch=BatchConfiguration(0.0))
        messages.extend([flood_meta])
        return messages

    def create_flood(self, count):
        """Create `count` flood messages and spread them over the overlay.

        The starting time is stored in `start_flood_time` in any case;
        nothing is created when `count` is zero or negative.
        """
        self.start_flood_time = time.time()
        if count <= 0:
            return

        # The meta message registered in initiate_meta_messages()
        flood_meta = self.get_meta_message(u"flood")

        outgoing = []
        for index in xrange(count):
            # This client signs the message
            signer = (self.my_member,)
            # Without sequence numbers only our Lamport clock value is
            # needed; with them enabled the tuple would also have to hold
            # flood_meta.distribution.claim_sequence_number()
            clock = (self.claim_global_time(),)
            # Arbitrary contents: a counter offset by our peer id
            contents = ("flood #%d" % (index + (self.peerid - 1) * count),)
            outgoing.append(flood_meta.impl(authentication=signer,
                                            distribution=clock,
                                            payload=contents))

        # Store, update and forward: the messages also reach ourselves
        self.dispersy.store_update_forward(outgoing, True, True, True)

    def check_flood(self, messages):
        """Validate received messages; every message is accepted as is.

        See DropPacket and friends in dispersy.message for ways to
        reject a message instead.
        """
        for candidate in messages:
            yield candidate

    def on_flood(self, messages):
        """Count a batch of validated messages and report the progress.

        Once the counter equals `total_message_count` the end of the
        experiment is awaited in a separate thread.
        """
        self.message_received += len(messages)

        # Keep VisualDispersy up to date
        self.vz_report_target("messages",
                              self.message_received,
                              self.total_message_count)

        if self.message_received != self.total_message_count:
            return
        # Waiting has to happen in a thread of its own, doing it right
        # here would block Dispersy completely
        reactor.callInThread(self.wait_for_end)

    def wait_for_end(self):
        """Wait for the experiment to end and stop Dispersy afterwards.
        """
        self.vz_wait_for_experiment_end()
        self.dispersy.stop()
class FloodPayload(Payload):

    """Payload type of the "flood" messages of FloodCommunity.
    """

    class Implementation(Payload.Implementation):

        """One concrete payload: holds the contents of a single message.
        """

        def __init__(self, meta, data):
            """Remember `data` as the contents carried by this payload.
            """
            super(FloodPayload.Implementation, self).__init__(meta)
            self.data = data
class FloodConversion(BinaryConversion):

    """Wire format of FloodCommunity messages.

    A flood payload travels as a 4 byte big-endian length followed by
    that many bytes of data.
    """

    def __init__(self, community):
        """Register the flood message type for community version 1.
        """
        # Version 1 only talks to other version 1 communities
        super(FloodConversion, self).__init__(community, "\x01")
        flood_meta = community.get_meta_message(u"flood")
        # Our one and only message type gets id 1 (a single byte)
        self.define_meta_message(chr(1),
                                 flood_meta,
                                 self._encode_flood,
                                 self._decode_flood)

    def _encode_flood(self, message):
        """Encode callback: return the packed length and the raw data.
        """
        contents = message.payload.data
        return struct.pack("!L", len(contents)), contents

    def _decode_flood(self, placeholder, offset, data):
        """Decode callback: read one flood payload from `data` at `offset`.

        Returns the offset just behind the payload together with the
        payload implementation. Raises DropPacket when `data` is too
        short for the length field or for the announced length.
        """
        header_end = offset + 4
        if len(data) < header_end:
            raise DropPacket("Insufficient packet size")
        (contents_size,) = struct.unpack_from("!L", data, offset)

        body_end = header_end + contents_size
        if len(data) < body_end:
            raise DropPacket("Insufficient packet size")
        contents = data[header_end:body_end]

        return body_end, placeholder.meta.payload.implement(contents)
def join_flood_overlay(dispersy, masterkey, peerid, totalpeers,
                       new_message_count, total_message_count):
    """Join the FloodCommunity and flood it with our share of messages.

    The community is created for a fresh member under the master member
    belonging to `masterkey`, gets its experiment settings assigned and,
    after a pause of five seconds, creates `new_message_count` messages.
    """
    # A new identity for this client, below our bogus master member
    master = dispersy.get_member(public_key=masterkey)
    myself = dispersy.get_new_member()
    overlay = FloodCommunity.init_community(dispersy, master, myself)

    # The constructor signature is fixed, so the experiment settings
    # are attached to the instance afterwards
    overlay.peerid = peerid
    overlay.totalpeers = totalpeers
    overlay.total_message_count = total_message_count

    # Tell VisualDispersy that nothing has been received yet
    overlay.vz_report_target("messages", 0, total_message_count)
    print("%d] Joined community" % (dispersy.lan_address[1]))

    # Give the members of the community some time to find each other.
    # NOTE(review): main() schedules this function with
    # reactor.callLater, so this sleep presumably blocks the reactor
    # thread for five seconds - confirm that this is intended.
    time.sleep(5.0)

    print("%d] Flooding community" % (dispersy.lan_address[1]))
    # Share our part of the messages with the community
    overlay.create_flood(new_message_count)
def generateMasterkey():
    """Generate an M2Crypto elliptic curve key and return its public part.

    The public key is saved into a memory buffer and read back as text.
    The first 27 characters are skipped (presumably the PEM header
    line - TODO confirm) and the text is cut at the first '-' that
    follows. What remains is returned, still base64 encoded.
    """
    keypair = EC.gen_params(EC.NID_sect233k1)
    keypair.gen_key()

    key_buffer = BIO.MemoryBuffer()
    keypair.save_pub_key_bio(key_buffer)
    key_text = key_buffer.read()
    key_buffer.reset()

    body = key_text[27:]
    return body[:body.find('-')]  # BASE64 ENCODED
def establishMasterkey(peerid):
    """Return the (base64 decoded) master key of this community.

    The key is kept in the file 'generated_master_key.key' in the
    current working directory. The peer with `peerid` 1 generates the
    key when the file does not exist yet; every other peer polls twice
    per second until the file shows up.

    The key is first written to a scratch file which is then renamed to
    its final name. Waiting peers only test for the existence of the
    final file, so writing it in place would allow them to read an
    empty or half written key.
    """
    # Imported here because the module itself does not import base64
    import base64

    keypath = 'generated_master_key.key'
    if peerid == 1:
        # Peerid 1 makes sure the key file exists
        if not os.path.isfile(keypath):
            scratchpath = keypath + '.tmp'
            with open(scratchpath, 'w') as scratchfile:
                scratchfile.write(generateMasterkey())
            # Publish the completely written key in a single step
            os.rename(scratchpath, keypath)
    else:
        # All other peers simply wait for the keyfile to exist
        # [And pray peer 1 did not crash]
        while not os.path.isfile(keypath):
            time.sleep(0.5)
    with open(keypath, 'r') as keyfile:
        return base64.b64decode(keyfile.read())
def stopOnDispersy(dispersy, reactor, startup_delay=20.0, poll_interval=10.0):
    """Stop the reactor once Dispersy is no longer running.

    - dispersy: the object whose `running` attribute is polled
    - reactor: the reactor to stop
    - startup_delay: seconds to wait before the first check
    - poll_interval: seconds to wait between two checks

    This function blocks, which is why main() runs it through
    reactor.callInThread. The reactor is therefore stopped through
    callFromThread: calling reactor.stop() directly from a thread other
    than the reactor thread is not thread-safe.
    """
    time.sleep(startup_delay)
    while dispersy.running:
        time.sleep(poll_interval)
    reactor.callFromThread(reactor.stop)
def main(peerid, totalpeers, new_message_count, total_message_count,
         vz_server_port):
    """Entry point of the experiment, called by VisualDispersy.

    - peerid: [1~totalpeers] our id
    - totalpeers: the total amount of peers in our experiment
    - new_message_count: the amount of messages we are supposed to share
    - total_message_count: the total amount of messages we are supposed
      to receive (including our own)
    - vz_server_port: the server port we need to connect to for
      VisualDispersy

    Runs the reactor and only returns after the reactor has stopped.
    """
    # Fetch (or, being peer 1, create) the master key
    master_key = establishMasterkey(peerid)

    # The endpoint starts at port 10000 and increments until a port can
    # be opened; the SQLite 3 database is kept in RAM
    dispersy = VisualDispersy(StandaloneEndpoint(10000), u".", u":memory:")
    # Connect to the VisualDispersy server
    dispersy.vz_init_server_connection(vz_server_port)

    # Starting Dispersy blocks, so it gets a thread of its own
    reactor.callInThread(dispersy.start, True)
    # The observer makes for a clean exit when Dispersy is closed
    reactor.callInThread(stopOnDispersy, dispersy, reactor)

    # After 5 seconds, start the experiment
    overlay_args = (dispersy,
                    master_key,
                    peerid,
                    totalpeers,
                    new_message_count,
                    total_message_count)
    reactor.callLater(5.0, join_flood_overlay, *overlay_args)

    reactor.run()