Skip to content

Commit

Permalink
responder separated from controller (first step for #36)
Browse files Browse the repository at this point in the history
  • Loading branch information
rauljim committed Jan 11, 2012
1 parent 50b4a9a commit 012378b
Showing 1 changed file with 13 additions and 43 deletions.
56 changes: 13 additions & 43 deletions core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
import identifier
from identifier import Id
import message
import token_manager
import tracker
from querier import Querier
from message import QUERY, RESPONSE, ERROR
from node import Node
import pkgutil
import responder
#import pkgutil

#from profilestats import profile

Expand All @@ -43,7 +42,6 @@

#TIMEOUT_DELAY = 2

NUM_NODES = 8
CACHE_VALID_PERIOD = 5 * 60 # 5 minutes


Expand All @@ -70,12 +68,14 @@ def __init__(self, version_label,
self._my_node = Node(my_addr, self._my_id, version=version_label)
self.msg_f = message.MsgFactory(version_label, self._my_id,
private_dht_name)
self._tracker = tracker.Tracker()
self._token_m = token_manager.TokenManager()

self._querier = Querier()
self._routing_m = routing_m_mod.RoutingManager(
self._my_node, saved_bootstrap_nodes, self.msg_f)

self._responder = responder.Responder(self._my_id, self._routing_m,
self.msg_f)
self._tracker = self._responder._tracker

self._lookup_m = lookup_m_mod.LookupManager(self._my_id, self.msg_f)
self._experimental_m = experimental_m_mod.ExperimentalManager(
self._my_node.id, self.msg_f)
Expand Down Expand Up @@ -261,7 +261,7 @@ def on_datagram_received(self, datagram):
#zinat: inform experimental_module
exp_queries_to_send = self._experimental_m.on_query_received(msg)

response_msg = self._get_response(msg)
response_msg = self._responder.get_response(msg)
if response_msg:
bencoded_response = response_msg.stamp(msg.tid)
datagrams_to_send.append(
Expand Down Expand Up @@ -323,7 +323,8 @@ def on_datagram_received(self, datagram):
# Query timed out or unrequested response
return self._next_main_loop_call_ts, datagrams_to_send
#TODO: zinat: same as response
exp_queries_to_send = self._experimental_m.on_error_received(msg, related_query)
exp_queries_to_send = self._experimental_m.on_error_received(
msg, related_query)
# lookup related tasks
if related_query.lookup_obj:
peers = None # an error msg doesn't have peers
Expand Down Expand Up @@ -374,39 +375,7 @@ def _on_response_received(self):
return
def _on_error_received(self):
return


def _get_response(self, msg):
if msg.query == message.PING:
return self.msg_f.outgoing_ping_response(msg.src_node)
elif msg.query == message.FIND_NODE:
log_distance = msg.target.log_distance(self._my_id)
rnodes = self._routing_m.get_closest_rnodes(log_distance,
NUM_NODES, False)
#TODO: return the closest rnodes to the target instead of the 8
#first in the bucket.
return self.msg_f.outgoing_find_node_response(
msg.src_node, rnodes)
elif msg.query == message.GET_PEERS:
token = self._token_m.get()
log_distance = msg.info_hash.log_distance(self._my_id)
rnodes = self._routing_m.get_closest_rnodes(log_distance,
NUM_NODES, False)
#TODO: return the closest rnodes to the target instead of the 8
#first in the bucket.
peers = self._tracker.get(msg.info_hash)
if peers:
logger.debug('RESPONDING with PEERS:\n%r' % peers)
return self.msg_f.outgoing_get_peers_response(
msg.src_node, token, nodes=rnodes, peers=peers)
elif msg.query == message.ANNOUNCE_PEER:
peer_addr = (msg.src_addr[0], msg.bt_port)
self._tracker.put(msg.info_hash, peer_addr)
return self.msg_f.outgoing_announce_peer_response(msg.src_node)
else:
logger.debug('Invalid QUERY: %r' % (msg.query))
#TODO: maybe send an error back?


def _on_timeout(self, related_query):
queries_to_send = []
#TODO: on_timeout should return queries (raul)
Expand All @@ -432,7 +401,8 @@ def _on_timeout(self, related_query):
related_query.lookup_obj))
lookup_id = related_query.lookup_obj.lookup_id
related_query.lookup_obj.callback_f(lookup_id, None, None)
maintenance_queries_to_send = self._routing_m.on_timeout(related_query.dst_node)
maintenance_queries_to_send = self._routing_m.on_timeout(
related_query.dst_node)
if maintenance_queries_to_send:
queries_to_send.extend(maintenance_queries_to_send)
if exp_queries_to_send:
Expand Down

0 comments on commit 012378b

Please sign in to comment.