Package congress :: Module congress
[hide private]

Source Code for Module congress.congress

  1  import sys 
  2  import random 
  3  import hashlib 
  4  import socket 
  5  import traceback 
  6  import argparse 
  7  import pyev 
  8  from node import * 
  9  from control import * 
 10  from _congress import *  
 11   
 12  k = 20 
 13  a = 3 
 14   
 15  PING = 1 
 16  PONG = 2 
 17  RPC_STORE = 5 
 18  RPC_FIND_NODE = 6 
 19  RPC_GET = 7 
 20  RPC_FIND_NODE_REPLY = 8 
 21  RPC_GET_REPLY = 9 
 22  RPC_CHAT = 10 
 23   
def sha1_hash(key):
    """
    Generate a SHA1 hash for a given string.

    :param key: Byte string or text to hash. Text is encoded as UTF-8 first
        so that non-ASCII keys hash instead of raising an encoding error.
    :returns: The 160-bit SHA1 digest as an integer.
    """
    # type(u'') is `unicode` on Python 2 and `str` on Python 3.
    if isinstance(key, type(u'')):
        key = key.encode('utf-8')
    # int() promotes to an arbitrary-precision integer on its own, so the
    # Python-2-only long() is not needed.
    return int(hashlib.sha1(key).hexdigest(), 16)
34
def ping_handler(message, server, peer):
    """
    Callback for PING messages.

    Answers the sender with a PONG that references the PING's message id.

    :param message: PING message
    :param server: The Congress server
    :param peer: The peer the message was received from
    """
    peer.enqueue_message(Message(PONG, re=message.id))
46
def pong_handler(message, server, peer):
    """
    Callback for PONG messages.

    Refreshes the k-bucket information for the peer, provided the peer is
    still marked active.

    :param message: PONG message
    :param server: The Congress server
    :param peer: The peer the message was received from
    """
    if not peer.active:
        return
    server._hit_peer(peer)
58
def handle_rpc_store(message, server, peer):
    """
    Callback for STORE messages.

    Copies every entry of the message's 'store' mapping into the server's
    store, converting each key to an integer.

    :param message: STORE message
    :param server: The Congress server
    :param peer: The peer the message was received from
    """
    for key, value in message.data['store'].items():
        # Keys arrive as strings (rpc_store sends str(id)); int() promotes to
        # arbitrary precision by itself.
        server.store[int(key)] = value
70
def chat_handler(message, server, peer):
    """
    Callback for CHAT messages. Congress makes no attempt to encrypt or
    secure the message from eavesdropping, and leaves any implementation of
    chat message security to the developer.

    A message addressed to this node is handed to every callback registered
    with register_chat_callback; a failing callback has its traceback printed
    to stderr and does not stop the remaining callbacks. A message addressed
    to another node is re-sent to the closest known peers.

    :param message: CHAT message
    :param server: The Congress server
    :param peer: The peer the message was received from
    """
    payload = message.data
    target_id = payload['target_node_id']
    sender_id = payload['source_node_id']
    text = payload['chat_message']

    if target_id != server.id:
        # Not for us: relay a fresh copy towards the target.
        for next_hop in server._closest_peers(target_id, a):
            next_hop.enqueue_message(Message(RPC_CHAT, data={
                'target_node_id': target_id,
                'source_node_id': sender_id,
                'chat_message': text,
            }))
        return

    for handler in server.chat_callbacks:
        try:
            handler(sender_id, text, server, peer)
        except Exception:
            traceback.print_exc(file=sys.stderr)
102 103
def handle_rpc_get(message, server, peer):
    """
    Callback for GET messages.

    If we have the value, send it back to the peer.
    If we don't, build a list of alpha closest peers and send it back to the
    peer.

    :param message: GET message; data carries 'key' and optionally 'attempt'
        (treated as 1 when absent)
    :param server: The Congress server
    :param peer: The peer the message was received from
    """
    attempt = message.data.get('attempt', 1)
    key = message.data['key']

    # Direct dict membership instead of scanning a list of keys.
    if key in server.store:
        reply_data = {'store': {str(key): server.store[key]},
                      'attempt': attempt}
    else:
        # _closest_peers can yield None when it fails internally; treat that
        # as "no peers known" rather than crashing the handler.
        closest = server._closest_peers(key, a) or []
        reply_data = {
            'peers': [(other.id,
                       other.server_address[0],
                       other.server_address[1]) for other in closest],
            'key': key,
            'attempt': attempt,
        }
    peer.enqueue_message(Message(RPC_GET_REPLY, data=reply_data,
                                 re=message.id))
