Package congress ::
Module node
|
|
1 import pyev
2 import signal
3 import socket
4 import dogfood
5 import hashlib
6 import random
7 import traceback
8 from datetime import timedelta, datetime
9 import sys
10
11 ID_REQUEST = -1
12 ID_NOTIFY = -2
13 ACKNOWLEDGE = -3
14
16
18 print '*** Deleting %s with id of %d ***' % (str(obj), obj.id)
19 list.__del__(self, obj)
20
21
23 if tup1 is None or tup2 is None:
24 return False
25 tup1 = (tup1[0].encode('ascii'), int(tup1[1]))
26 tup2 = (tup2[0].encode('ascii'), int(tup1[1]))
27 tup1_info = socket.getaddrinfo(*tup1)
28 tup2_info = socket.getaddrinfo(*tup2)
29 tup1_addrs = [z[4] for z in tup1_info]
30 tup2_addrs = [z[4] for z in tup2_info]
31 for address in tup1_addrs:
32 if address in tup2_addrs:
33 return True
34 return False
35
37 return dogfood.encode(message) + '<k!>'
38
40 x = dogfood.decode(message[:-4])
41 return x
42
44 if message.data['id'] is None:
45 self._debug("WARNING! ID IS NONE")
46 server._remove_peer(peer)
47 return
48
49
50 new_id = long(message.data['id'])
51
52 for xpeer in list(server.peers):
53
54 if xpeer.id == new_id and xpeer != peer or \
55 (addr_eq(xpeer.address, server._address)):
56 server._debug("Peer is already present. Removing.")
57 server._debug("xpeer: %s, id %d\tpeer: %s, id %d" % \
58 (xpeer, xpeer.id, peer, new_id))
59 server._remove_peer(peer)
60 return
61
62 server.peers[server.peers.index(peer)].id = new_id
63 peer.server_address = list(peer.address)
64 peer.server_address[1] = message.data['server_port']
65 peer.server_address = tuple(peer.server_address)
66 if message.re is None:
67 reply = Message(ID_NOTIFY, data={'id': server.id,
68 'server_port': server._address[1]}, re=message.id)
69 peer.enqueue_message(reply)
70 else:
71 reply = Message(ACKNOWLEDGE, re=message.id)
72 peer.enqueue_message(reply)
73
74 if not peer.active:
75 peer.active = True
76 for hs in server.handshakes:
77 try:
78 hs(peer, server)
79 except Exception, e:
80 traceback.print_exc(file=sys.stderr)
81 raise e
82
83 if peer.id in server.peer_handshakes:
84 try:
85 server.peer_handshakes[peer.id](peer, server)
86 del(server.peer_handshakes[peer.id])
87 except Exception, e:
88 traceback.print_exc(file=sys.stderr)
89 raise e
90
95
96
98
100 if datetime.now() - self.created > timedelta(seconds=self.timeout):
101
102 self.func(*self.args, **self.kwargs)
103 return True
104 return False
105
106 - def __init__(self, message_id, func, timeout, args=[], kwargs={}):
107 self.message_id = message_id
108 self.func = func
109 self.timeout = timeout
110 self.created = datetime.now()
111 self.args = args
112 self.kwargs = kwargs
113
115
117 return ['Message', [self.type], {'data': self.data,
118 'id': self.id, 're': self.re}]
119
120 source = None
121 dest = None
122
124 self.id = random.getrandbits(160)
125
126 - def __init__(self, message_type, data={}, id=None, re=None):
127 self.type = message_type
128 self.data = data
129 if id is None:
130 self._gen_id()
131 else:
132 self.id = id
133 self.re = re
134
135
137
139 message.dest = self.address
140 self.outgoing.append(message)
141
143 return "<CongressPeer %s %s %s>" % \
144 (str(self.id), str(self.server_address), str(self.address))
145
147 print "__del__ occurred"
148 self.stop()
149
151 self.active = False
152
153 self._socket.close()
154 self._sockwatcher.stop()
155
157 if events & pyev.EV_READ:
158
159 try:
160 buffer = self._socket.recv(1024)
161 if len(buffer) > 0:
162 self.curr_buff += buffer
163
164 while '<k!>' in self.curr_buff:
165 x = self.curr_buff.split('<k!>')
166 message = load_message(x[0] + '<k!>')
167 self._parent._debug('RECV %d < %s %s' % \
168 (message.type, str(self.server_address),
169 str(self.address)))
170 self._parent._debug(str(message.data))
171 self.curr_buff = '<k!>'.join(x[1:])
172 self._parent.handle_message(message, self)
173 else:
174 self._parent._debug("bufferlen is 0, removing peer")
175 self.stop()
176 self._parent._remove_peer(self)
177 except EOFError:
178 self._parent._debug("Received EOFError, removing peer.")
179 self.stop()
180 self._parent._remove_peer(self)
181 except socket.error:
182
183 self._parent._debug("Removing peer due to socket error.")
184 self._parent._remove_peer(self)
185 except Exception, e:
186 traceback.print_exc(file=sys.stderr)
187 self._parent._debug("Giving up, removing peer.")
188 self._parent._remove_peer(self)
189 raise
190 elif events & pyev.EV_WRITE:
191 while len(self.outgoing) > 0:
192 message = self.outgoing.pop(0)
193 self._parent._debug('SEND %d > %s %s' % \
194 (message.type, str(self.server_address), str(self.address)))
195 self._parent._debug(str(message.data))
196 sbytes = dump_message(message)
197
198 self._socket.sendall(sbytes)
199
200 - def __init__(self, socket, address, parent):
201 self.outgoing = []
202 self.curr_buff = ''
203 self.active = False
204 self._socket = socket
205 self.address = address
206 self._sockwatcher = pyev.Io(self._socket, pyev.EV_READ | pyev.EV_WRITE,
207 parent._loop, self._sock_ev)
208 self._parent = parent
209
210
211 self.server_address = None
212 self.id = None
213 self._sockwatcher.start()
214
216
218 if self.debug:
219 if self.debug_file is None:
220 print '%s> %s' % (repr(self._address), message)
221 else:
222 self.debug_file.write(message + '\n')
223
225 self._debug("Adding peer to the chopping block.")
226 peer.active = False
227 self.prune_peers.append(peer)
228
230 """Stub for subclasses."""
231 pass
232
234 if len(self.prune_peers) > 0:
235 for peer in list(self.prune_peers):
236 self.debug_peers()
237 self._debug("Axing peer %s" % repr(peer))
238 peer.stop()
239 if peer in self.peers:
240 self.peers.remove(peer)
241 self._debug("Removed peer from main peer list.")
242 if peer in self.prune_peers:
243 self.prune_peers.remove(peer)
244 self.peer_cleanup(peer)
245 self._debug("Done with peer cleanup.")
246 self.debug_peers()
247 for tcb in list(self.timeout_callbacks):
248 result = tcb.check()
249 if result:
250 self.timeout_callbacks.remove(tcb)
251 watcher.data += 1
252
254 self._debug('Peer list:')
255 for peer in self.peers:
256 self._debug('%s %s %s' % (str(peer.id), str(peer.address), str(peer.server_address)))
257
259 for peer in self.peers:
260 peer.stop()
261 if peer in self.peers:
262 self.peers.remove(peer)
263 if peer in self.prune_peers:
264 self.prune_peers.remove(peer)
265 self.peer_cleanup(peer)
266
267 if self._sockwatcher.data:
268 print("stopping watchers: {0}".format(self._sockwatcher.data))
269 for w in self._sockwatcher.data:
270 w.stop()
271
272 print("stopping the loop: {0}".format(self._sockwatcher.loop))
273 self._sockwatcher.loop.unloop()
274
275
276 - def _sig_cb(self, watcher, events):
277 print("We get signal. Quitting.")
278 self.shutdown()
279
281 try:
282 if events & pyev.EV_READ:
283 conn, addr = self._socket.accept()
284 peer_added = False
285 addr_info = socket.getaddrinfo(addr[0], addr[1])
286 connecting_addresses = [z[4] for z in addr_info]
287 for peer in self.peers:
288 if peer.address is None:
289 continue
290
291
292 our_info = socket.getaddrinfo(self._address[0],
293 int(self._address[1]))
294 our_addrs = [z[4] for z in our_info]
295 for our_addr in our_addrs:
296 if our_addr in connecting_addresses:
297 peer_added = True
298 break
299
300
301 peer_addresses = [z[4] for z in \
302 socket.getaddrinfo(peer.address[0],
303 int(peer.address[1]))]
304 for paddr in peer_addresses:
305 if paddr in connecting_addresses:
306 peer_added = True
307 break
308 if peer_added:
309 break
310 if not peer_added:
311 self.add_peer(addr, existing_socket=conn)
312 else:
313 self._debug('WARNING: Peer already found.')
314 conn.close()
315 except socket.error:
316 traceback.print_exc(file=sys.stderr)
317 self._debug("Received socket eerror. Quitting.")
318 self.shutdown()
319 except Exception, e:
320 traceback.print_exc(file=sys.stderr)
321
323 self.timed.append((interval, func))
324
326 tm = TimeoutCallback(long(message.id), func, timeout,
327 args=(message, self, peer))
328 self.timeout_callbacks.append(tm)
329
331 if long(message.id) not in self.re_callbacks:
332 self.re_callbacks[long(message.id)] = func
333
335 """Any time a new incoming connectin is made, the argument will be
336 executed with the arguments peer and the server itself."""
337 if func not in self.handshakes and peer is None:
338 self.handshakes.append(func)
339 else:
340 self.peer_handshakes[peer] = func
341
343 if not self.handlers.has_key(atype):
344 self.handlers[atype] = []
345 self.handlers[atype].append(func)
346
348
349
350 if peer in self.prune_peers:
351 self.prune_peers.remove(peer)
352 self.peer_cleanup(peer)
353 self._debug("Removed peer during handle_message.")
354 return
355 if message.type in self.handlers.keys():
356 for handler in self.handlers[message.type]:
357 try:
358 handler(message, self, peer)
359 except Exception, e:
360 traceback.print_exc(file=sys.stderr)
361 raise e
362
363 if message.re is not None:
364
365
366 tcbs = filter(lambda tcb: tcb.message_id == long(message.re),
367 self.timeout_callbacks)
368
369 for tcb in tcbs:
370 self.timeout_callbacks.remove(tcb)
371
372 if message.re is not None and \
373 long(message.re) in self.re_callbacks.keys():
374
375 cb = self.re_callbacks[long(message.re)]
376 try:
377 cb(message, self, peer)
378 except Exception, e:
379 traceback.print_exc(file=sys.stderr)
380 raise e
381 del(self.re_callbacks[long(message.re)])
382
384 message.source = self._address
385 for peer in self.peers:
386 if peer.address == message.dest:
387 peer.enqueue_message(message)
388
391
392 - def add_peer(self, conn, connect=False, existing_socket=None):
393 if existing_socket is None:
394 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
395 else:
396 s = existing_socket
397 xpeer = self.client_class(s, conn, self)
398 if connect:
399 xpeer.server_address = conn
400 s.connect(conn)
401 unsolicited_id = Message(ID_NOTIFY,
402 data={'id': self.id, 'server_port': self._address[1]})
403
404
405
406 def id_callback(i_message, i_server, i_peer):
407
408 pass
409
410 def id_timeout(i_message, i_server, i_peer):
411 if i_peer.id is None:
412 i_server._debug("Removing peer due to timeout.")
413 i_server._remove_peer(i_peer)
414
415 self.register_message_callback(unsolicited_id, id_callback)
416 self.register_message_timeout(unsolicited_id, id_timeout, xpeer)
417 xpeer.enqueue_message(unsolicited_id)
418 self.peers.append(xpeer)
419 return xpeer
420
422 self.id = random.getrandbits(160)
423
424 - def __init__(self, (host, port), client_class=Peer, debug=False,
425 id=None, pyev_loop=None, debug_file=None):
426
427 self.peers = PeerList()
428 self.handshakes = []
429 self.handlers = {}
430
431 if id is None:
432 self._gen_id()
433 self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
434 self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
435 self._socket.bind((host, port))
436 self._address = (host, port)
437 self._socket.listen(1)
438 if pyev_loop is None:
439 self._loop = pyev.default_loop(pyev.EVFLAG_NOSIGFD)
440 else:
441 self._loop = pyev_loop
442
443 self._loop.set_io_collect_interval(.01)
444 self._sockwatcher = pyev.Io(self._socket, pyev.EV_READ | pyev.EV_WRITE,
445 self._loop, self._sock_ev)
446 self._sockwatcher.start()
447
448 self._sigk = pyev.Signal(signal.SIGTERM, self._loop, self._sig_cb)
449 self._sigk.data = [self._sockwatcher, self._sigk]
450 self._sigk.start()
451 self._sig = pyev.Signal(signal.SIGINT, self._loop, self._sig_cb)
452 self._sig.data = [self._sockwatcher, self._sig]
453 self._sig.start()
454
455 self._timer = pyev.Timer(0, 1, self._loop, self._timer_cb, 0)
456 self._timer.start()
457
458
459
460
461 self.register_message_handler(ID_REQUEST, id_requested)
462 self.register_message_handler(ID_NOTIFY, id_handler)
463
464 self.client_class = client_class
465 self.debug = debug
466 self.re_callbacks = {}
467 self.timed = []
468 self.prune_peers = []
469 self.timeout_callbacks = []
470 self.peer_handshakes = {}
471
472 if debug_file is not None:
473 self.debug_file = open(debug_file, 'aw')
474 else:
475 self.debug_file = None
476