# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008-2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: Exercise_1.py 9626 2009-11-06 19:42:18Z pamor $
"""
These files are the basis for the SnapLogic tutorials.
The SnapLogic SnapAPI library allows you to drive the creation
of SnapLogic Resources and Pipelines conveniently and easily via
an easy to use language. This tutorial uses the Python language
and the Python version of the SnapLogic SnapAPI library.
For this tutorial, we successively develop a small sample application. We provide
several examples, each of which adds more functionality to the application.
Our sample application consists of the processing of (sales) leads data:
Reformatting, filtering, joining and sorting.
This represents a typical and simple data integration example one might encounter
in an enterprise, where data from different sources needs to be pulled together.
Exercise 1: Read file, Write file
Exercise 2: Insert a filter function into exercise 1
Exercise 3: Add a second data source and join into exercise 2
Exercise 4: Filter and sort based on specified criteria
Exercise 5: Variation of Exercise 2. Introduction to pass-through.
This is Exercise 1: It simply reads a CSV file called 'leads.csv' and writes the
reformatted data to a new file called 'ex_1_prospects.csv'.
"""
import sys, getopt, time
#
# SnapLogic related imports.
#
# Snapi: This module contains the functions and classes that allow the
# user to interact with the SnapLogic data server. It provides
# the interface for retreiving the available components and resources
# and for saving resources to the server.
#
from snaplogic import snapi
def usage():
    """Print command-line usage information for this program to stdout.

    Every short option accepted by the getopt parser of this script
    (-s, -c, -u and -h) is listed.
    """
    print("Usage:")
    print(" Exercise_1.py [-s <server_url>] [-c] [-u username/password] [-h]")
    print(" -s <server_url> Specify the URL of the SnapLogic server to connect to.")
    print(" -c Only create the resources, do not execute the pipeline.")
    print(" -u <user/pass> Username and password to use.")
    print(" -h Print this help message and exit.")
def make_credentials(username):
    """Parse a 'user/password' argument into a credentials tuple.

    The value is split at the FIRST '/' only, so the password part may
    itself contain '/' characters.

    Args:
        username: The raw -u/--username argument string, or None when the
            option was not given.

    Returns:
        None when username is None (anonymous access); otherwise a
        (user, password) tuple of strings.

    Exits:
        Prints an error message and calls sys.exit(1) when the argument
        contains no '/' separator.
    """
    if username is None:
        return None
    # maxsplit=1: everything after the first '/' belongs to the password.
    userpass = username.split('/', 1)
    if len(userpass) != 2:
        print("Invalid or missing username/password argument '" + username + "'")
        sys.exit(1)
    return (userpass[0], userpass[1])
def print_resource_validate_info(error):
    """Report a failed resource validation on stdout, then exit with status 1.

    Args:
        error: The value returned by a resource's validate() call; it is
            printed as-is after a fixed header line.
    """
    # TODO: replace the raw dump below with a snapi helper that unpacks
    # validation errors into a readable report.
    for line in ("Resource validation failed with the following error", error):
        print(line)
    sys.exit(1)
# Default values for the settings controlled from the command line.
username = None                     # None means anonymous access
server = 'http://localhost:8088'    # URL of the SnapLogic data server we connect to
create_only = 'no'                  # 'yes': only create the resources,
                                    # 'no': also run the pipeline

# Process the command-line arguments.
try:
    opts, args = getopt.getopt(sys.argv[1:], "s:u:hc",
                               ["server=", "username=", "help", "create"])
except getopt.GetoptError:
    # sys.exc_info() is used instead of "except ... as e" / "except ..., e"
    # so this clause parses under Python 2.5 as well as Python 3.
    e = sys.exc_info()[1]
    print('Options error: %s' % e)
    usage()
    sys.exit(1)
