test_athena.py :  » Web-Frameworks » Nevow » Nevow-0.10.0 » nevow » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Nevow 
Nevow » Nevow 0.10.0 » nevow » test » test_athena.py

import os, sets
from itertools import izip
from xml.dom.minidom import parseString

from twisted.trial import unittest
from twisted.python import util
from twisted.internet.defer import Deferred
from twisted.application.service import IServiceMaker
from twisted.application.internet import TCPServer
from twisted.python.reflect import qual
from twisted.python.usage import UsageError
from twisted.plugin import IPlugin

from nevow import athena,rend,tags,flat,loaders,url
from nevow.loaders import stan
from nevow.athena import LiveElement
from nevow.appserver import NevowSite
from nevow.inevow import IRequest
from nevow.context import WovenContext
from nevow.testutil import FakeRequest,renderPage,renderLivePage,CSSModuleTestMixin
from nevow._widget_plugin import WidgetPluginRoot
from nevow._widget_plugin import ElementRenderingLivePage

from twisted.plugins.nevow_widget import widgetServiceMaker


class MappingResourceTests(unittest.TestCase):
    """
    Tests for L{athena.MappingResource}.
    """
    def test_renderMapping(self):
        """
        L{athena.MappingResource} isn't directly renderable.
        """
        m = athena.MappingResource({})
        self.failUnless(isinstance(m.renderHTTP(None), rend.FourOhFour))


    def test_lookupNonExistentKey(self):
        """
        L{athena.MappingResource} should return L{rend.NotFound} when asked
        for a non-existent key.
        """
        m = athena.MappingResource({'name': 'value'})
        self.assertEquals(m.locateChild(None, ('key',)), rend.NotFound)


    def test_lookupKey(self):
        """
        L{athena.MappingResource} should return whatever the
        C{resourceFactory} method products when supplied a valid key.
        """
        m = athena.MappingResource({'name': 'value'})
        m.resourceFactory = sets.Set
        resource, segments = m.locateChild(None, ('name',))
        self.assertEquals(segments, [])
        self.assertEquals(resource, sets.Set('value'))



class ModuleRegistryTestMixin:
    """
    Mixin for testing module registry objects.
    """
    def test_getModuleForName(self):
        """
        C{getModuleForName} should return the right kind of module.
        """
        moduleName = u'test_getModuleForName'
        mapping = {moduleName: self.mktemp()}
        reg = self.registryClass(mapping)
        mod = reg.getModuleForName(moduleName)
        self.assertTrue(isinstance(mod, self.moduleClass))
        self.assertEqual(mod.name, moduleName)
        self.assertIdentical(mod.mapping, mapping)


    def test_getModuleForNameUnknown(self):
        """
        C{getModuleForName} should get angry if we ask for a module which
        doesn't exist.
        """
        moduleName = u'test_getModuleForName'
        reg = self.registryClass({})
        self.assertRaises(
            RuntimeError,
            reg.getModuleForName,
            moduleName)



class CSSRegistryTests(unittest.TestCase, ModuleRegistryTestMixin):
    """
    Tests for L{athena.CSSRegistry}.
    """
    registryClass = athena.CSSRegistry
    moduleClass = athena.CSSModule


    def test_getModuleForNameLoad(self):
        """
        L{athena.CSSRegistry} should initialize its mapping from
        L{athena.allCSSPackages} as needed.
        """
        moduleName = u'test_getModuleForNameLoad'
        origAllCSSPackages = athena.allCSSPackages
        theCSSPackages = {moduleName: self.mktemp()}
        athena.allCSSPackages = lambda: theCSSPackages
        reg = athena.CSSRegistry()
        try:
            mod = reg.getModuleForName(moduleName)
        finally:
            athena.allCSSPackages = origAllCSSPackages
        self.assertEqual(mod.name, moduleName)
        self.assertEqual(mod.mapping, theCSSPackages)



class JSDependenciesTests(unittest.TestCase, ModuleRegistryTestMixin):
    """
    Tests for L{athena.JSDependencies}.
    """
    registryClass = athena.JSDependencies
    moduleClass = athena.JSModule


    def test_getModuleForNameLoad(self):
        """
        L{athena.JSDependencies} should initialize its mapping from
        L{athena.allCSSPackages} as needed.
        """
        moduleName = u'test_getModuleForNameLoad'
        origAllJavascriptPackages = athena.allJavascriptPackages
        theJavascriptPackages = {moduleName: self.mktemp()}
        athena.allJavascriptPackages = lambda: theJavascriptPackages
        reg = athena.JSDependencies()
        try:
            mod = reg.getModuleForName(moduleName)
        finally:
            athena.allJavascriptPackages = origAllJavascriptPackages
        self.assertEqual(mod.name, moduleName)
        self.assertEqual(mod.mapping, theJavascriptPackages)



