# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008 - 2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
# $Id: FixedWidthWrite.py 10330 2009-12-24 22:13:38Z grisha $
"""
FixedWidthWrite Component
"""
import os
import codecs
from snaplogic.components import FileUtils
from snaplogic.common import snap_log
from snaplogic.common.data_types import SnapString,SnapNumber,SnapDateTime
from snaplogic.common.data_types import Record
from snaplogic.common import version_info
from snaplogic.cc.component_api import ComponentAPI
import snaplogic.cc.prop as prop
from snaplogic.common.snap_exceptions import *
from snaplogic.snapi_base import keys
from snaplogic.cc import component_api
# Names of the properties that make up the field specification list.
# FIELD_NAME and FIELD_WIDTH are the two entries of each FIELD_SPEC dictionary;
# FIELD_SPECS is the name under which the whole list property is registered.
FIELD_NAME = 'Field name'
FIELD_WIDTH = 'Field width'
FIELD_SPEC = 'Field spec'
FIELD_SPECS = 'Field specs'
class FixedWidthWrite(ComponentAPI):
    """
    This class implements the Fixed Width Write component.

    Records read from the single input view are written to a file, one
    record per line, with every field formatted to the width given in the
    'Field specs' property.

    """

    api_version = '1.0'
    component_version = '1.3'

    # Exactly one input view and no output views.
    capabilities = {
        ComponentAPI.CAPABILITY_INPUT_VIEW_LOWER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_INPUT_VIEW_UPPER_LIMIT    : 1,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_LOWER_LIMIT   : 0,
        ComponentAPI.CAPABILITY_OUTPUT_VIEW_UPPER_LIMIT   : 0,
    }

    component_description = "This component writes fixed width files."
    component_label       = "Fixed Width Writer"
    component_doc_uri     = "https://www.snaplogic.org/trac/wiki/Documentation/%s/ComponentRef/FixedWidthWrite" % \
                                                        version_info.doc_uri_version

    def validate_config_file(self):
        """
        Validate the entries of the component config file.

        Only two entries are accepted: "root_directory" (which must name an
        existing path) and "schemes" (which must contain only schemes that
        FileUtils.writer_schemes allows).

        @raise SnapComponentConfigError: If the root directory does not exist
            or if any other entry is present in the config file.

        """
        for k in self.config_dictionary:
            value = self.config_dictionary[k]
            # NOTE(review): a "root_directory" entry with an empty value falls
            # through to the "unexpected entry" branch below. This matches the
            # original behavior - confirm it is intended.
            if k == "root_directory" and value:
                if not os.path.exists(value):
                    raise SnapComponentConfigError("The path specified for root (%s) is not valid" % value)
            elif k == "schemes":
                # Make sure "schemes" contains only the schemes we support
                FileUtils.validate_schemes(value, FileUtils.writer_schemes)
            else:
                # No other config file param is supported.
                raise SnapComponentConfigError("Unexpected config file entry (%s) encountered" % k)

    def _define_field_specs(self, lov_constraint):
        """
        Define the FIELD_SPECS property: a list of (field name, field width) dictionaries.

        @param lov_constraint: The LOV constraint to attach to the field name property.

        """
        field_name = prop.SimpleProp(FIELD_NAME, SnapString, "Field name",
                                     {'lov' : [lov_constraint] }, True)
        field_width = prop.SimpleProp(FIELD_WIDTH, SnapNumber, "Field width", {"min_value": 1}, True)
        field_spec = prop.DictProp(FIELD_SPEC, field_name, "The name and width of the field", 2, 2, True, True)
        field_spec[FIELD_WIDTH] = field_width
        field_spec[FIELD_NAME] = field_name
        field_specs = prop.ListProp("Field specifications", field_spec, 1, required=True)
        self.set_property_def(FIELD_SPECS, field_specs)

    def _redefine_field_specs(self, lov_constraint):
        """
        Recreate the FIELD_SPECS property definition while keeping its current value.

        @param lov_constraint: The LOV constraint to attach to the field name property.

        """
        # Save property value. We need to recreate the property, which resets its value.
        field_specs_value = self.get_property_value(FIELD_SPECS)

        # Delete and recreate the property
        self.del_property_def(FIELD_SPECS)
        self._define_field_specs(lov_constraint)

        # Restore property value
        self.set_property_value(FIELD_SPECS, field_specs_value)

    def create_resource_template(self):
        """
        Create FixedWidthWrite resource template.

        Defines the "filename", "header" (defaulting to False) and field
        specification properties.

        """
        self.set_property_def("filename",
                              prop.SimpleProp("File name",
                                              SnapString,
                                              "The name of the file to write",
                                              None,
                                              True))

        self.set_property_def("header",
                              prop.SimpleProp("Header",
                                              "boolean",
                                              "Output the field names as the first row",
                                              None,
                                              True))
        self.set_property_value('header', False)

        self._define_field_specs(keys.CONSTRAINT_LOV_INPUT_FIELD)

    def validate(self, err_obj):
        """
        Validate anything that the system can't validate for us.

        Reports duplicate field names in the field specifications, a mismatch
        between the number of field specifications and the number of input
        view fields, and a filename that uses a disallowed URI scheme.

        @param err_obj: Error object on which validation messages are set.

        """
        # See that the input view corresponds to the field specifiers.
        input_views = self.list_input_view_names()
        input_view_name = input_views[keys.SINGLE_VIEW]
        input_view = self.get_input_view_def(input_view_name)
        input_view_count = len(input_view[keys.VIEW_FIELDS])

        field_specs = self.get_property_value(FIELD_SPECS) or []
        seen_field_names = set()
        for i, spec in enumerate(field_specs):
            fw_field_name = spec[FIELD_NAME]
            if fw_field_name in seen_field_names:
                err_obj.get_property_err(FIELD_SPECS)[i][FIELD_NAME].set_message(
                    "Duplicate field name '%s'." % fw_field_name)
            seen_field_names.add(fw_field_name)

        # The number of fields should match. The message is attached to the
        # last field specification, so it can only be set when there is one.
        # The property is defined with a minimum size of 1, so an empty list
        # is presumably reported by the property constraints themselves.
        spec_count = len(field_specs)
        if spec_count and input_view_count != spec_count:
            err_obj.get_property_err(FIELD_SPECS)[spec_count - 1][FIELD_NAME].set_message(
                "Number of Field specification fields '%d' does not match number of input view fields '%d'" %
                (spec_count, input_view_count))

        # Validate that the filename complies with the allowed URI schemes,
        # unless it's specified via a parameter
        FileUtils.validate_filename_property(self.get_property_value("filename"), "filename", err_obj,
                                             self.config_dictionary.get("schemes"), FileUtils.writer_schemes)

    def execute(self, input_views, output_views):
        """
        Execute the fixed width writing functionality.

        Reads records from the input view until it returns None and writes
        each one as a line of fixed width fields to the UTF-8 encoded target
        file. A header line with the field names is written first when the
        "header" property is set.

        @param input_views: Dictionary of input views; the single view is used.
        @param output_views: Unused, this component has no output views.

        @raise SnapComponentError: If no input view is connected or the file
            name uses a scheme that is not allowed.

        """
        try:
            input_view = list(input_views.values())[keys.SINGLE_VIEW]
        except IndexError:
            raise SnapComponentError("No input view connected.")

        self._filename = self.get_property_value("filename")
        self._header = self.get_property_value("header")
        self._field_specs = self.get_property_value(FIELD_SPECS)

        # Build one "%<w>.<w>s" directive per field: pad to the width and
        # truncate anything longer. The width is coerced to int so that a
        # numeric value whose string form is not a plain integer (e.g. "10.0")
        # cannot produce a malformed directive.
        widths = [int(field_spec[FIELD_WIDTH]) for field_spec in self._field_specs]
        self._field_format_str = ''.join(["%%%d.%ds" % (width, width) for width in widths])
        self._field_format_str += os.linesep

        self._filename = FileUtils.get_file_location(self._filename, self.config_dictionary)

        # Validate filename URI scheme
        error = FileUtils.validate_file_scheme(self._filename, self.config_dictionary.get("schemes"),
                                               FileUtils.writer_schemes)
        if error is not None:
            raise SnapComponentError(error)

        # Strip the URI scheme to obtain a local path. On Windows the third
        # slash is dropped as well.
        if self._filename.startswith('file://'):
            if os.name == 'nt':
                self._filename = self._filename[len('file:///'):]
            else:
                self._filename = self._filename[len('file://'):]

        self._file = codecs.open(self._filename, "w", 'utf-8')
        try:
            if self._header:
                header = [field_spec[FIELD_NAME] for field_spec in self._field_specs]
                self._file.write(self._field_format_str % tuple(header))

            while True:
                record = input_view.read_record()
                if record is None:
                    break
                # NOTE(review): values are taken in the record's own field
                # order, so field widths are applied by position and not by
                # the names given in the field specifications.
                line = []
                for field_name in record.field_names:
                    value = record[field_name]
                    if value is None:
                        # Missing values are written as blank (padded) fields.
                        value = ""
                    line.append(value)
                self._file.write(self._field_format_str % tuple(line))
        finally:
            self._file.close()

    def upgrade_1_0_to_1_1(self):
        """
        Add LOV constraint to field property

        """
        # This step keeps the output-field constraint it originally applied;
        # upgrade_1_1_to_1_2 replaces it with the correct one.
        self._redefine_field_specs(keys.CONSTRAINT_LOV_OUTPUT_FIELD)

    def upgrade_1_1_to_1_2(self):
        """
        Add correct LOV constraint to field property.
        version 1.1 added incorrect constraint.

        """
        self._redefine_field_specs(keys.CONSTRAINT_LOV_INPUT_FIELD)

    def upgrade_1_2_to_1_3(self):
        """
        No-op upgrade only to change component doc URI during the upgrade
        which will be done by cc_info before calling this method.

        """
        pass
|