Join.py :  » Development » SnapLogic » snaplogic » components » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » SnapLogic 
SnapLogic » snaplogic » components » Join.py
# $SnapHashLicense:
# 
# SnapLogic - Open source data services
# 
# Copyright (C) 2008 - 2009, SnapLogic, Inc.  All rights reserved.
# 
# See http://www.snaplogic.org for more information about
# the SnapLogic project. 
# 
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
# 
# "SnapLogic" is a trademark of SnapLogic, Inc.
# 
# 
# $
#    $Id: Join.py 10330 2009-12-24 22:13:38Z grisha $
"""
Join Module and Resource Definition

"""

import tempfile
import os
from decimal import Decimal
from datetime import datetime,timedelta
import time
from sqlite3 import dbapi2

import snaplogic.components as components
from snaplogic.common import snap_log,sqlite_iter
from snaplogic.common import version_info
from snaplogic.cc import component_api
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.cc import prop
from snaplogic.snapi_base import keys
from snaplogic.common.snap_exceptions import *
from snaplogic.common.data_types import SnapNumber,SnapString,SnapDateTime
from snaplogic.components.computils import parse_date_with_fractions

# Names of the resource properties (and their nested entries) used by the Join component.

# Key of the first field of a join expression.
JOIN_FIELD_1 = "Join field 1"

# Key of the second field of a join expression; it is compared for equality with field 1.
JOIN_FIELD_2 = "Join field 2"

# Name of the dictionary property holding one pair of join fields.
JOIN_EXPR = "Join expression"

# Name of the resource property holding the list of join expressions.
JOIN_EXPRS = "Join expressions"

# Key of an alias entry naming the input field as "view.field".
JOIN_FQN_FIELD = "Fully qualified field"

# Key of an alias entry naming the output view field fed by the aliased input field.
JOIN_OUTPUT_FIELD = "Output field"

# Name of the dictionary property holding one alias.
JOIN_ALIAS = "Alias"

# Name of the resource property holding the list of aliases.
JOIN_ALIASES = "Aliases"

