# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: data_cache.py 4423 2008-09-23 18:47:31Z jbrendel $
"""
This module helps cache data results of a pipeline run.
The cache is a file system based cache. Each data result
is stored in a separate file. The URL used to invoke the
pipeline is hashed (using md5) and used as the key to
retrieve the cache data.
When an entry is created, the URL is hashed and a header
file is created with that hash name. This file contains
the URL and the name of the data file which holds the actual data.
We have this header file to deal with any hashing collisions.
*** Order of lock acquisition to avoid deadlocks ***
Acquire the header lock first, then the delete list lock.
"""
import os
import stat
import glob
import threading
import md5
import tempfile
import time
import re
import simplejson
import errno
from snaplogic.common.snap_exceptions import *
from snaplogic.common import file_lock
from snaplogic.common import snap_log
cache_timeout = 300
"""Cache timeout in seconds."""
cache_dir = None
"""Location of the cache directory."""
max_size = None
"""Cache size limit in bytes."""
high_water_mark = None
"""
High water mark (in bytes)
When the cache size reaches this level, it should be
cleaned up until it reduces to the low_water_mark
"""
low_water_mark = None
"""Low water mark (in bytes)."""
log = None
"""Log file method."""
elog = None
"""Exception log method."""
_tmp_dir = None
"""The directory under cache, which has the temp files."""
_header_dir = None
"""The directory under cache, which has the header files."""
_result_dir = None
"""The directory under cache, which has the result files."""
_index_dir = None
"""The directory under cache, which has all the index file names."""
_delete_list_file = None
"""This file contains all cache control related information."""
_current_cache_size = None
"""Current cache size."""
_lock = threading.RLock()
"""Lock for cache size increment/decrement."""
HDR_INDEX_FILE = "index_f"
HDR_DATA_FILE = "data_f"
HDR_ROW_COUNT = "row_c"
HDR_GEN_ID = "gen_id"
HDR_GUID = "guid"
HDR_DATA_FORMAT = "data_format"
def _inc_current_size(sz):
"""
Increment current cache size by sz bytes.
@param sz: Size by which to increment.
@type sz: int or long
"""
global _current_cache_size
_lock.acquire()
if _current_cache_size is not None:
_current_cache_size += sz
else:
_current_cache_size = sz
_lock.release()
def _dec_current_size(sz):
"""
Decrement current cache size by sz bytes.
@param sz: Size by which to decrement.
@type sz: int or long
"""
global _current_cache_size
_lock.acquire()
if _current_cache_size is not None:
_current_cache_size -= sz
if _current_cache_size < 0:
_current_cache_size = 0
_lock.release()
def _set_current_size(sz):
"""
Set current cache size.
@param sz: Updated size for cache.
@type sz: int or long
"""
global _current_cache_size
_lock.acquire()
_current_cache_size = sz
_lock.release()
def _get_current_size():
"""Return the current size (in bytes)."""
_lock.acquire()
sz = _current_cache_size
_lock.release()
return sz
def _add_to_delete_list(fname):
"""
Add specified data file to delete list.
@param fname: name of the data file.
@type fname: string
"""
# Add \n, since this file will occupy a single line in the delete list file.
entry = fname + "\n"
# Open and lock the delete list file
fp = open(_delete_list_file, "r+")
file_lock.lock(fp, file_lock.LOCK_EX)
try:
lineslist = fp.readlines()
if entry not in lineslist:
# We must be at the end of the file, append new entry.
fp.write(entry)
finally:
# Whatever happens, close the delete list file; closing also releases the lock.
fp.close()
def _remove_data_file(fname, use_delete_list, err_val = None):
"""
Remove data file from the cache.
If the remove fails, then the file is added to the delete list. This
method decrements the current size of the cache as it deletes the
entry.
@param fname: Name of the data file that should be deleted.
@type fname: string
@param use_delete_list: True, if the data file should be put in the delete list when the delete fails.
@type use_delete_list: bool
@param err_val: If not None, should be a dict. On delete failure, this dict is populated with the
error number and error string returned by the OS. The keys are "errno" and "errstr".
@type err_val: dict
@return: Size of the data file, if the remove succeeded. None, if the file had to be added to the
delete list or did not exist.
@rtype: long or None
"""
del_ok = False
try:
sz = os.path.getsize(fname)
os.remove(fname)
del_ok = True
except OSError, (errnum, errstr):
if err_val:
err_val['errno'] = errnum
err_val['errstr'] = errstr
if errnum == errno.ENOENT:
# The file is already gone, don't put it in delete list.
return None
if del_ok:
_dec_current_size(sz)
return sz
# We did not have a successful delete, the file can either
# be added to delete list or we can just give up and move on.
if use_delete_list:
_add_to_delete_list(fname)
return None
class CacheEntryHeader(object):
"""
Represents the header of a cache entry.
This class provides an object representation of the
header file and allows easy parsing, modification and
writing of the contents of the header file.
"""
def __init__(self, hs):
"""
Initializes the header file object with the name of the header file.
@param hs: Hash name. This can be just the hash value or the fully qualified
header file name.
@type hs: string
"""
self.url_map = {}
if len(os.path.commonprefix([_header_dir, hs])) == len(_header_dir):
# The name already has path info
self.hdr_file = hs
else:
self.hdr_file = os.path.join(_header_dir, hs)
def open(self, create_flag = False):
"""
Open and lock the header file.
If create_flag is set to True, then the header file is created if it does
not exist. If the flag is False, the header file is only opened if it
already exists; otherwise the method returns False.
@param create_flag: If True, the header file is created if it does not exist.
@type create_flag: bool
@return: True if the header file was found or created. False, if the header file
was not found and the method was asked to not create one.
@rtype: bool
"""
flags = os.O_RDWR
if create_flag:
flags |= os.O_CREAT
try:
fd = os.open(self.hdr_file, flags, 0600)
except OSError, (errnum, errstr):
if errnum == errno.ENOENT and not create_flag:
return False
else:
raise SnapException("Failed to open header file %s. Error %s, %s"
% (self.hdr_file, errnum, errstr))
try:
self._fp = os.fdopen(fd, "r+")
except Exception, e:
os.close(fd)
raise e
try:
file_lock.lock(self._fp, file_lock.LOCK_EX)
except Exception, e:
self._fp.close()
raise e
try:
s = self._fp.read()
if s.strip():
self.url_map = simplejson.loads(s)
else:
self.url_map = {}
except:
# We have an error, unlock the file.
self._fp.close()
raise
return True
def close(self):
"""Create or overwrite the header file with the contents of the object and close."""
# We choose to write even an empty header file to keep things simple. The server,
# at startup, can clean up all these empty header files.
try:
_truncate_file(self._fp)
simplejson.dump(self.url_map, self._fp)
finally:
self._fp.close()
def remove_data_file_ref(self, dfile_name, abort_on_failure = False):
"""
Remove reference to the data file in the header file and remove the actual data file.
@param dfile_name: Fully qualified name of the data file to be removed.
@type dfile_name: string
@param abort_on_failure: If True, raise SnapException when a delete fails. If False,
failed deletes are added to the delete list instead.
@type abort_on_failure: bool
@return: Combined size (in bytes) of the data and index files that were removed, if the
remove succeeded. None, if the URL entry was not found or the delete failed.
@rtype: long or None
"""
err_val = {}
use_delete_list = not abort_on_failure
if not self.open():
# This data file has no header file referring to it. Just attempt to delete it.
return _remove_data_file(dfile_name, use_delete_list)
try:
found = False
# Remove entry for header file
for (u, entry) in self.url_map.iteritems():
if entry[HDR_DATA_FILE] == dfile_name:
del self.url_map[u]
found = True
break
if found:
ret1 = _remove_data_file(entry[HDR_DATA_FILE], use_delete_list, err_val)
tot = 0
if ret1 is not None:
tot = ret1
elif abort_on_failure:
raise SnapException("Failed to delete data file %s. Exception %s %s" %
(entry[HDR_DATA_FILE], err_val['errno'], err_val['errstr']))
ret2 = _remove_data_file(entry[HDR_INDEX_FILE], use_delete_list, err_val)
if ret2 is not None:
tot += ret2
elif abort_on_failure:
raise SnapException("Failed to delete index file %s. URL %s Exception %s %s" %
(entry[HDR_INDEX_FILE], u, err_val['errno'], err_val['errstr']))
return tot
else:
return None
finally:
self.close()
def _create_dir(dirname):
"""
Create directory if it does not exist. Make sure directory has rwx permission.
@param dirname: Name of directory.
@type dirname: string
"""
if not os.path.exists(dirname):
os.makedirs(dirname, 0700)
else:
if not os.path.isdir(dirname):
raise SnapException("%s is not a directory" % dirname)
if not os.access(dirname, os.R_OK | os.W_OK | os.X_OK):
raise SnapException("Directory %s does not have required permissions" % dirname)
def _create_file(fname):
"""
Create file if it does not exist. Make sure file has rw permission.
@param fname: Name of file.
@type fname: string
"""
if not os.path.exists(fname):
f = open(fname,"w")
f.close()
else:
if not os.path.isfile(fname):
raise SnapException("%s is not a file" % fname)
if not os.access(fname, os.R_OK | os.W_OK):
raise SnapException("File %s does not have required permissions" % fname)
def _truncate_file(fp):
"""
Truncates a file and retries on failure.
Windows truncate fails with error 13 (permission denied) occasionally. This
appears to be a known issue. Retrying seems to work fine.
@param fp: Open file object that needs to be truncated.
@type fp: File Object
@raise SnapException: If the truncate fails after repeated attempts.
"""
c = 0
while c < 10:
fp.seek(0)
try:
fp.truncate(0)
return
except IOError, (errnum, errstr):
if errnum == errno.EACCES:
# On Windows, truncate sometimes fails with permission denied. Retry.
time.sleep(.1)
c += 1
raise SnapException("Truncate failed after repeated attempts")
def _get_all_data_files():
"""Returns a list of all result files in cache."""
names = os.listdir(_result_dir)
return [os.path.join(_result_dir, n) for n in names]
def _get_all_header_files():
"""Returns a list of all header file names in cache."""
names = os.listdir(_header_dir)
return [os.path.join(_header_dir, n) for n in names]
def _get_all_index_files():
"""Returns a list of all index file names in cache."""
names = os.listdir(_index_dir)
return [os.path.join(_index_dir, n) for n in names]
def _get_all_temp_files():
"""Return a list of all tmp file names in cache."""
names = os.listdir(_tmp_dir)
return [os.path.join(_tmp_dir, n) for n in names]
def _get_hash_and_timeout(fname):
"""
Get the timeout value in seconds and hash value from the result file name.
@param fname: Name of result file
@type fname: string
@return: Tuple of (hash string, Timeout value in seconds from epoch)
@rtype: 2-tuple
"""
fname = os.path.basename(fname)
s = fname.split("_")
return (s[0], long(s[1]))
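# For illustration (hash and timestamp below are made up): a result file named
# "7815696ecbf1c96e6894b779456d330e_1222192051" yields
# ("7815696ecbf1c96e6894b779456d330e", 1222192051L). Collision-suffixed names such as
# "..._1222192051_0" also parse correctly, because only the first two fields are used.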
################################################################################
#
# Functions that are used to manage cache size.
#
################################################################################
def _remove_delete_list_entries(abort_on_failure = False):
"""
Remove delete list entries.
This method should be called when there is a need to free up space in the cache.
The delete list usually gets populated on Windows with files that were supposed to
be deleted but could not be, typically because another process or thread had the
file open. This function runs through the delete list and makes another attempt at
deleting those files.
@param abort_on_failure: If this flag is set to True, the function aborts
and raises an exception when any file deletion fails for any reason other than a
missing file (errno.ENOENT). This flag is usually set at server startup time, since
there should be no reason for a file delete to fail at that time (not a multi-thread
situation).
@type abort_on_failure: bool
@return: Total size in bytes of the files that were removed.
@rtype: long
"""
rem_size = 0L
fail_list = []
fp = open(_delete_list_file, "r+")
file_lock.lock(fp, file_lock.LOCK_EX)
try:
lineslist = fp.readlines()
for l in lineslist:
# Remove the trailing \n
l = l.strip()
log(snap_log.LEVEL_DEBUG, "Removing delete list item %s" % l)
if not os.path.exists(l):
# The file does not exist, consider it removed.
continue
sz = os.path.getsize(l)
try:
os.remove(l)
# The remove succeeded, note the size of file removed
rem_size += sz
_dec_current_size(sz)
except OSError, (errnum, errstr):
if errnum == errno.ENOENT:
# Someone has just deleted the file.
continue
elif abort_on_failure:
raise SnapException("Failed to remove delete list entry %s. Exception %s %s"
% (l, errnum, errstr))
else:
# Add back to delete list.
fail_list.append(l + "\n")
# Empty the file.
_truncate_file(fp)
# Write new entries.
fp.writelines(fail_list)
finally:
fp.close()
return rem_size
def _remove_timedout_entries(abort_on_failure = False):
"""
Remove timed out data files.
This method should be called whenever there is a need to free up space in the
cache or to clean up the cache at startup time.
@param abort_on_failure: If this flag is set to True, the function will abort
and raise an exception when any file deletion fails. This is typically
done at server startup time, when there should not be any problems in
deleting a file.
@type abort_on_failure: bool
@return: Total file size in bytes that was deleted.
@rtype: long
"""
total_sz = 0L
names = _get_all_data_files()
for n in names:
(hs, to) = _get_hash_and_timeout(n)
if to <= time.time():
sz = CacheEntryHeader(hs).remove_data_file_ref(n, abort_on_failure)
if sz is not None:
total_sz += sz
return total_sz
def _remove_using_lru(lower_by):
"""
Remove least recently used data files.
This function removes entries that have not yet timed out. For this reason,
this method should only be called when all other methods of freeing up
cache space have been exhausted.
@param lower_by: The number of bytes by which the cache should be reduced.
@type lower_by: int or long
@return: Total file size in bytes that was deleted.
@rtype: long
"""
d = {}
names = _get_all_data_files()
for name in names:
try:
atime = os.path.getatime(name)
except OSError, (errnum, errstr):
if errnum != errno.ENOENT:
# The file is still there, for some unexpected reason getatime did not work, log a warning.
log(snap_log.LEVEL_WARN, "LRU failed to get access time of result file %s exception %s %s" %
(name, errnum, errstr))
continue
try:
sz = os.path.getsize(name)
except OSError, (errnum, errstr):
if errnum != errno.ENOENT:
# The file is still there, for some unexpected reason getsize did not work, log a warning.
log(snap_log.LEVEL_WARN, "LRU failed to get size of result file %s exception %s %s" %
(name, errnum, errstr))
continue
if atime not in d:
d[atime] = [(name, sz)]
else:
d[atime].append((name, sz))
asc_keys = d.keys()
asc_keys.sort()
total_sz = 0L
for k in asc_keys:
for (name, sz) in d[k]:
(hs, ignore) = _get_hash_and_timeout(name)
sz = CacheEntryHeader(hs).remove_data_file_ref(name)
if sz is None:
# Failed to remove, go to next entry.
continue
total_sz += sz
if total_sz >= lower_by:
return total_sz
return total_sz
def _reduce_cache_size(current_size):
"""
Reduces the size of the cache down to the low water mark size.
@param current_size: The current size of cache in bytes.
@type current_size: int or long
"""
sz = _remove_delete_list_entries()
current_size -= sz
if current_size <= low_water_mark:
return
sz = _remove_timedout_entries()
current_size -= sz
if current_size <= low_water_mark:
return
sz = _remove_using_lru(current_size - low_water_mark)
return
def _get_cache_entry(url, p_guid, p_gen_id):
"""
Get an entry corresponding to the specified URL.
The function creates a md5 hash of the URL and looks for a
header file by that name. The header file is then checked
to see if it contains an entry for that URL. We can have more
than one URL entry in the header file if there is an md5 collision.
The URL entry also specifies the name of the data file containing
the actual data. The name of the file is usually
<hash value>_<timeout value>. In some cases, if there is a collision
it is <hash value>_<timeout value>_<a number>.
Such a collision can happen under two circumstances:
1) There was an md5 collision of two URLs and they have created
data files that have the same timeout.
2) There are two data files created for the same URL, under a second.
The older file is pending deletion due to LRU and the newer file
is just being added.
Irrespective of whether there is a collision or not, it is possible
to tell from the name of the file, when it will timeout. If the
file has not timed out, then it is opened and a file object is
returned. If no such entry exists or it has timed out, then None
is returned.
@param url: URL being looked up. This *must* be an encoded URL, if it has
special characters like spaces. In short, it must be a valid URL.
@type url: string
@param p_guid: The current guid of the resource that produced the data set. When
this does not match the guid stored in the cache, the cache entry is
invalidated.
@type p_guid: str
@param p_gen_id: The current gen id of the resource that produced the data set. When
this does not match the gen id stored in the cache, the cache entry is
invalidated.
@type p_gen_id: int
@return: dictionary containing
{ HDR_DATA_FILE : data file pointer,
HDR_INDEX_FILE : index file pointer,
HDR_ROW_COUNT : row count,
HDR_GEN_ID : gen id of the resource,
HDR_GUID : guid of the resource,
HDR_DATA_FORMAT : data format of the entry (content type) }
In case of miss, None is returned.
@rtype: dict or None
@raise SnapException: If open/stat/utime of data file fails for
unexpected reasons.
"""
if not is_enabled():
return None
hs = md5.new(url).hexdigest()
hdr = CacheEntryHeader(hs)
if not hdr.open():
# Such a file does not exist; it's a miss.
return None
try:
if url not in hdr.url_map:
# A cache miss
return None
datafile = hdr.url_map[url][HDR_DATA_FILE]
idxfile = hdr.url_map[url][HDR_INDEX_FILE]
row_count = hdr.url_map[url][HDR_ROW_COUNT]
gen_id = hdr.url_map[url][HDR_GEN_ID]
guid = hdr.url_map[url][HDR_GUID]
data_format = hdr.url_map[url][HDR_DATA_FORMAT]
(h, timeout) = _get_hash_and_timeout(datafile)
# Lose the existing entry, if it has:
# 1) Timed out
# 2) A gen_id or guid that does not match (meaning the resource has been changed or
#    deleted since the time that the data set was cached)
if (timeout <= long(time.time())) or \
(p_gen_id < gen_id) or (p_guid != guid):
# The entry is stale (timed out or the resource changed), remove it and return a cache miss
_remove_data_file(datafile, True)
_remove_data_file(idxfile, True)
del hdr.url_map[url]
return None
# It's a cache hit
try:
datafp = open(datafile, "rb")
except IOError, (errnum, errstr):
# The file was not found. This can happen, if someone
# deleted the data file and failed to update the header file.
# Update the header file and return a cache miss.
# PS: open() raises IOError and not OSError, unlike many of the
# other calls in this file.
if errnum == errno.ENOENT:
# The file seems to have been deleted.
_remove_data_file(idxfile, True)
del hdr.url_map[url]
return None
else:
raise SnapException("Failed to open data file %s. Exception %s %s"
% (datafile, errnum, errstr))
try:
idxfp = open(idxfile, "rb")
except IOError, (errnum, errstr):
# This should not have happened
datafp.close()
raise SnapException("Failed to open index file %s. Exception %s %s"
% (datafile, errnum, errstr))
try:
st = os.stat(datafile)
# We forcibly update the access time of the file,
# because the LRU algorithm needs it and, on Windows,
# a certain registry setting can prevent the access time
# from being updated automatically when the file is opened.
os.utime(datafile, (time.time(), st[stat.ST_MTIME]))
except OSError, (errnum, errstr):
raise SnapException("Failed to stat/utime data file %s. Exception %s %s" % (datafile, errnum, errstr))
return { HDR_DATA_FILE : datafp,
HDR_INDEX_FILE : idxfp,
HDR_ROW_COUNT : row_count,
HDR_GEN_ID : gen_id,
HDR_GUID : guid,
HDR_DATA_FORMAT : data_format }
finally:
# Unlock and write out the header.
hdr.close()
def _create_cache_entry(url, timeout, temp_fname, index_fname, guid, gen_id, data_format, row_count):
"""
Create a cache entry with the URL.
Note: If this function raises an exception, it is the responsibility of the
caller to delete the temporary result file.
An md5 hash of the URL is made. If a header file by that hash name does not
already exist, it is created. If the header already exists, it is checked
to see if the URL entry is present in the header. If the entry is found and
has a timeout which is equal or greater than the new file being added, then
the new entry is discarded. If not, then the new entry is added and the old
entry is deleted.
@param url: URL of the pipeline that produced the result file. This *must*
be an encoded URL, if it has special characters like spaces. In short,
it must be a valid URL.
@type url: string
@param timeout: The time (seconds since epoch) at which the entry should be
timed out. Please note that this is the absolute time at which the
entry times out and not some delta from the current time.
@type timeout: int or long or float
@param temp_fname: The name of the temporary result file holding the data.
@type temp_fname: str
@param index_fname: The name of the index file.
@type index_fname: str
@param guid: The guid of the resource that created the data set.
@type guid: str
@param gen_id: The gen id of the resource that created the data set.
@type gen_id: int
@param data_format: The format of the data. It is specified as
"record <content_type>" for record mode data.
"binary <content_type>" for binary mode data.
@type data_format: str
@param row_count: Number of rows in the data set (if any).
@type row_count: int
@return: Name of the new data file, or None if the add failed.
@rtype: string
@raise SnapException: If creation of cache entry fails.
"""
if not is_enabled():
return
timeout = long(timeout)
hs = md5.new(url).hexdigest()
new_result_file_name = os.path.join(_result_dir, hs) + "_" + str(long(timeout))
hdr = CacheEntryHeader(hs)
hdr.open(True)
try:
curr_names = [e[HDR_DATA_FILE] for e in hdr.url_map.values()]
fname = new_result_file_name
# If some other thread or process has created a result which has the same
# or greater timeout, then discard this new entry.
# By discarding the new entry with the same timeout, we also avoid a collision
# caused by results having the same timeout.
if url in hdr.url_map:
r = _get_hash_and_timeout(hdr.url_map[url][HDR_DATA_FILE])
if r[1] >= timeout:
os.remove(temp_fname)
return None
c = 0
# Next, we see if there are collisions with any other URL.
while (fname in curr_names) or os.path.exists(fname):
if c > 100:
# Something has to be wrong. There cannot possibly be
# so many collisions.
raise SnapException("Failed to create a name for URL %s with template %s" %
(url, new_result_file_name))
fname = "%s_%s" % (new_result_file_name, c)
c = c + 1
entry = {}
entry[HDR_DATA_FILE] = fname
entry[HDR_INDEX_FILE] = index_fname
entry[HDR_GEN_ID] = gen_id
entry[HDR_GUID] = guid
entry[HDR_DATA_FORMAT] = data_format
entry[HDR_ROW_COUNT] = row_count
if url in hdr.url_map:
# Remove the current result set and add the new one.
_remove_data_file(hdr.url_map[url][HDR_DATA_FILE], True)
_remove_data_file(hdr.url_map[url][HDR_INDEX_FILE], True)
hdr.url_map[url] = entry
os.rename(temp_fname, entry[HDR_DATA_FILE])
# TODO: This (file open) is a test. Will remove
# it once the code is mature.
newfp = open(entry[HDR_DATA_FILE])
newfp.close()
finally:
hdr.close()
return entry[HDR_DATA_FILE]
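# For illustration, the header file written above is a JSON dictionary keyed by URL
# (all paths and values below are made up):
#
#   { "http://host:8088/feed/runnable?param=1": {
#         "data_f": "/var/snaplogic/cache/result/<md5 of url>_1222192051",
#         "index_f": "/var/snaplogic/cache/index/Idx_ab12cd.file",
#         "row_c": 42,
#         "gen_id": 3,
#         "guid": "resource-guid",
#         "data_format": "record application/json" } }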
################################################################################
#
# Functions and Class that act as the primary interface to data_cache for
# creating cache entries and looking up cache entries.
#
################################################################################
def is_enabled():
"""Returns true if caching has been enabled."""
return cache_dir is not None
def get_cache_size():
"""Return the size (in bytes) of all result and temporary result files."""
total_sz = 0
files = _get_all_data_files() + _get_all_index_files() + _get_all_temp_files()
for f in files:
try:
total_sz += os.path.getsize(f)
except OSError, (errnum, errstr):
# The file could have been deleted after we listed the directory (ENOENT).
# If that is not the error, then log the unexpected issue and move on.
if errnum != errno.ENOENT:
log(snap_log.LEVEL_ERR, "Cache: getSize failed for file %s. Exception %s %s" % (f, errnum, errstr))
return total_sz
class CacheEntry(object):
"""
This class helps create a temporary data file in cache.
It also helps monitor how much data is written into cache.
If the size of cache is exceeded, it will pause the write to
cache and do cache cleanup before proceeding.
"""
def __init__(self):
"""Initializes the object."""
self._write_mode = False
self._read_mode = False
self.url = None
self._read_data_fp = None
self._read_index_fp = None
self._write_tmp_fd = None
self._write_index_fd = None
self.row_count = None
def open_entry(self, url, guid, gen_id, mode, data_format = None):
"""
Opens the cache entry object for the purpose of creating a cache entry or accessing an existing entry.
This object can be used to start the creation of a new cache entry by specifying the mode as "w"
(write mode). Please note that this call only starts the creation of the entry. In
order to complete it, data has to be written, the stream closed by calling close() and the
entry committed, by calling commit().
The object can also be used to gain access to an existing entry in cache by specifying the
mode as "r" (read). The entry is identified by the param "url".
@param url: URL of the entry.
@type url: string
@param guid: The guid of the resource that created the data set. Only needed for write mode open.
@type guid: str
@param gen_id: The gen id of the resource that created the data set. Only needed for write mode open.
@type gen_id: int
@param mode: Mode of using this object either to create entry ("w") or access existing entry "r".
@type mode: string
@param data_format: The format of the data. It is specified as
"record" for record mode data.
"binary" for binary mode data.
This is only needed when an entry is being written.
@type data_format: str
@return: True if the operation succeeded, False if it failed. If the open was in read mode and a
cache miss occurred, False is returned.
@rtype: bool
@raise SnapValueError: If mode has invalid value.
"""
if not is_enabled():
raise SnapException("Cache has not been initialized")
self.url = url
if mode == "w":
if data_format is None:
raise SnapException("Cache write open received no data format for URL %s" % self.url)
# The file is opened in binary mode by default
(self._write_tmp_fd, self.temp_fname) = tempfile.mkstemp(".tmp","RSet_", _tmp_dir)
(self._write_index_fd, self.index_fname) = tempfile.mkstemp(".file","Idx_", _index_dir)
self.gen_id = gen_id
self.guid = guid
self.data_format = data_format
self.row_count = 0
_set_current_size(get_cache_size())
self._write_mode = True
elif mode == "r":
r = _get_cache_entry(url, guid, gen_id)
if r is None:
return False
self._read_data_fp = r[HDR_DATA_FILE]
self._read_index_fp = r[HDR_INDEX_FILE]
self.guid = r[HDR_GUID]
self.gen_id = r[HDR_GEN_ID]
self.data_format = r[HDR_DATA_FORMAT]
self.row_count = r[HDR_ROW_COUNT]
self._read_mode = True
else:
raise SnapValueError("The mode should have the value 'r' or 'w'. Received ", mode)
return True
#
# Write open related methods
#
def _close_write(self):
"""
Closes the file descriptor to temp data and index files, if it is still open.
@raise SnapException: If close of file streams fail.
"""
e = None
if self._write_tmp_fd is not None:
try:
os.close(self._write_tmp_fd)
except Exception, err:
e = SnapException("Temp data file close failed %s" % err)
self._write_tmp_fd = None
if self._write_index_fd is not None:
try:
os.close(self._write_index_fd)
except Exception, err:
e = SnapException.chain(e, SnapException("Index file close failed %s" % err))
self._write_index_fd = None
if e is not None:
raise e
def write(self, data, index_value = None, row_inc = None):
"""
Writes data to the cache entry, if it has been opened for writing.
This method also takes optional argument to mark the block being
written with an index value. In future, this index value can be used
for seeking rapidly to the beginning of this block that is about
to be written.
The method also takes an optional argument to increment the count of the number
of records stored in the cache entry. This count will be available whenever
the cache entry is looked up in the future.
@param data: Data to be written
@type data: string
@param index_value: If an entry needs to be made in the index file
to record the location of this chunk of data being written,
then this index_value should be a value other than None and should
be unique for this index file.
@type index_value: string
@param row_inc: Number of rows to add to the current count.
@type row_inc: int or None
"""
if not self._write_mode:
raise SnapIOError("This object was not opened to write")
dlen = len(data)
if index_value is not None:
# Use lseek to find the current position value.
curr_pos = os.lseek(self._write_tmp_fd, 0, 1)
idx_str = "%s %s\n" % (index_value, curr_pos)
# We ignore the increase in size caused on Windows by the conversion
# of \n to \r\n
dlen += len(idx_str)
sz = _get_current_size()
if sz + dlen > high_water_mark:
_reduce_cache_size(sz + dlen)
os.write(self._write_tmp_fd, data)
if index_value is not None:
os.write(self._write_index_fd, idx_str)
if row_inc is not None:
self.row_count += row_inc
_inc_current_size(dlen)
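# A hedged example of how a producer might stream records into an entry (the records
# and serialize() helper are placeholders; index_value records the running record
# number so a reader can later use seek_nearest_record() to jump near it):
#
#   for i, rec in enumerate(records):
#       entry.write(serialize(rec), index_value=str(i), row_inc=1)
#   entry.commit(time.time() + 300)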
def commit(self, timeout):
"""
Commit the entry to cache.
This finalizes the storage of the entry in cache. The user MUST call this
method or the discard method, after opening an entry to write. If not,
the cache ends up having temporary entries that consume space.
@param timeout: The time (seconds since epoch) at which the entry should be
timed out. Please note that this is the absolute time at which the
entry times out and not some delta from the current time.
@type timeout: int or long or float
@return: Name of the new data file, or None if the add failed.
@rtype: string
"""
if not self._write_mode:
raise SnapIOError("This object was not opened in write mode, nothing to commit")
self.close()
return _create_cache_entry(self.url, timeout, self.temp_fname, self.index_fname,
self.guid, self.gen_id, self.data_format, self.row_count)
def discard(self):
"""
Discard the entry opened (in write mode) in cache.
This method is typically called by the user when some error has occurred
and the user no longer intends to go through with the creation of this
entry. In general, the user must call commit() or discard() after an
entry has been opened for writing.
"""
if not self._write_mode:
raise SnapIOError("This object was not opened in write mode, nothing to discard")
self.close()
if self.temp_fname:
sz = os.path.getsize(self.temp_fname)
os.remove(self.temp_fname)
_dec_current_size(sz)
self.temp_fname = None
if self.index_fname:
sz = os.path.getsize(self.index_fname)
os.remove(self.index_fname)
_dec_current_size(sz)
self.index_fname = None
#
# Read open related methods
#
def _close_read(self):
"""
Closes the file descriptor to temp data and index files, if it is still open.
@raise SnapException: If close of file streams fail.
"""
e = None
if self._read_data_fp is not None:
try:
self._read_data_fp.close()
except Exception, err:
e = SnapException("Data file close failed %s" % err)
self._read_data_fp = None
if self._read_index_fp is not None:
try:
self._read_index_fp.close()
except Exception, err:
e = SnapException.chain(e, SnapException("Index file close failed %s" % err))
self._read_index_fp = None
if e is not None:
raise e
def read(self, size = None):
"""
Read the specified number of bytes. If size is negative or omitted, then all the data up to EOF is read.
@param size: Number of bytes to be read.
@type size: long
@return: The data read, or None if the end of the data has been reached.
@rtype: str or None
"""
if not self._read_mode:
raise SnapIOError("This object was not opened for reading")
# File object read() handles the logic of reading negative size.
if size is not None:
data = self._read_data_fp.read(size)
else:
data = self._read_data_fp.read()
if data == "":
return None
return data
def _seek_index(self, tgt_idx):
"""
Find the specified index entry in the index file. If not found, seek to the closest index entry before the specified one.
@param tgt_idx: Index value being requested.
@type tgt_idx: long
@return: 2-tuple containing the index value found and the position in the data file.
None if no such entry or closest preceding entry was found.
@rtype: tuple or None
"""
self._read_index_fp.seek(0, 2)
high = self._read_index_fp.tell()
self._read_index_fp.seek(0)
low = self._read_index_fp.tell()
c = 0
while low < high:
c += 1
#print "low %s high %s" % (low, high)
mid = long((low + high)/2)
# Go to midpoint
if mid > 0:
# The midpoint we calculated could be in the middle of some line or at the
# beginning of some line (which is what we want). To deal with these two
# scenarios, seek one byte behind the midpoint and read a line. If the midpoint
# is the beginning of a line, then we will read "\n" and return to that
# midpoint. If the midpoint is in the middle of a line, then we will read
# past that line and reach the beginning of the next line. Either way, we end up
# properly aligned.
self._read_index_fp.seek(mid - 1)
self._read_index_fp.readline()
else:
self._read_index_fp.seek(mid)
mid_line = self._read_index_fp.readline()
if mid_line == "":
# Nothing in the high section (EOF), search mid to low section.
high = mid
continue
mid_entry = mid_line.split(" ")
mid_idx = long(mid_entry[0].strip())
#print "\tMid index %s num %s" %(mid_idx, mid)
if tgt_idx < mid_idx:
# Search from low to mid section.
high = mid
continue
elif tgt_idx > mid_idx:
next_line = self._read_index_fp.readline()
if next_line == "":
# We are at end of the file. The mid entry is the closest one to
# the requested index
return (mid_idx, long(mid_entry[1].strip()))
next_entry = next_line.split(" ")
next_idx = long(next_entry[0].strip())
if next_idx > tgt_idx:
# This one is greater than the requested index. The mid point
# is the closest entry.
return (mid_idx, long(mid_entry[1].strip()))
elif next_idx == tgt_idx:
return (next_idx, long(next_entry[1].strip()))
low = mid
else:
# We found exact match
return (mid_idx, long(mid_entry[1].strip()))
return None
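# For reference, each index file line written by write() has the form
# "<index_value> <byte offset in data file>\n", e.g. (made-up values):
#
#   0 0
#   100 8192
#   200 16501
#
# With those entries, _seek_index(150) would return (100L, 8192L), the closest
# preceding entry.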
def seek_nearest_record(self, tgt_rec):
"""
Seek to the specified record or the nearest possible record before it.
@param tgt_rec: The record being searched for.
@type tgt_rec: long
@return: The number of the record that was seeked to, or None, if no
suitable record was found.
@rtype: long or None
"""
if not self._read_mode:
raise SnapIOError("This object was not opened for reading")
ret = self._seek_index(tgt_rec)
if ret is None:
return None
self._read_data_fp.seek(ret[1])
return ret[0]
def seek(self, offset, whence=0):
"""
Has the same semantics as file object seek.
Set the file's current position. The whence argument is optional
and defaults to 0 (absolute file positioning); other values are 1
(seek relative to the current position) and 2 (seek relative to
the file's end). There is no return value.
@param offset: The number of bytes to seek.
@type offset: long
@param whence: Can take values 0, 1 and 2
@type whence: int
"""
if not self._read_mode:
raise SnapIOError("This object was not opened for reading")
self._read_data_fp.seek(offset, whence)
def close(self):
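"""Close the data and index file streams, whether this entry was opened for reading or writing."""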
if self._read_mode:
self._close_read()
if self._write_mode:
self._close_write()
def open_entry(url, guid, gen_id, mode, data_format = None):
"""
Returns a cache entry object for the purpose of creating a new cache entry or accessing an existing entry.
If mode is specified as "w" (write mode), then the object returned can be used to create a new entry.
In order to create a new cache entry using the object, data has to be written by calling the write()
method of the object, and the entry then committed, by calling the commit() method of the object. If for
any reason the entry needs to be discarded instead of committed, then the discard() method should be
called.
If mode is specified as "r" (read), the function will try to retrieve an existing entry in the cache that
matches the specified URL.
@param url: URL of the entry.
@type url: string
@param guid: The guid of the resource that created the data set. Only needed for write mode open.
@type guid: str
@param gen_id: The gen id of the resource that created the data set. Only needed for write mode open.
@type gen_id: int
@param mode: Mode of open, either to create an entry ("w") or access an existing entry "r".
@type mode: string
@param data_format: The format of the data. It is specified as
"record <content_type>" for record mode data.
"binary <content_type>" for binary mode data.
This is only needed when an entry is being written.
@type data_format: str
@return: Cache entry object or None. None is returned in read mode, when there is a cache miss.
@rtype: L{CacheEntry} or None
@raise SnapException: If the cache has not been initialized.
"""
if not is_enabled():
raise SnapException("Cache has not been initialized")
entry = CacheEntry()
if not entry.open_entry(url, guid, gen_id, mode, data_format):
return None
return entry
################################################################################
#
# Functions that can only be run safely, when the server is coming up or
# is down. These functions work under the assumption that there is only
# one thread or process accessing the cache.
#
################################################################################
def init_cache(dir, size, low_mark, high_mark, p_log, p_elog):
"""
Initialize the cache.
This function should be called when the server is starting up. It sets up
the directories and files inside the cache, if they don't already exist.
@param dir: Location of the cache directory.
@type dir: string
@param size: Maximum size of the cache in bytes.
@type size: long
@param low_mark: Low water mark (percentage value from 0 to 100)
@type low_mark: int
@param high_mark: High water mark for cache (percentage value from 0 to 100)
@type high_mark: int
@param p_log: Log method
@type p_log: log method
@param p_elog: Exception log method
@type p_elog: Exception log method
@raise SnapValueError: If the values specified in the parameters are invalid.
"""
global cache_dir
global max_size
global low_water_mark
global high_water_mark
global _delete_list_file
global _tmp_dir
global _result_dir
global _index_dir
global _header_dir
global log
global elog
if dir is None:
# The cache is not going to be activated.
return
if size < 0:
raise SnapValueError("Size of cache cannot be negative (%s)" % size)
cache_dir = os.path.abspath(dir)
max_size = size
if low_mark < 0 or low_mark > 100:
raise SnapValueError("Low water mark value %s should be a percentage value" % low_mark)
if high_mark < 0 or high_mark > 100:
raise SnapValueError("High water mark value %s should be a percentage value" % high_mark)
low_water_mark = long((low_mark * max_size) / 100)
high_water_mark = long((high_mark * max_size) / 100)
log = p_log
elog = p_elog
# Create the cache directory structure
_create_dir(cache_dir)
_tmp_dir = os.path.join(cache_dir, "tmp")
_create_dir(_tmp_dir)
_header_dir = os.path.join(cache_dir, "header")
_create_dir(_header_dir)
_result_dir = os.path.join(cache_dir, "result")
_create_dir(_result_dir)
_index_dir = os.path.join(cache_dir, "index")
_create_dir(_index_dir)
_delete_list_file = os.path.join(cache_dir, "__delete_list__")
_create_file(_delete_list_file)
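# An illustrative call (values are hypothetical): with size = 100 * 1024 * 1024,
# low_mark = 60 and high_mark = 80, the low and high water marks become 60 MB and
# 80 MB; once writes push the cache past 80 MB, _reduce_cache_size() trims it back
# toward the 60 MB mark.
#
#   init_cache("/var/snaplogic/cache", 100 * 1024 * 1024, 60, 80, p_log, p_elog)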
def header_checkup(fname, data_files, index_files):
"""
Return True if the header file points to at least one existing data file.
The function also modifies data_files and index_files, leaving only those files in
the lists which have no header files pointing to them.
@param fname: Name of header file being tested.
@type fname: string
@param data_files: List of data files that have not yet found a header file
referring to them.
@type data_files: list
@param index_files: List of index files that have not yet found a header
file referring to them.
@type index_files: list
@return: True if the header file has a reference to at least one existing
data file.
@rtype: bool
"""
hdr = CacheEntryHeader(fname)
if not hdr.open():
raise SnapException("Header checkup failed to open headr file %s" % fname)
try:
is_valid = False
if len(hdr.url_map) != 0:
# The header file is not empty. See if the entry is still valid
for url in hdr.url_map:
if hdr.url_map[url][HDR_DATA_FILE] not in data_files :
# Data file does not exist.
continue
if hdr.url_map[url][HDR_INDEX_FILE] not in index_files :
# Index file does not exist.
continue
is_valid = True
# There is a valid header referring to that data file
data_files.remove(hdr.url_map[url][HDR_DATA_FILE])
index_files.remove(hdr.url_map[url][HDR_INDEX_FILE])
if is_valid:
return True
else:
# No valid entries were in the header file. Delete it.
return False
finally:
hdr.close()
def startup_cleanup():
"""
Do cache cleanup at server startup time.
This method removes empty header files and temporary files. These are
files that are typically left behind by the abrupt shutdown of the
server.
Since this is called only at startup time, there is no reason for a file
deletion to fail, other than some serious issue. For this reason,
this function will throw exceptions whenever a delete fails. These
exceptions should stop the server startup.
@raise SnapException: If a file delete fails.
"""
if not is_enabled():
return
# First, cleanup timed out entries.
_remove_timedout_entries(True)
# Second, cleanup delete list
_remove_delete_list_entries(True)
# Now cleanup header files that are empty or pointing to non existent data files.
# This has to happen after timed out entries have been removed above (as it creates
# more empty header files).
data_files = _get_all_data_files()
index_files = _get_all_index_files()
names = _get_all_header_files()
for n in names:
if not header_checkup(n, data_files, index_files):
try:
os.remove(n)
except Exception, e:
raise SnapException("Failed to remove empty header file %s exception %s" % (n, e))
# Delete data and index files that don't have a valid header file referring to them.
# This can happen if the file ref was removed from header file, but
# the process was killed/crashed before that data or index file could be removed.
# Or, in the case of index files, if the process failed to commit the entry.
for d in data_files:
try:
os.remove(d)
except Exception, e:
raise SnapException("Failed to remove data file %s, exception %s" % (d, e))
for i in index_files:
try:
os.remove(i)
except Exception, e:
raise SnapException("Failed to remove index file %s, exception %s" % (i, e))
# Delete temp files
names = _get_all_temp_files()
for n in names:
try:
os.remove(n)
except Exception, e:
raise SnapException("Failed to remove temporary file %s, exception %s" % (n, e))
# Compute current size.
_set_current_size(get_cache_size())
def empty_cache():
"""
Remove all cache data (including temp, header, index and data files).
The delete list file is also set to empty. This method should
only be called when no other process or thread is accessing
the cache.
@raise SnapException: If any of the file deletions fail.
"""
if not is_enabled():
return
excp = None
for n in _get_all_data_files():
try:
os.remove(n)
except Exception, e:
excp = SnapException.chain(excp, e)
for n in _get_all_index_files():
try:
os.remove(n)
except Exception, e:
excp = SnapException.chain(excp, e)
for n in _get_all_header_files():
try:
os.remove(n)
except Exception, e:
excp = SnapException.chain(excp, e)
for n in _get_all_temp_files():
try:
os.remove(n)
except Exception, e:
excp = SnapException.chain(excp, e)
# Empty the delete list file.
try:
f = open(_delete_list_file,"w")
try:
f.write("")
finally:
f.close()
except Exception, e:
excp = SnapException.chain(excp, e)
# Recompute the current cache size.
_set_current_size(get_cache_size())
if excp is not None:
raise excp
def destroy_cache():
"""Do the opposite of init. Remove the entire cache setup."""
empty_cache()
os.rmdir(_tmp_dir)
os.rmdir(_result_dir)
os.rmdir(_index_dir)
os.rmdir(_header_dir)
os.remove(_delete_list_file)
os.rmdir(cache_dir)
|