# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

# $Id: data_cache.py 4423 2008-09-23 18:47:31Z jbrendel $


"""
This module helps cache data results of a pipeline run.

The cache is a file system based cache. Each data result
is stored in a separate file. The URL used to invoke the
pipeline is hashed (using md5) and used as the key to
retrieve the cached data.

When an entry is created, the URL is hashed and a header
file is created with that hash name. This file contains
the URL and the name of the data file that holds the actual data.
The header file exists to deal with any hashing collisions.

*** Order of lock acquisition to avoid deadlocks ***
Acquire header lock first, then delete list lock.

"""
import os
import stat
import glob
import threading
import md5
import tempfile
import time
import re
import simplejson
import errno

from snaplogic.common.snap_exceptions import *
from snaplogic.common import file_lock
from snaplogic.common import snap_log

cache_timeout = 300
"""Cache timeout in seconds."""

cache_dir = None
"""Location of the cache directory."""

max_size = None
"""Cache size limit in bytes."""


high_water_mark = None
"""
High water mark (in bytes)
When the cache size reaches this level, it should be
cleaned up until it reduces to the low_water_mark

"""

low_water_mark = None
"""Low water mark (in bytes)."""

log = None
"""Log file method."""

elog = None
"""Exception log method."""

_tmp_dir = None
"""The directory under cache, which has the temp files."""

_header_dir = None
"""The directory under cache, which has the header files."""

_result_dir = None
"""The directory under cache, which has the result files."""

_index_dir = None
"""The directory under cache, which has all the index file names."""

_delete_list_file = None
"""This file contains all cache control related information."""

_current_cache_size = None
"""Current cache size."""

_lock = threading.RLock()
"""Lock for cache size increment/decrement."""

HDR_INDEX_FILE  = "index_f"
HDR_DATA_FILE   = "data_f"
HDR_ROW_COUNT   = "row_c"
HDR_GEN_ID      = "gen_id"
HDR_GUID        = "guid"
HDR_DATA_FORMAT = "data_format"
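
# For illustration, a header file stores url_map as JSON; a single-entry map
# might look like this (all values are hypothetical):
#
#   { "http://host/feed/runs/1": {
#         "data_f": "/cache/result/9a0364b9e99bb480dd25e1f0284c8555_1222192051",
#         "index_f": "/cache/index/Idx_ab12cd.file",
#         "row_c": 42,
#         "gen_id": 3,
#         "guid": "resource-guid-0001",
#         "data_format": "record text/plain" } }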

def _inc_current_size(sz):
    """
    Increment current cache size by sz bytes.
    
    @param sz: Size by which to increment.
    @type sz:  int or long
    
    """
    global _current_cache_size
    _lock.acquire()

    if _current_cache_size is not None:
        _current_cache_size += sz
    else:
        _current_cache_size = sz

    _lock.release()

def _dec_current_size(sz):
    """
    Decrement current cache size by sz bytes.
    
    @param sz: Size by which to decrement.
    @type sz:  int or long
    
    """
    global _current_cache_size
    _lock.acquire()
    if _current_cache_size is not None:
        _current_cache_size -= sz
        if _current_cache_size < 0:
            _current_cache_size = 0

    _lock.release()
    
def _set_current_size(sz):
    """
    Set current cache size.
    
    @param sz: Updated size for cache.
    @type sz:  int or long
    
    """
    
    global _current_cache_size
    _lock.acquire()
    _current_cache_size = sz
    _lock.release()

def _get_current_size():
    """Return the current size (in bytes)."""
    _lock.acquire()
    sz = _current_cache_size
    _lock.release()
    return sz 

def _add_to_delete_list(fname):
    """
    Add specified data file to delete list.
    
    @param fname: name of the data file.
    @type fname:  string
    
    """ 
    # Add \n, since this file will occupy a single line in the delete list file.
    entry = fname + "\n" 
    # Open and lock the delete list file
    fp = open(_delete_list_file, "r+")
    file_lock.lock(fp, file_lock.LOCK_EX)
    try:
        lineslist = fp.readlines()
        if entry not in lineslist:
            # We must be at the end of the file, append new entry.
            fp.write(entry)
    finally:
        # Whatever happens, unlock the delete list file.
        fp.close()
        

def _remove_data_file(fname, use_delete_list, err_val = None):
    """
    Remove data file from the cache.
    
    If the remove fails, then the file is added to the delete list. This
    method increments the current size of the cache, as it deletes the
    entry.
    
    @param fname: Name of the data file that should be deleted.
    @type fname:  string
    
    @param use_delete_list: True, if data file should be put in delete list when delete fails.
    @type use_delete_list:  bool
    
    @param err_val: If not None, should be a dict. This method will set it to the error number and error string
        returned by the OS in the event of delete failure. The keys are "errno" and "errstr".
    @type err_val:  dict
    
    @return: Size of data file, if the remove succeeded. None, if the file had to be added to delete list
        or did not exist.
    @rtype:  long or None

    """
    del_ok = False
    try:
        sz = os.path.getsize(fname)
        os.remove(fname)
        del_ok = True
    except OSError, (errnum, errstr):
        if err_val:
            err_val['errno'] = errnum
            err_val['errstr'] = errstr            
        if errnum == errno.ENOENT:
            # The file is already gone, don't put it in delete list.
            return None
        
    if del_ok:
        _dec_current_size(sz)
        return sz
        
    # We did not have a successful delete, the file can either
    # be added to delete list or we can just give up and move on.
    if use_delete_list:
        _add_to_delete_list(fname)
        
    return None
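
# A small sketch of how a caller can use err_val (the warning message is
# illustrative):
#
#   err = {}
#   if _remove_data_file(fname, False, err) is None and err:
#       log(snap_log.LEVEL_WARN, "Delete of %s failed: %s %s"
#           % (fname, err['errno'], err['errstr']))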

class CacheEntryHeader(object):
    
    """
    Represents the header of a cache entry.
    
    This class provides an object representation of the
    header file and allows easy parsing, modification and
    writing of the contents of the header file.
    
    """
    
    
    def __init__(self, hs):
        """
        Initializes the header file object with the name of the header file.
        
        @param hs: Hash name. This can be just the hash value or the fully qualified
            header file name.
        @type hs:  string
         
        """ 
        self.url_map = {}
        if len(os.path.commonprefix([_header_dir, hs])) == len(_header_dir):
            # The name already has path info
            self.hdr_file = hs
        else:
            self.hdr_file = os.path.join(_header_dir, hs)
        
    def open(self, create_flag = False):
        """
        Open and lock the header file.
        
        If create_flag is set to true, then the code creates the header file
        if it does not exist. If the flag is false, then the header file is
        only opened if it exists; otherwise the method returns False.
        
        @param create_flag: If true, the header file is created if it does not exist.
        @type create_flag:  bool
        
        @return: True if the header file was found or created. False, if the header file
            was not found and the method was asked to not create one.
        @rtype:  bool
        
        """
        flags = os.O_RDWR
        if create_flag:
            flags |=  os.O_CREAT
        
        try:
            fd = os.open(self.hdr_file, flags, 0600)
        except OSError, (errnum, errstr):
            if errnum == errno.ENOENT and not create_flag:
                return False
            else:
                raise SnapException("Failed to open header file %s. Error %s, %s" 
                                    % (self.hdr_file, errnum, errstr))
        
        try:
            self._fp = os.fdopen(fd, "r+")
        except Exception, e:
            os.close(fd)
            raise e
        try:
            file_lock.lock(self._fp, file_lock.LOCK_EX)
        except Exception, e:
            self._fp.close()
            raise e
        
        try:
            s = self._fp.read()
            if s.strip():
                self.url_map = simplejson.loads(s)
            else:
                self.url_map = {}
        except:
            # We have an error, unlock the file.
            self._fp.close()
            raise
        
        return True
        
    def close(self):
        """Create or overwrite the header file with the contents of the object and close."""
        # We choose to write even an empty header file to keep things simple. The server,
        # at startup, can cleanup all these empty header files.
        try:
            _truncate_file(self._fp)
            simplejson.dump(self.url_map, self._fp)
        finally:
            self._fp.close()
        
    def remove_data_file_ref(self, dfile_name, abort_on_failure = False):
        """
        Remove reference to the data file in the header file and remove the actual data file.
        
        @param dfile_name: Fully qualified name of the data file to be removed.
        @type dfile_name:  string
        
        @param abort_on_failure: If True, raise SnapException when a delete fails,
            instead of adding the file to the delete list.
        @type abort_on_failure:  bool
        
        @return: Combined size (in bytes) of the data file and index file that were
            removed, if the remove succeeded. None, if no entry was found.
        @rtype:  long or None
    
        """
        err_val = {}
        use_delete_list = not abort_on_failure
            
        if not self.open():
            # This data file has no header file referring to it. Just attempt to delete it.
            return _remove_data_file(dfile_name, use_delete_list)
        
        try:
            found = False
            # Remove entry for header file
            for (u, entry) in self.url_map.iteritems():
                if entry[HDR_DATA_FILE] == dfile_name:
                    del self.url_map[u]
                    found = True
                    break
            if found:
                ret1 = _remove_data_file(entry[HDR_DATA_FILE], use_delete_list, err_val)
                tot = 0
                if ret1 is not None:
                    tot = ret1
                elif abort_on_failure:
                    raise SnapException("Failed to delete data file %s. Exception %s %s" %
                                        (entry[HDR_DATA_FILE], err_val['errno'], err_val['errstr']))
                    
                ret2 = _remove_data_file(entry[HDR_INDEX_FILE], use_delete_list, err_val)
                if ret2 is not None:
                    tot += ret2
                elif abort_on_failure:
                    raise SnapException("Failed to delete index file %s. URL %s Exception %s %s" %
                                        (entry[HDR_INDEX_FILE], u, err_val['errno'], err_val['errstr']))
                return tot
            else:
                return None
                
        finally:
            self.close()
        

def _create_dir(dirname):
    """
    Create directory if it does not exist. Make sure directory has rwx permission.
    
    @param dirname: Name of directory.
    @type dirname:  string
    """
    if not os.path.exists(dirname):
        os.makedirs(dirname, 0700)
    else:
        if not os.path.isdir(dirname):
            raise SnapException("%s is not a directory" % dirname)
        if not os.access(dirname, os.R_OK | os.W_OK | os.X_OK):
            raise SnapException("Directory %s does not have required permissions" % dirname)

def _create_file(fname):
    """
    Create file if it does not exist. Make sure file has rw permission.
    
    @param fname: Name of file.
    @type fname:  string
    
    """
    if not os.path.exists(fname):
        f = open(fname,"w")
        f.close()
    else:
        if not os.path.isfile(fname):
            raise SnapException("%s is not a file" % fname)
    
    if not os.access(fname, os.R_OK | os.W_OK):
        raise SnapException("File %s does not have required permissions" % fname)
    

def _truncate_file(fp):
    """
    Truncates a file and retries on failure.
    
    Windows truncate fails with error 13 (permission denied) occasionally. This
    appears to be a known issue. Retrying seems to work fine.
    
    @param fp: Open file object that needs to be truncated.
    @type fp:  File Object
    
    @raise SnapException: If truncate fails after repeated attempts.
    
    """
    c = 0
    while c < 10:
        fp.seek(0)
        try:
            fp.truncate(0)
            return
        except IOError, (errnum, errstr):
            if errnum == errno.EACCES:
                # On windows, sometimes, truncate fails with permission denied. Retry.
                time.sleep(.1)
        c += 1
    raise SnapException("Truncate failed after repeated attempts")

def _get_all_data_files():
    """Returns a list of all result files in cache."""
    names = os.listdir(_result_dir)
    return [os.path.join(_result_dir, n) for n in names]

def _get_all_header_files():
    """Returns a list of all header file names in cache."""
    names = os.listdir(_header_dir)
    return [os.path.join(_header_dir, n) for n in names]

def _get_all_index_files():
    """Returns a list of all index file names in cache."""
    names = os.listdir(_index_dir)
    return [os.path.join(_index_dir, n) for n in names]

def _get_all_temp_files():
    """Return a list of all tmp file names in cache."""
    names = os.listdir(_tmp_dir)
    return [os.path.join(_tmp_dir, n) for n in names]

def _get_hash_and_timeout(fname):
    """
    Get the timeout value in seconds and hash value from the result file name.
    
    @param fname:  Name of result file
    @type fname:   string
    
    @return: Tuple of (hash string, Timeout value in seconds from epoch)
    @rtype:  2-tuple
    
    """
    fname = os.path.basename(fname)
    s = fname.split("_")
    return (s[0], long(s[1]))
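
# For example (hypothetical file name):
#
#   _get_hash_and_timeout("/cache/result/9a0364b9e99bb480_1222192051")
#   # returns ("9a0364b9e99bb480", 1222192051L)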

################################################################################
# 
# Functions that are used to manage cache size.
#
################################################################################

def _remove_delete_list_entries(abort_on_failure = False):
    """
    Remove delete list entries.
    
    This method should be called when there is a need to free up space in the cache.
    The delete list usually gets populated on Windows for files which were supposed
    to be deleted, but the delete failed. It fails because another process or thread
    has the file open. This function runs through the delete list and makes another
    attempt at deleting those files.
 
    @param abort_on_failure: If this flag is set to true, the function will abort
        and raise exception when any file deletion fails for any reason other than missing
        file (errno 2). This flag is usually set at server startup time, since there should
        be no reason for a file delete to fail at that time (not a multi thread situation).
    @type abort_on_failure:  bool
    
    """
    rem_size = 0L
    fail_list = []
    
    fp = open(_delete_list_file, "r+")
    file_lock.lock(fp, file_lock.LOCK_EX)
    try:
        lineslist = fp.readlines()
        for l in lineslist:
            # Remove the trailing \n
            l = l.strip()
            log(snap_log.LEVEL_DEBUG, "Removing delete list item %s" % l)
            if not os.path.exists(l):
                # The file does not exist, consider it removed.
                continue
            
            sz = os.path.getsize(l)
            try:
                os.remove(l)
                # The remove succeeded, note the size of file removed
                rem_size += sz
                _dec_current_size(sz)
            except OSError, (errnum, errstr):
                if errnum == errno.ENOENT:
                    # Someone has just deleted the file.
                    continue
                elif abort_on_failure:
                    raise SnapException("Failed to remove delete list entry %s. Exception %s %s"
                                        % (l, errnum, errstr))
                else:
                    # Add back to delete list.
                    fail_list.append(l + "\n")
        # Empty the file.
        _truncate_file(fp)
        # Write new entries.
        fp.writelines(fail_list)
    finally:
        fp.close()
    
    return rem_size

def _remove_timedout_entries(abort_on_failure = False):
    """
    Remove timed out data files.
    
    This method should be called whenever there is a need to free up space in the
    cache or to cleanup cache at startup time.
    
    @param abort_on_failure: If this flag is set to true, the function will abort
        and raise exception when any file deletion fails. This is typically
        done at server startup time, when there should not be any problems in
        deleting a file.
    @type abort_on_failure:  bool
    
    @return: Total file size in bytes that was deleted.
    @rtype:  long
    
    """
    total_sz = 0L
    names = _get_all_data_files()
    for n in names:
        (hs, to) = _get_hash_and_timeout(n)
        if to <= time.time():
            sz = CacheEntryHeader(hs).remove_data_file_ref(n, abort_on_failure)
            if sz is not None:
                total_sz += sz
                
    return total_sz
                

def _remove_using_lru(lower_by):
    """
    Remove least recently used data files.
    
    This function removes entries that have not yet timed out. For this reason,
    this method should only be called when all other methods of freeing up
    cache space have been exhausted.
    
    @param lower_by:  The number of bytes by which the cache should be reduced.
    @type lower_by:   int or long
    
    @return: Total file size in bytes that was deleted.
    @rtype:  long
    
    """
    
    d = {}
    names = _get_all_data_files()
    for name in names:
        try:
            atime =  os.path.getatime(name)
        except OSError, (errnum, errstr):
            if errnum != errno.ENOENT:
                # The file is still there, for some unexpected reason getatime did not work, log a warning.
                log(snap_log.LEVEL_WARN, "LRU failed to get access time of result file %s exception %s %s" %
                                         (name, errnum, errstr))
            continue
        try:
            sz =  os.path.getsize(name)
        except OSError, (errnum, errstr):
            if errnum != errno.ENOENT:
                # The file is still there, for some unexpected reason getsize did not work, log a warning.
                log(snap_log.LEVEL_WARN, "LRU failed to get size of result file %s exception %s %s" %
                                          (name, errnum, errstr))
            continue
        if atime not in d:
            d[atime] = [(name, sz)]
        else:
            d[atime].append((name, sz))
    
    asc_keys = d.keys()
    asc_keys.sort()
    
    total_sz = 0L
    for k in asc_keys:
        for (name, sz) in d[k]:
            (hs, ignore) = _get_hash_and_timeout(name)
            sz = CacheEntryHeader(hs).remove_data_file_ref(name)
            if sz is None:
                # Failed to remove, go to next entry.
                continue
            total_sz += sz
            if total_sz >= lower_by:
                return total_sz
            
    return total_sz

def _reduce_cache_size(current_size):
    """
    Reduces the size of the cache down to the low water mark size.
    
    @param current_size: The current size of cache in bytes.
    @type current_size:  int or long
    
    """
    
    sz = _remove_delete_list_entries()
    current_size -= sz
    
    if current_size <= low_water_mark:
        return
    
    sz = _remove_timedout_entries()
    current_size -= sz
    if current_size <= low_water_mark:
        return
    
    sz = _remove_using_lru(current_size - low_water_mark)
    
    return

def _get_cache_entry(url, p_guid, p_gen_id):
    """
    Get an entry corresponding to the specified URL.
    
    The function creates a md5 hash of the URL and looks for a
    header file by that name. The header file is then checked
    to see if it contains an entry for that URL. We can have more
    than one URL entry in the header file if there is a md5 collision.
    The URL entry also specifies the name of the data file containing
    the actual data. The name of the file is usually 
    <hash value>_<timeout value>. In some cases, if there is a collision
    it is <hash value>_<timeout value>_<a number>. 
    Such a collision can happen under two circumstances:
    1) There was an md5 collision of two URLs and they have created
       data files that have the same timeout.
    2) Two data files were created for the same URL within the same second.
       The older file is pending deletion due to LRU and the newer file
       is just being added.
    
    Irrespective of whether there is a collision or not, it is possible
    to tell from the name of the file, when it will timeout. If the 
    file has not timed out, then it is opened and a file object is
    returned. If no such entry exists or it has timed out, then None
    is returned.
    
    @param url: URL being looked up. This *must* be an encoded URL, if it has
        special characters like spaces. In short, it must be a valid URL.
    @type url:  string
    
    @param p_guid: The current guid of the resource that produced the data set. When
        this does not match the guid stored in the cache, the cache entry is
        invalidated.
    @type p_guid:  str
    
    @param p_gen_id: The current gen id of the resource that produced the data set. When
        this does not match the gen id stored in the cache, the cache entry is
        invalidated.
    @type p_gen_id:  str
    
    @return: dictionary containing
        { HDR_DATA_FILE   : data file pointer,
          HDR_INDEX_FILE  : index file pointer,
          HDR_ROW_COUNT   : row count,
          HDR_GEN_ID      : gen id of the resource,
          HDR_GUID        : guid of the resource,
          HDR_DATA_FORMAT : data format of the entry (content type) } 
        In case of miss, None is returned.
    @rtype:  dict or None
    
    @raise SnapException: If open/stat/utime of data file fails for
        unexpected reasons.
    
    """
    if not is_enabled():
        return None
    
    hs = md5.new(url).hexdigest()
    
    hdr = CacheEntryHeader(hs)
    if not hdr.open():
        # Such a file does not exist, it's a miss.
        return None
    try:
        if url not in hdr.url_map:
            # A cache miss
            return None
        
        datafile    = hdr.url_map[url][HDR_DATA_FILE]
        idxfile     = hdr.url_map[url][HDR_INDEX_FILE]
        row_count   = hdr.url_map[url][HDR_ROW_COUNT]
        gen_id      = hdr.url_map[url][HDR_GEN_ID]
        guid        = hdr.url_map[url][HDR_GUID]
        data_format = hdr.url_map[url][HDR_DATA_FORMAT]
        
        (h, timeout) = _get_hash_and_timeout(datafile)
        # Discard the existing entry, if it has
        # 1) Timed out
        # 2) A gen_id or guid mismatch (meaning the resource has been changed or deleted
        #    since the time that the data set was cached)
        if (timeout <= long(time.time())) or \
                (p_gen_id < gen_id) or (p_guid != guid):
            # The entry is stale, remove it and return a cache miss
            _remove_data_file(datafile, True)
            _remove_data_file(idxfile, True)
            del hdr.url_map[url]
            return None
        
        # It's a cache hit
        try:
            datafp = open(datafile, "rb")
        except IOError, (errnum, errstr):
            # The file was not found. This can happen, if someone
            # deleted the data file and failed to update the header file.
            # Update the header file and return a cache miss.
            # PS: open() returns IOError and not OSError as many of the
            #     other calls in this file do.
            if errnum == errno.ENOENT:
                # The file seems to have been deleted.
                _remove_data_file(idxfile, True)
                del hdr.url_map[url]
                return None
            else:
                raise SnapException("Failed to open data file %s. Exception %s %s"
                                    % (datafile, errnum, errstr))
        
        try:
            idxfp = open(idxfile, "rb")
        except IOError, (errnum, errstr):
            # This should not have happened
            datafp.close()
            raise SnapException("Failed to open index file %s. Exception %s %s"
                                    % (datafile, errnum, errstr))
        try:
            st = os.stat(datafile)
            # We forcibly update the access time of the file,
            # because LRU algorithm needs it and on Windows,
            # a certain registry setting can cause the access time to be
            # not updated automatically, when the file is opened.
            os.utime(datafile, (time.time(), st[stat.ST_MTIME]))
        except OSError, (errnum, errstr):
            raise SnapException("Failed to stat/utime data file %s. Exception %s %s" % (datafile, errnum, errstr))
                
        return { HDR_DATA_FILE   : datafp,
                 HDR_INDEX_FILE  : idxfp,
                 HDR_ROW_COUNT   : row_count,
                 HDR_GEN_ID      : gen_id,
                 HDR_GUID        : guid,
                 HDR_DATA_FORMAT : data_format }
    finally:
        # Unlock and write out the header.
        hdr.close()

def _create_cache_entry(url, timeout, temp_fname, index_fname, guid, gen_id, data_format, row_count):
    """
    Create a cache entry with the URL.
    
    Note: If this function raises an exception, it is the responsibility of the
    caller to delete the temporary result file named by temp_fname.
    
    An md5 hash of the URL is made. If a header file by that hash name does not
    already exist, it is created. If the header already exists, it is checked
    to see if the URL entry is present in the header. If the entry is found and
    has a timeout which is equal or greater than the new file being added, then
    the new entry is discarded. If not, then the new entry is added and the old
    entry is deleted.
    
    @param url: URL of the pipeline that produced the result file. This *must*
        be an encoded URL, if it has special characters like spaces. In short,
        it must be a valid URL.
    @type url:  string
    
    @param timeout: The time (seconds since epoch) at which the entry should be
        timed out. Please note that this is the absolute time at which the
        entry times out and not some diff from current time.
    @type timeout:  int or long or float
    
    @param temp_fname: The name of the temporary result file.
    @type temp_fname:  str

    @param index_fname: The name of the index file.
    @type index_fname:  str

    @param guid: The guid of the resource that created the data set.
    @type guid:  str

    @param gen_id: The gen id of the resource that created the data set.
    @type gen_id:  int
    
    @param data_format: The format of the data. It is specified as
               "record <content_type>" for record mode data.
               "binary <content_type>" for binary mode data. 
    @type data_format:  str
    
    @param row_count: Number of rows in the data set (if any).
    @type row_count:  int
    
    @return: Name of the new data file, or None if the add failed.
    @rtype:  string
    
    @raise SnapException: If creation of cache entry fails.
    
    """
    if not is_enabled():
        return
    
    timeout = long(timeout)
    hs = md5.new(url).hexdigest()
    new_result_file_name = os.path.join(_result_dir, hs) + "_" +  str(long(timeout))
            
    hdr = CacheEntryHeader(hs)
    hdr.open(True)
    try:
        curr_names = hdr.url_map.values()
        fname = new_result_file_name
        # If some other thread or process has created a result which has the same
        # or greater timeout, then discard this new entry.
        # By discarding the new entry with the same timeout, we also avoid a collision
        # caused by results having the same timeout.
        if url in hdr.url_map:
            r = _get_hash_and_timeout(hdr.url_map[url][HDR_DATA_FILE])
            if r[1] >= timeout:
                os.remove(temp_fname)
                return None
        
        c = 0    
        # Next, we see if there is collisions with any other URL.
        while (fname in curr_names) or os.path.exists(fname):
            if c > 100:
                # Something has to be wrong. There cannot possibly be
                # so many collisions.
                raise SnapException("Failed to create a name for URL %s with template %s" %
                                    (url, new_result_file_name))
            fname = "%s_%s" % (new_result_file_name, c)
            c = c + 1
        
        entry = {}
        entry[HDR_DATA_FILE]   = fname
        entry[HDR_INDEX_FILE]  = index_fname
        entry[HDR_GEN_ID]      = gen_id
        entry[HDR_GUID]        = guid
        entry[HDR_DATA_FORMAT] = data_format
        entry[HDR_ROW_COUNT]   = row_count
            
        if url in hdr.url_map:
            # Remove the current result set and add the new one.
            _remove_data_file(hdr.url_map[url][HDR_DATA_FILE], True)
            _remove_data_file(hdr.url_map[url][HDR_INDEX_FILE], True)
            
        hdr.url_map[url] = entry
        os.rename(temp_fname, entry[HDR_DATA_FILE])
        # TODO: This (file open) is a test. Will remove
        # it once the code is mature.
        newfp = open(entry[HDR_DATA_FILE])
        newfp.close()
    finally:
        hdr.close()
    
    return entry[HDR_DATA_FILE]
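
# Naming sketch (hash and timeout are hypothetical): for a URL hashing to
# "9a0364b9e99bb480" with timeout 1222192051, the data file is named
# "<result_dir>/9a0364b9e99bb480_1222192051". If that name collides with a
# file created for another URL, the loop above appends a counter, producing
# "<result_dir>/9a0364b9e99bb480_1222192051_0" and so on.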

################################################################################
# 
# Functions and Class that act as the primary interface to data_cache for
# creating cache entries and looking up cache entries.
#
################################################################################

def is_enabled():
    """Returns true if caching has been enabled."""
    if cache_dir is None:
        return False
    else:
        return True
    
def get_cache_size():
    """Return the size (in bytes) of all result and temporary result files."""
    total_sz = 0
    files = _get_all_data_files() + _get_all_index_files() + _get_all_temp_files() 
    for f in files:
        try:
            total_sz += os.path.getsize(f)
        except OSError, (errnum, errstr):
            if errnum != errno.ENOENT:
                # The file could have been deleted after we did an ls. However, if that is not
                # the error, then log the unexpected issue and move on.
                log(snap_log.LEVEL_ERR, "Cache: getSize failed for file %s. Exception %s %s" % (f, errnum, errstr))
                
    return total_sz

    
class CacheEntry(object):
    
    """
    This class helps create a temporary data file in cache. 
    
    It also helps monitor how much data is written into cache.
    If the size of cache is exceeded, it will pause the write to
    cache and do cache cleanup before proceeding.
    
    """
    
    def __init__(self):
        """Initializes the object."""
        self._write_mode = False
        self._read_mode = False
        self.url = None
        self._read_data_fp = None
        self._read_index_fp = None
        self._write_tmp_fd = None
        self._write_index_fd = None
        self.row_count = None
    
    def open_entry(self, url, guid, gen_id, mode, data_format = None):
        """
        Creates a cache entry object for the purpose of creating cache entry or accessing existing entry.
        
        This object can be used to start the creation of a new cache entry by specifying the mode as "w"
        (write mode). Please note that this initialization only starts the creation of the entry. In
        order to complete it, data has to be written, the stream closed by calling close() and then the
        entry committed, by calling commit().
        
        The object can also be used to gain access to an existing entry in cache by specifying the
        mode as "r" (read). The entry is identified by the param "url".
        
        @param url: URL of the entry.
        @type url:  string
        
        @param guid: The guid of the resource that created the data set. Only needed for write mode open.
        @type guid:  str
    
        @param gen_id: The gen id of the resource that created the data set. Only needed for write mode open.
        @type gen_id:  int
        
        @param mode: Mode of using this object, either to create an entry ("w") or access an existing entry ("r").
        @type mode:  string
        
        @param data_format: The format of the data. It is specified as
               "record" for record mode data.
               "binary" for binary mode data. 
               This is only needed when an entry is being written.
        @type data_format:  str
        
        @return: True if the operation succeeded, False if it failed. If the open was in read mode and a
            cache miss occurred, False is returned.
        @rtype:  bool
        
        @raise SnapValueError: If mode has invalid value.
        
        """
        if not is_enabled():
            raise SnapException("Cache has not been initialized")
        
        self.url = url
        
        if mode == "w":
            if data_format is None:
                raise SnapException("Cache write open received no data format for URL %s" % self.url)
            # The file is opened in binary mode by default
            (self._write_tmp_fd, self.temp_fname) = tempfile.mkstemp(".tmp","RSet_", _tmp_dir)
            (self._write_index_fd, self.index_fname) = tempfile.mkstemp(".file","Idx_", _index_dir)
            self.gen_id      = gen_id
            self.guid        = guid
            self.data_format = data_format
            self.row_count   = 0
            _set_current_size(get_cache_size())
            self._write_mode = True
        elif mode == "r":
            r = _get_cache_entry(url, guid, gen_id)
            if r is None:
                return False
            self._read_data_fp  = r[HDR_DATA_FILE]
            self._read_index_fp = r[HDR_INDEX_FILE]
            self.guid           = r[HDR_GUID]
            self.gen_id         = r[HDR_GEN_ID]
            self.data_format    = r[HDR_DATA_FORMAT]
            self.row_count      = r[HDR_ROW_COUNT]
            self._read_mode     = True
        else:
            raise SnapValueError("The mode should have the value 'r' or 'w'. Received ", mode)
        
        return True
    
    #
    # Write open related methods
    #    
    def _close_write(self):
        """
        Closes the file descriptors to the temp data and index files, if they are still open.
        
        @raise SnapException: If close of file streams fail.
        
        """
        e = None
        if self._write_tmp_fd is not None:
            try:
                os.close(self._write_tmp_fd)
            except Exception, err:
                e = SnapException("Temp data file close failed %s" % err)
            self._write_tmp_fd = None
        if self._write_index_fd is not None:
            try:
                os.close(self._write_index_fd)
            except Exception, err:
                e = SnapException.chain(e, SnapException("Index file close failed %s" % err))
            self._write_index_fd = None
        
        if e is not None:
            raise e 
    
    def write(self, data, index_value = None, row_inc = None):
        """
        Writes data to the cache entry, if it has been opened for writing.
        
        This method also takes an optional argument to mark the block being
        written with an index value. Later, this index value can be used
        for seeking rapidly to the beginning of the block that is about
        to be written.
        
        The method also takes an optional argument to increment the count of the number
        of records stored in the cache entry. This count will be available whenever
        the cache entry is looked up in the future.
        
        @param data: Data to be written
        @type data:  string
        
        @param index_value: If an entry needs to be made in the index file
            to record the location of this chunk of data being written,
            then this index_value should be a value other than None and should
            be unique for this index file.
        @type index_value:  string
        
        @param row_inc: Number of rows to add to the current count.
        @type row_inc:  int or None
        
        """
        
        if not self._write_mode:
            raise SnapIOError("This object was not opened to write")
        
        dlen = len(data)
        
        if index_value is not None:
            # Use lseek to find the current position value.
            curr_pos = os.lseek(self._write_tmp_fd, 0, 1)
            idx_str = "%s %s\n" % (index_value, curr_pos)
            # We ignore the increase in size caused on Windows by the conversion
            # of \n to \r\n
            dlen += len(idx_str)
            
        sz = _get_current_size()
        if sz + dlen > high_water_mark:
            _reduce_cache_size(sz + dlen)
        
        os.write(self._write_tmp_fd, data)
        
        if index_value is not None:
            os.write(self._write_index_fd, idx_str)
            
        if row_inc is not None:
            self.row_count += row_inc
            
        _inc_current_size(dlen)
        
    def commit(self, timeout):
        """
        Commit the entry to cache.
        
        This finalizes the storage of the entry in cache. The user MUST call this
        method or the discard method, after opening an entry to write. If not,
        the cache ends up having temporary entries that consume space.
        
        @param timeout: The time (seconds since epoch) at which the entry should be
            timed out. Please note that this is the absolute time at which the
            entry times out and not some diff from current time.
        @type timeout:  int or long or float
        
        @return: Name of the new data file, or None if the add failed.
        @rtype:  string
    
        """
        if not self._write_mode:
            raise SnapIOError("This object was not opened in write mode, nothing to commit")
        
        self.close()
            
        return _create_cache_entry(self.url, timeout, self.temp_fname, self.index_fname,
                                   self.guid, self.gen_id, self.data_format, self.row_count)
    
    def discard(self):
        """
        Discard the entry opened (in write mode) in cache.
        
        This method is typically called by the user when some error has occurred
        and the user no longer intends to go through with the creation of this
        entry. In general, the user must call commit() or discard() after an
        entry has been opened for writing. 
        
        """
        if not self._write_mode:
            raise SnapIOError("This object was not opened in write mode, nothing to discard")
            
        self.close()
            
        if self.temp_fname:
            sz = os.path.getsize(self.temp_fname)    
            os.remove(self.temp_fname)
            _dec_current_size(sz)
        self.temp_fname = None
        
        if self.index_fname:
            sz = os.path.getsize(self.index_fname)    
            os.remove(self.index_fname)
            _dec_current_size(sz)
        self.index_fname = None
    
    #
    # Read open related methods
    #
    def _close_read(self):
        """
        Closes the file objects for the data and index files, if they are still open.
        
        @raise SnapException: If close of file streams fail.
        
        """
        e = None
        if self._read_data_fp is not None:
            try:
                self._read_data_fp.close()
            except Exception, err:
                e = SnapException("Data file close failed %s" % err)
            self._read_data_fp = None
        if self._read_index_fp is not None:
            try:
                self._read_index_fp.close()
            except Exception, err:
                e = SnapException.chain(e, SnapException("Index file close failed %s" % err))
            self._read_index_fp = None
        
        if e is not None:
            raise e 

    def read(self, size = None):
        """
        Read the specified number of bytes. If size is negative or omitted, then all the data up to EOF is read.
        
        @param size: Number of bytes to be read.
        @type size:  long
        
        @return: The data read, or None at end of file.
        @rtype:  string or None
        
        """
        
        if not self._read_mode:
            raise SnapIOError("This object was not opened for reading")        
        
        # File object read() handles the logic of reading negative size.
        if size is not None:
            data = self._read_data_fp.read(size)
        else:
            data = self._read_data_fp.read()
        
        if data == "":
            return None
        
        return data
    
    def _seek_index(self, tgt_idx):
        """
        Find the specified index entry in file. If not found, seek to closest index entry before the specified one.
        
        @param tgt_idx: Index value being requested.
        @type tgt_idx:  long
        
        @return: 2-tuple containing the index value found and the position in the data file.
            None if no such entry or closest preceding entry was found.
        @rtype:  tuple or None
        
        """
        
        self._read_index_fp.seek(0, 2)
        high = self._read_index_fp.tell()
        self._read_index_fp.seek(0)
        low = self._read_index_fp.tell()
        c = 0 
        while low < high:
            c += 1
            #print "low %s high %s" % (low, high)
            mid = long((low + high)/2)
            # Go to midpoint
            if mid > 0:
                # The mid point we calculated could be in the middle of some line or at the
                # beginning of some line (which is what we want). To deal with these two
                # scenarios, seek one byte behind the mid point and read a line. If the mid point
                # is the beginning of the line, then we will read "\n" and return to that
                # midpoint. If the midpoint is in the middle of a line, then we will read
                # past that line and reach the beginning of the next line. Either way, we end up
                # properly aligned.
                self._read_index_fp.seek(mid - 1)
                self._read_index_fp.readline()
            else:
                self._read_index_fp.seek(mid)
           
            mid_line = self._read_index_fp.readline()
            if mid_line == "":
                # Nothing in the high section (EOF), search mid to low section.
                high = mid
                continue
            mid_entry = mid_line.split(" ")
            mid_idx = long(mid_entry[0].strip())
            #print "\tMid index %s num %s" %(mid_idx, mid)
            if tgt_idx  < mid_idx:
                # Search from low to mid section.
                high = mid
                continue
            elif tgt_idx > mid_idx:
                next_line = self._read_index_fp.readline()
                if next_line == "":
                    # We are at end of the file. The mid entry is the closest one to
                    # the requested index
                    return (mid_idx, long(mid_entry[1].strip()))
                next_entry = next_line.split(" ")
                next_idx = long(next_entry[0].strip())
                if next_idx > tgt_idx:
                    # This one is greater than the requested index. The mid point
                    # is the closest entry.
                    return (mid_idx, long(mid_entry[1].strip()))
                elif next_idx == tgt_idx:
                    return (next_idx, long(next_entry[1].strip())) 
                low = mid
            else:
                # We found exact match
                return (mid_idx, long(mid_entry[1].strip()))
        return None
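
    # Index file sketch: each line is "<index value> <byte offset>\n", written
    # in ascending order by write(). With a hypothetical index file
    #
    #   0 0
    #   100 8192
    #   200 16570
    #
    # _seek_index(150) binary-searches down to (100L, 8192L), the closest entry
    # at or before the requested value, and _seek_index(200) returns the exact
    # match (200L, 16570L).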
    
    def seek_nearest_record(self, tgt_rec):
        """
        Seek to the specified record or the nearest possible record before it.
        
        @param tgt_rec: The record being searched for.
        @type tgt_rec:  long
        
        @return: The number of the record that was actually sought, or None, if no
            suitable record was found.
        @rtype:  long or None
        
        """
                
        if not self._read_mode:
            raise SnapIOError("This object was not opened for reading")
        
        ret = self._seek_index(tgt_rec)
        if ret is None:
            return None
        
        self._read_data_fp.seek(ret[1])
        return ret[0]
    
    def seek(self, offset, whence=0):
        """
        Has the same semantics as file object seek.
        
        Set the file's current position. The whence argument is optional
        and defaults to 0 (absolute file positioning); other values are 1
        (seek relative to the current position) and 2 (seek relative to
        the file's end). There is no return value.
        
        @param offset: The number of bytes to seek.
        @type offset:  long
        
        @param whence: Can take values 0, 1 and 2
        @type whence:  int
        
        """
        
        if not self._read_mode:
            raise SnapIOError("This object was not opened for reading")
        
        self._read_data_fp.seek(offset, whence)
        
    def close(self):
        if self._read_mode:
            self._close_read()        
        if self._write_mode:
            self._close_write()

def open_entry(url, guid, gen_id, mode, data_format = None):
    """
    Returns a cache entry object for the purpose of creating new cache entry or accessing existing entry.
    
    If mode is specified as "w" (write mode), then the object returned can be used to create a new entry.
    In order to create a new cache entry using the object, data has to be written by calling the write()
    method of the object, and the entry then committed, by calling the commit() method of the object. If for
    any reason the entry needs to be discarded, instead of committed, then the discard() method should be
    called.
    
    If mode is specified as "r" (read), the function will try to rerieve an existing entry in cache that
    matches the specified URL.
    
    @param url: URL of the entry.
    @type url:  string
    
    @param guid: The guid of the resource that created the data set. Only needed for write mode open.
    @type guid:  str

    @param gen_id: The gen id of the resource that created the data set. Only needed for write mode open.
    @type gen_id:  int
    
    @param mode: Mode of open, either to create an entry ("w") or access an existing entry "r".
    @type mode:  string
    
    @param data_format: The format of the data. It is specified as
           "record <content_type>" for record mode data.
           "binary <content_type>" for binary mode data. 
           This is only needed when an entry is being written.
    @type data_format:  str
    
    @return: Cache entry object or None. None is returned in read mode, when there is a cache miss.
    @rtype:  L{CacheEntry} or None
    
    @raise SnapException: If the cache has not been initialized.
    
    """
    
    if not is_enabled():
        raise SnapException("Cache has not been initialized")
    entry = CacheEntry()
    if not entry.open_entry(url, guid, gen_id, mode, data_format):
        return None
    return entry
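
# Read-mode usage sketch (illustrative; process_chunk is a hypothetical
# consumer, not part of this module):
#
#   entry = open_entry(url, guid, gen_id, "r")
#   if entry is not None:
#       try:
#           chunk = entry.read(4096)
#           while chunk is not None:
#               process_chunk(chunk)
#               chunk = entry.read(4096)
#       finally:
#           entry.close()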


################################################################################
# 
# Functions that can only be run safely, when the server is coming up or
# is down. These functions work under the assumption that there is only
# one thread or process accessing the cache.
#
################################################################################

def init_cache(dir, size, low_mark, high_mark, p_log, p_elog):
    """
    Initialize the cache.
    
    This function should be called when the server is starting up. It sets up
    the directories and files inside the cache, if they don't already exist.
    
    @param dir: Location of the cache directory.
    @type dir:  string
    
    @param size: Maximum size of the cache in bytes.
    @type size:  long
    
    @param low_mark: Low water mark (percentage value from 0 to 100)
    @type low_mark:  int
    
    @param high_mark: High water mark for cache (percentage value from 0 to 100)
    @type high_mark:  int
    
    @param p_log: Log method
    @type p_log:  log method
    
    @param p_elog: Exception log method
    @type p_elog:  Exception log method
    
    @raise SnapValueError: If the values specified in the parameters are invalid.
    
    """
    
    global cache_dir
    global max_size
    global low_water_mark
    global high_water_mark
    global _delete_list_file
    global _tmp_dir
    global _result_dir
    global _index_dir
    global _header_dir
    global log
    global elog
    
    if dir is None:
        # The cache is not going to be activated.
        return
    
    if size < 0:
        raise SnapValueError("Size of cache cannot be negative (%s)" % size)
    
    cache_dir = os.path.abspath(dir)
    max_size = size
    if low_mark < 0 or low_mark > 100:
        raise SnapValueError("Low water mark value %s should be a percentage value" % low_mark)
    
    if high_mark < 0 or high_mark > 100:
        raise SnapValueError("High water mark value %s should be a percentage value" % high_mark)
     
    low_water_mark = long((low_mark * max_size) / 100)
    high_water_mark = long((high_mark * max_size) / 100)
    log = p_log
    elog = p_elog
    
    # Create the cache directory structure
    _create_dir(cache_dir)
    _tmp_dir = os.path.join(cache_dir, "tmp")
    _create_dir(_tmp_dir)
    _header_dir = os.path.join(cache_dir, "header")
    _create_dir(_header_dir)
    _result_dir = os.path.join(cache_dir, "result")
    _create_dir(_result_dir)
    _index_dir = os.path.join(cache_dir, "index")
    _create_dir(_index_dir)
    _delete_list_file = os.path.join(cache_dir, "__delete_list__")
    _create_file(_delete_list_file)
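
# For example, a 100 MB cache that starts cleanup when usage passes the 80%
# high water mark and reduces it back to the 50% low water mark (the values
# are illustrative, not defaults):
#
#   init_cache("/var/snaplogic/cache", 100 * 1024 * 1024, 50, 80, my_log, my_elog)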
    
def header_checkup(fname, data_files, index_files):
    """
    Return True if the header file points to at least one existing data file.
    
    The function also modifies data_files and leaves only those data files in
    the list which have no header files pointing to it.
    
    @param fname: Name of header file being tested.
    @type fname:  string
    
    @param data_files: List of data files that have not yet found a header file
        referring to them.
    @type data_files:  list
    
    @param index_files: List of index files that have not yet found a header
        file referring to them.
    @type index_files:  list
    
    @return: True if the header file has a reference to at least one existing
        data file.
    @rtype:  bool
    
    """
    hdr = CacheEntryHeader(fname)
    if not hdr.open():
        raise SnapException("Header checkup failed to open headr file %s" % fname)
    try:
        is_valid = False
        if len(hdr.url_map) != 0:
            # The header file is not empty. See if the entry is still valid
            for url in hdr.url_map:
                if hdr.url_map[url][HDR_DATA_FILE] not in data_files :
                    # Data file does not exist.
                    continue
                if hdr.url_map[url][HDR_INDEX_FILE] not in index_files :
                    # Index file does not exist.
                    continue
                is_valid = True
                # There is a valid header referring to that data file
                data_files.remove(hdr.url_map[url][HDR_DATA_FILE])
                index_files.remove(hdr.url_map[url][HDR_INDEX_FILE])
        
        if is_valid:
            return True
        else:
            # No valid entries were in the header file. The caller should delete it.
            return False
    finally:
        hdr.close()
        
def startup_cleanup():
    """
    Do cache cleanup at server startup time.
    
    This method removes empty header files and temporary files. These are
    files that are typically left behind by the abrupt shutdown of the
    server.
    
    Since this is called only at startup time, there is no reason for a file
    deletion to fail, other than some serious issue. For this reason,
    this function will throw exceptions whenever a delete fails. These
    exceptions should stop the server startup.
    
    @raise SnapException: If a file delete fails.
     
    """
    
    if not is_enabled():
        return

    # First, cleanup timed out entries. 
    _remove_timedout_entries(True)
    
    # Second, cleanup delete list
    _remove_delete_list_entries(True)
   
    # Now cleanup header files that are empty or pointing to non existent data files.
    # This has to happen after timed out entries have been removed above (as it creates
    # more empty header files).
    data_files = _get_all_data_files()
    index_files = _get_all_index_files()
    names = _get_all_header_files()
    for n in names:
        if not header_checkup(n, data_files, index_files):
            try:
                os.remove(n)
            except Exception, e:
                raise SnapException("Failed to remove empty header file %s exception %s" % (n, e))
    
    # Delete data and index files that don't have a valid header file referring to them.
    # This can happen if the file ref was removed from the header file, but the
    # process was killed/crashed before the data or index file could be removed.
    # Or, in the case of index files, if the process failed to commit the entry.
    for d in data_files:
        try:
            os.remove(d)
        except Exception, e:
            raise SnapException("Failed to remove data file %s, exception %s" % (d, e))
        
    for i in index_files:
        try:
            os.remove(i)
        except Exception, e:
            raise SnapException("Failed to remove index file %s, exception %s" % (i, e))
 
    # Delete temp files
    names = _get_all_temp_files()
    for n in names:
        try:
            os.remove(n)
        except Exception, e:
            raise SnapException("Failed to remove temporary file %s, exception %s" % (n, e))
        
    # Compute current size.
    _set_current_size(get_cache_size())
    
def empty_cache():
    """
    Remove all cache data (including temp, header, index and data files).
    
    The delete list file is also set to empty. This method should
    only be called when no other process or thread is accessing
    the cache.

    @raise SnapException: If any of the file deletions fail.
    
    """
    if not is_enabled():
        return
    excp = None
    for n in _get_all_data_files():
        try:
            os.remove(n)
        except Exception, e:
            excp = SnapException.chain(excp, e)
            
    for n in _get_all_index_files():
        try:
            os.remove(n)
        except Exception, e:
            excp = SnapException.chain(excp, e)
            
    for n in _get_all_header_files():
        try:
            os.remove(n)
        except Exception, e:
            excp = SnapException.chain(excp, e)
        
    for n in _get_all_temp_files():
        try:
            os.remove(n)
        except Exception, e:
            excp = SnapException.chain(excp, e)
        
    # Empty the delete list file.
    try:
        f = open(_delete_list_file,"w")
        try:
            f.write("")
        finally:
            f.close()
    except Exception, e:
        excp = SnapException.chain(excp, e)
    
    # Recompute the current cache size.
    _set_current_size(get_cache_size())
    
    if excp is not None:
        raise excp

def destroy_cache():
    """Do the opposite of init. Remove the entire cache setup."""
    empty_cache()
    os.rmdir(_tmp_dir)
    os.rmdir(_result_dir)
    os.rmdir(_index_dir)
    os.rmdir(_header_dir)
    os.remove(_delete_list_file)
    os.rmdir(cache_dir)
    