# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: Exercise_4.py 9626 2009-11-06 19:42:18Z pamor $
"""
These files are the basis for the SnapLogic tutorials.
The SnapLogic SnapAPI library allows you to drive the creation
of SnapLogic Resources and Pipelines conveniently and easily via
an easy to use language. This tutorial uses the Python language
and the Python version of the SnapLogic SnapAPI library.
For this tutorial, we successively develop a small sample application. We provide
several examples, each of which adds more functionality to the application.
Our sample application consists of the processing of (sales) leads data:
Reformatting, filtering, joining and sorting.
This represents a typical and simple data integration example one might encounter
in an enterprise, where data from different sources needs to be pulled together.
Exercise 1: Read file, Write file
Exercise 2: Insert a filter function into exercise 1
Exercise 3: Add a second data source and join into exercise 2
Exercise 4: Filter and sort based on specified criteria
Exercise 5: Variation of Exercise 2. Introduction to pass-through.
This is Exercise 4: It builds on Exercise 3 and adds filtering and sorting of the
results to create a list of qualified prospects.
"""
import sys, getopt, time
#
# SnapLogic related imports.
#
# For a more detailed explanation, please see the documentation in Exercise_1.py
#
from snaplogic import snapi
def usage():
    """Print usage information about this program.

    The text lists every option the command line parser of this script
    accepts: -s (server), -u (username/password), -c (create only) and
    -h (help).
    """
    print("Usage:")
    print(" Exercise_4.py [-s <server_url>] [-u <username/password>] [-c] [-h]")
    print(" -s <server_url> Specify the URL of the SnapLogic server to connect to.")
    print(" -u <username/password> Specify the credentials used to connect to the server.")
    print(" -c Only create the resources, do not execute the pipeline.")
    print(" -h Print this usage information and exit.")
def make_credentials(username):
    """Parse a 'username/password' argument into a credentials token.

    The argument is split at the first '/' only, so the password part may
    itself contain '/' characters.

    Returns None when username is None, otherwise a (username, password)
    tuple. When the argument contains no '/' separator an error message is
    printed and the program exits with status 1.
    """
    if username is None:
        return None

    # maxsplit=1: everything after the first '/' belongs to the password.
    userpass = username.split('/', 1)
    if len(userpass) != 2:
        print("Invalid or missing username/password argument '" + username + "'")
        sys.exit(1)

    return (userpass[0], userpass[1])
def print_resource_validate_info(error):
    """Report a resource validation failure and terminate the program.

    Prints a fixed heading followed by the given error object, then exits
    with status 1.
    """
    print("Resource validation failed with the following error")
    # The error is printed as-is; ideally a snapi helper routine would
    # unpack it into something more readable.
    print(error)
    raise SystemExit(1)
# Settings used when the corresponding command line option is absent.
username = None  # Without -u/--username the default authorization is anonymous
server = 'http://localhost:8088'
create_only = 'no'

# Parse the command line; an unknown option prints the usage text and exits
# with status 1.
try:
    opts, args = getopt.getopt(sys.argv[1:], "s:u:hc",
                               ["server=", "username=", "help", "create"])
except getopt.GetoptError:
    # Fetch the exception through sys.exc_info() so this handler does not
    # depend on a version-specific "except X, e" / "except X as e" spelling.
    e = sys.exc_info()[1]
    print('Options error: %s' % (e,))
    usage()
    sys.exit(1)

# Each option matches exactly one branch, so the order of the checks is
# irrelevant.
for flag, value in opts:
    if flag in ("-h", "--help"):
        usage()
        sys.exit()
    elif flag in ("-c", "--create"):
        create_only = 'yes'
    elif flag in ("-u", "--username"):
        username = value
    elif flag in ("-s", "--server"):
        server = value

# Turn the optional username/password argument into a credentials token
# (None when no username was given).
creds = make_credentials(username)
# The Leads, FilterLeads, Census, Join and Prospects resource definitions were
# created in Exercises 1, 2 and 3. They are reused here, so each one is simply
# read back from the SnapLogic server via its URI.
leads_uri = '%s/SnapLogic/Tutorial/Exercise_1/Leads' % server
leads_res_def = snapi.get_resource_object(leads_uri, creds)

filter_leads_uri = '%s/SnapLogic/Tutorial/Exercise_2/FilterLeads' % server
filter_leads_res_def = snapi.get_resource_object(filter_leads_uri, creds)

census_uri = '%s/SnapLogic/Tutorial/Exercise_3/Census' % server
census_res_def = snapi.get_resource_object(census_uri, creds)

join_uri = '%s/SnapLogic/Tutorial/Exercise_3/Join' % server
join_res_def = snapi.get_resource_object(join_uri, creds)

