pgresult.py :  » Database » PyPgSQL » pyPgSQL-2.5.1 » test » regression » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » PyPgSQL 
PyPgSQL » pyPgSQL 2.5.1 » test » regression » pgresult.py
#!/usr/local/bin/python
#ident "@(#) $Id: pgresult.py,v 1.10 2002/12/01 04:59:25 ballie01 Exp $"
# vi:set sw=4 ts=8 showmode ai: 
#-----------------------------------------------------------------------+
# Name:    pgresult.py            |
#                  |
# Description:  pgresult.py contains (white box) regression test cases  |
#    for the PgResult object.        |
#                  |
# Note:    These test cases require that a test database named   |
#    pypgsql exists and that the person running the test has  |
#    full rights to the database.        |
#=======================================================================|
# Copyright 2001 by Billy G. Allie.          |
# All rights reserved.              |
#                  |
# Permission to use, copy, modify, and distribute this software and its  |
# documentation for any purpose and without fee is hereby granted, pro-  |
# vided that the above copyright notice appear in all copies and that  |
# both that copyright notice and this permission notice appear in sup-  |
# porting documentation, and that the copyright owner's name not be  |
# used in advertising or publicity pertaining to distribution of the  |
# software without specific, written prior permission.      |
#                  |
# THE AUTHOR(S) DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,  |
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS.  IN  |
# NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY SPECIAL, INDIRECT OR  |
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS  |
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE  |
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE  |
# USE OR PERFORMANCE OF THIS SOFTWARE.          |
#=======================================================================|
# Revision History:              |
#                  |
# Date      Ini Description            |
# --------- --- ------------------------------------------------------- |
# 30NOV2002 bga Updated tests for PostgreSQL 7.3.      |
# 24SEP2001 bga Updated test case to use new string quoting method.  |
# 21SEP2001 bga Added check of the member types in certain test cases.  |
# 10SEP2001 bga Added the test cases for methods.  This unit test file  |
#    is complete (for now :-).        |
# 09SEP2001 bga Modified tests to reflect changes to PgVersion object.  |
#    In particular, a PgVersion object no longer has a   |
#    __dict__ attribute, but now acts as a mapping object.  |
# 06SEP2001 bga Completed the PgResult Member test cases, just have to  |
#     do the PgResult Method test cases.      |
# 04SEP2001 bga Expanded the test cases.  Still more to do.    |
# 02SEP2001 bga Initial release by Billy G. Allie.      |
#-----------------------------------------------------------------------+
import sys
import unittest
import types
import string
from pyPgSQL import libpq

# Pack the interpreter version into one integer:
# major * 10000 + minor * 100 + micro.
version = sys.version_info[:3]
version = (version[0] * 100 + version[1]) * 100 + version[2]

# One connection is opened at import time and shared by every test case;
# vstr is the "major.minor" string built from the connection's version.
cnx = libpq.PQconnectdb("dbname=pypgsql")
vstr = "%(major)d.%(minor)d" % cnx.version

