# Copyright (C) 2006-2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
__all__ = ['needs_read_lock',
'needs_write_lock',
'use_fast_decorators',
'use_pretty_decorators',
]
import sys
from bzrlib import trace
def _get_parameters(func):
"""Recreate the parameters for a function using introspection.
:return: (function_params, calling_params)
function_params: is a string representing the parameters of the
function. (such as "a, b, c=None, d=1")
This is used in the function declaration.
calling_params: is another string representing how you would call the
function with the correct parameters. (such as "a, b, c=c, d=d")
Assuming you sued function_params in the function declaration, this
is the parameters to put in the function call.
For example:
def wrapper(%(function_params)s):
return original(%(calling_params)s)
"""
# "import inspect" should stay in local scope. 'inspect' takes a long time
# to import the first time. And since we don't always need it, don't import
# it globally.
import inspect
args, varargs, varkw, defaults = inspect.getargspec(func)
formatted = inspect.formatargspec(args, varargs=varargs,
varkw=varkw,
defaults=defaults)
if defaults is None:
args_passed = args
else:
first_default = len(args) - len(defaults)
args_passed = args[:first_default]
for arg in args[first_default:]:
args_passed.append("%s=%s" % (arg, arg))
if varargs is not None:
args_passed.append('*' + varargs)
if varkw is not None:
args_passed.append('**' + varkw)
args_passed = ', '.join(args_passed)
return formatted[1:-1], args_passed
def _pretty_needs_read_lock(unbound):
"""Decorate unbound to take out and release a read lock.
This decorator can be applied to methods of any class with lock_read() and
unlock() methods.
Typical usage:
class Branch(...):
@needs_read_lock
def branch_method(self, ...):
stuff
"""
# This compiles a function with a similar name, but wrapped with
# lock_read/unlock calls. We use dynamic creation, because we need the
# internal name of the function to be modified so that --lsprof will see
# the correct name.
# TODO: jam 20070111 Modify this template so that the generated function
# has the same argument signature as the original function, which
# will help commands like epydoc.
# This seems possible by introspecting foo.func_defaults, and
# foo.func_code.co_argcount and foo.func_code.co_varnames
template = """\
def %(name)s_read_locked(%(params)s):
self.lock_read()
try:
result = unbound(%(passed_params)s)
except:
import sys
exc_info = sys.exc_info()
try:
self.unlock()
finally:
raise exc_info[0], exc_info[1], exc_info[2]
else:
self.unlock()
return result
read_locked = %(name)s_read_locked
"""
params, passed_params = _get_parameters(unbound)
variables = {'name':unbound.__name__,
'params':params,
'passed_params':passed_params,
}
func_def = template % variables
exec func_def in locals()
read_locked.__doc__ = unbound.__doc__
read_locked.__name__ = unbound.__name__
return read_locked
def _fast_needs_read_lock(unbound):
"""Decorate unbound to take out and release a read lock.
This decorator can be applied to methods of any class with lock_read() and
unlock() methods.
Typical usage:
class Branch(...):
@needs_read_lock
def branch_method(self, ...):
stuff
"""
def read_locked(self, *args, **kwargs):
self.lock_read()
try:
result = unbound(self, *args, **kwargs)
except:
import sys
exc_info = sys.exc_info()
try:
self.unlock()
finally:
raise exc_info[0], exc_info[1], exc_info[2]
else:
self.unlock()
return result
read_locked.__doc__ = unbound.__doc__
read_locked.__name__ = unbound.__name__
return read_locked
def _pretty_needs_write_lock(unbound):
"""Decorate unbound to take out and release a write lock."""
template = """\
def %(name)s_write_locked(%(params)s):
self.lock_write()
try:
result = unbound(%(passed_params)s)
except:
import sys
exc_info = sys.exc_info()
try:
self.unlock()
finally:
raise exc_info[0], exc_info[1], exc_info[2]
else:
self.unlock()
return result
write_locked = %(name)s_write_locked
"""
params, passed_params = _get_parameters(unbound)
variables = {'name':unbound.__name__,
'params':params,
'passed_params':passed_params,
}
func_def = template % variables
exec func_def in locals()
write_locked.__doc__ = unbound.__doc__
write_locked.__name__ = unbound.__name__
return write_locked
def _fast_needs_write_lock(unbound):
"""Decorate unbound to take out and release a write lock."""
def write_locked(self, *args, **kwargs):
self.lock_write()
try:
result = unbound(self, *args, **kwargs)
except:
exc_info = sys.exc_info()
try:
self.unlock()
finally:
raise exc_info[0], exc_info[1], exc_info[2]
else:
self.unlock()
return result
write_locked.__doc__ = unbound.__doc__
write_locked.__name__ = unbound.__name__
return write_locked
def only_raises(*errors):
"""Make a decorator that will only allow the given error classes to be
raised. All other errors will be logged and then discarded.
Typical use is something like::
@only_raises(LockNotHeld, LockBroken)
def unlock(self):
# etc
"""
def decorator(unbound):
def wrapped(*args, **kwargs):
try:
return unbound(*args, **kwargs)
except errors:
raise
except:
trace.mutter('Error suppressed by only_raises:')
trace.log_exception_quietly()
wrapped.__doc__ = unbound.__doc__
wrapped.__name__ = unbound.__name__
return wrapped
return decorator
# Default is more functionality, 'bzr' the commandline will request fast
# versions.
needs_read_lock = _pretty_needs_read_lock
needs_write_lock = _pretty_needs_write_lock
def use_fast_decorators():
"""Change the default decorators to be fast loading ones.
The alternative is to have decorators that do more work to produce
nice-looking decorated functions, but this slows startup time.
"""
global needs_read_lock, needs_write_lock
needs_read_lock = _fast_needs_read_lock
needs_write_lock = _fast_needs_write_lock
def use_pretty_decorators():
"""Change the default decorators to be pretty ones."""
global needs_read_lock, needs_write_lock
needs_read_lock = _pretty_needs_read_lock
needs_write_lock = _pretty_needs_write_lock
# This implementation of cachedproperty is copied from Launchpad's
# canonical.launchpad.cachedproperty module (with permission from flacoste)
# -- spiv & vila 100120
def cachedproperty(attrname_or_fn):
"""A decorator for methods that makes them properties with their return
value cached.
The value is cached on the instance, using the attribute name provided.
If you don't provide a name, the mangled name of the property is used.
>>> class CachedPropertyTest(object):
...
... @cachedproperty('_foo_cache')
... def foo(self):
... print 'foo computed'
... return 23
...
... @cachedproperty
... def bar(self):
... print 'bar computed'
... return 69
>>> cpt = CachedPropertyTest()
>>> getattr(cpt, '_foo_cache', None) is None
True
>>> cpt.foo
foo computed
23
>>> cpt.foo
23
>>> cpt._foo_cache
23
>>> cpt.bar
bar computed
69
>>> cpt._bar_cached_value
69
"""
if isinstance(attrname_or_fn, basestring):
attrname = attrname_or_fn
return _CachedPropertyForAttr(attrname)
else:
fn = attrname_or_fn
attrname = '_%s_cached_value' % fn.__name__
return _CachedProperty(attrname, fn)
class _CachedPropertyForAttr(object):
def __init__(self, attrname):
self.attrname = attrname
def __call__(self, fn):
return _CachedProperty(self.attrname, fn)
class _CachedProperty(object):
def __init__(self, attrname, fn):
self.fn = fn
self.attrname = attrname
self.marker = object()
def __get__(self, inst, cls=None):
if inst is None:
return self
cachedresult = getattr(inst, self.attrname, self.marker)
if cachedresult is self.marker:
result = self.fn(inst)
setattr(inst, self.attrname, result)
return result
else:
return cachedresult
|