import mdp
from mdp import numx, numx_linalg, utils, NodeException
from mdp.utils import mult, CovarianceMatrix
import warnings
sqrt, inv, det = numx.sqrt, utils.inv, numx_linalg.det
normal = mdp.numx_rand.normal
# decreasing likelihood message
_LHOOD_WARNING = ('Likelihood decreased in FANode. This is probably due '
'to some numerical errors.')
warnings.filterwarnings('always', _LHOOD_WARNING, mdp.MDPWarning)


class FANode(mdp.Node):
"""Perform Factor Analysis.
    The current implementation should be most efficient for long
    data sets: the sufficient statistics are collected during the
    training phase, and all EM cycles are performed at the end of
    training.
The 'execute' function returns the Maximum A Posteriori estimate
of the latent variables. The 'generate_input' function generates
observations from the prior distribution.
tol -- tolerance (minimum change in log-likelihood before exiting
the EM algorithm)
max_cycles -- maximum number of EM cycles
verbose -- if True, print log-likelihood during the EM-cycles
Internal variables of interest:
self.mu -- Mean of the input data (available after training)
self.A -- Generating weights (available after training)
self.E_y_mtx -- Weights for Maximum A Posteriori inference
self.sigma -- Vector of estimated variance of the noise
for all input components
More information about Factor Analysis can be found in
Max Welling's classnotes:
http://www.ics.uci.edu/~welling/classnotes/classnotes.html ,
in the chapter 'Linear Models'.
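
    A minimal usage sketch (it follows the standard train/stop_training/
    execute protocol of mdp nodes; the array 'x' and the chosen
    'output_dim' are illustrative assumptions):

      fa = FANode(output_dim=5)
      fa.train(x)                      # x: observations, one per row
      fa.stop_training()               # run the EM cycles
      y = fa.execute(x)                # MAP estimates of the latent variables
      x_gen = fa.generate_input(100)   # sample 100 observations from the model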
"""

    def __init__(self, tol=1e-4, max_cycles=100, verbose=False,
input_dim=None, output_dim=None, dtype=None):
# Notation as in Max Welling's notes
super(FANode, self).__init__(input_dim, output_dim, dtype)
self.tol = tol
self.max_cycles = max_cycles
self.verbose = verbose
self._cov_mtx = CovarianceMatrix(dtype, bias=True)

    def _get_supported_dtypes(self):
"""Return the list of dtypes supported by this node."""
return ['float32', 'float64']

    def _train(self, x):
# update the covariance matrix
self._cov_mtx.update(x)

    def _stop_training(self):
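        # The factor analysis model (notation as in Welling's notes):
        #   x_n = A y_n + mu + eta_n,  y_n ~ N(0, I_k),  eta_n ~ N(0, diag(sigma))
        # so that cov(x) = A A^T + diag(sigma). The EM cycles below fit A and
        # sigma to the sample covariance matrix collected during training.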
#### some definitions
verbose = self.verbose
typ = self.dtype
tol = self.tol
d = self.input_dim
# if the number of latent variables is not specified,
# set it equal to the number of input components
if not self.output_dim:
self.output_dim = d
k = self.output_dim
        # flat (raveled) indices of the diagonal elements of a dxd or kxk
        # matrix; used with put/take to update diagonals in place without
        # building a full diag(...) matrix
idx_diag_d = [i*(d+1) for i in range(d)]
idx_diag_k = [i*(k+1) for i in range(k)]
# constant term in front of the log-likelihood
const = -d/2. * numx.log(2.*numx.pi)
##### request the covariance matrix and clean up
cov_mtx, mu, tlen = self._cov_mtx.fix()
del self._cov_mtx
cov_diag = cov_mtx.diagonal()
##### initialize the parameters
# noise variances
sigma = cov_diag
# loading factors
        # Zoubin's implementation uses det(cov_mtx)**(1./d) as the scale,
        # but computing the determinant is too slow for large matrices.
        # Is the product of the diagonal a good approximation?
#scale = det(cov_mtx)**(1./d)
scale = numx.product(sigma)**(1./d)
A = normal(0., sqrt(scale/k), size=(d, k)).astype(typ)
##### EM-cycle
lhood_curve = []
base_lhood = None
old_lhood = -numx.inf
for t in xrange(self.max_cycles):
## compute B = (A A^T + Sigma)^-1
B = mult(A, A.T)
# B += diag(sigma), avoid computing diag(sigma) which is dxd
B.ravel().put(idx_diag_d, B.ravel().take(idx_diag_d)+sigma)
            # this quantity is used later for the log-likelihood;
            # abs guards against a determinant that comes out slightly
            # negative because of rounding errors
log_det_B = numx.log(abs(det(B)))
            # invert: from here on B holds (A A^T + Sigma)^-1
            B = inv(B)
## other useful quantities
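            # trA_B = A^T (A A^T + Sigma)^-1 maps centered observations to the
            # posterior mean of the latent variables, E(y_n|x_n);
            # trA_B_cov_mtx = A^T (A A^T + Sigma)^-1 S, where S = cov_mtx is
            # the data covariance collected during training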
trA_B = mult(A.T, B)
trA_B_cov_mtx = mult(trA_B, cov_mtx)
##### E-step
## E_yyT = E(y_n y_n^T | x_n)
E_yyT = - mult(trA_B, A) + mult(trA_B_cov_mtx, trA_B.T)
# E_yyT += numx.eye(k)
E_yyT.ravel().put(idx_diag_k, E_yyT.ravel().take(idx_diag_k)+1.)
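            # E_yyT now holds the posterior second moment of the latents,
            # averaged over the data:  I - A^T B A + A^T B S B A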
##### M-step
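            # re-estimate the loadings from the expected sufficient statistics;
            # the noise variances are then updated using the new A together
            # with the old A^T B S (trA_B_cov_mtx)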
A = mult(trA_B_cov_mtx.T, inv(E_yyT))
sigma = cov_diag - (mult(A, trA_B_cov_mtx)).diagonal()
##### log-likelihood
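            # per-sample log-likelihood of the centered data under a Gaussian
            # with covariance C = A A^T + Sigma, evaluated via the sample
            # covariance S:  -d/2 log(2 pi) - 1/2 log|C| - 1/2 trace(C^-1 S)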
trace_B_cov = (B*cov_mtx.T).sum()
            # this is actually the log-likelihood divided by tlen
lhood = const - 0.5*log_det_B - 0.5*trace_B_cov
if verbose:
print 'cycle', t, 'log-lhood:', lhood
##### convergence criterion
if base_lhood is None:
base_lhood = lhood
            else:
                # stop as soon as the improvement in the current cycle falls
                # below a fraction tol of the total improvement accumulated
                # since the first cycle
                if (lhood-base_lhood) < (1.+tol)*(old_lhood-base_lhood):
                    break
if lhood < old_lhood:
                    # the log-likelihood should never decrease, but numerical
                    # rounding can make it do so (e.g. when the noise
                    # variances are extremely low)
warnings.warn(_LHOOD_WARNING, mdp.MDPWarning)
old_lhood = lhood
lhood_curve.append(lhood)
self.tlen = tlen
self.A = A
self.mu = mu.reshape(1, d)
self.sigma = sigma
## MAP matrix
# compute B = (A A^T + Sigma)^-1
B = mult(A, A.T).copy()
B.ravel().put(idx_diag_d, B.ravel().take(idx_diag_d)+sigma)
B = inv(B)
self.E_y_mtx = mult(B.T, A)
self.lhood = lhood_curve

    def _execute(self, x):
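        # MAP / posterior-mean estimate of the latent variables:
        # E(y_n|x_n) = A^T (A A^T + Sigma)^-1 (x_n - mu), computed here as
        # the row-vector product (x_n - mu) E_y_mtx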
return mult(x-self.mu, self.E_y_mtx)

    def is_invertible(self):
return False

    def generate_input(self, len_or_y=1, noise=False):
"""
Generate data from the prior distribution.
        If the training phase has not been completed yet, stop_training
        is called first.
        Input arguments:
        len_or_y -- If an integer, it specifies the number of observations
                    to generate. If an array, it is used as a set of samples
                    of the latent variables.
        noise -- If True, generation includes the estimated noise.
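
        A small sketch (the node 'fa' and the latent array 'y' are
        illustrative):

          x_prior = fa.generate_input(50)             # latents drawn from N(0, I)
          x_noisy = fa.generate_input(y, noise=True)  # y: latent samples, one per row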
"""
self._if_training_stop_training()
# set the output dimension if necessary
if self.output_dim is None:
# if the input_dim is not defined, raise an exception
if self.input_dim is None:
                errstr = ("Number of input dimensions undefined. "
                          "Generation not possible.")
                raise NodeException(errstr)
self.output_dim = self.input_dim
if isinstance(len_or_y, int):
size = (len_or_y, self.output_dim)
y = self._refcast(mdp.numx_rand.normal(size=size))
else:
y = self._refcast(len_or_y)
self._check_output(y)
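        # generate observations from the fitted model:
        # x = y A^T + mu (plus noise if requested)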
res = mult(y, self.A.T)+self.mu
if noise:
ns = mdp.numx_rand.normal(size=(y.shape[0], self.input_dim))
ns *= numx.sqrt(self.sigma)
res += self._refcast(ns)
return res