test_compression.py :  » Web-Frameworks » Nevow » Nevow-0.10.0 » nevow » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Nevow 
Nevow » Nevow 0.10.0 » nevow » test » test_compression.py
"""
Tests for on-the-fly content compression encoding.
"""
from StringIO import StringIO
from gzip import GzipFile

from zope.interface import implements

from twisted.trial.unittest import TestCase
from twisted.internet.defer import succeed

from nevow.inevow import IResource,IRequest
from nevow.testutil import FakeRequest
from nevow.context import RequestContext
from nevow.appserver import errorMarker
from nevow.rend import NotFound
from nevow.compression import CompressingResourceWrapper,CompressingRequestWrapper
from nevow.compression import parseAcceptEncoding,_ProxyDescriptor



class HeaderTests(TestCase):
    """
    Tests for header parsing.
    """
    def test_parseAcceptEncoding(self):
        """
        Test the parsing of a variety of Accept-Encoding field values.
        """
        cases = [
            ('compress, gzip',
             {'compress': 1.0, 'gzip': 1.0, 'identity': 0.0001}),
            ('',
             {'identity': 0.0001}),
            ('*',
             {'*': 1}),
            ('compress;q=0.5, gzip;q=1.0',
             {'compress': 0.5, 'gzip': 1.0, 'identity': 0.0001}),
            ('gzip;q=1.0, identity;q=0.5, *;q=0',
             {'gzip': 1.0, 'identity': 0.5, '*': 0})
            ]

        for value, result in cases:
            self.assertEqual(parseAcceptEncoding(value), result, msg='error parsing %r' % value)



class _Dummy(object):
    """
    Dummy object just to get an instance dict.
    """



class _Wrapper(object):
    """
    Test wrapper.
    """
    x = _ProxyDescriptor('x')
    y = _ProxyDescriptor('y')

    def __init__(self, underlying):
        self.underlying = underlying



class ProxyDescriptorTests(TestCase):
    """
    Tests for L{_ProxyDescriptor}.
    """
    def setUp(self):
        """
        Set up a dummy object and a wrapper for it.
        """
        self.dummy = _Dummy()
        self.dummy.x = object()
        self.dummy.y = object()
        self.wrapper = _Wrapper(self.dummy)


    def test_proxyGet(self):
        """
        Getting a proxied attribute should retrieve the underlying attribute.
        """
        self.assertIdentical(self.wrapper.x, self.dummy.x)
        self.assertIdentical(self.wrapper.y, self.dummy.y)
        self.dummy.x = object()
        self.assertIdentical(self.wrapper.x, self.dummy.x)


    def test_proxyClassGet(self):
        """
        Getting a proxied attribute from the class should just retrieve the
        descriptor.
        """
        self.assertIdentical(_Wrapper.x, _Wrapper.__dict__['x'])


    def test_proxySet(self):
        """
        Setting a proxied attribute should set the underlying attribute.
        """
        self.wrapper.x = object()
        self.assertIdentical(self.dummy.x, self.wrapper.x)
        self.wrapper.y = 5
        self.assertEqual(self.dummy.y, 5)


    def test_proxyDelete(self):
        """
        Deleting a proxied attribute should delete the underlying attribute.
        """
        self.assertTrue(hasattr(self.dummy, 'x'))
        del self.wrapper.x
        self.assertFalse(hasattr(self.dummy, 'x'))



