test_create.py :  » Database » PyTables » tables-2.1.2 » tables » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » PyTables 
PyTables » tables 2.1.2 » tables » tests » test_create.py
""" This test unit checks object creation funtions, like openFile, createTable,
createArray or createGroup.
It also checks:
- name identifiers in tree objects
- title character limit for objects (255)
- limit in number in table fields (255)
"""

import sys
import unittest
import os
import re
import tempfile
import warnings

from tables import *
# important objects to test
from tables import File,Group,Leaf,Table,Array
from tables.tests import common
from tables.parameters import MAX_COLUMNS

import tables

# To delete the internal attributes automagically
unittest.TestCase.tearDown = common.cleanup

class Record(IsDescription):
    var1 = StringCol(itemsize=4)  # 4-character String
    var2 = IntCol()               # integer
    var3 = Int16Col()             # short integer
    var4 = FloatCol()             # double (double-precision)
    var5 = Float32Col()           # float  (single-precision)

class createTestCase(unittest.TestCase):
    file  = "test.h5"
    title = "This is the table title"
    expectedrows = 100
    maxshort = 2 ** 15
    maxint   = 2147483648   # (2 ** 31)
    compress = 0


    def setUp(self):
        # Create an instance of HDF5 Table
        self.fileh = openFile(self.file, mode = "w")
        self.root = self.fileh.root

        # Create a table object
        self.table = self.fileh.createTable(self.root, 'atable',
                                            Record, "Table title")
        # Create an array object
        self.array = self.fileh.createArray(self.root, 'anarray',
                                            [1], "Array title")
        # Create a group object
        self.group = self.fileh.createGroup(self.root, 'agroup',
                                            "Group title")

    def tearDown(self):

        self.fileh.close()
        os.remove(self.file)
        common.cleanup(self)

    #----------------------------------------

    def test00_isClass(self):
        """Testing table creation"""
        assert isinstance(self.table, Table)
        assert isinstance(self.array, Array)
        assert isinstance(self.array, Leaf)
        assert isinstance(self.group, Group)

    def test01_overwriteNode(self):
        """Checking protection against node overwriting"""

        try:
            self.array = self.fileh.createArray(self.root, 'anarray',
                                                [1], "Array title")
        except NodeError:
            if common.verbose:
                (type, value, traceback) = sys.exc_info()
                print "\nGreat!, the next NameError was catched!"
                print value
        else:
            self.fail("expected a NodeError")

    def test02_syntaxname(self):
        """Checking syntax in object tree names"""

        # Now, try to attach an array to the object tree with
        # a not allowed Python variable name
        warnings.filterwarnings("error", category=NaturalNameWarning)
        try:
            self.array = self.fileh.createArray(self.root, ' array',
                                                [1], "Array title")
        except NaturalNameWarning:
            if common.verbose:
                (type, value, traceback) = sys.exc_info()
                print "\nGreat!, the next NaturalNameWarning was catched!"
                print value
        else:
            self.fail("expected a NaturalNameWarning")

        # another name error
        try:
            self.array = self.fileh.createArray(self.root, '$array',
                                                [1], "Array title")
        except NaturalNameWarning:
            if common.verbose:
                (type, value, traceback) = sys.exc_info()
                print "\nGreat!, the next NaturalNameWarning was catched!"
                print value
        else:
            self.fail("expected a NaturalNameWarning")

        # Finally, test a reserved word
        try:
            self.array = self.fileh.createArray(self.root, 'for',
                                                [1], "Array title")
        except NaturalNameWarning:
            if common.verbose:
                (type, value, traceback) = sys.exc_info()
                print "\nGreat!, the next NaturalNameWarning was catched!"
                print value
        else:
            self.fail("expected a NaturalNameWarning")
        # Reset the warning
        warnings.filterwarnings("default", category=NaturalNameWarning)

    def test03a_titleAttr(self):
        """Checking the self.title attr in nodes"""

        # Close the opened file to destroy the object tree
        self.fileh.close()
        # Open the file again to re-create the objects
        self.fileh = openFile(self.file,"r")

        # Now, test that self.title exists and is correct in all the nodes
        assert self.fileh.root.agroup._v_title == "Group title"
        assert self.fileh.root.atable.title == "Table title"
        assert self.fileh.root.anarray.title == "Array title"

    def test03b_titleLength(self):
        """Checking large title character length limit (1023)"""

        titlelength = 1023
        # Try to put a very long title on a group object
        group = self.fileh.createGroup(self.root, 'group',
                                       "t" * titlelength)
        assert group._v_title == "t" * titlelength
        assert group._f_getAttr('TITLE') == "t" * titlelength

        # Now, try with a table object
        table = self.fileh.createTable(self.root, 'table',
                                       Record, "t" * titlelength)
        assert table.title == "t" * titlelength
        assert table.getAttr("TITLE") == "t" * titlelength

        # Finally, try with an Array object
        arr = self.fileh.createArray(self.root, 'arr',
                                     [1], "t" * titlelength)
        assert arr.title == "t" * titlelength
        assert arr.getAttr("TITLE") == "t" * titlelength

    def test04_maxFields(self):
        "Checking a large number of fields in tables"

        # The number of fields for a table
        varnumber = MAX_COLUMNS

        varnames = []
        for i in range(varnumber):
            varnames.append('int%d' % i)

        # Build a dictionary with the types as values and varnames as keys
        recordDict = {}
        i = 0
        for varname in varnames:
            recordDict[varname] = Col.from_type("int32", dflt=1, pos=i)
            i += 1
        # Append this entry to indicate the alignment!
        recordDict['_v_align'] = "="
        table = self.fileh.createTable(self.root, 'table',
                                       recordDict, "MetaRecord instance")
        row = table.row
        listrows = []
        # Write 10 records
        for j in range(10):
            rowlist = []
            for i in range(len(table.colnames)):
                row[varnames[i]] = i*j
                rowlist.append(i*j)

            row.append()
            listrows.append(tuple(rowlist))

        # write data on disk
        table.flush()

        # Read all the data as a list
        listout = table.read().tolist()

        # Compare the input rowlist and output row list. They should
        # be equal.
        if common.verbose:
            print "Original row list:", listrows[-1]
            print "Retrieved row list:", listout[-1]
        assert listrows == listout

    # The next limitation has been released. A warning is still there, though
    def test05_maxFieldsExceeded(self):
        "Checking an excess of the maximum number of fields in tables"

        # The number of fields for a table
        varnumber = MAX_COLUMNS + 1

        varnames = []
        for i in range(varnumber):
            varnames.append('int%d' % i)

        # Build a dictionary with the types as values and varnames as keys
        recordDict = {}
        i = 0
        for varname in varnames:
            recordDict[varname] = Col.from_type("int32", dflt=1)
            i += 1

        # Now, create a table with this record object
        # This way of creating node objects has been deprecated
        #table = Table(recordDict, "MetaRecord instance")

        # Attach the table to object tree
        warnings.filterwarnings("error", category=PerformanceWarning)
        # Here, a PerformanceWarning should be raised!
        try:
            table = self.fileh.createTable(self.root, 'table',
                                           recordDict, "MetaRecord instance")
        except PerformanceWarning:
            if common.verbose:
                (type, value, traceback) = sys.exc_info()
                print "\nGreat!, the next PerformanceWarning was catched!"
                print value
        else:
            self.fail("expected an PerformanceWarning")
        # Reset the warning
        warnings.filterwarnings("default", category=PerformanceWarning)

    # The next limitation has been released
    def _test06_maxColumnNameLengthExceeded(self):
        "Checking an excess (256) of the maximum length in column names"

        # Build a dictionary with the types as values and varnames as keys
        recordDict = {}
        recordDict["a"*255] = IntCol(dflt=1)
        recordDict["b"*256] = IntCol(dflt=1) # Should trigger a ValueError

        # Now, create a table with this record object
        # This way of creating node objects has been deprecated
        table = Table(recordDict, "MetaRecord instance")

        # Attach the table to object tree
        # Here, ValueError should be raised!
        try:
            table = self.fileh.createTable(self.root, 'table',
                                           recordDict, "MetaRecord instance")
        except ValueError:
            if common.verbose:
                (type, value, traceback) = sys.exc_info()
                print "\nGreat!, the next ValueError was catched!"
                print value
        else:
            self.fail("expected an ValueError")

    def test06_noMaxColumnNameLength(self):
        "Checking unlimited length in column names"

        # Build a dictionary with the types as values and varnames as keys
        recordDict = {}
        recordDict["a"*255] = IntCol(dflt=1, pos=0)
        recordDict["b"*1024] = IntCol(dflt=1, pos=1) # Should work well

        # Attach the table to object tree
        # Here, IndexError should be raised!
        table = self.fileh.createTable(self.root, 'table',
                                       recordDict, "MetaRecord instance")
        assert table.colnames[0] == "a"*255
        assert table.colnames[1] == "b"*1024


