# test/test_encoding.py
"""Test that stuff in myghty.requestbuffer
"""
import unittest, os, sys, codecs, StringIO, cStringIO, cPickle, warnings
import testbase
from myghty import importer,util
from myghty.requestbuffer import \
UnicodeRequestBuffer, StrRequestBuffer, MismatchedPop
def _is_ascii_superset(encoding):
ASCII_CHARS = ''.join(map(chr, range(128)))
try:
return ASCII_CHARS.decode(encoding) == ASCII_CHARS.decode('ascii')
except UnicodeDecodeError:
return False
if not _is_ascii_superset(sys.getdefaultencoding()):
warnings.warn("System default encoding is not a subset of ASCII. "
"Many of these tests will fail.",
UserWarning)
REPLACEMENT_CHARACTER = u'\ufffd'
class TestBuffer:
"""A StringIO which supports only a limited set of file operations.
"""
def __init__(self):
self.buf = StringIO.StringIO()
def write(self, text):
self.buf.write(text)
def __str__(self):
return self.buf.getvalue()
def test_filter(text):
return "f(%s)" % text
class _RequestBufferTests(object):
def setUp(self):
self.outputBuffer = TestBuffer()
output = property(lambda self: str(self.outputBuffer))
def testBuffering(self):
buf = self.getRequestBuffer()
buf.write("a")
self.failUnlessEqual(self.output, "a")
buf.push_buffer()
buf.write("b")
self.failUnlessEqual(self.output, "a")
buf.push_buffer()
buf.write("c")
self.failUnlessEqual(self.output, "a")
buf.pop_buffer()
self.failUnlessEqual(self.output, "a")
buf.pop_buffer()
self.failUnlessEqual(self.output, "abc")
def testBufferNoFlush(self):
buf = self.getRequestBuffer()
buf.write("a")
self.failUnlessEqual(self.output, "a")
buf.push_buffer()
buf.write("b")
buf.pop_buffer(discard=True)
self.failUnlessEqual(self.output, "a")
def testFilter(self):
buf = self.getRequestBuffer()
buf.push_filter(test_filter)
buf.write("a")
buf.write("b")
self.failUnlessEqual(self.output, "f(a)f(b)")
buf.push_buffer()
buf.write("c")
buf.write("d")
buf.pop_buffer()
self.failUnlessEqual(self.output, "f(a)f(b)f(cd)")
buf.pop_filter()
buf.write("e")
self.failUnlessEqual(self.output, "f(a)f(b)f(cd)e")
def testCaptureBuffer(self):
buf = self.getRequestBuffer()
buf.push_buffer()
buf.write('a')
self.failUnlessEqual(self.output, '')
cbuf = util.StringIO()
buf.push_capture_buffer(cbuf)
buf.write('b')
self.failUnlessEqual(cbuf.getvalue(), "b")
buf.flush()
self.failUnlessEqual(self.output, '')
buf.push_capture_buffer()
buf.write("c")
cbuf2 = buf.pop_capture_buffer()
self.failUnlessEqual(cbuf2.getvalue(), "c")
self.failUnlessEqual(cbuf, buf.pop_capture_buffer())
self.failUnlessEqual(cbuf.getvalue(), "b")
self.failUnlessEqual(self.output, '')
buf.pop_buffer()
self.failUnlessEqual(self.output, 'a')
def testFlush(self):
buf = self.getRequestBuffer()
buf.push_buffer()
buf.write("howdy")
self.failUnlessEqual(self.output, "")
buf.flush()
self.failUnlessEqual(self.output, "howdy")
def testMismatchedPop(self):
buf = self.getRequestBuffer()
self.failUnlessRaises(MismatchedPop, buf.pop_buffer)
buf.push_buffer()
self.failUnlessRaises(MismatchedPop, buf.pop_filter)
buf.pop_buffer()
def testSaveAndRestoreState(self):
buf = self.getRequestBuffer()
buf.push_filter(test_filter)
state = buf.get_state()
buf.push_buffer()
buf.write('b')
self.failUnlessEqual(self.output, '')
buf.pop_to_state(state)
self.failUnlessEqual(self.output, 'f(b)')
buf.write('a')
self.failUnlessEqual(self.output, 'f(b)f(a)')
class StrRequestBufferTests(_RequestBufferTests, unittest.TestCase):
def getRequestBuffer(self):
return StrRequestBuffer(self.outputBuffer)
def testWriteUnicodeFails(self):
buf = self.getRequestBuffer()
self.failUnlessRaises(UnicodeEncodeError,
buf.write, REPLACEMENT_CHARACTER)
def testBinarySafe(self):
buf = self.getRequestBuffer()
all_chars = ''.join(map(chr, range(256)))
buf.write(all_chars)
self.failUnlessEqual(self.output, all_chars)
def testWriteUnicodeOkay(self):
buf = self.getRequestBuffer()
buf.write(u'abc')
self.failUnlessEqual(self.output, 'abc')
def testFilterUnicodeOutput(self):
def filt(text):
if text == 'a':
return REPLACEMENT_CHARACTER
else:
return unicode(text) * 2
buf = self.getRequestBuffer()
buf.push_filter(filt)
self.failUnlessRaises(UnicodeEncodeError, buf.write, "a")
buf.write('b')
self.failUnlessEqual(self.output, "bb")
def testChangeOutputEncodingFails(self):
buf = self.getRequestBuffer()
self.failUnless(buf.encoding is None)
self.failUnlessRaises(AttributeError, setattr, buf, 'encoding', 'ascii')
def testChangeEncodingErrorsFails(self):
buf = self.getRequestBuffer()
self.failUnless(buf.errors is None)
self.failUnlessRaises(AttributeError, setattr, buf, 'errors', 'strict')
class UnicodeRequestBufferTests(_RequestBufferTests, unittest.TestCase):
def setUp(self):
self.outputBuffer = TestBuffer()
output = property(lambda self: str(self.outputBuffer))
def getRequestBuffer(self, encoding='ascii', errors='strict'):
return UnicodeRequestBuffer(self.outputBuffer, encoding, errors)
def testBufferMixedEncoding(self):
buf = self.getRequestBuffer('latin1')
buf.push_buffer()
buf.write("a")
buf.write(u'b')
buf.write(unicode("\xc2\xa0", 'utf8'))
self.failUnlessEqual(self.output, '')
buf.pop_buffer()
self.failUnlessEqual(self.output, 'ab\xa0')
def testBufferMixedEncodingNoFlush(self):
buf = self.getRequestBuffer('latin1')
buf.push_buffer()
buf.write("a")
buf.write(u'b')
buf.write(unicode("\xc2\xa0", 'utf_8'))
self.failUnlessEqual(self.output, '')
buf.pop_buffer(discard=True)
self.failUnlessEqual(self.output, '')
def testFilterUnicodeOutput(self):
def filt(text):
return u'\u20ac'
buf = self.getRequestBuffer('iso-8859-15')
buf.push_filter(filt)
buf.write("a")
self.failUnlessEqual(self.output, "\xa4")
def testRecoding(self):
buf = self.getRequestBuffer('utf_8')
buf.write(unicode('\xa0', 'latin1'))
self.failUnlessEqual(self.output, '\xc2\xa0')
def testRecodingError(self):
buf = self.getRequestBuffer('utf_8')
buf.push_capture_buffer(buffer=cStringIO.StringIO())
# Can't recode to ascii
self.failUnlessRaises(UnicodeError,
buf.write, unicode('\xa0', 'latin1'))
buf.write('a')
cbuf = buf.pop_capture_buffer()
self.failUnlessEqual(cbuf.getvalue(), 'a')
self.failUnlessEqual(self.output, '')
def testBufferIsEncodingTransparent(self):
buf = self.getRequestBuffer('utf_8')
buf.push_buffer() # This buffer does not encode to ascii
buf.write(unicode("\xa0", 'latin1'))
self.failUnlessEqual(self.output, '')
buf.flush()
self.failUnlessEqual(self.output, '\xc2\xa0')
def testUnicodeWrites(self):
buf = self.getRequestBuffer('iso-8859-15')
EURO_SIGN = u'\u20ac'
buf.write(EURO_SIGN)
self.failUnlessEqual(self.output, '\xa4')
# There's no euro symbol in latin1
buf = self.getRequestBuffer('iso-8859-1')
self.failUnlessRaises(UnicodeEncodeError,
buf.write, EURO_SIGN)
def testChangeOutputEncoding(self):
nbsp = unicode('\xa0', 'latin1')
buf = self.getRequestBuffer('latin1')
self.failUnlessEqual(buf.encoding, 'latin1')
buf.write(nbsp)
self.failUnlessEqual(self.output, '\xa0')
buf.set_encoding('utf-8')
self.failUnlessEqual(buf.encoding, 'utf-8')
buf.write(nbsp)
self.failUnlessEqual(self.output, '\xa0\xc2\xa0')
def testChangeOutputEncodingBufferFlushing(self):
buf = self.getRequestBuffer('latin1')
buf.push_buffer()
buf.write(unicode('\xa0', 'latin1'))
self.failUnlessEqual(self.output, '')
cbuf = util.StringIO()
buf.push_capture_buffer(cbuf)
buf.push_buffer()
buf.write('foo')
buf.set_encoding('utf-8')
self.failUnlessEqual(buf.encoding, 'utf-8')
# Test that nothing is flushed.
self.failUnlessEqual(cbuf.getvalue(), '')
self.failUnlessEqual(self.output, '')
buf.pop_buffer()
buf.pop_capture_buffer()
buf.write(unicode(' \xa0', 'latin1'))
buf.pop_buffer()
self.failUnlessEqual(self.output, '\xc2\xa0 \xc2\xa0')
def testReplaceErrors(self):
buf = self.getRequestBuffer(errors='replace')
buf.push_filter(test_filter)
state = buf.get_state()
buf.push_buffer()
buf.write(REPLACEMENT_CHARACTER)
self.failUnlessEqual(self.output, '')
buf.pop_to_state(state)
self.failUnlessEqual(self.output, 'f(?)')
buf.write(u'a')
self.failUnlessEqual(self.output, 'f(?)f(a)')
def testFlushWithErrors(self):
buf = self.getRequestBuffer()
buf.push_buffer()
buf.write(unicode('\xa4', 'latin1'))
self.failUnlessRaises(UnicodeEncodeError, buf.pop_buffer)
self.failUnlessEqual(self.output, '')
################################################################
class TestIsAsciiSuperset(unittest.TestCase):
def test(self):
self.failUnless(_is_ascii_superset('latin1'))
self.failUnless(_is_ascii_superset('ascii'))
self.failUnless(_is_ascii_superset('utf8'))
self.failIf(_is_ascii_superset('utf7'))
if __name__ == '__main__':
runner = unittest.TextTestRunner(verbosity=2, descriptions=False)
unittest.main(testRunner=runner)
|