class RequestWrapperTests(TestCase):
    """
    Tests for L{CompressingRequestWrapper}.
    """
    def setUp(self):
        """
        Wrap a request fake to test the wrapper.
        """
        self.request = FakeRequest()
        self.wrapper = CompressingRequestWrapper(self.request)


    def test_attributes(self):
        """
        Attributes on the wrapper should be forwarded to the underlying
        request.
        """
        attributes = ['method', 'uri', 'path', 'args', 'received_headers']
        for attrName in attributes:
            self.assertIdentical(getattr(self.wrapper, attrName),
                                 getattr(self.request, attrName))


    def test_missingAttributes(self):
        """
        Attributes that are not part of the interfaces being proxied should not
        be proxied.
        """
        self.assertRaises(AttributeError, getattr, self.wrapper, 'doesntexist')
        self.request._privateTestAttribute = 42
        self.assertRaises(AttributeError, getattr, self.wrapper, '_privateTestAttribute')


    def test_contentLength(self):
        """
        Content-Length header should be discarded when compression is in use.
        """
        self.assertNotIn('content-length', self.request.headers)
        self.wrapper.setHeader('content-length', 1234)
        self.assertNotIn('content-length', self.request.headers)

        self.request.setHeader('content-length', 1234)
        self.wrapper = CompressingRequestWrapper(self.request)
        self.assertNotIn('content-length', self.request.headers)


    def test_responseHeaders(self):
        """
        Content-Encoding header should be set appropriately.
        """
        self.assertEqual(self.request.headers['content-encoding'], 'gzip')


    def test_lazySetup(self):
        """
        The gzip prelude should only be written once real data is written.

        This is necessary to avoid terminating the header too quickly.
        """
        self.assertEqual(self.request.accumulator, '')
        self.wrapper.write('foo')
        self.assertNotEqual(self.request.accumulator, '')


    def _ungzip(self, data):
        """
        Un-gzip some data.
        """
        s = StringIO(data)
        return GzipFile(fileobj=s, mode='rb').read()


    def test_encoding(self):
        """
        Response content should be written out in compressed format.
        """
        self.wrapper.write('foo')
        self.wrapper.write('bar')
        self.wrapper.finishRequest(True)
        self.assertEqual(self._ungzip(self.request.accumulator), 'foobar')


    def test_finish(self):
        """
        Calling C{finishRequest()} on the wrapper should cause the underlying
        implementation to be called.
        """
        self.wrapper.finishRequest(True)
        self.assertTrue(self.request.finished)



class TestResource(object):
    """
    L{IResource} implementation for testing.

    @ivar lastRequest: The last request we were rendered with.
    @type lastRequest: L{IRequest} or C{None}
    @ivar segments: The segments we were constructed with.
    @type segments: C{list}
    @ivar html: The data to return from C{renderHTTP}.
    """
    implements(IResource)

    lastRequest = None

    def __init__(self, segments=[], html='o hi'):
        self.segments = segments
        self.html = html


    def locateChild(self, ctx, segments):
        """
        Construct a new resource of our type.

        We hand out child resources for any segments, as this is the simplest
        thing to do.
        """
        return type(self)(segments), []


    def renderHTTP(self, ctx):
        """
        Stash the request for later inspection.
        """
        self.lastRequest = IRequest(ctx)
        return self.html



class TestChildlessResource(object):
    """
    L{IResource} implementation with no children.
    """
    implements(IResource)

    def locateChild(self, ctx, segments):
        """
        Always return C{NotFound}.
        """
        return NotFound



class TestDeferredResource(object):
    """
    L{IResource} implementation with children.
    """
    implements(IResource)

    def locateChild(self, ctx, segments):
        """
        Construct a new resource of our type.

        We hand out child resources for any segments, but the resource itself
        is wrapped in a deferred.
        """
        return succeed(type(self)()), []



class TestResourceWrapper(CompressingResourceWrapper):
    """
    Subclass for testing purposes, just to create a new type.
    """


class TestBrokenResource(object):
    """
    L{IResource} implementation that returns garbage from C{locateChild}.
    """
    implements(IResource)

    def locateChild(self, ctx, segments):
        """
        Return some garbage.
        """
        return 42