class Record2(IsDescription):
    var1 = StringCol(itemsize=4)  # 4-character String
    var2 = IntCol()               # integer
    var3 = Int16Col()             # short integer

class FiltersTreeTestCase(unittest.TestCase):
    title = "A title"
    nrows = 10

    def setUp(self):
        # Create a temporary file
        self.file = tempfile.mktemp(".h5")
        # Create an instance of HDF5 Table
        self.h5file = openFile(self.file, "w", filters=self.filters)
        self.populateFile()

    def populateFile(self):
        group = self.h5file.root
        # Create a tree with three levels of depth
        for j in range(5):
            # Create a table
            table = self.h5file.createTable(group, 'table1', Record2,
                                        title = self.title,
                                        filters = None)
            # Get the record object associated with the new table
            d = table.row
            # Fill the table
            for i in xrange(self.nrows):
                d['var1'] = '%04d' % (self.nrows - i)
                d['var2'] = i
                d['var3'] = i * 2
                d.append()      # This injects the Record values
            # Flush the buffer for this table
            table.flush()

            # Create a couple of arrays in each group
            var1List = [ x['var1'] for x in table.iterrows() ]
            var3List = [ x['var3'] for x in table.iterrows() ]

            self.h5file.createArray(group, 'array1', var1List, "col 1")
            self.h5file.createArray(group, 'array2', var3List, "col 3")

            # Create a couple of EArrays as well
            ea1 = self.h5file.createEArray(group, 'earray1',
                                           StringAtom(itemsize=4), (0,),
                                           "col 1")
            ea2 = self.h5file.createEArray(group, 'earray2',
                                           Int16Atom(), (0,), "col 3")
            # And fill them with some values
            ea1.append(var1List)
            ea2.append(var3List)

            # Create a new group (descendant of group)
            if j == 1: # The second level
                group2 = self.h5file.createGroup(group, 'group'+str(j),
                                                 filters=self.gfilters)
            elif j == 2: # third level
                group2 = self.h5file.createGroup(group, 'group'+str(j))
            else:   # The rest of levels
                group2 = self.h5file.createGroup(group, 'group'+str(j),
                                                 filters=self.filters)
            # Iterate over this new group (group2)
            group = group2

    def tearDown(self):
        # Close the file
        if self.h5file.isopen:
            self.h5file.close()

        os.remove(self.file)
        common.cleanup(self)

    #----------------------------------------

    def test00_checkFilters(self):
        "Checking inheritance of filters on trees (open file version)"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test00_checkFilters..." % self.__class__.__name__

        # First level check
        if common.verbose:
            print "Test filter:", repr(self.filters)
            print "Filters in file:", repr(self.h5file.filters)

        if self.filters == None:
            filters = Filters()
        else:
            filters = self.filters
        assert repr(filters) == repr(self.h5file.filters)
        # The next nodes have to have the same filter properties as
        # self.filters
        nodelist = ['/table1', '/group0/earray1', '/group0']
        for node in nodelist:
            object = self.h5file.getNode(node)
            if isinstance(object, Group):
                assert repr(filters) == repr(object._v_filters)
            else:
                assert repr(filters) == repr(object.filters)

        # Second and third level check
        group1 = self.h5file.root.group0.group1
        if self.gfilters == None:
            if self.filters == None:
                gfilters = Filters()
            else:
                gfilters = self.filters
        else:
            gfilters = self.gfilters
        if common.verbose:
            print "Test gfilter:", repr(gfilters)
            print "Filters in file:", repr(group1._v_filters)

        assert repr(gfilters) == repr(group1._v_filters)
        # The next nodes have to have the same filter properties as
        # gfilters
        nodelist = ['/group0/group1', '/group0/group1/earray1',
                    '/group0/group1/table1', '/group0/group1/group2/table1']
        for node in nodelist:
            object = self.h5file.getNode(node)
            if isinstance(object, Group):
                assert repr(gfilters) == repr(object._v_filters)
            else:
                assert repr(gfilters) == repr(object.filters)

        # Fourth and fifth level check
        if self.filters == None:
            # If None, the filters are inherited!
            if self.gfilters == None:
                filters = Filters()
            else:
                filters = self.gfilters
        else:
            filters = self.filters
        group3 = self.h5file.root.group0.group1.group2.group3
        if common.verbose:
            print "Test filter:", repr(filters)
            print "Filters in file:", repr(group3._v_filters)

        assert repr(filters) == repr(group3._v_filters)
        # The next nodes have to have the same filter properties as
        # self.filter
        nodelist = ['/group0/group1/group2/group3',
                    '/group0/group1/group2/group3/earray1',
                    '/group0/group1/group2/group3/table1',
                    '/group0/group1/group2/group3/group4']
        for node in nodelist:
            object = self.h5file.getNode(node)
            if isinstance(object, Group):
                assert repr(filters) == repr(object._v_filters)
            else:
                assert repr(filters) == repr(object.filters)


        # Checking the special case for Arrays in which the compression
        # should always be the empty Filter()
        # The next nodes have to have the same filter properties as
        # Filter()
        nodelist = ['/array1',
                    '/group0/array1',
                    '/group0/group1/array1',
                    '/group0/group1/group2/array1',
                    '/group0/group1/group2/group3/array1']
        for node in nodelist:
            object = self.h5file.getNode(node)
            assert repr(Filters()) == repr(object.filters)

    def test01_checkFilters(self):
        "Checking inheritance of filters on trees (close file version)"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_checkFilters..." % self.__class__.__name__

        # Close the file
        self.h5file.close()
        # And open it again
        self.h5file = openFile(self.file, "r")

        # First level check
        if self.filters == None:
            filters = Filters()
        else:
            filters = self.filters
        if common.verbose:
            print "Test filter:", repr(filters)
            print "Filters in file:", repr(self.h5file.filters)

        assert repr(filters) == repr(self.h5file.filters)
        # The next nodes have to have the same filter properties as
        # self.filters
        nodelist = ['/table1', '/group0/earray1', '/group0']
        for node in nodelist:
            object_ = self.h5file.getNode(node)
            if isinstance(object_, Group):
                assert repr(filters) == repr(object_._v_filters)
            else:
                assert repr(filters) == repr(object_.filters)

        # Second and third level check
        group1 = self.h5file.root.group0.group1
        if self.gfilters == None:
            if self.filters == None:
                gfilters = Filters()
            else:
                gfilters = self.filters
        else:
            gfilters = self.gfilters
        if common.verbose:
            print "Test filter:", repr(gfilters)
            print "Filters in file:", repr(group1._v_filters)

        repr(gfilters) == repr(group1._v_filters)
        # The next nodes have to have the same filter properties as
        # gfilters
        nodelist = ['/group0/group1', '/group0/group1/earray1',
                    '/group0/group1/table1', '/group0/group1/group2/table1']
        for node in nodelist:
            object_ = self.h5file.getNode(node)
            if isinstance(object_, Group):
                assert repr(gfilters) == repr(object_._v_filters)
            else:
                assert repr(gfilters) == repr(object_.filters)

        # Fourth and fifth level check
        if self.filters == None:
            if self.gfilters == None:
                filters = Filters()
            else:
                filters = self.gfilters
        else:
            filters = self.filters
        group3 = self.h5file.root.group0.group1.group2.group3
        if common.verbose:
            print "Test filter:", repr(filters)
            print "Filters in file:", repr(group3._v_filters)

        repr(filters) == repr(group3._v_filters)
        # The next nodes have to have the same filter properties as
        # self.filters
        nodelist = ['/group0/group1/group2/group3',
                    '/group0/group1/group2/group3/earray1',
                    '/group0/group1/group2/group3/table1',
                    '/group0/group1/group2/group3/group4']
        for node in nodelist:
            object = self.h5file.getNode(node)
            if isinstance(object, Group):
                assert repr(filters) == repr(object._v_filters)
            else:
                assert repr(filters) == repr(object.filters)

        # Checking the special case for Arrays in which the compression
        # should always be the empty Filter()
        # The next nodes have to have the same filter properties as
        # Filter()
        nodelist = ['/array1',
                    '/group0/array1',
                    '/group0/group1/array1',
                    '/group0/group1/group2/array1',
                    '/group0/group1/group2/group3/array1']
        for node in nodelist:
            object = self.h5file.getNode(node)
            assert repr(Filters()) == repr(object.filters)


