# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008-2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: Exercise_3.py 9626 2009-11-06 19:42:18Z pamor $
"""
These files are the basis for the SnapLogic tutorials.
The SnapLogic SnapAPI library allows you to drive the creation
of SnapLogic Resources and Pipelines conveniently and easily via
an easy to use language. This tutorial uses the Python language
and the Python version of the SnapLogic SnapAPI library.
For this tutorial, we successively develop a small sample application. We provide
several examples, each of which adds more functionality to the application.
Our sample application consists of the processing of (sales) leads data:
Reformatting, filtering, joining and sorting.
This represents a typical and simple data integration example one might encounter
in an enterprise, where data from different sources needs to be pulled together.
Exercise 1: Read file, Write file
Exercise 2: Insert a filter function into exercise 1
Exercise 3: Add a second data source and join into exercise 2
Exercise 4: Filter and sort based on specified criteria
Exercise 5: Variation of Exercise 2. Introduction to pass-through.
This is Exercise 3: It builds on Exercise 2 by joining the CA based prospects
with census data on median household income. For the census data, we will create a
data service pipeline and incorporate that into the final pipeline.
"""
import sys, getopt, time
#
# SnapLogic related imports.
#
# For a more detailed explanation, please see the documentation in Exercise_1.py
#
from snaplogic import snapi
def usage():
    """Print usage information about this program to stdout.

    Covers every option accepted by the getopt() call below, including
    -u/--username, which the original help text omitted.
    """
    # print(...) with a single argument behaves identically under
    # Python 2 (parenthesized expression) and Python 3 (function call).
    print("Usage:")
    print("    Exercise_3.py [-s <server_url>] [-u <user/password>] [-c]")
    print("        -s <server_url>    Specify the URL of the SnapLogic server to connect to.")
    print("        -u <user/password> Authenticate as the given user (default: anonymous).")
    print("        -c                 Only create the resources, do not execute the pipeline.")
def make_credentials(username):
    """Parse a 'user/password' argument and construct a credentials token.

    Returns None when no username was given (anonymous access), otherwise a
    (user, password) tuple. Exits the program with status 1 when the argument
    does not contain a '/' separator. Only the FIRST '/' is treated as the
    separator, so passwords containing '/' characters are accepted.
    """
    if username is None:
        return None
    # maxsplit=1: split on the first '/' only, so 'alice/se/cret' yields
    # ('alice', 'se/cret') instead of being rejected.
    userpass = username.split('/', 1)
    if len(userpass) != 2:
        print("Invalid or missing username/password argument '" + username + "'")
        sys.exit(1)
    return (userpass[0], userpass[1])
def print_resource_validate_info(error):
    """Report a resource validation failure on stdout and exit with status 1."""
    # TODO: ideally a snapi helper would unpack validation errors more nicely.
    for item in ("Resource validation failed with the following error", error):
        print(item)
    sys.exit(1)
# Default values for the required parameters.
username = None                    # Default authorization is anonymous
server = 'http://localhost:8088'   # SnapLogic server to connect to
create_only = 'no'                 # 'yes': only create resources, skip execution

# Process the command line arguments; see usage() for each option's meaning.
try:
    opts, args = getopt.getopt(sys.argv[1:], "s:u:hc", ["server=", "username=", "help", "create"])
except getopt.GetoptError, e:
    # Unknown option or missing option argument: report it and bail out.
    print 'Options error:', e
    usage()
    sys.exit(1)

for o, a in opts:
    if o in ("-s", "--server"):
        server = a
    elif o in ("-u", "--username"):
        username = a
    elif o in ("-c", "--create"):
        create_only = 'yes'
    elif o in ("-h", "--help"):
        usage()
        sys.exit()

# Construct the (user, password) credentials tuple if any were provided;
# remains None for anonymous access.
creds = make_credentials(username)
# We are reusing the Leads and FilterLeads resource definitions, which were
# created in Exercises 1 and 2. We can simply read them from the SnapLogic server.
leads_uri = server + '/SnapLogic/Tutorial/Exercise_1/Leads'
leads_res_def = snapi.get_resource_object(leads_uri, creds)
filter_leads_uri = server + '/SnapLogic/Tutorial/Exercise_2/FilterLeads'
filter_leads_res_def = snapi.get_resource_object(filter_leads_uri, creds)

