test_basic.py :  » Database » PyDB2 » PyDB2_1.1.1 » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » PyDB2 
PyDB2 » PyDB2_1.1.1 » test » test_basic.py
import unittest
import DB2
import Config

class SimpleDB2Test_Connection(unittest.TestCase):
    def setUp(self):
        self.conn_d = Config.ConnDict.copy()

    def test_0000_get_type_num_2_name_dict(self):
        """DB2.SQL_type_dict for description2"""
        d = DB2.SQL_type_dict

    def test_001_ConnectSuccess(self):
        """DB2.connect() - Successful"""
        self.db = DB2.connect(**Config.ConnDict)
        self.db.close()

    def test_002_ConnectFailureWithDB(self):
        """DB2.connect() - Connection failure with wrong database"""
        self.conn_d['dsn'] = 'NonexistentDatabase'
        self.assertRaises( DB2.Error, DB2.connect, **self.conn_d )

    def test_003_ConnectFailureWithUid(self):
        """DB2.connect() - Connection failure with wrong username"""
        self.conn_d['uid'] = 'manyong'
        self.assertRaises( DB2.Error, DB2.connect, **self.conn_d )

    def test_004_ConnectFailureWithPwd(self):
        """DB2.connect() - Connection failure with wrong password"""
        self.conn_d['pwd'] = ''
        self.assertRaises( DB2.Error, DB2.connect, **self.conn_d )