class FiltersCase1(FiltersTreeTestCase):
    filters = Filters()
    gfilters = Filters(complevel=1)

class FiltersCase2(FiltersTreeTestCase):
    filters = Filters(complevel=1, complib="bzip2")
    gfilters = Filters(complevel=1)

class FiltersCase3(FiltersTreeTestCase):
    filters = Filters(shuffle=True, complib="zlib")
    gfilters = Filters(complevel=1, shuffle=False, complib="lzo")

class FiltersCase4(FiltersTreeTestCase):
    filters = Filters(shuffle=True)
    gfilters = Filters(complevel=1, shuffle=False)

class FiltersCase5(FiltersTreeTestCase):
    filters = Filters(fletcher32=True)
    gfilters = Filters(complevel=1, shuffle=False)

class FiltersCase6(FiltersTreeTestCase):
    filters = None
    gfilters = Filters(complevel=1, shuffle=False)

class FiltersCase7(FiltersTreeTestCase):
    filters = Filters(complevel=1)
    gfilters = None

class FiltersCase8(FiltersTreeTestCase):
    filters = None
    gfilters = None

class FiltersCase9(FiltersTreeTestCase):
    filters = Filters(shuffle=True, complib="zlib")
    gfilters = Filters(complevel=5, shuffle=True, complib="bzip2")