class PgResultMemberTestCases(unittest.TestCase):
    """Check the members exposed by PgResult for a range of SQL commands.

    Every Check* method runs a query on the module-level connection and
    compares the PgResult members against a list of expected values.  The
    methods share database state (the table pgresult_test and the cursors
    pgres_cursor / pgres_bcursor), so they only make sense when run in the
    order in which the suite at the bottom of this file adds them.
    """

    def setUp(self):
        """Bind the shared connection and the attribute names to verify."""
        self.cnx = cnx
        self.vstr = vstr
        self.members = ['binaryTuples', 'cmdStatus', 'cmdTuples', 'nfields',
                        'ntuples', 'oidValue', 'resultStatus', 'resultType']
        self.methods = ['clear', 'fname', 'fnumber', 'fsize', 'ftype',
                        'getisnull', 'getlength', 'getvalue']

    def CheckForMembers(self, res):
        """Fail unless every name in self.members is in res.__members__."""
        for name in self.members:
            self.assert_(name in res.__members__,
                         'PgResult is missing member "%s".' % name)

    def CheckMemberValues(self, res, expected):
        """Compare each member of res with the matching entry of expected.

        expected is a sequence holding one value per name in self.members,
        in the same order.
        """
        self.assertEqual(len(expected), len(self.members),
                         'expected has %d values, there are %d members.' %
                         (len(expected), len(self.members)))
        for name, want in zip(self.members, expected):
            # getattr() replaces the former exec-based attribute lookup.
            got = getattr(res, name)
            self.assertEqual(got, want,
                             'PgResult.%s is %s, it should be %s!' %
                             (name, got, want))

    def CheckForMethods(self, res):
        """Fail unless every name in self.methods is listed and callable."""
        for name in self.methods:
            self.assert_(name in res.__methods__,
                         'PgResult is missing method "%s".' % name)
            self.assert_(callable(getattr(res, name)),
                         'PgResult method %s() is not callable.' % name)

    def CheckDropPgResult(self):
        """DROP TABLE succeeds once and raises OperationalError when repeated."""
        query = "DROP TABLE pgresult_test"
        res = self.cnx.query(query)
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'DROP', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'DROP TABLE'
        self.CheckMemberValues(res, expected)
        # Try to drop the table again.  This should raise an exception.
        self.assertRaises(libpq.OperationalError, self.cnx.query, query)

    def CheckEndPgResult(self):
        """END WORK reports the COMMIT command status."""
        res = self.cnx.query("END WORK")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'COMMIT', None, 0, 0, None, 1, 2]
        self.CheckMemberValues(res, expected)

    def CheckCloseBinaryPgResult(self):
        """Closing the binary cursor yields a plain command result."""
        res = self.cnx.query("CLOSE pgres_bcursor")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'CLOSE', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'CLOSE CURSOR'
        self.CheckMemberValues(res, expected)

    def CheckNoticePgResult(self):
        """Fetching from a missing cursor produces one notice, not an error."""
        # The following query will cause a notice to be generated, but not
        # an error.
        res = self.cnx.query("FETCH FROM pgres_cursor")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'FETCH', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'FETCH 0'
        self.CheckMemberValues(res, expected)
        self.assertEqual(len(self.cnx.notices), 1,
                         "A notice should of been generated, none was!")
        notice = self.cnx.notices.pop()
        wanted = 'NOTICE:  PerformPortalFetch: portal "pgres_cursor" not found\n'
        if self.cnx.version >= '7.3':
            wanted = 'WARNING:  PerformPortalFetch: portal "pgres_cursor" not found\n'
        self.assertEqual(notice, wanted,
                         "The notice did not match expected results")

    def CheckClosePgResult(self):
        """Closing the text cursor yields a plain command result."""
        res = self.cnx.query("CLOSE pgres_cursor")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'CLOSE', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'CLOSE CURSOR'
        self.CheckMemberValues(res, expected)

    def CheckFetchBinaryPgResult(self):
        """Fetching three rows from the binary cursor sets binaryTuples."""
        res = self.cnx.query("FETCH 3 FROM pgres_bcursor")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [1, 'FETCH', None, 2, 3, None, 2, 1]
        if self.cnx.version >= '7.3':
            expected[1] = 'FETCH 3'
        self.CheckMemberValues(res, expected)

    def CheckDeclareBinaryPgResult(self):
        """Declaring the binary cursor yields a plain command result."""
        res = self.cnx.query("""DECLARE pgres_bcursor BINARY CURSOR FOR
                                SELECT * FROM pgresult_test""")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'SELECT', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'DECLARE CURSOR'
        self.CheckMemberValues(res, expected)

    def CheckMovePgResult(self):
        """Rewind the text cursor, then fetch all four rows from it."""
        res = self.cnx.query("MOVE BACKWARD ALL IN pgres_cursor")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'MOVE', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'MOVE 1'
        self.CheckMemberValues(res, expected)
        res = self.cnx.query("FETCH ALL FROM pgres_cursor")
        expected = [0, 'FETCH', None, 2, 4, None, 2, 1]
        if self.cnx.version >= '7.3':
            expected[1] = 'FETCH 4'
        self.CheckMemberValues(res, expected)

    def CheckFetchPgResult(self):
        """Fetching two rows from the text cursor returns two tuples."""
        res = self.cnx.query("FETCH 2 FROM pgres_cursor")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'FETCH', None, 2, 2, None, 2, 1]
        if self.cnx.version >= '7.3':
            expected[1] = 'FETCH 2'
        self.CheckMemberValues(res, expected)

    def CheckDeclarePgResult(self):
        """Declaring the text cursor yields a plain command result."""
        res = self.cnx.query("""DECLARE pgres_cursor CURSOR FOR
                                SELECT * FROM pgresult_test""")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'SELECT', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'DECLARE CURSOR'
        self.CheckMemberValues(res, expected)

    def CheckBeginPgResult(self):
        """BEGIN WORK reports the BEGIN command status."""
        res = self.cnx.query("BEGIN WORK")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'BEGIN', None, 0, 0, None, 1, 2]
        self.CheckMemberValues(res, expected)

    def CheckSelectPgResult(self):
        """Selecting the whole test table returns four two-field tuples."""
        res = self.cnx.query("SELECT * FROM  pgresult_test")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'SELECT', None, 2, 4, None, 2, 1]
        self.CheckMemberValues(res, expected)

    def CheckUpdatePgResult(self):
        """Updating one row reports cmdTuples == 1."""
        res = self.cnx.query("""UPDATE pgresult_test
                                SET f2 = 'This is a test (3u)'
                                WHERE f1 = 3""")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        # cmdStatus embeds the row count, so build it from the result.
        status = 'UPDATE %s' % res.cmdTuples
        expected = [0, status, 1, 0, 0, None, 1, 3]
        self.CheckMemberValues(res, expected)

    def CheckInsertPgResult(self):
        """Insert the four test rows; verify the members of the first result."""
        res = self.cnx.query("""INSERT INTO pgresult_test
                                VALUES (1, 'This is a test (1)')""")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        # The PgResult object's members can't be tested with
        # CheckMemberValues() directly, because cmdStatus and oidValue depend
        # on the OID assigned to the row.  Build the expected values from
        # the result itself.
        status = 'INSERT %s %s' % (res.oidValue, res.cmdTuples)
        expected = [0, status, 1, 0, 0, res.oidValue, 1, 3]
        self.assert_(res.oidValue is not None,
                     "PgResult.oidValue is None, it shouldn't be None.")
        self.CheckMemberValues(res, expected)
        # Remaining rows are needed by the checks that run later.
        res = self.cnx.query("""INSERT INTO pgresult_test
                                VALUES (2, 'This is a test (2)')""")
        res = self.cnx.query("""INSERT INTO pgresult_test
                                VALUES (3, 'This is a test (3)')""")
        res = self.cnx.query("""INSERT INTO pgresult_test
                                VALUES (4, 'This is a test (4)')""")

    def CheckCreatePgResult(self):
        """CREATE TABLE succeeds once and raises OperationalError when repeated."""
        query = "CREATE TABLE pgresult_test(f1 INT, f2 TEXT)"
        res = self.cnx.query(query)
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'CREATE', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'CREATE TABLE'
        self.CheckMemberValues(res, expected)
        # Try to create the table again.  This should raise an exception.
        self.assertRaises(libpq.OperationalError, self.cnx.query, query)

    def CheckPgResult(self):
        """Select three rows from pg_database and verify the members."""
        # The expected number of fields in pg_database depends on the
        # server version.
        if self.vstr.startswith("7.3"):
            flen = 11
        elif self.vstr.startswith("7.2"):
            flen = 9
        elif self.vstr.startswith("7.1"):
            flen = 7
        else:
            flen = 4

        res = self.cnx.query("SELECT * FROM pg_database LIMIT 3")
        self.CheckForMembers(res)
        self.CheckForMethods(res)
        expected = [0, 'SELECT', None, flen, 3, None, 2, 1]
        self.CheckMemberValues(res, expected)