class SimpleDB2Test_Cursor(unittest.TestCase):
    def setUp(self):
        self.db = DB2.connect(**Config.ConnDict)
        self.tableName = 'PYDB2TEST_0'
        self.cs = self.db.cursor()

    def tearDown(self):
        self.cs.close()
        self.db.close()

    def _createTable(self):
        self.cs.execute( """CREATE TABLE %s
                (
                    C1 INTEGER,
                    C2 VARCHAR(3)
                )
            """ % self.tableName)

    def _insertData(self, valTuple):
        self.cs.execute( """INSERT INTO %s
                    VALUES (?, ?)
            """ % self.tableName,
            valTuple)

    def _insertDataList(self, valList):
        self.cs.executemany( """INSERT INTO %s
                    VALUES (?, ?)
            """ % self.tableName,
            valList)

    ### Acutal test comes here ###

    def test_001_cursor_creation(self):
        """db.cursor() - Successful"""
        pass

    def test_002_execute_create_table(self):
        """cs.execute() - CREATE TABLE"""
        self._createTable()

    def test_0030_execute_insert_values(self):
        """cs.execute() - INSERT (NORMAL)"""
        self._createTable()
        self._insertData( (1, 'a') )

    def test_0031_execute_insert_values(self):
        """cs.execute() - INSERT (NULL)"""
        self._createTable()
        self._insertData( (None, None) )

    def test_0032_execute_insert_values(self):
        """cs.execute() - INSERT (Wrong type)"""
        self._createTable()
        self.assertRaises(
            TypeError,
            self._insertData,
            (1.0, None)
            )

    def test_0033_execute_insert_values(self):
        """cs.execute() - INSERT (Right truncation)"""
        self._createTable()
        self.assertRaises(
            DB2.ProgrammingError,
            self._insertData,
            (1, 'XXXX')
            )

    def test_0034_execute_insert_values(self):
        """cs.execute() - INSERT (Wrong # of params)"""
        self._createTable()
        self.assertRaises(
            DB2.ProgrammingError,
            self._insertData,
            (1, )
            )

    def test_0040_execute_fetchone(self):
        """cs.fetchone() - SELECT (NORMAL)"""
        self._createTable()
        self._insertData( (1, 'a') )

        self.cs.execute("""SELECT * FROM %s""" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( tuple(r), (1, 'a') )

    def test_0041_execute_fetchone(self):
        """cs.fetchone() - SELECT (NULL)"""
        self._createTable()
        self._insertData( (None, None) )

        self.cs.execute("""SELECT * FROM %s""" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( tuple(r), (None, None) )

    def test_0042_execute_fetchone(self):
        """cs.fetchone() - fetch w/o SELECT"""
        self._createTable()
        self._insertData( (None, None) )

        self.assertRaises(DB2.ProgrammingError, self.cs.fetchone)

    def test_0043_execute_fetchone(self):
        """cs.fetchone() - beyond the last result set"""
        self._createTable()
        self._insertData( (None, None) )
        self.cs.execute("""SELECT * FROM %s""" % self.tableName)

        r = self.cs.fetchone()

        for i in range(100):
            r = self.cs.fetchone()
            self.assertEqual(r, None)

    def test_005_executemany(self):
        """cs.executemany()"""
        self._createTable()
        self._insertDataList( [(1, 'a'), (2, 'b'), (None, None)] )

    def test_006_fetchall(self):
        """cs.fetchall()"""
        self._createTable()
        dataList = [(1, 'a'), (2, 'b'), (None, None)]
        self._insertDataList( dataList )
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        rows = self.cs.fetchall()
        for i in range( len(rows) ):
            self.assertEqual( dataList[i], tuple(rows[i]) )

    def test_007_fetchmany(self):
        """cs.fetchmany()"""
        self._createTable()
        dataList = [(1, 'a'), (2, 'b'), (None, None)]
        self._insertDataList( dataList )
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        self.cs.arraysize = 2
        rows = self.cs.fetchmany()
        self.assertEqual( len(rows), self.cs.arraysize )
        for i in range( len(rows) ):
            self.assertEqual( dataList[i], tuple(rows[i]) )

    def test_008_description(self):
        """cs.description & description2"""
        import types

        self._createTable()
        colData = (None, None)
        self._insertData( colData )

        self.cs.execute("""SELECT * FROM %s""" % self.tableName)
        desc = self.cs.description

        # number of columns in result set
        self.assertEqual(len(desc), len(colData))
        for x in desc:
            # each item SHOULD be of Tuple type
            self.assertEqual(type(x), types.TupleType)
            # each item SHOULD be of length 7
            self.assertEqual(len(x), 7)

        desc2 = self.cs.description2
        self.assertEqual(len(desc), len(desc2))
        for x in desc2:
            # each item SHOULD be of Tuple type
            self.assertEqual(type(x), types.TupleType)
            # each item SHOULD be of length 7
            self.assertEqual(len(x), 7)
            # 2nd item SHOULD be of String type
            self.assertEqual(type(x[1]), types.StringType)

    def test_0090_rowcount(self):
        """cs.rowcount - w/ Non-scrollable cursor"""
        self._createTable()
        colData = (None, None)
        self._insertData( colData )
        self.assertEqual(self.cs.rowcount, 1)
        # with non-scrollable cursor (default),
        # it returns -1, not the count of result set
        self.cs.execute("""SELECT * FROM %s""" % self.tableName)
        self.assertEqual(self.cs.rowcount, -1)

    def test_0100_callproc(self):
        """cs.callproc() - IN, OUT, INOUT parameters"""
        self.cs.execute(
            """CREATE PROCEDURE CP_TEST_1
            (IN P1 CHAR(5), OUT P2 VARCHAR(5), INOUT P3 INTEGER)
            LANGUAGE SQL
            BEGIN
                SET P2 = 'YYY';
                SET P3 = 3;
            END""")
        params = ( 'XXXXX', None, 1 )
        r = self.cs.callproc('CP_TEST_1', params)
        self.assertNotEqual( params, r )
        self.assertEqual( ('XXXXX', 'YYY', 3), r )

    def test_0101_callproc(self):
        """cs.callproc() - w/ Result set"""
        self.cs.execute("""
            CREATE TABLE CP_TEST_TB ( P1 INTEGER )
            """)

        SIZE = 100;
        for i in range(SIZE):
            self.cs.execute(
                """INSERT INTO CP_TEST_TB VALUES (?)""", i)

        self.cs.execute(
            """CREATE PROCEDURE CP_TEST_1
            (IN P1 INTEGER)
            LANGUAGE SQL
            BEGIN
                DECLARE CS1 CURSOR WITH RETURN FOR
                    SELECT * FROM CP_TEST_TB;
                OPEN CS1;
            END
            """)

        r = self.cs.callproc("CP_TEST_1", 1)
        self.assertEqual(r, (1, ))
        rows = self.cs.fetchall()
        self.assertEqual(len(rows), SIZE)
        
    def test_0102_callproc(self):
        """cs.callproc() - IN, OUT, INOUT parameters. Out is too short."""
        self.cs.execute(
            """CREATE PROCEDURE CP_TEST_1
            (IN P1 CHAR(5), OUT P2 VARCHAR(5), INOUT P3 INTEGER)
            LANGUAGE SQL
            BEGIN
                SET P2 = 'YYY';
                SET P3 = 3;
            END""")
        params = ( 'XXXXX', 'AA', 1 )
        r = self.cs.callproc('CP_TEST_1', params)
        self.assertNotEqual( params, r )
        self.assertEqual( ('XXXXX', 'YYY', 3), r )

    def test_0103_callproc(self):
        """cs.callproc() - IN, OUT, INOUT parameters. Out is too long."""
        self.cs.execute(
            """CREATE PROCEDURE CP_TEST_1
            (IN P1 CHAR(5), OUT P2 VARCHAR(5), INOUT P3 INTEGER)
            LANGUAGE SQL
            BEGIN
                SET P2 = 'YYY';
                SET P3 = 3;
            END""")
        params = ( 'XXXXX', 'AAAAAAA', 1 )
        r = self.cs.callproc('CP_TEST_1', params)
        self.assertNotEqual( params, r )
        self.assertEqual( ('XXXXX', 'YYY', 3), r )

    def test_0104_callproc(self):
        """cs.callproc() - IN, OUT, INOUT parameters. Out is wrong type."""
        self.cs.execute(
            """CREATE PROCEDURE CP_TEST_1
            (IN P1 CHAR(5), OUT P2 VARCHAR(5), INOUT P3 INTEGER)
            LANGUAGE SQL
            BEGIN
                SET P2 = 'YYY';
                SET P3 = 3;
            END""")
        params = ( 'XXXXX', 3, 1 )
        self.assertRaises(TypeError, self.cs.callproc, 'CP_TEST_1', params )


class SimpleDB2Test_Extended(unittest.TestCase):
    def setUp(self):
        self.db = DB2.connect(**Config.ConnDict)
        self.tableName = 'PYDB2TEST_1'
        self.cs = self.db.cursor()
        self.type_data = [
                ('CHAR(3)', 'CCC'),
                ('VARCHAR(3)', 'V'),
                ('DATE', '2005-03-03'),
                ('TIME', '00:01:02'),
                ('SMALLINT', 1),
                ('INTEGER', 2),
                ('BIGINT', 3L),
                ('REAL', 1.0),
            ]

    def tearDown(self):
        self.cs.close()
        self.db.close()

    def _pick_col(self, count):
        import random
        cols = []
        for i in range(count):
            cols.append( (i, random.choice(self.type_data)) )

        col_def = ', '.join(['P%d %s' % (x[0], x[1][0]) for x in cols])
        table_def = 'CREATE TABLE %s (%s)' % (self.tableName, col_def)
        def_val = tuple([x[1][1] for x in cols])
        return table_def, def_val

    def __test_randomTable(self, count):
        table_def, def_val = self._pick_col(count)

        self.cs.execute(table_def)
        qmark_list = ', '.join(['?'] * len(def_val))

        self.cs.execute(
            "INSERT INTO %s VALUES (%s)" % (self.tableName, qmark_list),
            def_val
            )

        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.db.rollback()

        return tuple(r) == def_val

    def test_001_randomTable(self):
        """Create table in random manner (columns of random types)"""
        COL_SIZE = 120
        for i in range(100, COL_SIZE):
            r = self.__test_randomTable(i)
            self.assert_(r)

    def __creatSampleTable(self):
        self.cs.execute("""CREATE TABLE %s
            (
                P1 INTEGER,
                P2 VARCHAR(512)
            )
            """ % self.tableName)

    def __test_scrollable_cursor(self, scrollable):
        self.__creatSampleTable()
        data = [ (1, 'a'), (2, 'bb'), (3, 'ccc') ]
        self.cs.executemany(
            "INSERT INTO %s VALUES (?, ?)" % self.tableName,
            data)
        self.cs.set_scrollable(scrollable)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        rows = self.cs.fetchmany( len(data) )
        for i in range( len(data) ):
            self.assertEqual(
                tuple(rows[i]),
                data[i]
            )

    def test_0020_fetchonmany_unscrollable(self):
        """cs.fetchmany() - use Non-Scrollable cursor"""
        self.__test_scrollable_cursor(0)

    def test_0021_fetchonmany_scrollable(self):
        """cs.fetchmany() - use Scrollable cursor"""
        self.__test_scrollable_cursor(1)

    def __creatSampleBLOBTable(self):
        self.cs.execute("""CREATE TABLE %s
            (
                P1 BLOB(1024),
                P2 BLOB(1024)
            )
            """ % self.tableName)

    def test_0030_BLOB(self):
        """BLOB (file)"""
        import os
        self.__creatSampleBLOBTable()
        f = os.tmpfile()
        data = '\xae' * 1024
        f.write(data)
        f.seek(0, 0)
        self.cs.execute(
            "INSERT INTO %s (P1) VALUES (?)" % self.tableName, f
        )
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        rows = self.cs.fetchone()
        self.assertEqual(str(rows[0]), data)

    def __test_BLOB(self, auto_LOB_read):
        SIZE = 10
        self.__creatSampleBLOBTable()
        for i in range(SIZE):
            self.cs.execute(
                "INSERT INTO %s VALUES (?, ?)" % self.tableName,
                (
                    DB2.Binary(chr(i) * i),
                    DB2.Binary(chr(i) * (1024-i))
                )
            )
        self.cs.set_scrollable(1)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        self.cs.auto_LOB_read = auto_LOB_read
        rows = self.cs.fetchmany(SIZE)
        return rows

    def test_0031_BLOB(self):
        """BLOB (auto_LOB_read ON)"""
        rows = self.__test_BLOB(1)

    def test_0032_BLOB(self):
        """BLOB (auto_LOB_read OFF)"""
        rows = self.__test_BLOB(0)
        for r in rows:
            b = r[0]
            v = self.cs.readLOB(b)

    def test_0040_DATE(self):
        """DATE"""
        import time
        self.cs.execute("""CREATE TABLE %s (P1 DATE) """ % self.tableName)
        ctime = time.time()
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, DB2.DateFromTicks(ctime))
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], str(DB2.DateFromTicks(ctime)) )

    def test_0041_TIME(self):
        """TIME"""
        import time
        self.cs.execute("""CREATE TABLE %s (P1 TIME) """ % self.tableName)
        ctime = time.time()
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, DB2.TimeFromTicks(ctime))
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], str(DB2.TimeFromTicks(ctime)) )

    def test_0042_TIMESTAMP(self):
        """TIMESTAMP"""
        import time
        self.cs.execute("""CREATE TABLE %s (P1 TIMESTAMP) """ % self.tableName)
        ctime = time.time()
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, DB2.TimestampFromTicks(ctime))
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        # XXX
        self.assertEqual( r[0], str(DB2.TimestampFromTicks(ctime)) )
        self.assertEqual( len(r[0]), 26 )

        self.cs.execute("DELETE FROM %s" % self.tableName)

        value = []
        for p in [ '000000', '000010', '000100', '001000', '010000', '100000']:
            ts = '2005-03-09-08.24.59.%s' % p
            value.append(ts)
            self.cs.execute("""INSERT INTO %s
                    VALUES (?)
                """ % self.tableName, ts)

        self.cs.execute("SELECT * FROM %s" % self.tableName)
        rows = self.cs.fetchall()
        for i in range( len(rows) ):
            self.assertEqual(rows[i][0], value[i])

        for i in range(100):
            self.cs.execute("""INSERT INTO %s
                VALUES (CURRENT TIMESTAMP)""" % self.tableName)
            self.cs.execute("SELECT * FROM %s" % self.tableName)
            r = self.cs.fetchone()
            self.assertEqual( len(r[0]), 26 )
            self.cs.execute("DELETE FROM %s" % self.tableName)

    def test_0050_rowcount(self):
        """cs.rowcount - w/ Scrollable cursor"""
        self.__creatSampleTable()
        data = [ (1, 'a'), (2, 'bb'), (3, 'ccc') ]
        self.cs.executemany(
            "INSERT INTO %s VALUES (?, ?)" % self.tableName,
            data)

        self.cs.set_scrollable(1)
        rowCount = self.cs.execute("SELECT * FROM %s" % self.tableName)
        self.assertEqual(rowCount, len(data))
    def test_0060_REAL(self):
        """REAL"""
        self.cs.execute("""CREATE TABLE %s (P1 REAL) """ % self.tableName)
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, 1.0)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], 1.0)

    def test_0061_FLOAT(self):
        """FLOAT"""
        self.cs.execute("""CREATE TABLE %s (P1 FLOAT) """ % self.tableName)
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, 1.0)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], 1.0)

    def test_0062_DOUBLE(self):
        """DOUBLE"""
        self.cs.execute("""CREATE TABLE %s (P1 DOUBLE) """ % self.tableName)
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, 1.0)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], 1.0)

    def test_0071_DECIMAL_from_float(self):
        """DECIMAL from float"""
        self.cs.execute("""CREATE TABLE %s (P1 DECIMAL) """ % self.tableName)
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, 1.0)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], 1.0)

    def test_0072_DECIMAL_from_int(self):
        """DECIMAL from int"""
        self.cs.execute("""CREATE TABLE %s (P1 DECIMAL) """ % self.tableName)
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, 1)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], 1)

    def test_0073_DECIMAL_from_long(self):
        """DECIMAL from long"""
        self.cs.execute("""CREATE TABLE %s (P1 DECIMAL) """ % self.tableName)
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, 1L)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], 1L)
        
    def test_0071_NUMERIC_from_float(self):
        """NUMERIC from float"""
        self.cs.execute("""CREATE TABLE %s (P1 NUMERIC) """ % self.tableName)
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, 1.0)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], 1.0)

    def test_0072_NUMERIC_from_int(self):
        """NUMERIC from int"""
        self.cs.execute("""CREATE TABLE %s (P1 NUMERIC) """ % self.tableName)
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, 1)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], 1)

    def test_0073_NUMERIC_from_long(self):
        """NUMERIC from long"""
        self.cs.execute("""CREATE TABLE %s (P1 NUMERIC) """ % self.tableName)
        self.cs.execute("""INSERT INTO %s
                VALUES (?)
            """ % self.tableName, 1L)
        self.cs.execute("SELECT * FROM %s" % self.tableName)
        r = self.cs.fetchone()
        self.assertEqual( r[0], 1L)
        
