# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: CsvRead.py 10330 2009-12-24 22:13:38Z grisha $
"""
CsvRead Component
"""
import codecs
import csv
import re
from decimal import Decimal
import snaplogic.cc.prop as prop
import snaplogic.components.FileUtils as FileUtils
from snaplogic.common import version_info
from snaplogic.cc.component_api import ComponentAPI,has_param
from snaplogic.common import snap_log
from snaplogic.common.SnapReader import SnapReader
from snaplogic.common.data_types import SnapNumber,SnapString
from snaplogic.common.snap_exceptions import *
from snaplogic.snapi_base import keys
from snaplogic.common.prop_err import FIELD_AND_VIEW_NAME_PATTERN
class UTF8Recoder:
    """
    Iterator adapter that decodes an input stream from the given encoding
    and hands every line back re-encoded as UTF-8.
    Follows the recipe at http://docs.python.org/lib/csv-examples.html.
    """
    def __init__(self, f, encoding):
        # codecs.getreader() returns a factory; calling it wraps the raw stream
        make_stream_reader = codecs.getreader(encoding)
        self.reader = make_stream_reader(f)

    def __iter__(self):
        # The recoder is its own iterator
        return self

    def next(self):
        # Pull one decoded line from the wrapped reader and re-encode it
        decoded_line = self.reader.next()
        return decoded_line.encode("utf-8")
class UnicodeReader:
    """
    CSV reader that iterates over the rows of the CSV file "f", which is
    encoded in the given encoding, and yields each row as a list of
    unicode strings.
    Follows the recipe at http://docs.python.org/lib/csv-examples.html.
    """
    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # The csv reader is fed UTF-8 encoded lines, whatever the source encoding
        utf8_lines = UTF8Recoder(f, encoding)
        self.reader = csv.reader(utf8_lines, dialect=dialect, **kwds)

    def next(self):
        # Decode every cell of the next parsed row back into unicode
        decoded_row = []
        for cell in self.reader.next():
            decoded_row.append(unicode(cell, "utf-8"))
        return decoded_row

    def __iter__(self):
        return self
class CsvRead(ComponentAPI):
"""
This class implements the CSV Read component.

It parses delimited text from a single data source, or from a list of
data sources, and writes every parsed row to its one output view.
"""
api_version = '1.0'
# Current resource version of this component. The upgrade_X_to_Y methods
# of this class cover the steps 1.0 -> 1.1 -> 1.2 -> 1.3.
component_version = '1.3'
# View limits: no input views (lower and upper limit 0) and exactly
# one output view (lower and upper limit 1).
capabilities = {
ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT : 0,
ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT : 0,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT : 1,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT : 1,
}
component_description = "This component parses CSV format data sources."
component_label = "CSV Reader"
# The documentation URI embeds the documentation version taken from version_info.
component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/CsvRead" % \
version_info.doc_uri_version
# Only numbers and strings are supported as field datatypes.
# Dates can be written as strings in many different ways, so supporting
# them would require conversion masks, which are not implemented yet.
supported_datatypes = [SnapString, SnapNumber]
def validate_config_file(self):
    """
    Validate the component config file, if one was provided.

    The only entry checked here is the optional 'schemes' setting: when it is
    present in the config dictionary it is passed to FileUtils.validate_schemes
    together with the reader schemes known to FileUtils. Nothing is checked
    when the entry is absent.
    """
    if 'schemes' in self.config_dictionary:
        FileUtils.validate_schemes(self.config_dictionary.get("schemes"), FileUtils.reader_schemes)
def _add_1_2_props(self):
    """
    Define the properties introduced with resource version 1.2:
    'quote' and 'skip_initial_whitespace' (which defaults to True).
    """
    quote_definition = prop.SimpleProp("Quote character",
                                       SnapString,
                                       "Character used to quote an entry if a delimiter appears in it",
                                       None)
    self.set_property_def('quote', quote_definition)

    whitespace_definition = prop.SimpleProp("Skip initial whitespace",
                                            'boolean',
                                            "Whether to ignore whitespace immediately after delimiter",
                                            None,
                                            True)
    self.set_property_def('skip_initial_whitespace', whitespace_definition)
    # Whitespace following a delimiter is ignored unless the user changes this
    self.set_property_value('skip_initial_whitespace', True)
