test.py :  » Web-Frameworks » Karrigell » Karrigell-3.1 » karrigell » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Karrigell 
Karrigell » Karrigell 3.1 » karrigell » test » test.py
import os
import sys
import thread
import urllib
import urlparse
import httplib
import cPickle
import Cookie
import cStringIO

from Tkinter import *
from ScrolledText import ScrolledText

# clear session directory
session_dir = os.path.join(os.getcwd(),"sessions")
if not os.path.exists(session_dir):
    os.mkdir(session_dir)
for _file in os.listdir(session_dir):
    os.remove(os.path.join(session_dir,_file))

class Console(ScrolledText):

    def write(self,data,*args):
        self.insert(END,data,*args)

root = Tk()
stdout = Console(root)
stderr = Console(root)
stdout.pack(side=LEFT)
stderr.pack(side=LEFT)
stdout.tag_config("error",foreground="red")

thread.start_new_thread(root.mainloop,())

# ugly hacks to make "import Karrigell" work
this_dir = os.path.dirname(__file__)
server_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(server_dir)
os.chdir(server_dir)
sys.argv[0] = os.path.join(server_dir,"Karrigell.py")
sys.argv.append(server_dir)

# aaah !
import Karrigell

class BadResponse(Exception):
    pass

class Not:

    """Used to test that a string is not equal to another""" 
    
    def __init__(self,value):
        self.value = value

    def __cmp__(self,other_string):
        return not cmp(self.value,other_string)

class Contains:

    """Used to test that a string contains another one""" 
    
    def __init__(self,value):
        self.value = value

    def __cmp__(self,other_string):
        # return 0 if other_string contains self.value
        return int(other_string.find(self.value)==-1)

class CookieStore:

    def __init__(self):
        if os.path.exists("cookie_store.dat"):
            os.remove("cookie_store.dat")

    def store(self,cookie_string):
        cookie = Cookie.SimpleCookie(cookie_string)
        stored_cookies = self.load_cookies()
        for key,morsel in cookie.iteritems():
            stored_cookies[key] = morsel
        out = open("cookie_store.dat","wb")
        cPickle.dump(stored_cookies,out)
        out.close()

    def load_cookies(self):
        try:
            return cPickle.load(open("cookie_store.dat"))
        except:
            return {}

    def search(self,url):
        # return the list of valid cookies for the url
        stored_cookies = self.load_cookies()
        valid_cookies = []
        scheme,location, path, parameters, query, frag_id = \
            urlparse.urlparse(url)
        for key,morsel in stored_cookies.iteritems():
            if not morsel["path"] or morsel['path']=="/":
                valid_cookies.append((key,morsel))
                continue
            cookie_path_elts = morsel["path"].split("/")
            if url.split("/")[:len(cookie_path_elts)]==cookie_path_elts:
                valid_cookies.append((key,morsel))
        return valid_cookies
    
class URL:

    def __init__(self,url,status,data={},result=None):
        self.url = url
        self.expected_status = status
        self.data = data
        self.result = result

    def test(self):        
        cx = httplib.HTTPConnection("localhost:%s" %port)
        
        headers = {}
        valid_cookies = []
        for key,morsel in cookies.search(self.url):
            valid_cookies.append("%s=%s" %(key,morsel.value))

        if valid_cookies:
            headers["Cookie"] = "; ".join(valid_cookies)

        if self.data:
            cx.request("POST",self.url,urllib.urlencode(self.data),headers=headers)
        else:
            cx.request("GET",self.url,headers=headers)
        
        res = cx.getresponse()
        
        if "set-cookie" in res.msg:
            cookies.store(res.msg["set-cookie"])

        if not res.status == self.expected_status:
            raise BadResponse, \
             "error for %s : \nexpected status %s, \nreturned status %s\n%s" \
                %(self.url,self.expected_status,res.status,res.read())
        if self.result:
            result = res.read()
            if not self.result == result :
                raise BadResponse, \
                    "error for %s : \nexpected result [%s], \nreturned result [%s]" \
                    %(self.url,self.result,result)
        print ": ok"
    
class Config:

    def __init__(self,port,conf={},host_conf={}):
        self.port = port
        self.host = "localhost"
        self.conf = conf
        # default config
        #Karrigell.k_config.config[self.host] = Karrigell.k_config.config[None]
        # update with specified values
        Karrigell.k_config.config[self.host].update(conf)
        Karrigell.k_config.config[self.host].alias.update({"test":this_dir})
        Karrigell.k_config.config[self.host].data_dir = this_dir
        config = Karrigell.k_config.config[self.host]
        Karrigell.k_config.k_sessions.init_config_session(config)
        #print Karrigell.k_config.config[self.host].global_modules
            
    def test(self,urls):
        print "\nTesting %s %s\n" %(self.host,self.conf)
        server = Karrigell.ThreadingServer('',self.port,Karrigell.handler,False)

        if not Karrigell.k_config.config[self.host].silent:
            sys.stderr.write("Karrigell %s running on port %s\n" \
                %(Karrigell.k_version.__version__,self.port))
            sys.stderr.write("Press Ctrl+C to stop\n")

        thread.start_new_thread(server.run,())
        
        mtimes = dict([(_file,os.stat(os.path.join(session_dir,_file)).st_mtime)
            for _file in os.listdir(session_dir)])
        for url in urls:
            addr = "http://localhost:%s/%s" %(port,url[0])
            print "testing",addr,
            try:
                URL(*url).test()
            except BadResponse,msg:
                print
                sys.stdout.write(msg,"error")
                raw_input()
                sys.exit()
        # check if session files have changed
        new_mtimes = dict([(_file,os.stat(os.path.join(session_dir,_file)).st_mtime)
            for _file in os.listdir(session_dir)])
        if not Karrigell.k_config.config[self.host].persistent_sessions:
            try:
                assert mtimes == new_mtimes
            except:
                sys.stdout.write("persistent_sessions is False but "\
                    "Session files have been modified","error")
                raw_input()
                sys.exit()

port = 8082
sys.stdout = stdout
sys.stderr = stderr

# set default values for host configuration
karrigell_dir = Karrigell.k_config.config[None].karrigell_dir
default_file = os.path.join(karrigell_dir,"core","default_conf.py")

cookies = CookieStore()

urls = [("",200),("/index.py",200),("/undefined.xxx",404),
    ("/test/echo.py",200,{"name":"azertu"},"azertu\n"),
    ("/test/echo.py?name=sdfgh",200,{},"sdfgh\n"),
    ("/test/session1.py?name=pierre",200),
    ("/test/session2.py",200,{},"pierre\n"),
    ("/test/importError",200,{},Contains("ImportError")),
    ("/test/includeError",200,{},Contains("IOError")),
    ("/test/pages.pdl",403),
    ("/test/global.py",200,{},Not("azertu\n"))
    ]
c=Config(port)
c.test(urls)

Config(port,conf={"silent":True}).test(urls)


# test with global modules
conf = Config(port,conf={"global_modules":[os.path.join(this_dir,"aaa.py")],
    "silent":False})
# the last test should return a value
urls.pop()
urls.append(("/test/global.py",200,{},"azertu\n"))

conf.test(urls)

# test that session folder is unchanged when sessions are not persistent
conf = Config(port,conf={"persistent_sessions":False,
    "global_modules":[os.path.join(this_dir,"aaa.py")]})
conf.test(urls)
raw_input()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.