class SimpleDB2Test_Regression(unittest.TestCase):
    def setUp(self):
        self.db = DB2.connect(**Config.ConnDict)
        cursor = self.db.cursor()
        cursor.execute("CREATE TABLE test1(A BIGINT, B VARCHAR(50), C BIGINT)")
        cursor.close()

    def tearDown(self):
        cursor = self.db.cursor()
        cursor.execute("drop table test1")
        self.db.close()

    def test_select_from_insert(self):
        """Patch[ 958351 ] Fix Select DML Syntax segmentation fault"""
        cursor = self.db.cursor()
        insert_row = (1,"bla", 3)
        rowCount = cursor.execute("""
            SELECT *
            FROM FINAL TABLE
            (
                INSERT into test1(a,b,c) VALUES (?,?,?)
            )
            """, insert_row)
        self.assertEqual(rowCount, 1)
        ##this used to segfault AFAIK
        row = cursor.fetchone()
        self.assertEqual(row, insert_row)

if __name__ == '__main__':
    suite = unittest.TestSuite()

    for t in [
            SimpleDB2Test_Connection,
            SimpleDB2Test_Cursor,
            SimpleDB2Test_Extended,
            SimpleDB2Test_Regression,
        ]:
        suite.addTest(unittest.makeSuite(t))

    unittest.TextTestRunner(verbosity=2).run(suite)

# FIN
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.