class CopyGroupTestCase(unittest.TestCase):
    title = "A title"
    nrows = 10

    def setUp(self):
        # Create a temporary file
        self.file = tempfile.mktemp(".h5")
        self.file2 = tempfile.mktemp(".h5")
        # Create the source file
        self.h5file = openFile(self.file, "w")
        # Create the destination
        self.h5file2 = openFile(self.file2, "w")
        self.populateFile()

    def populateFile(self):
        group = self.h5file.root
        # Add some user attrs:
        group._v_attrs.attr1 = "an string for root group"
        group._v_attrs.attr2 = 124
        # Create a tree
        for j in range(5):
            for i in range(2):
                # Create a new group (brother of group)
                group2 = self.h5file.createGroup(group, 'bgroup'+str(i),
                                                 filters=None)

                # Create a table
                table = self.h5file.createTable(group2, 'table1', Record2,
                                            title = self.title,
                                            filters = None)
                # Get the record object associated with the new table
                d = table.row
                # Fill the table
                for i in xrange(self.nrows):
                    d['var1'] = '%04d' % (self.nrows - i)
                    d['var2'] = i
                    d['var3'] = i * 2
                    d.append()      # This injects the Record values
                # Flush the buffer for this table
                table.flush()

                # Add some user attrs:
                table.attrs.attr1 = "an string"
                table.attrs.attr2 = 234

                # Create a couple of arrays in each group
                var1List = [ x['var1'] for x in table.iterrows() ]
                var3List = [ x['var3'] for x in table.iterrows() ]

                self.h5file.createArray(group2, 'array1', var1List, "col 1")
                self.h5file.createArray(group2, 'array2', var3List, "col 3")

                # Create a couple of EArrays as well
                ea1 = self.h5file.createEArray(group2, 'earray1',
                                               StringAtom(itemsize=4), (0,),
                                               "col 1")
                ea2 = self.h5file.createEArray(group2, 'earray2',
                                               Int16Atom(), (0,), "col 3")
                # Add some user attrs:
                ea1.attrs.attr1 = "an string for earray"
                ea2.attrs.attr2 = 123
                # And fill them with some values
                ea1.append(var1List)
                ea2.append(var3List)

            # Create a new group (descendant of group)
            group3 = self.h5file.createGroup(group, 'group'+str(j),
                                             filters=None)
            # Iterate over this new group (group3)
            group = group3
            # Add some user attrs:
            group._v_attrs.attr1 = "an string for group"
            group._v_attrs.attr2 = 124

    def tearDown(self):
        # Close the file
        if self.h5file.isopen:
            self.h5file.close()
        if self.h5file2.isopen:
            self.h5file2.close()

        os.remove(self.file)
        os.remove(self.file2)
        common.cleanup(self)

    #----------------------------------------

    def test00_nonRecursive(self):
        "Checking non-recursive copy of a Group"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test00_nonRecursive..." % self.__class__.__name__


        # Copy a group non-recursively
        srcgroup = self.h5file.root.group0.group1
