# $Id: test_kjbuckets.py,v 1.4 2002/05/08 00:49:01 anthonybaxter Exp $
import unittest
# a simple test for kjbuckets0 stolen from relalg.py in the kjbuckets C
# module distro
# A simple implementation of the relational algebra using kjbuckets
def relFromDictSet(schemeseq, dictSet):
result = relation(schemeseq, [] )
result.rowset = dictSet
return result
class relation:
def __init__(self, schemeseq, listofrows):
self.schemeseq = schemeseq
self.scheme = kjbuckets_module.kjSet(schemeseq)
rowset = kjbuckets_module.kjSet()
for row in listofrows:
rowset.add(kjbuckets_module.kjUndump(schemeseq, row))
self.rowset = rowset
def result(self):
l = []
for row in self.rowset.items():
l.append(row.dump(self.schemeseq))
return l
def addDicts(self, dictseq): # not used...
for dict in dictseq:
self.rowset.add(dict)
def checkUnionCompatible(self,other):
if self.scheme != other.scheme:
raise ValueError, "operands not union compatible"
# relational union
def __add__(self, other):
self.checkUnionCompatible(other)
return relFromDictSet(self.schemeseq, self.rowset + other.rowset)
# relational difference
def __sub__(self, other):
self.checkUnionCompatible(other)
return relFromDictSet(self.schemeseq, self.rowset - other.rowset)
# natural join (hash based algorithm)
def __mul__(self,other):
commonatts = self.scheme & other.scheme
resultset = kjbuckets_module.kjSet()
if commonatts: # do a hash based join
dumper = tuple(commonatts.items())
selfgraph = kjbuckets_module.kjGraph() # hash index for self
othergraph = kjbuckets_module.kjGraph() # hash index for other
for row in self.rowset.items():
selfgraph[row] = row.dump(dumper)
for row in other.rowset.items():
othergraph[row.dump(dumper)] = row
for (selfrow, otherrow) in (selfgraph * othergraph).items():
resultset.add(selfrow + otherrow)
else: # no common attributes: do a cross product
otherrows = other.rowset.items()
for selfrow in self.rowset.items():
for otherrow in otherrows:
resultset.add(selfrow + otherrow)
return relFromDictSet( tuple((self.scheme + other.scheme).items()),
resultset )
# selection using a att->value pairs (as conjunction)
def vSel(pairs, rel):
selected = kjbuckets_module.kjSet()
selector = kjbuckets_module.kjDict(pairs)
if selector.Clean()!=None:
for row in rel.rowset.items():
if (row + selector).Clean() != None:
selected.add(row)
return relFromDictSet(rel.schemeseq, selected)
# selection using att = att pairs (as conjunction)
def eqSelect(pairs, rel):
selected = kjbuckets_module.kjSet()
selector = kjbuckets_module.kjGraph(pairs)
selector = (selector + ~selector).tclosure() # sym, trans closure
for row in rel.rowset.items():
if row.remap(selector) != None:
selected.add(row)
return relFromDictSet(rel.schemeseq, selected)
# projection on attribute sequence (as conjunction)
def proj(atts, rel):
attset = kjbuckets_module.kjSet(atts)
resultset = kjbuckets_module.kjSet()
for row in rel.rowset.items():
resultset.add(attset * row)
return relFromDictSet(atts, resultset)
# renaming using (new,old) pair sequence
def rename(pairs, rel):
renames = kjbuckets_module.kjDict(pairs)
untouched = rel.scheme - kjbuckets_module.kjSet(renames.values())
mapper = renames + untouched
resultset = kjbuckets_module.kjSet()
for row in rel.rowset.items():
resultset.add(mapper * row)
return relFromDictSet(tuple(mapper.keys()), resultset)
#=========== end of simple.py
#
#Now let me show you the "simple" module in use. First we need some relations.
#I'll steal C.J.Date's canonical/soporific supplier/parts database:
#
## database of suppliers, parts and shipments
## from Date, page 79 (2nd ed) or page 92 (3rd ed) */
class test_kjbuckets0(unittest.TestCase):
def setUp(self):
global kjbuckets_module
import gadfly.kjbuckets0 as kjbuckets_module
def test(self):
#suppliers
S = relation(
('snum', 'sname', 'status', 'city'),
[ (1, 'Smith', 20, 'London'),
(2, 'Jones', 10, 'Paris'),
(3, 'Blake', 30, 'Paris'),
(4, 'Clark', 20, 'London'),
(5, 'Adams', 30, 'Athens')
])
#parts
P = relation(
('pnum', 'pname', 'color', 'weight', 'pcity'),
[ (1, 'Nut', 'Red', 12, 'London'),
(2, 'Bolt', 'Green', 17, 'Paris' ),
(3, 'Screw', 'Blue', 17, 'Rome' ),
(4, 'Screw', 'Red', 14, 'London'),
(5, 'Cam', 'Blue', 12, 'Paris'),
(6, 'Cog', 'Red', 19, 'London')
])
# shipments
SP = relation(
('snum', 'pnum', 'qty',),
[ (1, 1, 300),
(1, 2, 200),
(1, 3, 400),
(1, 4, 200),
(1, 5, 100),
(1, 6, 100),
(2, 1, 300),
(2, 2, 400),
(3, 2, 200),
(4, 2, 200),
(4, 4, 300),
(4, 5, 400)
])
# names and cities of suppliers
l = proj(("sname","city"),S).result()
l.sort()
self.assertEquals(l, [('Adams', 'Athens'), ('Blake', 'Paris'),
('Clark', 'London'), ('Jones', 'Paris'), ('Smith', 'London')])
# part names of parts supplied by Blake
self.assertEquals(proj(("pname",),vSel( ( ("sname","Blake"), ),
S*SP*P)).result(), ['Bolt'])
# supplier names and numbers where the supplier doesn't supply screws
l = (proj(("sname","snum"), S) -
proj(("sname","snum"), vSel((("pname", "Screw"),), P*SP*S))
).result()
l.sort()
self.assertEquals(l, [('Adams', 5), ('Blake', 3), ('Jones', 2)])
def test2(self):
G = kjbuckets_module.kjGraph()
r3 = range(3)
r = map(None, r3, r3)
for i in range(3):
G[i] = i+1
D = kjbuckets_module.kjDict(G)
D[9]=0
G[0]=10
S = kjbuckets_module.kjSet(G)
S[-1] = 5
#print "%s.remap(%s) = %s" % (D, G, D.remap(G))
for X in (S, D, G, r, tuple(r), 1):
for C in (kjbuckets_module.kjGraph, kjbuckets_module.kjSet,
kjbuckets_module.kjDict):
T = C(X)
T2 = C()
ALL = (S, D, G)
for X in ALL:
self.assertEqual(len(X), len(X.items()))
cb = X.Clean()
del X[2]
self.assertNotEqual(cb, X.Clean() or [])
self.assert_(X.subset(X), "trivial subset fails")
self.assert_(X==X, "trivial cmp fails")
self.assert_(not not X, "nonzero fails")
if X is S:
self.assert_(S.member(0), "huh 1?")
self.assert_(not S.member(123), "huh 2?")
S.add(999)
del S[1]
self.assert_(S.has_key(999), "huh 3?")
else:
self.assertNotEqual(X, ~X, "inverted")
self.assert_(X.member(0,1), "member test fails (0,1)")
X.add(999,888)
X.delete_arc(999,888)
self.assert_(not X.member(999,888),
"member test fails (999,888)")
self.assert_(not X.has_key(999), "has_key fails 999")
self.assert_(X.has_key(0), "has_key fails 0")
for Y in ALL:
#if (X!=S and Y!=S):
# print "diff", X, Y
# print "%s-%s=%s" % (X,Y,X-Y)
#elif X==S:
# D = kjbuckets_module.kjSet(Y)
# print "diff", X, D
# print "%s-%s=%s" % (X,D,X-D)
#print "%s+%s=%s" % (X,Y,X+Y)
#print "%s&%s=%s" % (X,Y,X&Y)
#print "%s*%s=%s" % (X,Y,X*Y)
x,y = cmp(X,Y), cmp(Y,X)
self.assertEqual(x, -y, "bad cmp!")
#print "cmp(X,Y), -cmp(Y,X)", x,-y
#print "X.subset(Y)", X.subset(Y)
class test_kjbuckets(unittest.TestCase):
def setUp(self):
global kjbuckets_module
import kjbuckets as kjbuckets_module
def suite():
l = [
unittest.makeSuite(test_kjbuckets0),
]
try:
import kjbuckets
l.append(unittest.makeSuite(test_kjbuckets))
except ImportError:
print 'not running kjbuckets C module test'
pass
return unittest.TestSuite(l)
if __name__ == '__main__':
runner = unittest.TextTestRunner()
runner.run(suite())
#
# $Log: test_kjbuckets.py,v $
# Revision 1.4 2002/05/08 00:49:01 anthonybaxter
# El Grande Grande reindente! Ran reindent.py over the whole thing.
# Gosh, what a lot of checkins. Tests still pass with 2.1 and 2.2.
#
# Revision 1.3 2002/05/07 04:03:14 richard
# . major cleanup of test_gadfly
#
# Revision 1.2 2002/05/06 23:27:10 richard
# . made the installation docco easier to find
# . fixed a "select *" test - column ordering is different for py 2.2
# . some cleanup in gadfly/kjParseBuild.py
# . made the test modules runnable (remembering that run_tests can take a
# name argument to run a single module)
# . fixed the module name in gadfly/kjParser.py
#
#
|