# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: dblookup_example.py 10183 2009-12-16 23:58:55Z grisha $
"""
DBLookup example 1.
Create a DBLookup component resource that substitutes fields from the records of a previous DBRead query into
its own SQL query, and a pipeline that writes the lookup results to a CSV file.
"""
import sys
from snaplogic import snapi
# Base URL of the SnapLogic server on which the example resources are created.
SERVER_URL = 'http://localhost:8088'
# Every resource created by this example is saved beneath this URI prefix.
RESOURCE_ROOT = '%s/examples/dblookup' % (SERVER_URL,)
def print_resource_validate_info(error):
    """Report a resource validation failure and terminate the script.

    A fixed header line and the ``error`` object are written to standard
    error, then the process exits with status 1.

    :param error: The non-None value returned by a resource's ``validate()``.
    """
    # TODO: replace with a snapi helper routine that unpacks validation errors nicely.
    # sys.stderr.write (instead of the print statement) keeps the diagnostic out of
    # normal output and runs unchanged under both Python 2 and Python 3.
    sys.stderr.write("Resource validation failed with the following error\n")
    sys.stderr.write("%s\n" % (error,))
    sys.exit(1)
def delete_resource(uri):
    """Best-effort removal of the resource stored at ``uri``.

    Any exception raised by ``snapi.delete_resource`` is deliberately
    ignored, so calling this for a resource that may not exist (e.g. on a
    first run of the script) is harmless.
    """
    try:
        snapi.delete_resource(uri)
    except Exception:
        # Intentionally ignored: the resource presumably did not exist yet.
        return None
def _validate_and_save(resource, path):
    """Validate ``resource`` and save it at ``path``.

    If ``validate()`` returns a non-None error, the error is reported through
    print_resource_validate_info(), which exits the script, so an invalid
    resource is never saved. This centralizes the validate/report/save
    sequence that every resource in this example goes through.
    """
    error = resource.validate()
    if error is not None:
        # Report the validation error itself, not the resource object.
        print_resource_validate_info(error)
    resource.save(path)


########################################
# Delete resources left over from a previous run, if any
########################################
for uri in ['/dbconn', '/lead_reader', '/dblookup', '/writer', '/pipe']:
    delete_resource(RESOURCE_ROOT + uri)

########################################
# Create the database connection
########################################
dbconn = snapi.create_resource_object(SERVER_URL, 'snaplogic.components.Connection')
dbconn.set_property_value('ConnectString', 'MySQLdb:host=mysql,db=qa,user=qa,passwd=qa')
_validate_and_save(dbconn, RESOURCE_ROOT + '/dbconn')

########################################
# Create the lead reader
########################################
lead_reader = snapi.create_resource_object(SERVER_URL, 'snaplogic.components.DBRead')
lead_reader.set_resource_ref('DBConnect', RESOURCE_ROOT + '/dbconn')
# The subquery is used to prevent the DBLookup from failing for zipcodes that don't exist in the census table.
lead_reader.set_property_value(
    'SQLStmt',
    'SELECT last, first, zip FROM leads '
    'WHERE state = "CA" AND zip IN (SELECT zip FROM census)')
lead_reader.add_record_output_view('output1',
                                   [('last', 'string', 'Last Name'),
                                    ('first', 'string', 'First Name'),
                                    ('zip', 'number', 'Zip Code')],
                                   '')
_validate_and_save(lead_reader, RESOURCE_ROOT + '/lead_reader')

########################################
# Create DBLookup that finds the MoreThan50K value for a given zip code
########################################
lookup = snapi.create_resource_object(SERVER_URL, 'snaplogic.components.DBLookup')
lookup.set_resource_ref('DBConnect', RESOURCE_ROOT + '/dbconn')
# The ${...} placeholders name fields of the lookup's input view (first, last, zip),
# which are fed from the lead reader's records via the pipeline links below.
# NOTE(review): if DBLookup substitutes these values textually, a name containing
# a double quote would break this query -- confirm how the component escapes them.
lookup.set_property_value(
    'SQLStmt',
    'SELECT "${first} ${last}" AS fullname, morethan50k, zip '
    'FROM census WHERE zip = ${zip}')
lookup.add_record_input_view('input1',
                             lead_reader.get_output_view('output1')['fields'],
                             '')
lookup.add_record_output_view('output1',
                              [('fullname', 'string', 'Full name'),
                               ('morethan50k', 'string', 'MoreThan50K'),
                               ('zip', 'number', 'Zip Code')],
                              '')
_validate_and_save(lookup, RESOURCE_ROOT + '/dblookup')

########################################
# File writer
########################################
writer = snapi.create_resource_object(SERVER_URL, 'snaplogic.components.CsvWrite')
writer.set_property_value('filename', 'file://tutorial/data/dblookup_example_out.csv')
writer.add_record_input_view('input1',
                             lookup.get_output_view('output1')['fields'],
                             '')
_validate_and_save(writer, RESOURCE_ROOT + '/writer')

########################################
# Pipeline: LeadReader -> Lookup -> Writer
########################################
p = snapi.create_resource_object(SERVER_URL, snapi.PIPELINE)
p.add(lead_reader, "LeadReader")
p.add(lookup, "Lookup")
p.add(writer, "Writer")
p.link_views('LeadReader', 'output1', 'Lookup', 'input1', [('zip', 'zip'),
                                                         ('first', 'first'),
                                                         ('last', 'last')])
p.link_views('Lookup', 'output1', 'Writer', 'input1', [('fullname', 'fullname'),
                                                     ('morethan50k', 'morethan50k'),
                                                     ('zip', 'zip')])
# Previously the pipeline object, not the validation error, was reported here.
_validate_and_save(p, RESOURCE_ROOT + '/pipe')

# Execute the saved pipeline and block until it finishes.
h = snapi.exec_resource(RESOURCE_ROOT + '/pipe')
h.wait()
print("Done")
|