#         srcgroup._f_copyChildren(self.h5file2.root,
#                                recursive=False,
#                                filters=self.filters)
        self.h5file.copyChildren(srcgroup, self.h5file2.root,
                                 recursive=False, filters=self.filters)
        if self.close:
            # Close the destination file
            self.h5file2.close()
            # And open it again
            self.h5file2 = openFile(self.file2, "r")

        # Check that the copy has been done correctly
        dstgroup = self.h5file2.root
        nodelist1 = srcgroup._v_children.keys()
        nodelist2 = dstgroup._v_children.keys()
        # Sort the lists
        nodelist1.sort(); nodelist2.sort()
        if common.verbose:
            print "The origin node list -->", nodelist1
            print "The copied node list -->", nodelist2
        assert srcgroup._v_nchildren == dstgroup._v_nchildren
        assert nodelist1 == nodelist2

    def test01_nonRecursiveAttrs(self):
        "Checking non-recursive copy of a Group (attributes copied)"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_nonRecursiveAttrs..." % self.__class__.__name__

        # Copy a group non-recursively with attrs
        srcgroup = self.h5file.root.group0.group1
        srcgroup._f_copyChildren(self.h5file2.root,
                                 recursive=False,
                                 filters=self.filters,
                                 copyuserattrs = 1)
        if self.close:
            # Close the destination file
            self.h5file2.close()
            # And open it again
            self.h5file2 = openFile(self.file2, "r")

        # Check that the copy has been done correctly
        dstgroup = self.h5file2.root
        for srcnode in srcgroup:
            dstnode = getattr(dstgroup, srcnode._v_name)
            if isinstance(srcnode, Group):
                srcattrs = srcnode._v_attrs
                srcattrskeys = srcattrs._f_list("all")
                dstattrs = dstnode._v_attrs
                dstattrskeys = dstattrs._f_list("all")
            else:
                srcattrs = srcnode.attrs
                srcattrskeys = srcattrs._f_list("all")
                dstattrs = dstnode.attrs
                dstattrskeys = dstattrs._f_list("all")
            # Filters may differ, do not take into account
            if self.filters is not None:
                dstattrskeys.remove('FILTERS')
            # These lists should already be ordered
            if common.verbose:
                print "srcattrskeys for node %s: %s" %(srcnode._v_name,
                                                       srcattrskeys)
                print "dstattrskeys for node %s: %s" %(dstnode._v_name,
                                                       dstattrskeys)
            assert srcattrskeys == dstattrskeys
            if common.verbose:
                print "The attrs names has been copied correctly"

            # Now, for the contents of attributes
            for srcattrname in srcattrskeys:
                srcattrvalue = str(getattr(srcattrs, srcattrname))
                dstattrvalue = str(getattr(dstattrs, srcattrname))
                assert srcattrvalue == dstattrvalue
            if self.filters is not None:
                assert dstattrs.FILTERS == self.filters

            if common.verbose:
                print "The attrs contents has been copied correctly"

    def test02_Recursive(self):
        "Checking recursive copy of a Group"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test02_Recursive..." % self.__class__.__name__


        # Create the destination node
        group = self.h5file2.root
        for groupname in self.dstnode.split("/"):
            if groupname:
                group = self.h5file2.createGroup(group, groupname)
        dstgroup = self.h5file2.getNode(self.dstnode)

        # Copy a group non-recursively
        srcgroup = self.h5file.getNode(self.srcnode)
        self.h5file.copyChildren(srcgroup, dstgroup,
                                 recursive=True,
                                 filters=self.filters)
        lenSrcGroup = len(srcgroup._v_pathname)
        if lenSrcGroup == 1:
            lenSrcGroup = 0  # Case where srcgroup == "/"
        if self.close:
            # Close the destination file
            self.h5file2.close()
            # And open it again
            self.h5file2 = openFile(self.file2, "r")
            dstgroup = self.h5file2.getNode(self.dstnode)

        # Check that the copy has been done correctly
        lenDstGroup = len(dstgroup._v_pathname)
        if lenDstGroup == 1:
            lenDstGroup = 0  # Case where dstgroup == "/"
        first = 1
        nodelist1 = []
        for node in srcgroup._f_walkNodes():
            if first:
                # skip the first group
                first = 0
                continue
            nodelist1.append(node._v_pathname[lenSrcGroup:])

        first = 1
        nodelist2 = []
        for node in dstgroup._f_walkNodes():
            if first:
                # skip the first group
                first = 0
                continue
            nodelist2.append(node._v_pathname[lenDstGroup:])

        if common.verbose:
            print "The origin node list -->", nodelist1
            print "The copied node list -->", nodelist2
        assert nodelist1 == nodelist2

    def test03_RecursiveFilters(self):
        "Checking recursive copy of a Group (cheking Filters)"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test03_RecursiveFilters..." % self.__class__.__name__

        # Create the destination node
        group = self.h5file2.root
        for groupname in self.dstnode.split("/"):
            if groupname:
                group = self.h5file2.createGroup(group, groupname)
        dstgroup = self.h5file2.getNode(self.dstnode)

        # Copy a group non-recursively
        srcgroup = self.h5file.getNode(self.srcnode)
        srcgroup._f_copyChildren(dstgroup,
                                 recursive=True,
                                 filters=self.filters)
        lenSrcGroup = len(srcgroup._v_pathname)
        if lenSrcGroup == 1:
            lenSrcGroup = 0  # Case where srcgroup == "/"
        if self.close:
            # Close the destination file
            self.h5file2.close()
            # And open it again
            self.h5file2 = openFile(self.file2, "r")
            dstgroup = self.h5file2.getNode(self.dstnode)

        # Check that the copy has been done correctly
        lenDstGroup = len(dstgroup._v_pathname)
        if lenDstGroup == 1:
            lenDstGroup = 0  # Case where dstgroup == "/"
        first = 1
        nodelist1 = {}
        for node in srcgroup._f_walkNodes():
            if first:
                # skip the first group
                first = 0
                continue
            nodelist1[node._v_name] = node._v_pathname[lenSrcGroup:]

        first = 1
        for node in dstgroup._f_walkNodes():
            if first:
                # skip the first group
                first = 0
                continue
            if isinstance(node, Group):
                repr(node._v_filters) == repr(nodelist1[node._v_name])
            else:
                repr(node.filters) == repr(nodelist1[node._v_name])


class CopyGroupCase1(CopyGroupTestCase):
    close = 0
    filters = None
    srcnode = '/group0/group1'
    dstnode = '/'

class CopyGroupCase2(CopyGroupTestCase):
    close = 1
    filters = None
    srcnode = '/group0/group1'
    dstnode = '/'

class CopyGroupCase3(CopyGroupTestCase):
    close = 0
    filters = None
    srcnode = '/group0'
    dstnode = '/group2/group3'

class CopyGroupCase4(CopyGroupTestCase):
    close = 1
    filters = Filters(complevel=1)
    srcnode = '/group0'
    dstnode = '/group2/group3'

class CopyGroupCase5(CopyGroupTestCase):
    close = 0
    filters = Filters()
    srcnode = '/'
    dstnode = '/group2/group3'

class CopyGroupCase6(CopyGroupTestCase):
    close = 1
    filters = Filters(fletcher32=True)
    srcnode = '/group0'
    dstnode = '/group2/group3'

class CopyGroupCase7(CopyGroupTestCase):
    close = 0
    filters = Filters(complevel=1, shuffle=False)
    srcnode = '/'
    dstnode = '/'

class CopyGroupCase8(CopyGroupTestCase):
    close = 1
    filters = Filters(complevel=1, complib="lzo")
    srcnode = '/'
    dstnode = '/'