# We cannot just use a parameterized version of prospects, since the views
# (input and output record format) are going to be different for this
# exercise. However, we can still derive our prospects resource from the
# one we used in the previous exercises.
prospects_uri = server + '/SnapLogic/Tutorial/Exercise_1/Prospects'
prospects_res_def = snapi.get_resource_object(prospects_uri, creds)

# The modified copy will be saved under a new Exercise_3 URI.
prospects_uri = server + '/SnapLogic/Tutorial/Exercise_3/Prospects'

# Delimiter and output filename are left as runtime parameters ($?{...}).
prospects_res_def.set_property_value('delimiter', '$?{DELIMITER}')
prospects_res_def.set_property_value('filename', '$?{OUTPUTFILE}')
prospects_res_def.set_property_value('header', False)

# Define the resource's parameters, each with a default value.
prospects_res_def.define_param('OUTPUTFILE', 'file://tutorial/data/ex_3_prospects.csv')
prospects_res_def.define_param('DELIMITER', ',')

# We change the input view of the Prospects resource by adding another
# field (MoreThan50K) for the income figure coming from the census data.
prospects_res_def.add_record_input_view('Input1',
                                        (('Last_Name', 'string', 'Last Name'),
                                         ('First_Name', 'string', 'First Name'),
                                         ('Address', 'string', 'Street Address'),
                                         ('City', 'string', 'City'),
                                         ('State', 'string', 'State'),
                                         ('Zip_Code', 'string', 'Zip Code'),
                                         ('Work_Phone', 'string', 'Phone, Work'),
                                         ('MoreThan50K', 'number', 'Percentage of households greater than 50K')),
                                        'Prospects input view')

# Sanity check, printing and saving to the server.
print 'Validating Prospects'
error = prospects_res_def.validate()
if error:
    print_resource_validate_info(error)
prospects_res_def.save(prospects_uri)
print 'Resource Prospects saved'
# We now introduce a new resource: Census data, which has been stored in a file,
# again in CSV format.
census_res_def = snapi.create_resource_object(server, 'snaplogic.components.CsvRead', creds)
census_res_def.set_general_info('description', 'Get the census data from the file')
census_res_def.set_general_info('author', 'SnapLogic Tutorial')

# Setting the properties for this Census resource. Delimiter and filename are
# specified as configurable parameters to allow this resource to also work on
# other files, with possibly different delimiters.
census_res_def.set_property_value('delimiter', '$?{DELIMITER}')
census_res_def.set_property_value('filename', '$?{INPUTFILE}')
census_res_def.set_property_value('skip_lines', 1)    # skip the CSV header line
census_res_def.set_property_value('quote', '"')

# No default values are provided for the following parameters, so they must
# be supplied at runtime (or mapped with defaults by an enclosing pipeline).
census_res_def.define_param('INPUTFILE')
census_res_def.define_param('DELIMITER')

# In the output view of the Census resource we specify the record fields that
# are going to be produced by the resource.
census_res_def.add_record_output_view('Output1',
                                      (('Zip', 'string', 'Zip Code'),
                                       ('LessThan10K', 'number', 'Income less than $10K per year'),
                                       ('MoreThan10K', 'number', 'Income between $10K and $25K'),
                                       ('MoreThan25K', 'number', 'Income between $25K and $50K'),
                                       ('MoreThan50K', 'number', 'Income greater than $50K')),
                                      'Census output view')

# And again we check and save.
print 'Validating Census'
error = census_res_def.validate()
if error:
    print_resource_validate_info(error)
census_uri = server + '/SnapLogic/Tutorial/Exercise_3/Census'
census_res_def.save(census_uri)
print 'Resource Census saved'
# We introduce yet another resource: a join. It is based on the standard join
# component that comes with the SnapLogic server, which is designed to take
# two record streams and combine them into one as an inner join.
join_res_def = snapi.create_resource_object(server, 'snaplogic.components.Join', creds)
join_res_def.set_general_info('description', 'Join CA leads with Census data')
join_res_def.set_general_info('author', 'SnapLogic Tutorial')

