import mdp
import warnings
# import numeric module (scipy, Numeric or numarray)
numx = mdp.numx
def _check_roundoff(t, dtype):
"""Check if t is so large that t+1 == t up to 2 precision digits"""
# limit precision
limit = 10.**(numx.finfo(dtype).precision-2)
if int(t) >= limit:
wr = ('You have summed %e entries in the covariance matrix.'
'\nAs you are using dtype \'%s\', you are '
'probably getting severe round off'
'\nerrors. See CovarianceMatrix docstring for more'
' information.' % (t, dtype.name))
warnings.warn(wr, mdp.MDPWarning)
class CovarianceMatrix(object):
"""This class stores an empirical covariance matrix that can be updated
incrementally. A call to the 'fix' method returns the current state of
the covariance matrix, the average and the number of observations, and
resets the internal data.
Note that the internal sum is a standard __add__ operation. We are not
using any of the fancy sum algorithms to avoid round off errors when
adding many numbers. If you want to contribute a CovarianceMatrix class
that uses such algorithms we would be happy to include it in MDP.
For a start see the Python recipe by Raymond Hettinger at
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/393090
For a review about floating point arithmetic and its pitfalls see
http://docs.sun.com/source/806-3568/ncg_goldberg.html
"""
def __init__(self, dtype=None, bias=False):
"""If dtype is not defined, it will be inherited from the first
data bunch received by 'update'.
All the matrices in this class are set up with the given dtype and
no upcast is possible.
If bias is True, the covariance matrix is normalized by dividing
by T instead of the usual T-1.
"""
if dtype is None:
self._dtype = None
else:
self._dtype = numx.dtype(dtype)
self._input_dim = None # will be set in _init_internals
# covariance matrix, updated during the training phase
self._cov_mtx = None
# average, updated during the training phase
self._avg = None
# number of observation so far during the training phase
self._tlen = 0
self.bias = bias
def _init_internals(self, x):
"""Init the internal structures.
The reason this is not done in the constructor is that we want to be
able to derive the input dimension and the dtype directly from the
data this class receives.
"""
# init dtype
if self._dtype is None:
self._dtype = x.dtype
dim = x.shape[1]
self._input_dim = dim
type_ = self._dtype
# init covariance matrix
self._cov_mtx = numx.zeros((dim, dim), type_)
# init average
self._avg = numx.zeros(dim, type_)
def update(self, x):
"""Update internal structures.
Note that no consistency checks are performed on the data (this is
typically done in the enclosing node).
"""
if self._cov_mtx is None:
self._init_internals(x)
# cast input
x = mdp.utils.refcast(x, self._dtype)
# update the covariance matrix, the average and the number of
# observations (try to do everything inplace)
self._cov_mtx += mdp.utils.mult(x.T, x)
self._avg += x.sum(axis=0)
self._tlen += x.shape[0]
def fix(self):
"""Returns a triple containing the covariance matrix, the average and
the number of observations. The covariance matrix is then reset to
a zero-state."""
# local variables
type_ = self._dtype
tlen = self._tlen
_check_roundoff(tlen, type_)
avg = self._avg
cov_mtx = self._cov_mtx
##### fix the training variables
# fix the covariance matrix (try to do everything inplace)
avg_mtx = numx.outer(avg, avg)
if self.bias:
avg_mtx /= tlen*(tlen)
cov_mtx /= tlen
else:
avg_mtx /= tlen*(tlen - 1)
cov_mtx /= tlen - 1
cov_mtx -= avg_mtx
# fix the average
avg /= tlen
##### clean up
# covariance matrix, updated during the training phase
self._cov_mtx = None
# average, updated during the training phase
self._avg = None
# number of observation so far during the training phase
self._tlen = 0
return cov_mtx, avg, tlen
class DelayCovarianceMatrix(object):
"""This class stores an empirical covariance matrix between the signal and
time delayed signal that can be updated incrementally.
Note that the internal sum is a standard __add__ operation. We are not
using any of the fancy sum algorithms to avoid round off errors when
adding many numbers. If you want to contribute a CovarianceMatrix class
that uses such algorithms we would be happy to include it in MDP.
For a start see the Python recipe by Raymond Hettinger at
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/393090
For a review about floating point arithmetic and its pitfalls see
http://docs.sun.com/source/806-3568/ncg_goldberg.html
"""
def __init__(self, dt, dtype=None, bias=False):
"""dt is the time delay. If dt==0, DelayCovarianceMatrix equals
CovarianceMatrix. If dtype is not defined, it will be inherited from
the first data bunch received by 'update'.
All the matrices in this class are set up with the given dtype and
no upcast is possible.
If bias is True, the covariance matrix is normalized by dividing
by T instead of the usual T-1.
"""
# time delay
self._dt = int(dt)
if dtype is None:
self._dtype = None
else:
self._dtype = numx.dtype(dtype)
# clean up variables to spare on space
self._cov_mtx = None
self._avg = None
self._avg_dt = None
self._tlen = 0
self.bias = bias
def _init_internals(self, x):
"""Inits some internals structures. The reason this is not done in
the constructor is that we want to be able to derive the input
dimension and the dtype directly from the data this class receives.
"""
# init dtype
if self._dtype is None:
self._dtype = x.dtype
dim = x.shape[1]
self._input_dim = dim
# init covariance matrix
self._cov_mtx = numx.zeros((dim, dim), self._dtype)
# init averages
self._avg = numx.zeros(dim, self._dtype)
self._avg_dt = numx.zeros(dim, self._dtype)
def update(self, x):
"""Update internal structures."""
if self._cov_mtx is None:
self._init_internals(x)
# cast input
x = mdp.utils.refcast(x, self._dtype)
dt = self._dt
# the number of data points in each block should be at least dt+1
tlen = x.shape[0]
if tlen < (dt+1):
err = 'Block length is %d, should be at least %d.' % (tlen, dt+1)
raise mdp.MDPException(err)
# update the covariance matrix, the average and the number of
# observations (try to do everything inplace)
self._cov_mtx += mdp.utils.mult(x[:tlen-dt, :].T, x[dt:tlen, :])
totalsum = x.sum(axis=0)
self._avg += totalsum - x[tlen-dt:, :].sum(axis=0)
self._avg_dt += totalsum - x[:dt, :].sum(axis=0)
self._tlen += tlen-dt
def fix(self, A=None):
"""The collected data is adjusted to compute the covariance matrix of
the signal x(1)...x(N-dt) and the delayed signal x(dt)...x(N),
which is defined as <(x(t)-<x(t)>)*(x(t+dt)-<x(t+dt)>)> .
The function returns a tuple containing the covariance matrix,
the average <x(t)> over the first N-dt points, the average of the
delayed signal <x(t+dt)> and the number of observations. The internal
data is then reset to a zero-state.
If A is defined, the covariance matrix is transformed by the linear
transformation Ax . E.g. to whiten the data, A is the whitening matrix.
"""
# local variables
type_ = self._dtype
tlen = self._tlen
_check_roundoff(tlen, type_)
avg = self._avg
avg_dt = self._avg_dt
cov_mtx = self._cov_mtx
##### fix the training variables
# fix the covariance matrix (try to do everything inplace)
avg_mtx = numx.outer(avg, avg_dt)
avg_mtx /= tlen
cov_mtx -= avg_mtx
if self.bias:
cov_mtx /= tlen
else:
cov_mtx /= tlen - 1
if A is not None:
cov_mtx = mdp.utils.mult(A, mdp.utils.mult(cov_mtx, A.T))
# fix the average
avg /= tlen
avg_dt /= tlen
##### clean up variables to spare on space
self._cov_mtx = None
self._avg = None
self._avg_dt = None
self._tlen = 0
return cov_mtx, avg, avg_dt, tlen
class MultipleCovarianceMatrices(object):
"""Container class for multiple covariance matrices to easily
execute operations on all matrices at the same time.
Note: all operations are done in place where possible."""
def __init__(self, covs):
"""Insantiate with a sequence of covariance matrices."""
# swap axes to get the different covmat on to the 3rd axis
self.dtype = covs[0].dtype
self.covs = (numx.array(covs, dtype=self.dtype)).transpose([1, 2, 0])
self.ncovs = len(covs)
def __getitem__(self, item):
return self.covs[:, :, item]
def symmetrize(self):
"""Symmetrize matrices: C -> (C+C^T)/2 ."""
# symmetrize cov matrices
covs = self.covs
covs = 0.5*(covs+covs.transpose([1, 0, 2]))
self.covs = covs
def weight(self, weights):
"""Apply a weighting factor to matrices.
Argument can be a sequence or a single value. In the latter case
the same weight is applied to all matrices."""
# apply a weighting vector to cov matrices
err = ("len(weights)=%d does not match number "
"of matrices (%d)" % (len(weights), self.ncovs))
assert len(weights) == self.ncovs, err
self.covs *= mdp.utils.refcast(weights, self.dtype)
def rotate(self, angle, indices):
"""Rotate matrices by angle in the plane defined by indices [i,j]."""
covs = self.covs
[i, j] = indices
cos_ = numx.cos(angle)
sin_ = numx.sin(angle)
# rotate columns
# you need to copy the first column that is modified
covs_i = covs[:, i, :] + 0
covs_j = covs[:, j, :]
covs[:, i, :] = cos_*covs_i - sin_*covs_j
covs[:, j, :] = sin_*covs_i + cos_*covs_j
# rotate rows
# you need to copy the first row that is modified
covs_i = covs[i, :, :] + 0
covs_j = covs[j, :, :]
covs[i, :, :] = cos_*covs_i - sin_*covs_j
covs[j, :, :] = sin_*covs_i + cos_*covs_j
self.covs = covs
def permute(self, indices):
"""Swap two columns and two rows of all matrices, whose indices are
specified as [i,j]."""
covs = self.covs
[i, j] = indices
covs[i, :, :], covs[j, :, :] = covs[j, :, :], covs[i, :, :] + 0
covs[:, i, :], covs[:, j, :] = covs[:, j, :], covs[:, i, :] + 0
self.covs = covs
def transform(self, trans_matrix):
"""Apply a linear transformation to all matrices, defined by the
transformation matrix."""
trans_matrix = mdp.utils.refcast(trans_matrix, self.dtype)
for cov in range(self.ncovs):
self.covs[:, :, cov] = mdp.utils.mult(
mdp.utils.mult(trans_matrix.T, self.covs[:, :, cov]),
trans_matrix)
def copy(self):
"""Return a deep copy of the instance."""
return MultipleCovarianceMatrices(self.covs.transpose([2, 0, 1]))
class CrossCovarianceMatrix(CovarianceMatrix):
def _init_internals(self, x, y):
if self._dtype is None:
self._dtype = x.dtype
if y.dtype != x.dtype:
err = 'dtype mismatch: x (%s) != y (%s)'%(x.dtype,
y.dtype)
raise mdp.MDPException(err)
dim_x = x.shape[1]
dim_y = y.shape[1]
type_ = self._dtype
self._cov_mtx = numx.zeros((dim_x, dim_y), type_)
self._avgx = numx.zeros(dim_x, type_)
self._avgy = numx.zeros(dim_y, type_)
def update(self, x, y):
# check internal dimensions consistency
if x.shape[0] != y.shape[0]:
err = '# samples mismatch: x (%d) != y (%d)'%(x.shape[0],
y.shape[0])
raise mdp.MDPException(err)
if self._cov_mtx is None:
self._init_internals(x, y)
# cast input
x = mdp.utils.refcast(x, self._dtype)
y = mdp.utils.refcast(y, self._dtype)
self._cov_mtx += mdp.utils.mult(x.T, y)
self._avgx += x.sum(axis=0)
self._avgy += y.sum(axis=0)
self._tlen += x.shape[0]
def fix(self):
type_ = self._dtype
tlen = self._tlen
_check_roundoff(tlen, type_)
avgx = self._avgx
avgy = self._avgy
cov_mtx = self._cov_mtx
##### fix the training variables
# fix the covariance matrix (try to do everything inplace)
avg_mtx = numx.outer(avgx, avgy)
if self.bias:
avg_mtx /= tlen*(tlen)
cov_mtx /= tlen
else:
avg_mtx /= tlen*(tlen - 1)
cov_mtx /= tlen - 1
cov_mtx -= avg_mtx
# fix the average
avgx /= tlen
avgy /= tlen
##### clean up
# covariance matrix, updated during the training phase
self._cov_mtx = None
# average, updated during the training phase
self._avgx = None
self._avgy = None
# number of observation so far during the training phase
self._tlen = 0
return cov_mtx, avgx, avgy, tlen
|