# Copyright (c) 2004 Divmod.
# See LICENSE for details.
import inspect, os.path
class UnexposedMethodError(Exception):
"""
Raised on any attempt to get a method which has not been exposed.
"""
class Expose(object):
"""
Helper for exposing methods for various uses using a simple decorator-style
callable.
Instances of this class can be called with one or more functions as
positional arguments. The names of these functions will be added to a list
on the class object of which they are methods.
@ivar attributeName: The attribute with which exposed methods will be
tracked.
"""
def __init__(self, doc=None):
self.doc = doc
def __call__(self, *funcObjs):
"""
Add one or more functions to the set of exposed functions.
This is a way to declare something about a class definition, similar to
L{zope.interface.implements}. Use it like this::
| magic = Expose('perform extra magic')
| class Foo(Bar):
| def twiddle(self, x, y):
| ...
| def frob(self, a, b):
| ...
| magic(twiddle, frob)
Later you can query the object::
| aFoo = Foo()
| magic.get(aFoo, 'twiddle')(x=1, y=2)
The call to C{get} will fail if the name it is given has not been
exposed using C{magic}.
@param funcObjs: One or more function objects which will be exposed to
the client.
@return: The first of C{funcObjs}.
"""
if not funcObjs:
raise TypeError("expose() takes at least 1 argument (0 given)")
for fObj in funcObjs:
fObj.exposedThrough = getattr(fObj, 'exposedThrough', [])
fObj.exposedThrough.append(self)
return funcObjs[0]
def exposedMethodNames(self, instance):
"""
Return an iterator of the names of the methods which are exposed on the
given instance.
"""
for k, callable in inspect.getmembers(instance, inspect.isroutine):
if self in getattr(callable, 'exposedThrough', []):
yield k
_nodefault = object()
def get(self, instance, methodName, default=_nodefault):
"""
Retrieve an exposed method with the given name from the given instance.
@raise UnexposedMethodError: Raised if C{default} is not specified and
there is no exposed method with the given name.
@return: A callable object for the named method assigned to the given
instance.
"""
method = getattr(instance, methodName, None)
exposedThrough = getattr(method, 'exposedThrough', [])
if self not in getattr(method, 'exposedThrough', []):
if default is self._nodefault:
raise UnexposedMethodError(self, methodName)
return default
return method
def escapeToXML(text, isattrib = False):
"""Borrowed from twisted.xish.domish
Escape text to proper XML form, per section 2.3 in the XML specification.
@type text: L{str}
@param text: Text to escape
@type isattrib: L{bool}
@param isattrib: Triggers escaping of characters necessary for use as attribute values
"""
text = text.replace("&", "&")
text = text.replace("<", "<")
text = text.replace(">", ">")
if isattrib:
text = text.replace("'", "'")
text = text.replace("\"", """)
return text
def getPOSTCharset(ctx):
"""Locate the unicode encoding of the POST'ed form data.
To work reliably you must do the following:
- set the form's enctype attribute to 'multipart/form-data'
- set the form's accept-charset attribute, probably to 'utf-8'
- add a hidden form field called '_charset_'
For instance::
<form action="foo" method="post" enctype="multipart/form-data" accept-charset="utf-8">
<input type="hidden" name="_charset_" />
...
</form>
"""
from nevow import inevow
request = inevow.IRequest(ctx)
# Try the magic '_charset_' field, Mozilla and IE set this.
charset = request.args.get('_charset_',[None])[0]
if charset:
return charset
# Look in the 'content-type' request header
contentType = request.received_headers.get('content-type')
if contentType:
charset = dict([ s.strip().split('=') for s in contentType.split(';')[1:] ]).get('charset')
if charset:
return charset
return 'utf-8'
from twisted.python.reflect import qual,namedAny,allYourBase,accumulateBases
from twisted.python.util import uniquify
from twisted.internet.defer import Deferred,succeed,maybeDeferred,DeferredList
from twisted.python import failure
from twisted.python.failure import Failure
from twisted.python import log
## The tests rely on these, but they should be removed ASAP
def remainingSegmentsFactory(ctx):
return tuple(ctx.tag.postpath)
def currentSegmentsFactory(ctx):
return tuple(ctx.tag.prepath)
class _RandomClazz(object):
pass
class _NamedAnyError(Exception):
'Internal error for when importing fails.'
def _namedAnyWithBuiltinTranslation(name):
if name == '__builtin__.function':
name='types.FunctionType'
elif name == '__builtin__.method':
return _RandomClazz # Hack
elif name == '__builtin__.instancemethod':
name='types.MethodType'
elif name == '__builtin__.NoneType':
name='types.NoneType'
elif name == '__builtin__.generator':
name='types.GeneratorType'
return namedAny(name)
# Import resource_filename from setuptools's pkg_resources module if possible
# because it handles resources in .zip files. If it's not provide a version
# that assumes the resource is directly available on the filesystem.
try:
from pkg_resources import resource_filename
except ImportError:
def resource_filename(modulename, resource_name):
modulepath = namedAny(modulename).__file__
return os.path.join(os.path.dirname(os.path.abspath(modulepath)), resource_name)
class CachedFile(object):
"""
Helper for caching operations on files in the filesystem.
"""
def __init__(self, path, loader):
"""
@type path: L{str}
@param path: The path to the associated file in the filesystem.
@param loader: A callable that returns the relevant data; invoked when
the cache is empty or stale.
"""
self.path = path
self.loader = loader
self.invalidate()
def invalidate(self):
"""
Invalidate the cache, forcing a reload from disk at the next attempted
load.
"""
self._mtime = None
def load(self, *args, **kwargs):
"""
Load this file. Any positional or keyword arguments will be passed
along to the loader callable, after the path itself.
"""
currentTime = os.path.getmtime(self.path)
if self._mtime is None or currentTime != self._mtime:
self._cachedObj = self.loader(self.path, *args, **kwargs)
self._mtime = currentTime
return self._cachedObj
|