MySQL.py :  » Web-Frameworks » Karrigell » Karrigell-3.1 » karrigell » package » PyDbLite » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Karrigell 
Karrigell » Karrigell 3.1 » karrigell » package » PyDbLite » MySQL.py
"""PyDbLite.py adapted for MySQL backend

Differences with PyDbLite:
- pass the connection to the MySQL db as argument to Base()
- in create(), field definitions must specify a type
- no index
- the Base() instance has a cursor attribute, so that SQL requests
  can be executed :
    db.cursor.execute(an_sql_request)
    result = db.cursor.fetchall()

Fields must be declared 
Syntax :
    from PyDbLite.MySQL import Database,Table
    db = Database("localhost","root","admin","test")
    # pass the connection as argument to Base creation
    table = Table('dummy',db)
    # create new table with field names
    table.create(('name','INTEGER PRIMARY KEY AUTO_INCREMENT'),
        ('age','INTEGER'),('size','REAL'))
    # open existing base
    table.open()
    # insert new record
    table.insert(name='homer',age=23,size=1.84)
    # selection by list comprehension
    res = [ r for r in table if 30 > r['age'] >= 18 and r['size'] < 2 ]
    # or generator expression
    for r in (r for r in table if r['name'] in ('homer','marge') ):
    # simple selection (equality test)
    res = table(age=30)

    # the following methods only work if the table has an
    # AUTO_INCREMENT
    # delete a record or a list of records
    table.delete(one_record)
    table.delete(list_of_records)
    # delete a record by its id
    del table[rec_id]
    # direct access by id
    record = table[rec_id] # the record such that record['__id__'] == rec_id
    # update
    table.update(record,age=24)
    # add and drop fields
    table.add_field('new_field')
    table.drop_field('name')
    # save changes on disk
    table.commit()
"""

import os
import cPickle
import bisect

import datetime

import MySQLdb

# compatibility with Python 2.3
try:
    set([])
except NameError:
    from sets import Set

class MySQLError(Exception):

    pass

class Connection:

    def __init__(self,host,login,password):
        self.conn = MySQLdb.connect(host,login,password)
        self.cursor = self.conn.cursor()

    def databases(self):
        self.cursor.execute('SHOW DATABASES')
        return [db[0] for db in self.cursor.fetchall()]

    def create(self,db_name,mode=None):
        if mode=="open":
            if not db_name in self.databases():
                self.cursor.execute('CREATE DATABASE %s' %db_name)
            else:
                self.cursor.execute('USE %s' %db_name)
        else:
            self.cursor.execute('CREATE DATABASE %s' %db_name)
        return Database(db_name,self)

class Database:

    def __init__(self,db_name,connection):
        self.conn = connection
        self.cursor = connection.cursor
        self.cursor.execute('USE %s' %db_name)

    def tables(self):
        self.cursor.execute("SHOW TABLES")
        return [ t[0] for t in self.cursor.fetchall() ]

    def drop(self):
        # drop database
        self.cursor.execute('USE %s' %db)
        self.cursor.execute('SHOW TABLES')
        if len(self.cursor.fetchall()):
            raise MySQLError,\
              "Can't drop database %s ; all tables must be dropped first" %db
    