# The join resource naturally requires multiple input views, since it joins
# two streams of records. Each input view's fields are copied from the
# output view of the resource that will feed it.
join_res_def.add_record_input_view('Input1',
                                   filter_leads_res_def.list_output_view_fields('Output1'),
                                   'Filtered Leads input')
join_res_def.add_record_input_view('Input2',
                                   census_res_def.list_output_view_fields('Output1'),
                                   'Census input')

# A 'join expression' needs to be defined that is used to decide which records
# are to be joined. A field from each of the two input record streams is selected,
# and compared via the specified operator. Note that multiple expressions may be
# defined, which would then be combined via the AND operator into a more complex
# expression.
join_res_def.set_property_value('Join expressions',
                                [{'Join field 1': 'Input1.Zip',
                                  'Join field 2': 'Input2.Zip'}])

# The output view of the join resource contains fields from both record streams.
join_res_def.add_record_output_view('Output1',
                                    (('First', 'string', 'First Name'),
                                     ('Last', 'string', 'Last Name'),
                                     ('Phone_w', 'string', 'Phone, Work'),
                                     ('Address', 'string', 'Street Address'),
                                     ('City', 'string', 'City'),
                                     ('State', 'string', 'State'),
                                     ('Zip', 'string', 'Zip Code'),
                                     ('MoreThan50K', 'number', 'Percentage of households greater than 50K')),
                                    'output')

# In our example, both record streams contain a field named 'Zip'. Which one do
# we want to use for the output? Such ambiguities can be resolved via so-called
# 'field aliases'. They allow us to choose a new name for specific fields from
# specific views. In our example, we choose the 'Zip' field from the 'Input1'
# view. The fully qualified name of that field is given in dotted notation as:
# 'Input1.Zip'. We define a new name 'Zip', which is the one that is used in the
# output view we had just defined. The new name does not have to be similar to
# the name of the field we are aliasing. It is a coincidence that the alias name
# is also 'Zip'.
join_res_def.set_property_value('Aliases',
                                [{"Fully qualified field": 'Input1.Zip', 'Output field': 'Zip'}])

# Check and save.
print 'Validating Join'
error = join_res_def.validate()
if error:
    print_resource_validate_info(error)
join_uri = server + '/SnapLogic/Tutorial/Exercise_3/Join'
join_res_def.save(join_uri)
print 'Resource Join saved'
# Now let's create a Data Service Pipeline with just a single Resource, the Census Resource.
ds = snapi.create_resource_object(server, snapi.PIPELINE, creds)
ds.set_general_info('description', 'Exercise 3 Census Data Service Pipeline')
ds.set_general_info('author', 'SnapLogic Tutorial')

# Add census to the pipeline...
ds.add(census_res_def, "Census")
# ... and assign its output view to be the pipeline's output view.
ds.assign_output_view("Census", "Output1", "Output001")

# Map pipeline parameters and give them default values.
# Remember, the census resource we created does not have default values for its
# parameters so we must provide defaults or else require the user to provide them
# at runtime. The most user friendly thing to do is to provide defaults here.
ds.map_param('CENSUS', 'Census', 'INPUTFILE', 'file://tutorial/data/CAIncomeByZip.csv')
ds.map_param('INPUT_DELIMITER', 'Census', 'DELIMITER', ',')

print 'Validating pipeline'
error = ds.validate()
if error:
    print_resource_validate_info(error)
ds_uri = server + '/SnapLogic/Tutorial/Exercise_3/CensusFeed'
ds.save(ds_uri)
print 'Pipeline CensusFeed saved'

