Package congress :: Module node
[hide private]

Source Code for Module congress.node

  1  import pyev 
  2  import signal 
  3  import socket 
  4  import dogfood 
  5  import hashlib 
  6  import random 
  7  import traceback 
  8  from datetime import timedelta, datetime 
  9  import sys 
 10   
 11  ID_REQUEST = -1 
 12  ID_NOTIFY = -2 
 13  ACKNOWLEDGE = -3 
 14   
15 -class PeerList(list):
16
17 - def __del__(self, obj):
18 print '*** Deleting %s with id of %d ***' % (str(obj), obj.id) 19 list.__del__(self, obj)
20 21
22 -def addr_eq(tup1, tup2):
23 if tup1 is None or tup2 is None: 24 return False 25 tup1 = (tup1[0].encode('ascii'), int(tup1[1])) 26 tup2 = (tup2[0].encode('ascii'), int(tup1[1])) 27 tup1_info = socket.getaddrinfo(*tup1) 28 tup2_info = socket.getaddrinfo(*tup2) 29 tup1_addrs = [z[4] for z in tup1_info] 30 tup2_addrs = [z[4] for z in tup2_info] 31 for address in tup1_addrs: 32 if address in tup2_addrs: 33 return True 34 return False
35
36 -def dump_message(message):
37 return dogfood.encode(message) + '<k!>'
38
39 -def load_message(message):
40 x = dogfood.decode(message[:-4]) 41 return x
42
43 -def id_handler(message, server, peer):
44 if message.data['id'] is None: 45 self._debug("WARNING! ID IS NONE") 46 server._remove_peer(peer) 47 return 48 # Check for existing connections and remove duplicates. 49 50 new_id = long(message.data['id']) 51 52 for xpeer in list(server.peers): 53 # check the ID first, because it's the cheapest. 54 if xpeer.id == new_id and xpeer != peer or \ 55 (addr_eq(xpeer.address, server._address)): 56 server._debug("Peer is already present. Removing.") 57 server._debug("xpeer: %s, id %d\tpeer: %s, id %d" % \ 58 (xpeer, xpeer.id, peer, new_id)) 59 server._remove_peer(peer) 60 return 61 62 server.peers[server.peers.index(peer)].id = new_id 63 peer.server_address = list(peer.address) 64 peer.server_address[1] = message.data['server_port'] 65 peer.server_address = tuple(peer.server_address) 66 if message.re is None: 67 reply = Message(ID_NOTIFY, data={'id': server.id, 68 'server_port': server._address[1]}, re=message.id) 69 peer.enqueue_message(reply) 70 else: 71 reply = Message(ACKNOWLEDGE, re=message.id) 72 peer.enqueue_message(reply) 73 74 if not peer.active: 75 peer.active = True 76 for hs in server.handshakes: 77 try: 78 hs(peer, server) 79 except Exception, e: 80 traceback.print_exc(file=sys.stderr) 81 raise e 82 83 if peer.id in server.peer_handshakes: 84 try: 85 server.peer_handshakes[peer.id](peer, server) 86 del(server.peer_handshakes[peer.id]) 87 except Exception, e: 88 traceback.print_exc(file=sys.stderr) 89 raise e
90
91 -def id_requested(message, server, peer):
92 m = Message(ID_NOTIFY, data={'id': server.id, 93 'server_port': server._address[1]}, re=message.id) 94 peer.enqueue_message(m)
95 96
97 -class TimeoutCallback:
98
99 - def check(self):
100 if datetime.now() - self.created > timedelta(seconds=self.timeout): 101 # Timed out. 102 self.func(*self.args, **self.kwargs) 103 return True 104 return False
105
106 - def __init__(self, message_id, func, timeout, args=[], kwargs={}):
107 self.message_id = message_id 108 self.func = func 109 self.timeout = timeout 110 self.created = datetime.now() 111 self.args = args 112 self.kwargs = kwargs
113
114 -class Message(dogfood.Food):
115
116 - def __encode__(self):
117 return ['Message', [self.type], {'data': self.data, 118 'id': self.id, 're': self.re}]
119 120 source = None 121 dest = None 122
123 - def _gen_id(self):
124 self.id = random.getrandbits(160)
125
126 - def __init__(self, message_type, data={}, id=None, re=None):
127 self.type = message_type 128 self.data = data 129 if id is None: 130 self._gen_id() 131 else: 132 self.id = id 133 self.re = re
134 135
136 -class Peer:
137
138 - def enqueue_message(self, message):
139 message.dest = self.address 140 self.outgoing.append(message)
141
142 - def __repr__(self):
143 return "<CongressPeer %s %s %s>" % \ 144 (str(self.id), str(self.server_address), str(self.address))
145
146 - def __del__(self):
147 print "__del__ occurred" 148 self.stop()
149
150 - def stop(self):
151 self.active = False 152 #self._socket.shutdown(socket.SHUT_RDWR) 153 self._socket.close() 154 self._sockwatcher.stop()
155
156 - def _sock_ev(self, watcher, events):
157 if events & pyev.EV_READ: 158 # do stuff 159 try: 160 buffer = self._socket.recv(1024) 161 if len(buffer) > 0: 162 self.curr_buff += buffer 163 #self._parent._debug('<<< ' + buffer) 164 while '<k!>' in self.curr_buff: 165 x = self.curr_buff.split('<k!>') 166 message = load_message(x[0] + '<k!>') 167 self._parent._debug('RECV %d < %s %s' % \ 168 (message.type, str(self.server_address), 169 str(self.address))) 170 self._parent._debug(str(message.data)) 171 self.curr_buff = '<k!>'.join(x[1:]) 172 self._parent.handle_message(message, self) 173 else: 174 self._parent._debug("bufferlen is 0, removing peer") 175 self.stop() 176 self._parent._remove_peer(self) 177 except EOFError: 178 self._parent._debug("Received EOFError, removing peer.") 179 self.stop() 180 self._parent._remove_peer(self) 181 except socket.error: 182 #traceback.print_exc(file=sys.stderr) 183 self._parent._debug("Removing peer due to socket error.") 184 self._parent._remove_peer(self) 185 except Exception, e: 186 traceback.print_exc(file=sys.stderr) 187 self._parent._debug("Giving up, removing peer.") 188 self._parent._remove_peer(self) 189 raise 190 elif events & pyev.EV_WRITE: 191 while len(self.outgoing) > 0: 192 message = self.outgoing.pop(0) 193 self._parent._debug('SEND %d > %s %s' % \ 194 (message.type, str(self.server_address), str(self.address))) 195 self._parent._debug(str(message.data)) 196 sbytes = dump_message(message) 197 #self._parent._debug('>>> ' + sbytes) 198 self._socket.sendall(sbytes)
199
200 - def __init__(self, socket, address, parent):
201 self.outgoing = [] 202 self.curr_buff = '' 203 self.active = False 204 self._socket = socket 205 self.address = address 206 self._sockwatcher = pyev.Io(self._socket, pyev.EV_READ | pyev.EV_WRITE, 207 parent._loop, self._sock_ev) 208 self._parent = parent 209 # We don't know this yet -- Once the client gives it to us, we 210 # can reference it with communications with clients. 211 self.server_address = None 212 self.id = None 213 self._sockwatcher.start()
214
215 -class Node:
216
217 - def _debug(self, message):
218 if self.debug: 219 if self.debug_file is None: 220 print '%s> %s' % (repr(self._address), message) 221 else: 222 self.debug_file.write(message + '\n')
223
224 - def _remove_peer(self, peer):
225 self._debug("Adding peer to the chopping block.") 226 peer.active = False 227 self.prune_peers.append(peer)
228
229 - def peer_cleanup(self, peer):
230 """Stub for subclasses.""" 231 pass
232
233 - def _timer_cb(self, watcher, events):
234 if len(self.prune_peers) > 0: 235 for peer in list(self.prune_peers): 236 self.debug_peers() 237 self._debug("Axing peer %s" % repr(peer)) 238 peer.stop() 239 if peer in self.peers: 240 self.peers.remove(peer) 241 self._debug("Removed peer from main peer list.") 242 if peer in self.prune_peers: 243 self.prune_peers.remove(peer) 244 self.peer_cleanup(peer) 245 self._debug("Done with peer cleanup.") 246 self.debug_peers() 247 for tcb in list(self.timeout_callbacks): 248 result = tcb.check() 249 if result: 250 self.timeout_callbacks.remove(tcb) 251 watcher.data += 1
252
253 - def debug_peers(self):
254 self._debug('Peer list:') 255 for peer in self.peers: 256 self._debug('%s %s %s' % (str(peer.id), str(peer.address), str(peer.server_address)))
257
258 - def shutdown(self):
259 for peer in self.peers: 260 peer.stop() 261 if peer in self.peers: 262 self.peers.remove(peer) 263 if peer in self.prune_peers: 264 self.prune_peers.remove(peer) 265 self.peer_cleanup(peer) 266 # optional - stop all watchers 267 if self._sockwatcher.data: 268 print("stopping watchers: {0}".format(self._sockwatcher.data)) 269 for w in self._sockwatcher.data: 270 w.stop() 271 # unloop all nested loop 272 print("stopping the loop: {0}".format(self._sockwatcher.loop)) 273 self._sockwatcher.loop.unloop()
274 275
276 - def _sig_cb(self, watcher, events):
277 print("We get signal. Quitting.") 278 self.shutdown()
279
280 - def _sock_ev(self, watcher, events):
281 try: 282 if events & pyev.EV_READ: 283 conn, addr = self._socket.accept() 284 peer_added = False 285 addr_info = socket.getaddrinfo(addr[0], addr[1]) 286 connecting_addresses = [z[4] for z in addr_info] 287 for peer in self.peers: 288 if peer.address is None: 289 continue 290 291 # Check our IP. 292 our_info = socket.getaddrinfo(self._address[0], 293 int(self._address[1])) 294 our_addrs = [z[4] for z in our_info] 295 for our_addr in our_addrs: 296 if our_addr in connecting_addresses: 297 peer_added = True 298 break 299 300 # Check existing peer IPs. 301 peer_addresses = [z[4] for z in \ 302 socket.getaddrinfo(peer.address[0], 303 int(peer.address[1]))] 304 for paddr in peer_addresses: 305 if paddr in connecting_addresses: 306 peer_added = True 307 break 308 if peer_added: 309 break 310 if not peer_added: 311 self.add_peer(addr, existing_socket=conn) 312 else: 313 self._debug('WARNING: Peer already found.') 314 conn.close() 315 except socket.error: 316 traceback.print_exc(file=sys.stderr) 317 self._debug("Received socket eerror. Quitting.") 318 self.shutdown() 319 except Exception, e: 320 traceback.print_exc(file=sys.stderr)
321
322 - def register_timed(self, interval, func):
323 self.timed.append((interval, func))
324
325 - def register_message_timeout(self, message, func, peer, timeout=3):
326 tm = TimeoutCallback(long(message.id), func, timeout, 327 args=(message, self, peer)) 328 self.timeout_callbacks.append(tm)
329
330 - def register_message_callback(self, message, func):
331 if long(message.id) not in self.re_callbacks: 332 self.re_callbacks[long(message.id)] = func
333
334 - def register_handshake(self, func, peer=None):
335 """Any time a new incoming connectin is made, the argument will be 336 executed with the arguments peer and the server itself.""" 337 if func not in self.handshakes and peer is None: 338 self.handshakes.append(func) 339 else: 340 self.peer_handshakes[peer] = func
341
342 - def register_message_handler(self, atype, func):
343 if not self.handlers.has_key(atype): 344 self.handlers[atype] = [] 345 self.handlers[atype].append(func)
346
347 - def handle_message(self, message, peer):
348 # Check if the peer is on the chopping block. 349 # If it is, prune it now and stop this madness. 350 if peer in self.prune_peers: 351 self.prune_peers.remove(peer) 352 self.peer_cleanup(peer) 353 self._debug("Removed peer during handle_message.") 354 return 355 if message.type in self.handlers.keys(): 356 for handler in self.handlers[message.type]: 357 try: 358 handler(message, self, peer) 359 except Exception, e: 360 traceback.print_exc(file=sys.stderr) 361 raise e 362 363 if message.re is not None: 364 # Check for timeout callbacks that need to be remove as this is 365 # a reply. 366 tcbs = filter(lambda tcb: tcb.message_id == long(message.re), 367 self.timeout_callbacks) 368 369 for tcb in tcbs: 370 self.timeout_callbacks.remove(tcb) 371 372 if message.re is not None and \ 373 long(message.re) in self.re_callbacks.keys(): 374 # Check for callbacks for this reply. 375 cb = self.re_callbacks[long(message.re)] 376 try: 377 cb(message, self, peer) 378 except Exception, e: 379 traceback.print_exc(file=sys.stderr) 380 raise e 381 del(self.re_callbacks[long(message.re)])
382
383 - def enqueue_message(self, message):
384 message.source = self._address 385 for peer in self.peers: 386 if peer.address == message.dest: 387 peer.enqueue_message(message)
388
389 - def start(self):
390 self._loop.loop()
391
392 - def add_peer(self, conn, connect=False, existing_socket=None):
393 if existing_socket is None: 394 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 395 else: 396 s = existing_socket 397 xpeer = self.client_class(s, conn, self) 398 if connect: 399 xpeer.server_address = conn 400 s.connect(conn) 401 unsolicited_id = Message(ID_NOTIFY, 402 data={'id': self.id, 'server_port': self._address[1]}) 403 404 # Handshake: Send our ID and server port, set active if they reply. 405 406 def id_callback(i_message, i_server, i_peer): 407 # Future use 408 pass
409 410 def id_timeout(i_message, i_server, i_peer): 411 if i_peer.id is None: 412 i_server._debug("Removing peer due to timeout.") 413 i_server._remove_peer(i_peer)
414 415 self.register_message_callback(unsolicited_id, id_callback) 416 self.register_message_timeout(unsolicited_id, id_timeout, xpeer) 417 xpeer.enqueue_message(unsolicited_id) 418 self.peers.append(xpeer) 419 return xpeer 420
421 - def _gen_id(self):
422 self.id = random.getrandbits(160)
423
424 - def __init__(self, (host, port), client_class=Peer, debug=False, 425 id=None, pyev_loop=None, debug_file=None):
426 427 self.peers = PeerList() 428 self.handshakes = [] 429 self.handlers = {} 430 431 if id is None: 432 self._gen_id() 433 self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 434 self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 435 self._socket.bind((host, port)) 436 self._address = (host, port) 437 self._socket.listen(1) 438 if pyev_loop is None: 439 self._loop = pyev.default_loop(pyev.EVFLAG_NOSIGFD) 440 else: 441 self._loop = pyev_loop 442 443 self._loop.set_io_collect_interval(.01) 444 self._sockwatcher = pyev.Io(self._socket, pyev.EV_READ | pyev.EV_WRITE, 445 self._loop, self._sock_ev) 446 self._sockwatcher.start() 447 448 self._sigk = pyev.Signal(signal.SIGTERM, self._loop, self._sig_cb) 449 self._sigk.data = [self._sockwatcher, self._sigk] 450 self._sigk.start() 451 self._sig = pyev.Signal(signal.SIGINT, self._loop, self._sig_cb) 452 self._sig.data = [self._sockwatcher, self._sig] 453 self._sig.start() 454 455 self._timer = pyev.Timer(0, 1, self._loop, self._timer_cb, 0) 456 self._timer.start() 457 458 # Special message handling for propery message routing. 459 460 461 self.register_message_handler(ID_REQUEST, id_requested) 462 self.register_message_handler(ID_NOTIFY, id_handler) 463 464 self.client_class = client_class 465 self.debug = debug 466 self.re_callbacks = {} 467 self.timed = [] 468 self.prune_peers = [] 469 self.timeout_callbacks = [] 470 self.peer_handshakes = {} 471 472 if debug_file is not None: 473 self.debug_file = open(debug_file, 'aw') 474 else: 475 self.debug_file = None
476