def create_resource_template(self):
    """
    Create CsvRead resource template.
    All CsvRead resource definitions need the following properties:
    filename                : The name of the file or data source, using scheme:// format.
    delimiter               : The delimiter used for fields (default is ",")
    quote                   : Quote character (default is " - double quote)
    skip_initial_whitespace : Whether whitespace immediately after a delimiter is ignored.
    is_input_a_list         : Set to True if filename specifies a list of data sources.
    skip_lines              : If the initial few lines of the file are unwanted header information, then
                              specify the number of lines to skip.
    username                : Credentials: username needed to read the file.
    password                : Credentials: password needed to read the file.
    """
    # The Windows path example spells its backslashes as explicit "\\" escapes.
    # The resulting text is unchanged, but it no longer depends on the
    # unrecognised escape sequences "\d" and "\i" being passed through.
    filename_description = (
        "The URI of the file to be read. You can enter either a local file path, or a remote location on a FTP or HTTP server. The following URI schemes are supported: file, http, https, ftp, file.\n"
        "\n\nExamples of valid URIs:\n"
        "/home/user/input.file\n"
        "c:\\dir\\input.file\n"
        "file:///c:/dir/input.file\n"
        "ftp://ftp.server.com/dir/input.file\n"
        "http://www.server.com/dir/input.file\n"
        "https://www.server.com/dir/input.file")
    self.set_property_def("filename",
                          prop.SimpleProp("File name", SnapString, filename_description, None, True))

    self.set_property_def("delimiter",
                          prop.SimpleProp("Delimiter", SnapString, "The field delimiter", None, True))

    self.set_property_def("is_input_a_list",
                          prop.SimpleProp("Is input a list", "boolean",
                                          "Does the file specified by the filename property contain a list of files to be read.",
                                          None, True))
    self.set_property_value("is_input_a_list", False)

    self.set_property_def("skip_lines",
                          prop.SimpleProp("Skip lines", SnapNumber, "Number of initial lines to skip", None, True))
    self.set_property_value("skip_lines", 0)

    # Credentials are username and password
    self.set_property_def("username",
                          prop.SimpleProp("Credentials: Username", SnapString,
                                          "Username to use if credentials are needed for the accessing data"))
    self.set_property_value("username", "")
    self.set_property_def("password",
                          prop.SimpleProp("Credentials: Password", SnapString,
                                          "Password to use if credentials are needed for the accessing data",
                                          {'obfuscate' : 0}))

    # Properties that were added with resource version 1.2
    self._add_1_2_props()
def _read_sample(self):
    """
    Read a sample of the flat file into a string for sniffing.

    Only the first data source is sampled, and it is skipped when its name is
    defined via a parameter. At most MAX_LINES lines are read.

    Returns the sampled lines, each followed by a newline, as one string;
    the string is empty when nothing could be sampled.
    """
    MAX_LINES = 100
    sampled_lines = []
    # Read up to MAX_LINES from the first data source
    for (source, username, password) in self._get_data_sources()[:1]:
        if has_param(source):
            # Cannot suggest if filename is defined via parameter
            continue
        rdr = SnapReader.create(source, username, password, None, self.env)
        try:
            rdr.open()
            # range(MAX_LINES) reads the full MAX_LINES lines; the earlier
            # range(1, MAX_LINES) stopped one line short.
            for _ in range(MAX_LINES):
                s = rdr.readline()
                if not s:
                    break
                # NOTE(review): a newline is appended to every line read; confirm
                # whether SnapReader.readline() already keeps the line terminator.
                sampled_lines.append(s + "\n")
        finally:
            rdr.close()
    return "".join(sampled_lines)