for o, a in opts:
    if o in ("-s", "--server"):
        # Drop any trailing '/': the resource and pipeline URIs further down
        # are built by appending "/SnapLogic/..." to this value, and a
        # trailing slash would otherwise put '//' into every saved URI.
        server = a.rstrip('/')
    elif o in ("-u", "--username"):
        username = a
    elif o in ("-c", "--create"):
        create_only = 'yes'
    elif o in ("-h", "--help"):
        usage()
        sys.exit()

# Construct the credentials if any were provided.
creds = make_credentials(username)

# Using SnapAPI, you can query the SnapLogic data server to see what
# components are installed.
# Access is via HTTP. The server may be located anywhere on the Internet.
component_list = snapi.list_components(server, creds)
# As explained in the module documentation, we first need to read a CSV file
# of leads (customer names and contact information). This is done as follows.
#
# First we create a 'resource definition' (res_def) in memory. It represents
# the CSV-file-reading capability, parameterized for our specific application.
# The res_def is based on a standard component that ships with the SnapLogic
# data server. The full list of standard components is in the documentation,
# and new components can easily be added to the data server.
#
# Standard components are identified by their dotted name.
leads_res_def = snapi.create_resource_object(server, 'snaplogic.components.CsvRead', creds)

# With the resource object in memory we can set its properties (attributes).
# Properties are what make a resource useful in a specific situation: a
# database reader needs database credentials, a file reader needs a file
# name, and so on.
#
# Two properties are common to all resources:
#
#   description: Shown by the graphical SnapLogic client, which can be used
#                to manipulate resources and create pipelines.
#   author:      The user who created, or is responsible for, the resource.
for _key, _value in (('description', 'Read the leads'),
                     ('author', 'SnapLogic Tutorial')):
    leads_res_def.set_general_info(_key, _value)

# Other properties are specific to the component in use. The CsvRead
# component requires a delimiter and the name of the file to read; we also
# set its 'skip_lines' and 'quote' properties.
#
# This example also shows run-time substitution of property values. A value
# that is to be substituted is written with the syntax '$?{SOME_NAME}':
for _key, _value in (('delimiter', '$?{DELIMITER}'),
                     ('filename', '$?{INPUTFILE}'),
                     ('skip_lines', 1),
                     ('quote', '"')):
    leads_res_def.set_property_value(_key, _value)

# How these substitution patterns are filled in is shown further below: when
# a pipeline that uses this resource is started, the actual delimiter and
# filename are supplied. This keeps a pipeline reusable for input files with
# different names or delimiters. For now, just note that neither value is
# fixed yet.
#
# Substitution patterns used in a resource definition are called the
# resource's 'parameters'. Each parameter must be declared, optionally with a
# default value. A parameter without a default is required: the user must
# provide a value for it at run time. A parameter with a default is optional:
# if no value is supplied at run time, the default is used.
#
# In this exercise both parameters get default values.
for _key, _value in (('INPUTFILE', 'file://tutorial/data/leads.csv'),
                     ('DELIMITER', ',')):
    leads_res_def.define_param(_key, _value)

# Data flows between the resources of a pipeline as records. A record's
# format is defined much like a database table: each field has a name, a
# type and a description.
#
# A view is therefore given as a tuple of field tuples, each of the form
# (field name, type, description). The type is a string such as 'string';
# 'number' and 'datetime' are also available.
#
# The CsvRead component reads lines from the CSV file and assigns the fields
# of each line to the fields of the output view as defined here. The fields
# of each line thus form one record, and CsvRead presents all the records it
# reads as a stream of records.
#
# The output view is declared with add_record_output_view(). Both its name,
# 'Output1', and its description, 'Leads output view', are user defined.
# A component may have multiple output views.
leads_output_fields = (('First', 'string', 'First Name'),
                       ('Last', 'string', 'Last Name'),
                       ('Phone_h', 'string', 'Phone, Home'),
                       ('Phone_m', 'string', 'Phone, Mobile'),
                       ('Phone_w', 'string', 'Phone, Work'),
                       ('Address', 'string', 'Street Address'),
                       ('City', 'string', 'City'),
                       ('State', 'string', 'State'),
                       ('Zip', 'string', 'Zip Code'))