class AthenaModuleTestMixin:
    """
    Mixin for testing L{athena.AthenaModule} and derived classes.
    """
    testModuleImpl = """\
lalal this is javascript honest
// uh oh!  a comment!  gee I wish javascript had an import system
// import ExampleModule
here is some more javascript code
// import Another
// import Module
the end
"""
    moduleClass = athena.AthenaModule

    def setUp(self):
        """
        Write L{testModuleImpl} to a file.
        """
        self.testModuleFilename = self.mktemp()
        testModule = file(self.testModuleFilename, 'w')
        testModule.write(self.testModuleImpl)
        testModule.close()


    def test_getOrCreate(self):
        """
        L{athena.AthenaModule.getOrCreate} shouldn't make two instances of the
        same module.
        """
        modules = {'testmodule': self.testModuleFilename}
        m1 = self.moduleClass.getOrCreate('testmodule', modules)
        m2 = self.moduleClass.getOrCreate('testmodule', modules)

        self.assertTrue(isinstance(m1, self.moduleClass))
        self.assertEquals(m1.name, 'testmodule')

        self.assertIdentical(m1, m2)


    def _doDependencySetup(self):
        """
        Create a complicated network of module dependencies.
        """
        emptyModulePath = self.mktemp()
        file(emptyModulePath, 'w').close()
        modules = {
            'testmodule': self.testModuleFilename,
            'Another': self.mktemp(),
            'ExampleModule': self.mktemp(),
            'Module': emptyModulePath,
            'SecondaryDependency': emptyModulePath,
            'ExampleDependency': emptyModulePath}

        anotherModule = file(modules['Another'], 'w')
        anotherModule.write('// import SecondaryDependency\n')
        anotherModule.close()

        exampleModule = file(modules['ExampleModule'], 'w')
        exampleModule.write('// import ExampleDependency\n')
        exampleModule.close()

        return modules


    def test_dependencies(self):
        """
        L{athena.AthenaModule.dependencies} should return the direct
        dependencies of the module.
        """
        modules = self._doDependencySetup()
        m = self.moduleClass.getOrCreate('testmodule', modules)
        deps = [d.name for d in m.dependencies()]
        deps.sort()
        self.assertEquals(deps, ['Another', 'ExampleModule', 'Module'])


    def test_allDependencies(self):
        """
        L{athena.AthenaModule.allDependencies} should return all dependencies
        of the module.
        """
        depgraph = {
            'Another': ['SecondaryDependency'],
            'ExampleModule': ['ExampleDependency'],
            'Module': [],
            'testmodule': ['Another', 'ExampleModule', 'Module'],
            'SecondaryDependency': [],
            'ExampleDependency': []}

        modules = self._doDependencySetup()
        m = self.moduleClass.getOrCreate('testmodule', modules)

        allDeps = [d.name for d in m.allDependencies()]
        for depMod in allDeps:
            modDeps = depgraph[depMod]
            for d in modDeps:
                # All dependencies should be loaded before the module
                # that depends upon them.
                self.assertIn(d, allDeps)
                self.assertIn(depMod, allDeps)
                self.failUnless(allDeps.index(d) < allDeps.index(depMod))


    def test_crlfNewlines(self):
        """
        L{athena.AthenaModule}  should correctly ignore the CR after a module
        name when CR LF newlines are used in a JavaScript source file.
        """
        fooModuleFilename = self.mktemp()
        fooModule = file(fooModuleFilename, 'wb')
        fooModule.write('// import Bar\r\n')
        fooModule.close()
        barModuleFilename = self.mktemp()
        barModule = file(barModuleFilename, 'wb')
        barModule.close()

        modules = {
            'Foo': fooModuleFilename,
            'Bar': barModuleFilename}
        module = self.moduleClass('Foo', modules)
        fooDependencies = list(module.dependencies())
        self.assertEqual(len(fooDependencies), 1)
        self.assertEqual(fooDependencies[0].name, u'Bar')


    def test_dependencyCaching(self):
        """
        L{athena.AthenaModule} should cache module dependencies.
        """
        testModuleFilename = self.mktemp()
        testModule = file(testModuleFilename, 'w')
        testModule.write('')
        testModule.close()

        modules = {'testmodule': testModuleFilename}
        m = self.moduleClass('testmodule', modules)
        m.extractCounter = 0
        origExtractImports = m._extractImports
        def _extractImports(x):
            m.extractCounter += 1
            return origExtractImports(x)
        m._extractImports = _extractImports

        deps = list(m.dependencies())
        self.assertEquals(m.extractCounter, 1)

        deps2 = list(m.dependencies())
        self.assertEquals(m.extractCounter, 1)

        newTime = m.lastModified
        os.utime(testModuleFilename, (newTime + 1, newTime + 1))
        deps3 = list(m.dependencies())
        self.assertEquals(m.extractCounter, 2)


    def test_packageDependencies(self):
        """
        L{athena.AthenaModule} should include a module's package in its
        dependencies.
        """
        modules = {u'Foo': self.mktemp(), u'Foo.Bar': self.mktemp()}
        file(modules[u'Foo'], 'wb').close()
        file(modules[u'Foo.Bar'], 'wb').close()
        foo = self.moduleClass.getOrCreate(u'Foo', modules)
        bar = self.moduleClass.getOrCreate(u'Foo.Bar', modules)
        self.assertIn(foo, bar.allDependencies())


    def test_repr(self):
        """
        L{athena.AthenaModule} should C{repr} to something helpful.
        """
        moduleName = u'Foo.Bar'
        module = self.moduleClass(
            moduleName, {moduleName: self.mktemp()})
        self.assertEqual(
            repr(module), 
           '%s(%r)' % (self.moduleClass.__name__, moduleName))



class AthenaModuleTests(AthenaModuleTestMixin, unittest.TestCase):
    """
    Tests for L{athena.AthenaModule}.
    """
    moduleClass = athena.AthenaModule



class JSModuleTests(AthenaModuleTestMixin, unittest.TestCase):
    """
    Tests for L{athena.JSModule}.
    """
    moduleClass = athena.JSModule



class CSSModuleTests(AthenaModuleTestMixin, unittest.TestCase):
    """
    Tests for L{athena.CSSModule}.
    """
    moduleClass = athena.CSSModule



class _CountingAthenaModule(athena.AthenaModule):
    """
    Instrumented version of L{athena.AthenaModule} for testing.
    """
    count = 0

    def dependencies(self):
        self.count += 1
        return super(_CountingAthenaModule, self).dependencies()



class MemoizationTests(unittest.TestCase):
    """
    Tests for dependency memoization.
    """
    def _outputToTempFile(self, s):
        """
        Write the contents of string C{s} to a tempfile and return the
        filename that was used

        @param s: file contents
        @type s: C{str}

        @return: filename
        @rtype: C{str}
        """
        fname = self.mktemp()
        fObj = file(fname, 'w')
        fObj.write(s)
        fObj.close()
        return fname


    def setUp(self):
        # AthenaModule keeps a global mapping of module names to module objects
        # in a class attribute; we overwrite this here (on
        # _CountingAthenaModule) to sure that all modules will be loaded for
        # every test run.
        _CountingAthenaModule._modules = {}

        empty = self._outputToTempFile('')
        quux = self._outputToTempFile('// import Foo')
        top = self._outputToTempFile('// import Quux\n'
                                     '// import Quux2')
        self.modules = {'Top': top,
                        'Quux': quux,
                        'Quux2': quux,
                        'Foo': empty}


    def test_noGlobalMemo(self):
        """
        L{AthenaModule.allDependencies} with no memo argument will retrieve its
        own dependencies (via L{AthenaModule.dependencies}) exactly once per
        invocation.
        """
        foo = _CountingAthenaModule.getOrCreate('Foo', self.modules)
        top = _CountingAthenaModule.getOrCreate('Top', self.modules)

        self.assertEqual(top.count, 0)
        deps = list(top.allDependencies())
        self.assertEqual(top.count, 1)
        deps = list(top.allDependencies())
        self.assertEqual(top.count, 2)


    def test_withGlobalMemo(self):
        """
        Direct dependencies for a particular module should only be retrieved
        once across multiple C{allDependencies()} calls if a memo is reused.
        """
        foo = _CountingAthenaModule.getOrCreate('Foo', self.modules)
        top = _CountingAthenaModule.getOrCreate('Top', self.modules)

        memo = {}
        self.assertEqual(top.count, 0)
        deps = list(top.allDependencies(memo))
        self.assertEqual(top.count, 1)
        deps = list(top.allDependencies(memo))
        self.assertEqual(top.count, 1)



class ModuleInteractionTests(unittest.TestCase):
    """
    Tests for JS/CSS module interactions.
    """
    def test_separateModuleNamespace(self):
        """
        L{athena.CSSModule} and L{athena.JSModule} should use separate module
        namespaces.
        """
        cssModule = athena.CSSModule.getOrCreate(
            u'test_separateModuleNamespace',
            {u'test_separateModuleNamespace': self.mktemp()})
        jsModule = athena.JSModule.getOrCreate(
            u'test_separateModuleNamespace',
            {u'test_separateModuleNamespace': self.mktemp()})
        self.assertNotIdentical(cssModule, jsModule)
        self.assertTrue(isinstance(cssModule, athena.CSSModule))
        self.assertTrue(isinstance(jsModule, athena.JSModule))