prospects_uri = '%s/SnapLogic/Tutorial/Exercise_3/Prospects' % server
prospects_res_def = snapi.get_resource_object(prospects_uri, creds)
# The Prospects definition fetched from Exercise 3 is modified below, so it is
# saved under a new, Exercise 4 specific location.
prospects_uri = '%s/SnapLogic/Tutorial/Exercise_4/Prospects' % server

# Resource parameters defined on the Prospects resource, in this order.
for param_name, param_value in (
        ('OUTPUTFILE', 'file://tutorial/data/ex_4_prospects.csv'),
        ('DELIMITER', ',')):
    prospects_res_def.define_param(param_name, param_value)

# Validate before saving; a validation error is printed and ends the script.
print('Validating Prospects')
error = prospects_res_def.validate()
if error:
    print_resource_validate_info(error)

prospects_res_def.save(prospects_uri)
print('Resource Prospects saved')
# Two more resources are needed to finish the exercise: another Filter and a
# Sort. The Filter resource definition is created first.
filter_prospects_res_def = snapi.create_resource_object(
    server, 'snaplogic.components.Filter', creds)
for info_key, info_value in (('description', 'Prospect filter'),
                             ('author', 'SnapLogic Tutorial')):
    filter_prospects_res_def.set_general_info(info_key, info_value)

# The filter condition is given as a field (column), a value and a comparison
# operator.
for prop_name, prop_value in (('Field', 'MoreThan50K'),
                              ('Value', '0.25'),
                              ('Comparison', '>=')):
    filter_prospects_res_def.set_property_value(prop_name, prop_value)

# One field list serves as both the input and the output view of the filter.
# Each entry is (field name, type, description).
filter_in_out_view = (
    ('First', 'string', 'First Name'),
    ('Last', 'string', 'Last Name'),
    ('Phone_w', 'string', 'Phone, Work'),
    ('Address', 'string', 'Street Address'),
    ('City', 'string', 'City'),
    ('State', 'string', 'State'),
    ('Zip', 'string', 'Zip Code'),
    ('MoreThan50K', 'number', 'Percentage of households greater than 50K'),
)
filter_prospects_res_def.add_record_input_view("Input1", filter_in_out_view, "input")
filter_prospects_res_def.add_record_output_view("Output1", filter_in_out_view, "output")

# Validate before saving; a validation error is printed and ends the script.
print('Validating FilterProspects')
error = filter_prospects_res_def.validate()
if error:
    print_resource_validate_info(error)

filter_prospects_uri = '%s/SnapLogic/Tutorial/Exercise_4/FilterProspects' % server
filter_prospects_res_def.save(filter_prospects_uri)
print('Resource FilterProspects saved')
# Resource definition for a Sort component, one of the standard components
# that come with the SnapLogic server.
sort_prospects_res_def = snapi.create_resource_object(
    server, 'snaplogic.components.Sort', creds)
for info_key, info_value in (('description', 'Prospect sort'),
                             ('author', 'SnapLogic Tutorial')):
    sort_prospects_res_def.set_general_info(info_key, info_value)

# One field list serves as both the input and the output view of the sort.
# Each entry is (field name, type, description).
sort_in_out_view = (
    ('First_Name', 'string', 'First Name'),
    ('Last_Name', 'string', 'Last Name'),
    ('Address', 'string', 'Street Address'),
    ('City', 'string', 'City'),
    ('State', 'string', 'State'),
    ('Zip_Code', 'string', 'Zip Code'),
    ('Work_Phone', 'string', 'Phone, Work'),
    ('MoreThan50K', 'number', 'Percentage of households greater than 50K'),
)
sort_prospects_res_def.add_record_input_view('Input1', sort_in_out_view, 'input')
sort_prospects_res_def.add_record_output_view('Output1', sort_in_out_view, 'output')

# Sort specification: the highest priority field comes first together with
# its sort order, followed by the next highest priority field, and so on.
sort_specs = [
    {'Sort field': 'MoreThan50K', 'Sort order': 'desc'},
    {'Sort field': 'Zip_Code', 'Sort order': 'asc'},
]
sort_prospects_res_def.set_property_value('Sort specs', sort_specs)

# Validate before saving; a validation error is printed and ends the script.
print('Validating SortProspects')
error = sort_prospects_res_def.validate()
if error:
    print_resource_validate_info(error)

sort_prospects_uri = '%s/SnapLogic/Tutorial/Exercise_4/SortProspects' % server
sort_prospects_res_def.save(sort_prospects_uri)
print('Resource SortProspects saved')
# Now we can construct the pipeline.
p = snapi.create_resource_object(server, snapi.PIPELINE, creds)
p.set_general_info('description', 'Exercise 4 Pipeline')
p.set_general_info('author', 'SnapLogic Tutorial')

