# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: FixedWidthRead.py 10330 2009-12-24 22:13:38Z grisha $
"""
FixedWidthRead Component
"""
import os
import string
import re
from decimal import Decimal
import snaplogic.components.FileUtils as FileUtils
from snaplogic.common import snap_log
from snaplogic.common.SnapReader import SnapReader
from snaplogic.common.SnapReader import SnapFtpReader
from snaplogic.common.SnapReader import SnapHttpReader
from snaplogic.common.SnapReader import SnapFileReader
from snaplogic.common.data_types import Record
from snaplogic.common.data_types import SnapString,SnapNumber,SnapDateTime
from snaplogic.common import version_info
from snaplogic.cc.component_api import ComponentAPI
from snaplogic.cc import prop
from snaplogic.common.snap_exceptions import *
from snaplogic.snapi_base import keys
from snaplogic.cc import component_api
from snaplogic.common.prop_err import FIELD_AND_VIEW_NAME_PATTERN
# Public names
__all__ = [ "FixedWidthRead"]
FIELD_NAME = 'Field name'
FIELD_WIDTH = 'Field width'
FIELD_SPEC = 'Field spec'
FIELD_SPECS = 'Field specs'
class FieldNames(list):
"""
List of field names.
Same as a list, but also enforces additional rules on field naming and uniqueness.
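Example (an illustrative sketch; the exact sanitization depends on
FIELD_AND_VIEW_NAME_PATTERN):
    names = FieldNames()
    names.append("First Name")  # illegal characters replaced, e.g. "First_Name"
    names.append("")            # empty name -> generated name such as "Field001"
    names.append("First Name")  # duplicate -> another generated name, e.g. "Field002"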
"""
def append(self, field = None):
if field:
# Make sure field name is valid:
# Replace illegal characters with underscores
field = FIELD_AND_VIEW_NAME_PATTERN.sub('_', field)
# Strip leading and trailing underscores
field = field.strip('_')
field_no = 1
# Loop until we have a unique field name e.g. Field005
while not field or field in self:
# If it's a duplicate or an invalid name, generate a name like Field001, Field002, etc.
if not field or field in self:
field = "Field" + str(field_no).zfill(3)
field_no += 1
list.append(self, field)
class FixedWidthRead(ComponentAPI):
"""
This class implements the Fixed Width Read component.
"""
api_version = '1.0'
component_version = '1.3'
capabilities = {
ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT : 0,
ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT : 0,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT : 1,
ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT : 1,
}
component_description = "This component parses Fixed width files."
component_label = "Fixed Width Reader"
component_doc_uri = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/FixedwidthRead" % \
version_info.doc_uri_version
# We only support numbers and strings as field datatype.
# Because dates can be represented in many different ways as a string,
# supporting dates would require defining masks for conversion,
# and we don't do this yet.
supported_datatypes = [SnapString, SnapNumber]
def validate_config_file(self):
"""
If a config file is provided for this component, see if it provides a value for the root directory.
The root directory is a way of specifying that all local files must be read from the specified
root directory.
"""
root = None
for k in self.config_dictionary:
if k == "root_directory" and self.config_dictionary[k]:
if not os.path.exists(self.config_dictionary[k]):
raise SnapComponentConfigError("The path specified for root (%s) is not valid" %
self.config_dictionary[k])
root = self.config_dictionary[k]
elif k == "schemes":
# Make sure "schemes" contains only the schemes we support
FileUtils.validate_schemes(self.config_dictionary.get("schemes"), FileUtils.reader_schemes)
else:
# No other config file param is supported.
raise SnapComponentConfigError("Unexpected config file entry (%s) encountered" % k)
def create_resource_template(self):
"""
Create FixedWidthRead resource template.
All FixedWidthRead resource definitions need the following properties:
filename : The name of the file or data source, using scheme:// format.
is_input_a_list : Set to True if filename specifies a list of data sources.
skip_lines : If the initial few lines of the file are unwanted header information, then
specify the number of lines to skip.
username : Credentials: username needed to read the file.
password : Credentials: password needed to read the file.
Field specs : Record format definition (field name and width)
"""
self.set_property_def("filename", prop.SimpleProp("File name", SnapString,
"The URI of the file to be read. You can enter either a local file path, or a remote location on a FTP or HTTP server. The following URI schemes are supported: file, http, https, ftp, file.\n"
"\n\nExamples of valid URIs:\n"
"/home/user/input.file\n"
"c:\dir\input.file\n"
"file:///c:/dir/input.file\n"
"ftp://ftp.server.com/dir/input.file\n"
"http://www.server.com/dir/input.file\n"
"https://www.server.com/dir/input.file"
, None, True))
self.set_property_def("is_input_a_list",
prop.SimpleProp("Is input a list", "boolean", "Is input a list", None, True))
self.set_property_value("is_input_a_list", False)
self.set_property_def("skip_lines",
prop.SimpleProp("Skip lines", "number", "Number of initial lines to skip",
{"min_value": 0}, True))
self.set_property_value("skip_lines", 0)
# Credentials are username and password
self.set_property_def("username",
prop.SimpleProp("Credentials: Username", SnapString,
"Username to use if credentials are needed for the accessing data"))
self.set_property_value("username", "")
self.set_property_def("password",
prop.SimpleProp("Credentials: Password", SnapString,
"Password to use if credentials are needed for the accessing data",
{'obfuscate' : 0}))
field_name = prop.SimpleProp(FIELD_NAME, "string", "Field name",
{'lov' : [keys.CONSTRAINT_LOV_OUTPUT_FIELD] }, True)
field_width = prop.SimpleProp(FIELD_WIDTH, "number", "Field width", {"min_value": 1}, True)
field_spec = prop.DictProp(FIELD_SPEC, field_name, "The name and width of the field", 2, 2, True, True)
field_spec[FIELD_WIDTH] = field_width
field_spec[FIELD_NAME] = field_name
field_specs = prop.ListProp("Field specifications", field_spec, 1, required=True)
self.set_property_def(FIELD_SPECS, field_specs)
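# Illustrative example (hypothetical field names): a configured resource could
# end up with a "Field specs" property value such as
#   [{'Field name': 'id',   'Field width': 6},
#    {'Field name': 'name', 'Field width': 20}]
# where each entry uses the FIELD_NAME and FIELD_WIDTH keys defined above.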
def _suggest_datatypes(self, boundaries):
"""
Given field boundaries, analyze the first few lines of the file to guess the datatypes
and to see if the file looks like it may have a header.
@param boundaries: List of tuples where each tuple corresponds to a field and
contains two positions: [start, stop]
pointing to the positions in the line where the field starts and stops.
@type boundaries: list
@return: a tuple containing two elements:
1) list of field types, one entry per field
2) header: list of column names if the first line of the file looks like a header,
or if the skip_lines property is set.
None if the first line of the file does not look like a header
and skip_lines is not set.
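Example (illustrative): for lines such as "John      Smith   42" with
boundaries [[0, 10], [10, 18], [18, 20]], the result might be
([SnapString, SnapString, SnapNumber], None) when no header is detected.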
"""
# We parse the first line's fields into a list, just in case
# they may be column names (header)
# FieldNames is like a list, with additional rule-checking
# specific to SnapLogic fields.
first_line_values = FieldNames()
# first_line_types is used to store field types for the first row
first_line_types = [None] * len(boundaries)
# field_types stores field types for all rows except the first
field_types = [None] * len(boundaries)
# Analyze the first few lines of the file
for i, line in enumerate(self._sample()):
# Store first line types separately
# because it may be a header
t = first_line_types if i == 0 else field_types
# Unpack the fields from the line
for k, (start, stop) in enumerate(boundaries):
field = line[start:stop].strip()
if i == 0:
first_line_values.append(field)
# Check to see if field is numeric.
# Don't try to convert empty fields.
if len(field) == 0:
# Next field
continue
if t[k] == SnapString:
continue
try:
# Convert to decimal
Decimal(field)
except Exception, e:
# If conversion failed this is likely a string type
t[k] = SnapString
else:
# If conversion succeeded this is likely a decimal type
t[k] = SnapNumber
skip_lines = self.get_property_value('skip_lines')
# If first line's types are all strings, but the rest of the sample
# has non-string types, assume first line is the header
header_detected = all(t == SnapString for t in first_line_types) and \
not all(t == SnapString for t in field_types)
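# For example (illustrative): first_line_types == [SnapString, SnapString]
# with field_types == [SnapString, SnapNumber] suggests the first line is a header.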
# Replace None by SnapString in field_types
# (if we couldn't detect the type, e.g. because the file read failed, assume the field is a string)
field_types = [t if t else SnapString for t in field_types]
# If we have a header:
# either explicitly requested via skip_lines property
# or detected by sniffing the file,
# return column names read from file
header = first_line_values if skip_lines or header_detected else None
# Return a tuple containing type mapping, and header
return (field_types, header)
def _detect_fields(self):
"""
Read a sample from input file and attempt to detect field boundaries
@return: a tuple containing two elements:
1) list of field boundaries, where each boundary is a [start, stop] pair
giving the indexes in the line at which the field begins and ends
2) the line length used for detection (the length of the shortest sampled line)
@rtype: tuple
"""
# The algorithm is similar to a binary AND where the space character corresponds
# to one, and any other character corresponds to zero.
# We AND all lines, and what we end up with is a mask that has spaces
# only in the positions where *all* the lines had a space.
# The "spaces" variable holds the result of the AND operation.
spaces = []
for line_number, line in enumerate(self._sample()):
# Strip CR and/or LF characters at the end of the line
line = line.rstrip('\r\n')
# If it's the first line initialize the "spaces" variable
# to hold a string of spaces the length of the line.
if line_number == 0:
spaces.extend([' '] * len(line))
# If lines are of varying lengths use the shorter length
spaces = spaces[:len(line)]
for i in range(0, len(spaces)):
# Perform the AND on two characters at the same position
if line[i] != ' ' or spaces[i] != ' ':
spaces[i] = 'x'
boundaries = []
# Find clusters of x characters
# E.g. "spaces" variable may contain:
# "xxx xxxx xx x"
# We interpret groups of x characters as fixed width fields.
pattern = re.compile(r'(x+)')
for match in pattern.finditer("".join(spaces)):
boundaries.append([match.start(), match.end()])
return (boundaries, len(spaces))
def _sample(self, *args):
""" Generator returning the first few lines of the file for sniffing """
try:
# Read a few lines from the first datasource
for (input, username, password) in self._get_data_sources()[:1]:
reader = SnapReader.create(input, username, password, 'utf-8', self.env)
reader.open()
try:
MAX_LINES = 100
for line_number in range(0, MAX_LINES):
line = reader.readline()
if line:
yield line
else:
break
finally:
reader.close()
except Exception, ignore:
# Ignore exception
pass
def _suggest_field_specs(self, right_justified, boundaries, field_names, length):
"""
Suggest field specs if they weren't provided.
@param right_justified: Fields are right-justified
@param boundaries: Field boundaries
List of tuples where each tuple holds the [start, stop] indexes of a field in the line
@param field_names: Field names
@param length: Line length
This method modifies the FIELD_SPECS property.
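Example (illustrative): with boundaries [[0, 9], [10, 15], [18, 20]] and a line
length of 20, right-justified fields get widths [9, 6, 5] (each field extends
back to the end of the previous one), while left-justified fields get widths
[10, 8, 2] (each field extends forward to the start of the next one).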
"""
# Given "boundaries" as described above generate a list of "widths": field widths
widths = []
if right_justified:
# All fields are right-justified
offset = 0
for (start, stop) in boundaries:
widths.append(stop - offset)
offset = stop
else:
# All fields are left-justified
offset = length
for (start, stop) in reversed(boundaries):
widths.append(offset - start)
offset = start
widths.reverse()
# Make field specs
field_specs = []
for field_name, width in zip(field_names, widths):
field_specs.append({FIELD_WIDTH : width, FIELD_NAME : field_name})
self.set_property_value(FIELD_SPECS, field_specs)
def _suggest_output_view(self, field_names, field_types):
""" Create output view from field specs """
fields = []
for field_name, field_type in zip(field_names, field_types):
fields.append([field_name, field_type, None])
self.add_record_output_view_def("Output1", fields, None)
def _suggest_resource_values(self, err_obj):
"""
This method is disabled per ticket https://www.snaplogic.org/trac/ticket/1951
Depending on which properties are provided, attempt to suggest the missing ones.
We support the following scenarios:
Inputs           Outputs          Results
-----------------------------------------------------------------------------------------
1) field_specs   output_view      If field_specs is set and filename isn't
                                  fwread creates the output view using the field specs.
                                  All datatypes are string.
2) field_specs   output_view      If both field_specs and filename are specified
   filename                       fwread creates the output view using the field specs,
                                  and sniffs the file to check if there are any numeric fields.
                                  As a result the output view created may have some string
                                  and some numeric fields.
3) filename      output_view      If only filename is provided fwread sniffs the input file
                 field_specs      and populates the specs using the structure it guessed.
                 skip_lines       The sniffer attempts to discover whether the file
                                  has a header and populates skip_lines property appropriately.
                                  If there is a header the output view is created using
                                  the field names from the file. Otherwise, the output view
                                  is created using field names such as Field001, Field002 etc.
4) filename      output_view      If skip_lines is provided we use it to generate field_names
   skip_lines    field_specs      in the output view created by the component.
"""
had_field_specs = self.get_property_value(FIELD_SPECS)
field_names = FieldNames()
if not had_field_specs:
# If field specs weren't provided, attempt to discover it by
# reading the file
(boundaries, length) = self._detect_fields()
else:
# Convert field specs to "boundaries".
# "boundaries" is a list of tuples where each field is represented by a tuple:
# [start, stop] indicating where the field begins and ends in the line.
# We need to work with "boundaries" as an intermediary structure,
# instead of field specs because it is easier to account for field-justification
# using this structure.
boundaries = []
offset = 0
for field_spec in self.get_property_value(FIELD_SPECS):
width = field_spec[FIELD_WIDTH]
name = field_spec[FIELD_NAME]
field_names.append(name)
start = offset
stop = start + width
boundaries.append([start, stop])
offset = stop
# Determine line length.
# Because we have field_specs provided, we can assume that line length is the
# ending position of the last field.
if len(boundaries):
length = boundaries[len(boundaries) - 1][1]
else:
length = 0
# Discover the datatypes
(field_types, header) = self._suggest_datatypes(boundaries)
if not had_field_specs:
# If no field specs were provided, use field names we discovered
field_names = header
if not field_names:
# If there are no field names generate them in the form Field001, Field002, etc.
field_names = ["Field" + str(i + 1).zfill(3) for i in range(0, len(boundaries))]
# If it looked like file has a header set skip_lines property
if header:
self.set_property_value('skip_lines', 1)
# Make sure fields are correctly justified.
# Field may be left-justified or right-justified.
# It is most common to left-justify strings and right-justify numbers,
# so for simplification assume that's always the case.
# If a file has a mix of string and numeric fields it is hard to determine
# field boundaries, so if there are any numeric fields assume all fields
# are right-justified
has_numeric_fields = SnapNumber in field_types
right_justify = has_numeric_fields
if not had_field_specs:
# Only generate field specs if they weren't provided as an input
self._suggest_field_specs(right_justify, boundaries, field_names, length)
# Create the output view based on field specs
# Only suggest the output view if one wasn't created already
if len(self.list_output_view_names()) == 0:
self._suggest_output_view(field_names, field_types)
def validate(self, err_obj):
"""Validate anything that the system can't validate for us."""
# Credential can be "" so we can't mark it as required but make sure it is there.
val = self.get_property_value("username")
if val is None:
err_obj.get_property_err("username").set_message("Property must be specified. Must be null string '' or 'username'")
# Validate that the filename complies with the allowed URI schemes,
# unless it's specified via a parameter
FileUtils.validate_filename_property(self.get_property_value("filename"), "filename", err_obj,
self.config_dictionary.get("schemes"), FileUtils.reader_schemes)
# See that the output view corresponds to the field specifiers.
output_views = self.list_output_view_names()
output_view_name = output_views[keys.SINGLE_VIEW]
output_view = self.get_output_view_def(output_view_name)
output_view_fields = [ d[keys.FIELD_NAME] for d in output_view[keys.VIEW_FIELDS] ]
output_view_field_types = [ d[keys.FIELD_TYPE] for d in output_view[keys.VIEW_FIELDS] ]
output_view_count = len(output_view_fields)
fw_field_names = []
# For each field, check the datatype.
# If we don't support this datatype add an error.
for i, (field_name, field_type) in enumerate(zip(output_view_fields, output_view_field_types)):
if field_type not in self.supported_datatypes:
err_obj.get_output_view_err()[output_view_name][keys.VIEW_FIELDS][i].set_message(
"Output field '%s' datatype '%s' is not supported. Must be one of: %s" %
(field_name, field_type, str(self.supported_datatypes)))
field_specs = self.get_property_value(FIELD_SPECS)
for i, spec in enumerate(field_specs):
fw_field_name = spec[FIELD_NAME]
if fw_field_name in fw_field_names:
err_obj.get_property_err(FIELD_SPECS)[i][FIELD_NAME].set_message(
"Duplicate field name '%s'." % fw_field_name)
fw_field_names.append(fw_field_name)
# The number of fields should match
if output_view_count != i + 1:
err_obj.get_property_err(FIELD_SPECS)[i][FIELD_NAME].set_message(
"Number of Field specification fields '%d' does not match number of output view fields '%d'" %
(i+ 1, output_view_count))
def _get_data_sources(self):
""" Return a list of data sources from which fixed width files are to be read """
is_input_a_list = self.get_property_value("is_input_a_list")
filename = self.get_property_value("filename")
username = self.get_property_value("username")
password = self.get_property_value("password")
# Make sure the filename is always qualified
filename = FileUtils.qualify_filename(filename)
filename = FileUtils.get_file_location(filename, self.config_dictionary)
# Validate filename URI scheme
error = FileUtils.validate_file_scheme(filename, self.config_dictionary.get("schemes"), FileUtils.reader_schemes)
if error is not None:
raise SnapComponentError(error)
data_sources = []
if is_input_a_list:
for (filename, username, password) in FileUtils.read_input_list(filename, username, password):
filename = FileUtils.get_file_location(filename, self.config_dictionary)
error = FileUtils.validate_file_scheme(filename, self.config_dictionary.get("schemes"), FileUtils.reader_schemes)
if error is not None:
raise SnapComponentError(error)
data_sources.append([filename, username, password])
else:
data_sources = [ ( filename, username, password ) ]
return data_sources
def execute(self, input_views, output_views):
"""Execute the Fixed Width reading functionality."""
try:
output_view = output_views.values()[keys.SINGLE_VIEW]
except IndexError:
raise SnapComponentError("No output view connected.")
self._skip_lines = self.get_property_value("skip_lines")
self._field_specs = self.get_property_value(FIELD_SPECS)
# Dictionary mapping field names to a boolean as follows:
# is_string_field[field_name] = True if it's a string field
# Pre-calculating this is a performance optimization,
# since looking up strings in a string array is expensive (6%)
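# For example (illustrative field names): {'id': False, 'name': True}
# for a numeric 'id' field and a string 'name' field.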
is_string_field = {}
# Store a flag telling us if there are any non-string output fields at all.
# This is a performance optimization. (1%)
all_string_fields = True
# Go through all fields and determine which ones are strings,
# and check if all of them are strings.
for (field_name, field_type) in zip(output_view.field_names, output_view.field_types):
is_string = field_type == SnapString
all_string_fields &= is_string
is_string_field[field_name] = is_string
for (input, username, password) in self._get_data_sources():
self.log(snap_log.LEVEL_DEBUG, "Input: %s" % input)
self._cur_file = None
self._rec_delimiter = os.linesep
reader = SnapReader.create(input, username, password, 'utf-8', self.env)
reader.open()
try:
# If all output types are strings branch to a faster code path
# since we don't need to check the datatype of each field
# (performance optimization).
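# Illustrative example (hypothetical specs): with field specs
#   [{'Field name': 'id', 'Field width': 6}, {'Field name': 'name', 'Field width': 10}]
# the line "000042John Smith" is sliced into id="000042" and name="John Smith".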
if all_string_fields:
while True:
line = reader.readline()
if not line:
break
if self._skip_lines > 0:
self._skip_lines -= 1
continue
out_rec = output_view.create_record()
offset = 0
for field_spec in self._field_specs :
name = field_spec[FIELD_NAME]
width = field_spec[FIELD_WIDTH]
next_offset = offset + width
out_rec[name] = line[offset:next_offset]
offset = next_offset
output_view.write_record(out_rec)
else:
# There is a mix of string and non-string output fields,
# so we need to check type for each field.
# This "else" branch is doing essentially the same
# as the "if" branch above it plus type conversion.
while True:
line = reader.readline()
if not line:
break
if self._skip_lines > 0:
self._skip_lines -= 1
continue
out_rec = output_view.create_record()
offset = 0
for field_spec in self._field_specs :
name = field_spec[FIELD_NAME]
width = field_spec[FIELD_WIDTH]
next_offset = offset + width
field = line[offset:next_offset]
offset = next_offset
if is_string_field[name]:
if type(field) == str:
field = field.decode('utf-8')
out_rec[name] = field
else:
# Convert to number
field = field.strip()
if len(field) == 0:
# Empty field is interpreted as None
out_rec[name] = None
else:
try:
# Convert to decimal
out_rec[name] = Decimal(field)
except Exception, e:
# Conversion failed, throw an appropriate exception
raise SnapComponentError("Failed to cast field %s value '%s' to type 'number' (%s)" %
(name, field, e))
output_view.write_record(out_rec)
finally:
reader.close()
output_view.completed()
def upgrade_1_0_to_1_1(self):
"""
Upgrade resource from version 1.0 to version 1.1.
In version 1.0 credentials were stored as a single user:passwd string separated by a colon.
In version 1.1 they are stored as two separate properties, and the password is obfuscated.
"""
# Old credentials were stored as user:password, split them into two variables
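# e.g. (illustrative) "scott:tiger:2" is split into username "scott"
# and password "tiger:2", since only the first colon acts as a separator.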
credentials = self.get_property_value("credential")
username = None
password = None
if credentials is not None:
# Colons are allowed in passwords, so split it at the first colon
cred_list = credentials.split(':', 1)
if len(cred_list) >= 1:
# If there is a username, it's the first element of the list
username = cred_list[0]
if len(cred_list) == 2:
# If there is a password, it's the second element of the list
password = cred_list[1]
# Delete the old credentials property
self.del_property_def("credential")
# Create the new credentials properties
self.set_property_def("username",
prop.SimpleProp("Credentials: Username", SnapString,
"Username to use if credentials are needed for the accessing data"))
self.set_property_def("password",
prop.SimpleProp("Credentials: Password", SnapString,
"Password to use if credentials are needed for the accessing data",
{'obfuscate' : 0}))
# Set the new credentials properties
self.set_property_value("username", username)
self.set_property_value("password", password)
def upgrade_1_1_to_1_2(self):
"""
Add LOV constraint to field property
Also, change the description of the filename property.
"""
# Save property value. We need to recreate the property, which resets its value.
field_specs_value = self.get_property_value(FIELD_SPECS)
# Delete the property
self.del_property_def(FIELD_SPECS)
# Recreate the property
field_name = prop.SimpleProp(FIELD_NAME, "string", "Field name",
{'lov' : [keys.CONSTRAINT_LOV_OUTPUT_FIELD] }, True)
field_width = prop.SimpleProp(FIELD_WIDTH, "number", "Field width", {"min_value": 1}, True)
field_spec = prop.DictProp(FIELD_SPEC, field_name, "The name and width of the field", 2, 2, True, True)
field_spec[FIELD_WIDTH] = field_width
field_spec[FIELD_NAME] = field_name
field_specs = prop.ListProp("Field specifications", field_spec, 1, required=True)
self.set_property_def(FIELD_SPECS, field_specs)
# Restore property value
self.set_property_value(FIELD_SPECS, field_specs_value)
# Recreate the filename property
filename = self.get_property_value("filename")
self.del_property_def("filename")
self.set_property_def("filename", prop.SimpleProp("File name", SnapString,
"The URI of the file to be read. You can enter either a local file path, or a remote location on a FTP or HTTP server. The following URI schemes are supported: file, http, https, ftp, file.\n"
"\n\nExamples of valid URIs:\n"
"/home/user/input.file\n"
"c:\dir\input.file\n"
"file:///c:/dir/input.file\n"
"ftp://ftp.server.com/dir/input.file\n"
"http://www.server.com/dir/input.file\n"
"https://www.server.com/dir/input.file"
, None, True))
self.set_property_value("filename", filename)
def upgrade_1_2_to_1_3(self):
"""
No-op upgrade; it exists only so that the component doc URI is changed during the upgrade,
which will be done by cc_info before calling this method.
"""
pass