""" This test unit checks object creation funtions, like openFile, createTable,
createArray or createGroup.
It also checks:
- name identifiers in tree objects
- title character limit for objects (255)
- limit in number in table fields (255)
"""
import sys
import unittest
import os
import re
import tempfile
import warnings
from tables import *
# important objects to test
from tables import File,Group,Leaf,Table,Array
from tables.tests import common
from tables.parameters import MAX_COLUMNS
import tables
# To delete the internal attributes automagically
unittest.TestCase.tearDown = common.cleanup
class Record(IsDescription):
var1 = StringCol(itemsize=4) # 4-character String
var2 = IntCol() # integer
var3 = Int16Col() # short integer
var4 = FloatCol() # double (double-precision)
var5 = Float32Col() # float (single-precision)
class createTestCase(unittest.TestCase):
file = "test.h5"
title = "This is the table title"
expectedrows = 100
maxshort = 2 ** 15
maxint = 2147483648 # (2 ** 31)
compress = 0
def setUp(self):
# Create an instance of HDF5 Table
self.fileh = openFile(self.file, mode = "w")
self.root = self.fileh.root
# Create a table object
self.table = self.fileh.createTable(self.root, 'atable',
Record, "Table title")
# Create an array object
self.array = self.fileh.createArray(self.root, 'anarray',
[1], "Array title")
# Create a group object
self.group = self.fileh.createGroup(self.root, 'agroup',
"Group title")
def tearDown(self):
self.fileh.close()
os.remove(self.file)
common.cleanup(self)
#----------------------------------------
def test00_isClass(self):
"""Testing table creation"""
assert isinstance(self.table, Table)
assert isinstance(self.array, Array)
assert isinstance(self.array, Leaf)
assert isinstance(self.group, Group)
def test01_overwriteNode(self):
"""Checking protection against node overwriting"""
try:
self.array = self.fileh.createArray(self.root, 'anarray',
[1], "Array title")
except NodeError:
if common.verbose:
(type, value, traceback) = sys.exc_info()
print "\nGreat!, the next NameError was catched!"
print value
else:
self.fail("expected a NodeError")
def test02_syntaxname(self):
"""Checking syntax in object tree names"""
# Now, try to attach an array to the object tree with
# a not allowed Python variable name
warnings.filterwarnings("error", category=NaturalNameWarning)
try:
self.array = self.fileh.createArray(self.root, ' array',
[1], "Array title")
except NaturalNameWarning:
if common.verbose:
(type, value, traceback) = sys.exc_info()
print "\nGreat!, the next NaturalNameWarning was catched!"
print value
else:
self.fail("expected a NaturalNameWarning")
# another name error
try:
self.array = self.fileh.createArray(self.root, '$array',
[1], "Array title")
except NaturalNameWarning:
if common.verbose:
(type, value, traceback) = sys.exc_info()
print "\nGreat!, the next NaturalNameWarning was catched!"
print value
else:
self.fail("expected a NaturalNameWarning")
# Finally, test a reserved word
try:
self.array = self.fileh.createArray(self.root, 'for',
[1], "Array title")
except NaturalNameWarning:
if common.verbose:
(type, value, traceback) = sys.exc_info()
print "\nGreat!, the next NaturalNameWarning was catched!"
print value
else:
self.fail("expected a NaturalNameWarning")
# Reset the warning
warnings.filterwarnings("default", category=NaturalNameWarning)
def test03a_titleAttr(self):
"""Checking the self.title attr in nodes"""
# Close the opened file to destroy the object tree
self.fileh.close()
# Open the file again to re-create the objects
self.fileh = openFile(self.file,"r")
# Now, test that self.title exists and is correct in all the nodes
assert self.fileh.root.agroup._v_title == "Group title"
assert self.fileh.root.atable.title == "Table title"
assert self.fileh.root.anarray.title == "Array title"
def test03b_titleLength(self):
"""Checking large title character length limit (1023)"""
titlelength = 1023
# Try to put a very long title on a group object
group = self.fileh.createGroup(self.root, 'group',
"t" * titlelength)
assert group._v_title == "t" * titlelength
assert group._f_getAttr('TITLE') == "t" * titlelength
# Now, try with a table object
table = self.fileh.createTable(self.root, 'table',
Record, "t" * titlelength)
assert table.title == "t" * titlelength
assert table.getAttr("TITLE") == "t" * titlelength
# Finally, try with an Array object
arr = self.fileh.createArray(self.root, 'arr',
[1], "t" * titlelength)
assert arr.title == "t" * titlelength
assert arr.getAttr("TITLE") == "t" * titlelength
def test04_maxFields(self):
"Checking a large number of fields in tables"
# The number of fields for a table
varnumber = MAX_COLUMNS
varnames = []
for i in range(varnumber):
varnames.append('int%d' % i)
# Build a dictionary with the types as values and varnames as keys
recordDict = {}
i = 0
for varname in varnames:
recordDict[varname] = Col.from_type("int32", dflt=1, pos=i)
i += 1
# Append this entry to indicate the alignment!
recordDict['_v_align'] = "="
table = self.fileh.createTable(self.root, 'table',
recordDict, "MetaRecord instance")
row = table.row
listrows = []
# Write 10 records
for j in range(10):
rowlist = []
for i in range(len(table.colnames)):
row[varnames[i]] = i*j
rowlist.append(i*j)
row.append()
listrows.append(tuple(rowlist))
# write data on disk
table.flush()
# Read all the data as a list
listout = table.read().tolist()
# Compare the input rowlist and output row list. They should
# be equal.
if common.verbose:
print "Original row list:", listrows[-1]
print "Retrieved row list:", listout[-1]
assert listrows == listout
# The next limitation has been released. A warning is still there, though
def test05_maxFieldsExceeded(self):
"Checking an excess of the maximum number of fields in tables"
# The number of fields for a table
varnumber = MAX_COLUMNS + 1
varnames = []
for i in range(varnumber):
varnames.append('int%d' % i)
# Build a dictionary with the types as values and varnames as keys
recordDict = {}
i = 0
for varname in varnames:
recordDict[varname] = Col.from_type("int32", dflt=1)
i += 1
# Now, create a table with this record object
# This way of creating node objects has been deprecated
#table = Table(recordDict, "MetaRecord instance")
# Attach the table to object tree
warnings.filterwarnings("error", category=PerformanceWarning)
# Here, a PerformanceWarning should be raised!
try:
table = self.fileh.createTable(self.root, 'table',
recordDict, "MetaRecord instance")
except PerformanceWarning:
if common.verbose:
(type, value, traceback) = sys.exc_info()
print "\nGreat!, the next PerformanceWarning was catched!"
print value
else:
self.fail("expected an PerformanceWarning")
# Reset the warning
warnings.filterwarnings("default", category=PerformanceWarning)
# The next limitation has been released
def _test06_maxColumnNameLengthExceeded(self):
"Checking an excess (256) of the maximum length in column names"
# Build a dictionary with the types as values and varnames as keys
recordDict = {}
recordDict["a"*255] = IntCol(dflt=1)
recordDict["b"*256] = IntCol(dflt=1) # Should trigger a ValueError
# Now, create a table with this record object
# This way of creating node objects has been deprecated
table = Table(recordDict, "MetaRecord instance")
# Attach the table to object tree
# Here, ValueError should be raised!
try:
table = self.fileh.createTable(self.root, 'table',
recordDict, "MetaRecord instance")
except ValueError:
if common.verbose:
(type, value, traceback) = sys.exc_info()
print "\nGreat!, the next ValueError was catched!"
print value
else:
self.fail("expected an ValueError")
def test06_noMaxColumnNameLength(self):
"Checking unlimited length in column names"
# Build a dictionary with the types as values and varnames as keys
recordDict = {}
recordDict["a"*255] = IntCol(dflt=1, pos=0)
recordDict["b"*1024] = IntCol(dflt=1, pos=1) # Should work well
# Attach the table to object tree
# Here, IndexError should be raised!
table = self.fileh.createTable(self.root, 'table',
recordDict, "MetaRecord instance")
assert table.colnames[0] == "a"*255
assert table.colnames[1] == "b"*1024
class Record2(IsDescription):
var1 = StringCol(itemsize=4) # 4-character String
var2 = IntCol() # integer
var3 = Int16Col() # short integer
class FiltersTreeTestCase(unittest.TestCase):
title = "A title"
nrows = 10
def setUp(self):
# Create a temporary file
self.file = tempfile.mktemp(".h5")
# Create an instance of HDF5 Table
self.h5file = openFile(self.file, "w", filters=self.filters)
self.populateFile()
def populateFile(self):
group = self.h5file.root
# Create a tree with three levels of depth
for j in range(5):
# Create a table
table = self.h5file.createTable(group, 'table1', Record2,
title = self.title,
filters = None)
# Get the record object associated with the new table
d = table.row
# Fill the table
for i in xrange(self.nrows):
d['var1'] = '%04d' % (self.nrows - i)
d['var2'] = i
d['var3'] = i * 2
d.append() # This injects the Record values
# Flush the buffer for this table
table.flush()
# Create a couple of arrays in each group
var1List = [ x['var1'] for x in table.iterrows() ]
var3List = [ x['var3'] for x in table.iterrows() ]
self.h5file.createArray(group, 'array1', var1List, "col 1")
self.h5file.createArray(group, 'array2', var3List, "col 3")
# Create a couple of EArrays as well
ea1 = self.h5file.createEArray(group, 'earray1',
StringAtom(itemsize=4), (0,),
"col 1")
ea2 = self.h5file.createEArray(group, 'earray2',
Int16Atom(), (0,), "col 3")
# And fill them with some values
ea1.append(var1List)
ea2.append(var3List)
# Create a new group (descendant of group)
if j == 1: # The second level
group2 = self.h5file.createGroup(group, 'group'+str(j),
filters=self.gfilters)
elif j == 2: # third level
group2 = self.h5file.createGroup(group, 'group'+str(j))
else: # The rest of levels
group2 = self.h5file.createGroup(group, 'group'+str(j),
filters=self.filters)
# Iterate over this new group (group2)
group = group2
def tearDown(self):
# Close the file
if self.h5file.isopen:
self.h5file.close()
os.remove(self.file)
common.cleanup(self)
#----------------------------------------
def test00_checkFilters(self):
"Checking inheritance of filters on trees (open file version)"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test00_checkFilters..." % self.__class__.__name__
# First level check
if common.verbose:
print "Test filter:", repr(self.filters)
print "Filters in file:", repr(self.h5file.filters)
if self.filters == None:
filters = Filters()
else:
filters = self.filters
assert repr(filters) == repr(self.h5file.filters)
# The next nodes have to have the same filter properties as
# self.filters
nodelist = ['/table1', '/group0/earray1', '/group0']
for node in nodelist:
object = self.h5file.getNode(node)
if isinstance(object, Group):
assert repr(filters) == repr(object._v_filters)
else:
assert repr(filters) == repr(object.filters)
# Second and third level check
group1 = self.h5file.root.group0.group1
if self.gfilters == None:
if self.filters == None:
gfilters = Filters()
else:
gfilters = self.filters
else:
gfilters = self.gfilters
if common.verbose:
print "Test gfilter:", repr(gfilters)
print "Filters in file:", repr(group1._v_filters)
assert repr(gfilters) == repr(group1._v_filters)
# The next nodes have to have the same filter properties as
# gfilters
nodelist = ['/group0/group1', '/group0/group1/earray1',
'/group0/group1/table1', '/group0/group1/group2/table1']
for node in nodelist:
object = self.h5file.getNode(node)
if isinstance(object, Group):
assert repr(gfilters) == repr(object._v_filters)
else:
assert repr(gfilters) == repr(object.filters)
# Fourth and fifth level check
if self.filters == None:
# If None, the filters are inherited!
if self.gfilters == None:
filters = Filters()
else:
filters = self.gfilters
else:
filters = self.filters
group3 = self.h5file.root.group0.group1.group2.group3
if common.verbose:
print "Test filter:", repr(filters)
print "Filters in file:", repr(group3._v_filters)
assert repr(filters) == repr(group3._v_filters)
# The next nodes have to have the same filter properties as
# self.filter
nodelist = ['/group0/group1/group2/group3',
'/group0/group1/group2/group3/earray1',
'/group0/group1/group2/group3/table1',
'/group0/group1/group2/group3/group4']
for node in nodelist:
object = self.h5file.getNode(node)
if isinstance(object, Group):
assert repr(filters) == repr(object._v_filters)
else:
assert repr(filters) == repr(object.filters)
# Checking the special case for Arrays in which the compression
# should always be the empty Filter()
# The next nodes have to have the same filter properties as
# Filter()
nodelist = ['/array1',
'/group0/array1',
'/group0/group1/array1',
'/group0/group1/group2/array1',
'/group0/group1/group2/group3/array1']
for node in nodelist:
object = self.h5file.getNode(node)
assert repr(Filters()) == repr(object.filters)
def test01_checkFilters(self):
"Checking inheritance of filters on trees (close file version)"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test01_checkFilters..." % self.__class__.__name__
# Close the file
self.h5file.close()
# And open it again
self.h5file = openFile(self.file, "r")
# First level check
if self.filters == None:
filters = Filters()
else:
filters = self.filters
if common.verbose:
print "Test filter:", repr(filters)
print "Filters in file:", repr(self.h5file.filters)
assert repr(filters) == repr(self.h5file.filters)
# The next nodes have to have the same filter properties as
# self.filters
nodelist = ['/table1', '/group0/earray1', '/group0']
for node in nodelist:
object_ = self.h5file.getNode(node)
if isinstance(object_, Group):
assert repr(filters) == repr(object_._v_filters)
else:
assert repr(filters) == repr(object_.filters)
# Second and third level check
group1 = self.h5file.root.group0.group1
if self.gfilters == None:
if self.filters == None:
gfilters = Filters()
else:
gfilters = self.filters
else:
gfilters = self.gfilters
if common.verbose:
print "Test filter:", repr(gfilters)
print "Filters in file:", repr(group1._v_filters)
repr(gfilters) == repr(group1._v_filters)
# The next nodes have to have the same filter properties as
# gfilters
nodelist = ['/group0/group1', '/group0/group1/earray1',
'/group0/group1/table1', '/group0/group1/group2/table1']
for node in nodelist:
object_ = self.h5file.getNode(node)
if isinstance(object_, Group):
assert repr(gfilters) == repr(object_._v_filters)
else:
assert repr(gfilters) == repr(object_.filters)
# Fourth and fifth level check
if self.filters == None:
if self.gfilters == None:
filters = Filters()
else:
filters = self.gfilters
else:
filters = self.filters
group3 = self.h5file.root.group0.group1.group2.group3
if common.verbose:
print "Test filter:", repr(filters)
print "Filters in file:", repr(group3._v_filters)
repr(filters) == repr(group3._v_filters)
# The next nodes have to have the same filter properties as
# self.filters
nodelist = ['/group0/group1/group2/group3',
'/group0/group1/group2/group3/earray1',
'/group0/group1/group2/group3/table1',
'/group0/group1/group2/group3/group4']
for node in nodelist:
object = self.h5file.getNode(node)
if isinstance(object, Group):
assert repr(filters) == repr(object._v_filters)
else:
assert repr(filters) == repr(object.filters)
# Checking the special case for Arrays in which the compression
# should always be the empty Filter()
# The next nodes have to have the same filter properties as
# Filter()
nodelist = ['/array1',
'/group0/array1',
'/group0/group1/array1',
'/group0/group1/group2/array1',
'/group0/group1/group2/group3/array1']
for node in nodelist:
object = self.h5file.getNode(node)
assert repr(Filters()) == repr(object.filters)
class FiltersCase1(FiltersTreeTestCase):
filters = Filters()
gfilters = Filters(complevel=1)
class FiltersCase2(FiltersTreeTestCase):
filters = Filters(complevel=1, complib="bzip2")
gfilters = Filters(complevel=1)
class FiltersCase3(FiltersTreeTestCase):
filters = Filters(shuffle=True, complib="zlib")
gfilters = Filters(complevel=1, shuffle=False, complib="lzo")
class FiltersCase4(FiltersTreeTestCase):
filters = Filters(shuffle=True)
gfilters = Filters(complevel=1, shuffle=False)
class FiltersCase5(FiltersTreeTestCase):
filters = Filters(fletcher32=True)
gfilters = Filters(complevel=1, shuffle=False)
class FiltersCase6(FiltersTreeTestCase):
filters = None
gfilters = Filters(complevel=1, shuffle=False)
class FiltersCase7(FiltersTreeTestCase):
filters = Filters(complevel=1)
gfilters = None
class FiltersCase8(FiltersTreeTestCase):
filters = None
gfilters = None
class FiltersCase9(FiltersTreeTestCase):
filters = Filters(shuffle=True, complib="zlib")
gfilters = Filters(complevel=5, shuffle=True, complib="bzip2")
class CopyGroupTestCase(unittest.TestCase):
title = "A title"
nrows = 10
def setUp(self):
# Create a temporary file
self.file = tempfile.mktemp(".h5")
self.file2 = tempfile.mktemp(".h5")
# Create the source file
self.h5file = openFile(self.file, "w")
# Create the destination
self.h5file2 = openFile(self.file2, "w")
self.populateFile()
def populateFile(self):
group = self.h5file.root
# Add some user attrs:
group._v_attrs.attr1 = "an string for root group"
group._v_attrs.attr2 = 124
# Create a tree
for j in range(5):
for i in range(2):
# Create a new group (brother of group)
group2 = self.h5file.createGroup(group, 'bgroup'+str(i),
filters=None)
# Create a table
table = self.h5file.createTable(group2, 'table1', Record2,
title = self.title,
filters = None)
# Get the record object associated with the new table
d = table.row
# Fill the table
for i in xrange(self.nrows):
d['var1'] = '%04d' % (self.nrows - i)
d['var2'] = i
d['var3'] = i * 2
d.append() # This injects the Record values
# Flush the buffer for this table
table.flush()
# Add some user attrs:
table.attrs.attr1 = "an string"
table.attrs.attr2 = 234
# Create a couple of arrays in each group
var1List = [ x['var1'] for x in table.iterrows() ]
var3List = [ x['var3'] for x in table.iterrows() ]
self.h5file.createArray(group2, 'array1', var1List, "col 1")
self.h5file.createArray(group2, 'array2', var3List, "col 3")
# Create a couple of EArrays as well
ea1 = self.h5file.createEArray(group2, 'earray1',
StringAtom(itemsize=4), (0,),
"col 1")
ea2 = self.h5file.createEArray(group2, 'earray2',
Int16Atom(), (0,), "col 3")
# Add some user attrs:
ea1.attrs.attr1 = "an string for earray"
ea2.attrs.attr2 = 123
# And fill them with some values
ea1.append(var1List)
ea2.append(var3List)
# Create a new group (descendant of group)
group3 = self.h5file.createGroup(group, 'group'+str(j),
filters=None)
# Iterate over this new group (group3)
group = group3
# Add some user attrs:
group._v_attrs.attr1 = "an string for group"
group._v_attrs.attr2 = 124
def tearDown(self):
# Close the file
if self.h5file.isopen:
self.h5file.close()
if self.h5file2.isopen:
self.h5file2.close()
os.remove(self.file)
os.remove(self.file2)
common.cleanup(self)
#----------------------------------------
def test00_nonRecursive(self):
"Checking non-recursive copy of a Group"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test00_nonRecursive..." % self.__class__.__name__
# Copy a group non-recursively
srcgroup = self.h5file.root.group0.group1
# srcgroup._f_copyChildren(self.h5file2.root,
# recursive=False,
# filters=self.filters)
self.h5file.copyChildren(srcgroup, self.h5file2.root,
recursive=False, filters=self.filters)
if self.close:
# Close the destination file
self.h5file2.close()
# And open it again
self.h5file2 = openFile(self.file2, "r")
# Check that the copy has been done correctly
dstgroup = self.h5file2.root
nodelist1 = srcgroup._v_children.keys()
nodelist2 = dstgroup._v_children.keys()
# Sort the lists
nodelist1.sort(); nodelist2.sort()
if common.verbose:
print "The origin node list -->", nodelist1
print "The copied node list -->", nodelist2
assert srcgroup._v_nchildren == dstgroup._v_nchildren
assert nodelist1 == nodelist2
def test01_nonRecursiveAttrs(self):
"Checking non-recursive copy of a Group (attributes copied)"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test01_nonRecursiveAttrs..." % self.__class__.__name__
# Copy a group non-recursively with attrs
srcgroup = self.h5file.root.group0.group1
srcgroup._f_copyChildren(self.h5file2.root,
recursive=False,
filters=self.filters,
copyuserattrs = 1)
if self.close:
# Close the destination file
self.h5file2.close()
# And open it again
self.h5file2 = openFile(self.file2, "r")
# Check that the copy has been done correctly
dstgroup = self.h5file2.root
for srcnode in srcgroup:
dstnode = getattr(dstgroup, srcnode._v_name)
if isinstance(srcnode, Group):
srcattrs = srcnode._v_attrs
srcattrskeys = srcattrs._f_list("all")
dstattrs = dstnode._v_attrs
dstattrskeys = dstattrs._f_list("all")
else:
srcattrs = srcnode.attrs
srcattrskeys = srcattrs._f_list("all")
dstattrs = dstnode.attrs
dstattrskeys = dstattrs._f_list("all")
# Filters may differ, do not take into account
if self.filters is not None:
dstattrskeys.remove('FILTERS')
# These lists should already be ordered
if common.verbose:
print "srcattrskeys for node %s: %s" %(srcnode._v_name,
srcattrskeys)
print "dstattrskeys for node %s: %s" %(dstnode._v_name,
dstattrskeys)
assert srcattrskeys == dstattrskeys
if common.verbose:
print "The attrs names has been copied correctly"
# Now, for the contents of attributes
for srcattrname in srcattrskeys:
srcattrvalue = str(getattr(srcattrs, srcattrname))
dstattrvalue = str(getattr(dstattrs, srcattrname))
assert srcattrvalue == dstattrvalue
if self.filters is not None:
assert dstattrs.FILTERS == self.filters
if common.verbose:
print "The attrs contents has been copied correctly"
def test02_Recursive(self):
"Checking recursive copy of a Group"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test02_Recursive..." % self.__class__.__name__
# Create the destination node
group = self.h5file2.root
for groupname in self.dstnode.split("/"):
if groupname:
group = self.h5file2.createGroup(group, groupname)
dstgroup = self.h5file2.getNode(self.dstnode)
# Copy a group non-recursively
srcgroup = self.h5file.getNode(self.srcnode)
self.h5file.copyChildren(srcgroup, dstgroup,
recursive=True,
filters=self.filters)
lenSrcGroup = len(srcgroup._v_pathname)
if lenSrcGroup == 1:
lenSrcGroup = 0 # Case where srcgroup == "/"
if self.close:
# Close the destination file
self.h5file2.close()
# And open it again
self.h5file2 = openFile(self.file2, "r")
dstgroup = self.h5file2.getNode(self.dstnode)
# Check that the copy has been done correctly
lenDstGroup = len(dstgroup._v_pathname)
if lenDstGroup == 1:
lenDstGroup = 0 # Case where dstgroup == "/"
first = 1
nodelist1 = []
for node in srcgroup._f_walkNodes():
if first:
# skip the first group
first = 0
continue
nodelist1.append(node._v_pathname[lenSrcGroup:])
first = 1
nodelist2 = []
for node in dstgroup._f_walkNodes():
if first:
# skip the first group
first = 0
continue
nodelist2.append(node._v_pathname[lenDstGroup:])
if common.verbose:
print "The origin node list -->", nodelist1
print "The copied node list -->", nodelist2
assert nodelist1 == nodelist2
def test03_RecursiveFilters(self):
"Checking recursive copy of a Group (cheking Filters)"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test03_RecursiveFilters..." % self.__class__.__name__
# Create the destination node
group = self.h5file2.root
for groupname in self.dstnode.split("/"):
if groupname:
group = self.h5file2.createGroup(group, groupname)
dstgroup = self.h5file2.getNode(self.dstnode)
# Copy a group non-recursively
srcgroup = self.h5file.getNode(self.srcnode)
srcgroup._f_copyChildren(dstgroup,
recursive=True,
filters=self.filters)
lenSrcGroup = len(srcgroup._v_pathname)
if lenSrcGroup == 1:
lenSrcGroup = 0 # Case where srcgroup == "/"
if self.close:
# Close the destination file
self.h5file2.close()
# And open it again
self.h5file2 = openFile(self.file2, "r")
dstgroup = self.h5file2.getNode(self.dstnode)
# Check that the copy has been done correctly
lenDstGroup = len(dstgroup._v_pathname)
if lenDstGroup == 1:
lenDstGroup = 0 # Case where dstgroup == "/"
first = 1
nodelist1 = {}
for node in srcgroup._f_walkNodes():
if first:
# skip the first group
first = 0
continue
nodelist1[node._v_name] = node._v_pathname[lenSrcGroup:]
first = 1
for node in dstgroup._f_walkNodes():
if first:
# skip the first group
first = 0
continue
if isinstance(node, Group):
repr(node._v_filters) == repr(nodelist1[node._v_name])
else:
repr(node.filters) == repr(nodelist1[node._v_name])
class CopyGroupCase1(CopyGroupTestCase):
close = 0
filters = None
srcnode = '/group0/group1'
dstnode = '/'
class CopyGroupCase2(CopyGroupTestCase):
close = 1
filters = None
srcnode = '/group0/group1'
dstnode = '/'
class CopyGroupCase3(CopyGroupTestCase):
close = 0
filters = None
srcnode = '/group0'
dstnode = '/group2/group3'
class CopyGroupCase4(CopyGroupTestCase):
close = 1
filters = Filters(complevel=1)
srcnode = '/group0'
dstnode = '/group2/group3'
class CopyGroupCase5(CopyGroupTestCase):
close = 0
filters = Filters()
srcnode = '/'
dstnode = '/group2/group3'
class CopyGroupCase6(CopyGroupTestCase):
close = 1
filters = Filters(fletcher32=True)
srcnode = '/group0'
dstnode = '/group2/group3'
class CopyGroupCase7(CopyGroupTestCase):
close = 0
filters = Filters(complevel=1, shuffle=False)
srcnode = '/'
dstnode = '/'
class CopyGroupCase8(CopyGroupTestCase):
close = 1
filters = Filters(complevel=1, complib="lzo")
srcnode = '/'
dstnode = '/'
class CopyFileTestCase(unittest.TestCase):
title = "A title"
nrows = 10
def setUp(self):
# Create a temporary file
self.file = tempfile.mktemp(".h5")
self.file2 = tempfile.mktemp(".h5")
# Create the source file
self.h5file = openFile(self.file, "w")
self.populateFile()
def populateFile(self):
group = self.h5file.root
# Add some user attrs:
group._v_attrs.attr1 = "an string for root group"
group._v_attrs.attr2 = 124
# Create a tree
for j in range(5):
for i in range(2):
# Create a new group (brother of group)
group2 = self.h5file.createGroup(group, 'bgroup'+str(i),
filters=None)
# Create a table
table = self.h5file.createTable(group2, 'table1', Record2,
title = self.title,
filters = None)
# Get the record object associated with the new table
d = table.row
# Fill the table
for i in xrange(self.nrows):
d['var1'] = '%04d' % (self.nrows - i)
d['var2'] = i
d['var3'] = i * 2
d.append() # This injects the Record values
# Flush the buffer for this table
table.flush()
# Add some user attrs:
table.attrs.attr1 = "an string"
table.attrs.attr2 = 234
# Create a couple of arrays in each group
var1List = [ x['var1'] for x in table.iterrows() ]
var3List = [ x['var3'] for x in table.iterrows() ]
self.h5file.createArray(group2, 'array1', var1List, "col 1")
self.h5file.createArray(group2, 'array2', var3List, "col 3")
# Create a couple of EArrays as well
ea1 = self.h5file.createEArray(group2, 'earray1',
StringAtom(itemsize=4), (0,),
"col 1")
ea2 = self.h5file.createEArray(group2, 'earray2',
Int16Atom(), (0,),
"col 3")
# Add some user attrs:
ea1.attrs.attr1 = "an string for earray"
ea2.attrs.attr2 = 123
# And fill them with some values
ea1.append(var1List)
ea2.append(var3List)
# Create a new group (descendant of group)
group3 = self.h5file.createGroup(group, 'group'+str(j),
filters=None)
# Iterate over this new group (group3)
group = group3
# Add some user attrs:
group._v_attrs.attr1 = "an string for group"
group._v_attrs.attr2 = 124
def tearDown(self):
# Close the file
if self.h5file.isopen:
self.h5file.close()
if hasattr(self, 'h5file2') and self.h5file2.isopen:
self.h5file2.close()
os.remove(self.file)
if hasattr(self, 'file2') and os.path.exists(self.file2):
os.remove(self.file2)
common.cleanup(self)
#----------------------------------------
def test00_overwrite(self):
"Checking copy of a File (overwriting file)"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test00_overwrite..." % self.__class__.__name__
# Create a temporary file
file2h = open(self.file2, "w")
file2h.close()
# Copy the file to the destination
self.h5file.copyFile(self.file2, title=self.title,
overwrite = 1,
copyuserattrs = 0,
filters = None)
# Close the original file, if needed
if self.close:
self.h5file.close()
# re-open it
self.h5file = openFile(self.file, "r")
# ...and open the destination file
self.h5file2 = openFile(self.file2, "r")
# Check that the copy has been done correctly
srcgroup = self.h5file.root
dstgroup = self.h5file2.root
nodelist1 = srcgroup._v_children.keys()
nodelist2 = dstgroup._v_children.keys()
# Sort the lists
nodelist1.sort(); nodelist2.sort()
if common.verbose:
print "The origin node list -->", nodelist1
print "The copied node list -->", nodelist2
assert srcgroup._v_nchildren == dstgroup._v_nchildren
assert nodelist1 == nodelist2
assert self.h5file2.title == self.title
def test00a_srcdstequal(self):
"Checking copy of a File (srcfile == dstfile)"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test00a_srcdstequal..." % self.__class__.__name__
# Copy the file to the destination
self.assertRaises(IOError, self.h5file.copyFile, self.h5file.filename)
def test00b_firstclass(self):
"Checking copy of a File (first-class function)"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test00b_firstclass..." % self.__class__.__name__
# Close the temporary file
self.h5file.close()
# Copy the file to the destination
copyFile(self.file, self.file2, title=self.title,
copyuserattrs = 0, filters = None, overwrite = 1)
# ...and open the source and destination file
self.h5file = openFile(self.file, "r")
self.h5file2 = openFile(self.file2, "r")
# Check that the copy has been done correctly
srcgroup = self.h5file.root
dstgroup = self.h5file2.root
nodelist1 = srcgroup._v_children.keys()
nodelist2 = dstgroup._v_children.keys()
# Sort the lists
nodelist1.sort(); nodelist2.sort()
if common.verbose:
print "The origin node list -->", nodelist1
print "The copied node list -->", nodelist2
assert srcgroup._v_nchildren == dstgroup._v_nchildren
assert nodelist1 == nodelist2
assert self.h5file2.title == self.title
def test01_copy(self):
"Checking copy of a File (attributes not copied)"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test01_copy..." % self.__class__.__name__
# Copy the file to the destination
self.h5file.copyFile(self.file2, title=self.title,
copyuserattrs = 0,
filters = self.filters)
# Close the original file, if needed
if self.close:
self.h5file.close()
# re-open it
self.h5file = openFile(self.file, "r")
# ...and open the destination file
self.h5file2 = openFile(self.file2, "r")
# Check that the copy has been done correctly
srcgroup = self.h5file.root
dstgroup = self.h5file2.root
nodelist1 = srcgroup._v_children.keys()
nodelist2 = dstgroup._v_children.keys()
# Sort the lists
nodelist1.sort(); nodelist2.sort()
if common.verbose:
print "The origin node list -->", nodelist1
print "The copied node list -->", nodelist2
assert srcgroup._v_nchildren == dstgroup._v_nchildren
assert nodelist1 == nodelist2
#print "_v_attrnames-->", self.h5file2.root._v_attrs._v_attrnames
#print "--> <%s,%s>" % (self.h5file2.title, self.title)
assert self.h5file2.title == self.title
# Check that user attributes has not been copied
for srcnode in srcgroup:
dstnode = getattr(dstgroup, srcnode._v_name)
srcattrs = srcnode._v_attrs
srcattrskeys = srcattrs._f_list("sys")
dstattrs = dstnode._v_attrs
dstattrskeys = dstattrs._f_list("all")
# Filters may differ, do not take into account
if self.filters is not None:
dstattrskeys.remove('FILTERS')
# These lists should already be ordered
if common.verbose:
print "srcattrskeys for node %s: %s" %(srcnode._v_name,
srcattrskeys)
print "dstattrskeys for node %s: %s" %(dstnode._v_name,
dstattrskeys)
assert srcattrskeys == dstattrskeys
if common.verbose:
print "The attrs names has been copied correctly"
# Now, for the contents of attributes
for srcattrname in srcattrskeys:
srcattrvalue = str(getattr(srcattrs, srcattrname))
dstattrvalue = str(getattr(dstattrs, srcattrname))
assert srcattrvalue == dstattrvalue
if self.filters is not None:
assert dstattrs.FILTERS == self.filters
if common.verbose:
print "The attrs contents has been copied correctly"
def test02_Attrs(self):
"Checking copy of a File (attributes copied)"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test02_Attrs..." % self.__class__.__name__
# Copy the file to the destination
self.h5file.copyFile(self.file2, title=self.title,
copyuserattrs = 1,
filters = self.filters)
# Close the original file, if needed
if self.close:
self.h5file.close()
# re-open it
self.h5file = openFile(self.file, "r")
# ...and open the destination file
self.h5file2 = openFile(self.file2, "r")
# Check that the copy has been done correctly
srcgroup = self.h5file.root
dstgroup = self.h5file2.root
for srcnode in srcgroup:
dstnode = getattr(dstgroup, srcnode._v_name)
srcattrs = srcnode._v_attrs
srcattrskeys = srcattrs._f_list("all")
dstattrs = dstnode._v_attrs
dstattrskeys = dstattrs._f_list("all")
# These lists should already be ordered
if common.verbose:
print "srcattrskeys for node %s: %s" %(srcnode._v_name,
srcattrskeys)
print "dstattrskeys for node %s: %s" %(dstnode._v_name,
dstattrskeys)
# Filters may differ, do not take into account
if self.filters is not None:
dstattrskeys.remove('FILTERS')
assert srcattrskeys == dstattrskeys
if common.verbose:
print "The attrs names has been copied correctly"
# Now, for the contents of attributes
for srcattrname in srcattrskeys:
srcattrvalue = str(getattr(srcattrs, srcattrname))
dstattrvalue = str(getattr(dstattrs, srcattrname))
assert srcattrvalue == dstattrvalue
if self.filters is not None:
assert dstattrs.FILTERS == self.filters
if common.verbose:
print "The attrs contents has been copied correctly"
class CopyFileCase1(CopyFileTestCase):
close = 0
title = "A new title"
filters = None
class CopyFileCase2(CopyFileTestCase):
close = 1
title = "A new title"
filters = None
class CopyFileCase3(CopyFileTestCase):
close = 0
title = "A new title"
filters = Filters(complevel=1)
class CopyFileCase4(CopyFileTestCase):
close = 1
title = "A new title"
filters = Filters(complevel=1)
class CopyFileCase5(CopyFileTestCase):
close = 0
title = "A new title"
filters = Filters(fletcher32=True)
class CopyFileCase6(CopyFileTestCase):
close = 1
title = "A new title"
filters = Filters(fletcher32=True)
class CopyFileCase7(CopyFileTestCase):
close = 0
title = "A new title"
filters = Filters(complevel=1, complib="lzo")
class CopyFileCase8(CopyFileTestCase):
close = 1
title = "A new title"
filters = Filters(complevel=1, complib="lzo")
class CopyFileCase10(unittest.TestCase):
def test01_notoverwrite(self):
"Checking copy of a File (checking not overwriting)"
if common.verbose:
print '\n', '-=' * 30
print "Running %s.test01_notoverwrite..." % self.__class__.__name__
# Create two empty files:
file = tempfile.mktemp(".h5")
fileh = openFile(file, "w")
file2 = tempfile.mktemp(".h5")
fileh2 = openFile(file2, "w")
fileh2.close() # close the second one
# Copy the first into the second
try:
fileh.copyFile(file2, overwrite=False)
except IOError:
if common.verbose:
(type, value, traceback) = sys.exc_info()
print "\nGreat!, the next IOError was catched!"
print value
else:
self.fail("expected a IOError")
# Delete files
fileh.close()
os.remove(file)
os.remove(file2)
class GroupFiltersTestCase(common.TempFileMixin, common.PyTablesTestCase):
filters = tables.Filters(complevel=4) # something non-default
def setUp(self):
super(GroupFiltersTestCase, self).setUp()
atom, shape = tables.IntAtom(), (1, 1)
createGroup = self.h5file.createGroup
createCArray = self.h5file.createCArray
createGroup('/', 'implicit_no')
createGroup('/implicit_no', 'implicit_no')
createCArray( '/implicit_no/implicit_no', 'implicit_no',
atom=atom, shape=shape )
createCArray( '/implicit_no/implicit_no', 'explicit_no',
atom=atom, shape=shape, filters=tables.Filters() )
createCArray( '/implicit_no/implicit_no', 'explicit_yes',
atom=atom, shape=shape, filters=self.filters )
createGroup('/', 'explicit_yes', filters=self.filters)
createGroup('/explicit_yes', 'implicit_yes')
createCArray( '/explicit_yes/implicit_yes', 'implicit_yes',
atom=atom, shape=shape )
createCArray( '/explicit_yes/implicit_yes', 'explicit_yes',
atom=atom, shape=shape, filters=self.filters )
createCArray( '/explicit_yes/implicit_yes', 'explicit_no',
atom=atom, shape=shape, filters=tables.Filters() )
def _check_filters(self, h5file, filters=None):
for node in h5file:
# Get node filters.
if hasattr(node, 'filters'):
node_filters = node.filters
else:
node_filters = node._v_filters
# Compare to given filters.
if filters is not None:
self.assertEqual(node_filters, filters)
return
# Guess filters to compare to by node name.
if node._v_name.endswith('_no'):
self.assertEqual(
node_filters, tables.Filters(),
"node ``%s`` should have no filters" % node._v_pathname )
elif node._v_name.endswith('_yes'):
self.assertEqual(
node_filters, self.filters,
"node ``%s`` should have filters" % node._v_pathname )
def test00_propagate(self):
"""Filters propagating to children."""
self._check_filters(self.h5file)
def _test_copyFile(self, filters=None):
copyfname = tempfile.mktemp(suffix='.h5')
try:
self.h5file.copyFile(copyfname, filters=filters)
try:
copyf = tables.openFile(copyfname)
self._check_filters(copyf, filters=filters)
finally:
copyf.close()
finally:
os.remove(copyfname)
def test01_copyFile(self):
"""Keeping filters when copying a file."""
self._test_copyFile()
def test02_copyFile_override(self):
"""Overriding filters when copying a file."""
self._test_copyFile(self.filters)
def _test_change(self, pathname, change_filters, new_filters):
group = self.h5file.getNode(pathname)
# Check expected current filters.
old_filters = tables.Filters()
if pathname.endswith('_yes'):
old_filters = self.filters
self.assertEqual(group._v_filters, old_filters)
# Change filters.
change_filters(group)
self.assertEqual(group._v_filters, new_filters)
# Get and check changed filters.
if self._reopen():
group = self.h5file.getNode(pathname)
self.assertEqual(group._v_filters, new_filters)
def test03_change(self):
"""Changing the filters of a group."""
def set_filters(group):
group._v_filters = self.filters
self._test_change('/', set_filters, self.filters)
def test04_delete(self):
"""Deleting the filters of a group."""
def del_filters(group):
del group._v_filters
self._test_change('/explicit_yes', del_filters, tables.Filters())
#----------------------------------------------------------------------
def suite():
import doctest
import tables.atom
theSuite = unittest.TestSuite()
niter = 1
#common.heavy = 1 # Uncomment this only for testing purposes!
#theSuite.addTest(unittest.makeSuite(FiltersCase1))
#theSuite.addTest(unittest.makeSuite(createTestCase))
#theSuite.addTest(unittest.makeSuite(CopyGroupCase1))
#theSuite.addTest(unittest.makeSuite(CopyGroupCase2))
#theSuite.addTest(unittest.makeSuite(CopyFileCase1))
for i in range(niter):
theSuite.addTest(unittest.makeSuite(FiltersCase1))
theSuite.addTest(unittest.makeSuite(FiltersCase2))
theSuite.addTest(unittest.makeSuite(CopyGroupCase1))
theSuite.addTest(unittest.makeSuite(CopyGroupCase2))
theSuite.addTest(unittest.makeSuite(CopyFileCase1))
theSuite.addTest(unittest.makeSuite(CopyFileCase2))
theSuite.addTest(unittest.makeSuite(GroupFiltersTestCase))
theSuite.addTest(doctest.DocTestSuite(tables.filters))
if common.heavy:
theSuite.addTest(unittest.makeSuite(createTestCase))
theSuite.addTest(unittest.makeSuite(FiltersCase3))
theSuite.addTest(unittest.makeSuite(FiltersCase4))
theSuite.addTest(unittest.makeSuite(FiltersCase5))
theSuite.addTest(unittest.makeSuite(FiltersCase6))
theSuite.addTest(unittest.makeSuite(FiltersCase7))
theSuite.addTest(unittest.makeSuite(FiltersCase8))
theSuite.addTest(unittest.makeSuite(FiltersCase9))
theSuite.addTest(unittest.makeSuite(CopyFileCase3))
theSuite.addTest(unittest.makeSuite(CopyFileCase4))
theSuite.addTest(unittest.makeSuite(CopyFileCase5))
theSuite.addTest(unittest.makeSuite(CopyFileCase6))
theSuite.addTest(unittest.makeSuite(CopyFileCase7))
theSuite.addTest(unittest.makeSuite(CopyFileCase8))
theSuite.addTest(unittest.makeSuite(CopyFileCase10))
theSuite.addTest(unittest.makeSuite(CopyGroupCase3))
theSuite.addTest(unittest.makeSuite(CopyGroupCase4))
theSuite.addTest(unittest.makeSuite(CopyGroupCase5))
theSuite.addTest(unittest.makeSuite(CopyGroupCase6))
theSuite.addTest(unittest.makeSuite(CopyGroupCase7))
theSuite.addTest(unittest.makeSuite(CopyGroupCase8))
return theSuite
if __name__ == '__main__':
unittest.main( defaultTest='suite' )
|