class CopyFileTestCase(unittest.TestCase):
    title = "A title"
    nrows = 10

    def setUp(self):
        # Create a temporary file
        self.file = tempfile.mktemp(".h5")
        self.file2 = tempfile.mktemp(".h5")
        # Create the source file
        self.h5file = openFile(self.file, "w")
        self.populateFile()

    def populateFile(self):
        group = self.h5file.root
        # Add some user attrs:
        group._v_attrs.attr1 = "an string for root group"
        group._v_attrs.attr2 = 124
        # Create a tree
        for j in range(5):
            for i in range(2):
                # Create a new group (brother of group)
                group2 = self.h5file.createGroup(group, 'bgroup'+str(i),
                                                 filters=None)

                # Create a table
                table = self.h5file.createTable(group2, 'table1', Record2,
                                            title = self.title,
                                            filters = None)
                # Get the record object associated with the new table
                d = table.row
                # Fill the table
                for i in xrange(self.nrows):
                    d['var1'] = '%04d' % (self.nrows - i)
                    d['var2'] = i
                    d['var3'] = i * 2
                    d.append()      # This injects the Record values
                # Flush the buffer for this table
                table.flush()

                # Add some user attrs:
                table.attrs.attr1 = "an string"
                table.attrs.attr2 = 234

                # Create a couple of arrays in each group
                var1List = [ x['var1'] for x in table.iterrows() ]
                var3List = [ x['var3'] for x in table.iterrows() ]

                self.h5file.createArray(group2, 'array1', var1List, "col 1")
                self.h5file.createArray(group2, 'array2', var3List, "col 3")

                # Create a couple of EArrays as well
                ea1 = self.h5file.createEArray(group2, 'earray1',
                                               StringAtom(itemsize=4), (0,),
                                               "col 1")
                ea2 = self.h5file.createEArray(group2, 'earray2',
                                               Int16Atom(), (0,),
                                               "col 3")
                # Add some user attrs:
                ea1.attrs.attr1 = "an string for earray"
                ea2.attrs.attr2 = 123
                # And fill them with some values
                ea1.append(var1List)
                ea2.append(var3List)

            # Create a new group (descendant of group)
            group3 = self.h5file.createGroup(group, 'group'+str(j),
                                             filters=None)
            # Iterate over this new group (group3)
            group = group3
            # Add some user attrs:
            group._v_attrs.attr1 = "an string for group"
            group._v_attrs.attr2 = 124

    def tearDown(self):
        # Close the file
        if self.h5file.isopen:
            self.h5file.close()
        if hasattr(self, 'h5file2') and self.h5file2.isopen:
            self.h5file2.close()

        os.remove(self.file)
        if hasattr(self, 'file2') and os.path.exists(self.file2):
            os.remove(self.file2)
        common.cleanup(self)

    #----------------------------------------

    def test00_overwrite(self):
        "Checking copy of a File (overwriting file)"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test00_overwrite..." % self.__class__.__name__

        # Create a temporary file
        file2h = open(self.file2, "w")
        file2h.close()
        # Copy the file to the destination
        self.h5file.copyFile(self.file2, title=self.title,
                             overwrite = 1,
                             copyuserattrs = 0,
                             filters = None)

        # Close the original file, if needed
        if self.close:
            self.h5file.close()
            # re-open it
            self.h5file = openFile(self.file, "r")

        # ...and open the destination file
        self.h5file2 = openFile(self.file2, "r")

        # Check that the copy has been done correctly
        srcgroup = self.h5file.root
        dstgroup = self.h5file2.root
        nodelist1 = srcgroup._v_children.keys()
        nodelist2 = dstgroup._v_children.keys()
        # Sort the lists
        nodelist1.sort(); nodelist2.sort()
        if common.verbose:
            print "The origin node list -->", nodelist1
            print "The copied node list -->", nodelist2
        assert srcgroup._v_nchildren == dstgroup._v_nchildren
        assert nodelist1 == nodelist2
        assert self.h5file2.title == self.title

    def test00a_srcdstequal(self):
        "Checking copy of a File (srcfile == dstfile)"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test00a_srcdstequal..." % self.__class__.__name__

        # Copy the file to the destination
        self.assertRaises(IOError, self.h5file.copyFile, self.h5file.filename)

    def test00b_firstclass(self):
        "Checking copy of a File (first-class function)"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test00b_firstclass..." % self.__class__.__name__

        # Close the temporary file
        self.h5file.close()

        # Copy the file to the destination
        copyFile(self.file, self.file2, title=self.title,
                 copyuserattrs = 0, filters = None, overwrite = 1)

        # ...and open the source and destination file
        self.h5file = openFile(self.file, "r")
        self.h5file2 = openFile(self.file2, "r")

        # Check that the copy has been done correctly
        srcgroup = self.h5file.root
        dstgroup = self.h5file2.root
        nodelist1 = srcgroup._v_children.keys()
        nodelist2 = dstgroup._v_children.keys()
        # Sort the lists
        nodelist1.sort(); nodelist2.sort()
        if common.verbose:
            print "The origin node list -->", nodelist1
            print "The copied node list -->", nodelist2
        assert srcgroup._v_nchildren == dstgroup._v_nchildren
        assert nodelist1 == nodelist2
        assert self.h5file2.title == self.title

    def test01_copy(self):
        "Checking copy of a File (attributes not copied)"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_copy..." % self.__class__.__name__

        # Copy the file to the destination
        self.h5file.copyFile(self.file2, title=self.title,
                             copyuserattrs = 0,
                             filters = self.filters)

        # Close the original file, if needed
        if self.close:
            self.h5file.close()
            # re-open it
            self.h5file = openFile(self.file, "r")

        # ...and open the destination file
        self.h5file2 = openFile(self.file2, "r")

        # Check that the copy has been done correctly
        srcgroup = self.h5file.root
        dstgroup = self.h5file2.root
        nodelist1 = srcgroup._v_children.keys()
        nodelist2 = dstgroup._v_children.keys()
        # Sort the lists
        nodelist1.sort(); nodelist2.sort()
        if common.verbose:
            print "The origin node list -->", nodelist1
            print "The copied node list -->", nodelist2
        assert srcgroup._v_nchildren == dstgroup._v_nchildren
        assert nodelist1 == nodelist2
        #print "_v_attrnames-->", self.h5file2.root._v_attrs._v_attrnames
        #print "--> <%s,%s>" % (self.h5file2.title, self.title)
        assert self.h5file2.title == self.title

        # Check that user attributes has not been copied
        for srcnode in srcgroup:
            dstnode = getattr(dstgroup, srcnode._v_name)
            srcattrs = srcnode._v_attrs
            srcattrskeys = srcattrs._f_list("sys")
            dstattrs = dstnode._v_attrs
            dstattrskeys = dstattrs._f_list("all")
            # Filters may differ, do not take into account
            if self.filters is not None:
                dstattrskeys.remove('FILTERS')
            # These lists should already be ordered
            if common.verbose:
                print "srcattrskeys for node %s: %s" %(srcnode._v_name,
                                                       srcattrskeys)
                print "dstattrskeys for node %s: %s" %(dstnode._v_name,
                                                       dstattrskeys)
            assert srcattrskeys == dstattrskeys
            if common.verbose:
                print "The attrs names has been copied correctly"

            # Now, for the contents of attributes
            for srcattrname in srcattrskeys:
                srcattrvalue = str(getattr(srcattrs, srcattrname))
                dstattrvalue = str(getattr(dstattrs, srcattrname))
                assert srcattrvalue == dstattrvalue
            if self.filters is not None:
                assert dstattrs.FILTERS == self.filters

            if common.verbose:
                print "The attrs contents has been copied correctly"


    def test02_Attrs(self):
        "Checking copy of a File (attributes copied)"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test02_Attrs..." % self.__class__.__name__


        # Copy the file to the destination
        self.h5file.copyFile(self.file2, title=self.title,
                             copyuserattrs = 1,
                             filters = self.filters)

        # Close the original file, if needed
        if self.close:
            self.h5file.close()
            # re-open it
            self.h5file = openFile(self.file, "r")

        # ...and open the destination file
        self.h5file2 = openFile(self.file2, "r")

        # Check that the copy has been done correctly
        srcgroup = self.h5file.root
        dstgroup = self.h5file2.root
        for srcnode in srcgroup:
            dstnode = getattr(dstgroup, srcnode._v_name)
            srcattrs = srcnode._v_attrs
            srcattrskeys = srcattrs._f_list("all")
            dstattrs = dstnode._v_attrs
            dstattrskeys = dstattrs._f_list("all")
            # These lists should already be ordered
            if common.verbose:
                print "srcattrskeys for node %s: %s" %(srcnode._v_name,
                                                       srcattrskeys)
                print "dstattrskeys for node %s: %s" %(dstnode._v_name,
                                                       dstattrskeys)
            # Filters may differ, do not take into account
            if self.filters is not None:
                dstattrskeys.remove('FILTERS')
            assert srcattrskeys == dstattrskeys
            if common.verbose:
                print "The attrs names has been copied correctly"

            # Now, for the contents of attributes
            for srcattrname in srcattrskeys:
                srcattrvalue = str(getattr(srcattrs, srcattrname))
                dstattrvalue = str(getattr(dstattrs, srcattrname))
                assert srcattrvalue == dstattrvalue
            if self.filters is not None:
                assert dstattrs.FILTERS == self.filters

            if common.verbose:
                print "The attrs contents has been copied correctly"