def _suggest_delimiter(self, sample):
""" Use the csv sniffer to guess the delimiter """
sniffer = csv.Sniffer()
dialect = sniffer.sniff(sample)
return dialect.delimiter
def _suggest_quote(self, sample):
""" Use the csv sniffer to guess the quotechar """
sniffer = csv.Sniffer()
dialect = sniffer.sniff(sample)
return dialect.quotechar
def _suggest_header(self, sample):
""" Use the csv sniffer to check whether the flat file may have a header """
sniffer = csv.Sniffer()
return sniffer.has_header(sample)
def _suggest_output_view(self):
""" Using delimiter and skip_lines properties return a list of file fields """
# We'll store types here
discovered_types = []
# We'll store field names in this variable in a list, if CSV file has a header
field_names = None
# Read up to MAX_LINES from the first data source to try to guess datatypes
MAX_LINES = 100
for (input, username, password) in self._get_data_sources()[:1]:
if has_param(input):
# Cannot suggest if filename is defined via parameter
continue
rdr = SnapReader.create(input, username, password, None, self.env)
try:
rdr.open()
# Prepare the parameters to pass into the reader
parameters = {}
for parameter_name, property_name in {'delimiter' : 'delimiter',
'quotechar' : 'quote',
'skipinitialspace' : 'skip_initial_whitespace'}.items():
property_value = self.get_property_value(property_name)
if property_value is not None:
if isinstance(property_value, basestring):
property_value = property_value.encode('utf-8')
parameters[parameter_name] = property_value
reader = UnicodeReader(rdr.handle, dialect="excel", **parameters)
line = 0
skip_lines = self.get_property_value("skip_lines")
for rec in reader:
line += 1
if skip_lines > 0:
# Assume first line is the header
if not field_names:
# Save field names
field_names = rec
# Decrement skip_lines
skip_lines -= 1
continue
if line > MAX_LINES:
# We're done
break
# Note that this record may have fewer fields than previous records.
# This is OK, because our csvread "contract" is that if there are fewer
# fields than in the output view we set the missing fields to None.
# One common scenario where this may happen is if the CSV file has
# an empty line at the end.
# For each field, guess its type
for i, field in enumerate(rec):
try:
# If we can convert to decimal, perhaps this is a numeric field
n = Decimal(field)
field_type = SnapNumber
except Exception:
field_type = SnapString
if i >= len(discovered_types):
discovered_types.append(field_type)
elif discovered_types[i] != SnapString:
# Make sure to not override a SnapString with SnapNumber.
# When analyzing multiple lines if a given column is textual
# in any row, it must be set to string type, even though
# other rows may have numeric values for that column.
discovered_types[i] = field_type
except Exception, e:
# Ignore exception during discovery: maybe the filename provided was incorrect
self.elog(e)
# Return None to indicate that discovery failed
return None
finally:
rdr.close()
# Discover the output view
fields = []
for i, discovered_type in enumerate(discovered_types):
# If we have a field name use it, otherwise generate one in the form of Field001
if field_names and i < len(field_names):
field_name = field_names[i]
# Replace illegal characters with underscores
field_name = FIELD_AND_VIEW_NAME_PATTERN.sub('_', field_name)
# Strip leading and trailing underscores
field_name = field_name.strip('_')
# Duplicate field name? add numeric suffix
for field in fields:
if field[0] == field_name:
field_name += str(i + 1).zfill(3)
break
else:
field_name = "Field" + str(i + 1).zfill(3)
field_type = discovered_types[i]
fields.append([field_name, field_type, None])
return fields
def suggest_resource_values(self, err_obj):
"""
Attempt to discover delimiter, quote, header, and output view given filename.

A sample of the file is read and sniffed. Delimiter and quote are only
suggested when they are not already set; skip_lines is always set from
the header check. Discovered fields are stored as the output view,
named after the existing output view or "Output1" when there is none.

@param err_obj: Object for error reporting; a failure to read the sample
is reported on its "filename" property error.
"""
# Get the current setting of the delimiter and quote properties
# We'll use it when trying to sniff the file
delimiter = self.get_property_value("delimiter")
quote = self.get_property_value('quote')
had_delimiter = delimiter is not None
had_quote = quote is not None
# Read a sample from the file for sniffing
sample = None
try:
sample = self._read_sample()
except Exception, e:
# Couldn't read from file: report the exception text on the filename
# property. sample stays None, so the branch below applies the
# defaults and returns.
err_obj.get_property_err("filename").set_message(str(e))
if not sample:
# We weren't able to read a sample from file: exit.
if not had_delimiter:
# If delimiter wasn't set make it comma
self.set_property_value("delimiter", ',')
if not had_quote:
# If quote wasn't set make it a double quote
self.set_property_value('quote', '"')
return
# Use the CSV sniffer to attempt to discover the delimiter and header
try:
if not had_delimiter:
delimiter = self._suggest_delimiter(sample)
# If delimiter was discovered save it
if delimiter:
self.set_property_value("delimiter", delimiter)
else:
# Otherwise if we weren't able to discover a delimiter
# and delimiter wasn't set by the user as input set it to comma
self.set_property_value("delimiter", ',')
if not had_quote:
quote = self._suggest_quote(sample)
# If quote was discovered save it
if quote:
self.set_property_value("quote", quote)
else:
# Otherwise if we weren't able to discover quote
# and it wasn't set by the user as input set it to "
self.set_property_value("quote", '"')
# A sniffed header means one line has to be skipped, otherwise none
has_header = self._suggest_header(sample)
self.set_property_value("skip_lines", 1 if has_header else 0)
# If delimiter is none (or empty) set it to the default (comma)
if not self.get_property_value("delimiter"):
self.set_property_value("delimiter", ',')
# Try to guess the output view
fields = self._suggest_output_view()
except SnapComponentError, e:
# Suggest didn't work, log the error and return
self.elog(e)
return
# Now on to changing the output view:
# if we were able to discover fields in the file, we can
# check our fields against the output view.
# Was there an output view to begin with?
had_view = len(self.list_output_view_names()) == 1
# If there was an output view defined save its name and description.
if had_view:
view_name = self.list_output_view_names()[0]
view_desc = self.get_output_view_def(view_name)['description']
else:
view_name = "Output1"
view_desc = None
# Did we discover any fields?
if fields:
# If there was an output view already defined "merge" any useful information
# like field names from the old output view into the new output view.
# zip() pairs old and new fields by position and stops at the shorter list.
if had_view:
for (old_field, new_field) in zip(self.get_output_view_def(view_name)['fields'], fields):
# Was there a customized field name, e.g. anything other than FieldXXX?
custom_field_name = not re.match('Field[0-9]+', old_field[keys.FIELD_NAME])
if custom_field_name:
# If there was a custom field name make sure to keep it
new_field[keys.FIELD_NAME] = old_field[keys.FIELD_NAME]
# NOTE(review): indentation is not visible in this copy of the file; confirm
# whether the description copy below belongs inside the custom-name branch
# or applies to every field.
new_field[keys.FIELD_DESC] = old_field[keys.FIELD_DESC]
# Store the discovered (and possibly merged) fields as the output view
self.add_record_output_view_def(view_name, fields, view_desc)
def validate(self, err_obj):
"""
Component-specific validation logic.
Validate that the URI scheme specified for the filename is one of the allowed
schemes as specified in the component config file.
@param err_obj: Object for error reporting
@type err_obj: L{SimplePropErr} or L{ListPropErr} or L{DictPropErr}
"""
filename = self.get_property_value("filename")
delimiter = self.get_property_value("delimiter")
quote = self.get_property_value("quote")
if delimiter == quote:
err_obj.get_property_err('quote').set_message('Delimiter and quote cannot be the same character')
# Validate that the filename complies with the allowed URI schemes,
# unless it's specified via a parameter
FileUtils.validate_filename_property(filename, "filename", err_obj,
self.config_dictionary.get("schemes"), FileUtils.reader_schemes)
views = self.list_output_view_names()
view_name = views[keys.SINGLE_VIEW]
view = self.get_output_view_def(view_name)
field_names = [ d[keys.FIELD_NAME] for d in view[keys.VIEW_FIELDS] ]
field_types = [ d[keys.FIELD_TYPE] for d in view[keys.VIEW_FIELDS] ]
for i in range(0, len(field_types)):
field_type = field_types[i]
field_name = field_names[i]
# For each field, check the datatype.
# If we don't support this datatype add an error.
if field_type not in self.supported_datatypes:
err_obj.get_output_view_err()[view_name][keys.VIEW_FIELDS][i].set_message(
"Output field '%s' datatype '%s' is not supported. Must be one of: %s" %
(field_name, field_type, str(self.supported_datatypes)))
# Validate that the view definition matches the file structure
sniff_fields = None
try:
sniff_fields = self._suggest_output_view()
except SnapComponentError, e:
# There was an error sniffing the file, log it but don't act on it
self.elog(e)
# If sniff_fields was returned we were able to discover fields,
# so let's check the output view fields against the file fields.
# If None was returned it means discovery failed, e.g. because file wasn't found.
# If that's the case skip this check.
if sniff_fields:
num_view_fields = len(field_names)
num_file_fields = len(sniff_fields)
if num_view_fields > num_file_fields:
err_obj.get_output_view_err()[view_name].set_message(
("Output view must have as many fields as the input file. " +
"Output view has %s fields while the input file '%s' has %s fields (parsed using delimiter '%s'). " +
"Please correct this by removing %s field(s) from the output view.") %
(num_view_fields, filename, num_file_fields, delimiter, num_view_fields - num_file_fields))
elif num_view_fields < num_file_fields:
err_obj.get_output_view_err()[view_name].set_message(
("Output view must have as many fields as the input file. " +
"Output view has %s fields while the input file '%s' has %s fields (parsed using delimiter '%s'). " +
"Please correct this by adding %s field(s) to the output view.") %
(num_view_fields, filename, num_file_fields, delimiter, num_file_fields - num_view_fields))
def _get_data_sources(self):
    """
    Build the list of data sources described by the component properties.

    Each data source is a tuple (input, username, password). The list is
    empty when no filename is set. Raises SnapComponentError when a file
    name uses a URI scheme that is not allowed.
    """
    filename = self.get_property_value("filename")
    username = self.get_property_value("username")
    password = self.get_property_value("password")
    is_input_a_list = self.get_property_value("is_input_a_list")

    # If no filename was provided, return an empty list
    if not filename:
        return []

    def checked_location(uri):
        # Validate the URI scheme, then map the URI to its file location
        error = FileUtils.validate_file_scheme(uri, self.config_dictionary.get("schemes"), FileUtils.reader_schemes)
        if error is not None:
            raise SnapComponentError(error)
        return FileUtils.get_file_location(uri, self.config_dictionary)

    # Make sure the filename is always qualified
    location = checked_location(FileUtils.qualify_filename(filename))

    if not is_input_a_list:
        return [ ( location, username, password ) ]

    # The file is a list of data sources: validate and resolve each entry
    return [(checked_location(entry_name), entry_username, entry_password)
            for (entry_name, entry_username, entry_password)
            in FileUtils.read_input_list(location, username, password)]
