# Written by Jie Yang, Arno Bakker
# see LICENSE.txt for license information
"""
- The middle layer between OverlaySwarm and BuddyCast/DownloadHelp
- A high level module, like buddycast or dlhelp, calls SecureOverlay.addTask,
and then SecureOverlay will handle the task.
- There is only one task for secure overlay: Send message (the message can be None)
- But each time before sending a message, secure overlay must connect the target's
overlay.
- If the message is None, secure overlay only creates an overlay connection.
- Each time after an normal connection is created and if the other peer supports
overlay swarm, it will always create a task without message
- After overlay connection is created, secure overlay will update the (permid, (ip, port))
in local cache.
- The target can be either permid or (ip, port)
- If the target is permid, the task much check if target's permid matches
the task's permid
- If the target is (ip, port), connect directly and record the peer's permid later on.
"""
from time import time,ctime
from socket import inet_aton,gethostbyname
from traceback import print_exc,print_stack
from threading import RLock,currentThread
import sys
from BitTornado.BT1.MessageID import CANCEL,getMessageName
from Swapper.CacheDB.CacheDBHandler import PeerDBHandler,MyDBHandler
from Swapper.utilities import *
from Swapper.__init__ import GLOBAL
from Swapper.Statistics.Logger import OverlayLogger
try:
True
except:
True = 1
False = 0
DEBUG = False
Length_of_permid = 0 # 0: no restriction
def isValidDNS(dns):
if isinstance(dns, tuple) and len(dns)==2 and \
validIP(dns[0]) and isValidPort(dns[1]):
return True
return False
class DNSOverlayTask:
"""
Basic task to connect peer's overlay swarm by dns and send message by Secure Overlay.
It is an observer class in Observer Pattern.
"""
def __init__(self, secure_overlay, subject_manager, dns, message=None, timeout=15):
self.subject_manager = subject_manager
self.dns = dns
self.message_to_send = message
self.callback = None # used by update
self.secure_overlay = secure_overlay
self.subject = None
self.expire = int(time() + timeout)
self.registered = False
def isExpired(self, now=0):
if now == 0:
now = time()
return now > self.expire
def register(self, dns): # register a subject
if self.registered or not dns:
return
self.subject = self.subject_manager.getSubject(dns) # register a subject or get an old subject
self.subject.attachObserver(self)
self.registered = True
def unregister(self, reason='done'): # TODO: count and record the fail reason
# if DEBUG:
# print >> sys.stderr, "secover: task on %s %s" % (self.dns, reason)
if self.registered:
if self.subject:
self.subject.detachObserver(self, reason)
self.registered = False
def setCallback(self, callback): # it must be set before start
self.callback = callback
def start(self): # phase 1: find or make overlay connection
if self.isExpired():
self.unregister('expired')
elif self.findConnection(): # if connection exists, send message now
self.sendMessage()
else:
self.register(self.dns) # otherwise make overlay connection
self.makeConnection()
return 1 # make a new connecting attempt
return 0
def update(self): # phase 2: overlay connection has been made; send msg now
# if DEBUG:
# print >> sys.stderr,"secover: overlay task update", self.dns, id(self)
if not self.registered:
return
# to improve performance, don't remove expired tasks at this point
if self.callback: # used by other task
reason = self.callback()
if reason != 'done':
self.unregister(reason)
else:
self.sendMessage()
def sendMessage(self):
if self.message_to_send != None:
if not self.permid:
self.permid = self.secure_overlay.findPermidByDNS(self.dns)
self.secure_overlay.sendMessage(self.permid, self.message_to_send)
if DEBUG:
print >> sys.stderr,"secover: dns task send message", getMessageName(self.message_to_send[0]), self.dns
self.message_to_send = None
self.unregister('done')
def makeConnection(self):
self.secure_overlay.connectPeer(self.dns)
def findConnection(self):
# if connection is created, secure overlay must have the permid
self.permid = self.secure_overlay.findPermidByDNS(self.dns)
return self.permid
class PermidOverlayTask:
"""
A task to connect peer's overlay swarm by permid and send message.
It delegates DNSOverlayTask to do the real stuffs.
"""
def __init__(self, secure_overlay, subject_manager, permid, message=None, timeout=15):
self.secure_overlay = secure_overlay
self.permid = permid
self.dns = self.secure_overlay.findDNSByPermid(self.permid) # first lookup overlay
self.peer_db = secure_overlay.peer_db
if not self.dns: # then lookup local cache
#if DEBUG:
# print >> sys.stderr,"secover: PermidOverlayTask: don't know dns for permid",show_permid(permid)
self.dns = self.findDNSFromCache()
else:
#if DEBUG:
# print >> sys.stderr,"secover: PermidOverlayTask: dns for permid is known",self.dns
pass
if isValidDNS(self.dns):
if GLOBAL.overlay_log:
write_overlay_log('CONN_TRY', permid, dns=self.dns)
self.task = DNSOverlayTask(secure_overlay, subject_manager, self.dns, message, timeout)
else:
self.task = None
def findDNSFromCache(self):
#if DEBUG:
# return ('1.2.3.4', 80)
peer = self.peer_db.getPeer(self.permid)
if peer:
return (peer['ip'], int(peer['port']))
def start(self): # phase 1: start basic overlay task
if self.task:
self.task.setCallback(self.update)
ret = self.task.start()
if ret == 1:
self.secure_overlay.addTryTimes(self.permid)
def update(self): # phase 2: check permids, and send msg if they match
# if DEBUG:
# print >> sys.stderr,"secover: permid task update", self.dns
if self.dns:
permid = self.secure_overlay.findPermidByDNS(self.dns)
#if DEBUG:
# print >> sys.stderr,"secover: Think connecting to",show_permid(self.permid)," and connected to",show_permid(permid)
if self.permid == permid and self.task:
self.task.sendMessage()
return 'done'
elif DEBUG and self.permid != permid:
print >> sys.stderr,"secover: Connection established but permid does not match!"
return 'wrong_permid'
class Subject:
""" A subject class in Observer Pattern """
def __init__(self, dns, subject_manager):
self.dns = dns # dns = (ip, port)
self.observers = [] # tasks
self.subject_manager = subject_manager
def isEmpty(self):
return len(self.observers) == 0
def notify(self):
for observer in self.observers:
#if DEBUG:
# print >> sys.stderr,"secover: subject", self.dns, "notifies observer", observer
observer.update()
def attachObserver(self, observer):
if observer not in self.observers:
self.observers.append(observer)
#if DEBUG:
# print >> sys.stderr,"secover: subject", self.dns, "attaches observer", self.observers
def detachObserver(self, observer, reason):
# if DEBUG:
# print >> sys.stderr,"secover: subject", self.dns, "detaches observer", observer
self.observers.remove(observer)
if self.isEmpty():
self.subject_manager.unregisterSubject(self.dns, reason)
class SubjectManager:
""" Command Pattern. Used for sending overlay message. """
def __init__(self):
self.subjects = {} # (ip,port): Subject
def registerSubject(self, dns):
#if DEBUG:
# print >> sys.stderr,"secover: register subject", dns
if not self.subjects.has_key(dns):
self.subjects[dns] = Subject(dns, self)
def unregisterSubject(self, dns, reason):
if self.subjects.has_key(dns) and self.subjects[dns].isEmpty():
if DEBUG:
print >> sys.stderr,"secover: unregister subject", dns, reason
sbj = self.subjects.pop(dns)
del sbj
def getSubject(self, dns):
self.registerSubject(dns) # ensure the subject exists
return self.subjects[dns]
def notifySubject(self, dns): # notify the connection is made
#if DEBUG:
# print >> sys.stderr,"secover: notify subject", dns
if dns and self.subjects.has_key(dns):
subject = self.subjects[dns]
subject.notify()
def scanTasks(self): # remove outdate subjects
now = time()
for dns in self.subjects.keys():
expired = True
sbj_obs = self.subjects[dns].observers
for obs in sbj_obs[:]:
if not obs.isExpired(now):
expired = False # don't remove a subject if one of its observer is not expired
else:
l = len(self.subjects[dns].observers)
obs.unregister('expired')
del obs
class IncomingMessageHandler:
""" a variant of Chain of Responsibility Pattern """
def __init__(self):
self.handlers = {}
def registerHandler(self, ids, handler):
for id in ids:
# if DEBUG:
# print >> sys.stderr,"secover: Handler registered for",getMessageName(id)
self.handlers[id] = handler
def handleMessage(self, permid, message): # connection is type of Conneter.Connection
id = message[0]
handled = False
if not self.handlers.has_key(id):
if DEBUG:
print >> sys.stderr,"secover: No handler found for",getMessageName(id),currentThread().getName()
return False
else:
#if DEBUG:
# print >> sys.stderr,"secover: Giving message to handler for",getMessageName(id)
self.handlers[id].handleMessage(permid, message)
return True
class SecureOverlay:
__single = None
def __init__(self):
if SecureOverlay.__single:
raise RuntimeError, "SecureOverlay is Singleton"
SecureOverlay.__single = self
self.subject_manager = SubjectManager() #???? for outgoing message
self.incoming_handler = IncomingMessageHandler() # for incoming message
self.peer_db = PeerDBHandler()
self.connection_list = {} # overlay connections. permid:{'c_conn': Connecter.Connection, 'expire':expire, 'dns':(ip, port)}
self.timeout = 300 # TODO: adjust it by firewall status. the value is smaller if no firewall
self.check_interval = 60
self.my_db = MyDBHandler()
self.permid = self.my_db.getMyPermid()
self.ip = self.my_db.getMyIP()
self.lock = RLock()
def getInstance(*args, **kw):
if SecureOverlay.__single is None:
SecureOverlay(*args, **kw)
return SecureOverlay.__single
getInstance = staticmethod(getInstance)
def register(self,overlayswarm):
self.overlayswarm = overlayswarm
self.overlayswarm.rawserver.add_task(self._auto_close, self.check_interval)
self.overlayswarm.rawserver.add_task(self._scan_tasks, self.check_interval)
def registerHandler(self, ids, handler):
"""
ids is the [ID1, ID2, ..] where IDn is a sort of message ID in overlay swarm.
Each ID can only be handled by one handler, but a handler can handle multiple IDs
"""
# I assume that all handler registration is done before handling, so no
# concurrency on incoming_handler
self.incoming_handler.registerHandler(ids, handler)
def _auto_close(self):
self.acquire()
self.overlayswarm.rawserver.add_task(self._auto_close, self.check_interval)
self._checkConnections()
self.release()
def _scan_tasks(self):
self.acquire()
self.overlayswarm.rawserver.add_task(self._scan_tasks, self.check_interval)
self.subject_manager.scanTasks()
self.release()
def _checkConnections(self):
for permid in self.connection_list.keys():
self._checkConnection(permid)
def _checkConnection(self, permid):
conn = self.connection_list[permid]['c_conn']
expire = self.connection_list[permid]['expire']
expired = time() > expire
if not conn or conn.closed or expired:
if expired:
if DEBUG:
print >> sys.stderr,"secover: closing expired conn",show_permid2(permid), int(time())
self._closeConnection(conn, 'TIME_OUT')
else:
if DEBUG:
print >> sys.stderr,"secover: removing closed conn",show_permid2(permid)
self._closePermidConnection(permid, 'CON_CLOS')
ret = None
else:
ret = conn
return ret
# the central place to close connection
def _closeConnection(self, connection, reason):
self.acquire()
if connection is not None and not connection.closed:
permid = connection.permid
connection.close()
## connectionLost callback is called by connection.close() which
## will remove the conn from the list, but just to be safe:
if permid and self.connection_list.has_key(permid):
if GLOBAL.overlay_log:
write_overlay_log('CONN_DEL', permid, reason=reason)
self.connection_list.pop(permid)
self.release()
def _closePermidConnection(self, permid, reason):
self.acquire()
if self.connection_list.has_key(permid):
connection = self.connection_list[permid]['c_conn']
if connection is not None and not connection.closed:
connection.close()
if GLOBAL.overlay_log:
write_overlay_log('CONN_DEL', permid, reason=reason)
self.connection_list.pop(permid)
self.release()
def _findConnByPermid(self, permid):
if self.connection_list.has_key(permid):
return self._checkConnection(permid)
else:
return None
def findPermidByDNS(self, dns): #find permid from connection_list
self.acquire()
ret = None
for permid, value in self.connection_list.items():
if value['dns'] == dns and self._checkConnection(permid):
ret = permid
break
self.release()
return ret
def findDNSByPermid(self, permid):
self.acquire()
ret = None
if self._findConnByPermid(permid):
ret = self.connection_list[permid]['dns']
self.release()
return ret
# Main function to send messages
def addTask(self, target, message=None, timeout=15): # target = [permid|(ip,port)]
""" Command Pattern """
if GLOBAL.do_overlay == 0:
return
self.acquire()
#TODO: priority task queue
try:
try:
if message is None:
msg_id = 'None'
else:
msg_id = getMessageName(message[0])
if msg_id.startswith('Unknown'):
return
if isValidPermid(target) and target != self.permid:
if DEBUG:
msg = msg_id + ' '+currentThread().getName()
if DEBUG:
print >> sys.stderr,"secover: add PermidOverlayTask", show_permid_short(target), msg
task = PermidOverlayTask(self, self.subject_manager, target, message, timeout)
elif isValidDNS(target): # and target[0] != self.ip: # for testing
if DEBUG:
if message is None:
msg = 'None'
else:
msg = getMessageName(message[0])
msg = msg_id + ' '+currentThread().getName()
if DEBUG:
print >> sys.stderr,"secover: add DNSOverlayTask", target, msg
task = DNSOverlayTask(self, self.subject_manager, target, message, timeout)
else:
return
if task and self.overlayswarm.registered:
## Arno: I don't see the need for letting the rawserver do it.
## Except that it potentially avoids a concurrency problem of
## multiple threads writing to the same socket.
if DEBUG:
if message:
msg_id = getMessageName(message[0])
else:
msg_id = ''
print >> sys.stderr,"secover: add task to rawserver", msg_id, currentThread().getName()
self.overlayswarm.rawserver.add_task(task.start, 0)
##task.start()
except Exception,e:
print_exc()
finally:
self.release()
def connectionMade(self, connection): # OverlayConnecter.Connection
self.acquire()
if DEBUG:
print >> sys.stderr,"secover: *** secure overlay to %s connection made." % show_permid2(connection.permid), connection.get_ip(), int(time())
#TODO: schedule it on rawserver task queue?
dns = self._addConnection(connection)
if dns:
if GLOBAL.overlay_log:
write_overlay_log('CONN_ADD', connection.permid)
self.subject_manager.notifySubject(dns)
self.release()
def addTryTimes(self, permid):
self.peer_db.updateTimes(permid, 'tried_times', 1)
def _addConnection(self, connection):
dns = connection.dns
permid = connection.permid
self.peer_db.updateTimes(permid, 'connected_times', 1)
auth_listen_port = connection.get_auth_listen_port()
if DEBUG:
print >> sys.stderr,"secover: add connection in secure overlay", dns, "auth listen port", auth_listen_port
#
# Arno: if DNS is none, this is an incoming connection from another
# peer. We cannot enter this connection into the table because we don't
# know the listen port of the peer (and if we would initiate a connection
# that is the port we look for). However, I encoded the listen port of a peer
# into its peerID. So now we know the initiating peers listen port and
# the problem is solved.
#
if dns is None:
dns = ( connection.get_ip(), auth_listen_port )
else:
if dns[1] != auth_listen_port:
if DEBUG:
print >> sys.stderr,"secover: WARNING given listen port not equal to authenticated one"
if isValidPermid(permid) and isValidDNS(dns):
if self.connection_list.has_key(permid):
# Conccurency: When a peer starts an overlay connection at
# the same time, and we start it before the C/R protocol
# has finished, we'll end up with two connections. In that
# case we drop the last one established.
if DEBUG:
print >> sys.stderr,"secover: dropping superfluous double connection to",show_permid2(permid)
connection.close()
# Don't stop
return dns
self._updateDNS(permid, dns)
expire = int(time() + self.timeout)
self.connection_list[permid] = {'c_conn':connection, 'dns':dns, 'expire':expire}
if DEBUG:
print >> sys.stderr,"secover: permid received is", show_permid2(permid)
#x = self.peer_db.getPeer(permid)
#print >> sys.stderr,"secover: old peer is",x
#self.peer_db.updatePeerIPPort(permid, dns[0], dns[1])
#y = self.peer_db.getPeer(permid)
#print >> sys.stderr,"secover: new peer is",y
return dns
return None
def _updateDNS(self, permid, dns):
self.peer_db.updatePeerIPPort(permid, dns[0], dns[1])
def _extendExpire(self, permid):
self.connection_list[permid]['expire'] = int(time() + self.timeout)
def connectionLost(self, connection): # OverlayConnecter.Connection
if DEBUG:
print >> sys.stderr,"secover: ***** secure overlay connection lost", show_permid2(connection.permid), connection.get_ip(), int(time())
self.acquire()
self._closeConnection(connection, 'CON_LOST')
self.release()
def connectPeer(self, dns): # called by task
self.acquire()
self.overlayswarm.connectPeer(dns)
self.release()
def sendMessage(self, permid, message):
if not permid:
return
self.acquire()
connection = self._findConnByPermid(permid)
if connection:
if GLOBAL.overlay_log:
write_overlay_log('SEND_MSG', permid, message)
self._extendExpire(permid)
self.overlayswarm.sendMessage(connection, message)
self.release()
def gotMessage(self, permid, message): # connection is type of Connecter.Connection
self.acquire()
try:
if GLOBAL.overlay_log:
write_overlay_log('RECV_MSG', permid, message)
t = message[0]
if t == CANCEL: # the only message handled by secure overlay
self._closePermidConnection(permid, 'CANCELED')
elif self.incoming_handler.handleMessage(permid, message) == False:
self._closePermidConnection(permid, 'FAKE_MSG')
else:
self._extendExpire(permid)
except:
print_exc()
self.release()
def acquire(self):
# if DEBUG:
# print >> sys.stderr,"secover: LOCK",currentThread().getName()
self.lock.acquire()
def release(self):
# if DEBUG:
# print >> sys.stderr,"secover: UNLOCK",currentThread().getName()
self.lock.release()
def write_overlay_log(action, permid, msg=None, dns=None, reason=None):
"""
SecureOverlay log format:
TIME - CONN_TRY - IP - PORT - PERMID
TIME - CONN_ADD - IP - PORT - PERMID
TIME - CONN_DEL - IP - PORT - REASON(TIME_OUT, CON_CLOS, CON_LOST, CANCELED, FAKE_MSG) - PERMID
TIME - SEND_MSG - IP - PORT - MSG_ID - PERMID - MSG
TIME - RECV_MSG - IP - PORT - MSG_ID - PERMID - MSG
"""
if dns is not None and permid is not None:
ip, port = dns
elif isValidPermid(permid): # permid, msg
secure_overlay = SecureOverlay.getInstance()
dns = secure_overlay.connection_list[permid]['dns']
ip = dns[0]
port = dns[1]
else: # connection
permid = 'Permid_None'
ip = 'None_ip'
port = 0
if permid != 'Permid_None':
permid = show_permid(permid)
port = str(port)
sp_log = OverlayLogger.getInstance(GLOBAL.overlay_log)
if msg:
msg_name = getMessageName(msg[0])
sp_log.log(action, ip, port, msg_name, permid, `msg`) # SEND_MSG, RECV_MSG
else:
if reason is not None:
sp_log.log(action, ip, port, reason, permid) # CONN_DEL
else:
sp_log.log(action, ip, port, permid) # CONN_TRY, CONN_ADD
def test():
so = SecureOverlay.getInstance()
so.overlayswarm.secure_overlay = so
dns = ('4.3.2.1', 1111)
permid = 'permid1'
so.addTask(permid)
so.addTask(dns, message="hello overlay")
|