133
def value_check(message, server):
    """
    Check if a given hashed key and value are present in the store.

    Walks the pending retrieval callbacks; for the first one whose hashed key
    is in the server's store, the callback is invoked with the value, removed
    from the pending list, and True is returned. An exception raised by the
    callback is printed to stderr and the callback is still removed.

    :param message: This function is only called from the GET reply, so the
        message in the callback is passed to this function.
    :param server: Same deal as the message, this is the Congress server.
    :returns: True if a callback was served, False otherwise.
    """
    server._debug('Retrieval callback length: %d' %
                  len(server.retrieval_callbacks))
    received = message.data.get('store', {})
    # Normalise store keys to integers once instead of on every iteration.
    stored = dict((int(skey), sval) for skey, sval in server.store.items())

    for entry in list(server.retrieval_callbacks):
        key, callback = entry
        hash_key = sha1_hash(key)
        server._debug("Key: %s\nStore: %s" % (hash_key, server.store))
        if hash_key not in stored:
            continue
        # Prefer the value carried by this reply; fall back to the store so a
        # reply for a different key cannot drop this callback undelivered.
        wire_key = str(hash_key)
        value = received[wire_key] if wire_key in received \
            else stored[hash_key]
        try:
            callback(key, value)
        except Exception:
            traceback.print_exc(file=sys.stderr)
        server.retrieval_callbacks.remove(entry)
        return True
    return False
156
def _make_request_sender(request):
    """Build a handshake callback that sends exactly *request* to the peer."""
    def request_storage(xxpeer, server):
        xxpeer.enqueue_message(request)
    return request_storage


def handle_rpc_get_reply(message, server, peer):
    """
    Callback for GET replies.

    If we get the value back in the message, store it and run value_check.
    If we get a list of peers, bootstrap them and request the same
    value, but only if we don't have the value and the attempt counter has
    not exceeded k.

    :param message: The GET reply itself.
    :param server: The Congress server
    :param peer: The peer that sent the message
    """
    attempt = message.data.get('attempt', 1)

    # We actually got the value
    if 'peers' not in message.data:
        for skey, sval in message.data['store'].items():
            server._debug('Storing value received from peer.')
            server.store[int(skey)] = sval
        value_check(message, server)
        return

    # got a peer list instead
    if int(message.data['key']) in server.store or attempt > k:
        return

    for node_id, address, port in message.data['peers']:
        node_id = int(node_id)
        if node_id == server.id:
            continue
        request = Message(RPC_GET,
                          data={'key': message.data['key'],
                                'attempt': attempt + 1},
                          re=message.id)
        server._debug("Attempting to bootstrap peer %s " % str(node_id) +
                      "via RPC_GET_REPLY message.")
        known = [candidate for candidate in server.peers
                 if candidate.id == node_id]
        if known:
            server._debug('Existing node.')
            known[0].enqueue_message(request)
        else:
            server._debug('New node.')
            # Bind this iteration's request explicitly; a closure over the
            # loop variable would send whichever request was built last.
            server.register_handshake(_make_request_sender(request),
                                      peer=node_id)
            server.bootstrap_peer((address, port), id=node_id)
def handle_rpc_find_node(message, server, peer):
    """
    Callback for FIND_NODE messages.

    Replies with (id, host, port) tuples for the k closest peers to the
    requested node ID.

    :param message: The FIND_NODE message itself.
    :param server: The Congress server
    :param peer: The peer that sent the message.
    """
    wanted_id = message.data['node_id']
    nearest = server._closest_peers(wanted_id, k)
    peer_tuples = [
        (known.id, known.server_address[0], known.server_address[1])
        for known in nearest
    ]
    reply = Message(RPC_FIND_NODE_REPLY,
                    re=message.id,
                    data={'peers': peer_tuples})
    peer.enqueue_message(reply)
223
def handle_rpc_find_node_reply(message, server, peer):
    """
    Callback for FIND_NODE replies.

    Bootstraps every listed peer whose ID is neither our own nor that of a
    peer known when the reply arrived.

    :param message: The FIND_NODE reply.
    :param server: The Congress server
    :param peer: The peer that sent the reply.
    """
    candidates = message.data['peers']
    # Snapshot taken before any bootstrapping starts.
    known_ids = [existing.id for existing in server.peers]
    fresh = [entry for entry in candidates
             if entry[0] not in known_ids and entry[0] != server.id]
    for node_id, address, port in fresh:
        server.bootstrap_peer((address, port), id=node_id)