class ResourceWrapper(TestCase):
    """
    Tests for L{CompressingResourceWrapper}.

    @ivar resource: The underlying resource for testing.
    @type resource: L{TestResource}
    @ivar wrapped: The wrapped resource.
    @type wrapped: L{CompressingResourceWrapper}
    @ivar request: A fake request.
    @type request: L{FakeRequest}
    @ivar ctx: A dummy context.
    @type ctx: L{RequestContext}
    """
    def setUp(self):
        self.resource = TestResource()
        self.wrapped = CompressingResourceWrapper(self.resource)
        self.request = FakeRequest()
        self.ctx = RequestContext(tag=self.request)


    def test_rendering(self):
        """
        Rendering a wrapped resource renders the underlying resource with a
        wrapped request if compression is available.
        """
        self.wrapped.canCompress = lambda req: True
        self.wrapped.renderHTTP(self.ctx)
        self.assertEqual(type(self.resource.lastRequest), CompressingRequestWrapper)
        self.assertIdentical(self.resource.lastRequest.underlying, self.request)


    def test_renderingUnwrapped(self):
        """
        Rendering a wrapped resource renders the underlying resource with an
        unwrapped request if compression is not available.
        """
        self.wrapped.canCompress = lambda req: False
        self.wrapped.renderHTTP(self.ctx)
        self.assertIdentical(self.resource.lastRequest, self.request)


    def test_awfulHack(self):
        """
        Rendering a wrapped resource should finish off the request, and return
        a special sentinel value to prevent the Site machinery from trying to
        finish it again.
        """
        def _cbCheckReturn(result):
            self.rendered = True
            self.assertIdentical(result, errorMarker)
            self.assertTrue(self.request.finished)

        self.rendered = False
        self.wrapped.canCompress = lambda req: True
        self.wrapped.renderHTTP(self.ctx).addCallback(_cbCheckReturn)
        # The callback should run synchronously
        self.assertTrue(self.rendered)


    def test_rendering(self):
        """
        Returning something other than C{str} causes the value to be passed
        through.
        """
        def _cbResult(result):
            self.result = result

        marker = object()
        self.resource.html = marker
        self.wrapped.canCompress = lambda req: True
        self.wrapped.renderHTTP(self.ctx).addCallback(_cbResult)
        self.assertIdentical(marker, self.result)


    def _cbCheckChild(self, result):
        """
        Check that the child resource is wrapped correctly.
        """
        self.checked = True


    def _locateChild(self, resource, segments):
        """
        Helper function for retrieving a child synchronously.
        """
        def _cbGotChild(result):
            self.gotChild = True
            self.result = result

        def _ebChild(f):
            self.gotChild = 'error'
            self.f = f

        self.gotChild = False
        resource.locateChild(None, segments).addCallbacks(_cbGotChild, _ebChild)
        self.assertTrue(self.gotChild)
        if self.gotChild == 'error':
            self.f.raiseException()
        return self.result


    def test_wrapChildren(self):
        """
        Any children of the wrapped resource should also be wrapped.
        """
        self.checked = False
        child, segments = self._locateChild(self.wrapped, ['some', 'child', 'segments'])
        self.assertIdentical(type(child), type(self.wrapped))
        self.assertEqual(child.underlying.segments, ['some', 'child', 'segments'])


    def test_wrapChildrenSubclass(self):
        """
        The request wrapper should wrap children with the same type.
        """
        self.wrapped = TestResourceWrapper(self.resource)
        self.test_wrapChildren()


    def test_childNotFound(self):
        """
        The request wrapper should pass C{NotFound} through.
        """
        wrapped = CompressingResourceWrapper(TestChildlessResource())
        result = self._locateChild(wrapped, ['foo'])
        self.assertEqual(result, NotFound)


    def test_deferredChild(self):
        """
        The wrapper should deal with a resource wrapped in a deferred returned
        from locateChild.
        """
        wrapped = CompressingResourceWrapper(TestDeferredResource())
        child, segments = self._locateChild(wrapped, ['foo'])
        self.assertEqual(type(child.underlying), TestDeferredResource)
        self.assertEqual(segments, [])


    def test_brokenChild(self):
        """
        C{ValueError} should be raised if the underlying C{locateChild} returns
        something bogus.
        """
        wrapped = CompressingResourceWrapper(TestBrokenResource())
        self.assertRaises(ValueError, self._locateChild, wrapped, ['foo'])


    def test_negotiation(self):
        """
        Request wrapping should only occur when the client has indicated they
        can accept compression.
        """
        self.assertFalse(self.wrapped.canCompress(self.request))

        self.request.received_headers['accept-encoding'] = 'foo;q=1.0, bar;q=0.5, baz'
        self.assertFalse(self.wrapped.canCompress(self.request))

        self.request.received_headers['accept-encoding'] = 'gzip'
        self.assertTrue(self.wrapped.canCompress(self.request))

        self.request.received_headers['accept-encoding'] = 'gzip;q=0.5'
        self.assertTrue(self.wrapped.canCompress(self.request))

        self.request.received_headers['accept-encoding'] = 'gzip;q=0'
        self.assertFalse(self.wrapped.canCompress(self.request))
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.