# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: Exercise_2.py 9626 2009-11-06 19:42:18Z pamor $
"""
These files are the basis for the SnapLogic tutorials.
The SnapLogic SnapAPI library allows you to drive the creation
of SnapLogic Resources and Pipelines conveniently and easily via
an easy to use language. This tutorial uses the Python language
and the Python version of the SnapLogic SnapAPI library.
For this tutorial, we successively develop a small sample application. We provide
several examples, each of which adds more functionality to the application.
Our sample application consists of the processing of (sales) leads data:
Reformatting, filtering, joining and sorting.
This represents a typical and simple data integration example one might encounter
in an enterprise, where data from different sources needs to be pulled together.
Exercise 1: Read file, Write file
Exercise 2: Insert a filter function into exercise 1
Exercise 3: Add a second data source and join into exercise 2
Exercise 4: Filter and sort based on specified criteria
Exercise 5: Variation of Exercise 2. Introduction to pass-through.
This is Exercise 2: It uses the resources that were created by Exercise_1.py
and adds a filter in the middle of the pipeline.
"""
import sys, getopt, time
#
# SnapLogic related imports.
#
# For a more detailed explanation, please see the documentation in Exercise_1.py
#
from snaplogic import snapi
def usage():
    """Print usage information about this program.

    Lists every option accepted by the script's getopt call:
    -s/--server, -u/--username, -c/--create and -h/--help.
    """
    print("Usage:")
    print(" Exercise_2.py [-s <server_url>] [-u <username/password>] [-c] [-h]")
    print(" -s <server_url> Specify the URL of the SnapLogic server to connect to.")
    print(" -u <username/password> Credentials to connect with (default: anonymous).")
    print(" -c Only create the resources, do not execute the pipeline.")
    print(" -h Print this help message and exit.")
def make_credentials(username):
    """Parse a 'username/password' argument and construct a credentials token.

    Only the first '/' separates the user name from the password, so a
    password may itself contain '/' characters.

    Args:
        username: the raw -u/--username argument, or None for anonymous access.

    Returns:
        None when username is None, otherwise a (user, password) tuple.

    Exits the program with status 1 if the argument contains no '/'.
    """
    if username is None:
        return None
    # partition() splits on the first '/' only; previously split('/') rejected
    # any password that contained a '/'.
    user, sep, password = username.partition('/')
    if not sep:
        print("Invalid or missing username/password argument '" + username + "'")
        sys.exit(1)
    return (user, password)
def print_resource_validate_info(error):
    """Report a resource validation failure and terminate with exit status 1.

    A fixed header line is printed, followed by the error object exactly as
    print() renders it.
    """
    # TODO: unpack the error structure into a friendlier report once snapi
    # offers a helper for it.
    header = "Resource validation failed with the following error"
    for line in (header, error):
        print(line)
    sys.exit(1)
# Defaults used when the corresponding option is not given on the command line.
username = None                   # no credentials => anonymous access
server = 'http://localhost:8088'
create_only = 'no'                # switched to 'yes' by -c/--create

# Parse the command line; getopt raises GetoptError for unrecognized options.
try:
    opts, args = getopt.getopt(sys.argv[1:], "s:u:hc",
                               ["server=", "username=", "help", "create"])
except getopt.GetoptError as e:
    print('Options error: %s' % (e,))
    usage()
    sys.exit(1)

# Each option matches exactly one branch, so the branch order is irrelevant.
for opt, val in opts:
    if opt in ("-h", "--help"):
        usage()
        sys.exit()
    elif opt in ("-s", "--server"):
        server = val
    elif opt in ("-u", "--username"):
        username = val
    elif opt in ("-c", "--create"):
        create_only = 'yes'

# Turn the optional "user/password" string into a credentials token (or None).
creds = make_credentials(username)
# Reuse the Leads and Prospects resource definitions that Exercise_1.py saved
# on the server. Each definition was stored under a specific URI, and
# snapi.get_resource_object() reads it back from that same URI.
leads_uri = server + '/SnapLogic/Tutorial/Exercise_1/Leads'
leads_res_def = snapi.get_resource_object(leads_uri, creds)
prospects_uri = server + '/SnapLogic/Tutorial/Exercise_1/Prospects'
prospects_res_def = snapi.get_resource_object(prospects_uri, creds)

# The Prospects definition fetched above is modified and then saved as a new
# resource under this exercise's own URI.
prospects_uri = server + '/SnapLogic/Tutorial/Exercise_2/Prospects'

# Define the resource parameters together with default values, which makes
# them optional parameters.
prospects_res_def.define_param('OUTPUTFILE', 'file://tutorial/data/ex_2_prospects.csv')
prospects_res_def.define_param('DELIMITER', ',')

# Validate the modified definition (print_resource_validate_info() exits on
# error), then save it under the Exercise 2 URI.
print('Validating Prospects')
error = prospects_res_def.validate()
if error:
    print_resource_validate_info(error)
prospects_res_def.save(prospects_uri)
print('Resource Prospects resource saved')
# Insert a filter between the leads and the prospects. Unlike the other two
# resources, this one is built from scratch using the server's standard
# 'snaplogic.components.Filter' component.
filter_leads_res_def = snapi.create_resource_object(server, 'snaplogic.components.Filter', creds)

