# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008-2009, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
from snaplogic.common.snap_exceptions import SnapEOFError
#$Id: snap_asn1_rp.py 6314 2009-02-11 01:07:59Z grisha $
"""
References:
1. ASN.1: Communication between Heterogeneous Systems, by Olivier
Dubuisson (http://www.oss.com/asn1/dubuisson.html).
2. ASN.1 Complete, by John Larmouth (http://www.oss.com/asn1/larmouth.html)
3. A Layman's Guide to a Subset of ASN.1, BER, and DER, by
Burton S. Kaliski Jr. (http://luca.ntop.org/Teaching/Appunti/asn1.html)
4. ASN.1 tools for Python, by Ilya Etingof (http://pyasn1.sourceforge.net/).
5. pisces.asn1, a module of Pisces, a Python implementation of the SPKI Certificate standard, by
Jeremy Hylton (http://www.cnri.reston.va.us/software/pisces/index.html)
"""
# -*- coding: utf-8 -*-
from decimal import Decimal
from datetime import datetime
import time
from StringIO import StringIO
from snaplogic.common.snap_exceptions import SnapException,SnapObjTypeError,SnapValueError,SnapEOFError
from snaplogic.rp import _RPReader,_RPWriter,SNAP_ASN1_CONTENT_TYPE
CONTENT_TYPE = SNAP_ASN1_CONTENT_TYPE
UNIVERSAL_INTEGER = 0x02
CHR_UNIVERSAL_INTEGER = chr(0x02)
"""Tag for NULL."""
UNIVERSAL_NULL = 0x05
CHR_UNIVERSAL_NULL = chr(UNIVERSAL_NULL)
"""Tag for UTF8String."""
UNIVERSAL_UTF8String = 0x0c
CHR_UNIVERSAL_UTF8String = chr(UNIVERSAL_UTF8String)
"""Tag for real."""
REAL=0x09
CHR_REAL = chr(REAL)
"""
NR2 (Numerical Representation 2. See
Dubuisson, p.401.
"""
NR2_OCTET = 0x02
CHR_NR2_OCTET = chr(NR2_OCTET)
"""
Tag for IA5String. We use it to encode Decimal as REAL (with NR2),
see Dubuisson, p.400.
"""
UNIVERSAL_IA5String = 0x16
UNIVERSAL_OCTET_STRING = 0x04
CHR_UNIVERSAL_OCTET_STRING = chr(UNIVERSAL_OCTET_STRING)
UNIVERSAL_SEQUENCE = 0x10
CLASS_APPLICATION = 0x40
TYPE_CONSTRUCTED = 0x20
"""
Tag for an application-specific (SnapLogic) datetime type.
We encode it in a primitive way -- as a string -- for now.
"""
SNAP_DATETIME = CLASS_APPLICATION | TYPE_CONSTRUCTED | 0x01
CHR_SNAP_DATETIME = chr(SNAP_DATETIME)
"""
Length of string representing datetime -- it is a constant
(we know how many characters there are in a year, month, day,
etc.)
"""
# year + month + day + hour + minutes + seconds + microseconds
# 4 + 2 + 2 + 2 + 2 + 2 + 6
DATETIME_LENGTH = 20
"""
Encoded prefix of our datetime type -- tag and length.
"""
DATETIME_PREFIX = CHR_SNAP_DATETIME + chr(DATETIME_LENGTH)
"""BER-encoded NULL"""
BER_NULL = chr(UNIVERSAL_NULL) + chr(0x00)
"""
We will use a NULL as an end-of-stream marker. It
will not appear otherwise outside a record.
"""
EOS_MARKER = BER_NULL
"""
Tag for a Snap 'raw record'.
"""
RAW_RECORD_TAG = chr(UNIVERSAL_SEQUENCE | TYPE_CONSTRUCTED)
def _encode_length(length):
"""
Encode a length in Asn.1 format.
@param length: a length of an encoded segment
@type length: int
@return: encoded length
@rtype: str
"""
enc_len = length
if length < 0x80:
enc_len = chr(length)
elif length < 0xFF:
enc_len = '\x81%c' % length
elif length < 0xFFFF:
enc_len = '\x82%c%c' % ((length >> 8) & 0xFF, length & 0xFF)
elif length < 0xFFFFFF:
enc_len = '\x83%c%c%c' % ((length >> 16) & 0xFF,(length >> 8) & 0xFF,length & 0xFF)
else:
# just give up for now
raise SnapException('Record too long: %s' % length)
return enc_len
class Writer(_RPWriter):
"""
Writes out Python objects as Asn.1.
"""
def initialize(self, header = ""):
"""
Initializes the stream.
@param header: Not used. Exception is raised if it is passed in and is
not None or empty.
"""
if header:
raise SnapValueError('Header not allowed for Asn.1 stream')
return ''
def write(self, raw_record, options=None):
"""
Write a Snap Record to the underlying stream.
"""
encoded = ''
try:
for field in raw_record:
if field is None:
encoded += BER_NULL
continue
t = type(field)
if t == unicode:
# See http://www.pyzine.com/Issue008/Section_Articles/article_Encodings.html#decoding-and-encoding
field = field.encode('utf-8')
length = len(field)
enc_len = _encode_length(length)
encoded += CHR_UNIVERSAL_UTF8String + enc_len + field
elif t == Decimal or t == int:
# Use IA5String and NR2.
# See Dubuisson, pp.400-401 and 142-143
field = CHR_NR2_OCTET + str(field)
length = len(field)
enc_len = _encode_length(length)
encoded += CHR_REAL + enc_len + field
elif t == datetime:
# This is because we ignore the tzinfo...
datetime_str = '%4s%2s%2s%2s%2s%2s%6s' % (
field.year,
field.month,
field.day,
field.hour,
field.minute,
field.second,
field.microsecond)
encoded += DATETIME_PREFIX + datetime_str
elif t == str:
length = len(field)
enc_len = _encode_length(length)
encoded += CHR_UNIVERSAL_UTF8String + enc_len + field
else:
raise SnapObjTypeError('Cannot encode type %s' % t)
except TypeError, exc:
raise SnapObjTypeError('Cannot write an object of type %s' % type(raw_record), exc)
encoded = RAW_RECORD_TAG + _encode_length(len(encoded)) + encoded
self.stream.write(encoded)
return encoded
def end(self, footer=""):
"""
Writes a marker to the stream indicating that no more records will be
written.
@param footer: Not applicable. Exception is raised if it is
not empty or None
"""
if footer:
raise SnapValueError('Footer not allowed for Asn.1 stream')
self.stream.write(EOS_MARKER)
return EOS_MARKER
class ConsumingStringIO(StringIO):
def read(self, n=-1):
retval = StringIO.read(self, n)
remaining = StringIO.read(self)
self.seek(0)
self.write(remaining)
self.truncate()
self.seek(0)
return retval
def peek(self, n=-1):
retval = StringIO.read(self, n)
self.seek(0)
return retval
class Reader(_RPReader):
"""
Reads an Asn.1 stream, returning Python objects.
"""
@classmethod
def supports_non_blocking_read(cls):
"""
Returns True if this Reader class supports non-blocking read (see L{read_nb})
@return: True if this Reader supports non-blocking read, False otherwise
@rtype: bool
"""
return True
def __init__(self, stream):
"""
Initiaze the reader.
@param stream: stream to read data from
@type stream: stream
"""
super(Reader, self).__init__(stream)
self.blocking = True
self.nb_stream = None
self.eos = False
self._iter = None
def read_nb(self):
"""
See L{_RPReader.read_nb}
"""
self.blocking = False
if self.nb_stream is None:
self.nb_stream = self.stream
self.stream = ConsumingStringIO()
buf = self.nb_stream.read()
if buf is None:
if self.eos:
return None
else:
raise SnapEOFError('Premature end of stream')
if buf == '':
if self.eos:
return None
else:
return []
# print "Read %s bytes [%s] from original stream" % (len(buf), buf)
if self.eos:
raise SnapEOFError('No more values expected')
self.stream.seek(0, 2)
self.stream.write(buf)
self.stream.seek(0)
retval = []
if self._iter is None:
self._iter = self.__iter__()
for obj in self._iter:
if obj is None:
break
## print "<<<%s>>>" % obj
# print obj
# s_val = self.stream.getvalue()
# s_val = ' '.join([hex(ord(c)) for c in s_val])
# print "Stream is %s bytes [%s]" % (len(self.stream.getvalue()), s_val)
# if len(self.stream.getvalue()) == 1:
# print obj
retval.append(obj)
return retval
def next(self):
if self._iter is None:
self._iter = self.__iter__()
return self._iter.next()
def __iter__(self):
"""
Reads the next 'raw record' -- a list of values.
@return: raw record
@rtype: list
@raise StopIteration: When there is nothing more to read
@raise SnapEOFError: When a stream unexpectedly ends (truncated stream)
@raise SnapValueError: When an unexpected value is read in the stream
"""
substrate = ""
while True:
if self.blocking:
tag = self.stream.read(1)
else:
tag = self.stream.peek(1)
if tag == '':
yield None
continue
tag = self.stream.read(1)
# print "Tag: %s" % hex(ord(tag))
if tag == CHR_UNIVERSAL_NULL:
if self.blocking:
suspected_eos = tag + self.stream.read(1)
else:
while True:
nextag = self.stream.peek(1)
if nextag == '':
yield None
continue
nextag = self.stream.read(1)
suspected_eos = tag + nextag
break
if suspected_eos == EOS_MARKER:
self.eos = True
raise StopIteration
else:
if self.blocking:
raise SnapValueError('Unexpected value %s, %s expected' % (suspected_eos, EOS_MARKER))
else:
yield None
if tag != RAW_RECORD_TAG:
if len(tag) == 0:
if self.blocking:
raise SnapEOFError('Premature end of stream')
else:
yield None
raise SnapValueError('Unexpected tag %s' % tag)
if self.blocking:
c = self.stream.read(1)
if not c:
raise SnapEOFError('Unexpected end of stream when trying to read length')
length = ord(c)
if length >= 0x80:
length_octet_cnt = length - 0x80
enc_len = self.stream.read(length_octet_cnt)
if len(enc_len) < length_octet_cnt:
raise SnapEOFError('Unexpected end of stream when trying to read length')
length = 0
for i in range(0, length_octet_cnt):
shift = length_octet_cnt - i - 1
length |= ord(enc_len[i]) << (shift * 8)
else:
while True:
c = self.stream.peek(1)
if c == '':
yield None
continue
c = self.stream.read(1)
length = ord(c)
if length < 0x80:
break
length_octet_cnt = length - 0x80
while True:
enc_len = self.stream.peek(length_octet_cnt)
if len(enc_len) < length_octet_cnt:
yield None
continue
enc_len = self.stream.read(length_octet_cnt)
length = 0
for i in range(0, length_octet_cnt):
shift = length_octet_cnt - i - 1
length |= ord(enc_len[i]) << (shift * 8)
break
break
if self.blocking:
substrate += self.stream.read(length)
if len(substrate) < length:
raise SnapEOFError('Premature end of stream: expected %s bytes, read %s' % (length, len(substrate)))
else:
while True:
peek = self.stream.peek(length)
if len(peek) < length:
yield None
continue
substrate += self.stream.read(length)
break
raw_rec = []
while substrate:
try:
tag = substrate[0]
except IndexError:
if self.blocking:
raise SnapEOFError('Unexpected end of stream when reading tag')
else:
yield None
substrate = substrate[1:]
# Is this a null?
if tag == CHR_UNIVERSAL_NULL:
if not substrate or tag + substrate[0] != BER_NULL:
if not substrate:
raise SnapValueError('Unexpected value %s, %s expected' % (tag, BER_NULL))
else:
raise SnapValueError('Unexpected value %s, %s expected' % (tag + substrate[0], BER_NULL))
raw_rec.append(None)
substrate = substrate[1:]
continue
# This has length...
length = ord(substrate[0])
substrate = substrate[1:]
if length >= 0x80:
length_octet_cnt = length - 0x80
enc_len = substrate[0:length_octet_cnt]
substrate = substrate[length_octet_cnt:]
length = 0
for i in range(0, length_octet_cnt):
shift = length_octet_cnt - i - 1
len_digit = ord(enc_len[i])
length |= len_digit << (shift * 8)
try:
if tag == CHR_UNIVERSAL_UTF8String:
s = substrate[:length]
substrate = substrate[length:]
s = s.decode('utf-8')
raw_rec.append(s)
elif tag == CHR_REAL:
nr = substrate[0]
if nr != CHR_NR2_OCTET:
raise SnapValueError('Unexpected NR %s, expected NR2 (%s)' % (nr, CHR_NR2_OCTET))
rep = substrate[1:length]
raw_rec.append(Decimal(rep))
substrate = substrate[length:]
elif tag == CHR_SNAP_DATETIME:
if length < DATETIME_LENGTH:
raise SnapValueError('Expected length %s' % DATETIME_LENGTH)
dt = substrate[:length]
# int() seems to be quite cheap...
dt = datetime(int(dt[:4]), int(dt[4:6]), int(dt[6:8]), int(dt[8:10]), int(dt[10:12]), int(dt[12:14]), int(dt[14:20]))
raw_rec.append(dt)
substrate = substrate[length:]
else:
raise SnapValueError('Unexpected tag: %s' % tag)
except IndexError:
if self.blocking:
raise SnapEOFError('Unexpected end of stream, expected %s bytes, got %s' % (length, len(substrate)))
else:
yield None
except UnicodeDecodeError, e:
print "@@@@@@@ s = '%s' %d" % (s, len(s))
for (i, c) in enumerate(s):
print "@@@@@@@ %d. %d" % (i, ord(c))
print "@@@@@@@ substrate = '%s' %d" % (substrate, len(substrate))
print "@@@@@@@ tag = '%s' %d" % (tag, len(tag))
print "@@@@@@@ length = ", length
raise e
yield raw_rec
if __name__ == "__main__":
x = ConsumingStringIO()
x.write('abcdef')
x.seek(0)
print x.read(2)
x.seek(0)
print x.read(2)
x.seek(0)
print x.read()
|