# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: data_tracing.py 9377 2009-10-21 17:19:09Z dhiraj $
"""
Data Tracing and Debugging
This module provides tools for tracing data flow through a pipeline and capturing it to trace files.
"""
from __future__ import with_statement
import csv
import errno
import os
import threading
import codecs
from snaplogic.common.snap_exceptions import *
from snaplogic.common.config import snap_config
class DataTracer(object):
"""
Traces data into a file.
    Traces record or binary data into a trace file for debugging or monitoring. Used by the
    pipeline tracing feature.
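
    Example (illustrative sketch; the run ID, resource ID and view name below are
    hypothetical):
    >>> tracer = DataTracer()
    >>> tracer.begin_record_tracing('run-1234', 'outer_pipe.reader1',
    ...                             DataTracer.OUTPUT_VIEW, 'output1')
    >>> tracer.trace_record([u'hello', 42])
    >>> tracer.end_tracing()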
"""
INPUT_VIEW = 'in'
"""
    Constant specifying that the view being traced is an input view.
"""
OUTPUT_VIEW = 'out'
"""
    Constant specifying that the view being traced is an output view.
"""
def __init__(self):
"""
Initialization.
"""
super(DataTracer, self).__init__()
self._file = None
self._record_writer = None
self._closed = False
self._lock = threading.Lock()
"""
        The Python interpreter actually crashes when one thread closes the csv file while
another thread is writing to it. For this reason, all access to the file is protected
with this lock.
"""
def _open_trace_file(self, pipeline_id, resource_id, view_type, view_name):
"""
Opens a trace file and returns it.
The file name is based on the parameters passed in. See begin_record_tracing() or
begin_binary_tracing() for more info.
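        For example (illustrative values only), pipeline_id 'run-1234', resource_id
        'outer_pipe.reader1', view_name 'output1' and view_type OUTPUT_VIEW produce the file
        '<log_dir>/traces/run-1234/outer_pipe.reader1.output1.out'; with an empty resource_id
        the file is '<log_dir>/traces/run-1234/output1.out'.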
@param pipeline_id: The pipeline runtime ID unique for this run.
@type pipeline_id: str
@param resource_id: The fully qualified resource ID.
@type resource_id: str
        @param view_type: Type of view being traced (INPUT_VIEW or OUTPUT_VIEW).
        @type view_type: str
        @param view_name: Name of the resource view being traced.
        @type view_name: str
        @return: The newly opened trace file object.
        @rtype: file
        @raise SnapNativeError: There was an error opening the tracefile.
        @raise SnapValueError: The view type given is invalid.
"""
# Get the log directory from the config file.
config = snap_config.get_instance()
log_dir = config.get_section('cc')['log_dir']
traces_dir = log_dir + '/traces'
if not os.path.isdir(traces_dir):
try:
os.mkdir(traces_dir)
except Exception, e:
raise SnapException.chain(e,
SnapNativeError("Error creating global traces directory '%s': %s" %
(traces_dir, str(e))))
dir_name = '%s/%s' % (traces_dir, pipeline_id)
if not os.path.isdir(dir_name):
try:
os.mkdir(dir_name)
except Exception, e:
                raise SnapException.chain(e,
                                          SnapNativeError("Error creating pipeline trace directory '%s': %s" %
                                                          (dir_name, str(e))))
        if view_type == self.INPUT_VIEW:
            ext = 'in'
        elif view_type == self.OUTPUT_VIEW:
            ext = 'out'
        else:
            raise SnapValueError("Invalid tracing view type: %s." % repr(view_type))
try:
# If we have a resource_id include it in the filename,
# otherwise generate the filename based on view name only.
if resource_id:
filename = '%s/%s.%s.%s' % (dir_name, resource_id, view_name, ext)
else:
filename = '%s/%s.%s' % (dir_name, view_name, ext)
return open(filename, 'wb')
except Exception, e:
raise SnapException.chain(e,
SnapNativeError("Error creating tracefile '%s': %s" %
(filename, str(e))))
def begin_record_tracing(self, pipeline_id, resource_id, view_type, view_name):
"""
        Begin tracing records on a resource view.
        Opens a trace file to which the records passed to trace_record() will be written. A directory
        named after the pipeline_id parameter is created under the 'traces' subdirectory of the log
        directory. The filename created within that directory takes the form "${resource_id}.${view_name}.in"
        or "${resource_id}.${view_name}.out", depending on the view type.
        The resource_id parameter should be a fully-qualified name that conveys the location of the current
        resource within the complete running pipeline's hierarchy. For instance, if the resource 'reader1' is
        located in a pipeline called 'inner_pipe' that is itself used in the running pipeline called 'outer_pipe',
        the complete name of the resource should be 'outer_pipe.inner_pipe.reader1'.
        The view_type parameter must be either DataTracer.INPUT_VIEW or DataTracer.OUTPUT_VIEW and specifies
        the type of view being traced.
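        Continuing that illustration (the run ID 'run-1234' is hypothetical), tracing the output
        view 'output1' of that resource would create the file
        '<log_dir>/traces/run-1234/outer_pipe.inner_pipe.reader1.output1.out'.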
@param pipeline_id: The pipeline runtime ID unique for this run.
@type pipeline_id: str
@param resource_id: The fully qualified resource ID.
@type resource_id: str
@param view_type: Type of view being traced.
@type view_type: str
@param view_name: Name of the resource view being traced.
@type view_name: str
@raise SnapNativeError: There was an error opening the tracefile.
"""
with self._lock:
self._file = self._open_trace_file(pipeline_id, resource_id, view_type, view_name)
self._record_writer = csv.writer(self._file)
def begin_binary_tracing(self, pipeline_id, resource_id, view_type, view_name):
"""
        Begin tracing binary data on a resource view.
        Opens a trace file to which the data passed to trace_bytes() will be written. A directory
        named after the pipeline_id parameter is created under the 'traces' subdirectory of the log
        directory. The filename created within that directory takes the form "${resource_id}.${view_name}.in"
        or "${resource_id}.${view_name}.out", depending on the view type.
        The resource_id parameter should be a fully-qualified name that conveys the location of the current
        resource within the complete running pipeline's hierarchy. For instance, if the resource 'reader1' is
        located in a pipeline called 'inner_pipe' that is itself used in the running pipeline called 'outer_pipe',
        the complete name of the resource should be 'outer_pipe.inner_pipe.reader1'.
        The view_type parameter must be either DataTracer.INPUT_VIEW or DataTracer.OUTPUT_VIEW and specifies
        the type of view being traced.
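        Example (illustrative sketch; the run ID, resource ID and view name are hypothetical):
        >>> tracer = DataTracer()
        >>> tracer.begin_binary_tracing('run-1234', 'outer_pipe.writer1',
        ...                             DataTracer.INPUT_VIEW, 'input1')
        >>> tracer.trace_bytes('raw bytes read from the stream')
        >>> tracer.end_tracing()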
@param pipeline_id: The pipeline runtime ID unique for this run.
@type pipeline_id: str
@param resource_id: The fully qualified resource ID.
@type resource_id: str
        @param view_type: Type of view being traced.
        @type view_type: str
        @param view_name: Name of the resource view being traced.
        @type view_name: str
@raise SnapNativeError: There was an error opening the tracefile.
"""
with self._lock:
self._file = self._open_trace_file(pipeline_id, resource_id, view_type, view_name)
def trace_record(self, record):
"""
        Log the record to the trace file, if tracing is active. Records traced after end_tracing()
        has been called are silently ignored.
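        Example (illustrative; assumes begin_record_tracing() has already been called on this tracer):
        >>> tracer.trace_record([u'alpha', 1])   # written to the trace file as the CSV row "alpha,1"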
@param record: Record of data read from stream.
@type record: list
        @raise SnapValueError: Tracing is not active (it was never started).
"""
with self._lock:
if self._file is None:
if not self._closed:
raise SnapValueError("Tracing is not active.")
else:
                # The csv module does some unicode-incompatible operations, so just opening a
                # codecs-based file object and passing it to the csv module does not work. The
                # unicode strings passed to csv need to be encoded to utf-8 first.
                encoded = []
                for x in record:
                    if isinstance(x, unicode):
                        encoded.append(x.encode('utf-8'))
                    else:
                        encoded.append(x)
                self._record_writer.writerow(encoded)
def trace_bytes(self, data):
"""
        Log the binary data to the trace file, if tracing is active. Data traced after end_tracing()
        has been called is silently ignored.
@param data: A string of data read from the stream.
@type data: str
        @raise SnapValueError: Tracing is not active (it was never started).
"""
with self._lock:
if self._file is None:
if not self._closed:
raise SnapValueError("Tracing is not active.")
else:
self._file.write(data)
def end_tracing(self):
"""
End tracing and close the trace file.
"""
with self._lock:
if self._file is not None:
self._file.close()
self._file = None
self._record_writer = None
self._closed = True