# -*- Mode: Python; tab-width: 4 -*-
VERSION_STRING = '$Id: async_mysql.py,v 1.3 2001/05/01 11:45:26 andreas Exp $'
import exceptions
import math
import socket
import string
import sys
import asyncore
import asynchat
from continuation import continuation
from fifo import fifo
class mysql_error (exceptions.Exception):
pass
# ===========================================================================
# Authentication
# ===========================================================================
# Note: I've ignored the stuff to support an older version of the protocol.
#
# The code is based on the file mysql-3.21.33/client/password.c
#
# The auth scheme is challenge/response. Upon connection the server
# sends an 8-byte challenge message. This is hashed with the password
# to produce an 8-byte response. The server side performs an identical
# hash to verify the password is correct.
class random_state:
def __init__ (self, seed, seed2):
self.max_value = 0x3FFFFFFF
self.seed = seed % self.max_value
self.seed2 = seed2 % self.max_value
def rnd (self):
self.seed = (self.seed * 3 + self.seed2) % self.max_value
self.seed2 = (self.seed + self.seed2 + 33) % self.max_value
return float(self.seed)/ float(self.max_value)
def hash_password (password):
nr=1345345333L
add=7
nr2=0x12345671L
for ch in password:
if (ch == ' ') or (ch == '\t'):
continue
tmp = ord(ch)
nr = nr ^ (((nr & 63) + add) * tmp) + (nr << 8)
nr2 = nr2 + ((nr2 << 8) ^ nr)
add = add + tmp
return (
nr & ((1L<<31)-1L),
nr2 & ((1L<<31)-1L)
)
def scramble (message, password):
hash_pass = hash_password (password)
hash_mess = hash_password (message)
r = random_state (
hash_pass[0] ^ hash_mess[0],
hash_pass[1] ^ hash_mess[1]
)
to = []
for ch in message:
to.append (int (math.floor ((r.rnd() * 31) + 64)))
extra = int (math.floor (r.rnd()*31))
for i in range(len(to)):
to[i] = to[i] ^ extra
return to
# ===========================================================================
# Packet Protocol
# ===========================================================================
def unpacket (p):
# 3-byte length, one-byte packet number, followed by packet data
a,b,c,s = map (ord, p[:4])
l = a | (b << 8) | (c << 16)
# s is a sequence number
return l, s
def packet (data, s=0):
l = len(data)
a, b, c = l & 0xff, (l>>8) & 0xff, (l>>16) & 0xff
h = map (chr, [a,b,c,s])
return string.join (h,'') + data
def n_byte_num (data, n):
result = 0
for i in range(n):
result = result | (ord(data[i])<<(8*i))
return result
def net_field_length (data, pos=0):
n = ord(data[pos])
if n < 251:
return n, 1
elif n == 251:
return None, 1
elif n == 252:
return n_byte_num (data, 2), 3
elif n == 253:
return n_byte_num (data, 3), 4
else:
# libmysql adds 6, why?
return n_byte_num (data, 4), 5
# used to generate the dumps below
def dump_hex (s):
r1 = []
r2 = []
for ch in s:
r1.append (' %02x' % ord(ch))
if (ch in string.letters) or (ch in string.digits):
r2.append (' %c' % ch)
else:
r2.append (' ')
return string.join (r1, ''), string.join (r2, '')
# ===========================================================================
# MySQL Client
# ===========================================================================
class mysql_client (asynchat.async_chat):
# protocol state
PS_HEADER = 1
PS_DATA = 2
# auth state
AS_LOGIN = 1
AS_CHALLENGE = 2
AS_RESPONSE = 3
def __init__ (self, username, password, login_callback, address=('127.0.0.1', 3306)):
asynchat.async_chat.__init__ (self)
self.username = username
self.password = password
self.server_address = address
self.login (login_callback)
self.current_database = None
def login (self, callback):
self.login_callback = callback
self.buffer = ''
self.set_terminator (None)
address = self.server_address
if type(address) == type(''):
self.create_socket (socket.AF_UNIX, socket.SOCK_STREAM)
else:
self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
self._connected = 0
self.connect (address)
self.auth = self.AS_LOGIN
self.state = self.PS_HEADER
self.query_fifo = fifo()
def handle_connect (self):
self._connected = 1
def close (self):
asynchat.async_chat.close (self)
self._connected = 0
self.discard_buffers()
# XXX: check query fifo for pending queries
def collect_incoming_data (self, data):
# packets come in with a four-byte head on them.
# we need to stay sync'd with that. we use a
# two-state machine.
self.buffer = self.buffer + data
while self.buffer:
if self.state is self.PS_HEADER:
# do we have a complete header?
if len(self.buffer) >= 4:
l,s = unpacket (self.buffer)
self.pinfo = l, s
self.state = self.PS_DATA
self.buffer = self.buffer[4:]
else:
break
elif self.state is self.PS_DATA:
l, s = self.pinfo
if len(self.buffer) >= l:
data, self.buffer = self.buffer[:l], self.buffer[l:]
self.handle_packet (s, data)
self.state = self.PS_HEADER
else:
break
def handle_packet (self, seq, data):
if self.auth is self.AS_LOGIN:
# unpack the greeting
protocol_version = ord(data[0])
eos = string.find (data, '\000')
mysql_version = data[1:eos]
thread_id = n_byte_num (data[eos+1:eos+5], 4)
challenge = data[eos+5:eos+13]
self.auth = (
protocol_version,
mysql_version,
thread_id,
challenge
)
# print auth
lp = self.build_login_packet (challenge)
# print 'login packet:',repr(lp)
self.auth = self.AS_CHALLENGE
# seems to require a sequence number of one
self.push (packet (lp, 1))
elif self.auth is self.AS_CHALLENGE:
if seq != 2:
self.login_callback (self, 0)
else:
self.auth = self.AS_RESPONSE
self.login_callback (self, 1)
else:
if seq == 1:
callback = self.query_fifo.pop()
if callback:
self.current_callback = callback
callback (seq, data)
else:
if self.current_callback:
self.current_callback (seq, data)
def build_login_packet (self, challenge):
auth = string.join (map (chr, scramble (challenge, self.password)), '')
# 2 bytes of client_capability
# 3 bytes of max_allowed_packet
# no idea what they are
return '\005\000\000\000\020' + self.username + '\000' + auth
def push_query (self, query, callback=None, sequence=0):
if self._connected:
self.push (packet (query, sequence))
self.query_fifo.push (callback)
else:
self.login (
continuation (self._relogin, (query, callback, sequence))
)
# ======================================================================
# auto-reconnect support
# ======================================================================
def _relogin (self, (query, callback, sequence), ignore, result):
if result:
if self.current_database:
self.cmd_use (
self.current_database,
continuation (self._relogin_use, (query, callback, sequence))
)
else:
self.push_query (query, callback, sequence)
else:
# XXX all callbacks need an 'error' parameter.
raise SystemError, "Couldn't reconnect to mysql server"
def _relogin_use (self, (query, callback, sequence), nfields, data):
# this should really be done by cmd_use... (i.e., use a continuation object)
if (nfields > 0) and (data == '\000\000\000'):
self.push_query (query, callback, sequence)
else:
# XXX all callbacks need an 'error' parameter.
raise SystemError, "Couldn't reconnect to current database"
# ======================================================================
# Commands
# ======================================================================
# from mysql-3.21.33/include/mysql_com.h.in
#
cmds = [
'sleep', 'quit', 'init_db', 'query', 'field_list', 'create_db',
'drop_db', 'refresh', 'shutdown', 'statistics', 'process_info',
'connect', 'process_kill', 'debug'
]
d = {}
for i in range (len (cmds)):
d[cmds[i]] = i
cmds = d
del d
def command (self, command_type, command, callback):
self.push_query (
chr(self.cmds[command_type]) + command,
callback
)
def cmd_use (self, database, callback=None):
self.current_database = database
self.command ('init_db', database, callback)
def cmd_quit (self, callback=None):
self.command ('quit', '', callback)
def cmd_query (self, query, callback=None):
self.command ('query', query, result_set (callback))
# ===========================================================================
# Result Set
# ===========================================================================
class result_set:
'unpack a result set'
' If <data_callback> is specified, it will be called'
' for each element of the result set. Otherwise, the'
' results will be collected into a list made available'
' to <callback>'
def __init__ (self, callback, data_callback=None):
self.callback = callback
self.packets = []
self.nfields = None
self.fields = []
self.data = None
self.data_callback = None
def unpack_data (self, d):
r = []
i = 0
while i < len(d):
fl = ord(d[i])
i = i + 1
r.append (d[i:i+fl])
i = i + fl
return r
def __call__ (self, seq, data):
if self.nfields is None:
# first packet is the number of fields (or an error)
n = ord(data[0])
if n == 0:
self.callback ([], [])
else:
self.nfields = n
elif self.data is None:
if ord(data[0]) != 0xfe:
# collect field info
self.fields.append (self.unpack_data (data))
else:
self.data = []
else:
# collect data
if ord(data[0]) != 0xfe:
if self.data_callback:
self.data_callback (self.unpack_data (data))
else:
self.data.append (self.unpack_data (data))
else:
self.callback (self.fields, self.data)
if __name__ == '__main__':
import random
class test_mysql_client:
def __init__ (self, client):
self.client = client
self.client.cmd_query ('create database test_async', self.callback_create)
def callback_create (self, *info):
print 'create database=>', info
self.client.cmd_use ('test_async', self.callback_use)
def callback_use (self, *info):
print 'use=>', info
self.client.cmd_query (
'create table users (name char(30), cool int)',
self.callback_create_table
)
people = ['john', 'paul', 'george', 'ringo']
def callback_create_table (self, *info):
print 'create_table=>', info
self.people_index = 0
self.callback_insert ()
def callback_insert (self, *info):
print 'insert=>', info
if self.people_index == len(self.people):
self.client.cmd_query (
'select * from users',
self.callback_query
)
else:
self.client.cmd_query (
'insert into users values ("%s", %s)' % (
self.people[self.people_index],
random.randint (0,1)
),
self.callback_insert
)
self.people_index = self.people_index + 1
def callback_query (self, fields, data):
print 'query=>'
print ' fields:'
for field in fields:
print '\t%s' % repr(field)
print ' data:'
for d in data:
print '\t%s' % repr(d)
self.client.command (
'drop_db',
'test_async',
self.callback_drop
)
def callback_drop (self, *info):
print 'drop=>', info
self.client.cmd_quit (self.callback_quit)
def callback_quit (self, *info):
print 'quit=>', info
def go (client, yesno):
if yesno:
test_mysql_client (client)
else:
print 'Failed to log in'
import sys
if len(sys.argv) < 4:
print 'Usage: %s <username> <password> <host>' % sys.argv[0]
else:
[username, password, host] = sys.argv[1:4]
c = mysql_client (username, password, go, (host, 3306))
asyncore.loop()
# greeting:
# * first byte is the protocol version (currently 10)
# * null-terminated version string
# * 4-byte thread id.
# * 8-byte challenge
# * 2-byte server capabilities?
# message = [0x00, 0x39, 0x4d, 0x59, 0x59, 0x31, 0x29, 0x79, 0x47]
# password = [0x66, 0x6e, 0x6f, 0x72, 0x64]
# Handshake:
#----------------------------------------
#<== 000 0a 33 2e 32 32 2e 31 30 2d 62 65 74 61 00 1b 00 00 00 39 4d 59 59 31 29 79 47 00 0c 00
# 3 2 2 1 0 b e t a 9 M Y Y 1 y G
#----------------------------------------
#==> 1
# 05 00 00 00 10 72 75 73 68 69 6e 67 00 48 51 42 50 5d 4a 54 57
# r u s h i n g H Q B P J T W
#----------------------------------------
#<== 002 00 00 00
# Insertion/Query (no result set)
#----------------------------------------
#==> 0
# 03 69 6e 73 65 72 74 20 69 6e 74 6f 20 75 73 65 72 73 20 76 61 6c 75 65 73 20 28 22 61 73 64 66 40 61 73 64 66 2e 61 73 64 66 22 2c 20 22 6e 22 29
# i n s e r t i n t o u s e r s v a l u e s a s d f a s d f a s d f n
#----------------------------------------
#<== 001 00 01 00
# Query (with result set)
#----------------------------------------
#==> 0
# 03 73 65 6c 65 63 74 20 2a 20 66 72 6f 6d 20 75 73 65 72 73
# s e l e c t f r o m u s e r s
#----------------------------------------
#<== 001 02
#
#<== 002 05 75 73 65 72 73 04 6e 61 6d 65 03 80 00 00 01 fe 03 00 00 00
# u s e r s n a m e
#<== 003 05 75 73 65 72 73 0a 69 73 62 6f 75 6e 63 69 6e 67 03 01 00 00 01 fe 03 00 00 00
# u s e r s i s b o u n c i n g
#<== 004 fe
#
#<== 005 15 72 75 73 68 69 6e 67 40 6e 69 67 68 74 6d 61 72 65 2e 63 6f 6d 01 6e
# r u s h i n g n i g h t m a r e c o m n
#<== 006 0e 61 73 64 66 40 61 73 64 66 2e 61 73 64 66 01 6e
# a s d f a s d f a s d f n
#<== 007 fe
# "use bouncer_test"
#==> 0
# 02 62 6f 75 6e 63 65 72 5f 74 65 73 74
# b o u n c e r t e s t
#----------------------------------------
#<== 001 00 00 00
|