# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: Compute.py 10330 2009-12-24 22:13:38Z grisha $
"""
Compute Module and Resource Definition.
Specify a snippet of Python code for execution on each
record that passes through.
"""
__docformat__ = "epytext en"
import imp
import tempfile
import os
from snaplogic.common.data_types import Record
from snaplogic.common.data_types import SnapNumber,SnapString,SnapDateTime
from snaplogic.common import version_info
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.common.SnapReader import SnapReader
from snaplogic.snapi_base import keys
import snaplogic.cc.prop as prop
from snaplogic import components
from snaplogic.common.snap_exceptions import *
from snaplogic.components import FileUtils
# Public names
__all__ = [ "Compute" ]
class Compute(ComponentAPI):
    """
    Compute component: runs a user-defined Python function on every record.

    The function is taken either from inline source code (the 'sourcecode'
    property) or from a module file (the 'modulename' property); its name is
    given by the 'funcname' property.

    The compute function is called with the following arguments:

        compute_component -- the reference back to the Compute component
                             that invoked this function
        inrec             -- incoming record
        outrec            -- outgoing record
    """

    api_version = '1.0'
    component_version = '1.1'

    # Exactly one input view and one output view; pass-through is allowed.
    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_ALLOW_PASS_THROUGH        : True
    }

    component_description = "Allows the execution of user-defined Python code for processing of records."
    component_label = "Compute"
    component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/Compute" % \
                        version_info.doc_uri_version

    def validate_config_file(self):
        """
        If a config file is provided, then validate its contents.

        The only supported entry is 'root_directory', whose value must be an
        existing path.

        @raise SnapComponentConfigError: If 'root_directory' does not exist or
            if any other entry is present in the config dictionary.
        """
        for k in self.config_dictionary:
            if k == "root_directory":
                if not os.path.exists(self.config_dictionary[k]):
                    raise SnapComponentConfigError("The path specified for root (%s) is not valid" %
                                                   self.config_dictionary[k])
            else:
                # No other config file param is supported.
                raise SnapComponentConfigError("Unexpected config file entry (%s) encountered" % k)

    def create_resource_template(self):
        """
        Create Compute resource definition template.

        Defines three string properties: 'sourcecode', 'modulename' and
        'funcname'.
        """
        self.set_property_def('sourcecode',
                              prop.SimpleProp("Source code",
                                              SnapString,
                                              "Inline Python source code to be executed by the Compute component, if not using 'Module name'."))
        self.set_property_def('modulename',
                              prop.SimpleProp("Module name",
                                              SnapString,
                                              "The name of an installed module to be executed by the Compute component, if not using 'Source code'."))
        # NOTE(review): the trailing (None, True) arguments presumably mean
        # "no constraint" and "required" -- confirm against prop.SimpleProp.
        self.set_property_def('funcname',
                              prop.SimpleProp("Function name",
                                              SnapString,
                                              "The name of the function that is to be executed: f(component, inrec, outrec).",
                                              None,
                                              True))

    def validate(self, err_obj):
        """
        Validate the proposed resource definition for this component.

        Exactly one of 'sourcecode' and 'modulename' must be set. When that
        is not the case, the same message is recorded against both
        properties.

        @param err_obj: Error object on which property errors are recorded
            via get_property_err(name).set_message(text).
        """
        source_code = self.get_property_value("sourcecode")
        module_name = self.get_property_value("modulename")

        if not source_code and not module_name:
            message = "Either 'sourcecode' or 'modulename' must be specified."
        elif source_code and module_name:
            message = "'sourcecode' and 'modulename' are mutually exclusive. Only one can be specified."
        else:
            # Exactly one of the two is set: nothing to report.
            return

        # Same reporting order as before: 'modulename' first, then 'sourcecode'.
        for prop_name in ("modulename", "sourcecode"):
            err_obj.get_property_err(prop_name).set_message(message)

    def _setup(self):
        """
        Initialize.

        Load the compute code and store the function named by 'funcname' in
        self._compute_func.

        @raise SnapComponentError: If the module file location fails the URI
            scheme validation.
        """
        # Default root. No override is applied in this method.
        self._compute_root = '/'

        self._source_code = self.get_property_value('sourcecode')
        self._module_name = self.get_property_value('modulename')
        self._func_name = self.get_property_value('funcname')

        # Handle of the file where compute function is stored
        f = None

        try:
            if self._source_code:
                # Source is in resdef 'sourcecode' attribute: write it to a
                # temporary file so that it can be imported as a module.
                # 'w+' opens the file for both writing and reading.
                f = tempfile.NamedTemporaryFile(mode='w+')
                f.write(self._source_code)
                f.flush()

                # Seek to the beginning of the file because we pass this file handle
                # into imp.load_module that will read from it
                f.seek(0)

                # Import as module
                imp_mod = imp.load_module('', f.file, f.name, ('', '', imp.PY_SOURCE))
            else:
                path_name = FileUtils.get_file_location(self._module_name + '.py', self.config_dictionary)

                # Validate filename URI scheme
                error = FileUtils.validate_file_scheme(path_name, self.config_dictionary.get("schemes"), ['file'])
                if error is not None:
                    raise SnapComponentError(error)

                path_name = str(path_name)
                f = open(path_name, 'r')

                # Import the module
                imp_mod = imp.load_source(self._module_name, path_name, f)

            # Get the function or class that will be called
            self._compute_func = getattr(imp_mod, self._func_name)
        finally:
            if f is not None:
                # Close the file
                f.close()

    def execute(self, input_views, output_views):
        """
        Perform the processing of input records.

        Each record read from the single input view is handed to the compute
        function together with the output record. Pass-through fields are
        then transferred and the output record is written to the single
        output view.

        @param input_views: Dictionary of input views.
        @param output_views: Dictionary of output views.

        @raise SnapComponentError: If no input or no output view is present.
        """
        try:
            output_view = output_views.values()[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No output view connected.")

        try:
            input_view = input_views.values()[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No input view connected.")

        self._setup()

        # NOTE(review): one output record object is created here and reused
        # for every input record, so any field the compute function does not
        # overwrite keeps its value from the previous record -- confirm this
        # is intended.
        out_rec = output_view.create_record()

        record = input_view.read_record()
        while record is not None:
            self._compute_func(self, record, out_rec)
            out_rec.transfer_pass_through_fields(record)
            output_view.write_record(out_rec)
            record = input_view.read_record()

        output_view.completed()

    def upgrade_1_0_to_1_1(self):
        """
        No-op upgrade. Its only purpose is to change the component doc URI
        during the upgrade, which will be done by cc_info before calling
        this method.
        """
        pass
|