class Join(ComponentAPI):
    """
    Equijoin component.

    Joins the records of two or more input views into a single output view.
    The records of every input view are staged in a temporary SQLite database
    (one table per view); once all input views are exhausted, a SELECT whose
    WHERE clause equates the configured join fields produces the output records.

    """
    
    api_version = '1.0'
    # Current version of the component; the upgrade_X_to_Y methods lead up to it.
    component_version = '1.3'
    
    # At least two input views (no upper limit) and exactly one output view.
    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT    : 2,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT    : ComponentAPI.UNLIMITED_VIEWS,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT   : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT   : 1,
    }
    component_description = "Equijoin"
    
    component_label = "Join"
    
    # Documentation page for this component, specific to the documentation version.
    component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/Join" % \
                                                        version_info.doc_uri_version
    
    def _adapt_Decimal(self, dec):
        return float(dec)

    def create_resource_template(self):
        """
        Define the properties of a Join resource.

        Two list properties are registered: the join expressions (at least one
        pair of input fields) and the aliases (each pairing a fully-qualified
        input field with an output field).

        """
        # Join expressions: a list of {field 1, field 2} dictionaries.
        lhs_field = prop.SimpleProp(JOIN_FIELD_1, SnapString, "Field to join",
                                    {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                    required=True)
        rhs_field = prop.SimpleProp(JOIN_FIELD_2, SnapString, "Field to join with",
                                    {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                    required=True)
        expr_def = prop.DictProp(JOIN_EXPR, lhs_field, "Join expression", 2, required=True)
        for entry_name, entry_def in ((JOIN_FIELD_1, lhs_field), (JOIN_FIELD_2, rhs_field)):
            expr_def[entry_name] = entry_def
        self.set_property_def(JOIN_EXPRS,
                              prop.ListProp("Join expressions", expr_def, min_size = 1, required=True))

        # Aliases: a list of {fully qualified input field, output field} dictionaries.
        fqn_def = prop.SimpleProp(JOIN_FQN_FIELD, SnapString, "Fully-qualified input field",
                                  {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] }, required=True)
        out_def = prop.SimpleProp(JOIN_OUTPUT_FIELD, SnapString, "Corresponding output field",
                                  {'lov': [ keys.CONSTRAINT_LOV_OUTPUT_FIELD] }, required=True)
        alias_def = prop.DictProp(JOIN_ALIAS, lhs_field, "Alias", 2, required=False)
        for entry_name, entry_def in ((JOIN_FQN_FIELD, fqn_def), (JOIN_OUTPUT_FIELD, out_def)):
            alias_def[entry_name] = entry_def
        self.set_property_def(JOIN_ALIASES, prop.ListProp("Aliases", alias_def))



    def _has_field(self, view, field_name):
        """
        Tell whether a view definition contains a field with the given name.

        @param view: View definition whose fields are searched.
        @param field_name: Name of the field to look for.

        @return: True if one of the view's fields is named field_name, False otherwise.

        """
        return any(candidate[keys.FIELD_NAME] == field_name
                   for candidate in view[keys.VIEW_FIELDS])
        
    def validate(self, err_obj):
        """
        Validate the Join resource for correctness.

        The checks run in two stages. First, alias output field names must be
        unique, and every output view field that is not produced by an alias
        must exist in exactly one input view. Only if that holds, each output
        field is traced back to its input field (through its alias, if any):
        an alias in use must name an existing input field as "view.field", and
        the output field type must equal the input field type.

        Values containing parameters cannot be resolved at this point and are
        skipped. Problems are reported through err_obj; nothing is returned.

        @param err_obj: Error object on which validation messages are set.

        """
        res_ok = True
        input_view_names = self.list_input_view_names()
        output_views = self.list_output_view_names()
        output_view_name = output_views[keys.SINGLE_VIEW]

        # Fetch every input view definition once; they are consulted repeatedly below.
        input_view_defs = dict((name, self.get_input_view_def(name)) for name in input_view_names)

        # Since the alias specs are not required we must not presume any are present.
        alias_specs = self.get_property_value(JOIN_ALIASES)
        if not alias_specs:
            alias_specs = []

        # Check that each alias is valid with no duplicates
        alias_output_field_names = []
        for i, alias in enumerate(alias_specs):
            alias_output_field_name = alias[JOIN_OUTPUT_FIELD]
            # Again, parameters make life difficult here...
            if component_api.has_param(alias_output_field_name):
                continue
            if alias_output_field_name in alias_output_field_names:
                res_ok = False
                err_obj.get_property_err(JOIN_ALIASES)[i][JOIN_OUTPUT_FIELD].set_message(
                    "Duplicate alias output field name '%s'." % alias_output_field_name)
            else:
                alias_output_field_names.append(alias_output_field_name)

        # Check that the output view definition is valid.
        # We could throw false positives here if the user did wild things
        # with parameters...
        output_view = self.get_output_view_def(output_view_name)
        output_view_fields = output_view[keys.VIEW_FIELDS]

        # Every output field that is not fed by an alias must exist in exactly one input view.
        # If we don't find it we can't output it.
        # If we find it more than once then we don't know which one was intended and we require an alias.
        for field_idx, output_field in enumerate(output_view_fields):
            field = output_field[keys.FIELD_NAME]
            if field in alias_output_field_names:
                continue
            view_list = [name for name in input_view_names
                         if self._has_field(input_view_defs[name], field)]
            if not view_list:
                res_ok = False
                err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][field_idx].set_message(
                    "Output view field '%s' is neither an input view field name nor alias output field name." % field)
            elif len(view_list) > 1:
                res_ok = False
                view_list.sort()
                found_in_views = ', '.join(view_list)
                err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][field_idx].set_message(
                    "Output view field '%s' is ambiguous (found in views %s) and requires an alias." % (field, found_in_views))

        # We can't perform the remaining checks until the user has fixed the issues.
        if not res_ok:
            return

        # Now chase the output field types to make sure they match.
        for i, output_field in enumerate(output_view_fields):
            output_field_name = output_field[keys.FIELD_NAME]
            output_field_type = output_field[keys.FIELD_TYPE]
            input_view_name = None
            input_field_name = None

            # Follow the output field through any alias.
            alias_found = False
            for alias_idx, alias in enumerate(alias_specs):
                if output_field_name != alias[JOIN_OUTPUT_FIELD]:
                    continue
                alias_found = True
                alias_fqn = alias[JOIN_FQN_FIELD] or ""
                if component_api.has_param(alias_fqn):
                    # The input field is only known once parameters have been substituted.
                    break
                fqn_parts = alias_fqn.split('.')
                problem = None
                if len(fqn_parts) != 2:
                    problem = "Alias input field '%s' must have the form 'view.field'." % alias_fqn
                elif fqn_parts[0] not in input_view_defs:
                    problem = "Alias input field '%s' refers to unknown input view '%s'." \
                              % (alias_fqn, fqn_parts[0])
                elif not self._has_field(input_view_defs[fqn_parts[0]], fqn_parts[1]):
                    problem = "Alias input field '%s' refers to unknown field '%s' of input view '%s'." \
                              % (alias_fqn, fqn_parts[1], fqn_parts[0])
                else:
                    input_view_name, input_field_name = fqn_parts
                if problem:
                    err_obj.get_property_err(JOIN_ALIASES)[alias_idx][JOIN_FQN_FIELD].set_message(problem)
                break

            # If we didn't find it in the alias list, we have to hunt for it in the input views
            if not alias_found:
                for candidate in input_view_names:
                    if self._has_field(input_view_defs[candidate], output_field_name):
                        input_view_name = candidate
                        input_field_name = output_field_name
                        break

            # Nothing to compare against (parameterized or invalid alias).
            if input_view_name is None:
                continue

            for input_field in input_view_defs[input_view_name][keys.VIEW_FIELDS]:
                if input_field_name != input_field[keys.FIELD_NAME]:
                    continue
                if output_field_type != input_field[keys.FIELD_TYPE]:
                    err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][i].set_message(
                        "Output view field '%s' type '%s' does not match corresponding input view '%s' field '%s' type '%s'." \
                        % (output_field_name, output_field_type, input_view_name, input_field[keys.FIELD_NAME],
                           input_field[keys.FIELD_TYPE]))

                                
 
    def _process_record(self, data, input_view, list_of_active_input_views):
        """
        Store one incoming record, or note that an input view is exhausted.

        Records may arrive from the input views in any order; each one is
        inserted into the table of its view. A false-valued data marks the
        end of the view; once every input view has ended, L{_send_output}
        is called to send the result of the join.

        See L{ComponentAPI.process_input_views}.

        @param data: The record read from input_view, or a false value when
            the view has no more records.
        @param input_view: The input view the record came from.
        @param list_of_active_input_views: Not used by this component.

        """
        name = input_view.name
        if not data:
            self._finished_views[name] = True
            if all(self._finished_views.values()):
                self._send_output()
            return

        statement = self._insert_stmts[name]
        row_values = [data[field_name] for field_name in input_view.field_names]
        self._cursor.execute(statement, row_values)

    
    def _send_output(self):
        """
        Run the join over the staged records and write the result to the output view.

        A SELECT statement is built with one column per output view field (the
        aliased input field where an alias exists, the output field name
        otherwise), all input view tables in the FROM clause, and one equality
        per join expression in the WHERE clause. Each result row becomes one
        output record; the output view is marked completed afterwards.

        """
        def quote_fqn(name):
            # Need to quote all strings to avoid clashes with sqlite keywords
            return ".".join(["\"" + elem + "\"" for elem in name.split('.')])

        # list() keeps the lookup working when values() returns a view rather than a list.
        output_view = list(self._output_views.values())[keys.SINGLE_VIEW]
        output_view_fields = output_view.fields

        # Aliases are optional, so the property may not hold a list at all.
        aliases = self.get_property_value(JOIN_ALIASES) or []
        alias_dict = {}
        for alias in aliases:
            alias_dict[alias[JOIN_OUTPUT_FIELD]] = quote_fqn(alias[JOIN_FQN_FIELD])

        select_fields = []
        for output_field in output_view_fields:
            out_field = output_field[keys.FIELD_NAME]
            if out_field in alias_dict:
                select_fields.append(alias_dict[out_field])
            else:
                select_fields.append("\"" + out_field + "\"")

        from_list = ["\"" + view_name + "\"" for view_name in self._input_views.keys()]

        join_exprs = self.get_property_value(JOIN_EXPRS)
        where_list = [quote_fqn(join_expr[JOIN_FIELD_2]) + "=" + quote_fqn(join_expr[JOIN_FIELD_1])
                      for join_expr in join_exprs]

        select_stmt = "SELECT " + ','.join(select_fields)
        select_stmt += " FROM " + ','.join(from_list)
        select_stmt += " WHERE " + " AND ".join(where_list)

        self._cursor.execute(select_stmt)

        types_list = [output_field[keys.FIELD_TYPE] for output_field in output_view_fields]

        for row in sqlite_iter(self._cursor):
            out_rec = output_view.create_record()
            for i, field in enumerate(out_rec.field_names):
                value = row[i]
                # NULLs pass through untouched; numbers and dates are converted
                # from what the database returned, everything else is copied as is.
                if value is not None:
                    if types_list[i] == SnapNumber:
                        value = Decimal(str(value))
                    elif types_list[i] == SnapDateTime:
                        value = parse_date_with_fractions(value)
                out_rec[field] = value
            output_view.write_record(out_rec)
        output_view.completed()

    
    def execute(self, input_views, output_views):
        """
        Execute the join.

        A temporary SQLite database is created with one table per input view,
        and L{_process_record} is registered as the record handler of every
        input view. The temporary database is cleaned up by L{_cleanup}
        whether or not processing succeeds.

        @param input_views: Connected input views, keyed by view name.
        @param output_views: Mapping holding the connected output views.

        @raise SnapComponentError: If no output view is connected, or if a
            defined input view is not connected.
        @raise SnapObjTypeError: If an input field is neither a string, a
            number nor a datetime.

        """

        # So before we go too far, we need 1 output view connected
        # and all the defined input views connected.
        try:
            # list() keeps the probe working when values() returns a view rather than a list.
            list(output_views.values())[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No output view connected.")
        # We don't need a try/except here because we will probe delicately.
        connected_input_views = input_views.keys()
        defined_input_views = self.list_input_view_names()
        for view in defined_input_views:
            if view not in connected_input_views:
                raise SnapComponentError("Not all input views connected.")

        try:
            self._db_file = None
            self._db_file_name = None
            self._cursor = None
            self._con = None
            self._finished_views = {}
            self._input_views = input_views
            self._output_views = output_views

            # Create a temp file to hold the SQLite database.
            # Note that mkstemp opens the file as well, which we don't need,
            # so close the temp file after it's been created.
            (self._db_file, self._db_file_name) = tempfile.mkstemp(".db","snapjoin")
            os.close(self._db_file)

            # The DB-API module is imported into this file under the name dbapi2.
            self._con = dbapi2.connect(self._db_file_name)
            self._cursor = self._con.cursor()
            dbapi2.register_adapter(Decimal, float)
            self._fqn_fields = []

            view_process_dict = {}
            self._insert_stmts = {}
            for input_view_name in self._input_views.keys():
                input_view = self._input_views[input_view_name]
                view_process_dict[input_view] = self._process_record
                self._finished_views[input_view_name] = False

                # Names are quoted to avoid clashes with sqlite keywords.
                quoted_fields = []
                column_defs = []
                for input_field in input_view.fields:
                    field = input_field[keys.FIELD_NAME]
                    field_type = input_field[keys.FIELD_TYPE]
                    if field_type == SnapString:
                        column_type = "TEXT"
                    elif field_type == SnapNumber:
                        column_type = "DECIMAL"
                    elif field_type == SnapDateTime:
                        column_type = 'DATETIME'
                    else:
                        # NOTE(review): the type is passed as a separate argument rather than
                        # interpolated into the message -- confirm SnapObjTypeError formats it.
                        raise SnapObjTypeError('Unknown type %s', field_type)
                    quoted_field = "\"" + field + "\""
                    quoted_fields.append(quoted_field)
                    column_defs.append(quoted_field + " " + column_type)
                    self._fqn_fields.append(input_view_name + "." + field)

                field_clause = ",".join(quoted_fields)
                value_clause = ",".join(['?'] * len(quoted_fields))
                self._insert_stmts[input_view_name] = "INSERT INTO %s (%s) VALUES (%s)" % (
                    "\"" + input_view_name + "\"", field_clause, value_clause)

                stmt = "CREATE TABLE \"%s\" (%s)" % (input_view_name, ", ".join(column_defs))
                self._cursor.execute(stmt)

            self.process_input_views(view_process_dict)
        finally:
            self._cleanup()

    
    def _cleanup(self):
        """
        Clean up resources...
        
        """
        
        if self._cursor:
            try:
                self._cursor.close()
            except:
                pass
        if self._con:
            try:
                self._con.close()
            except:
                pass
        if self._db_file_name:
            try:
                os.remove(self._db_file_name)
            except:
                pass

    def upgrade_1_0_to_1_1(self):
        """
        Upgrade a resource from version 1.0 to 1.1: add LOV constraints.

        The join expression and alias properties are deleted and defined
        again with the constraints; their values are saved beforehand and
        restored afterwards.

        """
        # Recreating the properties resets their values, so remember them first.
        saved_exprs = self.get_property_value(JOIN_EXPRS)
        saved_aliases = self.get_property_value(JOIN_ALIASES)

        # The old definitions have to go before the new ones can be set.
        for prop_name in (JOIN_EXPRS, JOIN_ALIASES):
            self.del_property_def(prop_name)

        # The definitions are spelled out here on purpose instead of calling
        # create_resource_template: that method may change in later versions
        # of the component, while this upgrade step must keep producing
        # exactly these definitions.
        lhs_field = prop.SimpleProp(JOIN_FIELD_1, SnapString, "Field to join",
                                    {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                    required=True)
        rhs_field = prop.SimpleProp(JOIN_FIELD_2, SnapString, "Field to join with",
                                    {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                    required=True)
        expr_def = prop.DictProp(JOIN_EXPR, lhs_field, "Join expression", 2, required=True)
        for entry_name, entry_def in ((JOIN_FIELD_1, lhs_field), (JOIN_FIELD_2, rhs_field)):
            expr_def[entry_name] = entry_def
        self.set_property_def(JOIN_EXPRS, prop.ListProp("Join expressions", expr_def, required=True))

        fqn_def = prop.SimpleProp(JOIN_FQN_FIELD, SnapString, "Fully-qualified input field",
                                  {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] })
        out_def = prop.SimpleProp(JOIN_OUTPUT_FIELD, SnapString, "Corresponding output field",
                                  {'lov': [ keys.CONSTRAINT_LOV_OUTPUT_FIELD] })
        alias_def = prop.DictProp(JOIN_ALIAS, lhs_field, "Alias", 2, required=True)
        for entry_name, entry_def in ((JOIN_FQN_FIELD, fqn_def), (JOIN_OUTPUT_FIELD, out_def)):
            alias_def[entry_name] = entry_def
        self.set_property_def(JOIN_ALIASES, prop.ListProp("Aliases", alias_def))

        # Put the saved values back.
        self.set_property_value(JOIN_EXPRS, saved_exprs)
        self.set_property_value(JOIN_ALIASES, saved_aliases)
        
        
    def upgrade_1_1_to_1_2(self):
        """
        Upgrade a resource from version 1.1 to 1.2: add a size constraint on join expressions.

        The join expressions property is defined again with a minimum size
        of one; its value is saved beforehand and restored afterwards.

        """
        previous_exprs = self.get_property_value(JOIN_EXPRS)

        # Define the join expressions property again, this time with min_size.
        lhs_field = prop.SimpleProp(JOIN_FIELD_1, SnapString, "Field to join",
                                    {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                    required=True)
        rhs_field = prop.SimpleProp(JOIN_FIELD_2, SnapString, "Field to join with",
                                    {'lov': [ keys.CONSTRAINT_LOV_INPUT_FIELD] },
                                    required=True)
        expr_def = prop.DictProp(JOIN_EXPR, lhs_field, "Join expression", 2, required=True)
        for entry_name, entry_def in ((JOIN_FIELD_1, lhs_field), (JOIN_FIELD_2, rhs_field)):
            expr_def[entry_name] = entry_def
        self.set_property_def(JOIN_EXPRS,
                              prop.ListProp("Join expressions", expr_def, min_size = 1, required=True))

        # Put the saved value back.
        self.set_property_value(JOIN_EXPRS, previous_exprs)
        
    def upgrade_1_2_to_1_3(self):
        """
        Upgrade a resource from version 1.2 to 1.3.

        Nothing is changed here. This step only exists so that the component
        doc URI gets changed during the upgrade, which is done by cc_info
        before this method is called.

        """
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.