# The resources are added to the pipeline under the names that the links and
# parameter mappings below refer to.
p.add(leads_res_def, "Leads")
p.add(filter_leads_res_def, "FilterLeads")
p.add(census_res_def, "Census")
p.add(join_res_def, "Join")
p.add(filter_prospects_res_def, "FilterProspects")
p.add(sort_prospects_res_def, "SortProspects")
p.add(prospects_res_def, "Prospects")

# Each link between two resources requires a mapping of output view fields to
# input view fields. Every entry is (output field, input field).

# Used for both Leads -> FilterLeads and FilterLeads -> Join.
filter_leads_link = (('First', 'First'),
                     ('Last', 'Last'),
                     ('Address', 'Address'),
                     ('City', 'City'),
                     ('State', 'State'),
                     ('Zip', 'Zip'),
                     ('Phone_w', 'Phone_w'))

census_to_join_link = (('Zip', 'Zip'),
                       ('LessThan10K', 'LessThan10K'),
                       ('MoreThan10K', 'MoreThan10K'),
                       ('MoreThan25K', 'MoreThan25K'),
                       ('MoreThan50K', 'MoreThan50K'))

join_to_filter_link = (('First', 'First'),
                       ('Last', 'Last'),
                       ('Address', 'Address'),
                       ('City', 'City'),
                       ('State', 'State'),
                       ('Zip', 'Zip'),
                       ('Phone_w', 'Phone_w'),
                       ('MoreThan50K', 'MoreThan50K'))

# The only mapping that renames fields: the filter's field names are mapped
# to the names used by the sort views.
filter_to_sort_link = (('First', 'First_Name'),
                       ('Last', 'Last_Name'),
                       ('Address', 'Address'),
                       ('City', 'City'),
                       ('State', 'State'),
                       ('Zip', 'Zip_Code'),
                       ('Phone_w', 'Work_Phone'),
                       ('MoreThan50K', 'MoreThan50K'))

sort_to_prospects_link = (('First_Name', 'First_Name'),
                          ('Last_Name', 'Last_Name'),
                          ('Address', 'Address'),
                          ('City', 'City'),
                          ('State', 'State'),
                          ('Zip_Code', 'Zip_Code'),
                          ('Work_Phone', 'Work_Phone'),
                          ('MoreThan50K', 'MoreThan50K'))

# With the mappings defined we can now specify the linkage between resources
# in the pipeline.
p.link_views('Leads', 'Output1', 'FilterLeads', 'Input1', filter_leads_link)
p.link_views('FilterLeads', 'Output1', 'Join', 'Input1', filter_leads_link)
p.link_views('Census', 'Output1', 'Join', 'Input2', census_to_join_link)
p.link_views('Join', 'Output1', 'FilterProspects', 'Input1', join_to_filter_link)
p.link_views('FilterProspects', 'Output1', 'SortProspects', 'Input1', filter_to_sort_link)
p.link_views('SortProspects', 'Output1', 'Prospects', 'Input1', sort_to_prospects_link)

# A pipeline can accept parameters, which are then mapped to the parameters of
# the individual resources. The arguments are: pipeline parameter name,
# resource name, resource parameter name and a value.
# NOTE(review): the last argument looks like a default value - confirm against
# the SnapAPI documentation.
p.map_param('LEADS', 'Leads', 'INPUTFILE', 'file://tutorial/data/leads.csv')
p.map_param('CENSUS', 'Census', 'INPUTFILE', 'file://tutorial/data/CAIncomeByZip.csv')
p.map_param('PROSPECTS', 'Prospects', 'OUTPUTFILE', 'file://tutorial/data/ex_4_prospects.csv')
p.map_param('INPUT1_DELIMITER', 'Leads', 'DELIMITER', ',')
p.map_param('INPUT2_DELIMITER', 'Census', 'DELIMITER', ',')
p.map_param('OUTPUT_DELIMITER', 'Prospects', 'DELIMITER', ',')

# We check and save the pipeline resource, so that we can execute it.
# A validation error is printed and ends the script.
print('Validating pipeline')
error = p.validate()
if error:
    print_resource_validate_info(error)

p_uri = server + '/SnapLogic/Tutorial/Exercise_4/Filtered_Qual_CA_Prospects'
p.save(p_uri)
# The message names the pipeline exactly as it appears in the saved URI.
print('Pipeline Filtered_Qual_CA_Prospects saved')
if create_only == 'no':
    # Start the saved pipeline, then poll its status once a second until it
    # reports that it has finished.
    print("Starting pipeline.")
    handle = p.execute(None, None, None, "Exercise 4")
    print("Waiting for completion of pipeline execution: ")
    while True:
        if handle.get_current_status(False).is_finished():
            break
        time.sleep(1)
        print("Polling for completion of pipeline...")
    # NOTE(review): this message is assumed to belong to the execute branch
    # (not printed in create-only mode) - confirm against the original layout.
    # It is printed without checking whether the finished run succeeded.
    print("All done!")
|