239 240
class CongressPeer(Peer):
    """Peer subclass for Congress. Currently no real logic is here."""

    def __init__(self, conn, addr, server):
        """Forward construction to Peer unchanged."""
        Peer.__init__(self, conn, addr, server)

    def __del__(self):
        """Forward finalisation to Peer unchanged."""
        Peer.__del__(self)
249
class Congress(Node):
    """Main DHT class. The main event loop is a member of this class, and all
    peers and other pyev callbacks attach to this loop."""

    def _debug(self, message):
        """
        Emit a debug message through Node._debug and, when a control socket
        exists and debugging is on, broadcast it there as well.

        :param message: The debug text.
        """
        Node._debug(self, message)
        # _ctl is only assigned near the end of __init__; tolerate calls that
        # arrive before that point.
        ctl = getattr(self, '_ctl', None)
        if ctl is not None and self.debug:
            ctl.broadcast(message)

    def _hit_peer(self, peer):
        """
        Update the node's k-buckets with the peer, using a simplified version
        of the kademlia specification's routine for updating k-buckets.

        A peer already in a matching bucket moves to the front. A new peer is
        inserted at the front while the bucket holds fewer than k entries.
        Otherwise the peer goes to the front of the replacement bucket and
        the entry at the tail of the full bucket is sent a PING.

        :param peer: The peer that was just heard from. Ignored when it is
            inactive or has no ID yet.
        """
        if not peer.active:
            self._debug("Peer not active. What.")
            return
        if peer.id is None:
            self._debug("Peer ID None during _hit_peer.")
            return
        dist = peer.id ^ self.id
        for i, bucket in enumerate(self.buckets):
            if not bucket_leaf(dist, i):
                continue
            if peer in bucket:
                bucket.remove(peer)
                bucket.insert(0, peer)
            elif len(bucket) < k:
                bucket.insert(0, peer)
            else:
                overflow = self.replacement_buckets[i]
                if peer in overflow:
                    overflow.remove(peer)
                overflow.insert(0, peer)
                bucket[-1].enqueue_message(Message(PING))

    def _node_id_present(self, node_id):
        """Determine if any of the connected peers have the specified 160-bit
        node ID.

        Peer IDs found in the buckets are normalised to integers in place.

        :param node_id: 160-bit node ID to look for.
        :returns: True if a bucketed peer has that ID, False otherwise.
        """
        for bucket in self.buckets:
            for peer in bucket:
                # int() promotes to arbitrary precision as needed.
                peer.id = int(peer.id)
                if peer.id == node_id:
                    return True
        return False

    def _conn_present(self, conn):
        """
        Determine if the given (address, port) tuple is presently connected
        via any of the peers.

        Compares against both the address and the server_address of every
        bucketed peer; non-tuple addresses are converted to tuples in place.

        :param conn: (address, port) pair; any sequence is accepted.
        :returns: True if a bucketed peer uses that address, False otherwise.
        """
        conn = tuple(conn)
        for bucket in self.buckets:
            for peer in bucket:
                if not isinstance(peer.address, tuple):
                    peer.address = tuple(peer.address)
                if conn == peer.address:
                    return True
                if not isinstance(peer.server_address, tuple):
                    peer.server_address = tuple(peer.server_address)
                if conn == peer.server_address:
                    return True
        return False

    def _make_buckets(self):
        """
        At node start, create 160 k-buckets and 160 replacement buckets.
        Currently, replacement buckets are not used but the peer is placed into
        them if a bucket's peer count is over k.
        """
        self.buckets = [[] for _ in range(160)]
        self.replacement_buckets = [[] for _ in range(160)]

    def _closest_peers(self, id, how_many, filter_id=None):
        """
        Given the specified 160-bit ID, use the k-buckets to return, at most,
        how_many closest peers to the ID using a XOR metric (id XOR peer.id)

        Buckets are visited in the order given by bucket_sort; inactive peers
        and peers whose ID equals filter_id are skipped.

        :param id: 160-bit ID
        :param how_many: The maximum number of peers to return, usually k.
        :param filter_id: Optional peer ID to leave out of the result.
        :returns: A list of peers, empty when none qualify or when the lookup
            fails (the traceback is printed to stderr in that case).
        """
        self._debug("Finding %d closest peers." % how_many)
        try:
            dist = self.id ^ id
            ordered = [bucket for _, bucket in
                       sorted(enumerate(self.buckets),
                              key=lambda x: bucket_sort(dist, x[0]))]
            limit = max(how_many, 0)
            closest = []
            for bucket in ordered:
                if len(closest) >= limit:
                    break
                closest.extend(p for p in bucket
                               if p.active and p.id != filter_id)
            # Honour the caller's limit rather than the module-wide k.
            return closest[:limit]
        except Exception:
            traceback.print_exc(file=sys.stderr)
            # Callers iterate and len() the result, so never hand back None.
            return []

    def rpc_get(self, key, callback):
        """Since value retrieval is async, provide a callback that will handle
        the value.

        When the value is already in the local store the callback fires
        immediately. Otherwise a GET is sent to the closest peers and the
        callback is queued until a reply delivers the value.

        :param key: String key
        :param callback: Callable to execute when we get the value; called as
            callback(key, value).
        :returns: True when served from the local store, otherwise None.
        """
        new_id = sha1_hash(key)
        if new_id in self.store:
            callback(key, self.store[new_id])
            self._debug('Fired a callback for key %s' % key)
            return True
        closest = self._closest_peers(new_id, a)
        self._debug('Fetched %d closest peers' % len(closest))
        for peer in closest:
            message = Message(RPC_GET, data={'key': new_id})
            self.register_message_callback(message, handle_rpc_get_reply)
            peer.enqueue_message(message)
        # NOTE(review): the listing lost its indentation; the callback is
        # queued once per call here rather than once per peer - confirm.
        self.retrieval_callbacks.append((key, callback))

    def rpc_store(self, key, value):
        """
        Store hashed key: val with k closest connected peers.

        :param key: string key
        :param value: Any python value that is json serializable.
        """
        new_id = sha1_hash(key)
        for peer in self._closest_peers(new_id, k):
            message = Message(RPC_STORE, data={'store': {str(new_id): value}})
            peer.enqueue_message(message)
        # NOTE(review): assumed to sit outside the loop, so the value is kept
        # locally even when no peers are known - confirm.
        self.store[new_id] = value

    def rpc_chat(self, node_id, chat_message):
        """
        Initiate an RPC_CHAT message.

        The message goes straight to a connected peer with that ID when there
        is one, otherwise to the closest known peers.

        :param node_id: The 160-bit ID of the peer you'd like to send a
            message to. It need not be a connected peer, just a known ID.
        :param chat_message: The message to send.
        """
        def build():
            return Message(RPC_CHAT, data={'target_node_id': node_id,
                                           'source_node_id': self.id,
                                           'chat_message': chat_message})

        matched_peers = [p for p in self.peers if p.id == node_id]
        if matched_peers:
            matched_peers[0].enqueue_message(build())
            return
        # One message object per recipient, as chat_handler does when relaying.
        for cpeer in self._closest_peers(node_id, a):
            cpeer.enqueue_message(build())

    def register_chat_callback(self, callback):
        """
        Store a callable in our chat callbacks list. Each will be called in
        order of registration, when we receive an RPC_CHAT with a
        target_node_id that matches our server's 160-bit ID.

        The signature of the callable should be as follows:
        cb(source_node, chat_message, server, peer)
        """
        self.chat_callbacks.append(callback)

    def _purge_from(self, buckets, peer, label):
        """Drop every entry matching *peer* (by equality or ID) from each
        bucket in *buckets*, logging one debug line per removal."""
        for i, bucket in enumerate(buckets):
            survivors = []
            for p in bucket:
                if p == peer or p.id == peer.id:
                    self._debug("Removing peer from %s %d" % (label, i))
                else:
                    survivors.append(p)
            if len(survivors) != len(bucket):
                # Slice-assign so the bucket list object itself is preserved.
                bucket[:] = survivors

    def peer_cleanup(self, peer):
        """
        When error handling or explicit peer removal schedules a peer object
        for removal, this function is called to remove it from all applicable
        buckets by the core node.py removal routine.

        :param peer: The peer being removed.
        """
        self._debug("Entering Congress peer cleanup.")
        # Filtering into a new list avoids removing from a list while it is
        # being iterated, which skips the element after each removal.
        self._purge_from(self.buckets, peer, 'bucket')
        self._purge_from(self.replacement_buckets, peer, 'rbucket')

    def bootstrap_peer(self, conn_address, id=None):
        """
        Add a peer. This function does not add it to k-buckets,
        as this is handled by the final handshake callback.

        :param conn_address: (hostname, port) tuple of the peer
        :param id: The ID is supplied if FIND_NODE replies provide the peer.
        :returns: The new peer object, or False when connecting raised
            socket.error.
        """
        try:
            speer = self.add_peer(conn_address, connect=True)
        except socket.error:
            return False
        if id is not None:
            speer.id = id
        return speer

    def _setup_ctl_socket(self, port=29800):
        """
        Setup a socket to listen for simple control connections.

        :param port: The port that local connections can use.
        """
        self._ctl = Controller(self, port=port)

    def __init__(self, host='0.0.0.0', port=16800, initial_peers=None,
                 debug=False, ctl_port=None, pyev_loop=None, debug_file=None):
        """
        :param host: Local address to bind to.
        :param port: Local port to bind to.
        :param initial_peers: List of (hostname, port) tuples to connect to.
        :param debug: If True, many debug messages will be printed.
        :param ctl_port: Local port for the control socket.
        :param pyev_loop: If you have your own pyev loop to attach the node to,
            supply it here.
        :param debug_file: Passed through to Node.__init__.
        """
        Node.__init__(self, (host, port), client_class=CongressPeer,
                      debug=debug, pyev_loop=pyev_loop, debug_file=debug_file)
        self._gen_id()
        self._make_buckets()
        self.store = {}

        handlers = (
            (PING, ping_handler),
            (PONG, pong_handler),
            (RPC_STORE, handle_rpc_store),
            (RPC_GET, handle_rpc_get),
            (RPC_GET_REPLY, handle_rpc_get_reply),
            (RPC_FIND_NODE, handle_rpc_find_node),
            (RPC_FIND_NODE_REPLY, handle_rpc_find_node_reply),
            (RPC_CHAT, chat_handler),
        )
        for message_type, handler in handlers:
            self.register_message_handler(message_type, handler)
        self.retrieval_callbacks = []
        self.chat_callbacks = []

        # Once we bootstrap a peer, ask them for all peers closest to
        # our own id.
        def node_getter(i_peer, i_server):
            """
            FIND_NODE callback for the initial handshake.
            :param i_peer: The peer the handshake is with.
            :param i_server: Our node object.
            """
            i_server._hit_peer(i_peer)
            new_m = Message(RPC_FIND_NODE, data={'node_id': i_server.id})
            i_peer.enqueue_message(new_m)

        self.register_handshake(node_getter)

        if ctl_port is not None:
            self._setup_ctl_socket(ctl_port)
        else:
            self._ctl = None

        # None stands in for "no initial peers" so no mutable default is
        # shared between instances.
        for conn in (initial_peers or []):
            self.bootstrap_peer(conn)
