-
Notifications
You must be signed in to change notification settings - Fork 26
/
reactor.py
72 lines (64 loc) · 2.53 KB
/
reactor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import socket
import select
import sys
import multiprocessing
import logging
import bittorrent
from peers import PeerManager
class Reactor(multiprocessing.Process):
    """
    This is our event loop that makes our program asynchronous. The program
    keeps looping until the file is fully downloaded.

    The loop lives in ``run``: it waits on every peer in ``peerMngr.peers``
    with ``select.select``, flushes each peer's ``bufferWrite``, appends
    received bytes to the peer's ``bufferRead`` and hands the peer to
    ``bittorrent.process_message``. Once ``peerMngr.checkIfDoneDownloading()``
    reports completion, ``bittorrent.write`` is called with the tracker info
    and the shared memory object.
    """

    # Maximum number of bytes requested from a peer socket per recv() call.
    RECV_SIZE = 1028

    def __init__(self, threadID, name, peerMngr, shared_mem, debug=False, info=True):
        """
        Store the collaborators and configure the root logger.

        threadID   -- identifier stored on the instance as-is.
        name       -- process name (assigned to ``Process.name``).
        peerMngr   -- peer manager; this class uses its ``peers`` list, its
                      ``checkIfDoneDownloading()`` method and its ``tracker``
                      mapping.
        shared_mem -- opaque object forwarded to ``bittorrent.process_message``
                      and ``bittorrent.write``.
        debug      -- when true, configure logging at DEBUG level.
        info       -- when true (and ``debug`` is false), configure logging at
                      INFO level.
        """
        multiprocessing.Process.__init__(self)
        self.threadID = threadID
        self.name = name
        self.shared_mem = shared_mem
        if debug:
            logging.basicConfig(level=logging.DEBUG)
        elif info:
            logging.basicConfig(level=logging.INFO)
        self.peerMngr = peerMngr

    def connect(self):
        """Start a connection attempt to every peer known to the manager."""
        for peer in self.peerMngr.peers:
            try:
                peer.socket.connect((peer.ip, peer.port))
            except socket.error as err:
                # We are going to ignore the error, since we are turning
                # blocking off. Since we are returning before connect can
                # return a message, it will throw an error.
                # NOTE(review): assumes the peer sockets were made
                # non-blocking where they are created -- confirm.
                logging.debug(err)

    def removePeer(self, peer):
        """Stop tracking ``peer``; a peer that is not tracked is ignored."""
        if peer in self.peerMngr.peers:
            self.peerMngr.peers.remove(peer)

    def _dropPeer(self, peer):
        """Close the peer's socket (best effort) and stop tracking the peer."""
        try:
            peer.socket.close()
        except socket.error as err:
            logging.debug(err)
        self.removePeer(peer)

    def _flushWrite(self, peer):
        """Send as much of ``peer.bufferWrite`` as the socket accepts."""
        try:
            sent = peer.socket.send(peer.bufferWrite)
        except socket.error as err:
            logging.debug(err)
            self._dropPeer(peer)
            return
        # send() may accept only part of the buffer; keep the unsent tail so
        # it goes out on a later pass instead of being discarded.
        peer.bufferWrite = peer.bufferWrite[sent:]

    def _readAndProcess(self, peer):
        """Receive pending bytes from ``peer`` and process its read buffer."""
        try:
            data = peer.socket.recv(self.RECV_SIZE)
        except socket.error as err:
            logging.debug(err)
            self._dropPeer(peer)
            return
        if not data:
            # On a stream socket an empty read means the remote side closed
            # the connection; keeping the peer would make select() report it
            # readable on every pass.
            self._dropPeer(peer)
            return
        peer.bufferRead += data
        result = bittorrent.process_message(peer, self.peerMngr, self.shared_mem)
        if not result:
            # Something went wrong with peer. Disconnect.
            self._dropPeer(peer)

    def run(self):
        """
        Process entry point: drive all peers until the download is complete.

        Raises ``Exception("NO MORE PEERS")`` when the download is not done
        and no peers are left to talk to.
        """
        self.connect()
        while not self.peerMngr.checkIfDoneDownloading():
            # Check before waiting: select() with nothing to watch and no
            # timeout would never return.
            if not self.peerMngr.peers:
                raise Exception("NO MORE PEERS")

            # Only peers with pending outgoing data are watched for
            # writability; every peer is watched for readability.
            write = [x for x in self.peerMngr.peers if x.bufferWrite]
            read = self.peerMngr.peers[:]
            readList, writeList, _ = select.select(read, write, [])

            for peer in writeList:
                self._flushWrite(peer)

            for peer in readList:
                # The peer may have been dropped while flushing its writes.
                if peer in self.peerMngr.peers:
                    self._readAndProcess(peer)

        bittorrent.write(self.peerMngr.tracker['info'], self.shared_mem)