test_nodes.py :  » Math » Modular-toolkit-for-Data-Processing » MDP-2.6 » mdp » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Math » Modular toolkit for Data Processing 
Modular toolkit for Data Processing » MDP 2.6 » mdp » test » test_nodes.py
"""These are test functions for MDP nodes.

Run them with:
>>> import mdp
>>> mdp.test("nodes")

"""
import unittest
import inspect
import mdp
import cPickle
import tempfile
import os
import itertools
import sys
from mdp import utils,numx,numx_rand,numx_linalg,numx_fft
from testing_tools import assert_array_almost_equal,assert_array_equal,\
     assert_almost_equal, assert_equal, assert_array_almost_equal_diff, \
     assert_type_equal

# Convenience aliases for heavily used numerical routines.
mult = utils.mult        # matrix product
mean = numx.mean
std = numx.std
normal = numx_rand.normal    # gaussian sampler
uniform = numx_rand.random   # uniform [0, 1) sampler
# dtypes the nodes are exercised with: double and single precision
testtypes = [numx.dtype('d'), numx.dtype('f')]
testtypeschar = [t.char for t in testtypes]
# comparison precision (decimal places) to use for each test dtype
testdecimals = {testtypes[0]: 12, testtypes[1]: 6}


class _BogusNode(mdp.Node):
    """Minimal non-trainable node: doubles on execute, halves on inverse."""
    def is_trainable(self):
        return 0

    def _execute(self, x):
        return 2 * x

    def _inverse(self, x):
        return 0.5 * x

class _BogusNodeTrainable(mdp.Node):
    """Trainable dummy node: training is a no-op, stop_training leaves a
    marker attribute that tests can check for."""
    def _train(self, x):
        # nothing to learn
        pass

    def _stop_training(self):
        # marker used by tests to verify that stop_training was called
        self.bogus_attr = 1
    
class _BogusExceptNode(mdp.Node):
    """Dummy node whose train and execute always raise a generic Exception."""
    def _train(self, x):
        self.bogus_attr = 1
        raise Exception("Bogus Exception")

    def _execute(self, x):
        raise Exception("Bogus Exception")

class _BogusMultiNode(mdp.Node):
    """Dummy node with two training phases.

    Each train/stop callback appends its ordinal to self.visited, so a
    test can verify the call order: train1 -> 1, stop1 -> 2,
    train2 -> 3, stop2 -> 4.
    """

    def __init__(self):
        super(_BogusMultiNode, self).__init__()
        # record of which callbacks have run, in order
        self.visited = []

    def _get_train_seq(self):
        # two (train, stop) phases
        return [(self.train1, self.stop1),
                (self.train2, self.stop2)]

    def train1(self, x):
        self.visited.append(1)

    def stop1(self):
        self.visited.append(2)

    def train2(self, x):
        self.visited.append(3)

    def stop2(self):
        self.visited.append(4)


def _rand_labels(x):
    return numx.around(uniform(x.shape[0]))

def _rand_labels_array(x):
    return numx.around(uniform(x.shape[0])).reshape((x.shape[0],1))