class _AutoPackageTestMixin:
    """
    Mixin for testing L{athena.AutoJSPackage} and L{athena.AutoCSSPackage}.
    """
    packageFactory = None
    moduleExtension = None

    def test_package(self):
        """
        L{packageFactory} should correctly construct its mapping from a
        filesystem package layout.
        """
        packageDir = self.mktemp()
        os.makedirs(os.path.join(packageDir, 'Foo', 'Baz'))

        def childPath(*a):
            path = os.path.join(packageDir, *a)
            file(path, 'w').close()
            return path

        expected = {
            u'Foo': childPath('Foo', '__init__.' + self.moduleExtension),
            u'Foo.Bar': childPath('Foo', 'Bar.' + self.moduleExtension),
            u'Foo.Baz': util.sibpath(athena.__file__, 'empty-module.' + self.moduleExtension),
            u'Foo.Baz.Quux': childPath('Foo', 'Baz', 'Quux.' + self.moduleExtension)}

        childPath('Foo', '.foo.' + self.moduleExtension)
        os.mkdir(os.path.join(packageDir, 'Foo', '.test'))
        childPath('Foo', '.test', 'Foo.' + self.moduleExtension)
        childPath('Foo', 'Bar.other')
        childPath('Foo', 'Zot.other')

        package = self.packageFactory(packageDir)
        for module, path in expected.iteritems():
            m = package.mapping.pop(module)
            self.assertEquals(m, path)
        self.assertEquals(package.mapping, {})



class AutoJSPackageTests(unittest.TestCase, _AutoPackageTestMixin):
    """
    Tests for L{athena.AutoJSPackage}.
    """
    packageFactory = athena.AutoJSPackage
    moduleExtension = 'js'



class AutoCSSPackageTests(unittest.TestCase, _AutoPackageTestMixin):
    """
    Tests for L{athena.AutoCSSPackage}.
    """
    packageFactory = athena.AutoCSSPackage
    moduleExtension = 'css'



class UtilitiesTests(unittest.TestCase):
    """
    Tests for misc. Athena utilities.
    """
    def test_preprocessorCollection(self):
        """
        Test that preprocessors from all the base classes of an instance are
        found, and that a preprocessor instance attribute overrides all of
        these.
        """
        a, b, c = object(), object(), object()

        class Base(object):
            preprocessors = [a]

        class OtherBase(object):
            preprocessors = [b]

        class Derived(Base, OtherBase):
            preprocessors = [c]

        inst = Derived()
        self.assertEqual(
            rend._getPreprocessors(inst),
            [a, b, c])

        d = object()
        inst.preprocessors = [d]
        self.assertEqual(
            rend._getPreprocessors(inst),
            [d])


    def test_handlerMacro(self):
        """
        Test that the handler macro rewrites athena:handler nodes to the
        appropriate JavaScript.
        """
        expectedOutput = (
            'return Nevow.Athena.Widget.handleEvent('
            'this, &quot;onclick&quot;, &quot;bar&quot;);')
        tag = tags.span[athena.handler(event='onclick', handler='bar')]
        mutated = athena._rewriteEventHandlerToAttribute(tag)
        output = flat.flatten(mutated)
        self.assertEquals(
            output,
            '<span onclick="' + expectedOutput + '"></span>')


    def test_handlerMacroAgainstList(self):
        """
        Macros need to be runnable on lists of things.  Make sure the handler
        macro is.
        """
        tag = ["hello", " ", "world"]
        self.assertEquals(
            athena._rewriteEventHandlerToAttribute(tag),
            tag)


    def test_athenaIdRewriting(self):
        """
        Test that IDs are correctly rewritten in id, for, and headers
        attributes.
        """
        tag = [tags.label(_for='foo'),
               tags.input(id='foo'),
               tags.th(headers=''),
               tags.th(headers='foo'),
               tags.td(headers='foo bar'),
               tags.td(headers='foo bar baz')]
        element = athena.LiveElement(docFactory=loaders.stan(tag))
        page = athena.LivePage(docFactory=loaders.stan(element))
        element.setFragmentParent(page)

        def _verifyRendering(result):
            self.assertIn('<input id="athenaid:%s-foo"' % (element._athenaID,), result)
            self.assertIn('<label for="athenaid:%s-foo"' % (element._athenaID,), result)
            self.assertIn('<th headers=""', result)
            self.assertIn('<th headers="athenaid:%s-foo"' % (
                element._athenaID,), result)
            self.assertIn('<td headers="athenaid:%s-foo athenaid:%s-bar"' % (
                element._athenaID, element._athenaID), result)
            self.assertIn('<td headers="athenaid:%s-foo athenaid:%s-bar athenaid:%s-baz"' % (
                element._athenaID, element._athenaID, element._athenaID), result)

        return renderLivePage(page).addCallback(_verifyRendering)


    def test_elementPreprocessors(self):
        """
        Make sure that LiveElements have their preprocessors applied to their
        document.
        """
        preprocessed = []

        tag = tags.span
        element = athena.LiveElement(docFactory=loaders.stan(tag))
        page = athena.LivePage(docFactory=loaders.stan(element))
        element.preprocessors = [preprocessed.append]
        element.setFragmentParent(page)
        renderDeferred = renderPage(page)
        def rendered(result):
            page.action_close(None)
            self.assertEquals(preprocessed, [[tag]])
        renderDeferred.addCallback(rendered)
        return renderDeferred


    def test_userAgentDetection(self):
        """
        C{LivePage._supportedBrowser} should return True for User-Agent strings
        which are not known to be supported and False for those which are known
        to be unsupported.
        """
        page = athena.LivePage()
        supported = ["Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)",
                     "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)",
                     "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.1.3)"
                     " Gecko/20061201 Firefox/2.0.0.3 (Ubuntu-feisty)",
                     "Mozilla/5.0 (Windows; U; Windows NT 5.2; sv-SE;"
                     " rv:1.8.0.8) Gecko/20061025 Firefox 1.5.0.8",
                     "Opera/9.20 (Windows NT 6.0; U; en)"]
        unsupported = ["Mozilla/5.0 (Macintosh; U; Intel Mac OS X; en)"
                       " AppleWebKit/418.9.1 (KHTML, like Gecko) Safari/419.3",
                       "Opera/8.5 (Windows NT 6.0; U; en)",
                       "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US;"
                       " rv:1.7.10) Gecko/20050716 Firefox/1.0.6",
                       "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT 5.0)"]
        for ua in supported:
            req = FakeRequest()
            req.received_headers['user-agent'] = ua
            self.assertTrue(page._supportedBrowser(req))
        for ua in unsupported:
            req = FakeRequest()
            req.received_headers['user-agent'] = ua
            self.assertFalse(page._supportedBrowser(req))


    def test_unsupportedBrowserPage(self):
        """
        Test that unsupported browsers get told they're unsupported.
        """
        ctx = WovenContext()
        page = athena.LivePage()
        req = FakeRequest()
        req.received_headers['user-agent'] = "Mozilla/4.0 (compatible; MSIE 2.0; Windows NT 5.1)"
        ctx.remember(req, IRequest)
        d = renderPage(page, reqFactory=lambda: req)
        d.addCallback(
            self.assertEqual,
            flat.flatten(page.unsupportedBrowserLoader))
        return d