def execute(self, input_views, output_views):
"""Execute the CSV reading functionality."""
try:
output_view = output_views.values()[keys.SINGLE_VIEW]
except IndexError:
raise SnapComponentError("No output view connected.")
self._delimiter = self.get_property_value("delimiter")
self._delimiter = self._delimiter.encode("utf-8")
self._quote = self.get_property_value("quote")
if self._quote:
self._quote = self._quote.encode("utf-8")
self._skip_lines = self.get_property_value("skip_lines")
self._skip_initial_whitespace = self.get_property_value("skip_initial_whitespace")
# Array storing a flag for each field
# is_string_field[i] = True if it's a string field
# Pre-calculating this is a performance optimization,
# since looking up strings in a string array is expensive (6%)
is_string_field = []
for field_type in output_view.field_types:
is_string_field.append(field_type == SnapString)
# Save the number of fields
num_fields = len(output_view.field_types)
# Store a flag telling us if there are any non-string output fields at all.
# This is a performance optimization. (1%)
all_string_fields = False not in is_string_field
for (input, username, password) in self._get_data_sources():
self.log(snap_log.LEVEL_DEBUG, "Input: %s" % input)
rdr = SnapReader.create(input, username, password, None, self.env)
rdr.open()
try:
if self._quote:
reader = UnicodeReader(rdr.handle, dialect="excel", delimiter=self._delimiter,
quotechar=self._quote, skipinitialspace=self._skip_initial_whitespace)
else:
reader = UnicodeReader(rdr.handle, dialect="excel", delimiter=self._delimiter,
skipinitialspace=self._skip_initial_whitespace)
# If all output types are strings branch to a faster code path
# since we don't need to check the datatype of each field
# (performance optimization).
if all_string_fields:
for rec in reader:
if self._skip_lines > 0:
self._skip_lines -= 1
continue
out_rec = output_view.create_record()
i = 0
for field in rec:
field_name = out_rec.field_names[i]
out_rec[field_name] = field
i = i + 1
output_view.write_record(out_rec)
else:
# There is a mix of string and non-string output fields,
# so we need to check type for each field.
# This "else" branch is doing essentially the same
# as the "if" branch above it plus type conversion.
for rec in reader:
if self._skip_lines > 0:
self._skip_lines -= 1
continue
out_rec = output_view.create_record()
i = 0
for field in rec:
field_name = out_rec.field_names[i]
# Note: field-handling could have been a function for readibility
# purposes, but it's inlined as a perfomance optimization (1%)
if is_string_field[i]:
out_rec[field_name] = field
else:
# Convert to number
field = field.strip()
if len(field) == 0:
# Empty field is interpreted as None
out_rec[field_name] = None
else:
try:
# Convert to decimal
out_rec[field_name] = Decimal(field)
except Exception, e:
# Conversion failed, throw an appropriate exception
raise SnapComponentError("Failed to cast field %s value '%s' to type 'number' (%s)" %
(field_name, field, e))
i = i + 1
output_view.write_record(out_rec)
finally:
# If there was an error reading this file/stream
# make sure we close it before exception is propagated futher up
rdr.close()
output_view.completed()
def upgrade_1_1_to_1_2(self):
    """
    Upgrade resource from version 1.1 to version 1.2.
    Adds the quote and skip initial whitespace properties.
    """
    self._add_1_2_props()
