# $SnapHashLicense:
#
# SnapLogic - Open source data services
#
# Copyright (C) 2008, SnapLogic, Inc. All rights reserved.
#
# See http://www.snaplogic.org for more information about
# the SnapLogic project.
#
# This program is free software, distributed under the terms of
# the GNU General Public License Version 2. See the LEGAL file
# at the top of the source tree.
#
# "SnapLogic" is a trademark of SnapLogic, Inc.
#
#
# $
#$Id: streaming_json_rp.py 3071 2008-05-22 21:51:31Z jbrendel $
from decimal import Decimal
from datetime import datetime
from UserList import UserList
from StringIO import StringIO
from shlex import shlex
from tokenize import generate_tokens,TokenError
from simplejson.encoder import JSONEncoder
from simplejson.decoder import JSONDecoder
from snaplogic.common import snap_log
import simplejson
from simplejson.decoder import JSONScanner
from snaplogic.rp import _RPReader,_RPWriter
from snaplogic.common.snap_exceptions import *
# Decimal zero sentinel; presumably marks end of stream for callers of this module -- it is not
# referenced by the code in this file.
EOS_MARKER = Decimal("0")
# MIME content type of this JSON record protocol.
CONTENT_TYPE = "application/json"
class _Encoder(JSONEncoder):
    """
    Encoder for handling Snap types: L{datetime.datetime}, L{decimal.Decimal}, and L{UserList} --
    see L{DataTypes._PassthroughList}.

    Object keys are always emitted in sorted order (C{sort_keys} is forced to True).
    """

    # DRY -- similar code is in Asn1RP.
    DATETIME_ATTRIBUTES = ['year', 'month', 'day', 'hour', 'minute', 'second', 'microsecond']
    """Attributes of datetime.datetime type that we will serialize."""

    def __init__(self, *args, **kwargs):
        # Sorted keys make the output deterministic (e.g. '__snaptype__' before '__values__').
        kwargs['sort_keys'] = True
        super(_Encoder, self).__init__(*args, **kwargs)

    def default(self, obj):
        """
        Overriding the template method. See default() method of L{JSONEncoder}.

        Conversions performed:
          - datetime: a dict with '__snaptype__' set to 'datetime' and '__values__' set to
            [year, month, day, hour, minute, second, microsecond]. tzinfo is not serialized.
          - UserList: its underlying C{data} list.
          - Decimal: its string form. NOTE(review): this decodes back as a plain string, not a
            Decimal, although the reader's object hook also recognises a '__snaptype__' of
            'Decimal' -- confirm which wire format peers expect before changing it.

        @param obj: An object the base encoder cannot serialize natively.
        @return: A JSON-serializable replacement for C{obj}.
        @raise TypeError: For any other type (raised by L{JSONEncoder.default}); previously an
            unsupported type caused an UnboundLocalError here.
        """
        if isinstance(obj, datetime):
            return {
                '__snaptype__': 'datetime',
                '__values__': [getattr(obj, attr) for attr in self.DATETIME_ATTRIBUTES],
            }
        if isinstance(obj, UserList):
            return obj.data
        if isinstance(obj, Decimal):
            return str(obj)
        # Unsupported type: let the base class raise its standard TypeError.
        return super(_Encoder, self).default(obj)
class Writer(_RPWriter):
    """
    Serializes Python objects as JSON. Every record is an element of one JSON array: the array
    is opened by initialize(), each write() appends one element, and end() closes it.
    """

    def __init__(self, stream, version=None, human_request=False, options=False):
        # version, human_request and options are accepted for signature compatibility only;
        # nothing in this class reads them.
        super(Writer, self).__init__(stream)

    def initialize(self, header=""):
        """
        Prepare the underlying stream for sending the data by opening the JSON array.

        @return: The text written to the stream.
        """
        opening = '['
        self.stream.write(opening)
        return opening

    def write(self, raw_record, options=None):
        """
        Write a Snap Record to the underlying stream as the next element of the array.

        The first call marks the writer by setting C{self.not_first}; subsequent calls prefix
        the element with a ', ' separator.

        @return: The text written to the stream.
        """
        if hasattr(self, 'not_first'):
            separator = ', '
        else:
            separator = ''
            self.not_first = True
        encoded = simplejson.dumps(obj=raw_record, cls=_Encoder)
        chunk = separator + encoded
        self.stream.write(chunk)
        return chunk

    def end(self, footer=""):
        """
        Close the JSON array on the underlying stream.

        @return: The text written to the stream.
        """
        closing = ']'
        self.stream.write(closing)
        return closing