class PgResultTestCases(unittest.TestCase):
    """Check the methods of PgResult for a range of SQL commands.

    Every Check* method runs a query on the module-level connection, checks
    the PgResult members, and then exercises the field/tuple accessor
    methods.  The methods share database state (the table pgresult_test and
    the cursor pgres_cursor), so they only make sense when run in the order
    in which the suite at the bottom of this file adds them.
    """

    def setUp(self):
        """Bind the shared connection and the attribute names to verify."""
        self.cnx = cnx
        self.vstr = vstr
        self.members = ['binaryTuples', 'cmdStatus', 'cmdTuples', 'nfields',
                        'ntuples', 'oidValue', 'resultStatus', 'resultType']
        self.memtypes = [types.IntType, types.StringType, types.IntType,
                         types.IntType, types.IntType, types.IntType,
                         types.IntType, types.IntType]

    def CheckMemberValues(self, res, expected):
        """Compare each member of res with the matching entry of expected.

        expected is a sequence holding one value per name in self.members,
        in the same order.
        """
        self.assertEqual(len(expected), len(self.members),
                         'expected has %d values, there are %d members.' %
                         (len(expected), len(self.members)))
        for name, want in zip(self.members, expected):
            # getattr() replaces the former exec-based attribute lookup.
            got = getattr(res, name)
            self.assertEqual(got, want,
                             'PgResult.%s is %s, it should be %s!' %
                             (name, got, want))

    def CheckMemberTypes(self, res, expected):
        """Compare the type of each member of res with the entry of expected.

        expected is a sequence holding one type object per name in
        self.members, in the same order.
        """
        self.assertEqual(len(expected), len(self.members),
                         'expected has %d types, there are %d members.' %
                         (len(expected), len(self.members)))
        for name, want in zip(self.members, expected):
            got = type(getattr(res, name))
            self.assertEqual(got, want,
                             'PgResult.%s is %s, it should be %s!' %
                             (name, got, want))

    def _CheckAccessorsRaise(self, res):
        """Assert that each field/tuple accessor of res raises InterfaceError."""
        self.assertRaises(libpq.InterfaceError, res.fmod, 0)
        self.assertRaises(libpq.InterfaceError, res.fname, 0)
        self.assertRaises(libpq.InterfaceError, res.fnumber, "NoSuchName")
        self.assertRaises(libpq.InterfaceError, res.fsize, 0)
        self.assertRaises(libpq.InterfaceError, res.ftype, 0)
        self.assertRaises(libpq.InterfaceError, res.getisnull, 0, 0)
        self.assertRaises(libpq.InterfaceError, res.getlength, 0, 0)
        self.assertRaises(libpq.InterfaceError, res.getvalue, 0, 0)

    def CheckDropPgResult(self):
        """DROP TABLE returns a result whose accessors raise InterfaceError."""
        res = self.cnx.query("DROP TABLE pgresult_test")
        expected = [0, 'DROP', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'DROP TABLE'
        self.CheckMemberValues(res, expected)
        self._CheckAccessorsRaise(res)

    def CheckEndPgResult(self):
        """END WORK returns a result whose accessors raise InterfaceError."""
        res = self.cnx.query("END WORK")
        expected = [0, 'COMMIT', None, 0, 0, None, 1, 2]
        self.CheckMemberValues(res, expected)
        self._CheckAccessorsRaise(res)

    def CheckClosePgResult(self):
        """CLOSE returns a result whose accessors raise InterfaceError."""
        res = self.cnx.query("CLOSE pgres_cursor")
        expected = [0, 'CLOSE', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'CLOSE CURSOR'
        self.CheckMemberValues(res, expected)
        self._CheckAccessorsRaise(res)

    def CheckMovePgResult(self):
        """MOVE returns a result whose accessors raise InterfaceError."""
        # NOTE(review): unlike the other checks there is no 7.3 adjustment
        # of the command status here, and the suite at the bottom of this
        # file does not run this check -- confirm the expected tag before
        # enabling it.
        res = self.cnx.query("MOVE BACKWARD ALL IN pgres_cursor")
        expected = [0, 'MOVE', None, 0, 0, None, 1, 2]
        self.CheckMemberValues(res, expected)
        self._CheckAccessorsRaise(res)

    def CheckFetchPgResult(self):
        """Fetch two rows from the cursor and exercise every accessor."""
        res = self.cnx.query("FETCH 2 FROM pgres_cursor")
        expected = [0, 'FETCH', None, 2, 2, None, 2, 1]
        if self.cnx.version >= '7.3':
            expected[1] = 'FETCH 2'
        self.CheckMemberValues(res, expected)
        self.assertEqual(res.fmod(0), -1,
                         "fmod(0) was %d, it should be -1." % res.fmod(0))
        self.assertEqual(res.fname(0), 'f1',
                         "fname(0) was '%s', it should be 'f1'." %
                         res.fname(0))
        self.assertEqual(res.fnumber('f2'), 1,
                         "fnumber('f2') was %d, it should be 1." %
                         res.fnumber('f2'))
        self.assertEqual(res.fsize(0), 4,
                         "fsize(0) is %d, it should be 4." % res.fsize(0))
        self.assertEqual(res.ftype(1), 25,
                         "ftype(1) is %d, it should be 25." % res.ftype(1))
        self.assertEqual(res.getisnull(0, 1), 0,
                         "getisnull(0, 1) returned %d, it should be 0." %
                         res.getisnull(0, 1))
        # 'This is a test (2)' is 18 characters long; the message used to
        # claim 19.
        self.assertEqual(res.getlength(1, 1), 18,
                         "getlength(1, 1) is %d, it should be 18." %
                         res.getlength(1, 1))
        value = res.getvalue(1, 1)
        self.assertEqual(value, 'This is a test (2)',
                         "getvalue(1, 1) returned incorrect results.")
        self.assertEqual(res.getlength(1, 1), len(value),
                         "getlength(1, 1) does not match len(getvalue(1, 1)).")

    def CheckDeclarePgResult(self):
        """DECLARE returns a result whose accessors raise InterfaceError."""
        res = self.cnx.query("""DECLARE pgres_cursor CURSOR FOR
                                SELECT * FROM pgresult_test
                                ORDER BY f1""")
        expected = [0, 'SELECT', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'DECLARE CURSOR'
        self.CheckMemberValues(res, expected)
        self._CheckAccessorsRaise(res)

    def CheckBeginPgResult(self):
        """BEGIN WORK returns a result whose accessors raise InterfaceError."""
        res = self.cnx.query("BEGIN WORK")
        expected = [0, 'BEGIN', None, 0, 0, None, 1, 2]
        self.CheckMemberValues(res, expected)
        self._CheckAccessorsRaise(res)

    def CheckSelectPgResult(self):
        """Select the ordered test table and exercise every accessor."""
        res = self.cnx.query("SELECT * FROM pgresult_test ORDER BY f1")
        expected = [0, 'SELECT', None, 2, 4, None, 2, 1]
        self.CheckMemberValues(res, expected)
        self.assertEqual(res.fmod(0), -1,
                         "fmod(0) was %d, it should be -1." % res.fmod(0))
        self.assertEqual(res.fname(0), 'f1',
                         "fname(0) was '%s', it should be 'f1'." %
                         res.fname(0))
        self.assertEqual(res.fnumber('f2'), 1,
                         "fnumber('f2') was %d, it should be 1." %
                         res.fnumber('f2'))
        self.assertEqual(res.fsize(0), 4,
                         "fsize(0) is %d, it should be 4." % res.fsize(0))
        self.assertEqual(res.ftype(1), 25,
                         "ftype(1) is %d, it should be 25." % res.ftype(1))
        self.assertEqual(res.getisnull(0, 1), 0,
                         "getisnull(0, 1) returned %d, it should be 0." %
                         res.getisnull(0, 1))
        # Row 3 was inserted with f2 = NULL.  The message now reports the
        # same cell that is being asserted on.
        self.assertEqual(res.getisnull(3, 1), 1,
                         "getisnull(3, 1) returned %d, it should be 1." %
                         res.getisnull(3, 1))
        self.assertEqual(res.getlength(2, 1), 19,
                         "getlength(2, 1) is %d, it should be 19." %
                         res.getlength(2, 1))
        value = res.getvalue(2, 1)
        self.assertEqual(value, 'This is a test (3u)',
                         "getvalue(2, 1) returned incorrect results.")
        self.assertEqual(res.getlength(2, 1), len(value),
                         "getlength(2, 1) does not match len(getvalue(2, 1)).")

    def CheckUpdatePgResult(self):
        """UPDATE returns a result whose accessors raise InterfaceError."""
        res = self.cnx.query("""UPDATE pgresult_test
                                SET f2 = 'This is a test (3u)'
                                WHERE f1 = 3""")
        # cmdStatus embeds the row count, so build it from the result.
        status = 'UPDATE %s' % res.cmdTuples
        expected = [0, status, 1, 0, 0, None, 1, 3]
        self.CheckMemberValues(res, expected)
        self._CheckAccessorsRaise(res)

    def CheckInsertPgResult(self):
        """Insert the four test rows; verify types, values and accessors."""
        res = self.cnx.query("""INSERT INTO pgresult_test
                                VALUES (1, 'This is a test (1)')""")
        expected = [types.IntType, types.StringType, types.IntType,
                    types.IntType, types.IntType, types.IntType,
                    types.IntType, types.IntType]
        self.CheckMemberTypes(res, expected)
        # The PgResult object's members can't be tested with
        # CheckMemberValues() directly, because cmdStatus and oidValue depend
        # on the OID assigned to the row.  Build the expected values from
        # the result itself.
        status = 'INSERT %s %s' % (res.oidValue, res.cmdTuples)
        expected = [0, status, 1, 0, 0, res.oidValue, 1, 3]
        self.assert_(res.oidValue is not None,
                     "PgResult.oidValue is None, it shouldn't be None.")
        self.CheckMemberValues(res, expected)
        self._CheckAccessorsRaise(res)
        # Remaining rows are needed by the checks that run later; the last
        # one carries a NULL in f2.
        res = self.cnx.query("""INSERT INTO pgresult_test
                                VALUES (2, 'This is a test (2)')""")
        res = self.cnx.query("""INSERT INTO pgresult_test
                                VALUES (3, 'This is a test (3)')""")
        res = self.cnx.query("""INSERT INTO pgresult_test
                                VALUES (4, NULL)""")

    def CheckCreatePgResult(self):
        """CREATE TABLE returns a result whose accessors raise InterfaceError."""
        query = "CREATE TABLE pgresult_test(f1 INT, f2 TEXT)"
        res = self.cnx.query(query)
        expected = [0, 'CREATE', None, 0, 0, None, 1, 2]
        if self.cnx.version >= '7.3':
            expected[1] = 'CREATE TABLE'
        self.CheckMemberValues(res, expected)
        self._CheckAccessorsRaise(res)

    def CheckPgResult(self):
        """Select from pg_database; check names and out-of-range arguments."""
        # The expected number of fields in pg_database depends on the
        # server version.
        if self.vstr.startswith("7.3"):
            flen = 11
        elif self.vstr.startswith("7.2"):
            flen = 9
        elif self.vstr.startswith("7.1"):
            flen = 7
        else:
            flen = 4

        res = self.cnx.query("SELECT * FROM pg_database LIMIT 3")
        expected = [types.IntType, types.StringType, types.NoneType,
                    types.IntType, types.IntType, types.NoneType,
                    types.IntType, types.IntType]
        self.CheckMemberTypes(res, expected)
        expected = [0, 'SELECT', None, flen, 3, None, 2, 1]
        self.CheckMemberValues(res, expected)
        # Check that fname and fnumber agree on the leading columns.
        cols = ['datname', 'datdba', 'encoding', 'datistemplate',
                'datallowconn', 'datlastsysoid']
        for i in range(len(cols)):
            self.assertEqual(res.fname(i), cols[i],
                             "Name for field %d is %s, it should be %s." %
                             (i, res.fname(i), cols[i]))
            self.assertEqual(res.fnumber(cols[i]), i,
                             "Number for field named %s is %d, it should be %d." %
                             (cols[i], res.fnumber(cols[i]), i))
        # Out-of-range field and tuple indexes must raise ValueError.
        self.assertRaises(ValueError, res.fmod, -1)
        self.assertRaises(ValueError, res.fmod, res.nfields)
        self.assertRaises(ValueError, res.fname, -1)
        self.assertRaises(ValueError, res.fname, res.nfields)
        self.assertRaises(ValueError, res.fnumber, "NoSuchName")
        self.assertRaises(ValueError, res.fsize, -1)
        self.assertRaises(ValueError, res.fsize, res.nfields)
        self.assertRaises(ValueError, res.ftype, -1)
        self.assertRaises(ValueError, res.ftype, res.nfields)
        self.assertRaises(ValueError, res.getisnull, -1, 0)
        self.assertRaises(ValueError, res.getisnull, res.ntuples, 0)
        self.assertRaises(ValueError, res.getisnull, 0, -1)
        self.assertRaises(ValueError, res.getisnull, 0, res.nfields)
        self.assertRaises(ValueError, res.getlength, -1, 0)
        self.assertRaises(ValueError, res.getlength, res.ntuples, 0)
        self.assertRaises(ValueError, res.getlength, 0, -1)
        self.assertRaises(ValueError, res.getlength, 0, res.nfields)
        self.assertRaises(ValueError, res.getvalue, -1, 0)
        self.assertRaises(ValueError, res.getvalue, res.ntuples, 0)
        self.assertRaises(ValueError, res.getvalue, 0, -1)
        self.assertRaises(ValueError, res.getvalue, 0, res.nfields)
        # A successful result must not expose an errorMessage attribute.
        try:
            res.errorMessage
        except AttributeError:
            pass
        else:
            self.fail('PgResult attribute "errorMessage" exists!')

    def CheckSelectOfNonPrintableString(self):
        """Round-trip a string of control characters through a SELECT."""
        try:
            a = '\x01\x02\x03\004'
            res = self.cnx.query("select %s as a" % libpq.PgQuoteString(a))
            b = res.getvalue(0, res.fnumber('a'))
            del res
            self.assertEqual(len(b), len(a),
                             "Length of result is %d, it should be %d." %
                             (len(b), len(a)))
            self.failUnless(b == a,
                            "Result is '%s', it should be '%s'" % (b, a))
        except StandardError:
            # Report any error as a test failure.  sys.exc_info() is used
            # instead of the "except X, name" form so the clause parses on
            # every Python version.
            self.fail(sys.exc_info()[1])

if __name__ == "__main__":
    # The checks share database state (the pgresult_test table and the
    # cursors declared on it), so they are added in dependency order:
    # create, insert, update, select, cursor work, commit, drop.
    member_checks = (
        "CheckPgResult",
        "CheckCreatePgResult",
        "CheckInsertPgResult",
        "CheckUpdatePgResult",
        "CheckSelectPgResult",
        "CheckBeginPgResult",
        "CheckDeclarePgResult",
        "CheckFetchPgResult",
        "CheckMovePgResult",
        "CheckDeclareBinaryPgResult",
        "CheckFetchBinaryPgResult",
        "CheckClosePgResult",
        "CheckNoticePgResult",
        "CheckCloseBinaryPgResult",
        "CheckEndPgResult",
        "CheckDropPgResult",
    )
    method_checks = (
        "CheckSelectOfNonPrintableString",
        "CheckPgResult",
        "CheckCreatePgResult",
        "CheckInsertPgResult",
        "CheckUpdatePgResult",
        "CheckSelectPgResult",
        "CheckBeginPgResult",
        "CheckDeclarePgResult",
        "CheckFetchPgResult",
        "CheckClosePgResult",
        "CheckEndPgResult",
        "CheckDropPgResult",
    )

    TestSuite = unittest.TestSuite()
    for check in member_checks:
        TestSuite.addTest(PgResultMemberTestCases(check))
    for check in method_checks:
        TestSuite.addTest(PgResultTestCases(check))

    runner = unittest.TextTestRunner()
    runner.run(TestSuite)
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.