class StandardLibraryTestCase(unittest.TestCase):
    """
    Test all the Nevow JavaScript "standard library" modules.
    """
    def setUp(self):
        self.deps = athena.JSDependencies()


    def _importTest(self, moduleName):
        mod = self.deps.getModuleForName(moduleName)
        inspect = [dep for dep in mod.allDependencies() if dep.name == moduleName]
        self.failUnless(inspect)


    def test_divmodImport(self):
        """
        Test that Divmod can be imported.
        """
        return self._importTest('Divmod')


    def test_baseImport(self):
        """
        Test that Divmod.Base can be imported.
        """
        return self._importTest('Divmod.Base')


    def test_deferImport(self):
        """
        Test that Divmod.Defer can be imported.
        """
        return self._importTest('Divmod.Defer')


    def test_inspectImport(self):
        """
        Test that Divmod.Inspect can be imported.
        """
        return self._importTest('Divmod.Inspect')


    def test_runtimeImport(self):
        """
        Test that Divmod.Runtime can be imported.
        """
        return self._importTest('Divmod.Runtime')


    def test_xmlImport(self):
        """
        Test that Divmod.XML can be imported.
        """
        return self._importTest('Divmod.XML')


    def test_nevowImport(self):
        """
        Test that Nevow can be imported.
        """
        return self._importTest('Nevow')


    def test_athenaImport(self):
        """
        Test that Nevow.Athena can be imported.
        """
        return self._importTest('Nevow.Athena')


    def test_testImport(self):
        """
        Test that Nevow.Athena can be imported.
        """
        return self._importTest('Nevow.Athena.Test')


    def test_tagLibraryImport(self):
        """
        Test that Nevow.TagLibrary can be imported.
        """
        return self._importTest('Nevow.TagLibrary')


    def test_tabbedPaneImport(self):
        """
        Test that Nevow.TagLibrary.TabbedPane can be imported.
        """
        return self._importTest('Nevow.TagLibrary.TabbedPane')



class TestFragment(athena.LiveFragment):
    pass

class Nesting(unittest.TestCase):

    def testFragmentNesting(self):
        lp = athena.LivePage()
        tf1 = TestFragment()
        tf2 = TestFragment()

        tf1.setFragmentParent(lp)
        tf2.setFragmentParent(tf1)

        self.assertEquals(lp.liveFragmentChildren, [tf1])
        self.assertEquals(tf1.liveFragmentChildren, [tf2])
        self.assertEquals(tf2.liveFragmentChildren, [])
        self.assertEquals(tf2.fragmentParent, tf1)
        self.assertEquals(tf1.fragmentParent, lp)

        self.assertEquals(tf2.page, lp)
        self.assertEquals(tf1.page, lp)


    def testInsideOutFragmentNesting(self):
        """
        Test that even if LiveFragments have their parents assigned from the
        inside out, parent/child relationships still end up correct.
        """
        innerFragment = TestFragment()
        outerFragment = TestFragment()
        page = athena.LivePage()

        innerFragment.setFragmentParent(outerFragment)
        outerFragment.setFragmentParent(page)

        self.assertEquals(page.liveFragmentChildren, [outerFragment])
        self.assertEquals(outerFragment.fragmentParent, page)
        self.assertEquals(outerFragment.page, page)

        self.assertEquals(outerFragment.liveFragmentChildren, [innerFragment])
        self.assertEquals(innerFragment.fragmentParent, outerFragment)
        self.assertEquals(innerFragment.page, page)



class Tracebacks(unittest.TestCase):
    frames = (('Error()', '', 0),
              ('someFunction()', 'http://somesite.com:8080/someFile', 42),
              ('anotherFunction([object Object])', 'http://user:pass@somesite.com:8080/someOtherFile', 69))

    stack = '\n'.join(['%s@%s:%d' % frame for frame in frames])

    exc = {u'name': 'SomeError',
           u'message': 'An error occurred.',
           u'stack': stack}

    def testStackParsing(self):
        p = athena.parseStack(self.stack)
        for iframe, oframe in izip(self.frames[::-1], p):
            self.assertEquals(oframe, iframe)

    def testStackLengthAndOrder(self):
        f = athena.getJSFailure(self.exc, {})
        self.assertEqual(len(f.frames), len(self.frames))
        self.assertEqual(f.frames[0][0], self.frames[-1][0])



class _DelayedCall(object):
    def __init__(self, container, element):
        self.container = container
        self.element = element

    def cancel(self):
        self.container.remove(self.element)


def mappend(transport):
    def send((ack, messages)):
        transport.append(messages[:])
    return send

