# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
# $
#
# $Id: snap_log.py 9868 2009-11-21 17:50:45Z dhiraj $
"""
The SnapLogic logging module.
This module provides logging facilities to processes in
the SnapLogic server. Each process (not thread!) should
get itself an instance of a logging object, and then use
this instance to write log messages.
Each logging instance will create individual log files.
Consequently, each process writes its own log file. An
identifier for the log file is specified when a log
instance is created.
Each log line has a timestamp, process-ID, facility, level,
pipeline-ID and log message.
Normally, the facility would have to be provided with each
call to the logging function. However, to reduce the need
for repetitive typing, we provide a helper function which
creates a facility-specific version of the log() function.
The same applies to pipeline IDs: the helper functions capture
the pipeline ID at creation time, so that it does not have to
be specified over and over.
Please note that each process can generate a number of those
helper functions for itself. For example, one that doesn't
specify a pipeline ID and thus is useful for non-pipeline
related things, and another with a pipeline ID, created
specifically when the pipeline was started.
A number of facilities are predefined:
LOG_RH
LOG_PM
LOG_REPO
LOG_SNAPI
LOG_RP
LOG_CC
LOG_COMPONENT
LOG_LOG
LOG_AUTH
LOG_SNAPSTREAM
LOG_SCHEDULER
LOG_NOTIFICATION
LOG_SERVER
The defined log levels are:
CRIT
ERR
WARN
INFO
DEBUG
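
Typical usage, as a sketch (the process name, log directory and
do_work() are illustrative; the import path assumes this module
lives at snaplogic.common.snap_log):

    from snaplogic.common import snap_log

    log = snap_log.SnapLog("my_process", logdir="/var/log/snaplogic")
    mlog, elog, rlog = log.make_specific_loggers(snap_log.LOG_SERVER)
    mlog(snap_log.LEVEL_INFO, "Server started")
    try:
        do_work()
    except Exception, e:
        elog(e, "do_work() failed")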
"""
import logging
import logging.handlers
import time
import sys
import functools
import copy
import codecs
import os
from snaplogic.common.snap_exceptions import *
from snaplogic.common import uri_prefix, parse_size_param
from snaplogic.server import RhResponse, product_prefix
enforce_singleton = True
"""For testing, can be set to False."""
TIME_STAMP_FORMAT = "%Y-%m-%dT%H:%M:%S"
"""Time format to use when logging. Use like time.strftime(TIME_STAMP_FORMAT)"""
LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
"""Basic log format for message and exception."""
ACCESS_LOG_FORMAT = "%(asctime)s %(message)s"
"""Access log format."""
LEVEL_SEPARATOR = "@"
"""Separates facility from log levels"""
# The pre-defined logging facilities of the system.
LOG_RH = "RH"
LOG_PM = "PM"
LOG_REPO = "REPO"
LOG_SNAPI = "SNAPI"
LOG_RP = "RP"
LOG_CC = "CC"
LOG_COMPONENT = "COMP"
LOG_LOG = "LOG"
LOG_AUTH = "AUTH"
LOG_SNAPSTREAM = "SNAPSTREAM"
LOG_SCHEDULER = "SCHEDULER"
LOG_NOTIFICATION = "NOTIFICATION"
LOG_SERVER = "SERVER"
_facility_list = [ LOG_RH, LOG_PM, LOG_REPO, LOG_SNAPI, LOG_RP, LOG_CC, LOG_COMPONENT, LOG_LOG, LOG_AUTH, LOG_SNAPSTREAM, LOG_SCHEDULER, LOG_NOTIFICATION, LOG_SERVER ]
# The log levels that we support
LEVEL_DEBUG = "DEBUG"
LEVEL_INFO = "INFO"
LEVEL_WARN = "WARNING"
LEVEL_ERR = "ERROR"
LEVEL_CRIT = "CRITICAL"
SNAP_TO_PYLEVELS = {LEVEL_DEBUG : logging.DEBUG,
LEVEL_INFO : logging.INFO,
LEVEL_WARN : logging.WARNING,
LEVEL_ERR : logging.ERROR,
LEVEL_CRIT : logging.CRITICAL}
# This shows the levels from lowest to highest.
_level_list_sorted = [ LEVEL_DEBUG, LEVEL_INFO, LEVEL_WARN, LEVEL_ERR, LEVEL_CRIT ]
ACCESS_LOG = 'access'
MESSAGE_LOG = 'main'
EXCEPTION_LOG = 'exception'
_instance = None
# The logging module can write its various log files.
# We allow clients to access those via distinct URIs.
# These are the suffixes of the URI, which indicate
# the type of log that was requested.
_log_names = [ ACCESS_LOG, MESSAGE_LOG, EXCEPTION_LOG, 'pipeline' ]
log_uri_params = ["facility", "rid", "start_date", "end_date", "last_lines", "max_level", "min_level"]
def get_log_name(http_req):
"""
Return the name of the specified log.
This is simply the extraction of the log-name portion
of a request URI. It's in its own function, because
this functionality is also needed in the get_log()
function in the server.
@param http_req: HTTP request object.
@type http_req: L{HttpRequest}
@return: The extracted log-name as a string,
or None in case of error.
@rtype: string
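
    For example, assuming uri_prefix.LOGS is "/logs", a request for
    "/logs/main" yields "main", while a request for "/logs/bogus"
    yields None.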
"""
# We don't need the '/' at the start of the path,
# so we are skipping over that as well.
log_type = http_req.path[len(uri_prefix.LOGS)+1:]
if log_type not in _log_names:
return None
else:
return log_type
def facility_parsing(val):
"""
Parse the value of the facility argument.
This is in a separate function, since it is also used
by the get_logs() method in server.py.
    Returns a tuple with two lists (or None in their place),
which contains the desired facilities and the ones that
are specifically disallowed.
Syntax on the URI line: facility=foo,bar,-blah
Allows 'foo', 'bar' but not 'blah'.
@param val: The value of the facility argument.
@type val: string
@return: Tuple of two lists, the first one containing
the facilities we allow, the second one the
facilities we deny.
@rtype: tuple of lists
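
    Some examples of the parsing:

        facility_parsing("RH,PM")   ->  (["RH", "PM"], None)
        facility_parsing("-RH")     ->  (None, ["RH"])
        facility_parsing("RH,-PM")  ->  (["RH"], ["PM"])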
"""
facility_list = None
neg_facility_list = None
# We allow the definition of multiple facilities, and also negations.
# So, for example, you can say:
# facility=foo,bar # Show only facility 'foo' and 'bar'.
# But you can also say:
# facility=-foo,-bar # Show only facilities that are NOT 'foo' and NOT 'bar'.
# And finally, you can say:
# facility=foo,-bar # Show only facility 'foo' and those that are not 'bar', which makes
# # no sense at all, really.
for fac in val.split(","):
fac = fac.strip()
is_neg = False
if fac.startswith("-"):
is_neg = True
fac = fac[1:]
if is_neg:
            if neg_facility_list is None:
neg_facility_list = []
neg_facility_list.append(fac)
else:
            if facility_list is None:
facility_list = []
facility_list.append(fac)
return (facility_list, neg_facility_list)
def send_headers_for_logs(http_req, log_type):
"""
Send the response headers needed for outputting log information.
This method is also used by some other modules, which is why
it was factored out of get_logs().
@param http_req: HTTP request object.
@type http_req: L{HttpRequest}
@param log_type: Name of the logfile, as it should appear in the
title of the output.
@type log_type: string
"""
http_req.send_response_headers(200, None, True, { 'font' : 'fixed-width', 'enclosing_tags' : ('<pre>', '</pre>'),
'table' : 'no', 'title' : '%s log' % log_type })
def get_logs(http_req, public_uri, logger, rlog, elog):
"""
Return server log info.
This function is used by different processes. Therefore, some of the otherwise
process-global variables, such as logger, rlog and elog are just passed in
as parameters. This avoids the complexities of recursive imports.
Processes also have a get_logs() function, which just serves to call this one
here with the right parameters.
@param http_req: HTTP request object.
@type http_req: L{HttpRequest}
@param public_uri: The public-URI of this server process. This is
needed to construct the absolute URIs for access
to the various log files.
@type public_uri: str
@param logger: The logging object of this process. The logging
module knows how to write out the contents of
log files.
@type logger: L{SnapLog}
@param rlog: The logging function for the access log of this process.
When a user accesses a log file then this access needs
to appear in the access log, just like any other access.
@type rlog: function
@param elog: The logging function for the exception log of this process.
@type elog: function
@return: RhResponse object with data and code
to be written to client.
@rtype: L{RhResponse}
"""
# At the uri_prefix.LOGS root we present a little menu, which shows
# the more specialized URIs for particular logs.
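    # The resulting menu (with an illustrative public URI and assuming
    # uri_prefix.LOGS is "/logs") looks like:
    #     { 'main' : { 'all'           : 'http://host:8080/logs/main',
    #                  'last_40_lines' : 'http://host:8080/logs/main?last_lines=40' },
    #       ... }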
if http_req.path == uri_prefix.LOGS or http_req.path == uri_prefix.LOGS + "/":
serv = public_uri
menu = {}
for k in _log_names:
uri = serv + uri_prefix.LOGS + "/" + k
menu[k] = { 'all' : uri, 'last_40_lines' : uri + '?last_lines=40' }
return RhResponse(200, menu, None, { 'title' : product_prefix.SNAPLOGIC_PRODUCT + ': Log files' })
# Someone requested something more specialized, so we extract
# the name of the log that is wanted.
log_type = get_log_name(http_req)
if not log_type:
return RhResponse(404, "Not found")
try:
params = {}
ret = process_log_params(log_type, http_req, params)
if ret is not None:
return ret
send_headers_for_logs(http_req, log_type)
get_instance().write_log_out(log_type, http_req.output, params)
http_req.output.end()
# Because we are handling our own stream, we need to write our own
# log entry for the access log.
rlog(http_req, 200)
return None
except Exception, e:
rlog(http_req, 500)
elog(e)
return RhResponse(500, "Internal server error")
def process_log_params(log_type, http_req, params):
"""
Process any params provided with HTTP request for log messages.
@param log_type: This designates the streams as message, exception or access log stream.
@type log_type: str
@param http_req: HTTP request object.
@type http_req: L{HttpRequest}
@param params: Dictionary that must be modified by this function to set the params
extracted from the HTTP request
@return: If an error occurs, then a non-None value will be returned.
@rtype: L{RhResponse}
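
    For example, a request with '?last_lines=40&min_level=WARNING' leaves
    params populated as:

        {"last_lines"    : 40,
         "accept_levels" : ["WARNING", "ERROR", "CRITICAL"]}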
"""
max_level_idx = None
min_level_idx = None
for param in http_req.params:
if param == 'last_lines':
try:
params["last_lines"] = int(http_req.params['last_lines'])
except:
return RhResponse(400, "Bad request: 'last_lines' parameter must specify a valid number...")
elif param == 'rid':
# We can filter on the RID in a number of logs.
params["rid"] = http_req.params['rid']
elif param == 'facility':
if log_type == "access":
return RhResponse(400, "Bad request: 'facility' parameter is not supported for the '%s' log..." % log_type)
fac_string = http_req.params['facility']
facility_pass, facility_deny = facility_parsing(fac_string)
if facility_pass:
params["facility_pass"] = facility_pass
if facility_deny:
params["facility_deny"] = facility_deny
elif param == 'max_level':
            if log_type not in [ MESSAGE_LOG, EXCEPTION_LOG, 'pipeline' ]:
return RhResponse(400, "Bad request: 'max_level' parameter is not supported for the '%s' log..." % log_type)
level = http_req.params['max_level']
try:
max_level_idx = _level_list_sorted.index(level)
except ValueError:
return RhResponse(400, "Bad request: 'max_level' parameter '%s' not a known log-level..." % level)
elif param == 'min_level':
            if log_type not in [ MESSAGE_LOG, EXCEPTION_LOG, 'pipeline' ]:
return RhResponse(400, "Bad request: 'min_level' parameter is not supported for the '%s' log..." % log_type)
level = http_req.params['min_level']
try:
min_level_idx = _level_list_sorted.index(level)
            except ValueError:
return RhResponse(400, "Bad request: 'min_level' parameter '%s' not a known log-level..." % level)
elif param == 'start_date':
start_date_str = http_req.params['start_date']
try:
start_date = time.strptime(start_date_str, TIME_STAMP_FORMAT)
params["start_date"] = time.mktime(start_date)
except ValueError, e:
return RhResponse(400, "Bad request: 'start_date' parameter '%s' malformed...(%s)" %
(start_date_str, e))
elif param == 'end_date':
end_date_str = http_req.params['end_date']
try:
end_date = time.strptime(end_date_str, TIME_STAMP_FORMAT)
params["end_date"] = time.mktime(end_date)
except ValueError, e:
return RhResponse(400, "Bad request: 'end_date' parameter '%s' malformed...(%s)" %
(end_date_str, e))
else:
return RhResponse(400, "Bad request: Parameter '%s' is not known..." % param)
if min_level_idx is not None or max_level_idx is not None:
accept_levels = []
if min_level_idx is not None and max_level_idx is not None and min_level_idx > max_level_idx:
return RhResponse(400, "Bad request: 'min_level' cannot be greater than max_level")
if min_level_idx is None:
min_level_idx = 0
elif max_level_idx is None:
max_level_idx = len(_level_list_sorted) - 1
for level in _level_list_sorted[min_level_idx : max_level_idx + 1]:
accept_levels.append(level)
params["accept_levels"] = accept_levels
return None
def get_instance():
"""
Return the single instance of the log object in this process.
If no instance exists yet then it returns None.
"""
return _instance
class NullHandler(logging.Handler):
"""
This class is used in disabling logging.
"""
def __init__(self, level=logging.NOTSET):
logging.Handler.__init__(self, level)
self.maxBytes = 0
# send stream to dev null.
self.stream = open(os.devnull, 'w')
def emit(self, record):
pass
class SnapLog(object):
"""
The logging object.
One instance of this needs to be created by each process. The
process ID and the logging directory need to be defined. The
logging object knows how to log normal messages as well as
exceptions, which are logged in a separate file.
To get facility-specific logging functions, use the
make_facility_log() helper function. You can call this function
multiple times, to get loggers with specific log-levels and/or
pipeline-IDs.
"""
def __init__(self, process, logdir=None, to_console=False, disable_logging=None, log_level=LEVEL_INFO,
max_log_size=None, log_backup_count=None):
"""
Initialize log file.
This creates/opens the log file for this process. If
there is a problem, the native IO exception will be
        propagated.
@param process: Process ID
@type process: str
@param logdir: The directory for the logfile.
@type logdir: str
@param to_console: Flag indicating whether output should ALSO be
mirrored to console.
@type to_console: bool
@param disable_logging: Can be None or a string or a list of strings with values:
"message", "exception" and "access". Specifying the string turns off
the corresponding log file.
@type disable_logging: str or list.
        @param log_level: The minimum level of logging. Either a single level
                          name, or a list of entries of the form "LEVEL" or
                          "facility@LEVEL" for per-facility levels.
        @type log_level: str or list
        @param max_log_size: The maximum size the log may grow to. Set to None
                             to allow unlimited growth (disables log rotation).
        @type max_log_size: str
@param log_backup_count: The number of backups to permit for logs that are rotated out. Defaults to 5.
        @type log_backup_count: str
@throws: Exception
"""
global _instance
if _instance and enforce_singleton:
raise SnapObjExistsError("Attempting creation of duplicate logging singleton.")
if logdir:
self._logdir = logdir
else:
self._logdir = "/tmp"
if process:
self._process = process
else:
self._process = "(unknown)"
if max_log_size is None:
self.max_bytes = 0 # Disable log rotation.
else:
self.max_bytes = parse_size_param(max_log_size)
if self.max_bytes is None:
raise SnapValueError("The option max_log_size has invalid value %s" % max_log_size)
self.backup_count = 5 if log_backup_count is None else int(log_backup_count)
self._file_handler_map = {}
# Process any log disable config settings.
valid_logs = (ACCESS_LOG, MESSAGE_LOG, EXCEPTION_LOG)
self._disable_logging = []
if disable_logging:
if isinstance(disable_logging, list):
for s in disable_logging:
if s not in valid_logs:
raise SnapValueError("%s is invalid value for stop logging option. Correct values are - %s" %
(s, ", ".join(valid_logs)))
self._disable_logging = disable_logging
elif disable_logging in valid_logs:
self._disable_logging = [disable_logging]
else:
raise SnapValueError("%s is invalid value for stop logging option. Correct values are - %s" %
(disable_logging, ", ".join(valid_logs)))
# Process log level settings
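        # Each entry is either a bare level name ("DEBUG") or scoped to a
        # facility with the LEVEL_SEPARATOR ("PM@DEBUG" sets the PM facility
        # to DEBUG, while all other facilities keep the default level).
        # An illustrative config value: log_level = ["INFO", "PM@DEBUG"]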
facility_log_level = {}
if not isinstance(log_level, list):
log_level = [log_level]
level_names = SNAP_TO_PYLEVELS.keys()
for s in log_level:
v = s.split(LEVEL_SEPARATOR)
if len(v) == 1:
level = v[0]
scope = ""
elif len(v) == 2:
scope = v[0]
level = v[1]
else:
raise SnapValueError("%s specified in log_level is not a valid value" % s);
if level not in level_names:
raise SnapValueError("%s specified in log_level is not a valid log level" % level)
facility_log_level[scope] = SNAP_TO_PYLEVELS[level]
access_formatter = logging.Formatter(ACCESS_LOG_FORMAT, TIME_STAMP_FORMAT)
# Setup Access logger
self._accesslogfile_name = self._logdir + "/" + self._process + "_access.log"
# We hardcode access log level at info, since SnapLogic log levels only apply for message and exception logs.
# For access logs, you either have it or disable it.
self._access_logger_id = self._configure_logger(ACCESS_LOG, self._accesslogfile_name, to_console,
access_formatter, {"" : logging.INFO})
formatter = logging.Formatter(LOG_FORMAT, TIME_STAMP_FORMAT)
# Setup message logger.
self._logfile_name = self._logdir + "/" + self._process + ".log"
self._mesg_logger_id = self._configure_logger(MESSAGE_LOG, self._logfile_name, to_console, formatter,
facility_log_level)
# Setup exception logger.
self._excp_logfile_name = self._logdir + "/" + self._process + "_exceptions.log"
self._excp_logger_id = self._configure_logger(EXCEPTION_LOG, self._excp_logfile_name, to_console, formatter,
facility_log_level)
# Redirect stdout and stderr to the main process log file, if 'to_console' was not set
if (not to_console) and self.get_main_log_stream() is not None:
sys.stdout = self.get_main_log_stream()
sys.stderr = self.get_main_log_stream()
self._mesg_id_prefix = str(int(time.time())) + "."
self._mesg_id_count = 0
self._to_console = to_console
_instance = self
def _configure_logger(self, log_type, logfile_name, to_console, formatter, facility_log_level):
"""
Configure handlers for a specified log stream.
@param log_type: This designates the streams as message, exception or access log stream.
@type log_type: str
@param logfile_name: Name of the log file to log into.
@type logfile_name: str
        @param to_console: If True, we also "tee" the output to the console.
@type to_console: bool
@param formatter: The format for the log message.
@type formatter: L{logging.Formatter}
@param facility_log_level: All the facility specific log levels.
        @type facility_log_level: dict
        @return: The name of the configured logger.
        """
logger = self.get_facility_logger(log_type)
        # Set up the handlers for this log stream.
if log_type in self._disable_logging:
# Disable logging
h = NullHandler()
logger.addHandler(h)
self._file_handler_map[log_type] = h
else:
file_handler = logging.handlers.RotatingFileHandler(logfile_name, maxBytes=self.max_bytes,
backupCount=self.backup_count)
file_handler.setFormatter(formatter)
self._file_handler_map[log_type] = file_handler
logger.addHandler(file_handler)
if to_console:
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter)
logger.addHandler(stdout_handler)
for (scope, level) in facility_log_level.items():
lg = self.get_facility_logger(log_type, scope)
lg.setLevel(level)
return logger.name
def get_main_logfile_name(self):
"""
Return the name of the main logfile.
        @return: The name of the logfile.
        @rtype: string
"""
return self._logfile_name
def get_main_log_stream(self):
"""
Return the file pointer for the main logfile.
        @return: The file object of the main logfile handler.
"""
return self._file_handler_map[MESSAGE_LOG].stream
def generic_log(self, logger, level, message, mesg_id=None, facility_id=None, rid=None, resource_name=None,
invoker=None):
"""
Write a log message.
This writes the next message into the logfile of this log object. A facility identifier
needs to be specified.
@param logger: The message logger.
@type logger: L{logging.Logger}
@param level: The logging level. Valid logging levels are defined at the beginning
of this file.
@type level: str
@param message: The actual log message.
@type message: str
@param mesg_id: ID used to associate a message log entry with an exception log entry.
@type mesg_id: str
@param facility_id: The facility identifier.
@type facility_id: str
@param rid: The unique ID of a pipeline. If this is not
set (None), then a '-' will be logged in its
place.
@type rid: str
        @param resource_name: Resource name if this message is from a member of an executing
                              pipeline. If this is not set (None), then a '-' will be logged
                              in its place.
        @type resource_name: str
        @param invoker: If this message is from an executing resource, then its invoker can be
                        specified here. If not, "-" is written to the log file.
@type invoker: str
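
        With the LOG_FORMAT used by this module, the resulting line in the
        main log looks like this (all field values illustrative):

            2009-11-21T17:50:45 INFO myproc.main PM - 12345 "CsvRead" admin Pipeline started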
"""
        if not logger.isEnabledFor(SNAP_TO_PYLEVELS[level]):
return
if mesg_id is None:
mesg_id = "-"
if rid is None:
rid = "-"
if resource_name is None:
resource_name = "-"
else:
resource_name = "\"%s\"" % resource_name
if invoker is None:
invoker = "-"
if facility_id is None:
facility_id = "-"
        log_type = "%s.%s" % (self._process, MESSAGE_LOG)
buf = "%s %s %s %s %s %s %s" % (log_type, facility_id, mesg_id, rid, resource_name, invoker, message)
        if isinstance(buf, unicode):
buf = buf.encode('utf-8')
logger.log(SNAP_TO_PYLEVELS[level], buf)
def generic_exception_log(self, excp_logger, mesg_logger, ex, message=None, level=LEVEL_ERR,
facility_id=None, rid=None, resource_name=None, invoker=None):
"""
Log an exception.
This outputs information about the specified exception. The
message of the exception is logged in the main log file, the
stack trace in the exception logfile.
Note that in the case of chained exceptions, the normal logfile
will only contain the message of the top-most exception. The
exception log-file, however, contains messages and stack traces
of all the chained exceptions.
@param excp_logger: The exception logger.
@type excp_logger: L{logging.Logger}
@param mesg_logger: The message logger.
@type mesg_logger: L{logging.Logger}
@param ex: The exception (chain) that needs to be logged
in the exception log file.
@type ex: Exception
@param message: An optional, higher level message that can be
provided by the caller, and which is logged in
the normal log file. If this is None, then
nothing will be logged in the normal log file.
@type message: str
@param level: The logging level. Valid logging levels are defined at the beginning
of this file.
@type level: str
@param facility_id: The facility identifier.
@type facility_id: str
@param rid: The unique ID of a pipeline. If this is not
set (None), then a '-' will be logged in its
place.
@type rid: str
        @param resource_name: Resource name if this message is from a member of an executing
                              pipeline. If this is not set (None), then a '-' will be logged
                              in its place.
        @type resource_name: str
        @param invoker: If this message is from an executing resource, then its invoker can be
                        specified here. If not, "-" is written to the log file.
@type invoker: str
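
        The generated message ID appears in both the main log entry and the
        exception log entry, so the two can be correlated. For example
        (illustrative), a main log line carrying the ID 1258824645.0 has its
        stack traces in the exception log entry with the same ID.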
"""
        if not excp_logger.isEnabledFor(SNAP_TO_PYLEVELS[level]):
return
# Each exception is logged with a unique ID. This allows the message in the
# main log to be easily associated with messages in the exception log.
# The unique ID is generated from a unique number chosen when the logger module
# was initialized, and a running counter that is maintained throughout the
# lifetime of the logging module and which is increased with each exception.
mesg_id = "%s%d" % (self._mesg_id_prefix, self._mesg_id_count)
self._mesg_id_count += 1
# In the normal log file, we log a user-specified message. If this message
# is not provided then nothing will be logged in the normal log file.
if message:
self.generic_log(mesg_logger, level, message, mesg_id = mesg_id, facility_id = facility_id, rid = rid,
resource_name = resource_name, invoker = invoker)
if rid is None:
rid = "-"
if resource_name is None:
resource_name = "-"
else:
resource_name = "\"%s\"" % resource_name
if invoker is None:
invoker = "-"
if facility_id is None:
facility_id = "-"
        log_type = "%s.%s" % (self._process, EXCEPTION_LOG)
# The messages of all exceptions, plus their stack traces, are written into the
# exception log file. Note that this only supports SnapExceptions. We can easily
# get a string representation of an entire chain of exceptions, with all the
# messages and stack traces.
buf = "%s %s %s %s %s %s %s" % (log_type, facility_id, mesg_id, rid, resource_name, invoker,
SnapException.snapify_exception(ex).get_all(True))
        if isinstance(buf, unicode):
buf = buf.encode('utf-8')
excp_logger.log(SNAP_TO_PYLEVELS[level], buf)
# Finally, we mark the fact that we have logged this exception (chain) already.
# This allows us to log the chain at one point, but rethrow the exception and
# handle the whole thing somewhere else. There then, we can prevent double
# logging by checking the top-most exception in the chain to see if this flag
# is set.
ex.is_logged = True
def access_log(self, logger, http_req, status):
"""
Log an HTTP request.
        The log entry is written to the access log at the INFO level.
        @param logger: The access logger to write to.
        @type logger: L{logging.Logger}
@param http_req: The http_request object.
@type http_req: L{HttpReq}
@param status: The HTTP return status code.
@type status: int
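
        With the ACCESS_LOG_FORMAT used by this module, a resulting access
        log line looks like this (all field values illustrative):

            2009-11-21T17:50:45 127.0.0.1 admin GET "/logs/main" 0 200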
"""
if http_req.username:
invoker = http_req.username
else:
invoker = "-"
buf = "%s %s %s \"%s\" %d %d" % (http_req.client_address, invoker, http_req.method, http_req.raw_uri,
http_req.content_length, status)
        if isinstance(buf, unicode):
buf = buf.encode('utf-8')
logger.log(logging.INFO, buf)
def make_specific_loggers(self, facility_id, rid=None, resource_name=None, invoker=None):
"""
Create facility specific logging functions.
        Specify the facility only once, and have three logging
        functions returned to you (one for normal log messages, one
        for exceptions, one for HTTP accesses), just for your facility.
        The returned functions only require the message or exception
        as a parameter.
        Note that the returned logger for exceptions defaults to
        a log-level of ERROR. Thus, the log-level does not need
        to be specified when calling this returned function.
@param facility_id: The facility ID.
@type facility_id: str
@param rid: The unique ID of a pipeline. If this is not
set (None), then a '-' will be logged in its
place.
@type rid: str
        @param resource_name: Resource name if this logger is for a member of
                              an executing pipeline. If this is not set (None),
                              then a '-' will be logged in its place.
        @type resource_name: str
        @param invoker: If this is for an executing resource, then its invoker
                        (if known) can be specified here. If not, "-" is
                        written to the log file.
@type invoker: str
@return: A tuple of facility (and/or pipeline)-specific logging
functions, which only require the message as
a parameter (and the log level for the normal
logger, and the exception for the exception logger). The
first element in the tuple is the logger for normal messages,
the second element is the logger for exceptions. The
third element is a logging function for HTTP accesses.
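
        A usage sketch (the facility, pipeline ID, level and exception are
        illustrative):

            log, elog, rlog = get_instance().make_specific_loggers(LOG_PM, rid="12345")
            log(LEVEL_INFO, "Pipeline started")
            elog(some_exception, "Pipeline failed")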
"""
if facility_id is None:
facility_id = ""
elif '"' in facility_id:
raise SnapFormatError("No quotes are allowed in facility ID strings: %s" % facility_id)
mlogger = self.get_facility_logger(MESSAGE_LOG, facility_id)
elogger = self.get_facility_logger(EXCEPTION_LOG, facility_id)
alogger = self.get_facility_logger(ACCESS_LOG, facility_id)
mesg_logger = functools.partial(self.generic_log, mlogger, facility_id = facility_id, rid = rid)
exception_logger = functools.partial(self.generic_exception_log, elogger, mlogger,
facility_id = facility_id, rid = rid, resource_name = resource_name,
invoker = invoker)
access_logger = functools.partial(self.access_log, alogger)
return (mesg_logger, exception_logger, access_logger)
def get_facility_logger(self, log_type, facility_id=""):
"""
        Get the python logging object for the specified stream.
        @param log_type: Specifies whether the stream is the MESSAGE, ACCESS or EXCEPTION stream.
        @type log_type: str
        @param facility_id: The specific facility.
        @type facility_id: str
@return: The python logging object.
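
        For example, with process ID "server", get_facility_logger(MESSAGE_LOG,
        LOG_PM) returns the python logger named "server.main.PM".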
"""
s = "%s.%s" % (self._process, log_type)
if log_type == MESSAGE_LOG:
if facility_id:
s += "." + facility_id
elif log_type == EXCEPTION_LOG:
if facility_id:
s += "." + facility_id
elif log_type == ACCESS_LOG:
if facility_id:
s += "." + facility_id
else:
raise SnapValueError("No logging stream named %s" % log_type)
return logging.getLogger(s)
def write_log_out(self, log_type, out, params):
"""
        Process a request to read messages from a specified log type, with optional filter values.
@param log_type: Specified type of log file (one of ACCESS_LOG, MESSAGE_LOG, EXCEPTION_LOG)
@type log_type: str
@param out: The output stream to write to.
        @type out: RP writer
@param params: Dictionary with filtering params.
@type params: dict
"""
if log_type == ACCESS_LOG:
fname = self._accesslogfile_name
accept_levels = None # No levels for access file
rid = None
facility_pass = None
facility_deny = None
elif log_type == MESSAGE_LOG:
fname = self._logfile_name
elif log_type == EXCEPTION_LOG:
fname = self._excp_logfile_name
else:
raise SnapObjNotFoundError("No log file named '%s' is known." % log_type)
# See if we need to do filtering.
s = set(params.keys()) & set(("rid", "accept_levels", "start_date", "end_date",
"facility_pass", "facility_deny"))
do_filtering = len(s) > 0
# Log rotation causes rename of files. On Windows, you cannot rename a file when it is open.
# So, there is a theoretical possibility that if a log rotation occurs while we read from the
# log files here, then it will fail and throw all sorts of errors. To avoid this issue, we
# temporarily disable log rotation by setting maxBytes to 0, while we read from the log file.
with self._file_handler_map[log_type].lock:
self._file_handler_map[log_type].maxBytes = 0
try:
return self._filter_log_messages(fname, log_type, out, do_filtering,
max_lines = params.get("last_lines"),
rid = params.get("rid"),
accept_levels = params.get("accept_levels"),
start_date = params.get("start_date"),
end_date = params.get("end_date"),
facility_pass = params.get("facility_pass"),
facility_deny = params.get("facility_deny"))
finally:
with self._file_handler_map[log_type].lock:
# Always go to self.max_bytes for correct max bytes value, as that might have been
# reconfigured by the user while we were busy trolling the log files.
self._file_handler_map[log_type].maxBytes = self.max_bytes
def _filter_log_messages(self, fname, log_type, out, do_filtering, max_lines, rid, accept_levels, start_date,
end_date, facility_pass, facility_deny):
"""
        Write the current contents of a log file to a specified output stream.
This will read the specified logfile (if it exists) and write the
contents of the log to the specified stream. A log file is specified
via a string. Either 'access', 'main', or 'exception'.
        If do_filtering is True, each line is checked against the specified
        filter parameters (rid, accept_levels, start/end dates, facilities)
        before it is written; lines that do not match are skipped.
@param fname: Log file name to be read.
@type fname: str
@param log_type: Name identifying the log file. This is either
'access', 'main' or 'exception'.
@type log_type: str
@param out: An output stream, or any object providing a write()
method.
@type out: Stream like object, providing write().
@param do_filtering: If True, the method needs to filter each log message.
@type do_filtering: bool
@param max_lines: Specifies the maximum number of lines from the
end that should be written. This is similar to
what the tail command does. If set to 0, all available
output will be written.
Note that this is applied AFTER all other filters,
not to the initial, raw log file.
@type max_lines: integer
@param rid: The resource ID of the pipeline we want to filter on, or
None. It can also be set to "-", in which case it will
match all log file entries with a valid RID in the proper
location in the log line.
@type rid: str
@param accept_levels: A list containing the string representations of the
the log levels we are interested in.
@type accept_levels: list
@param start_date: The oldest messages we want to display, specified as
seconds from epoch.
@type start_date: int
@param end_date: The youngest messages we want to display, specified as
seconds from epoch
@type end_date: int
@param facility_pass: A list containing all the allowed facilities.
@type facility_pass: list
@param facility_deny: A list containing specifically those facilities we don't want to allow.
@type facility_deny: list
"""
        if max_lines is None:
            # No line limit was requested; 0 means "write all available output".
            max_lines = 0
# ---------------------------------------------------------
# A small helper function, which we use to write a line
# into the output stream or the output ring-buffer. We
# need to do this writing in two different places, so
# putting it into this little function here makes sense.
# But since really nobody else has any use for it, there
# is also no point in making it available outside of
# this function.
def out_write(line):
if max_lines == 0:
# Don't need to deal with messy buffers
# when someone just wants the entire output.
out.write(line)
else:
# Someone only wants the last 'max_lines' lines,
# so we implement a simple fixed length queue here.
out_list.append(line)
if len(out_list) > max_lines:
del out_list[0]
# ---------------------------------------------------------
with codecs.open(fname, "r", "utf-8") as lfile:
out_list = []
have_line_already = False
while True:
if not have_line_already:
line = lfile.readline()
else:
have_line_already = False
if not line:
break
                # The interesting log lines all start with a timestamp in
                # TIME_STAMP_FORMAT, which is 19 characters long.
if do_filtering:
# If filtering is defined then we can only allow well-formed lines
# through. Continuation lines from exceptions are handled in a separate
# code block.
                    # The format of exception and main log entries is along these lines:
# <time> <log level> <logger id> <facility> <message id> <rid> <resource name> <invoker> <message>
# Of these, <resource name> will either be a "-" or a quote enclosed string like
# "Netsuite Reader". Note how this value can have white spaces in it. This is why
                    # it is quoted. For our processing, we are only interested in the fields up to <rid>,
                    # so the presence of spaces in the resource name does not affect us.
fields = line.split()
is_message = False
if len(fields) >= 8:
                        try:
                            date_field = time.strptime(fields[0], TIME_STAMP_FORMAT)
                            date_field = time.mktime(date_field)
                            is_message = True
                        except ValueError:
                            pass # So, it's not a message; the flag stays False.
if is_message:
# Perform the filtering of the line. There are a bunch of parameters
# to consider, one by one.
try:
# Start and end date
if (start_date is not None and date_field < start_date) or \
(end_date is not None and date_field > end_date):
continue
                            # Log levels (if it is None, this filter passes everything)
if accept_levels and fields[1] not in accept_levels:
continue
# Facilities
if (facility_pass and fields[3] not in facility_pass) or (facility_deny and
fields[3] in facility_deny):
continue
# RID
if rid:
# The RID is a dotted decimal value, where each resource inside a pipeline
                            # uses the pipeline rid as its prefix, separated by a dot. For example:
# If pipeline foo has rid 12345 and this pipeline has executed CsvRead and CsvWrite
# inside it, then the rid of CsvRead is along the lines of 12345.NNNN and CsvWrite
# is 12345.MMMM, where NNNN != MMMM. The get_log() request is supposed to return
# log messages for both the pipeline and all resources inside it, so we search for
# rids with prefix "12345.".
rid_fields = fields[5].split(".")
if rid != rid_fields[0]:
continue
except Exception, e:
print e
continue
else:
continue
out_write(line)
                if log_type in [ EXCEPTION_LOG, MESSAGE_LOG ]:
# Exception and main log entries can be multi line. So, we have to
# read all the other lines that belong to this log
# entry as well. Fortunately, they are easily recognized,
# since all the 'base' log lines are starting with a date
# and therefore a specific format. The 'other' lines in the
# log never start with a number.
while True:
line = lfile.readline()
is_message = False
if len(line) > 19:
                        try:
                            time.strptime(line[0:19], TIME_STAMP_FORMAT)
                            is_message = True
                        except ValueError:
                            pass # Still reading more lines from the current log entry.
elif not line:
# No more lines to read. Indicate end of message
is_message = True
if is_message:
# We have detected beginning of a new log entry and read its first line already
have_line_already = True
break
else:
out_write(line)
        # If a max_lines parameter was specified, nothing has been written
        # yet: the lines were assembled in a list, which we write out now.
if out_list:
for l in out_list:
out.write(l)
|