def upgrade_1_0_to_1_1(self):
    """
    Upgrade resource from version 1.0 to version 1.1.

    In version 1.0 credentials were stored as a single user:passwd string separated by colon.
    In version 1.1 it's stored as two separate properties, and password is obfuscated.
    This change was made in the 2.0.5 release.

    Also, change the description of the filename property.
    The description was changed in release 2.1.0.
    """
    # Old credentials were stored as user:password, split them into two variables
    credentials = self.get_property_value("credential")
    username = None
    password = None
    if credentials is not None:
        # Colons are allowed in passwords, so split at the first colon only.
        # Everything before it is the username; a password exists only when
        # a colon was actually present.
        username, colon, remainder = credentials.partition(':')
        if colon:
            password = remainder

    # Delete the old credentials property
    self.del_property_def("credential")

    # Create the new credentials properties
    self.set_property_def("username",
                          prop.SimpleProp("Credentials: Username", SnapString,
                                          "Username to use if credentials are needed for the accessing data"))
    self.set_property_def("password",
                          prop.SimpleProp("Credentials: Password", SnapString,
                                          "Password to use if credentials are needed for the accessing data",
                                          {'obfuscate' : 0}))

    # Set the new credentials properties
    self.set_property_value("username", username)
    self.set_property_value("password", password)

    # Recreate the filename property, keeping its current value.
    # The Windows path example spells its backslashes as explicit "\\" escapes.
    # The resulting text is unchanged, but it no longer depends on the
    # unrecognised escape sequences "\d" and "\i" being passed through.
    filename = self.get_property_value("filename")
    self.del_property_def("filename")
    filename_description = (
        "The URI of the file to be read. You can enter either a local file path, or a remote location on a FTP or HTTP server. The following URI schemes are supported: file, http, https, ftp, file.\n"
        "\n\nExamples of valid URIs:\n"
        "/home/user/input.file\n"
        "c:\\dir\\input.file\n"
        "file:///c:/dir/input.file\n"
        "ftp://ftp.server.com/dir/input.file\n"
        "http://www.server.com/dir/input.file\n"
        "https://www.server.com/dir/input.file")
    self.set_property_def("filename",
                          prop.SimpleProp("File name", SnapString, filename_description, None, True))
    self.set_property_value("filename", filename)
def upgrade_1_2_to_1_3(self):
    """
    Upgrade resource from version 1.2 to version 1.3.

    Intentionally does nothing: this step exists only so that the component
    doc URI is changed during the upgrade, which is handled by cc_info
    before this method is called.
    """
|