leads_res_def.add_record_output_view('Output1', leads_output_fields,
                                     'Leads output view')

# Once the resource is defined we can sanity-check it: validate() checks all
# of its properties.
print('Validating Leads')
error = leads_res_def.validate()
if error:
    print_resource_validate_info(error)

# Saving a resource stores it on the server under a URI; others can then
# reuse it with exactly the property values set here. The URI is the
# server's base URL (the one given to snapi.create_resource_object() when the
# resource was created) followed by a relative path. For example, with base
# URL 'http://myserver:8088' and relative path '/some/example/resource', the
# complete URI is 'http://myserver:8088/some/example/resource'.
#
# The Leads resource is stored under the following URI.
leads_uri = "%s/SnapLogic/Tutorial/Exercise_1/Leads" % server
leads_res_def.save(leads_uri)
print('Resource Leads saved')
# Next we define a resource that writes our output data. The standard
# component used for this is CsvWrite.
prospects_res_def = snapi.create_resource_object(server, 'snaplogic.components.CsvWrite', creds)

# Again a number of properties need to be set. In this example we turn leads
# into prospects. The interesting transformations and filters of the sample
# application are added in later exercises; for now the data is simply
# written out to a new CSV file in a slightly different format.
for _key, _value in (('description', 'Prospects writer'),
                     ('author', 'SnapLogic Tutorial')):
    prospects_res_def.set_general_info(_key, _value)

# Here, too, substitution patterns (parameters) are used: for the filename
# (this time the output file) and for the delimiter. The CsvWrite component
# also has an additional property, 'header', a flag indicating whether a
# header should be written; we set it to False.
for _key, _value in (('delimiter', '$?{DELIMITER}'),
                     ('filename', '$?{OUTPUTFILE}'),
                     ('header', False)):
    prospects_res_def.set_property_value(_key, _value)

# As before, declare the resource parameters together with their default
# values, which makes both of them optional.
for _key, _value in (('OUTPUTFILE', 'file://tutorial/data/ex_1_prospects.csv'),
                     ('DELIMITER', ',')):
    prospects_res_def.define_param(_key, _value)

# Now define the input view of the CsvWrite resource. It has fewer fields
# than the output view of the Leads reader: the leads file has home, mobile
# and work phone numbers, but only the work phone number is kept. Note also
# that several field names differ from those in the reader's output view
# (for example 'Last_Name' instead of 'Last'). That is fine; the pipeline's
# field links, defined further below, map one set of names onto the other.
prospects_input_fields = (('Last_Name', 'string', 'Last Name'),
                          ('First_Name', 'string', 'First Name'),
                          ('Address', 'string', 'Street Address'),
                          ('City', 'string', 'City'),
                          ('State', 'string', 'State'),
                          ('Zip_Code', 'string', 'Zip Code'),
                          ('Work_Phone', 'string', 'Phone, Work'))
prospects_res_def.add_record_input_view('Input1', prospects_input_fields,
                                        'Prospects input view')

# Sanity-check the new resource, then save it to the server.
print('Validating Prospects')
error = prospects_res_def.validate()
if error:
    print_resource_validate_info(error)

# The Prospects resource is stored under the following URI.
prospects_uri = "%s/SnapLogic/Tutorial/Exercise_1/Prospects" % server
prospects_res_def.save(prospects_uri)
print('Resource Prospects saved')
# With both resources defined we can assemble them into a pipeline. A
# pipeline is what the SnapLogic data server can 'execute'.
p = snapi.create_resource_object(server, snapi.PIPELINE, creds)
for _key, _value in (('description', 'Exercise 1 Pipeline'),
                     ('author', 'SnapLogic Tutorial')):
    p.set_general_info(_key, _value)

# Resources are added with the pipeline's add() method. The second argument
# is the name by which the resource is referred to inside the pipeline (see
# the link_views() and map_param() calls below).
for _resource, _name in ((leads_res_def, "Leads"),
                         (prospects_res_def, "Prospects")):
    p.add(_resource, _name)