class Table:

    def __init__(self,table_name,db):
        """db = an instance of Database"""
        self.name = table_name
        self.db = db
        self.conn = self.db.conn.conn
        self.cursor = db.cursor

    def create(self,*fields,**kw):
        """Create a new table
        For each field, a 2-element tuple is provided :
        - the field name
        - a string with additional information : field type +
          other information using the MySQL syntax
          eg : ('name','TEXT NOT NULL')
               ('date','TIMESTAMP DEFAULT CURRENT_TIMESTAMP')
        A keyword argument mode can be specified ; it is used if a file
        with the base name already exists
        - if mode = 'open' : open the existing base, ignore the fields
        - if mode = 'override' : erase the existing base and create a
        new one with the specified fields"""
        self.mode = mode = kw.get("mode",None)
        if self._table_exists():
            if mode == "override":
                self.cursor.execute("DROP TABLE %s" %self.name)
            elif mode == "open":
                return self.open()
            else:
                raise IOError,"Base %s already exists" %self.name
        self.fields = []
        self.field_info = {}
        sql = "CREATE TABLE %s (" %self.name
        for field in fields:
            sql += self._validate_field(field)
            sql += ','
        sql = sql[:-1]+')'
        self.cursor.execute(sql)
        self._get_table_info()
        return self

    def _validate_field(self,field):
        if len(field)!= 2:
            msg = "Error in field definition %s" %field
            msg += ": should be a 2- tuple (field_name,field_info)"
            raise SQLiteError,msg
        return '%s %s' %(field[0],field[1])

    def open(self):
        """Open an existing database"""
        if self._table_exists():
            self.mode = "open"
            self._get_table_info()
            return self
        # table not found
        raise IOError,"Table %s doesn't exist" %self.name

    def _table_exists(self):
        """Database-specific method to see if the table exists"""
        self.cursor.execute("SHOW TABLES")
        for table in self.cursor.fetchall():
            if table[0].lower() == self.name.lower():
                return True
        return False

    def _get_table_info(self):
        """Database-specific method to get field names"""
        self.rowid = None
        self.fields = []
        self.field_info = {}
        self.cursor.execute('DESCRIBE %s' %self.name)
        for row in self.cursor.fetchall():
            field,typ,null,key,default,extra = row
            self.fields.append(field)
            self.field_info[field] = {'type':typ,'NOT NULL':null,'key':key,
                'DEFAULT':default,'extra':extra}
            if extra == 'auto_increment':
                self.rowid = field

    def commit(self):
        self.conn.commit()

    def insert(self,*args,**kw):
        """Insert a record in the database
        Parameters can be positional or keyword arguments. If positional
        they must be in the same order as in the create() method
        If some of the fields are missing the value is set to None
        Returns the record identifier
        """
        fields = [ f for f in self.fields
            if not self.field_info[f]['extra']=="auto_increment"]
        if args:
            kw = dict([(f,arg) for f,arg in zip(fields,args)])

        vals = self._make_sql_params(kw)
        sql = "INSERT INTO %s SET %s" %(self.name,",".join(vals))
        res = self.cursor.execute(sql,kw.values())
        self.cursor.execute("SELECT LAST_INSERT_ID()")
        __id__ = self.cursor.fetchone()[0]
        return __id__

    def delete(self,removed):
        """Remove a single record, or the records in an iterable
        Before starting deletion, test if all records are in the base
        and don't have twice the same __id__
        Return the number of deleted items
        """
        if self.rowid is None:
            raise MySQLError,"Can't use delete() : missing row id"
        if isinstance(removed,dict):
            # remove a single record
            removed = [removed]
        else:
            # convert iterable into a list (to be able to sort it)
            removed = [ r for r in removed ]
        if not removed:
            return 0
        _ids = [ r[self.rowid] for r in removed ]
        _ids.sort()
        sql = "DELETE FROM %s WHERE %s IN (%s)" %(self.name,self.rowid,
            ",".join([str(_id) for _id in _ids]))
        self.cursor.execute(sql)
        return len(removed)

    def update(self,record,**kw):
        """Update the record with new keys and values"""
        # increment version number
        if self.rowid is None:
            raise MySQLError,"Can't use update() : missing row id"
        vals = self._make_sql_params(kw)
        sql = "UPDATE %s SET %s WHERE %s=%s" %(self.name,
            ",".join(vals),self.rowid,record[self.rowid])
        self.cursor.execute(sql,kw.values())

    def _make_sql_params(self,kw):
        """Make a list of strings to pass to an SQL statement
        from the dictionary kw with Python types"""
        return ['%s=%%s' %k for k in kw.keys() ]

    def _conv(self,v):
        if isinstance(v,str):
            v = v.replace('"','""')
            return '"%s"' %v
        elif isinstance(v,datetime.datetime):
            return '"%s"' %v.strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(v,datetime.date):
            return v.strftime("%Y-%m-%d")
        else:
            return v

    def _make_record(self,row):
        """Make a record dictionary from the result of a fetch_"""
        return dict(zip(self.fields,row))
        
    def add_field(self,field,default=None):
        fname,ftype = field
        if fname in self.fields:
            raise ValueError,'Field "%s" already defined' %fname
        sql = "ALTER TABLE %s ADD %s %s" %(self.name,fname,ftype)
        if default is not None:
            sql += " DEFAULT %s" %self._conv(default)
        self.cursor.execute(sql)
        self.commit()
        self._get_table_info()
    
    def drop_field(self,field):
        if not field in self.fields:
            raise ValueError,"Field %s not found in base" %field
        sql = "ALTER TABLE %s DROP %s" %(self.name,field)
        self.cursor.execute(sql)
        self._get_table_info()

    def __call__(self,**kw):
        """Selection by field values
        db(key=value) returns the list of records where r[key] = value"""
        for key in kw:
            if not key in self.fields:
                raise ValueError,"Field %s not in the database" %key
        vals = self._make_sql_params(kw)
        if vals:
            sql = "SELECT * FROM %s WHERE %s" %(self.name," AND ".join(vals))
        else: # all records
            sql = "SELECT * FROM %s" %self.name
        self.cursor.execute(sql,kw.values())            
        return [self._make_record(row) for row in self.cursor.fetchall() ]
    
    def __getitem__(self,record_id):
        """Direct access by record id"""
        if self.rowid is None:
            raise MySQLError,"Can't use __getitem__() : missing row id"
        sql = "SELECT * FROM %s WHERE %s=%s" %(self.name,self.rowid,record_id)
        self.cursor.execute(sql)
        res = self.cursor.fetchone()
        if res is None:
            raise IndexError,"No record at index %s" %record_id
        else:
            return self._make_record(res)
    
    def __len__(self):
        self.cursor.execute("SELECT COUNT(*) FROM %s" %self.name)
        return int(self.cursor.fetchone()[0])

    def __delitem__(self,record_id):
        """Delete by record id"""
        self.delete(self[record_id])
        
    def __iter__(self):
        """Iteration on the records"""
        self.cursor.execute("SELECT * FROM %s" %self.name)
        results = [ self._make_record(r) for r in self.cursor.fetchall() ]
        return iter(results)

Base = Table # compatibility with older versions

if __name__ == '__main__':
    os.chdir(os.path.join(os.getcwd(),'test'))
    execfile('MySQL_test.py')
    
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.