class CopyFileCase1(CopyFileTestCase):
    close = 0
    title = "A new title"
    filters = None

class CopyFileCase2(CopyFileTestCase):
    close = 1
    title = "A new title"
    filters = None

class CopyFileCase3(CopyFileTestCase):
    close = 0
    title = "A new title"
    filters = Filters(complevel=1)

class CopyFileCase4(CopyFileTestCase):
    close = 1
    title = "A new title"
    filters = Filters(complevel=1)

class CopyFileCase5(CopyFileTestCase):
    close = 0
    title = "A new title"
    filters = Filters(fletcher32=True)

class CopyFileCase6(CopyFileTestCase):
    close = 1
    title = "A new title"
    filters = Filters(fletcher32=True)

class CopyFileCase7(CopyFileTestCase):
    close = 0
    title = "A new title"
    filters = Filters(complevel=1, complib="lzo")

class CopyFileCase8(CopyFileTestCase):
    close = 1
    title = "A new title"
    filters = Filters(complevel=1, complib="lzo")

class CopyFileCase10(unittest.TestCase):

    def test01_notoverwrite(self):
        "Checking copy of a File (checking not overwriting)"

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_notoverwrite..." % self.__class__.__name__


        # Create two empty files:
        file = tempfile.mktemp(".h5")
        fileh = openFile(file, "w")
        file2 = tempfile.mktemp(".h5")
        fileh2 = openFile(file2, "w")
        fileh2.close()  # close the second one
        # Copy the first into the second
        try:
            fileh.copyFile(file2, overwrite=False)
        except IOError:
            if common.verbose:
                (type, value, traceback) = sys.exc_info()
                print "\nGreat!, the next IOError was catched!"
                print value
        else:
            self.fail("expected a IOError")


        # Delete files
        fileh.close()
        os.remove(file)
        os.remove(file2)

