# -*- Mode: Python; tab-width: 4 -*-
# Copyright 1999, 2000 by eGroups, Inc.
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of
# eGroups not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# EGROUPS DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL EGROUPS BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# There are two RPC implementations here.
# The first ('rpc') attempts to be as transparent as possible, and
# passes along 'internal' methods like __getattr__, __getitem__, and
# __del__. It is rather 'chatty', and may not be suitable for a
# high-performance system.
# The second ('fastrpc') is less flexible, but has much less overhead,
# and is easier to use from an asynchronous client.
import marshal
import socket
import string
import sys
import types
import asyncore
import asynchat
from producers import scanning_producer
from counter import counter
MY_NAME = string.split (socket.gethostname(), '.')[0]
# ===========================================================================
# RPC server
# ===========================================================================
# marshal is good for low-level data structures.
# but when passing an 'object' (any non-marshallable object)
# we really want to pass a 'reference', which will act on
# the other side as a proxy. How transparent can we make this?
class rpc_channel (asynchat.async_chat):
'Simple RPC server.'
# a 'packet': NNNNNNNNmmmmmmmmmmmmmmmm
# (hex length in 8 bytes, followed by marshal'd packet data)
# same protocol used in both directions.
STATE_LENGTH = 'length state'
STATE_PACKET = 'packet state'
ac_out_buffer_size = 65536
request_counter = counter()
exception_counter = counter()
client_counter = counter()
def __init__ (self, root, conn, addr):
self.root = root
self.addr = addr
asynchat.async_chat.__init__ (self, conn)
self.pstate = self.STATE_LENGTH
self.set_terminator (8)
self.buffer = []
self.proxies = {}
rid = id(root)
self.new_reference (root)
p = marshal.dumps ((rid,))
# send root oid to the other side
self.push ('%08x%s' % (len(p), p))
self.client_counter.increment()
def new_reference (self, object):
oid = id(object)
ignore, refcnt = self.proxies.get (oid, (None, 0))
self.proxies[oid] = (object, refcnt + 1)
def forget_reference (self, oid):
object, refcnt = self.proxies.get (oid, (None, 0))
if refcnt > 1:
self.proxies[oid] = (object, refcnt - 1)
else:
del self.proxies[oid]
def log (self, *ignore):
pass
def collect_incoming_data (self, data):
self.buffer.append (data)
def found_terminator (self):
self.buffer, data = [], string.join (self.buffer, '')
if self.pstate is self.STATE_LENGTH:
packet_length = string.atoi (data, 16)
self.set_terminator (packet_length)
self.pstate = self.STATE_PACKET
else:
self.set_terminator (8)
self.pstate = self.STATE_LENGTH
oid, kind, arg = marshal.loads (data)
obj, refcnt = self.proxies[oid]
e = None
reply_kind = 2
try:
if kind == 0:
# __call__
result = apply (obj, arg)
elif kind == 1:
# __getattr__
result = getattr (obj, arg)
elif kind == 2:
# __setattr__
key, value = arg
result = setattr (obj, key, value)
elif kind == 3:
# __repr__
result = repr(obj)
elif kind == 4:
# __del__
self.forget_reference (oid)
result = None
elif kind == 5:
# __getitem__
result = obj[arg]
elif kind == 6:
# __setitem__
(key, value) = arg
obj[key] = value
result = None
elif kind == 7:
# __len__
result = len(obj)
except:
reply_kind = 1
(file,fun,line), t, v, tbinfo = asyncore.compact_traceback()
result = '%s:%s:%s:%s (%s:%s)' % (MY_NAME, file, fun, line, t, str(v))
self.log_info (result, 'error')
self.exception_counter.increment()
self.request_counter.increment()
# optimize a common case
if type(result) is types.InstanceType:
can_marshal = 0
else:
can_marshal = 1
try:
rb = marshal.dumps ((reply_kind, result))
except ValueError:
can_marshal = 0
if not can_marshal:
# unmarshallable object, return a reference
rid = id(result)
self.new_reference (result)
rb = marshal.dumps ((0, rid))
self.push_with_producer (
scanning_producer (
('%08x' % len(rb)) + rb,
buffer_size = 65536
)
)
class rpc_server_root:
pass
class rpc_server (asyncore.dispatcher):
def __init__ (self, root, address = ('', 8746)):
self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind (address)
self.listen (128)
self.root = root
def handle_accept (self):
conn, addr = self.accept()
rpc_channel (self.root, conn, addr)
# ===========================================================================
# Fast RPC server
# ===========================================================================
# no proxies, request consists
# of a 'chain' of getattrs terminated by a __call__.
# Protocol:
# <path>.<to>.<object> ( <param1>, <param2>, ... )
# => ( <value1>, <value2>, ... )
#
#
# (<path>, <params>)
# path: tuple of strings
# params: tuple of objects
class fastrpc_channel (asynchat.async_chat):
'Simple RPC server'
# a 'packet': NNNNNNNNmmmmmmmmmmmmmmmm
# (hex length in 8 bytes, followed by marshal'd packet data)
# same protocol used in both directions.
# A request consists of (<path-tuple>, <args-tuple>)
# where <path-tuple> is a list of strings (eqv to string.split ('a.b.c', '.'))
STATE_LENGTH = 'length state'
STATE_PACKET = 'packet state'
def __init__ (self, root, conn, addr):
self.root = root
self.addr = addr
asynchat.async_chat.__init__ (self, conn)
self.pstate = self.STATE_LENGTH
self.set_terminator (8)
self.buffer = []
def log (*ignore):
pass
def collect_incoming_data (self, data):
self.buffer.append (data)
def found_terminator (self):
self.buffer, data = [], string.join (self.buffer, '')
if self.pstate is self.STATE_LENGTH:
packet_length = string.atoi (data, 16)
self.set_terminator (packet_length)
self.pstate = self.STATE_PACKET
else:
self.set_terminator (8)
self.pstate = self.STATE_LENGTH
(path, params) = marshal.loads (data)
o = self.root
e = None
try:
for p in path:
o = getattr (o, p)
result = apply (o, params)
except:
e = repr (asyncore.compact_traceback())
result = None
rb = marshal.dumps ((e,result))
self.push (('%08x' % len(rb)) + rb)
class fastrpc_server (asyncore.dispatcher):
def __init__ (self, root, address = ('', 8748)):
self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind (address)
self.listen (128)
self.root = root
def handle_accept (self):
conn, addr = self.accept()
fastrpc_channel (self.root, conn, addr)
# ===========================================================================
if __name__ == '__main__':
class thing:
def __del__ (self):
print 'a thing has gone away %08x' % id(self)
class sample_calc:
def product (self, *values):
return reduce (lambda a,b: a*b, values, 1)
def sum (self, *values):
return reduce (lambda a,b: a+b, values, 0)
def eval (self, string):
return eval (string)
def make_a_thing (self):
return thing()
import sys
if '-f' in sys.argv:
server_class = fastrpc_server
address = ('', 8748)
else:
server_class = rpc_server
address = ('', 8746)
root = rpc_server_root()
root.calc = sample_calc()
root.sys = sys
rs = server_class (root, address)
asyncore.loop()
|