def _rand_array_halfdim(x):
    return uniform(size=(x.shape[0], x.shape[1]//2))

def _std(x):
    return x.std(axis=0)
    # standard deviation without bias
    mx = mean(x, axis=0)
    mx2 = mean(x*x, axis=0)
    return numx.sqrt((mx2-mx)/(x.shape[0]-1))
    
def _cov(x, y=None):
    """Correlation-style covariance matrix of the columns of x and y.

    Both inputs are centered and scaled column-wise to unit (biased)
    std before the cross product, which is normalized by N (not N-1).
    If y is omitted, a copy of x is used.
    """
    if y is None:
        y = x.copy()
    xc = x - mean(x, 0)
    xc = xc / _std(xc)
    yc = y - mean(y, 0)
    yc = yc / _std(yc)
    return mult(numx.transpose(xc), yc) / xc.shape[0]

# Endless sequence of spinner frames; each frame ends in two backspaces
# so that the next frame overwrites it in a terminal.
_spinner = itertools.cycle((' /\b\b', ' -\b\b', ' \\\b\b', ' |\b\b'))
#_spinner = itertools.cycle((' .\b\b', ' o\b\b', ' 0\b\b', ' O\b\b',
#                            ' 0\b\b', ' o\b\b'))
#_spinner = itertools.cycle([" '\b\b"]*2 + [' !\b\b']*2 + [' .\b\b']*2 +
#                           [' !\b\b']*2)

# advance the progress spinner by one frame on stderr
def spinner():
    sys.stderr.write(_spinner.next())
    sys.stderr.flush()

class NodesTestSuite(unittest.TestSuite):

    def __init__(self, testname=None):
        """Build the suite.

        If testname is given, only that single test method is added;
        otherwise the generic, FastICA and node-specific tests are all
        generated.
        """
        unittest.TestSuite.__init__(self)
        
        # constants
        self.mat_dim = (500,5)   # default (observations, variables) shape
        self.decimal = 7         # default precision for float comparisons

        # set nodes to be tested
        self._set_nodes()

        if testname is not None:
            self._nodes_test_factory([testname])
        else:
            # get generic tests
            self._generic_test_factory()
            # get FastICA tests
            self._fastica_test_factory()
            # get nodes tests
            self._nodes_test_factory()

    def _set_nodes(self):
        """Register the node classes under test.

        Each entry is either a bare Node class (instantiated with no
        arguments) or a tuple (class, constructor_args, supervision_func),
        where supervision_func(x) produces the extra training argument
        (labels etc.) required by supervised nodes.
        """
        mn = mdp.nodes
        self._nodes = [mn.PCANode,
                       mn.WhiteningNode,
                       mn.SFANode,
                       mn.SFA2Node,
                       mn.TDSEPNode,
                       mn.CuBICANode,
                       mn.FastICANode,
                       mn.QuadraticExpansionNode,
                       (mn.PolynomialExpansionNode, [3], None),
                       (mn.RBFExpansionNode, [[[0.]*5, [0.]*5], [1., 1.]], None),
                       mn.GrowingNeuralGasExpansionNode,
                       (mn.HitParadeNode, [2, 5], None),
                       (mn.TimeFramesNode, [3, 4], None),
                       mn.EtaComputerNode,
                       mn.GrowingNeuralGasNode,
                       mn.NoiseNode,
                       (mn.FDANode, [], _rand_labels),
                       (mn.GaussianClassifierNode, [], _rand_labels),
                       mn.FANode,
                       mn.ISFANode,
                       (mn.RBMNode, [5], None),
                       (mn.RBMWithLabelsNode, [5, 1], _rand_labels_array),
                       (mn.LinearRegressionNode, [], _rand_array_halfdim)]

    def _nodes_test_factory(self, methods_list=None):
        """Wrap every test* method (or only those named in methods_list)
        in a FunctionTestCase and add it to the suite.

        Names in methods_list that do not resolve to test methods are
        silently skipped, matching the original try/except behavior.
        """
        if methods_list is None:
            methods_list = dir(self)
        for methname in methods_list:
            # getattr with a default replaces the old try/except
            # AttributeError: a missing attribute yields None, which
            # fails the ismethod check below and is skipped
            meth = getattr(self, methname, None)
            if not inspect.ismethod(meth):
                continue
            if meth.__name__.startswith("test"):
                # create a nice description, e.g. "Test PCANode"
                descr = 'Test '+(meth.__name__[4:]).replace('_',' ')
                self.addTest(unittest.FunctionTestCase(meth,
                             description=descr))

    def _generic_test_factory(self):
        """For every registered node class, add up to four generic tests:
        dtype consistency, inverse (when invertible), output_dim handling
        (when the constructor accepts it) and dim/dtype auto-setting.
        """
        # generate generic test cases
        for node_class in self._nodes:
            if isinstance(node_class, tuple):
                # (class, constructor args, supervision function)
                node_class, args, sup_args_func = node_class
            else:
                args = []
                sup_args_func = None

            # generate testdtype_nodeclass test cases
            funcdesc = 'Test dtype consistency of '+node_class.__name__
            testfunc = self._get_testdtype(node_class, args, sup_args_func)
            # add to the suite
            self.addTest(unittest.FunctionTestCase(testfunc,
                                                   description=funcdesc))
            # generate single testinverse_nodeclass test cases
            funcdesc = 'Test inverse function of '+node_class.__name__
            testfunc = self._get_testinverse(node_class, args,
                                             sup_args_func)
            # add to the suite (None means the node is not invertible)
            if testfunc is not None:
                self.addTest(unittest.FunctionTestCase(testfunc,
                                                       description=funcdesc))
            # generate testoutputdim_nodeclass test cases
            # (only for nodes whose constructor accepts output_dim)
            if 'output_dim' in inspect.getargspec(node_class.__init__)[0]:
                funcdesc ='Test output dim consistency of '+node_class.__name__
                testfunc = self._get_testoutputdim(node_class, args,
                                                   sup_args_func)
                # add to the suite
                self.addTest(unittest.FunctionTestCase(testfunc,
                                                       description=funcdesc))
            # generate testdimset_nodeclass test cases
            funcdesc='Test dimensions and dtype settings of '+node_class.__name__
            testfunc = self._get_testdimdtypeset(node_class, args,
                                                 sup_args_func)
            # add to the suite
            self.addTest(unittest.FunctionTestCase(testfunc,
                                                   description=funcdesc))
        

    def _fastica_test_factory(self):
        """Generate one FastICANode test for every admissible combination
        of the algorithm's parameters, skipping invalid combinations."""
        # generate FastICANode testcases
        fica_parm = {'approach': ['symm', 'defl'],
                     'g': ['pow3', 'tanh', 'gaus', 'skew'],
                     'fine_g': [ None, 'pow3', 'tanh', 'gaus', 'skew'],
                     'sample_size': [ 1, 0.99999 ],
                     'mu': [1, 0.999999 ],
                     'stabilization': [False, True]}

        for parms in utils.orthogonal_permutations(fica_parm):
            if parms['mu'] != 1 and parms['stabilization'] is False:
                # mu != 1 implies setting stabilization
                continue
            # skew nonlinearity only works with skewed
            # input data; require g and fine_g to be 'skew' together
            if parms['g'] != 'skew' and parms['fine_g'] == 'skew':
                continue
            if parms['g'] == 'skew' and parms['fine_g'] != 'skew':
                continue

            testfunc, funcdesc = self._get_testFastICA(parms)
            self.addTest(unittest.FunctionTestCase(testfunc,
                                                   description=funcdesc))
            

    def _get_random_mix(self, mat_dim = None, type = "d", scale = 1,\
                        rand_func = uniform, avg = 0, \
                        std_dev = 1):
        """Return (sources, mixing_matrix, mixed_sources).

        Sources of shape mat_dim (default self.mat_dim) are normalized to
        zero mean / unit std, then optionally rescaled by std_dev and
        shifted by avg, and mixed with a random (N, N) matrix.  Generation
        is retried until both the sources' and the mixture's scatter
        matrices are well conditioned (smallest eigenvalue >= 1e-3).

        NOTE(review): the 'type' parameter shadows the builtin, but it is
        part of the public signature (callers pass type="d") and therefore
        cannot be renamed without breaking them.
        """
        if mat_dim is None: mat_dim = self.mat_dim
        T = mat_dim[0]   # number of observations
        N = mat_dim[1]   # number of variables
        d = 0
        while d < 1E-3:
            #mat = ((rand_func(size=mat_dim)-0.5)*scale).astype(type)
            mat = rand_func(size=(T,N)).astype(type)
            # normalize
            mat -= mean(mat,axis=0)
            mat /= std(mat,axis=0)
            # check that the minimum eigenvalue is finite and positive
            d1 = min(utils.symeig(mult(mat.T, mat), eigenvectors = 0))
            if std_dev is not None: mat *= std_dev
            if avg is not None: mat += avg
            mix = (rand_func(size=(N,N))*scale).astype(type)
            matmix = mult(mat,mix)
            matmix_n = matmix - mean(matmix, axis=0)
            matmix_n /= std(matmix_n, axis=0)
            d2 = min(utils.symeig(mult(matmix_n.T,matmix_n),eigenvectors=0))
            # retry if either scatter matrix is near-singular
            d = min(d1, d2)
        return mat, mix, matmix

    def _train_if_necessary(self, inp, node, args, sup_args_func):
        """Run the node's full training schedule on inp (no-op for
        non-trainable nodes).

        If sup_args_func is given it supplies the extra supervision
        argument (labels etc.) for each train call.  The same input
        chunk is fed to every training phase; the final phase is left
        open (not stopped) for the caller to close or execute.
        """
        if node.is_trainable():
            while True:
                if sup_args_func is not None:
                    # for nodes that need supervision
                    sup_args = sup_args_func(inp)
                    node.train(inp, sup_args)
                else:
                    node.train(inp)
                # advance to the next phase, or stop before the last one
                if node.get_remaining_train_phase() > 1:
                    node.stop_training()
                else:
                    break
                
    def _stop_training_or_execute(self, node, inp):
        """Finalize a trainable node, or execute a non-trainable one.

        Either path forces the node to fix its dimensions and dtype.
        """
        if node.is_trainable():
            node.stop_training()
        else:
            # the result is irrelevant (the unused 'out' local was
            # removed); execution only forces dtype/dim inference
            node(inp)
    
    def _get_testinverse(self, node_class, args=[], sup_args_func=None):
        """Return a test verifying that inverse(execute(x)) recovers x,
        or None if the node is not invertible."""
        # generates testinverse_nodeclass test functions
        # only if invertible
        node_args = self._set_node_args(args)
        node = node_class(*node_args)
        if not node.is_invertible():
            return None
        def _testinverse(node_class=node_class):
            mat,mix,inp = self._get_random_mix()
            # take the first available dtype for the test
            node_args = self._set_node_args(args)
            dtype = node_class(*node_args).get_supported_dtypes()[0]
            node = node_class(dtype=dtype, *node_args)
            self._train_if_necessary(inp, node, args, sup_args_func)
            # execute the node
            out = node.execute(inp)
            # compute the inverse
            rec = node.inverse(out)
            # cast inp for comparison!
            inp = inp.astype(dtype)
            # reduced precision: a round trip loses a few decimals
            assert_array_almost_equal_diff(rec,inp,self.decimal-3)
            assert_type_equal(rec.dtype, dtype)
        return _testinverse

    def _set_node_args(self, args=None):
        """Materialize a list of node constructor arguments.

        Items that are callables are invoked and replaced by their return
        value, so that mutable arguments are recreated afresh each time a
        node is instantiated; all other items are passed through as-is.

        Bug fix: the return statement used to sit inside the
        'args is not None' branch, so calling with args=None returned
        None (implicitly) instead of the empty list.
        """
        node_args = []
        if args is not None:
            for item in args:
                if hasattr(item, '__call__'):
                    node_args.append(item())
                else:
                    node_args.append(item)
        return node_args

    def _get_testdtype(self, node_class, args=[], sup_args_func=None):
        """Return a test checking that execute's output dtype matches
        each dtype the node declares as supported."""
        def _testdtype(node_class=node_class):
            node_args = self._set_node_args(args)
            supported_types = node_class(*node_args).get_supported_dtypes()
            for dtype in supported_types:
                # recreate the constructor args for every instantiation
                node_args = self._set_node_args(args)
                if node_class == mdp.nodes.SFA2Node:
                    # SFA2 needs input with temporal structure: two sines
                    freqs = [2*numx.pi*100.,2*numx.pi*200.]
                    t =  numx.linspace(0, 1, num=1000)
                    mat = numx.array([numx.sin(freqs[0]*t),
                                      numx.sin(freqs[1]*t)]).T
                    inp = mat.astype('d')
                elif node_class == mdp.nodes.LinearRegressionNode:
                    inp = uniform(size=(1000, 5))
                else:
                    mat, mix, inp = self._get_random_mix(type="d")
                node = node_class(*node_args, **{'dtype':dtype})
                self._train_if_necessary(inp, node, node_args, sup_args_func)
                if node_class == mdp.nodes.RBMWithLabelsNode:
                    # this node needs the labels at execution time as well
                    out = node.execute(inp, sup_args_func(inp))
                else:
                    out = node.execute(inp)
                assert_type_equal(out.dtype, dtype) 
        return _testdtype

    def _get_testoutputdim(self, node_class, args=[], sup_args_func=None):
        """Return a test checking that output_dim is honoured both when
        passed to the constructor and when assigned afterwards."""
        def _testoutputdim(node_class=node_class):
            mat,mix,inp = self._get_random_mix()
            # ask for half of the input dimensions
            output_dim = self.mat_dim[1]//2
            # case 1: output dim set in the constructor
            node_args = self._set_node_args(args)
            node = node_class(*node_args, **{'output_dim':output_dim})
            self._train_if_necessary(inp, node, args, sup_args_func)
            # execute the node
            out = node(inp)
            assert out.shape[1]==output_dim,"%d!=%d"%(out.shape[1],output_dim)
            assert node._output_dim==output_dim,\
                   "%d!=%d"%(node._output_dim,output_dim)
            # case 2: output_dim set explicitly
            node_args = self._set_node_args(args)
            node = node_class(*node_args)
            node.output_dim = output_dim
            self._train_if_necessary(inp, node, args, sup_args_func)
            # execute the node
            out = node(inp)
            assert out.shape[1]==output_dim, "%d!=%d"%(out.shape[1],output_dim)
            assert node._output_dim==output_dim,\
                   "%d!=%d"%(node._output_dim,output_dim)
        return _testoutputdim

    def _get_testdimdtypeset(self, node_class, args=[], sup_args_func=None):
        """Return a test checking that output_dim, dtype and input_dim
        are all automatically set after training/execution."""
        def _testdimdtypeset(node_class=node_class):
            _, _, data = self._get_random_mix()
            node = node_class(*self._set_node_args(args))
            self._train_if_necessary(data, node, args, sup_args_func)
            # finalize training (or run the node if it is not trainable)
            self._stop_training_or_execute(node, data)
            # all three attributes must have been inferred by now
            assert node.output_dim is not None
            assert node.dtype is not None
            assert node.input_dim is not None
        return _testdimdtypeset

    def _uniform(self, min_, max_, dims):
        """Uniform random sample in [min_, max_) with shape dims."""
        span = max_ - min_
        return uniform(dims) * span + min_

    def testNodecopy(self):
        """Node.copy must deep-copy attributes rather than alias them."""
        shared_list = [1,2,3]
        original = mdp.Node()
        original.dummy_attr = shared_list
        clone = original.copy()
        assert original.dummy_attr == clone.dummy_attr,\
               'Node copy method did not work'
        # mutating the clone must not affect the original
        clone.dummy_attr[0] = 10
        assert original.dummy_attr != clone.dummy_attr,\
               'Node copy method did not work'

    def testNodesave(self):
        """Node.save must produce an independent (deep) copy both when
        pickling to a string (filename None) and to a file."""
        test_list = [1,2,3]
        generic_node = mdp.Node()
        generic_node.dummy_attr = test_list
        # test string save
        copy_node_pic = generic_node.save(None)
        copy_node = cPickle.loads(copy_node_pic)
        assert generic_node.dummy_attr == copy_node.dummy_attr,\
               'Node save (string) method did not work'
        # mutating the copy must not affect the original
        copy_node.dummy_attr[0] = 10
        assert generic_node.dummy_attr != copy_node.dummy_attr,\
               'Node save (string) method did not work'
        # test file save
        dummy_file = os.path.join(tempfile.gettempdir(),'removeme')
        generic_node.save(dummy_file, protocol=1)
        flh = open(dummy_file, 'rb')
        copy_node = cPickle.load(flh)
        flh.close()
        os.remove(dummy_file)
        assert generic_node.dummy_attr == copy_node.dummy_attr,\
               'Node save (file) method did not work'
        copy_node.dummy_attr[0] = 10
        assert generic_node.dummy_attr != copy_node.dummy_attr,\
               'Node save (file) method did not work'

    def testNode_multiple_training_phases(self):
        """Phase bookkeeping across a multi-phase training schedule."""
        data = uniform(size=self.mat_dim)
        node = _BogusMultiNode()
        n_phases = node.get_remaining_train_phase()
        for phase in range(n_phases):
            # before training: phase counter correct, phase not started
            assert node.get_current_train_phase() == phase
            assert not node._train_phase_started
            node.train(data)
            # after training: phase marked as started
            assert node._train_phase_started
            node.stop_training()
        # all phases consumed
        assert not node.is_training()
        
    def testNode_execution_without_training(self):
        """Executing an untrained node must auto-complete its training."""
        data = uniform(size=self.mat_dim)
        # single train phase: stop_training must have been triggered
        single = _BogusNodeTrainable()
        single.execute(data)
        assert hasattr(single, 'bogus_attr')
        # multiple train phases: all callbacks must have run in order
        multi = _BogusMultiNode()
        multi.execute(data)
        assert multi.visited == [1, 2, 3, 4]
        
    def testCovarianceMatrix(self):
        """utils.CovarianceMatrix must agree with numx.cov."""
        _, _, inp = self._get_random_mix()
        # reference values computed directly
        des_cov = numx.cov(inp, rowvar=0)
        des_avg = mean(inp, axis=0)
        des_tlen = inp.shape[0]
        # incremental estimate
        cov_mtx = utils.CovarianceMatrix()
        cov_mtx.update(inp)
        act_cov, act_avg, act_tlen = cov_mtx.fix()
        assert_array_almost_equal(act_tlen, des_tlen, self.decimal)
        assert_array_almost_equal(act_avg, des_avg, self.decimal)
        assert_array_almost_equal(act_cov, des_cov, self.decimal)
        
    def testDelayCovarianceMatrix(self):
        """utils.DelayCovarianceMatrix must match a direct computation of
        the time-shifted (lag dt) covariance."""
        dt = 5
        mat,mix,inp = self._get_random_mix()
        # reference values from explicit slicing
        des_tlen = inp.shape[0] - dt
        des_avg = mean(inp[:des_tlen,:],axis=0)
        des_avg_dt = mean(inp[dt:,:],axis=0)
        des_cov = utils.cov2(inp[:des_tlen,:], inp[dt:,:])
        act_cov = utils.DelayCovarianceMatrix(dt)
        act_cov.update(inp)
        act_cov,act_avg,act_avg_dt,act_tlen = act_cov.fix()
        # one decimal less than usual: incremental computation is
        # slightly less accurate
        assert_array_almost_equal(act_tlen,des_tlen,self.decimal-1)
        assert_array_almost_equal(act_avg,des_avg,self.decimal-1)
        assert_array_almost_equal(act_avg_dt,des_avg_dt,self.decimal-1)
        assert_array_almost_equal(act_cov,des_cov,self.decimal-1)

    def testCrossCovarianceMatrix(self):
        """utils.CrossCovarianceMatrix must match utils.cov2 on two
        signals with different numbers of variables."""
        mat,mix,inp1 = self._get_random_mix(mat_dim=(500,5))
        mat,mix,inp2 = self._get_random_mix(mat_dim=(500,3))
        # reference values computed directly
        des_tlen = inp1.shape[0]
        des_avg1 = mean(inp1, axis=0)
        des_avg2 = mean(inp2, axis=0)
        des_cov = utils.cov2(inp1, inp2)
        act_cov = utils.CrossCovarianceMatrix()
        act_cov.update(inp1, inp2)
        act_cov, act_avg1, act_avg2, act_tlen = act_cov.fix()
        assert_almost_equal(act_tlen,des_tlen,self.decimal-1)
        assert_array_almost_equal(act_avg1,des_avg1,self.decimal-1)
        assert_array_almost_equal(act_avg2,des_avg2,self.decimal-1)
        assert_array_almost_equal(act_cov,des_cov,self.decimal-1)     
        
    def testdtypeCovarianceMatrix(self):
        """CovarianceMatrix must honour its dtype argument."""
        # loop variable renamed from 'type' (shadowed the builtin)
        for dtype in testtypes:
            _, _, inp = self._get_random_mix(type='d')
            cov = utils.CovarianceMatrix(dtype=dtype)
            cov.update(inp)
            cov, avg, tlen = cov.fix()
            assert_type_equal(cov.dtype, dtype)
            assert_type_equal(avg.dtype, dtype)

    def testdtypeDelayCovarianceMatrix(self):
        """DelayCovarianceMatrix must honour its dtype argument."""
        dt = 5  # hoisted out of the loop: it is loop-invariant
        # loop variable renamed from 'type' (shadowed the builtin)
        for dtype in testtypes:
            _, _, inp = self._get_random_mix(type='d')
            cov = utils.DelayCovarianceMatrix(dt=dt, dtype=dtype)
            cov.update(inp)
            cov, avg, avg_dt, tlen = cov.fix()
            assert_type_equal(cov.dtype, dtype)
            assert_type_equal(avg.dtype, dtype)
            assert_type_equal(avg_dt.dtype, dtype)

    def testdtypeCrossCovarianceMatrix(self):
        """CrossCovarianceMatrix must honour its dtype argument.

        An unused 'dt = 5' local (copy-paste leftover from the delay
        covariance test) was removed; CrossCovarianceMatrix takes no lag.
        """
        # loop variable renamed from 'type' (shadowed the builtin)
        for dtype in testtypes:
            _, _, inp = self._get_random_mix(type='d')
            cov = utils.CrossCovarianceMatrix(dtype=dtype)
            cov.update(inp, inp)
            cov, avg1, avg2, tlen = cov.fix()
            assert_type_equal(cov.dtype, dtype)
            assert_type_equal(avg1.dtype, dtype)
            assert_type_equal(avg2.dtype, dtype)

    def testRoundOffWarningCovMatrix(self):
        """An absurdly large observation count must make CovarianceMatrix
        emit an MDPWarning (round-off error detection) on fix()."""
        import warnings
        # escalate MDPWarning to an exception so it can be caught below
        warnings.filterwarnings("error",'.*',mdp.MDPWarning)
        for type in ['f','d']:
            inp = uniform((1,2))
            cov = utils.CovarianceMatrix(dtype=type)
            # fake an enormous number of observed samples
            cov._tlen = int(1e+15)
            cov.update(inp)
            try:
                cov.fix()
                assert False, 'RoundOff warning did not work'
            except mdp.MDPWarning:
                pass
        # hope to reset the previous state...
        warnings.filterwarnings("once",'.*',mdp.MDPWarning)

    def testMultipleCovarianceMatricesDtypeAndFuncs(self):
        """All MultipleCovarianceMatrices operations must preserve dtype."""
        # loop variable renamed from 'type' (shadowed the builtin)
        for dtype in testtypes:
            dec = testdecimals[dtype]
            res_type = self._MultipleCovarianceMatrices_funcs(dtype, dec)
            assert_type_equal(dtype, res_type)


    def _MultipleCovarianceMatrices_funcs(self,dtype,decimals):
        """Exercise symmetrize/weight/rotate/permute/transform/copy on a
        MultipleCovarianceMatrices stack; return the resulting dtype.

        NOTE(review): the local rotate()/permute() helpers modify their
        argument IN PLACE and return a copy.  As a consequence the
        entries of w_covs are themselves already rotated at the point
        where assert_all(w_covs, mult_cov) is checked after rotate --
        the check passes, but only thanks to this aliasing.
        """
        def assert_all(des,act, dec=decimals):
            # check list of matrices equals multcov array 
            for x in range(nmat):
                assert_array_almost_equal_diff(des[x],act.covs[:,:,x],dec)

        def rotate(mat,angle,indices):
            # perform a givens rotation of a single matrix
            # (mutates mat in place AND returns a copy of it)
            [i,j] = indices
            c, s = numx.cos(angle), numx.sin(angle)
            mat_i, mat_j = mat[:,i].copy(), mat[:,j].copy()
            mat[:,i], mat[:,j] = c*mat_i-s*mat_j, s*mat_i+c*mat_j
            mat_i, mat_j = mat[i,:].copy(), mat[j,:].copy()
            mat[i,:], mat[j,:] = c*mat_i-s*mat_j, s*mat_i+c*mat_j
            return mat.copy()
        
        def permute(mat,indices):
            # permute rows and cols of a single matrix
            # (mutates mat in place AND returns a copy of it)
            [i,j] = indices
            mat_i, mat_j = mat[:,i].copy(), mat[:,j].copy()
            mat[:,i], mat[:,j] = mat_j, mat_i
            mat_i, mat_j = mat[i,:].copy(), mat[j,:].copy()
            mat[i,:], mat[j,:] = mat_j, mat_i
            return mat.copy()

        dim = 7
        nmat = 13
        # create mult cov mat
        covs = [uniform((dim,dim)).astype(dtype) for x in range(nmat)]
        mult_cov = mdp.utils.MultipleCovarianceMatrices(covs)
        assert_equal(nmat,mult_cov.ncovs)
        # test symmetrize
        sym_covs = [0.5*(x+x.T) for x in covs]
        mult_cov.symmetrize()
        assert_all(sym_covs,mult_cov)
        # test weight
        weights = uniform(nmat)
        w_covs = [weights[x]*sym_covs[x] for x in range(nmat)]
        mult_cov.weight(weights)
        assert_all(w_covs,mult_cov)
        # test rotate
        angle = uniform()*2*numx.pi
        idx = numx_rand.permutation(dim)[:2]
        # NB: rotate() also mutates the w_covs entries in place
        rot_covs = [rotate(x,angle,idx) for x in w_covs]
        mult_cov.rotate(angle,idx)
        assert_all(w_covs,mult_cov)
        # test permute
        per_covs = [permute(x,idx) for x in rot_covs]
        mult_cov.permute(idx)
        assert_all(per_covs,mult_cov)
        # test transform
        trans = uniform((dim,dim))
        trans_covs = [mult(mult(trans.T,x),trans)\
                      for x in per_covs]
        mult_cov.transform(trans)
        assert_all(trans_covs,mult_cov)
        # test copy
        cp_mult_cov = mult_cov.copy()
        assert_array_equal(mult_cov.covs,cp_mult_cov.covs)
        # check that we didn't got a reference
        mult_cov[0][0,0] = 1000
        assert int(cp_mult_cov[0][0,0]) != 1000
        # return dtype 
        return mult_cov.covs.dtype

    def testMultipleCovarianceMatricesTransformations(self):
        """Rotating/permuting the covariance matrices must be equivalent
        to rotating/permuting the source signals themselves."""
        def get_mult_covs(inp,nmat):
            # return delayed covariance matrices for lags 0..nmat-1
            covs = []
            for delay in range(nmat):
                tmp = mdp.utils.DelayCovarianceMatrix(delay)
                tmp.update(inp)
                cov,avg,avg_dt,tlen = tmp.fix()
                covs.append(cov)
            return mdp.utils.MultipleCovarianceMatrices(covs)
        dim = 7
        nmat = 13
        angle = uniform()*2*numx.pi
        idx = numx_rand.permutation(dim)[:2]
        inp = uniform((100*dim,dim))
        rot_inp, per_inp = inp.copy(), inp.copy()
        # test if rotating or permuting the cov matrix is equivalent
        # to rotate or permute the sources.
        mdp.utils.rotate(rot_inp,angle,idx)
        mdp.utils.permute(per_inp,idx,rows=0,cols=1)
        mcov = get_mult_covs(inp, nmat)
        mcov2 = mcov.copy()
        mcov_rot = get_mult_covs(rot_inp, nmat)
        mcov_per = get_mult_covs(per_inp, nmat)
        mcov.rotate(angle,idx)
        mcov2.permute(idx)
        assert_array_almost_equal_diff(mcov.covs, mcov_rot.covs,self.decimal)
        assert_array_almost_equal_diff(mcov2.covs, mcov_per.covs,self.decimal)
                     
    def testPolynomialExpansionNode(self):
        """PolynomialExpansionNode must match a hard-coded expansion for
        all degrees 1..5 and input dimensions 1..4."""
        def hardcoded_expansion(x, degree):
            # explicit nested-loop expansion: all monomials of x's
            # columns up to the given degree, with non-decreasing
            # index order (i <= j <= l <= m <= n)
            nvars = x.shape[1]
            exp_dim = mdp.nodes._expanded_dim(degree, nvars)
            exp = numx.zeros((x.shape[0], exp_dim), 'd')
            # degree 1
            exp[:,:nvars] = x.copy()
            # degree 2
            k = nvars
            if degree>=2:
                for i in range(nvars):
                    for j in range(i,nvars):
                        exp[:,k] = x[:,i]*x[:,j]
                        k += 1
            # degree 3
            if degree>=3:
                for i in range(nvars):
                    for j in range(i,nvars):
                        for l in range(j,nvars):
                            exp[:,k] = x[:,i]*x[:,j]*x[:,l]
                            k += 1
            # degree 4
            if degree>=4:
                for i in range(nvars):
                    for j in range(i,nvars):
                        for l in range(j,nvars):
                            for m in range(l,nvars):
                                exp[:,k] = x[:,i]*x[:,j]*x[:,l]*x[:,m]
                                k += 1
            # degree 5
            if degree>=5:
                for i in range(nvars):
                    for j in range(i,nvars):
                        for l in range(j,nvars):
                            for m in range(l,nvars):
                                for n in range(m,nvars):
                                    exp[:,k] = \
                                             x[:,i]*x[:,j]*x[:,l]*x[:,m]*x[:,n]
                                    k += 1
            return exp

        for degree in range(1,6):
            for dim in range(1,5):
                expand = mdp.nodes.PolynomialExpansionNode(degree=degree)
                mat,mix,inp = self._get_random_mix((10,dim))
                des = hardcoded_expansion(inp, degree)
                exp = expand.execute(inp)
                assert_array_almost_equal(exp, des, self.decimal)


    def testPCANode(self):
        """PCA on two rotated, offset orthogonal lines must recover their
        variances, produce zero-mean output and expose sensible
        total/explained variance attributes."""
        # build two orthogonal 'lines' along the coordinate axes
        line_x = numx.zeros((1000,2),"d")
        line_y = numx.zeros((1000,2),"d")
        line_x[:,0] = numx.linspace(-1,1,num=1000,endpoint=1)
        line_y[:,1] = numx.linspace(-0.2,0.2,num=1000,endpoint=1)
        mat = numx.concatenate((line_x,line_y))
        des_var = std(mat,axis=0)
        # rotate by a random angle and shift by a random offset;
        # PCA must undo both
        utils.rotate(mat,uniform()*2*numx.pi)
        mat += uniform(2)
        pca = mdp.nodes.PCANode()
        pca.train(mat)
        act_mat = pca.execute(mat)
        assert_array_almost_equal(mean(act_mat,axis=0),\
                                  [0,0],self.decimal)
        assert_array_almost_equal(std(act_mat,axis=0),\
                                  des_var,self.decimal)
        # test that the total_variance attribute makes sense
        est_tot_var = ((des_var**2)*2000/1999.).sum()
        assert_almost_equal(est_tot_var, pca.total_variance, self.decimal)
        assert_almost_equal(1, pca.explained_variance, self.decimal)
        # test a bug in v.1.1.1, should not crash
        pca.inverse(act_mat[:,:1])

##     # test segmentation fault with symeig, see
##     # http://projects.scipy.org/scipy/numpy/ticket/551
##     def testPCANode_pickled(self):
##         for i in range(2,100):
##             mat, mix, inp = self._get_random_mix(mat_dim=(200, i))
            
##             pca = mdp.nodes.PCANode()
##             pca.train(mat)
##             s = cPickle.dumps(pca)
##             pca = cPickle.loads(s)
##             act_mat = pca.execute(mat)

    def testPCANode_total_variance(self):
        """total_variance must report the variance of ALL input components,
        even when output_dim keeps only a subset of them."""
        mat, _, _ = self._get_random_mix(mat_dim=(1000, 3))
        # unbiased total variance of the full input
        expected = ((std(mat, axis=0) ** 2) * 1000 / 999.).sum()
        pca = mdp.nodes.PCANode(output_dim=2)
        pca.train(mat)
        pca.execute(mat)
        assert_almost_equal(expected, pca.total_variance, self.decimal)
        
    def testPCANode_desired_variance(self):
        """A float output_dim in (0, 1) must select the number of
        components needed to explain that fraction of the variance."""
        mat, mix, inp = self._get_random_mix(mat_dim=(1000, 3))
        # first make them white
        pca = mdp.nodes.WhiteningNode()
        pca.train(mat)
        mat = pca.execute(mat)
        # set the variances
        mat *= [0.6,0.3,0.1]
        #mat -= mat.mean(axis=0)
        # request 80% of the variance -> first two components (0.6+0.3)
        pca = mdp.nodes.PCANode(output_dim=0.8)
        pca.train(mat)
        out = pca.execute(mat)
        # check that we got exactly two output_dim:
        assert pca.output_dim == 2, '%s'%pca.output_dim
        assert out.shape[1] == 2
        # check that explained variance is > 0.8 and < 1
        assert (pca.explained_variance > 0.8 and pca.explained_variance < 1)


    def testPCANode_desired_variance_after_train(self):
        """A fractional output_dim must also work when assigned AFTER
        training (regression test for an earlier bug)."""
        mat, mix, inp = self._get_random_mix(mat_dim=(1000, 3))
        # first make them white
        pca = mdp.nodes.WhiteningNode()
        pca.train(mat)
        mat = pca.execute(mat)
        # set the variances
        mat *= [0.6,0.3,0.1]
        #mat -= mat.mean(axis=0)
        pca = mdp.nodes.PCANode()
        pca.train(mat)
        # this was not working before the bug fix
        pca.output_dim = 0.8
        out = pca.execute(mat)
        # check that we got exactly two output_dim:
        assert pca.output_dim == 2
        assert out.shape[1] == 2
        # check that explained variance is > 0.8 and < 1
        assert (pca.explained_variance > 0.8 and pca.explained_variance < 1)    
    
    def testPCANode_SVD(self):
        """PCANode(svd=True) must handle singular covariance matrices.

        Plain PCANode is expected to raise a NodeException on
        rank-deficient or underdetermined data, while the SVD-based
        version must cope; the automatic dimensionality reduction
        (reduce=True with var_rel/var_abs thresholds) is checked too.
        """
        # it should pass at least the same test as PCANode
        line_x = numx.zeros((1000,2),"d")
        line_y = numx.zeros((1000,2),"d")
        line_x[:,0] = numx.linspace(-1,1,num=1000,endpoint=1)
        line_y[:,1] = numx.linspace(-0.2,0.2,num=1000,endpoint=1)
        mat = numx.concatenate((line_x,line_y))
        des_var = std(mat,axis=0)
        utils.rotate(mat,uniform()*2*numx.pi)
        mat += uniform(2)
        pca = mdp.nodes.PCANode(svd=True)
        pca.train(mat)
        act_mat = pca.execute(mat)
        assert_array_almost_equal(mean(act_mat,axis=0),\
                                  [0,0],self.decimal)
        assert_array_almost_equal(std(act_mat,axis=0),\
                                  des_var,self.decimal)
        # Now a more difficult test, create singular cov matrices
        # and test that PCANode crashes whereas PCASVDNode doesn't
        mat, mix, inp = self._get_random_mix(mat_dim=(1000, 100), avg=1E+8)
        # now create a degenerate input (columns 1..99 all identical)
        for i in range(1,100):
            inp[:,i] = inp[:,1].copy()
        # check that standard PCA fails
        pca = mdp.nodes.PCANode()
        pca.train(inp)
        try:
            pca.stop_training()
            raise Exception, "PCANode didn't catch singular covariance matrix"
        except mdp.NodeException:
            pass
        # now try the SVD version
        pca = mdp.nodes.PCANode(svd=True)
        pca.train(inp)
        pca.stop_training()

        # now check the underdetermined case (fewer samples than dimensions)
        mat, mix, inp = self._get_random_mix(mat_dim=(500, 2))
        inp = inp.T
        pca = mdp.nodes.PCANode()
        pca.train(inp)
        try:
            pca.stop_training()
            raise Exception, "PCANode didn't catch singular covariance matrix"
        except mdp.NodeException:
            pass
        # now try the SVD version
        pca = mdp.nodes.PCANode(svd=True)
        pca.train(inp)
        pca.stop_training()

        # try using the automatic dimensionality reduction function
        mat, mix, inp = self._get_random_mix(mat_dim=(1000, 3))
        # first make them decorrelated
        pca = mdp.nodes.PCANode()
        pca.train(mat)
        mat = pca.execute(mat)
        mat *= [1E+5,1E-3, 1E-4]
        mat -= mat.mean(axis=0)
        pca = mdp.nodes.PCANode(svd=True,reduce=True, var_rel=1E-2)
        pca.train(mat)
        out = pca.execute(mat)
        # check that we got the only large dimension
        assert_array_almost_equal(mat[:,0].mean(axis=0),out.mean(axis=0),
                                  self.decimal)
        assert_array_almost_equal(mat[:,0].std(axis=0),out.std(axis=0),
                                  self.decimal)

        # second test for automatic dimensionality reduction:
        # this time with an absolute variance threshold as well
        mat, mix, inp = self._get_random_mix(mat_dim=(1000, 3))
        # first make them decorrelated
        pca = mdp.nodes.PCANode()
        pca.train(mat)
        mat = pca.execute(mat)
        mat *= [1E+5,1E-3, 1E-18]
        mat -= mat.mean(axis=0)
        pca = mdp.nodes.PCANode(svd=True,reduce=True,
                                var_abs=1E-8, var_rel=1E-30)
        pca.train(mat)
        out = pca.execute(mat)
        # check that we kept the two largest dimensions
        assert_array_almost_equal(mat[:,:2].mean(axis=0),out.mean(axis=0),
                                  self.decimal)
        assert_array_almost_equal(mat[:,:2].std(axis=0),out.std(axis=0),
                                  self.decimal)

    def _mock_symeig(self, x, range=None, overwrite=False):
        if range is None:
            N = x.shape[0]
        else:
            N = range[1]-range[0] + 1
        y = numx.zeros((N,))
        z = numx.zeros((N,N))
        y[0] = -1
        y[-1] = 1
        return y, z
    
    def testPCANode_negative_eigenvalues(self):
        """PCANode behavior when symeig yields non-positive eigenvalues.

        Uses the mocked symeig (spectrum [-1, 0, ..., 0, 1]):
        - output_dim=None, svd=False, reduce=False -> must raise
        - reduce=True  -> drops non-positive eigenvalues (output_dim 1)
        - svd=True     -> keeps all 10 dimensions
        - explicit output_dim=1 -> honored without complaint
        """
        # should throw an Exception if reduce=False and
        # svd = False and output_dim=None
        pca = mdp.nodes.PCANode(output_dim=None, svd=False, reduce=False)
        pca._symeig = self._mock_symeig 
        pca.train(uniform((10,10)))
        try:
            pca.stop_training()
            assert False, "PCA did not catch negative eigenvalues!"
        except mdp.NodeException, e:
            if "Got negative eigenvalues" in str(e):
                pass
            else:
                # a NodeException was raised, but not the expected one
                raise Exception("PCA did not catch negative eigenvalues!\n"+
                                str(e))
        # if reduce=True, should not throw any Exception,
        # and return output_dim = 1
        pca = mdp.nodes.PCANode(output_dim=None, svd=False, reduce=True)
        pca._symeig = self._mock_symeig 
        pca.train(uniform((10,10)))
        pca.stop_training()
        assert pca.output_dim == 1, 'PCA did not remove non-positive eigenvalues!'
        # if svd=True, should not throw any Exception,
        # and return output_dim = 10
        pca = mdp.nodes.PCANode(output_dim=None, svd=True, reduce=False)
        pca._symeig = self._mock_symeig 
        pca.train(uniform((10,10)))
        pca.stop_training()
        assert pca.output_dim == 10, 'PCA did not remove non-positive eigenvalues!'       
        # if output_dim is set, should not throw any Exception,
        # and return the right output_dim
        pca = mdp.nodes.PCANode(output_dim=1, svd=False, reduce=False)
        pca._symeig = self._mock_symeig 
        pca.train(uniform((10,10)))
        pca.stop_training()
        assert pca.output_dim == 1, 'PCA did not remove non-positive eigenvalues!'
        
    def testWhiteningNode(self):
        """Whitened output must have zero mean and unit variance per channel."""
        nvars = 5
        dim = (10000, nvars)
        mat, mix, inp = self._get_random_mix(mat_dim=dim, avg=uniform(nvars))
        node = mdp.nodes.WhiteningNode()
        node.train(inp)
        out = node.execute(inp)
        assert_array_almost_equal(mean(out, axis=0),
                                  numx.zeros(dim[1]), self.decimal)
        # the variance estimate converges more slowly: relax by 3 decimals
        assert_array_almost_equal(std(out, axis=0),
                                  numx.ones(dim[1]), self.decimal-3)

    def testWhiteningNode_SVD(self):
        """The SVD-based whitening must also yield zero mean / unit variance."""
        nvars = 5
        dim = (10000, nvars)
        mat, mix, inp = self._get_random_mix(mat_dim=dim, avg=uniform(nvars))
        node = mdp.nodes.WhiteningNode(svd=True)
        node.train(inp)
        out = node.execute(inp)
        assert_array_almost_equal(mean(out, axis=0),
                                  numx.zeros(dim[1]), self.decimal)
        # the variance estimate converges more slowly: relax by 3 decimals
        assert_array_almost_equal(std(out, axis=0),
                                  numx.ones(dim[1]), self.decimal-3)

    def testSFANode(self):
        """SFA on a linear mixture of two sines recovers both sources.

        Each SFA output should correlate ~1 (in absolute value) with one
        normalized source; with output_dim=1 only the slowest signal is
        returned.  get_eta_values must return a value after training.
        """
        dim=10000
        freqs = [2*numx.pi*1, 2*numx.pi*5]
        t =  numx.linspace(0,1,num=dim)
        mat = numx.array([numx.sin(freqs[0]*t),numx.sin(freqs[1]*t)]).T
        # normalize over the same sample range used for the correlations
        mat = (mat - mean(mat[:-1,:],axis=0))\
              /std(mat[:-1,:],axis=0)
        des_mat = mat.copy()
        # mix the sources linearly and add a random offset
        mat = mult(mat,uniform((2,2))) + uniform(2)
        sfa = mdp.nodes.SFANode()
        sfa.train(mat)
        out = sfa.execute(mat)
        correlation = mult(des_mat[:-1,:].T,out[:-1,:])/(dim-2)
        assert sfa.get_eta_values(t=0.5) is not None, 'get_eta is None'
        assert_array_almost_equal(abs(correlation),
                                  numx.eye(2), self.decimal-3)
        # with output_dim=1 only the slowest source must survive
        sfa = mdp.nodes.SFANode(output_dim = 1)
        sfa.train(mat)
        out = sfa.execute(mat)
        assert out.shape[1]==1, 'Wrong output_dim'
        correlation = mult(des_mat[:-1,:1].T,out[:-1,:])/(dim-2)
        assert_array_almost_equal(abs(correlation),
                                  numx.eye(1), self.decimal-3)
        

    def testSFA2Node(self):
        """SFA2 (quadratic expansion) on a mixture of two sines.

        The expanded output has 5 components; components 0 and 2 should
        correlate with the two sources.  Every output component must also
        match the corresponding quadratic form from get_quadratic_form.
        """
        dim = 10000
        freqs = [2*numx.pi*100.,2*numx.pi*500.]
        t =  numx.linspace(0,1,num=dim)
        mat = numx.array([numx.sin(freqs[0]*t),numx.sin(freqs[1]*t)]).T
        # tiny noise keeps the expanded covariance matrix non-singular
        mat += normal(0., 1e-10, size=(dim, 2))
        mat = (mat - mean(mat[:-1,:],axis=0))\
              /std(mat[:-1,:],axis=0)
        des_mat = mat.copy()
        mat = mult(mat,uniform((2,2))) + uniform(2)
        sfa = mdp.nodes.SFA2Node()
        sfa.train(mat)
        out = sfa.execute(mat)
        assert out.shape[1]==5, "Wrong output_dim" 
        correlation = mult(des_mat[:-1,:].T,
                           numx.take(out[:-1,:], (0,2), axis=1))/(dim-2)
        assert_array_almost_equal(abs(correlation),
                                  numx.eye(2), self.decimal-3)
        # every output component must equal its quadratic form applied to x
        for nr in range(sfa.output_dim):
            qform = sfa.get_quadratic_form(nr)
            outq = qform.apply(mat)
            assert_array_almost_equal(outq, out[:,nr], self.decimal)

        # NOTE(review): a linear SFANode is used below inside the SFA2Node
        # test -- presumably as a cross-check on the same data; confirm.
        sfa = mdp.nodes.SFANode(output_dim = 2)
        sfa.train(mat)
        out = sfa.execute(mat)
        assert out.shape[1]==2, 'Wrong output_dim'
        correlation = mult(des_mat[:-1,:1].T,out[:-1,:1])/(dim-2)
        assert_array_almost_equal(abs(correlation),
                                  numx.eye(1), self.decimal-3)


    def testSFA2Node_input_dim_bug(self):
        """SFA2Node with an explicit input_dim used to fail (regression)."""
        npoints = 10000
        omegas = [2*numx.pi*100., 2*numx.pi*500.]
        t = numx.linspace(0, 1, num=npoints)
        signals = numx.array([numx.sin(omegas[0]*t),
                              numx.sin(omegas[1]*t)]).T
        # tiny noise avoids a singular covariance matrix
        signals += normal(0., 1e-10, size=(npoints, 2))
        signals = (signals - mean(signals[:-1,:], axis=0)) \
                  / std(signals[:-1,:], axis=0)
        mixed = mult(signals, uniform((2,2))) + uniform(2)
        node = mdp.nodes.SFA2Node(input_dim=2)
        node.train(mixed)
        node.execute(mixed)

    def testSFA2Node_output_dim_bug(self):
        """SFA2Node must honor an explicit output_dim (regression test).

        Fix: removed the unused local ``des_mat`` (a full copy of the
        data that was never read in this test).
        """
        dim = 10000
        freqs = [2*numx.pi*100., 2*numx.pi*500.]
        t = numx.linspace(0, 1, num=dim)
        mat = numx.array([numx.sin(freqs[0]*t), numx.sin(freqs[1]*t)]).T
        # tiny noise avoids a singular covariance matrix
        mat += normal(0., 1e-10, size=(dim, 2))
        mat = (mat - mean(mat[:-1,:], axis=0))\
              /std(mat[:-1,:], axis=0)
        mat = mult(mat, uniform((2,2))) + uniform(2)
        sfa = mdp.nodes.SFA2Node(output_dim=3)
        sfa.train(mat)
        out = sfa.execute(mat)
        assert out.shape[1]==3, 'SFA2Node output has wrong output dimensions!'
        
    def _testICANode(self, icanode, rand_func=uniform, vars=3, N=8000,
                     prec=3):
        """Train ``icanode`` on a random mixture and verify source recovery.

        Each recovered component must correlate (in absolute value, up to
        ``prec`` decimals) with exactly one of the normalized sources.
        """
        sources, mixing, observed = self._get_random_mix(rand_func=rand_func,
                                                         mat_dim=(N, vars))
        icanode.train(observed)
        estimated = icanode.execute(observed)
        normalized = (sources - mean(sources, axis=0))/std(sources, axis=0)
        cross = utils.cov2(normalized, estimated)
        peaks = numx.amax(abs(cross), axis=0)
        assert_array_almost_equal(peaks, numx.ones(vars), prec)

    def _testICANodeMatrices(self, icanode, rand_func=uniform, vars=3, N=8000):
        """Check that get_projmatrix/get_recmatrix match execute/inverse."""
        sources, mixing, observed = self._get_random_mix(rand_func=rand_func,
                                                         mat_dim=(N, vars),
                                                         avg=0)
        icanode.train(observed)
        # the projection matrix must reproduce execute()
        projected = icanode.execute(observed)
        proj = icanode.get_projmatrix()
        assert_array_almost_equal(projected, mult(observed, proj), 6)
        # the reconstruction matrix must reproduce inverse()
        sample = projected.copy()
        reconstructed = icanode.inverse(sample)
        rec = icanode.get_recmatrix()
        assert_array_almost_equal(reconstructed, mult(sample, rec), 6)
        
    def testCuBICANodeBatch(self):
        """CuBICA in batch mode separates sources; its matrices are consistent."""
        node = mdp.nodes.CuBICANode(limit=10**(-self.decimal))
        twin = node.copy()
        self._testICANode(node)
        self._testICANodeMatrices(twin)
        
    def testCuBICANodeTelescope(self):
        """CuBICA in telescope mode separates sources as well."""
        node = mdp.nodes.CuBICANode(limit=10**(-self.decimal), telescope=1)
        twin = node.copy()
        self._testICANode(node)
        self._testICANodeMatrices(twin)
      
    def _get_testFastICA(self, parms):
        # create a function description
##         # old func description: verbose and with newlines
##         header = 'TestFastICANode:'
##         app =     '  Approach:     '+parms['approach']
##         nl =      '  Nonlinearity: '+parms['g']
##         fine_nl = '  Fine-tuning:  '+str(parms['fine_g'])
##         if parms['sample_size'] == 1:
##             compact = '  Samples  100%, '
##         else:
##             compact = '  Samples <100%, '
##         if parms['mu'] == 1:
##             compact = compact + 'Step:  1, '
##         else:
##             compact = compact + 'Step: <1, '
##         if parms['stabilization'] is True:
##             compact = compact +'Stabilized algorithm'
##         else:
##             compact = compact +'Standard   algorithm'
##         desc = '\n'.join([header, app, nl, fine_nl, compact])
        # new func description: compact and one line
        header = 'Test FastICANode'
        app =     'AP:'+parms['approach']
        nl =      'NL:'+parms['g']
        fine_nl = 'FT:'+str(parms['fine_g'])
        if parms['sample_size'] == 1:
            compact = 'SA:01 '
        else:
            compact = 'SA:<1 '
        if parms['mu'] == 1:
            compact = compact + 'S:01 '
        else:
            compact = compact + 'S:<1 '
        if parms['stabilization'] is True:
            compact = compact +'STB'
        else:
            compact = compact +'STD'
        desc = ' '.join([header, app, nl, fine_nl, compact])
        
        def _testFastICA(parms=parms):
            if parms['g'] == 'skew':
                rand_func = numx_rand.exponential
            else:
                rand_func = uniform
                
            # try two times just to clear failures due to randomness
            try:
                ica=mdp.nodes.FastICANode(limit=10**(-self.decimal),**parms)
                ica2 = ica.copy()
                self._testICANode(ica, rand_func=rand_func, vars=2)
                self._testICANodeMatrices(ica2, rand_func=rand_func, vars=2)
            except Exception:
                ica=mdp.nodes.FastICANode(limit=10**(-self.decimal),**parms)
                ica2 = ica.copy()
                self._testICANode(ica, rand_func=rand_func, vars=2)
                self._testICANodeMatrices(ica2, rand_func=rand_func, vars=2)

        return _testFastICA, desc

    def _rand_with_timestruct(self, size=None):    
        T, N = size
        # do something special only if T!=N, otherwise
        # we were asked to generate a mixing matrix
        if T == N:
            return uniform(size=size)
        # create independent sources
        src = uniform((T,N))*2-1
        fsrc = numx_fft.rfft(src,axis=0)
        # enforce different speeds
        for i in range(N):
            fsrc[(i+1)*(T//20):,i] = 0.
        src = numx_fft.irfft(fsrc,axis=0)
        return src
        
        
    def testTDSEPNode(self):
        """TDSEP separates time-structured sources (lower precision demanded)."""
        node = mdp.nodes.TDSEPNode(lags=20, limit=1E-10)
        twin = node.copy()
        self._testICANode(node, rand_func=self._rand_with_timestruct,
                          vars=2, N=2**14, prec=2)
        self._testICANodeMatrices(twin, rand_func=self._rand_with_timestruct,
                                  vars=2, N=2**14)
        

    def testOneDimensionalHitParade(self):
        """_OneDimensionalHitParade keeps the 5 largest extrema across chunks."""
        signal = (uniform(300) - 0.5)*2
        gap = 5
        # plant isolated maxima and minima
        signal[0], signal[10], signal[50] = 1.5, 1.4, 1.3
        signal[1], signal[11], signal[51] = -1.5, -1.4, -1.3
        # plant two maxima and two minima closer together than the gap:
        # only the larger extremum of each pair may survive
        signal[100], signal[103] = 2, 3
        signal[110], signal[113] = 3.1, 2
        signal[120], signal[123] = -2, -3.1
        signal[130], signal[133] = -3, -2
        hit = mdp.nodes._OneDimensionalHitParade(5, gap)
        # feed the signal in three chunks to exercise chunk boundaries
        for lo in (0, 100, 200):
            hi = lo + 100
            hit.update((signal[lo:hi], numx.arange(lo, hi)))
        maxima, ind_maxima = hit.get_maxima()
        minima, ind_minima = hit.get_minima()
        assert_array_equal(maxima, [3.1, 3, 1.5, 1.4, 1.3])
        assert_array_equal(ind_maxima, [110, 103, 0, 10, 50])
        assert_array_equal(minima, [-3.1, -3, -1.5, -1.4, -1.3])
        assert_array_equal(ind_minima, [123, 130, 1, 11, 51])

    def testHitParadeNode(self):
        """HitParadeNode finds the single largest/smallest entry per channel."""
        gap = 5

        def _check(signal, peaks):
            # train in three chunks to exercise chunk boundaries
            node = mdp.nodes.HitParadeNode(1, gap, 3)
            for lo in (0, 100, 200):
                node.train(signal[lo:lo+100, :])
            maxima, max_ind = node.get_maxima()
            minima, min_ind = node.get_minima()
            assert_array_equal(maxima, numx.array([peaks]))
            assert_array_equal(max_ind, numx.array([[10, 120, 230]]))
            assert_array_equal(minima, numx.array([[-p for p in peaks]]))
            assert_array_equal(min_ind, numx.array([[11, 121, 231]]))

        # float input
        signal = uniform((300, 3))
        signal[10, 0], signal[120, 1], signal[230, 2] = 4, 3, 2
        signal[11, 0], signal[121, 1], signal[231, 2] = -4, -3, -2
        _check(signal, [4, 3, 2])
        # integer input
        signal = (uniform((300, 3))*10).astype('i')
        signal[10, 0], signal[120, 1], signal[230, 2] = 40, 30, 20
        signal[11, 0], signal[121, 1], signal[231, 2] = -40, -30, -20
        _check(signal, [40, 30, 20])
        
        
    def testTimeFramesNode(self):
        """TimeFramesNode stacks gap-shifted copies; pseudo_inverse undoes it."""
        length = 14
        gap = 6
        time_frames = 3
        inp = numx.array([numx.arange(length), -numx.arange(length)]).T
        node = mdp.nodes.TimeFramesNode(time_frames, gap)
        out = node.execute(inp)
        # last entry comes from the last sample of the negated channel
        assert_equal(out[-1, -1], -length + 1)
        # column pairs (2i, 2i+1) are the two channels shifted by i*gap
        for shift in range(1, time_frames):
            assert_array_equal(out[:, 2*shift], out[:, 0] + shift*gap)
            assert_array_equal(out[:, 2*shift + 1], out[:, 1] - shift*gap)
        # the pseudo-inverse must restore the original channels blockwise
        rec = node.pseudo_inverse(out)
        assert_equal(rec.shape[1], inp.shape[1])
        block = min(out.shape[0], gap)
        for start in range(0, length, gap):
            assert_array_equal(rec[start:start+block], inp[start:start+block])

    def testTimeFramesNodeBugInputDim(self):
        """Constructing with an explicit input_dim must not raise (regression)."""
        mdp.nodes.TimeFramesNode(gap=1, time_frames=10, input_dim=1)
        
    def testEtaComputerNode(self):
        """Eta values of sin(t) and sin(5t) over one period approach [1, 5].

        Fix: ``tlen`` was the float ``1e5`` but is used as the integer
        ``num`` argument of numx.linspace (an error on modern numpy);
        make it an explicit int.
        """
        tlen = int(1e5)
        t = numx.linspace(0, 2*numx.pi, tlen)
        inp = numx.array([numx.sin(t), numx.sin(5*t)]).T
        # create node to be tested
        ecnode = mdp.nodes.EtaComputerNode()
        ecnode.train(inp)
        etas = ecnode.get_eta(t=tlen)
        # precision gets better with increasing tlen
        assert_array_almost_equal(etas, [1, 5], decimal=4)

    def testGrowingNeuralGasNode(self):
        """GNG on a 1D distribution embedded in 10D space.

        Checks that the learned nodes lie on the line, that the graph is
        a simple chain (no branches), that the nodes are roughly
        uniformly spaced, and that nearest_neighbor works.
        """
        ### test 1D distribution in a 10D space
        # line coefficients
        dim = 10
        npoints = 1000
        const = self._uniform(-100,100,[dim])
        dir = self._uniform(-1,1,[dim])
        dir /= utils.norm2(dir)
        x = self._uniform(-1,1,[npoints])
        data = numx.outer(x, dir)+const
        # train the gng network
        gng = mdp.nodes.GrowingNeuralGasNode(start_poss=[data[0,:],data[1,:]])
        gng.train(data)
        gng.stop_training()
        # control that the nodes in the graph lie on the line
        # (each normalized node position must be +-dir)
        poss = gng.get_nodes_position()-const
        norms = numx.sqrt(numx.sum(poss*poss, axis=1))
        poss = (poss.T/norms).T
        assert max(numx.minimum(numx.sum(abs(poss-dir),axis=1),
                                 numx.sum(abs(poss+dir),axis=1)))<1e-7, \
               'At least one node of the graph does lies out of the line.'
        # check that the graph is linear (no additional branches):
        # a chain has exactly two degree-1 endpoints, all others degree 2
        topolist = gng.graph.topological_sort()
        deg = map(lambda n: n.degree(), topolist)
        assert_equal(deg[:2],[1,1])
        assert_array_equal(deg[2:], [2 for i in range(len(deg)-2)])
        # check the distribution of the nodes' position is uniform
        # this node is at one of the extrema of the graph
        x0 = numx.outer(numx.amin(x, axis=0), dir)+const
        x1 = numx.outer(numx.amax(x, axis=0), dir)+const
        linelen = utils.norm2(x0-x1)
        # this is the mean distance the node should have
        dist = linelen/poss.shape[0]
        # sort the node, depth first
        nodes = gng.graph.undirected_dfs(topolist[0])
        poss = numx.array(map(lambda n: n.data.pos, nodes))
        dists = numx.sqrt(numx.sum((poss[:-1,:]-poss[1:,:])**2, axis=1))
        assert_almost_equal(dist, mean(dists), 1)
        #
        # test the nearest_neighbor function
        start_poss = [numx.asarray([2.,0]), numx.asarray([-2.,0])]
        gng = mdp.nodes.GrowingNeuralGasNode(start_poss=start_poss)
        x = numx.asarray([[2.,0]])
        gng.train(x)
        nodes, dists = gng.nearest_neighbor(numx.asarray([[1.,0]]))
        assert_equal(dists[0],1.)
        assert_array_equal(nodes[0].data.pos,numx.asarray([2,0]))

    def testNoiseNode(self):
        """NoiseNode with a deterministic noise function.

        Additive constant-1 noise turns zeros into ones; multiplicative
        noise leaves a zero input at zero.
        """
        def constant_noise(mean, size=None):
            return numx.ones(size)*mean

        additive = mdp.nodes.NoiseNode(constant_noise, (1.,))
        out = additive.execute(numx.zeros((100, 10), 'd'))
        assert_array_equal(out, numx.ones((100, 10), 'd'))
        multiplicative = mdp.nodes.NoiseNode(constant_noise, (1.,),
                                             'multiplicative')
        out = multiplicative.execute(numx.zeros((100, 10), 'd'))
        assert_array_equal(out, numx.zeros((100, 10), 'd'))
        
    def testNormalNoiseNode(self):
        """NormalNoiseNode.execute runs with custom (mean, std) noise args."""
        node = mdp.nodes.NormalNoiseNode(noise_args=(2.1, 0.001))
        node.execute(numx.array([range(100)]*2))

    def testNoiseNodePickling(self):
        """NoiseNode must survive copy() and save() despite holding a callable."""
        node = mdp.nodes.NoiseNode()
        node.copy()
        node.save(None)
        
    def testFDANode(self):
        """FDA on two rotated Gaussian classes.

        Checks the per-class statistics collected during training, the
        whitening of the projected output, and that the discriminative
        directions match the analytically expected ones.
        """
        mean1 = [0., 2.]
        mean2 = [0., -2.]
        std_ = numx.array([1., 0.2])
        npoints = 50000
        rot = 45
        
        # input data: two distinct gaussians rotated by 45 deg
        def distr(size): return normal(0, 1., size=(size)) * std_
        x1 = distr((npoints,2)) + mean1
        utils.rotate(x1, rot, units='degrees')
        x2 = distr((npoints,2)) + mean2
        utils.rotate(x2, rot, units='degrees')
        x = numx.concatenate((x1, x2), axis=0)
        
        # labels
        cl1 = numx.ones((x1.shape[0],), dtype='d')
        cl2 = 2.*numx.ones((x2.shape[0],), dtype='d')
        classes = numx.concatenate((cl1, cl2))

        # shuffle the data
        perm_idx = numx_rand.permutation(classes.shape[0])
        x = numx.take(x, perm_idx, axis=0)
        
        classes = numx.take(classes, perm_idx)

        flow = mdp.Flow([mdp.nodes.FDANode()])
        # a label array of the wrong length must be rejected
        try:
            flow[0].train(x, numx.ones((2,)))
            assert False, 'No exception despite wrong number of labels'
        except mdp.TrainingException:
            pass
        flow.train([[(x, classes)]])
        fda_node = flow[0]

        # per-class sample counts and means must match the construction
        assert fda_node.tlens[1] == npoints
        assert fda_node.tlens[2] == npoints
        m1 = numx.array([mean1])
        m2 = numx.array([mean2])
        utils.rotate(m1, rot, units='degrees')
        utils.rotate(m2, rot, units='degrees')
        assert_array_almost_equal(fda_node.means[1], m1, 2)
        assert_array_almost_equal(fda_node.means[2], m2, 2)
       
        # projected output must be white (zero mean, unit std, uncorrelated)
        y = flow.execute(x)
        assert_array_almost_equal(mean(y, axis=0), [0., 0.], self.decimal-2)
        assert_array_almost_equal(std(y, axis=0), [1., 1.], self.decimal-2)
        assert_almost_equal(mult(y[:,0], y[:,1].T), 0., self.decimal-2)

        # discriminative directions (up to scale): [1,-1] and [1,1]
        v1 = fda_node.v[:,0]/fda_node.v[0,0]
        assert_array_almost_equal(v1, [1., -1.], 2)
        v1 = fda_node.v[:,1]/fda_node.v[0,1]
        assert_array_almost_equal(v1, [1., 1.], 2)

    def testGaussianClassifier_train(self):
        """Estimated class means and inverse covariances must match the data."""
        nclasses = 10
        dim = 4
        npoints = 10000
        covs = []
        means = []

        node = mdp.nodes.GaussianClassifierNode()
        for i in range(nclasses):
            # sample one gaussian class with random covariance and mean
            cov = utils.symrand(uniform((dim,))*dim+1)
            mn = uniform((dim,))*10.
            x = normal(0., 1., size=(npoints, dim))
            x = mult(x, utils.sqrtm(cov)) + mn
            x = utils.refcast(x, 'd')
            cl = numx.ones((npoints,))*i
            
            # keep the empirical statistics for later comparison
            mn_estimate = mean(x, axis=0)
            means.append(mn_estimate)
            covs.append(numx.cov(x, rowvar=0))

            node.train(x, cl)
        # a label array of the wrong length must be rejected
        try:
            node.train(x, numx.ones((2,)))
            assert False, 'No exception despite wrong number of labels'
        except mdp.TrainingException:
            pass

        node.stop_training()

        for i in range(nclasses):
            lbl_idx = node.labels.index(i)
            assert_array_almost_equal_diff(means[i],
                                      node.means[lbl_idx],
                                      self.decimal-1)
            assert_array_almost_equal_diff(utils.inv(covs[i]),
                                      node.inv_covs[lbl_idx],
                                      self.decimal-2)

    def testGaussianClassifier_labellistbug(self):
        """Training with a plain list of identical labels must not fail.

        Regression test: this was failing as of MDP-2.5-309-gefa0f9d.
        """
        gc = mdp.nodes.GaussianClassifierNode()
        gc.train(mdp.numx_rand.random((50, 3)), [+1]*50)
        

    def testGaussianClassifier_classify(self):
        """Two well-separated rotated Gaussians must be classified perfectly."""
        mean1 = [0., 2.]
        mean2 = [0., -2.]
        std_ = numx.array([1., 0.2])
        npoints = 100
        rot = 45

        # two elongated gaussian clouds, both rotated by 45 degrees
        def distr(size):
            return normal(0, 1., size=(size))*std_
        x1 = distr((npoints, 2)) + mean1
        utils.rotate(x1, rot, units='degrees')
        x2 = distr((npoints, 2)) + mean2
        utils.rotate(x2, rot, units='degrees')
        x = numx.concatenate((x1, x2), axis=0)

        # integer labels: 1 for the first cloud, 2 for the second
        classes = numx.concatenate((numx.ones((x1.shape[0],), dtype='i'),
                                    2*numx.ones((x2.shape[0],), dtype='i')))

        # shuffle data and labels consistently
        perm_idx = numx_rand.permutation(classes.shape[0])
        x = numx.take(x, perm_idx, axis=0)
        classes = numx.take(classes, perm_idx, axis=0)

        node = mdp.nodes.GaussianClassifierNode()
        node.train(x, classes)
        assert_array_equal(classes, node.label(x))

    def testFANode(self):
        """Factor analysis recovers mu, sigma and the loading subspace.

        Also smoke-tests the different generate_input call signatures and
        the statistics of generated data.
        """
        d = 10
        N = 5000
        k = 4

        mu = uniform((1, d))*3.+2.
        sigma = uniform((d,))*0.01
        #A = utils.random_rot(d)[:k,:]
        A = numx_rand.normal(size=(k,d))

        # latent variables
        y = numx_rand.normal(0., 1., size=(N, k))
        # observations
        noise = numx_rand.normal(0., 1., size=(N, d)) * sigma
        
        x = mult(y, A) + mu + noise
        
        fa = mdp.nodes.FANode(output_dim=k, dtype='d')
        fa.train(x)
        fa.stop_training()

        # compare estimates to real parameters
        assert_array_almost_equal(fa.mu[0,:], mean(x, axis=0), 5)
        assert_array_almost_equal(fa.sigma, std(noise, axis=0)**2, 2)
        # FA finds A only up to a rotation. here we verify that the
        # A and its estimation span the same subspace
        AA = numx.concatenate((A,fa.A.T),axis=0)
        u,s,vh = utils.svd(AA)
        assert sum(s/max(s)>1e-2)==k, \
               'A and its estimation do not span the same subspace'

        # smoke-test the various generate_input call signatures
        y = fa.execute(x)
        fa.generate_input()
        fa.generate_input(10)
        fa.generate_input(y)
        fa.generate_input(y, noise=True)

        # test that noise has the right mean and variance
        est = fa.generate_input(numx.zeros((N, k)), noise=True)
        est -= fa.mu
        assert_array_almost_equal(numx.diag(numx.cov(est, rowvar=0)),
                                  fa.sigma, 3)
        assert_almost_equal(numx.amax(abs(numx.mean(est, axis=0)), axis=None), 0., 3)

        # generated data must reproduce the covariance fa.A * fa.A^T
        est = fa.generate_input(100000)
        assert_array_almost_equal_diff(numx.cov(est, rowvar=0),
                                       utils.mult(fa.A, fa.A.T), 1)

    def testISFANodeGivensRotations(self):
        """Contrasts from the internal Givens-angle routines must agree
        with contrasts computed via explicit matrix rotations."""
        ncovs = 5
        dim = 7
        ratio = uniform(2).tolist()
        covs = [uniform((dim,dim)) for j in range(ncovs)]
        covs= mdp.utils.MultipleCovarianceMatrices(covs)
        covs.symmetrize()
        i = mdp.nodes.ISFANode(range(1, ncovs+1),sfa_ica_coeff=ratio,
                               icaweights=uniform(ncovs),
                               sfaweights=uniform(ncovs),
                               output_dim = dim-1, dtype="d")
        i._adjust_ica_sfa_coeff()
        ratio = i._bica_bsfa
        # case 2: only one axis within output space
        # get contrast using internal function
        phi, cont1, min_, dummy =\
                   i._givens_angle_case2(dim-2,dim-1,covs,ratio,complete=1)
        # get contrast using explicit rotations
        cont2 = []
        for angle in phi:
            cp = covs.copy()
            cp.rotate(angle,[dim-2,dim-1])
            cont2.append(numx.sum(i._get_contrast(cp,ratio)))
        assert_array_almost_equal(cont1,cont2,self.decimal)
        # case 1: both axes within output space
        # get contrast using internal function
        phi,cont1, min_ , dummy =\
                   i._givens_angle_case1(0,1,covs,ratio,complete = 1)
        # get contrast using explicit rotations
        cont2 = []
        for angle in phi:
            cp = covs.copy()
            cp.rotate(angle,[0,1])
            cont2.append(numx.sum(i._get_contrast(cp,ratio)))
        assert abs(min_) < numx.pi/4, 'Estimated Minimum out of bounds'
        assert_array_almost_equal(cont1,cont2,self.decimal)

    def testISFANode_SFAPart(self):
        """With sfa_ica_coeff=[1., 0.] ISFA must reduce to plain SFA."""
        # create independent sources with different temporal structure
        mat = uniform((100000,3))*2-1
        fmat = numx_fft.rfft(mat,axis=0)
        # enforce different speeds
        for i in range(3):
            fmat[(i+1)*5000:,i] = 0.
        mat = numx_fft.irfft(fmat,axis=0)
        src = mdp.sfa(mat)
        # test with unmixed signals (i.e. the node should make nothing at all)
        out = mdp.nodes.ISFANode(lags=1,
                                 whitened=True,
                                 sfa_ica_coeff=[1.,0.])(src)
        max_cv = numx.diag(abs(_cov(out,src)))
        assert_array_almost_equal(max_cv, numx.ones((3,)),6)
        # mix linearly the signals: ISFA must now unmix them like SFA
        mix = mult(src,uniform((3,3))*2-1)
        out = mdp.nodes.ISFANode(lags=1,
                                 whitened=False,
                                 sfa_ica_coeff=[1.,0.])(mix)
        max_cv = numx.diag(abs(_cov(out,src)))
        assert_array_almost_equal(max_cv, numx.ones((3,)),5)

    def testISFANode_ICAPart(self):
        """With sfa_ica_coeff=[0., 1.] ISFA must reduce to plain ICA."""
        # create independent sources with different temporal structure
        src = uniform((100000,3))*2-1
        fsrc = numx_fft.rfft(src,axis=0)
        # enforce different speeds
        for i in range(3):
            fsrc[(i+1)*5000:,i] = 0.
        src = numx_fft.irfft(fsrc,axis=0)
        # enforce time-lag-1-independence
        src = mdp.nodes.ISFANode(lags=1, sfa_ica_coeff=[1.,0.])(src)
        out = mdp.nodes.ISFANode(lags=1,
                                 whitened=True,
                                 sfa_ica_coeff=[0.,1.])(src)
        max_cv = numx.diag(abs(_cov(out,src)))
        assert_array_almost_equal(max_cv, numx.ones((3,)),5)
        # mix linearly the signals: ISFA must now unmix them like ICA
        mix = mult(src,uniform((3,3))*2-1)
        out = mdp.nodes.ISFANode(lags=1,
                                 whitened=False,
                                 sfa_ica_coeff=[0.,1.])(mix)
        max_cv = numx.diag(abs(_cov(out,src)))
        assert_array_almost_equal(max_cv, numx.ones((3,)),5)

    def testISFANode_3Complete(self):
        """Transition from ICA to SFA behavior of ISFA on ad-hoc sources.

        Three deterministic pulse sources are built so that the sources
        picked by an ICA-dominated contrast differ from those picked by
        an SFA-dominated one.
        """
        # use ad hoc sources
        lag = 25
        src = numx.zeros((1001,3),"d")
        idx = [(2,4),(80,1),(2+lag,6)]
        for i in range(len(idx)):
            # source i: a positive pulse followed by a negative pulse,
            # then normalized to zero mean / unit std
            i0, il = idx[i]
            src[i0:i0+il,i] = 1.
            src[i0+il:i0+2*il,i] = -1.
            src[:,i] -= mean(src[:,i])
            src[:,i] /= std(src[:,i])
        # test extreme cases
        # case 1: ICA (sfa_ica_coeff almost fully on the ICA side)
        out = mdp.nodes.ISFANode(lags=[1,lag],
                                 icaweights=[1.,1.],
                                 sfaweights=[1.,0.],
                                 output_dim=2,
                                 whitened=True,
                                 sfa_ica_coeff=[1E-4,1.])(src)
        cv = abs(_cov(src,out))
        idx_cv = numx.argmax(cv,axis=0)
        assert_array_equal(idx_cv,[2,1])
        max_cv = numx.amax(cv,axis=0)
        assert_array_almost_equal(max_cv, numx.ones((2,)),5)
        # case 2: SFA (sfa_ica_coeff fully on the SFA side)
        out = mdp.nodes.ISFANode(lags=[1,lag],
                                 icaweights=[1.,1.],
                                 sfaweights=[1.,0.],
                                 output_dim=2,
                                 whitened=True,
                                 sfa_ica_coeff=[1.,0.])(src)
        cv = abs(_cov(src,out))
        idx_cv = numx.argmax(cv,axis=0)
        assert_array_equal(idx_cv,[2,0])
        max_cv = numx.amax(cv,axis=0)
        assert_array_almost_equal(max_cv, numx.ones((2,)),5)

    def _ISFA_analytical_solution(self, nsources, nmat, dim, ica_ambiguity):
        """Build a sequence of covariance matrices with analytically
        known SFA and ISFA solutions.

        Returns (matrices, sfa_solution, isfa_solution), where
        'matrices' is a mdp.utils.MultipleCovarianceMatrices instance
        and the two solutions are the expected leading diagonal
        elements for SFA and ISFA respectively.
        """
        # sequence of identity matrices; use a comprehension instead of
        # [numx.eye(...)]*nmat, which would alias ONE array nmat times
        # (every slot is rebound below, but independent arrays are safer)
        matrices = [numx.eye(dim, dtype='d') for _ in range(nmat)]
        # build first matrix:
        #   - create random diagonal with elements
        #     in [0, 1]
        diag = uniform(dim)
        #   - sort it in descending order (in absolute value)
        #     [large first]
        diag = numx.take(diag, numx.argsort(abs(diag)))[::-1]
        #   - save larger elements [sfa solution]
        sfa_solution = diag[:nsources].copy()
        #   - modify diagonal elements order to allow for a
        #     different solution for isfa:
        #     create index array
        idx = range(0,dim)
        #     take the second slowest element and put it at the end
        idx = [idx[0]]+idx[2:]+[idx[1]]
        diag = numx.take(diag, idx)
        #   - save isfa solution
        isfa_solution = diag[:nsources]
        #   - set the first matrix (eye*diag yields a diagonal matrix)
        matrices[0] = matrices[0]*diag
        # build other matrices: random symmetric, but exactly diagonal
        # (i.e. already "independent") within the leading subspace
        diag_dim = nsources+ica_ambiguity
        for i in range(1,nmat):
            # get a random symmetric matrix
            matrices[i] = mdp.utils.symrand(dim)
            # diagonalize the subspace diag_dim
            tmp_diag = (uniform(diag_dim)-0.5)*2
            matrices[i][:diag_dim,:diag_dim] = numx.diag(tmp_diag)
        # put everything in MultCovMat
        matrices = mdp.utils.MultipleCovarianceMatrices(matrices)
        return matrices, sfa_solution, isfa_solution

    def _ISFA_unmixing_error(self, nsources, goal, estimate):
        """Return the normalized unmixing error between the goal
        rotation and the estimated one (as in the ISFA paper): zero
        iff their product restricted to the sources is a (signed)
        permutation matrix."""
        abs_prod = abs(mult(goal[:nsources, :], estimate[:, :nsources]))
        # deviation of each row sum and each column sum from 1
        row_dev = abs(numx.sum(numx.sum(abs_prod, axis=1) - 1))
        col_dev = abs(numx.sum(numx.sum(abs_prod, axis=0) - 1))
        return (row_dev + col_dev) / (nsources * nsources)

    def testISFANode_AnalyticalSolution(self):
        """Run ISFA on covariance matrices with a known analytical
        solution and check that the found rotation matches it."""
        nsources = 2
        # number of time lags
        nmat = 20
        # degree of polynomial expansion
        deg = 3
        # sfa_ica coefficient
        sfa_ica_coeff = [1., 1.]
        # how many independent subspaces in addition to the sources
        ica_ambiguity = 2
        # dimensions of expanded space
        dim = mdp.nodes._expanded_dim(deg, nsources)
        assert (nsources+ica_ambiguity) < dim, 'Too much ica ambiguity.'
        # the optimization need not converge on every random draw,
        # so retry up to 'trials' times and succeed on the first hit
        trials = 20
        for trial in range(trials):
            # get analytical solution:
            # prepared matrices, solution for sfa, solution for isf
            covs,sfa_solution,isfa_solution=self._ISFA_analytical_solution(
                nsources,nmat,dim,ica_ambiguity)
            # get contrast of analytical solution
            # sfasrc, icasrc = _get_matrices_contrast(covs, nsources, dim,
            #                                         sfa_ica_coeff)
            # set rotation matrix
            R = mdp.utils.random_rot(dim)
            covs_rot = covs.copy()
            # rotate the analytical solution
            covs_rot.transform(R)
            # find the SFA solution to initialize ISFA
            eigval, SFARP = mdp.utils.symeig(covs_rot.covs[:,:,0])
            # order SFA solution by slowness
            SFARP = SFARP[:,-1::-1]
            # run ISFA
            isfa = mdp.nodes.ISFANode(lags = covs_rot.ncovs, whitened=True,
                                      sfa_ica_coeff = sfa_ica_coeff,
                                      eps_contrast = 1e-7,
                                      output_dim = nsources,
                                      max_iter = 500,
                                      verbose = False,
                                      RP = SFARP)
            # the training data is a dummy; stop_training below
            # supplies the prepared covariance matrices directly
            isfa.train(uniform((100,dim)))
            isfa.stop_training(covs = covs_rot.copy())
            # check that the rotation matrix found by ISFA is R
            # up to a permutation matrix.
            # Unmixing error as in Tobias paper
            error = self._ISFA_unmixing_error(nsources, R, isfa.RPC)
            if error < 1E-4:
                break
        assert error < 1E-4, 'Not one out of %d trials succeded.'%trials
            
    def testRBMSample_h(self):
        """RBMNode.sample_h: with saturating weights the hidden-unit
        probabilities must match the analytically expected values and
        the empirical sampling frequencies must approximate them."""
        # number of visible and hidden units
        I, J = 2, 4

        # create RBM node
        bm = mdp.nodes.RBMNode(J, I)
        # fake training to initialize internals
        bm.train(numx.zeros((1,I)))
        # init to deterministic model: huge weights saturate the
        # activation, so probabilities become exactly 0, 0.5 or 1
        bm.w[0,:] = [1,0,1,0]
        bm.w[1,:] = [0,1,0,1]
        bm.w *= 2e4
        bm.bv *= 0.
        bm.bh *= 0.

        # ### test 1
        # all four visible configurations
        v = numx.array([[0,0],[1,0],[0,1],[1,1.]])
        h = []
        for n in range(1000):
            prob, sample = bm.sample_h(v)
            h.append(sample)

        # check inferred probabilities
        expected_probs = numx.array([[0.5, 0.5, 0.5, 0.5],
                                      [1.0, 0.5, 1.0, 0.5],
                                      [0.5, 1.0, 0.5, 1.0],
                                      [1.0, 1.0, 1.0, 1.0]])
        assert_array_almost_equal(prob, expected_probs, 8)

        # check sampled units: empirical frequency of each hidden unit
        # approximates its probability (1 decimal: Monte Carlo estimate)
        h = numx.array(h)
        for n in range(4):
            distr = h[:,n,:].mean(axis=0)
            assert_array_almost_equal(distr, expected_probs[n,:], 1)

        # ### test 2, with bias
        # a strongly negative hidden bias drives the 0.5 entries to 0,
        # while the saturated 1.0 entries stay at 1
        bm.bh -= 1e2
        h = []
        for n in range(100):
            prob, sample = bm.sample_h(v)
            h.append(sample)

        # check inferred probabilities
        expected_probs = numx.array([[0., 0., 0., 0.],
                                      [1.0, 0., 1.0, 0.],
                                      [0., 1.0, 0., 1.0],
                                      [1.0, 1.0, 1.0, 1.0]])
        assert_array_almost_equal(prob, expected_probs, 8)

        # check sampled units
        h = numx.array(h)
        for n in range(4):
            distr = h[:,n,:].mean(axis=0)
            assert_array_almost_equal(distr, expected_probs[n,:], 1)

    def testRBMSample_v(self):
        """RBMNode.sample_v: mirror of testRBMSample_h with the roles
        of visible and hidden units exchanged."""
        # number of visible and hidden units
        I, J = 4, 2

        # create RBM node
        bm = mdp.nodes.RBMNode(J, I)
        # fake training to initialize internals
        bm.train(numx.zeros((1,I)))
        # init to deterministic model: huge weights saturate the
        # activation, so probabilities become exactly 0, 0.5 or 1
        bm.w[:,0] = [1,0,1,0]
        bm.w[:,1] = [0,1,0,1]
        bm.w *= 2e4
        bm.bv *= 0
        bm.bh *= 0

        # test 1
        # all four hidden configurations
        h = numx.array([[0,0],[1,0],[0,1],[1,1.]])
        v = []
        for n in range(1000):
            prob, sample = bm.sample_v(h)
            v.append(sample)

        # check inferred probabilities
        expected_probs = numx.array([[0.5, 0.5, 0.5, 0.5],
                                      [1.0, 0.5, 1.0, 0.5],
                                      [0.5, 1.0, 0.5, 1.0],
                                      [1.0, 1.0, 1.0, 1.0]])
        assert_array_almost_equal(prob, expected_probs, 8)

        # check sampled units: empirical frequencies approximate the
        # probabilities (1 decimal: Monte Carlo estimate)
        v = numx.array(v)
        for n in range(4):
            distr = v[:,n,:].mean(axis=0)
            assert_array_almost_equal(distr, expected_probs[n,:], 1)

        # test 2, with bias
        # strongly negative visible bias drives the 0.5 entries to 0
        bm.bv -= 1e2
        v = []
        for n in range(1000):
            prob, sample = bm.sample_v(h)
            v.append(sample)

        # check inferred probabilities
        expected_probs = numx.array([[0., 0., 0., 0.],
                                      [1.0, 0., 1.0, 0.],
                                      [0., 1.0, 0., 1.0],
                                      [1.0, 1.0, 1.0, 1.0]])
        assert_array_almost_equal(prob, expected_probs, 8)

        # check sampled units
        v = numx.array(v)
        for n in range(4):
            distr = v[:,n,:].mean(axis=0)
            assert_array_almost_equal(distr, expected_probs[n,:], 1)

    def testRBMStability(self):
        """Parameters of an RBM trained on samples drawn from its own
        equilibrium distribution should remain (approximately) stable."""
        # number of visible and hidden units
        I, J = 8, 2

        # create RBM node
        bm = mdp.nodes.RBMNode(J, I)
        bm._init_weights()
        # init to random model
        bm.w = mdp.utils.random_rot(max(I,J), dtype='d')[:I, :J]
        bm.bv = numx_rand.randn(I)
        bm.bh = numx_rand.randn(J)

        # save original weights
        real_w = bm.w.copy()
        real_bv = bm.bv.copy()
        real_bh = bm.bh.copy()

        # Gibbs sample to reach the equilibrium distribution
        # N must be an int: the float literal 1e4 is not a valid
        # array-shape entry for numx_rand.randint
        N = 10000
        v = numx_rand.randint(0,2,(N,I)).astype('d')
        for k in range(100):
            if k%5==0: spinner()
            p, h = bm._sample_h(v)
            p, v = bm._sample_v(h)

        # see that w remains stable after learning
        # (training error return value is not needed here)
        for k in range(100):
            if k%5==0: spinner()
            bm.train(v)
        bm.stop_training()

        # parameters should not have drifted (1 decimal tolerance)
        assert_array_almost_equal(real_w, bm.w, 1)
        assert_array_almost_equal(real_bv, bm.bv, 1)
        assert_array_almost_equal(real_bh, bm.bh, 1)

    def testRBMLearning(self):
        """An RBM should learn to reconstruct two disjoint input
        patterns with low training error."""
        # number of visible and hidden units
        I, J = 4, 2

        bm = mdp.nodes.RBMNode(J, I)
        bm.w = mdp.utils.random_rot(max(I,J), dtype='d')[:I, :J]

        # the observations consist of two disjunct patterns that
        # never appear together
        # N must be an int: the float literal 1e4 is not a valid
        # array-shape entry for numx.zeros
        N = 10000
        v = numx.zeros((N,I))
        for n in range(N):
            r = numx_rand.random()
            if r>0.666: v[n,:] = [0,1,0,1]
            elif r>0.333: v[n,:] = [1,0,1,0]

        for k in range(1500):
            if k%5==0: spinner()

            # increase the momentum after a short burn-in phase
            if k>5:
                mom = 0.9
            else:
                mom = 0.5
            bm.train(v, epsilon=0.3, momentum=mom)
            # stop early once the mean reconstruction error is small
            if bm._train_err/N<0.1: break

        assert bm._train_err/N<0.1

    def testRBMWithLabelsNode(self):
        """RBMWithLabelsNode should reconstruct the visible units and
        assign 50/50 label probabilities to all-zero observations."""
        I, J, L = 4, 4, 2
        bm = mdp.nodes.RBMWithLabelsNode(J,L,I)
        assert bm.input_dim == I+L

        # generate input data: two disjoint patterns with distinct
        # labels; with probability 0.1 a row is left all-zero
        N = 2500
        v = numx.zeros((2*N,I))
        l = numx.zeros((2*N,L))
        for n in range(N):
            r = numx_rand.random()
            if r>0.1:
                v[n,:] = [1,0,1,0]
                l[n,:] = [1,0]
        # fill the SECOND half with the second pattern and label
        # [0,1]; the original loop iterated range(N) again, which
        # overwrote the first half (leaving rows N..2N all-zero) and
        # reused label [1,0], so the labels could not discriminate
        # the two patterns
        for n in range(N, 2*N):
            r = numx_rand.random()
            if r>0.1:
                v[n,:] = [0,1,0,1]
                l[n,:] = [0,1]

        x = numx.concatenate((v, l), axis=1)
        for k in range(2500):
            if k%5==0: spinner()

            # stronger learning parameters after a burn-in phase
            if k>200:
                mom = 0.9
                eps = 0.7
            else:
                mom = 0.5
                eps = 0.2
            bm.train(v, l, epsilon=eps, momentum=mom)

            # one reconstruction pass to measure the visible error
            ph, sh = bm._sample_h(x)
            pv, pl, sv, sl = bm._sample_v(sh, concatenate=False)

            v_train_err = float(((v-sv)**2.).sum())
            if v_train_err/(2*N)<0.1: break

        # visible units are reconstructed
        assert v_train_err/(2*N)<0.1
        # units with 0 input have 50/50 labels
        idxzeros = v.sum(axis=1)==0
        nzeros = idxzeros.sum()
        point5 = numx.zeros((nzeros, L)) + 0.5
        assert_array_almost_equal(pl[idxzeros], point5, 2)

    def testLinearRegressionNode(self):
        """LinearRegressionNode: exact fit without noise (with and
        without bias), approximate fit with noise, and the expected
        exceptions for degenerate input."""

        def train_LRNode(inp, out, with_bias):
            # train a node on matching lists of input/output chunks
            lrnode = mdp.nodes.LinearRegressionNode(with_bias)
            for i in range(len(inp)):
                lrnode.train(inp[i], out[i])
            lrnode.stop_training()
            return lrnode

        indim, outdim, tlen = 5, 3, 10000

        # 1. first, without noise
        # 1a   without bias term
        # regression coefficients
        beta = numx_rand.uniform(-10., 10., size=(indim, outdim))
        # input data
        x = numx_rand.uniform(-20., 20., size=(tlen, indim))
        # output of the linear model
        y = mult(x, beta)
        # train
        lrnode = train_LRNode([x], [y], False)
        # test results
        assert_array_almost_equal(lrnode.beta, beta, self.decimal)
        res = lrnode(x)
        assert_array_almost_equal(res, y, self.decimal)

        # 1b with bias
        # beta[0,:] is the bias, beta[1:,:] the linear coefficients
        beta = numx_rand.uniform(-10., 10., size=(indim+1, outdim))
        x = numx_rand.uniform(-20., 20., size=(tlen, indim))
        y = mult(x, beta[1:,:]) + beta[0,:]
        lrnode = train_LRNode([x], [y], True)
        assert_array_almost_equal(lrnode.beta, beta, self.decimal)
        res = lrnode(x)
        assert_array_almost_equal(res, y, self.decimal)

        # 2. with noise, multiple sets of input
        beta = numx_rand.uniform(-10., 10., size=(indim+1, outdim))
        inp = [numx_rand.uniform(-20., 20., size=(tlen, indim))
               for i in range(5)]
        # note: y.shape is reused from part 1b; it is (tlen, outdim),
        # which matches each chunk's output shape
        out = [mult(x, beta[1:,:]) + beta[0,:] +
               numx_rand.normal(size=y.shape)*0.1 for x in inp]
        lrnode = train_LRNode(inp, out, True)
        # only 2 decimals: the estimate is perturbed by the noise
        assert_array_almost_equal(lrnode.beta, beta, 2)
        res = lrnode(inp[0])
        assert_array_almost_equal_diff(res, out[0], 2)

        # 3. test error for linearly dependent input
        beta = numx_rand.uniform(-10., 10., size=(indim, outdim))
        x = numx_rand.uniform(-20., 20., size=(tlen, indim))
        # make the last column a multiple of the first
        x[:,-1] = 2.*x[:,0]
        y = mult(x, beta)
        try:
            lrnode = train_LRNode([x], [y], False)
            # not caught by the except below -> fails the test
            raise Exception("LinearRegressionNode didn't raise "
                            "error for linearly dependent input")
        except mdp.NodeException:
            pass

        # 4. test wrong output size
        beta = numx_rand.uniform(-10., 10., size=(indim, outdim))
        x = numx_rand.uniform(-20., 20., size=(tlen, indim))
        x[:,-1] = 2.*x[:,0]
        y = mult(x, beta)
        # truncate the output so its length no longer matches the input
        y = y[:10,:]
        try:
            lrnode = train_LRNode([x], [y], False)
            raise Exception, ("LinearRegressionNode didn't raise "+
                              "error for output of wrong size")
        except mdp.TrainingException:
            pass
        
    def testCutoffNode(self):
        """CutoffNode must clip every value to the range [-1.5, 1.2]."""
        node = mdp.nodes.CutoffNode(-1.5, 1.2)
        data = numx.array([[0.1, 0, -2, 3, 1.2, -1.5, -3.33]])
        expected = numx.array([[0.1, 0, -1.5, 1.2, 1.2, -1.5, -1.5]])
        clipped = node.execute(data)
        assert numx.all(clipped == expected)
        
    def testHistogramNode_nofraction(self):
        """Test HistogramNode with fraction set to 1.0 (the default):
        all training data must end up in the histogram."""
        chunk_a = numx.array([[0.1, 0.2], [0.3, 0.5]])
        chunk_b = numx.array([[0.3, 0.6], [0.2, 0.1]])
        node = mdp.nodes.HistogramNode()
        node.train(chunk_a)
        node.train(chunk_b)
        expected = numx.concatenate([chunk_a, chunk_b])
        assert numx.all(expected == node.data_hist)
        
    def testHistogramNode_fraction(self):
        """Test HistogramNode with fraction set to 0.5: only part of
        the 1500 training rows should be kept."""
        node = mdp.nodes.HistogramNode(hist_fraction=0.5)
        for n_rows in (1000, 500):
            node.train(numx_rand.random((n_rows, 3)))
        assert len(node.data_hist) < 1000
        
    def testAdaptiveCutoffNode_smalldata(self):
        """Test AdaptiveCutoffNode thoroughly on a small data set."""
        # two chunks; columns cover 0.1..0.6 and 0.2..0.7
        chunk_a = numx.array([[0.1, 0.3], [0.3, 0.5], [0.5, 0.7]])
        chunk_b = numx.array([[0.4, 0.6], [0.2, 0.4], [0.6, 0.2]])
        full_data = numx.concatenate([chunk_a, chunk_b])
        # clip the lowest value (20%) and the two highest (40%) per column
        node = mdp.nodes.AdaptiveCutoffNode(lower_cutoff_fraction=0.2,
                                            upper_cutoff_fraction=0.4)
        node.train(chunk_a)
        node.train(chunk_b)
        node.stop_training()
        assert numx.all(full_data == node.data_hist)
        # bounds derived from the stored histogram
        assert numx.all(node.lower_bounds == numx.array([0.2, 0.3]))
        assert numx.all(node.upper_bounds == numx.array([0.4, 0.5]))
        # execution clips to those bounds
        probe = numx.array([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]])
        expected = numx.array([[0.2, 0.3], [0.3, 0.4], [0.4, 0.5]])
        assert numx.all(node.execute(probe) == expected)
        
    def testAdaptiveCutoffNode_randomdata(self):
        """Smoke test: AdaptiveCutoffNode on larger random data."""
        node = mdp.nodes.AdaptiveCutoffNode(lower_cutoff_fraction=0.2,
                                            upper_cutoff_fraction=0.4,
                                            hist_fraction=0.5)
        chunks = [numx_rand.random((1000, 3)), numx_rand.random((500, 3))]
        for chunk in chunks:
            node.train(chunk)
        node.stop_training()
        # just check that execution runs through
        node.execute(numx.concatenate(chunks))

    def testRBFExpansionNode(self):
        """RBFExpansionNode: each output channel, normalized over a
        dense grid, must have mean equal to its center and covariance
        matching its size parameter, for all four 'sizes' formats."""
        rrep = mdp.utils.rrep
        dim, n = 2, 10
        centers = numx_rand.random((n, dim))
        # grid of points to numerically compute the integral
        grid = numx.meshgrid(numx.linspace(-3., 4., 100),
                             numx.linspace(-3., 4., 100))
        grid = numx.array([grid[0].flatten(), grid[1].flatten()]).T
        # compute covariance for each point of the grid
        grid_cov = numx.zeros((grid.shape[0], dim, dim))
        for i in range(dim):
            for j in range(dim):
                grid_cov[:,i,j] = grid[:,i]*grid[:,j]

        def check_mn_cov(rbf, real_covs):
            # numerically verify mean and covariance of every channel
            y = rbf(grid)
            # verify means, sizes
            for i in range(n):
                # normalize the channel to a probability distribution
                p = y[:,i]/y[:,i].sum()
                # check mean
                mn = (rrep(p,dim)*grid).sum(0)
                assert_array_almost_equal(mn, centers[i,:], 2)
                # check variance
                vr = ((rrep(rrep(p,2),2)*grid_cov).sum(0)
                      - numx.outer(mn, mn))
                assert_array_almost_equal(vr, real_covs[i], 2)

        def scalar_to_covs(x, n):
            # expand a scalar (or sequence of scalars) into a list of
            # isotropic 2x2 covariance matrices
            if numx.isscalar(x):
                x = [x]*n
            return [numx.array([[x[i],0],[0,x[i]]]) for i in range(n)]

        # 1: sizes is a scalar
        sizes = 0.32
        rbf = mdp.nodes.RBFExpansionNode(centers, sizes)
        check_mn_cov(rbf, scalar_to_covs(sizes, n))

        # 2: sizes is many scalars
        sizes = 0.3 + numx_rand.random(n)*0.2
        rbf = mdp.nodes.RBFExpansionNode(centers, sizes)
        check_mn_cov(rbf, scalar_to_covs(sizes, n))

        # 3: sizes is one covariance
        sizes = mdp.utils.symrand(numx.array([0.2, 0.4]))
        rbf = mdp.nodes.RBFExpansionNode(centers, sizes)
        check_mn_cov(rbf, [sizes]*n)

        # 4: sizes is many covariances
        sizes = [mdp.utils.symrand(numx.array([0.2, 0.4]))
                 for i in range(n)]
        rbf = mdp.nodes.RBFExpansionNode(centers, sizes)
        check_mn_cov(rbf, sizes)

def get_suite(testname=None):
    """Return the nodes test suite, optionally restricted to a single
    test name."""
    suite = NodesTestSuite(testname=testname)
    return suite

if __name__ == '__main__':
    # fixed seed so test runs are reproducible
    numx_rand.seed(1268049219)
    unittest.TextTestRunner(verbosity=2).run(get_suite())

# www.java2java.com | Contact Us
# Copyright 2009 - 12 Demo Source and Support. All rights reserved.
# All other trademarks are property of their respective owners.