class GroupFiltersTestCase(common.TempFileMixin, common.PyTablesTestCase):
    filters = tables.Filters(complevel=4)  # something non-default

    def setUp(self):
        super(GroupFiltersTestCase, self).setUp()

        atom, shape = tables.IntAtom(), (1, 1)
        createGroup = self.h5file.createGroup
        createCArray = self.h5file.createCArray

        createGroup('/', 'implicit_no')
        createGroup('/implicit_no', 'implicit_no')
        createCArray( '/implicit_no/implicit_no', 'implicit_no',
                      atom=atom, shape=shape )
        createCArray( '/implicit_no/implicit_no', 'explicit_no',
                      atom=atom, shape=shape, filters=tables.Filters() )
        createCArray( '/implicit_no/implicit_no', 'explicit_yes',
                      atom=atom, shape=shape, filters=self.filters )

        createGroup('/', 'explicit_yes', filters=self.filters)
        createGroup('/explicit_yes', 'implicit_yes')
        createCArray( '/explicit_yes/implicit_yes', 'implicit_yes',
                      atom=atom, shape=shape )
        createCArray( '/explicit_yes/implicit_yes', 'explicit_yes',
                      atom=atom, shape=shape, filters=self.filters )
        createCArray( '/explicit_yes/implicit_yes', 'explicit_no',
                      atom=atom, shape=shape, filters=tables.Filters() )

    def _check_filters(self, h5file, filters=None):
        for node in h5file:
            # Get node filters.
            if hasattr(node, 'filters'):
                node_filters = node.filters
            else:
                node_filters = node._v_filters
            # Compare to given filters.
            if filters is not None:
                self.assertEqual(node_filters, filters)
                return
            # Guess filters to compare to by node name.
            if node._v_name.endswith('_no'):
                self.assertEqual(
                    node_filters, tables.Filters(),
                    "node ``%s`` should have no filters" % node._v_pathname )
            elif node._v_name.endswith('_yes'):
                self.assertEqual(
                    node_filters, self.filters,
                    "node ``%s`` should have filters" % node._v_pathname )

    def test00_propagate(self):
        """Filters propagating to children."""
        self._check_filters(self.h5file)

    def _test_copyFile(self, filters=None):
        copyfname = tempfile.mktemp(suffix='.h5')
        try:
            self.h5file.copyFile(copyfname, filters=filters)
            try:
                copyf = tables.openFile(copyfname)
                self._check_filters(copyf, filters=filters)
            finally:
                copyf.close()
        finally:
            os.remove(copyfname)

    def test01_copyFile(self):
        """Keeping filters when copying a file."""
        self._test_copyFile()

    def test02_copyFile_override(self):
        """Overriding filters when copying a file."""
        self._test_copyFile(self.filters)

    def _test_change(self, pathname, change_filters, new_filters):
        group = self.h5file.getNode(pathname)
        # Check expected current filters.
        old_filters = tables.Filters()
        if pathname.endswith('_yes'):
            old_filters = self.filters
        self.assertEqual(group._v_filters, old_filters)
        # Change filters.
        change_filters(group)
        self.assertEqual(group._v_filters, new_filters)
        # Get and check changed filters.
        if self._reopen():
            group = self.h5file.getNode(pathname)
        self.assertEqual(group._v_filters, new_filters)

    def test03_change(self):
        """Changing the filters of a group."""
        def set_filters(group):
            group._v_filters = self.filters
        self._test_change('/', set_filters, self.filters)

    def test04_delete(self):
        """Deleting the filters of a group."""
        def del_filters(group):
            del group._v_filters
        self._test_change('/explicit_yes', del_filters, tables.Filters())


#----------------------------------------------------------------------

def suite():
    import doctest
    import tables.atom

    theSuite = unittest.TestSuite()
    niter = 1
    #common.heavy = 1 # Uncomment this only for testing purposes!

    #theSuite.addTest(unittest.makeSuite(FiltersCase1))
    #theSuite.addTest(unittest.makeSuite(createTestCase))
    #theSuite.addTest(unittest.makeSuite(CopyGroupCase1))
    #theSuite.addTest(unittest.makeSuite(CopyGroupCase2))
    #theSuite.addTest(unittest.makeSuite(CopyFileCase1))
    for i in range(niter):
        theSuite.addTest(unittest.makeSuite(FiltersCase1))
        theSuite.addTest(unittest.makeSuite(FiltersCase2))
        theSuite.addTest(unittest.makeSuite(CopyGroupCase1))
        theSuite.addTest(unittest.makeSuite(CopyGroupCase2))
        theSuite.addTest(unittest.makeSuite(CopyFileCase1))
        theSuite.addTest(unittest.makeSuite(CopyFileCase2))
        theSuite.addTest(unittest.makeSuite(GroupFiltersTestCase))
        theSuite.addTest(doctest.DocTestSuite(tables.filters))
    if common.heavy:
        theSuite.addTest(unittest.makeSuite(createTestCase))
        theSuite.addTest(unittest.makeSuite(FiltersCase3))
        theSuite.addTest(unittest.makeSuite(FiltersCase4))
        theSuite.addTest(unittest.makeSuite(FiltersCase5))
        theSuite.addTest(unittest.makeSuite(FiltersCase6))
        theSuite.addTest(unittest.makeSuite(FiltersCase7))
        theSuite.addTest(unittest.makeSuite(FiltersCase8))
        theSuite.addTest(unittest.makeSuite(FiltersCase9))
        theSuite.addTest(unittest.makeSuite(CopyFileCase3))
        theSuite.addTest(unittest.makeSuite(CopyFileCase4))
        theSuite.addTest(unittest.makeSuite(CopyFileCase5))
        theSuite.addTest(unittest.makeSuite(CopyFileCase6))
        theSuite.addTest(unittest.makeSuite(CopyFileCase7))
        theSuite.addTest(unittest.makeSuite(CopyFileCase8))
        theSuite.addTest(unittest.makeSuite(CopyFileCase10))
        theSuite.addTest(unittest.makeSuite(CopyGroupCase3))
        theSuite.addTest(unittest.makeSuite(CopyGroupCase4))
        theSuite.addTest(unittest.makeSuite(CopyGroupCase5))
        theSuite.addTest(unittest.makeSuite(CopyGroupCase6))
        theSuite.addTest(unittest.makeSuite(CopyGroupCase7))
        theSuite.addTest(unittest.makeSuite(CopyGroupCase8))

    return theSuite


if __name__ == '__main__':
    unittest.main( defaultTest='suite' )
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.