class Transport(unittest.TestCase):
    """
    Test the various states and events which can occur that are related to the
    server's ability to convey a message to the client.

    This includes things such as the receipt of a new request or the depletion
    of an existing request.
    """

    theMessage = "Immediately Send This Message"

    connectTimeout = 1
    transportlessTimeout = 2
    idleTimeout = 3

    clientID = 'FAKE ATHENA PAGE'

    def liveTransportMessageReceived(self, ctx, outgoingMessage):
        self.outgoingMessages.append((ctx, outgoingMessage))

    def setUp(self):
        self.transport = []
        self.scheduled = []
        self.events = []
        self.outgoingMessages = []
        self.rdm = athena.ReliableMessageDelivery(
            self,
            connectTimeout=self.connectTimeout,
            transportlessTimeout=self.transportlessTimeout,
            idleTimeout=self.idleTimeout,
            connectionLost=lambda reason: self.events.append(reason),
            scheduler=self._schedule)


    def _schedule(self, n, f, *a, **kw):
        """
        Deterministic, rigidly controlled stand-in for reactor.callLater().
        """
        t = (n, f, a, kw)
        self.scheduled.append(t)
        return _DelayedCall(self.scheduled, t)


    def testSendMessageImmediately(self):
        """
        Test that if there is an output channel for messages, trying to send a
        message immediately does so, consuming the output channel.
        """
        self.rdm.addOutput(mappend(self.transport))
        self.rdm.addMessage(self.theMessage)
        self.assertEquals(self.transport, [[(0, self.theMessage)]])
        self.rdm.addMessage(self.theMessage)
        self.assertEquals(self.transport, [[(0, self.theMessage)]])


    def testSendMessageQueued(self):
        """
        Test that if there is no output channel when a message is sent, it will
        be sent once an output channel becomes available.
        """
        self.rdm.addMessage(self.theMessage)
        self.rdm.addOutput(mappend(self.transport))
        self.assertEquals(self.transport, [[(0, self.theMessage)]])


    def testMultipleQueuedMessages(self):
        """
        Test that if there are several messages queued they are all sent at
        once when an output channel becomes available.
        """
        self.rdm.addMessage(self.theMessage)
        self.rdm.addMessage(self.theMessage.encode('hex'))
        self.rdm.addOutput(mappend(self.transport))
        self.assertEquals(self.transport, [[(0, self.theMessage), (1, self.theMessage.encode('hex'))]])


    def testMultipleQueuedOutputs(self):
        """
        Test that if there are several output channels available, each message
        only consumes the first of them.
        """
        secondTransport = []
        self.rdm.addOutput(mappend(self.transport))
        self.rdm.addOutput(mappend(secondTransport))
        self.rdm.addMessage(self.theMessage)
        self.assertEquals(self.transport, [[(0, self.theMessage)]])
        self.assertEquals(secondTransport, [])


    def testMessageRedelivery(self):
        """
        Test that outputs added while there are unacknowledged messages result
        in re-transmits of those messages.
        """
        secondMessage = self.theMessage + '-2'
        secondTransport = []
        thirdTransport = []
        fourthTransport = []
        self.rdm.addMessage(self.theMessage)
        self.rdm.addMessage(secondMessage)
        self.rdm.addOutput(mappend(self.transport))
        self.assertEquals(self.transport, [[(0, self.theMessage), (1, secondMessage)]])
        self.rdm.addOutput(mappend(secondTransport))
        self.assertEquals(secondTransport, [[(0, self.theMessage), (1, secondMessage)]])
        self.rdm.basketCaseReceived(None, [0, []])
        self.rdm.addOutput(mappend(thirdTransport))
        self.assertEquals(thirdTransport, [[(1, secondMessage)]])
        self.rdm.basketCaseReceived(None, [1, []])
        self.rdm.addOutput(mappend(fourthTransport))
        self.assertEquals(fourthTransport, [])


    def testConnectTimeout(self):
        """
        Test that a connection timeout is set up which, if allowed to expire,
        will cause notification of the fact that the connection was never
        established.
        """
        n, f, a, kw = self.scheduled.pop()
        self.failIf(self.scheduled, "Too many tasks scheduled.")

        self.assertEquals(n, self.connectTimeout)
        f(*a, **kw)

        self.assertEquals(len(self.events), 1)
        self.events[0].trap(athena.ConnectFailed)

        self.failIf(self.scheduled, "Unexpected task scheduled after connect failed.")


    def testConnectSucceeds(self):
        """
        Test that the connection timeout is cancelled when an output channel is
        added.
        """
        self.failUnless(self.scheduled, "No connect timeout scheduled.") # Sanity check
        self.rdm.addOutput(mappend(self.transport))
        n, f, a, kw = self.scheduled.pop()
        self.assertEquals(n, self.idleTimeout)
        self.failIf(self.scheduled, "Output channel added but there is still a task pending.")
        self.assertEquals(self.transport, [], "Received unexpected output.")


    def testOutputConsumedMessageTimeout(self):
        """
        Test that a timeout is set up when the last output is used and that if
        it expires, notification of the connection being lost is delivered.  In
        particular, test that if there is a message waiting and a new output is
        added, the timeout behavior is correct.
        """
        self.rdm.addMessage(self.theMessage)
        self.rdm.addOutput(mappend(self.transport))

        n, f, a, kw = self.scheduled.pop()
        self.failIf(self.scheduled, "Too many tasks scheduled.")

        self.assertEquals(n, self.transportlessTimeout)
        f(*a, **kw)

        self.assertEquals(len(self.events), 1)
        self.events[0].trap(athena.ConnectionLost)

        self.failIf(self.scheduled, "Unexpected task scheduled after connection lost.")


    def testMessageConsumedOutputTimeout(self):
        """
        Very similar to testOutputConsumedMessageTimeout, but test the case
        where there is an existing output and a message is added, causing it
        to be used.
        """
        self.rdm.addOutput(mappend(self.transport))
        self.rdm.addMessage(self.theMessage)

        n, f, a, kw = self.scheduled.pop()
        self.failIf(self.scheduled, "Too many tasks scheduled.")

        self.assertEquals(n, self.transportlessTimeout)
        f(*a, **kw)

        self.assertEquals(len(self.events), 1)
        self.events[0].trap(athena.ConnectionLost)

        self.failIf(self.scheduled, "Unexpected task scheduled after connection lost.")


    def testOutputConnectionAdded(self):
        """
        Test that the timeout created when the last output is used is cancelled
        when a new output is added.
        """
        self.rdm.addMessage(self.theMessage)
        self.rdm.addOutput(mappend(self.transport))

        self.assertEquals(len(self.scheduled), 1, "Transportless timeout not created.")
        n, f, a, kw = self.scheduled[0]
        self.assertEquals(n, self.transportlessTimeout, "Unexpected task still scheduled after output added.")

        self.rdm.basketCaseReceived(None, [0, []])

        n, f, a, kw = self.scheduled.pop()
        self.assertEquals(n, self.idleTimeout)

        self.failIf(self.scheduled, "Unexpected task still scheduled after output added.")
        self.failIf(self.events, "Unexpectedly received some kind of event.")


    def testIdleOutputTimeout(self):
        """
        Test that outputs are discarded with an empty message list if they are
        not used within the specified interval.
        """
        self.rdm.addOutput(mappend(self.transport))

        n, f, a, kw = self.scheduled.pop()
        self.assertEquals(n, self.idleTimeout)
        self.failIf(self.scheduled, "Unexpected tasks still scheduled in addition to idle timeout task.")

        f(*a, **kw)

        self.assertEquals(self.transport, [[]])


    def testIdleTimeoutStartsOutputlessTimeout(self):
        """
        Test that if the last output is removed due to idleness that another
        timeout for the lack of any outputs is started.
        """
        self.rdm.addOutput(mappend(self.transport))

        n, f, a, kw = self.scheduled.pop()
        self.assertEquals(n, self.idleTimeout)
        f(*a, **kw)

        self.failIf(self.events, "Unexpectedly received some events.")

        n, f, a, kw = self.scheduled.pop()
        self.assertEquals(n, self.transportlessTimeout)
        f(*a, **kw)

        self.assertEquals(len(self.events), 1)
        self.events[0].trap(athena.ConnectionLost)


    def testPreConnectPause(self):
        """
        Test that no outputs are used while the reliable message
        deliverer is paused before the first connection is made.
        """
        self.rdm.pause()
        self.rdm.addOutput(mappend(self.transport))

        # The connection timeout should have been cancelled and
        # replaced with an idle timeout.
        self.assertEquals(len(self.scheduled), 1)
        n, f, a, kw = self.scheduled[0]
        self.assertEquals(n, self.idleTimeout)

        self.rdm.addMessage(self.theMessage)
        self.assertEquals(self.transport, [])

        self.rdm.unpause()
        self.assertEquals(self.transport, [[(0, self.theMessage)]])


    def testTransportlessPause(self):
        """
        Test that if the message deliverer is paused while it has no
        transports, it remains so and does not use an output which is
        added to it.
        """
        self.rdm.addOutput(mappend(self.transport))

        self.rdm.pause()
        self.rdm.addMessage(self.theMessage)
        self.assertEquals(self.transport, [])

        self.rdm.unpause()
        self.assertEquals(self.transport, [[(0, self.theMessage)]])


    def testMessagelessPause(self):
        """
        Test that if the message deliverer is paused while it has no
        messages, it remains so and does not use an output when a
        message is added.
        """
        self.rdm.addOutput(mappend(self.transport))

        self.rdm.pause()
        self.rdm.addMessage(self.theMessage)
        self.assertEquals(self.transport, [])

        self.rdm.unpause()
        self.assertEquals(self.transport, [[(0, self.theMessage)]])


    def testStaleMessages(self):
        """
        Test that if an older basket case with fewer messages in it arrives
        after a more recent, complete basket case is processed, that it is
        properly disregarded.
        """
        self.rdm.basketCaseReceived(
            None,
            [-1, [[0, self.theMessage],
                  [1, self.theMessage + "-1"],
                  [2, self.theMessage + "-2"]]])
        self.assertEquals(
            self.outgoingMessages,
            [(None, self.theMessage),
             (None, self.theMessage + "-1"),
             (None, self.theMessage + "-2")])
        self.outgoingMessages = []

        self.rdm.basketCaseReceived(
            None,
            [-1, [[1, self.theMessage + "-1"]]])
        self.assertEquals(
            self.outgoingMessages,
            [])

        self.rdm.basketCaseReceived(
            None,
            [-1, [[2, self.theMessage + "-2"]]])
        self.assertEquals(
            self.outgoingMessages,
            [])


    def testClosing(self):
        """
        Test that closing a reliable message deliverer causes all of outs
        remaining outputs to be used up with a close message and that any
        future outputs added to it are immediately used in a similar
        manner.
        """
        self.rdm.addOutput(mappend(self.transport))
        self.rdm.addOutput(mappend(self.transport))
        self.rdm.close()
        self.assertEquals(self.transport, [[(0, (athena.CLOSE, []))], [(0, (athena.CLOSE, []))]])

        self.transport = []
        self.rdm.addOutput(mappend(self.transport))
        self.assertEquals(self.transport, [[(0, (athena.CLOSE, []))]])


    def testCloseBeforeConnect(self):
        """
        Test that closing the reliable message deliverer before a connection is
        ever established properly cleans up any timeouts.
        """
        self.rdm.close()
        self.failIf(self.scheduled, "Expected no scheduled calls.")


    def test_closeExcessOnReceived(self):
        """
        Test that any excess idle transports are closed when a message is received.
        """
        secondTransport = []
        self.rdm.addOutput(mappend(self.transport))
        self.rdm.addOutput(mappend(secondTransport))
        d = self.rdm.basketCaseReceived(None, [0, []])
        self.assertEquals(self.transport, [[]])
        self.assertEquals(secondTransport, [[]])
        self.failIf(d.called)


    def test_closeExcessOnUnpaused(self):
        """
        Test that any excess idle transports are closed when the message
        deliverer is unpaused.
        """
        secondTransport = []
        self.rdm.pause()
        self.rdm.pause()
        self.rdm.addOutput(mappend(self.transport))
        self.rdm.addOutput(mappend(secondTransport))
        self.rdm.unpause()
        self.assertEqual(self.transport, [])
        self.assertEqual(secondTransport, [])
        self.rdm.unpause()
        self.assertEqual(self.transport, [[]])
        self.assertEqual(secondTransport, [])


    def test_specialUnloadSequence(self):
        """
        When the page is unloading in the brosser, it needs to pre-empt its own
        message queue and send a special identifier to express that this is a
        "last gasp" message and must be dealt with immediately.
        """
        self.rdm.basketCaseReceived(None, [0, [[athena.UNLOAD, [athena.CLOSE, []]]]])
        self.assertEqual(self.outgoingMessages, [(None, [athena.CLOSE, []])])



