# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: thread_utils.py 4747 2008-10-22 19:58:55Z kurt $
from __future__ import with_statement
import threading
import sys
from snaplogic.common.snap_exceptions import SnapValueError
class RecursiveLockError(Exception):
pass
class ThreadNotLockedError(Exception):
pass
class SubLockHandle(object):
"""
A context manager class used as a helper to RWLock.
Instances of this class will allow use of the RWLock.acquire_read and RWLock.acquire_write
methods in a 'with' statement. For example, the two following snippets are identical:
{{{
lock = RWLock()
lock.acquire_read()
<code statements>
lock.release()
}}}
{{{
lock = RWLock()
with lock.acquire_read():
<code statements>
}}}
"""
def __init__(self, rwlock, lock_type):
"""
Initialization.
"""
super(SubLockHandle, self).__init__()
self._rwlock = rwlock
self.lock_type = lock_type
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
return False
def acquire(self, blocking=True):
"""See description of RWLock.acquire_read and RWLock.acquire_write for details."""
if self.lock_type is RWLock.READ_LOCK:
return self._rwlock.acquire_read(blocking)
else:
return self._rwlock.acquire_write(blocking)
def release(self):
"""See description of RWLock.release for details."""
self._rwlock.release()
class RWLock(object):
READ_LOCK = 'read'
WRITE_LOCK = 'write'
def __init__(self):
self._cond = threading.Condition()
self._lockers = {}
self._write_active = False
def read_lock(self):
return SubLockHandle(self, self.READ_LOCK)
def write_lock(self):
return SubLockHandle(self, self.WRITE_LOCK)
def get_lock_handle(self, lock_mode):
"""
Return a lock handle for the given lock mode.
Returns the appropriate handle as returned by read_lock or write_lock depending on lock_mode. The
lock_mode parameter should be either RWLock.READ_LOCK or RWLock.WRITE_LOCK.
@param lock_mode: Lock mode of handle to return.
@type lock_mode: RWlock constant
@return: Handle object returned by read_lock or write_lock.
@rtype: object
@raises SnapValueError: The lock_mode parameter is invalid.
"""
if lock_mode is self.READ_LOCK:
return self.read_lock()
elif lock_mode is self.WRITE_LOCK:
return self.write_lock()
else:
raise SnapValueError("Invalid lock mode '%s'" % str(lock_mode))
def acquire_read(self, blocking=True):
"""
Acquire a (shared) read lock.
Multiple threads may hold a read lock at the same time. Write threads are blocked until all reader
thread locks are released. If a write lock is in progress, all read threads are blocked.
If a write lock is currently held by some thread and acquire_read() is called with blocking=False,
the call returns immediately with False. If a write lock is not currently held, the read lock is
acquired and True is returned.
@param blocking: Flag indicating if acquiring the read lock should block.
@type blocking: bool
@return: True if the read lock was acquired or shared with other read threads. False if the lock
could not be immediately acquired and blocking=False.
@rtype: bool
"""
if self._cond.acquire(blocking):
try:
thread = threading.currentThread()
if thread not in self._lockers:
if blocking or not self._lockers:
# Wait until no writing thread active.
while self._write_active:
self._cond.wait()
self._lockers[thread] = self.READ_LOCK
return True
else:
# Couldn't immediately acquire lock and blocking=False.
return False
else:
raise RecursiveLockError("Lock (%s) already acquired by thread '%s'" % (self._lockers[thread],
thread.getName()))
finally:
self._cond.release()
else:
return False
def acquire_write(self, blocking=True):
"""
Acquire an exclusive write lock.
Only one thread may hold a write lock at a time, and no threads may hold a read lock while a
write lock is held.
If a read or write lock is currently held and acquire_write() is called with blocking=False, the call
returns immediately with False. If neither a write nor read lock are currently held, the write lock is
acquired True is returned.
@param blocking: Flag indicating if acquiring the write lock should block.
@type blocking: bool
@return: True if the write lock was acquired. False if the lock could not be immediately acquired and
blocking=False.
@rtype: bool
"""
if self._cond.acquire(blocking):
try:
thread = threading.currentThread()
if thread not in self._lockers:
if blocking or not self._lockers:
# Wait until there are no locking threads.
while self._lockers:
self._cond.wait()
self._lockers[thread] = self.WRITE_LOCK
self._write_active = True
return True
else:
# Couldn't immediately acquire lock and blocking=False.
return False
else:
raise RecursiveLockError("Lock (%s) already acquired by thread '%s'" % (self._lockers[thread],
thread.getName()))
finally:
self._cond.release()
else:
# Couldn't immediately acquire lock and blocking=False.
return False
def acquire(self, lock_mode, blocking=True):
"""
Acquire the read or write lock depending on lock_mode.
The lock_mode parameter should be either RWLock.READ_LOCK or RWLock.WRITE_LOCK and the appropriate
acquire_* method will be called.
@param lock_mode: Lock mode to acquire.
@param lock_mode: RWLock constant
@param blocking: Blocking flag passed on to acquire_read or acquire_write.
@type blocking: bool
@return: Returns whatever acquire_read or acquire_write return.
@rtype: bool
@raises SnapValueError: The lock_mode parameter is invalid.
"""
if lock_mode is self.READ_LOCK:
return self.acquire_read(blocking)
elif lock_mode is self.WRITE_LOCK:
return self.acquire_write(blocking)
else:
raise SnapValueError("Invalid lock mode '%s'" % str(lock_mode))
def release(self):
"""
Release the currently held read or write lock.
"""
with self._cond:
thread = threading.currentThread()
if thread in self._lockers:
if self._lockers[thread] is self.WRITE_LOCK:
self._write_active = False
del self._lockers[thread]
self._cond.notifyAll()
else:
raise ThreadNotLockedError("Thread '%s' attempted to release unheld lock" % thread.getName())
|