# The neat thing about a Data Service Pipeline is that you can fetch from it using your
# browser! Try this:
# http://localhost:8088/feed/SnapLogic/Tutorial/Exercise_3/CensusFeed/Output001?sn.content_type=text/html
#
# You should get a simple representation of the output records.
#
# The other useful thing you can do with a Data Service Pipeline is use its output in
# other pipelines. We will do exactly that below.
# Now let's create our final pipeline.
# We arrange the various resources in a pipeline.
p = snapi.create_resource_object(server, snapi.PIPELINE, creds)
p.set_general_info('description', 'Exercise 3 Pipeline')
p.set_general_info('author', 'SnapLogic Tutorial')

# Instances of the resources are added to the pipeline's resources attribute.
p.add(leads_res_def, "Leads")
p.add(filter_leads_res_def, "FilterLeads")
p.add(ds, "CensusFeed")
p.add(join_res_def, "Join")
p.add(prospects_res_def, "Prospects")

# We define the mappings of output view fields to input views that are needed for
# the linkages within the pipeline. In some of the mappings of this exercise the
# field names in each view are the same. This does not have to be the case, though,
# as is demonstrated by join_to_prospects_link below.
links = (('First', 'First'),
         ('Last', 'Last'),
         ('Address', 'Address'),
         ('City', 'City'),
         ('State', 'State'),
         ('Zip', 'Zip'),
         ('Phone_w', 'Phone_w'))
census_to_join_link = (('Zip', 'Zip'),
                       ('LessThan10K', 'LessThan10K'),
                       ('MoreThan10K', 'MoreThan10K'),
                       ('MoreThan25K', 'MoreThan25K'),
                       ('MoreThan50K', 'MoreThan50K'))
# Here the source and destination field names differ (e.g. First -> First_Name).
join_to_prospects_link = (('First', 'First_Name'),
                          ('Last', 'Last_Name'),
                          ('Address', 'Address'),
                          ('City', 'City'),
                          ('State', 'State'),
                          ('Zip', 'Zip_Code'),
                          ('Phone_w', 'Work_Phone'),
                          ('MoreThan50K', 'MoreThan50K'))

# Specify the mappings (links) in the pipeline:
# Leads -> FilterLeads -> Join <- CensusFeed, then Join -> Prospects.
p.link_views('Leads', 'Output1', 'FilterLeads', 'Input1', links)
p.link_views('FilterLeads', 'Output1', 'Join', 'Input1', links)
p.link_views('CensusFeed', 'Output001', 'Join', 'Input2', census_to_join_link)
p.link_views('Join', 'Output1', 'Prospects', 'Input1', join_to_prospects_link)

# Specify how the pipeline parameters are mapped to the parameters of the
# individual resources. Explained in more detail in Exercise_1.py.
p.map_param('LEADS', 'Leads', 'INPUTFILE', 'file://tutorial/data/leads.csv')
p.map_param('CENSUS', 'CensusFeed', 'CENSUS', 'file://tutorial/data/CAIncomeByZip.csv')
p.map_param('PROSPECTS', 'Prospects', 'OUTPUTFILE', 'file://tutorial/data/ex_3_prospects.csv')
p.map_param('INPUT1_DELIMITER', 'Leads', 'DELIMITER', ',')
p.map_param('INPUT2_DELIMITER', 'CensusFeed', 'INPUT_DELIMITER', ',')
p.map_param('OUTPUT_DELIMITER', 'Prospects', 'DELIMITER', ',')

# Check and save.
print 'Validating pipeline'
error = p.validate()
if error:
    print_resource_validate_info(error)
p_uri = server + '/SnapLogic/Tutorial/Exercise_3/Qual_CA_Prospects'
p.save(p_uri)
print 'Pipeline Qual_CA_Prospects saved'
if create_only == 'no':
    # Run the pipeline (specify the URL of the pipeline).
    #
    # Note that execute() returns immediately with a handle, without waiting
    # for completion; we then poll the handle's status until the pipeline
    # finishes. See the comments in Exercise_1.py and Exercise_2.py for details.
    print "Starting pipeline."
    handle = p.execute(None, None, None, "Exercise 3")
    print "Waiting for completion of pipeline execution..."
    # Poll roughly once per second until the pipeline run is finished.
    while not handle.get_current_status(False).is_finished():
        time.sleep(1)
        print "Polling for completion of pipeline..."
    print "All done!"