519 520
def main():
    """
    Command-line entry point: parse options, build a Congress node from them
    and start it.
    """
    parser = argparse.ArgumentParser(description='Run a Congress instance.')
    parser.add_argument('-H', '--host', help='Local hostname',
                        default='0.0.0.0')
    parser.add_argument('-P', '--port', help='Local port',
                        default=16800, type=int)
    parser.add_argument('-p', '--peer', help='Add a peer in the form of '
                        'HOST PORT', nargs=2, action='append')
    parser.add_argument('-d', '--debug', help='Turn on debug output.',
                        action='store_true', default=False)
    # The adjacent string literals below previously lacked a separating
    # space, rendering as "onport" and "savedin" in --help.
    parser.add_argument('-c', '--ctl', help='Enable control socket on '
                        'port 29800', default=None, type=int)
    parser.add_argument('-F', '--debugfile', help='Debug output is saved '
                        'in the argument to this option', default=None)
    args = parser.parse_args()

    if args.peer is not None:
        # nargs=2 guarantees exactly two strings per --peer occurrence.
        peer_conns = [(host, int(port)) for host, port in args.peer]
        if args.debug:
            print('Peer connections to make: %s' % repr(peer_conns))
    else:
        peer_conns = []

    server = Congress(host=args.host, port=args.port, initial_peers=peer_conns,
                      debug=args.debug, ctl_port=args.ctl,
                      debug_file=args.debugfile)
    server.start()


if __name__ == "__main__":
    main()