DBLookup.py :  » Development » SnapLogic » snaplogic » components » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » components » DBLookup.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008 - 2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $

#    $Id: DBLookup.py 10330 2009-12-24 22:13:38Z grisha $
"""
DBLookup component and resource definition.
"""

import re
from decimal import Decimal
from urlparse import urlparse

from snaplogic.common.snap_exceptions import *
from snaplogic.common.data_types import SnapString,SnapNumber,SnapDateTime,\
    Record
from snaplogic.common import version_info
import snaplogic.components as components
from snaplogic.components import DBUtils
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.components.DBComponent import DBComponent
from snaplogic.snapi_base import keys
import snaplogic.cc.prop as prop
from snaplogic import snapi

    
class DBLookup(DBComponent):
    """
    Component to perform a database lookup.

    This component runs an SQL query against a database once for every input record, using fields
    from that record as the query's parameter values.

    An example SQL statement with substitution keywords embedded within it follows:
    SELECT * FROM table WHERE column = ${field}

    In this example query, '${field}' refers to the value of the 'field' input field. Each such
    reference is replaced by a database bind variable (not by textual substitution), and the
    record's value is supplied as the bind value when the query runs for that record.

    The columns of the single result row are copied, in order, into the fields of the output record.

    """
    
    api_version = '1.0'
    component_version = '1.2'
    
    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_ALLOW_PASS_THROUGH        : True
    }
    
    component_description = "This component performs a database lookup."
    component_label = "DB Lookup"
    # Points at this component's own reference page (previously copy-pasted from DateDimension).
    component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/DBLookup" % \
                                                        version_info.doc_uri_version

    def create_resource_template(self):
        """
        Define the resource properties: the common DB properties, the lookup SQL statement and
        the IgnoreMultipleRows flag (which defaults to False).

        """
        self._create_common_db_props()
        self.set_property_def("SQLStmt", 
                              prop.SimpleProp("SQL statement", 
                                              SnapString,
                                              "The SQL statement to be executed while processing records",
                                              None,
                                              True))
        self.set_property_def("IgnoreMultipleRows", 
                              prop.SimpleProp("Ignore multiple rows", 
                                              "boolean",
                                              "A boolean flag indicating if multiple rows should be ignored."
                                              " If not ignored, they will generate an exception.",
                                              None,
                                              True))
        self.set_property_value("IgnoreMultipleRows", False)

    def validate(self, err_obj):
        """
        Validate the resource definition beyond the required-property checks.

        All the properties are marked as required, so validate doesn't have to check for their presence.
        It checks that the SQL statement is a SELECT and that every input field it references exists
        in the input view. All missing fields are reported together in a single message.

        @param err_obj: Error object into which property errors are recorded.

        """
        # DBConnect: delegated to the shared DB property validation.
        self._common_db_validate(err_obj)

        sql = self.get_property_value("SQLStmt")
        if component_api.has_param(sql):
            # The statement contains a parameter reference; the checks below are skipped.
            return

        # SQLStmt: must be a SELECT.
        if not sql.strip().lower().startswith('select'):
            err_obj.get_property_err("SQLStmt").set_message("SQL statement must be a SELECT statement.")
            return

        # SQLStmt: every ${field} reference must name a field of the input view.
        bind_cols = self.get_referenced_fields(sql, True)
        input_view_names = self.list_input_view_names()
        if input_view_names:
            input_view = self.get_input_view_def(input_view_names[keys.SINGLE_VIEW])
            available = set(d[keys.FIELD_NAME] for d in input_view[keys.VIEW_FIELDS])
        else:
            # No input view defined yet: no referenced field can be satisfied.
            available = set()

        missing = []
        for bind_col in bind_cols:
            if bind_col not in available and bind_col not in missing:
                missing.append(bind_col)
        if missing:
            err_obj.get_property_err("SQLStmt").set_message(
                "Input field(s) '%s' not present in input view." % "', '".join(missing))

    def upgrade_1_0_to_1_1(self):
        """Upgrade SQLStmt field references and switch the resource to use references."""
        self._upgrade_field_refs("SQLStmt")
        self._upgrade_to_use_refs()
    
    def upgrade_1_1_to_1_2(self):
        """
        No-op upgrade, only to change the component doc URI during the upgrade,
        which is done by cc_info before calling this method.
        
        """
        pass

    def execute(self, input_views, output_views):
        """
        Run the lookup query once for every record read from the input view.

        The query's ${field} references are replaced with database bind variables once, up front.
        Each input record then supplies the bind values, and the resulting row is written to the
        output view.

        @param input_views: Connected input views; the single one is used.
        @param output_views: Connected output views; the single one is used.

        @raise SnapComponentError: No input or no output view is connected.

        """
        try:
            output_view = output_views.values()[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No output view connected.")
        try:
            input_view = input_views.values()[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No input view connected.")

        # Prefer a referenced connection resource; otherwise load it from the DBConnect URI.
        connect_resdef = self.get_referenced_resdef("DBConnect")
        if not connect_resdef:
            connect_uri = self.get_property_value("DBConnect")
            connect_resdef = self.get_local_resource_object(connect_uri)

        self._db = None
        try:
            self._db = DBUtils.get_connection_from_resdef(connect_resdef)
            self._cursor = self._db.cursor()
            self._cursor.arraysize = 1
            self._ignore_mult_rows = self.get_property_value('IgnoreMultipleRows')

            # Replace each ${field} reference with the connection's bind variable for it.
            raw_query = self.get_property_value('SQLStmt')
            self._fields_to_replace = self.get_referenced_fields(raw_query)
            self._bind_vars = self._db.bindVariableList(self._fields_to_replace)
            replacements = dict(zip(self._fields_to_replace, self._bind_vars))
            self._query = self.replace_referenced_fields(raw_query, replacements)

            # An empty input simply skips the loop; the connection is closed once, below.
            record = input_view.read_record()
            while record is not None:
                output_view.write_record(self._runQuery(record, output_view))
                record = input_view.read_record()

            output_view.completed()
        finally:
            # Only the connection is closed here, not the cursor. At least with MySQL, closing a
            # cursor with a large remaining result set can block while it drains the rows
            # (see #2090). This connection is not shared, and closing it also closes the cursor.
            if self._db is not None:
                try:
                    self._db.close()
                except Exception:
                    # Best-effort cleanup. Don't let a close failure mask the real outcome.
                    pass
  
    def _runQuery(self, record, output_view):
        """
        Run the query (with bind values taken from the record) and return a data record for output.

        This may be overridden to handle the result set differently. For example, if no results are returned
        one might want to return a default set of values.

        @param record: An input record.
        @type record: snaplogic.common.DataTypes.Record

        @param output_view: The output view used to create the output record.

        @return: A record for output.
        @rtype: snaplogic.common.DataTypes.Record

        @raise SnapValueError: No result rows were returned, more than one row was returned and the
                               IgnoreMultipleRows property is False, or the row has more columns
                               than the output view has fields.
                                   
        """
        # Referenced field names, de-duplicated, in first-reference order.
        unique_fields = []
        for field_name in self._fields_to_replace:
            if field_name not in unique_fields:
                unique_fields.append(field_name)

        # A record carrying only the referenced fields. The connection's bindValueContainer()
        # builds the query's bind values from it.
        fake_rec = Record(None, None, None)
        fake_rec.view_name = record.view_name
        fake_rec.field_names = unique_fields
        fake_rec.python_types_map = {}
        for field_name in unique_fields:
            fake_rec.python_types_map[field_name] = record.python_types_map[field_name]
            fake_rec[field_name] = record[field_name]

        bind_values = self._db.bindValueContainer(fake_rec)
        self._cursor.execute(self._query, bind_values)
        row = self._cursor.fetchone()
        if row is None:
            raise SnapValueError('Zero rows returned from SQL query')
        # Probe for a second row only when multiple rows are an error.
        if not self._ignore_mult_rows and self._cursor.fetchone() is not None:
            raise SnapValueError('SQL query returned too many rows')

        out_rec = output_view.create_record()
        out_rec.transfer_pass_through_fields(record)
        field_names = out_rec.field_names
        if len(row) > len(field_names):
            raise SnapValueError('SQL query returned %d columns but the output view has only %d fields'
                                 % (len(row), len(field_names)))
        # Result columns map positionally onto the output record's fields.
        for field_name, value in zip(field_names, row):
            out_rec[field_name] = value

        return out_rec
    
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.