class LiveMixinTestsMixin(CSSModuleTestMixin):
    """
    Test-method defining mixin class for L{LiveElement} and L{LiveFragment}
    testing.

    @ivar elementFactory: No-argument callable which returns an object against
    which tests will be run.

    @ivar liveGlueRenderer: The name of the live glue renderer on objects
    returned from L{elementFactory}.
    """
    liveGlueRenderer = None

    def elementFactory(self):
        raise NotImplementedError("%s did not implement elementFactory" % (self,))


    def test_localDetach(self):
        """
        Verify that L{_athenaDetachServer} removes the element from its parent
        and disassociates it from the page locally.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)
        element._athenaDetachServer()
        self.assertNotIn(element, page.liveFragmentChildren)
        self.assertIdentical(element.fragmentParent, None)
        self.assertIdentical(element.page, None)


    def test_localDetachWithChildren(self):
        """
        Similar to L{test_localDetach}, but cover the case where the removed
        element has a child of its own and verify that that child is also
        detached.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)
        child = self.elementFactory()
        child.setFragmentParent(element)
        element._athenaDetachServer()
        self.assertNotIn(element, page.liveFragmentChildren)
        self.assertIdentical(element.fragmentParent, None)
        self.assertIdentical(element.page, None)
        self.assertNotIn(child, element.liveFragmentChildren)
        self.assertIdentical(child.fragmentParent, None)
        self.assertIdentical(child.page, None)


    def test_localDetachOrphaned(self):
        """
        L{_athenaDetachServer} should raise L{athena.OrphanedFragment} if the
        element is not attached.
        """
        element = self.elementFactory()
        self.assertRaises(athena.OrphanedFragment, element._athenaDetachServer)
        page = athena.LivePage()
        element.setFragmentParent(page)
        element._athenaDetachServer()
        self.assertRaises(athena.OrphanedFragment, element._athenaDetachServer)


    def test_detach(self):
        """
        Verify that L{detach} informs the client of the event and returns a
        Deferred which fires when the client acknowledges this.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)

        calls = []
        def callRemote(methodName):
            d = Deferred()
            calls.append((methodName, d))
            return d
        element.callRemote = callRemote

        d = element.detach()
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0][0], '_athenaDetachClient')
        calls[0][1].callback(None)
        self.assertNotIn(element, page.liveFragmentChildren)
        self.assertIdentical(element.fragmentParent, None)
        self.assertIdentical(element.page, None)


    def test_detachWithChildren(self):
        """
        Similar to L{test_detach}, but cover the case where the removed element
        has a child of its own and verify that that child is also detached.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)
        child = self.elementFactory()
        child.setFragmentParent(element)

        calls = []
        def callRemote(methodName):
            d = Deferred()
            calls.append((methodName, d))
            return d
        element.callRemote = callRemote
        child.callRemote = callRemote

        d = element.detach()
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0][0], '_athenaDetachClient')
        calls[0][1].callback(None)
        self.assertNotIn(element, page.liveFragmentChildren)
        self.assertIdentical(element.fragmentParent, None)
        self.assertIdentical(element.page, None)
        self.assertNotIn(child, element.liveFragmentChildren)
        self.assertIdentical(child.fragmentParent, None)
        self.assertIdentical(child.page, None)


    def test_localDetachCallback(self):
        """
        Verify that C{detached} is called when C{_athenaDetachServer} is
        called.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)

        detachCall = []
        def detached():
            detachCall.append((element.fragmentParent, element.page))
        element.detached = detached

        element._athenaDetachServer()
        self.assertEqual(detachCall, [(None, None)])


    def test_detachCallback(self):
        """
        Verify that C{detached} is called C{detach} is called locally.
        """
        page = athena.LivePage()
        element = self.elementFactory()
        element.setFragmentParent(page)

        detachCall = []
        def detached():
            detachCall.append((element.fragmentParent, element.page))
        element.detached = detached

        calls = []
        def callRemote(methodName):
            d = Deferred()
            calls.append(d)
            return d
        element.callRemote = callRemote

        d = element.detach()

        self.assertEqual(detachCall, [])
        calls[0].callback(None)
        self.assertEqual(detachCall, [(None, None)])


    def test_glueIncludesStylesheets(self):
        """
        Our element's glue should include inline stylesheet references.
        """
        element = self.elementFactory()
        element.cssModule = u'TestCSSModuleDependencies.Dependor'
        element.docFactory = loaders.stan(
            tags.div(render=tags.directive(self.liveGlueRenderer)))

        page = ElementRenderingLivePage(element)
        page.cssModules = self._makeCSSRegistry()

        D = renderLivePage(page)
        def cbRendered(result):
            expected = flat.flatten(
                page.getStylesheetStan(
                    [page.getCSSModuleURL(u'TestCSSModuleDependencies.Dependee'),
                     page.getCSSModuleURL(u'TestCSSModuleDependencies.Dependor')]))
            self.assertIn(expected, result)
        D.addCallback(cbRendered)
        return D


    def test_glueIncludesStylesheetsOnce(self):
        """
        Our element's glue shouldn't include redundant stylesheet references.
        """
        element = self.elementFactory()
        element.cssModule = u'TestCSSModuleDependencies.Dependor'
        element.docFactory = loaders.stan(
            tags.div(render=tags.directive(self.liveGlueRenderer)))

        page = ElementRenderingLivePage(element)
        page.docFactory = loaders.stan(tags.invisible[
            tags.invisible(render=tags.directive('liveglue')),
            tags.invisible(render=tags.directive('element')),
            tags.invisible(render=tags.directive('element'))])
        page.cssModules = self._makeCSSRegistry()

        D = renderLivePage(page)
        def cbRendered(result):
            expected = flat.flatten(
                page.getStylesheetStan(
                    [page.getCSSModuleURL(u'TestCSSModuleDependencies.Dependee'),
                     page.getCSSModuleURL(u'TestCSSModuleDependencies.Dependor')]))
            self.assertIn(expected, result)
        D.addCallback(cbRendered)
        return D



class LiveElementTests(LiveMixinTestsMixin, unittest.TestCase):
    """
    Tests for L{nevow.athena.LiveElement}.
    """
    elementFactory = athena.LiveElement
    liveGlueRenderer = 'liveElement'



class LiveFragmentTests(LiveMixinTestsMixin, unittest.TestCase):
    """
    Tests for L{nevow.athena.LiveFragment}.
    """
    elementFactory = athena.LiveFragment
    liveGlueRenderer = 'liveFragment'



class DummyLiveElement(LiveElement):
    """
    A "counting" Athena element used for tests involving the plugin system
    (e.g., supplied as the argument to the "--element" option).
    """
    classCounter = 0

    def __init__(self):
        """
        Create a L{DummyLiveElement} with a 'counter' attribute set to a
        unique, incremented ID, used for comparing instances.
        """
        LiveElement.__init__(self)
        DummyLiveElement.classCounter += 1
        self.counter = DummyLiveElement.classCounter



class LivePageTests(unittest.TestCase, CSSModuleTestMixin):
    """
    Tests for L{nevow.athena.LivePage}
    """

    def setUp(self):
        """
        Create and remember a L{LivePage} instance.
        """
        self.page = athena.LivePage()


    def tearDown(self):
        """
        Shut this test's L{LivePage} timers down, if the test started them up.
        """
        if hasattr(self.page, '_messageDeliverer'):
            self.page._messageDeliverer.close()


    def test_bootstrapCall(self):
        """
        L{LivePage.bootstrapCall} should generate a JSON-serialized string for
        calling a single JavaScript function.
        """
        bc = self.page._bootstrapCall(
            "SomeModule.someMethod", [u"one", 2, {u"three": 4.1}])
        self.assertEqual(
            bc, 'SomeModule.someMethod("one", 2, {"three":4.1});')


    def test_pageJsClassDependencies(self):
        """
        L{LivePage.render_liveglue} should include modules that the
        L{LivePage}'s jsClass depends on.
        """
        self.page.jsClass = u'PythonTestSupport.Dependor.PageTest'
        freq = FakeRequest()
        self.page._becomeLive(url.URL.fromRequest(freq))
        ctx = WovenContext(tag=tags.div())
        ctx.remember(freq, IRequest)
        self.assertEqual(self.page.render_liveglue(ctx, None), ctx.tag)
        expectDependor = flat.flatten(self.page.getImportStan(u'PythonTestSupport.Dependor'))
        expectDependee = flat.flatten(self.page.getImportStan(u'PythonTestSupport.Dependee'))
        result = flat.flatten(ctx.tag, ctx)
        self.assertIn(expectDependor, result)
        self.assertIn(expectDependee, result)


    def test_pageCSSModuleDependencies(self):
        """
        L{athena.LivePage.render_liveglue} should include CSS modules that
        the top-level C{cssModule} depends on.
        """
        self.page.cssModule = u'TestCSSModuleDependencies.Dependor'
        self.page.cssModules = self._makeCSSRegistry()

        self.page._becomeLive(url.URL())
        ctx = WovenContext(tag=tags.div())
        ctx.remember(FakeRequest(), IRequest)
        self.assertEqual(self.page.render_liveglue(ctx, None), ctx.tag)
        expected = flat.flatten(
            self.page.getStylesheetStan(
                [self.page.getCSSModuleURL(u'TestCSSModuleDependencies.Dependee'),
                 self.page.getCSSModuleURL(u'TestCSSModuleDependencies.Dependor')]))
        self.assertIn(expected, flat.flatten(ctx.tag, ctx))


    def test_bootstraps(self):
        """
        L{LivePage._bootstraps} should return a list of 2-tuples of
        (initialization method, arguments) of methods to call in JavaScript.

        Specifically, it should invoke Divmod.bootstrap with the page's own
        URL, and Nevow.Athena.bootstrap with the name of the client-side Page
        class to instantiate and the URL to instantiate it with.
        """
        SEG = "'" + '"'
        URI = "http://localhost/" + SEG
        req = FakeRequest(uri='/' + SEG, currentSegments=[SEG])
        ctx = WovenContext()
        ctx.remember(req, IRequest)
        self.page.clientID = 'asdf'
        self.assertEqual(
            self.page._bootstraps(ctx),
            [("Divmod.bootstrap",
              # Nevow's URL quoting rules are weird, but this is the URL
              # flattener's fault, not mine.  Adjust to taste if that changes
              # (it won't) -glyph
              [u"http://localhost/'%22"]),
             ("Nevow.Athena.bootstrap",
              [u'Nevow.Athena.PageWidget', u'asdf'])])


    def test_renderReconnect(self):
        """
        L{LivePage.renderHTTP} should render a JSON-encoded version of its
        clientID rather than a rendered version of its template when provided
        with a special __athena_reconnect__ parameter.
        """
        req = FakeRequest(args={athena.ATHENA_RECONNECT: ["1"]})
        ctx = WovenContext()
        ctx.remember(req, IRequest)
        string = self.page.renderHTTP(ctx)
        jsonifiedID = '"%s"' % (self.page.clientID,)
        self.assertEqual(string, jsonifiedID)


    def test_cssModules(self):
        """
        L{athena.LivePage.cssModules} should default to
        L{athena._theCSSRegistry}.
        """
        self.assertIdentical(
            athena.LivePage().cssModules, athena._theCSSRegistry)


    def test_cssmoduleChild(self):
        """
        L{athena.LivePage}'s C{cssmodule} child should return a correctly
        initialized L{athena.MappingResource}.
        """
        theCSSMapping = {}
        class MyCSSModules:
            mapping = theCSSMapping
        page = athena.LivePage()
        page.cssModules = MyCSSModules()
        (res, segments) = page.locateChild(None, ('cssmodule',))
        self.assertTrue(isinstance(res, athena.MappingResource))
        self.assertIdentical(res.mapping, theCSSMapping)


    def test_cssModuleRoot(self):
        """
        L{athena.LivePage}'s C{cssModuleRoot} argument should be observed by
        L{athena.LivePage.getCSSModuleURL}.
        """
        theCSSModuleRoot = url.URL.fromString('/test_cssModuleRoot')
        page = athena.LivePage(
            cssModuleRoot=theCSSModuleRoot)
        self.assertEqual(
            page.getCSSModuleURL(u'X.Y'),
            theCSSModuleRoot.child('X.Y'))



class WidgetSubcommandTests(unittest.TestCase):
    """
    Test the twistd subcommand which runs a server to render a single Athena
    widget.
    """
    def test_portOption(self):
        """
        Verify that the --port option adds an integer to the Options' port key.
        """
        options = widgetServiceMaker.options()
        options['element'] = DummyLiveElement()
        options.parseOptions(['--port', '3874'])
        self.assertEqual(options['port'], 3874)
        options.parseOptions(['--port', '65535'])
        self.assertEqual(options['port'], 65535)


    def test_invalidPortOption(self):
        """
        Verify that non-integer and out-of-range port numbers are rejected.
        """
        options = widgetServiceMaker.options()
        options['element'] = DummyLiveElement()
        self.assertRaises(UsageError, options.parseOptions, ['--port', 'hello world'])
        self.assertRaises(UsageError, options.parseOptions, ['--port', '-7'])
        self.assertRaises(UsageError, options.parseOptions, ['--port', '70000'])
        self.assertRaises(UsageError, options.parseOptions, ['--port', '65536'])


    def test_widgetOption(self):
        """
        Verify that the --element option adds a class to the Options' element
        key.
        """
        options = widgetServiceMaker.options()
        options.parseOptions(['--element', qual(DummyLiveElement)])
        self.assertEquals(options['element'], DummyLiveElement)


    def test_invalidWidgetOption(self):
        """
        Verify that specifying a non-existent class is rejected.
        """
        options = widgetServiceMaker.options()
        self.assertRaises(
            UsageError,
            options.parseOptions, ['--element', qual(DummyLiveElement) + 'xxx'])
        self.assertRaises(
            UsageError,
            options.parseOptions, ['--element', '-'])



    def test_invalidMissingWidget(self):
        """
        Verify that a missing widget class is rejected.
        """
        options = widgetServiceMaker.options()
        self.assertRaises(UsageError, options.parseOptions, [])


    def test_defaultPort(self):
        """
        Verify that the default port number is 8080.
        """
        options = widgetServiceMaker.options()
        options['element'] = DummyLiveElement
        options.parseOptions([])
        self.assertEqual(options['port'], 8080)


    def test_providesInterfaces(self):
        """
        Verify that the necessary interfaces for the object to be found as a
        twistd subcommand plugin are provided.
        """
        self.failUnless(IPlugin.providedBy(widgetServiceMaker))
        self.failUnless(IServiceMaker.providedBy(widgetServiceMaker))


    def test_makeService(self):
        """
        Verify that the L{IService} creation function returns a service which
        will run a Nevow site.
        """
        service = widgetServiceMaker.makeService({
                'element': DummyLiveElement,
                'port': 8080,
                })
        self.failUnless(isinstance(service, TCPServer))
        self.assertEqual(service.args[0], 8080)
        self.failUnless(isinstance(service.args[1], NevowSite))
        self.failUnless(isinstance(service.args[1].resource, WidgetPluginRoot))
        self.failUnless(isinstance(service.args[1].resource.elementFactory(),
                                   DummyLiveElement))


    def test_livePageRendering(self):
        """
        Verify that an L{ElementRenderingLivePage} instantiated with a
        particular LiveElement properly renders that element.
        """
        element = DummyLiveElement()
        element.jsClass = u'Dummy.ClassName'
        element.docFactory = stan('the element')
        page = ElementRenderingLivePage(element)
        renderDeferred = renderLivePage(page)
        def cbRendered(result):
            document = parseString(result)
            titles = document.getElementsByTagName('title')
            self.assertEqual(len(titles), 1)
            self.assertEqual(titles[0].firstChild.nodeValue, DummyLiveElement.__name__)
            divs = document.getElementsByTagName('div')
            self.assertEqual(len(divs), 1)
            self.assertEqual(divs[0].firstChild.nodeValue, 'the element')
        renderDeferred.addCallback(cbRendered)
        return renderDeferred


    def test_multipleRendersMultipleWidgets(self):
        """
        Each hit to the top-level page created by makeService should result in a
        new element being created by the specified element factory, so that it
        can be rendered multiple times.
        """
        w = WidgetPluginRoot(DummyLiveElement)
        page1, seg = w.locateChild(None, [''])
        page2, seg = w.locateChild(None, [''])

        # Make sure the pages aren't the same.
        self.failUnless(isinstance(page1, ElementRenderingLivePage))
        self.failUnless(isinstance(page2, ElementRenderingLivePage))
        self.assertNotIdentical(page1, page2)

        # Make sure the elements aren't the same.
        self.assertNotEqual(page1.element.counter, page2.element.counter)


    def test_transportHookup(self):
        """
        When a LivePage is rendered, it needs to hook up to its transport,
        which is a special resource (associated with the particular LivePage
        object in memory).  This hookup is done by some special logic in
        LivePage.locateChild, among other places.  Let's make sure that we can
        look up the live page by its client ID with the default transport root.

        Athena's default transport root is whatever URL the page is rendered
        at.  In the case of this plugin, that will usually be
        http://localhost:8080/
        """
        w = WidgetPluginRoot(DummyLiveElement)
        page1, seg = w.locateChild(None, [''])
        page1.element.docFactory = stan('the element')
        page1.element.jsClass = u'Dummy.ClassName'
        def cbCheckPageByClientID(result):
            req = FakeRequest()
            ctx = WovenContext()
            ctx.remember(req, IRequest)
            page1prime, seg = w.locateChild(ctx, [page1.clientID])
            self.assertIdentical(page1prime, page1)
        renderDeferred = renderLivePage(page1)
        renderDeferred.addCallback(cbCheckPageByClientID)
        return renderDeferred
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.