class Reader(_RPReader):
    """
    Reads a JSON stream, returning Python objects.

    The stream must carry a single JSON array -- the document produced by L{Writer} -- and every
    element of that array is one record. Received text is buffered, and each record is decoded as
    soon as all of its text has arrived, so records can be consumed before the array is complete.
    Text that has been decoded is discarded, so each byte is parsed only once.
    """

    # Characters that JSON allows as insignificant whitespace between tokens.
    _JSON_WHITESPACE = ' \t\n\r'

    # First characters of self-delimiting JSON values (array, object, string). Any other value
    # (a number or true/false/null) is only known to be complete once the ',' or ']' following
    # it has arrived -- "1." may still grow into "1.5".
    _SELF_DELIMITING_STARTS = '[{"'

    def __init__(self, stream, version=None):
        super(Reader, self).__init__(stream)
        # Generator shared by successive next() calls; re-creating it on every call would
        # silently drop the other records decoded in the same read_nb() batch.
        self._iter = None
        # Received text that has not yet been decoded into records.
        self._buf = ''
        # Set once the underlying stream has reported end of data.
        self._closed = False
        # Set once the '[' opening the top-level array has been consumed.
        self._first_bracket_read = False
        # Set once the ']' closing the top-level array has been consumed.
        self._last_bracket_read = False
        # Set after a record is decoded, until the ',' that must follow it has been consumed.
        self._expect_separator = False
        self._decoder = JSONDecoder(object_hook=self.object_hook)

    def object_hook(self, dct):
        """
        Object hook for simplejson's decoder, to decode complex types
        (Decimal, datetime). See L{simplejson.loads}.

        A dict whose '__snaptype__' is 'datetime' or 'Decimal' is replaced by an instance of that
        class, constructed from the positional arguments in its '__values__' list. Any other dict
        -- including one with an unrecognised '__snaptype__' -- is returned unchanged.

        @param dct: A JSON object just decoded by simplejson.
        @type dct: dict
        @return: The reconstructed object, or C{dct} itself.
        """
        snaptype = dct.get('__snaptype__')
        if snaptype == 'datetime':
            cls = datetime
        elif snaptype == 'Decimal':
            cls = Decimal
        else:
            return dct
        return cls(*dct['__values__'])

    @classmethod
    def supports_non_blocking_read(cls):
        """
        Returns True if this Reader class supports non-blocking read (see L{read_nb})
        @return: True if this Reader supports non-blocking read, False otherwise
        @rtype: bool
        """
        return True

    def next(self):
        """
        Return the next record, reading from the stream as needed.

        @raise StopIteration: When the stream holds no further records.
        """
        if self._iter is None:
            self._iter = self.__iter__()
        return self._iter.next()

    def read_nb(self, empty_is_none=False):
        """
        See L{_RPReader.read_nb}. It is assumed that the underlying L{stream} returns None when end of
        stream has been reached, and '' if nothing is available on the stream.

        @param empty_is_none: If True, an empty read ('') is also treated as end of stream.
        @type empty_is_none: bool
        @return: The records completed by the data received so far (possibly an empty list).
        @rtype: list
        @raise ValueError: If the text is not a JSON array of records, or if the stream ends
            before the array has been closed.
        """
        data = self.stream.read()
        if data is None or (empty_is_none and not data):
            self._closed = True
        elif data:
            self._buf += data
        return self._decode_buffered_records()

    def _decode_buffered_records(self):
        """Decode every complete record at the front of the buffer, drop the consumed text and return the records."""
        records = []
        buf = self._buf
        pos = self._skip_whitespace(buf, 0)
        if not self._first_bracket_read and pos < len(buf):
            if buf[pos] != '[':
                raise ValueError("Expected '[' at start of JSON stream, found %r" % buf[pos])
            self._first_bracket_read = True
            pos += 1
        while self._first_bracket_read and not self._last_bracket_read:
            pos = self._skip_whitespace(buf, pos)
            if pos >= len(buf):
                break
            char = buf[pos]
            if char == ']':
                self._last_bracket_read = True
                pos += 1
            elif self._expect_separator:
                if char != ',':
                    raise ValueError("Expected ',' or ']' between JSON records, found %r" % char)
                self._expect_separator = False
                pos += 1
            else:
                try:
                    record, end = self._decoder.raw_decode(buf, idx=pos)
                except ValueError:
                    # Most likely the record has not been received completely yet; a truly
                    # malformed record is reported once the stream is closed (see below).
                    break
                if char not in self._SELF_DELIMITING_STARTS and not self._closed:
                    follow = self._skip_whitespace(buf, end)
                    if follow >= len(buf) or buf[follow] not in ',]':
                        # The number/literal may still be growing; wait for more text.
                        break
                records.append(record)
                self._expect_separator = True
                pos = end
        # Keep only the text that has not been consumed yet.
        self._buf = buf[pos:]
        if self._closed and self._first_bracket_read and not self._last_bracket_read:
            raise ValueError('JSON record stream ended incomplete or malformed near %r'
                             % self._buf[:50])
        return records

    @classmethod
    def _skip_whitespace(cls, text, pos):
        """Return the index of the first non-whitespace character of C{text} at or after C{pos}."""
        while pos < len(text) and text[pos] in cls._JSON_WHITESPACE:
            pos += 1
        return pos

    def __iter__(self):
        """
        Yield records until the top-level array has been closed or the stream has ended.

        Uses C{read_nb(True)}, so an empty read is taken as end of stream; a non-blocking stream
        that momentarily has no data would therefore end the iteration.
        """
        while True:
            for record in self.read_nb(True):
                yield record
            if self._closed or self._last_bracket_read:
                # A plain return ends the generator; raising StopIteration here becomes a
                # RuntimeError under PEP 479.
                return