# General information that every resource carries.
for info_key, info_value in (('description', 'Lead Filter'),
                             ('author', 'SnapLogic Tutorial')):
    filter_leads_res_def.set_general_info(info_key, info_value)

# Filter-specific properties: which field to examine, the value to look for,
# and the comparison applied between them. Here, the filter selects records
# whose 'State' field equals 'CA'.
for prop_name, prop_value in (('Field', 'State'),
                              ('Value', 'CA'),
                              ('Comparison', '==')):
    filter_leads_res_def.set_property_value(prop_name, prop_value)

# The filter's input and output views (its record formats) use one and the
# same field list. See Exercise_1.py for more about views.
in_out_view = (('First', 'string', 'First Name'),
               ('Last', 'string', 'Last Name'),
               ('Phone_w', 'string', 'Phone, Work'),
               ('Address', 'string', 'Street Address'),
               ('City', 'string', 'City'),
               ('State', 'string', 'State'),
               ('Zip', 'string', 'Zip Code'))
filter_leads_res_def.add_record_input_view("Input1", in_out_view, "input")
filter_leads_res_def.add_record_output_view("Output1", in_out_view, "output")

# Sanity-check the definition (print_resource_validate_info() exits on
# error), then store it on the server under this exercise's URI.
print('Validating FilterLeads')
error = filter_leads_res_def.validate()
if error:
    print_resource_validate_info(error)
filter_leads_uri = server + '/SnapLogic/Tutorial/Exercise_2/FilterLeads'
filter_leads_res_def.save(filter_leads_uri)
print('Resource FilterLeads saved')
# All three resources (leads, filter_leads and prospects) are ready, so build
# the pipeline through which the data will flow.
p = snapi.create_resource_object(server, snapi.PIPELINE, creds)
for info_key, info_value in (('description', 'Exercise 2 Pipeline'),
                             ('author', 'SnapLogic Tutorial')):
    p.set_general_info(info_key, info_value)

# Add each resource under the name that the link_views() calls below refer to.
for res_def, res_name in ((leads_res_def, "Leads"),
                          (filter_leads_res_def, "FilterLeads"),
                          (prospects_res_def, "Prospects")):
    p.add(res_def, res_name)

# Linking two resources takes a field map. Each pair is
# (output field of the upstream resource, input field of the downstream one).
#
# The leads resource's output field names happen to match the filter's input
# field names...
link_leads_to_filter = (('First', 'First'),
                        ('Last', 'Last'),
                        ('Address', 'Address'),
                        ('City', 'City'),
                        ('State', 'State'),
                        ('Zip', 'Zip'),
                        ('Phone_w', 'Phone_w'))
# ...whereas the prospects resource uses different input field names.
link_filter_to_prospects = (('First', 'First_Name'),
                            ('Last', 'Last_Name'),
                            ('Address', 'Address'),
                            ('City', 'City'),
                            ('State', 'State'),
                            ('Zip', 'Zip_Code'),
                            ('Phone_w', 'Work_Phone'))

# Connect the views using those maps.
p.link_views('Leads', 'Output1', 'FilterLeads', 'Input1', link_leads_to_filter)
p.link_views('FilterLeads', 'Output1', 'Prospects', 'Input1', link_filter_to_prospects)

# Pipeline parameters (explained in Exercise_1.py), each mapped onto a
# parameter of one member resource. The final value in each row is
# presumably the parameter's default; see Exercise_1.py.
for pipe_param, res_name, res_param, value in (
        ('LEADS', 'Leads', 'INPUTFILE', 'file://tutorial/data/leads.csv'),
        ('INPUT_DELIMITER', 'Leads', 'DELIMITER', ','),
        ('PROSPECTS', 'Prospects', 'OUTPUTFILE', 'file://tutorial/data/ex_2_prospects.csv'),
        ('OUTPUT_DELIMITER', 'Prospects', 'DELIMITER', ',')):
    p.map_param(pipe_param, res_name, res_param, value)

# Sanity-check the pipeline (print_resource_validate_info() exits on error)
# and save it so that it can be run or reused.
print('Validating pipeline')
error = p.validate()
if error:
    print_resource_validate_info(error)
p_uri = server + '/SnapLogic/Tutorial/Exercise_2/CA_Prospects'
p.save(p_uri)
print('Pipeline CA_Prospects saved')
if create_only == 'no':
    # Execute the pipeline. Its definition is communicated to the server, and
    # the resources are instantiated and informed of their parameters and of
    # their input and output views. In this example, 'leads.csv' is read and,
    # after filtering, 'ex_2_prospects.csv' is written (the files named in
    # the LEADS and PROSPECTS parameter mappings).
    #
    # execute() returns after all resources of the pipeline have started to
    # work; the pipeline execution itself has not necessarily completed by
    # then. To wait for completion, we poll the handle returned by execute()
    # with get_current_status() until the status reports is_finished().
    print("Starting pipeline.")
    handle = p.execute(None, None, None, "Exercise 2")
    print("Waiting for completion of pipeline execution: ")
    # NOTE(review): there is no timeout, so this loops forever if the pipeline
    # never finishes. The meaning of the False argument is not visible here.
    while not handle.get_current_status(False).is_finished():
        time.sleep(1)
        print("Polling for completion of pipeline...")
    print("All done!")
|