from time import time
import subprocess
import random
import numpy
# Constants
STEP = 1000*100 # size (in rows) of the buffer used to fill the table
SCALE = 0.1 # standard deviation of the noise relative to the actual values
NI_NTIMES = 1 # number of queries used to compute a mean (non-indexed cols)
#COLDCACHE = 10 # The number of reads where the cache is considered 'cold'
#WARMCACHE = 50 # The number of reads until the cache is considered 'warmed'
#READ_TIMES = WARMCACHE+50 # The number of complete calls to DB.query_db()
COLDCACHE = 50 # The number of reads where the cache is considered 'cold'
WARMCACHE = 50 # The number of reads until the cache is considered 'warmed'
READ_TIMES = WARMCACHE+50 # The number of complete calls to DB.query_db()
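# With the values above, the first query of each loop is reported separately,
# queries 1-49 count as 'cold cache' reads and queries from 50 on as 'warm
# cache' reads (see DB.print_qtime_idx below).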
MROW = 1000*1000.
# Test values
#COLDCACHE = 5 # The number of reads where the cache is considered 'cold'
#WARMCACHE = 5 # The number of reads until the cache is considered 'warmed'
#READ_TIMES = 10 # The number of complete calls to DB.query_db()
# global variables
rdm_cod = ['lin', 'rnd']
prec = 6 # number of decimal places used when printing floats
def get_nrows(nrows_str):
if nrows_str.endswith("k"):
return int(float(nrows_str[:-1])*1000)
elif nrows_str.endswith("m"):
return int(float(nrows_str[:-1])*1000*1000)
elif nrows_str.endswith("g"):
return int(float(nrows_str[:-1])*1000*1000*1000)
    else:
        raise ValueError(
            "value of nrows must end with a 'k', 'm' or 'g' suffix.")
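# Illustrative only (not executed by the benchmark):
#   get_nrows('1k')   -> 1000
#   get_nrows('2.5m') -> 2500000
#   get_nrows('1g')   -> 1000000000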
class DB(object):
def __init__(self, nrows, rng, userandom):
self.step = STEP
self.scale = SCALE
self.rng = rng
self.userandom = userandom
self.filename = '-'.join([rdm_cod[userandom], nrows])
self.nrows = get_nrows(nrows)
def get_db_size(self):
        # Flush pending writes, then ask 'du' for the on-disk size.
        sout = subprocess.Popen("sync;du -s %s" % self.filename, shell=True,
                                stdout=subprocess.PIPE).stdout
        line = sout.readline()
        # 'du -s' prints "<size> <path>", with the size in KB blocks.
        return int(line.split()[0])
def print_mtime(self, t1, explain):
mtime = time()-t1
print "%s:" % explain, round(mtime, 6)
print "Krows/s:", round((self.nrows/1000.)/mtime, 6)
def print_qtime(self, colname, ltimes):
qtime1 = ltimes[0] # First measured time
qtime2 = ltimes[-1] # Last measured time
print "Query time for %s:" % colname, round(qtime1, 6)
print "Mrows/s:", round((self.nrows/(MROW))/qtime1, 6)
print "Query time for %s (cached):" % colname, round(qtime2, 6)
print "Mrows/s (cached):", round((self.nrows/(MROW))/qtime2, 6)
def norm_times(self, ltimes):
"Get the mean and stddev of ltimes, avoiding the extreme values."
lmean = ltimes.mean(); lstd = ltimes.std()
ntimes = ltimes[ltimes < lmean+lstd]
nmean = ntimes.mean(); nstd = ntimes.std()
return nmean, nstd
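    # Illustrative only: for ltimes = numpy.array([1., 1., 1., 10.]), the mean
    # is 3.25 and the std is ~3.9, so the 10. reading is discarded and
    # norm_times() returns (1.0, 0.0).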
def print_qtime_idx(self, colname, ltimes, repeated, verbose):
if repeated:
r = "[REP] "
else:
r = "[NOREP] "
ltimes = numpy.array(ltimes)
qtime1 = ltimes[0] # First measured time
ctimes = ltimes[1:COLDCACHE]
cmean, cstd = self.norm_times(ctimes)
wtimes = ltimes[WARMCACHE:]
wmean, wstd = self.norm_times(wtimes)
if verbose:
print "Times for cold cache:\n", ctimes
#print "Times for warm cache:\n", wtimes
print "Histogram for warm cache: %s\n%s" % \
numpy.histogram(wtimes, new=True)
print "%s1st query time for %s:" % (r, colname), \
round(qtime1, prec)
print "%sQuery time for %s (cold cache):" % (r, colname), \
round(cmean, prec), "+-", round(cstd, prec)
print "%sQuery time for %s (warm cache):" % (r, colname), \
round(wmean, prec), "+-", round(wstd, prec)
def print_db_sizes(self, init, filled, indexed):
        # 'du' reported the sizes in KB; convert the deltas to MB.
        table_size = (filled-init)/1024.
        indexes_size = (indexed-filled)/1024.
print "Table size (MB):", round(table_size, 3)
print "Indexes size (MB):", round(indexes_size, 3)
print "Full size (MB):", round(table_size+indexes_size, 3)
def fill_arrays(self, start, stop):
arr_f8 = numpy.arange(start, stop, dtype='float64')
arr_i4 = numpy.arange(start, stop, dtype='int32')
        if self.userandom:
            # Add gaussian noise with a standard deviation proportional
            # (by self.scale) to the largest value in this chunk.
            arr_f8 += numpy.random.normal(0, stop*self.scale,
                                          size=stop-start)
            arr_i4 = numpy.array(arr_f8, dtype='int32')
return arr_i4, arr_f8
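    # Illustrative only: with userandom=0, fill_arrays(0, 5) returns
    # (array([0, 1, 2, 3, 4], dtype=int32), array([0., 1., 2., 3., 4.]));
    # with userandom=1 gaussian noise is added to both arrays.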
def create_db(self, dtype, kind, optlevel, verbose):
self.con = self.open_db(remove=1)
self.create_table(self.con)
init_size = self.get_db_size()
t1=time()
self.fill_table(self.con)
table_size = self.get_db_size()
self.print_mtime(t1, 'Insert time')
self.index_db(dtype, kind, optlevel, verbose)
indexes_size = self.get_db_size()
self.print_db_sizes(init_size, table_size, indexes_size)
self.close_db(self.con)
def index_db(self, dtype, kind, optlevel, verbose):
if dtype == "int":
idx_cols = ['col2']
elif dtype == "float":
idx_cols = ['col4']
else:
idx_cols = ['col2', 'col4']
for colname in idx_cols:
t1=time()
self.index_col(self.con, colname, kind, optlevel, verbose)
self.print_mtime(t1, 'Index time (%s)' % colname)
def query_db(self, niter, dtype, onlyidxquery, onlynonidxquery,
avoidfscache, verbose, inkernel):
self.con = self.open_db()
if dtype == "int":
reg_cols = ['col1']
idx_cols = ['col2']
elif dtype == "float":
reg_cols = ['col3']
idx_cols = ['col4']
else:
reg_cols = ['col1', 'col3']
idx_cols = ['col2', 'col4']
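        # With -x, pick a random seed so that each run touches different rows
        # (defeating the filesystem cache); otherwise use a fixed seed so that
        # runs are repeatable.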
if avoidfscache:
rseed = int(numpy.random.randint(self.nrows))
else:
rseed = 19
# Query for non-indexed columns
numpy.random.seed(rseed)
base = numpy.random.randint(self.nrows)
if not onlyidxquery:
for colname in reg_cols:
ltimes = []
random.seed(rseed)
for i in range(NI_NTIMES):
t1=time()
results = self.do_query(self.con, colname, base, inkernel)
ltimes.append(time()-t1)
if verbose:
print "Results len:", results
self.print_qtime(colname, ltimes)
            # Always reopen the file after *every* query loop; this is
            # necessary for the benchmark to run correctly.
self.close_db(self.con)
self.con = self.open_db()
# Query for indexed columns
if not onlynonidxquery:
for colname in idx_cols:
ltimes = []
numpy.random.seed(rseed)
rndbase = numpy.random.randint(self.nrows, size=niter)
# First, non-repeated queries
for i in range(niter):
base = rndbase[i]
t1=time()
results = self.do_query(self.con, colname, base, inkernel)
#results, tprof = self.do_query(self.con, colname, base, inkernel)
ltimes.append(time()-t1)
if verbose:
print "Results len:", results
self.print_qtime_idx(colname, ltimes, False, verbose)
                # Always reopen the file after *every* query loop; this is
                # necessary for the benchmark to run correctly.
self.close_db(self.con)
self.con = self.open_db()
ltimes = []
# # Second, repeated queries
# for i in range(niter):
# t1=time()
# results = self.do_query(self.con, colname, base, inkernel)
# #results, tprof = self.do_query(self.con, colname, base, inkernel)
# ltimes.append(time()-t1)
# if verbose:
# print "Results len:", results
# self.print_qtime_idx(colname, ltimes, True, verbose)
# Print internal PyTables index tprof statistics
#tprof = numpy.array(tprof)
#tmean, tstd = self.norm_times(tprof)
#print "tprof-->", round(tmean, prec), "+-", round(tstd, prec)
#print "tprof hist-->", \
# numpy.histogram(tprof, new=True)
#print "tprof raw-->", tprof
                # Always reopen the file after *every* query loop; this is
                # necessary for the benchmark to run correctly.
self.close_db(self.con)
self.con = self.open_db()
# Finally, close the file.
self.close_db(self.con)
def close_db(self, con):
con.close()
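# Backends must provide the primitives used above.  A minimal sketch of the
# expected interface, inferred from the calls in create_db()/query_db()
# (bodies are up to each backend):
#
#   class MyBackend_DB(DB):
#       def open_db(self, remove=0): ...   # open and return the connection
#       def create_table(self, con): ...
#       def fill_table(self, con): ...
#       def index_col(self, con, colname, kind, optlevel, verbose): ...
#       def do_query(self, con, colname, base, inkernel): ...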
if __name__=="__main__":
import sys, os
import getopt
try:
import psyco
psyco_imported = 1
    except ImportError:
psyco_imported = 0
usage = """usage: %s [-T] [-P] [-v] [-f] [-k] [-p] [-m] [-c] [-q] [-i] [-I] [-S] [-x] [-z complevel] [-l complib] [-R range] [-N niter] [-n nrows] [-d datadir] [-O level] [-t kind] [-s] col -Q [suplim]
-T use Pytables
-P use Postgres
-v verbose
-f do a profile of the run (only query functionality & Python 2.5)
-k do a profile for kcachegrind use (out file is 'indexed_search.kcg')
-p use "psyco" if available
-m use random values to fill the table
-q do a query (both indexed and non-indexed versions)
-i do a query (just indexed one)
-I do a query (just in-kernel one)
-S do a query (just standard one)
-x choose a different seed for random numbers (i.e. avoid FS cache)
-c create the database
-z compress with zlib (no compression by default)
-l use complib for compression (zlib used by default)
-R select a range in a field in the form "start,stop" (def "0,10")
-N number of iterations for reading
-n sets the number of rows (in krows) in each table
-d directory to save data (default: data.nobackup)
-O set the optimization level for PyTables Pro indexes
-t select the index type: "medium" (default) or "full", "light", "ultralight"
-s select a type column for operations ('int' or 'float'. def all)
-Q do a repeteated query up to 10**value
\n""" % sys.argv[0]
try:
opts, pargs = getopt.getopt(sys.argv[1:], 'TPvfkpmcqiISxz:l:R:N:n:d:O:t:s:Q:')
    except getopt.GetoptError:
sys.stderr.write(usage)
sys.exit(1)
# default options
usepytables = 0
usepostgres = 0
verbose = 0
doprofile = 0
dokprofile = 0
usepsyco = 0
userandom = 0
docreate = 0
optlevel = 0
kind = "medium"
docompress = 0
complib = "zlib"
doquery = False
onlyidxquery = False
onlynonidxquery = False
inkernel = True
avoidfscache = 0
#rng = [-10, 10]
rng = [-1000, -1000]
repeatquery = 0
repeatvalue = 0
krows = '1k'
niter = READ_TIMES
dtype = "all"
datadir = "data.nobackup"
# Get the options
for option in opts:
if option[0] == '-T':
usepytables = 1
elif option[0] == '-P':
usepostgres = 1
elif option[0] == '-v':
verbose = 1
elif option[0] == '-f':
doprofile = 1
elif option[0] == '-k':
dokprofile = 1
elif option[0] == '-p':
usepsyco = 1
elif option[0] == '-m':
userandom = 1
elif option[0] == '-c':
docreate = 1
elif option[0] == '-q':
doquery = True
elif option[0] == '-i':
doquery = True
onlyidxquery = True
elif option[0] == '-I':
doquery = True
onlynonidxquery = True
elif option[0] == '-S':
doquery = True
onlynonidxquery = True
inkernel = False
elif option[0] == '-x':
avoidfscache = 1
elif option[0] == '-z':
docompress = int(option[1])
elif option[0] == '-l':
complib = option[1]
elif option[0] == '-R':
rng = [int(i) for i in option[1].split(",")]
elif option[0] == '-N':
niter = int(option[1])
elif option[0] == '-n':
krows = option[1]
elif option[0] == '-d':
datadir = option[1]
elif option[0] == '-O':
optlevel = int(option[1])
elif option[0] == '-t':
if option[1] in ('full', 'medium', 'light', 'ultralight'):
kind = option[1]
else:
print "kind should be either 'full', 'medium', 'light' or 'ultralight'"
sys.exit(1)
elif option[0] == '-s':
if option[1] in ('int', 'float'):
dtype = option[1]
else:
print "column should be either 'int' or 'float'"
sys.exit(1)
elif option[0] == '-Q':
repeatquery = 1
repeatvalue = int(option[1])
    # If no database backend was selected, abort
if not usepytables and not usepostgres:
print "Please select a backend:"
print "PyTables: -T"
print "Postgres: -P"
sys.exit(1)
# Create the class for the database
if usepytables:
from pytables_backend import PyTables_DB
db = PyTables_DB(krows, rng, userandom, datadir,
docompress, complib, kind, optlevel)
elif usepostgres:
from postgres_backend import Postgres_DB
db = Postgres_DB(krows, rng, userandom)
if not avoidfscache:
# in order to always generate the same random sequence
numpy.random.seed(20)
if verbose:
if userandom:
print "using random values"
if onlyidxquery:
print "doing indexed queries only"
if psyco_imported and usepsyco:
psyco.bind(db.create_db)
psyco.bind(db.query_db)
if docreate:
if verbose:
print "writing %s rows" % krows
db.create_db(dtype, kind, optlevel, verbose)
if doquery:
print "Calling query_db() %s times" % niter
if doprofile:
import pstats
import cProfile as prof
prof.run('db.query_db(niter, dtype, onlyidxquery, onlynonidxquery, avoidfscache, verbose, inkernel)', 'indexed_search.prof')
stats = pstats.Stats('indexed_search.prof')
stats.strip_dirs()
stats.sort_stats('time', 'calls')
if verbose:
stats.print_stats()
else:
stats.print_stats(20)
elif dokprofile:
from cProfile import Profile
import lsprofcalltree
prof = Profile()
prof.run('db.query_db(niter, dtype, onlyidxquery, onlynonidxquery, avoidfscache, verbose, inkernel)')
kcg = lsprofcalltree.KCacheGrind(prof)
ofile = open('indexed_search.kcg','w')
kcg.output(ofile)
ofile.close()
else:
db.query_db(niter, dtype, onlyidxquery, onlynonidxquery,
avoidfscache, verbose, inkernel)
if repeatquery:
        # Start with a range that selects almost nothing
db.rng = [1, 1]
if verbose:
print "range:", db.rng
db.query_db(niter, dtype, onlyidxquery, onlynonidxquery,
avoidfscache, verbose, inkernel)
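        # The width of the query range grows as 1, 2, 5, 10, 20, 50, ... up
        # to 5*10**(repeatvalue-1), roughly centered around zero.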
for i in xrange(repeatvalue):
for j in (1, 2, 5):
rng = j*10**i
db.rng = [-rng/2, rng/2]
if verbose:
print "range:", db.rng
# if usepostgres:
# os.system("echo 1 > /proc/sys/vm/drop_caches; /etc/init.d/postgresql restart")
# else:
# os.system("echo 1 > /proc/sys/vm/drop_caches")
db.query_db(niter, dtype, onlyidxquery, onlynonidxquery,
avoidfscache, verbose, inkernel)