#!/usr/bin/env python
# (C) 2000 Huaiyu Zhu <hzhu@users.sourceforge.net>. Licence: GPL
# $Id: kalman.py,v 1.3 2000/10/04 22:27:36 hzhu Exp $
"""
DynSys.kalman : Defines LinearSystem with kalman filtering
System description:
state: x+ = A*x + u + w
measurement: z = H*x + v
Variables:
state: x
input: u
measurement: z
process noise: w = Q2*randn ~ N(0, Q)
measurement noise: v = R2*randn ~ N(0, R)
precision: S = <x, x>^-1
variance: V = <x, x>
dim of system: n
dim of measurement: m
Kalman filtering:
Random walk increases prior variance
S_ = (A * V * A.T + Q).I
x_ = A * x + u
Measurement increases posterior precision
V = (S_ + H.T * R.I * H).I
x = x_ + V * H.T * R.I * (z - H * x_)
Note: x is used for
system state - in evolve(), measure()
estimate state - in filter()
Open question: Why A.T*V*A appears to give a little better result?
"""
from MatPy.Matrix import eye,zeros,norm
from MatPy.Stats.distribs import randn
class LinearSystem:
""" LinearSystem: with system process, measurement process and filtering
Usage: (see also example in the self test block at end)
next state: x, u = evolve(x)
measurement: z = measure(x)
posterior est: tx = filter(z, u)
Applications are likely to override some of the methods:
init_params, init_state, system_params, measure_params
evolve, measure, filter
"""
def init_params(self, n=1, m=1):
""" sets system parameters - override in apps """
self.n = n
self.m = m
self.A = randn((n,n))/n;
self.Q2 = q = randn((n,n))/n*.1
self.Q = q * q.T
self.H = randn((m,n))/m
self.R2 = r = randn((m,m))/m
self.R = r * r.T
self.RI = self.R.I
return self
def init_state(self):
""" set initial system states - override in apps """
n = self.n
self.x = zeros((n,1))
self.V = eye(n)
return self
def system_params(self):
""" one-step system parameters - override in apps """
return self.A, self.Q, self.Q2, self.n
def measure_params(self):
""" one-step measurement parameters - override in apps """
return self.H, self.RI, self.R2, self.m
def evolve(self, x, w=None):
""" simulated system evolution - override in apps """
A, Q, Q2, n = self.system_params()
if w is None: w = Q2 * randn((n,1))
u = randn(self.x.shape)
x_ = A * x + u + w
return x_, u
def measure(self, x, v=None):
""" simulated measurement - override in apps """
H, RI, R2, m = self.measure_params()
if v is None: v = R2 * randn((m,1))
z = H * x + v
return z
def filter(self, z, u):
""" Kalman filtering: return estimate x, given measure z, input u
"""
x, V = self.x, self.V
# prior esitmate
A, Q, Q2, n = self.system_params()
S_ = (A * V * A.T + Q).I
x_ = A * x + u
# posterior estimate
H, RI, R2, m = self.measure_params()
V = (S_ + H.T * RI * H).I
x = x_ + V * H.T * RI * (z - H * x_)
self.x, self.V = x, V
return x
if __name__ == "__main__":
""" Simulate linear system with Kalman filtering """
# system and mesurement dimensions
n = 4
m = 5
linsys = LinearSystem().init_params(n=n, m=m).init_state()
x = randn(linsys.x.shape)
e = []
for i in range(300):
x, u = linsys.evolve(x)
z = linsys.measure(x)
tx = linsys.filter(z, u)
# print state, estimate or their distance
e.append(norm(x-tx))
if n < 6: print x.T, tx.T, "%.4g" % e[i]
else: print e[i]
# Plotting errors
from MatPy.gplot import Gplot,wait
from MatPy.Matrix import Matrix
g = Gplot()
g.plot([Matrix(e)])
wait()
|