# Leads has an output view and Prospects has an input view; now the two
# views are linked. Each pair below maps a field of the Leads output view
# (left) to the field of the Prospects input view (right) that receives it.
field_links = (
    ('First', 'First_Name'),
    ('Last', 'Last_Name'),
    ('Address', 'Address'),
    ('City', 'City'),
    ('State', 'State'),
    ('Zip', 'Zip_Code'),
    ('Phone_w', 'Work_Phone'),
)

# The pipeline's link_views() method connects an output view of one resource
# to an input view of another:
#
#   p.link_views(producer, producer's output view,
#                consumer, consumer's input view,
#                field name linkage)
#
# Here {Leads, Output1} is linked to {Prospects, Input1}, and the field name
# linkage is the field_links tuple defined just above.
p.link_views('Leads', 'Output1', 'Prospects', 'Input1', field_links)

# As mentioned earlier, parameters can be supplied at run time, when the
# pipeline is started. This keeps the individual resources generic while
# letting them be parameterized for the particular pipeline they are used in.
#
# A pipeline can take parameters of its own, the 'pipeline parameters'. Like
# resource parameters, they are declared with their names and an indication
# of whether they are required (must be supplied by the user). Here they are
# all optional, because each one gets a useful default value.
#
# Resources may, and often do, use the same parameter names (both Leads and
# Prospects have a DELIMITER parameter), so the pipeline must disambiguate
# them by mapping its own parameters onto the resources' parameters. The
# pipeline parameters are also the complete set of parameters exposed to the
# user of the pipeline, who sees the pipeline as a single resource and does
# not need to know about the individual resources inside it.
#
# The pipeline's map_param() method creates a pipeline parameter, maps it to
# a resource's parameter and optionally sets the pipeline parameter's default
# value. For example, the first entry below creates the pipeline parameter
# 'LEADS', mapped to the 'INPUTFILE' parameter of the 'Leads' resource, with
# the default value 'file://tutorial/data/leads.csv'.
for _mapping in (
        ('LEADS', 'Leads', 'INPUTFILE', 'file://tutorial/data/leads.csv'),
        ('INPUT_DELIMITER', 'Leads', 'DELIMITER', ','),
        ('PROSPECTS', 'Prospects', 'OUTPUTFILE', 'file://tutorial/data/ex_1_prospects.csv'),
        ('OUTPUT_DELIMITER', 'Prospects', 'DELIMITER', ',')):
    p.map_param(*_mapping)

# That's all it takes! Sanity-check the pipeline resource and save it to
# the server.
print('Validating pipeline')
error = p.validate()
if error:
    print_resource_validate_info(error)

# A pipeline's URI matters: it is how the pipeline is accessed, and how it
# can be used inside other pipelines.
p_uri = "%s/SnapLogic/Tutorial/Exercise_1/Leads_to_Prospects" % server
p.save(p_uri)
print('Pipeline Leads_to_Prospects saved')
if create_only == 'no':
    # Execute the pipeline. Its definition is sent to the server, and the
    # resources are instantiated and told their parameters and their input
    # and output views. In this example the result is that 'leads.csv' is
    # read and 'ex_1_prospects.csv' is written.
    #
    # execute() returns once all resources of the pipeline have started
    # working; the pipeline run itself has not necessarily finished by then.
    #
    # To wait for the whole run to complete, we keep the handle returned by
    # execute() and call its get_current_status() method repeatedly, sleeping
    # one second between calls, until the status reports it is finished.
    print("Starting pipeline.")
    handle = p.execute(None, None, None, "Exercise 1")
    print("Waiting for completion of pipeline execution...")
    status = handle.get_current_status(False)
    while not status.is_finished():
        time.sleep(1)
        print("Polling for completion of pipeline...")
        status = handle.get_current_status(False)
    print("All done!")
|