Source code for pftracker.modules.filter.Filter

"""
Particle Filter class for solving an estimation problem defined 
by a model class.

@author: Bessie Domínguez-Dáger
"""

import numpy as np
from numpy.random import random
from filterpy.monte_carlo import multinomial_resample, stratified_resample, systematic_resample
from pftracker.modules.filter.pfAlgorithms import pfiltering

[docs]class ParticleFilter(): """ Run a particle filter algorithm for an estimation problem. This class creates particle filter estimates corresponding to a model class. This model class contains the calculation way of methods that vary with each model where to apply particle filter estimations. This model methods are: - initialization: Create initial particles distribution. - prediction: Predict next state of the particles. - update: Evaluate predicted particles with an observation model. - update_apf: Evaluate predicted particles for second stage weights in auxiliary particle filter algorithm. - visualize: Visualize the estimate resulting from particle filter algorithm. - saveEstimation: Save particle filter estimates into a .txt file. - saveOutputModel: Save the visualization produced by visualize method into a file. - get_error: Get the particle filter estimation error. - get_estimation: Get all the particle filter estimates. The ParticleFilter class interface model class with the PF algorithms and their methods: - filtering: Runs particle filter algorithm. - estimate: Returns the expected particle filter estimation. - resample: Runs resampling step. Args: model : Especific model where to apply the particle filter estimation algorithm (str): particle filter algorithm to apply N (int): Number of particles output (str): Output estimate method resample (str): Resampling method resamplePercent (int): Resampling percent robustPercent (int, optional): Particles percent to use in the robust mean estimation algorithm. """ def __init__(self, model, algorithm, N, output, resample, resamplePercent, robustPercent=None): self.model = model self.N = N # Filter parameters self.algorithm = algorithm self.arg_output = output if resample != None: self.arg_resample = resample if resamplePercent != None: self.resamplePercent = round(resamplePercent * self.N/100) if robustPercent != None: self.N_robust = round(robustPercent * self.N/100) self.estimation = None self.pfiltering = pfiltering(self.N)
[docs] def initialization(self): """Create intial particles distribution.""" particles = self.model.initialization() return particles
[docs] def prediction(self, particles): """Predict next state of the particles from time k-1 to k. Predicts the a priori pdf p(x_{k}|z_{k-1}) that describes particles distribution x_{k} using the dynamic model p(x_{k}|x_{k-1}) (state transition model). - x_{k}: state vector of particles at time k. - x_{k-1}: state vector of particles at time k-1. - z_{k-1}: observations at time k-1. This function is depedent on the model class. Args: particles (array): Model specific representation of particles x_{k-1}. Returns: 2-element tuple containing - **particles** (*array*): particles array x_{k} with (state_vector_size, N) dimension. - **uk** (*array*): u_{k} array with (state_vector_size, N) dimension. This is a characterization of x_{k}|x_{k-1} (move particles from x_{k-1} to x_{k} without include process noise). """ particles, uk = self.model.prediction(particles) return particles, uk
[docs] def update(self, particles): """Evaluate predicted particles x_{k} with an observation model. This function is depedent on the model class. Returns: (array) of likelihoods of predicted particles x_{k}, with (1, N) dimension. This is p(z_{k}|x_{k}). """ likelihoods = self.model.update(particles) return likelihoods
[docs] def update_apf(self, particles): """Evaluate predicted particles x_{k}^{idx} with an observation model. Evaluate predicted particles x_{k}^{idx} for the second stage weights of auxiliary particle filter algorithm with an observation model. Here idx are the indixes resulting from resampling of the first stage weigths of auxiliary particle filter algorithm. This function is depedent on the model class. Args: particles (array): predicted particles array x_{k}^{idx} with (state_vector_size, N) dimension Returns: (array) of likelihoods of particles x_{k}^{idx}, with (1, N) dimension. This is p(z_{k}|x_{k}^{idx}). """ likelihoods = self.model.update_apf(particles) return likelihoods
[docs] def filtering(self, pf, particles): """Runs particle filter algorithm. Args: pf (ParticleFilter): ParticleFilter class object particles (array): particles at time k-1 Returns: (array) of particles x_{k} with (state_vector_size,N) dimension """ if self.algorithm == "SIS": # Run Sequential Importance Sampling filter particles = self.pfiltering.SIS(pf, particles) elif self.algorithm == "SIR": # Run Sequential Importance Sampling filter particles = self.pfiltering.SIR(pf, particles) elif self.algorithm == "G_PF": # Run Generic particle filter particles = self.pfiltering.G_PF(pf, particles, self.resamplePercent) elif self.algorithm == "APF": # Run Auxilliary particle filter particles = self.pfiltering.APF(pf, particles) return particles
[docs] def resample(self, weights): """Runs resampling step. Resampling methods supported: systematic, stratified, residual and multinomial. Args: weights (array): particle weigths after update process Returns: (array) of indixes resulting from resampling with (1,N) dimension """ def residual_resample(weights): """ This was taken from filterpy package and some changes were made. Performs the residual resampling algorithm used by particle filters. For more documentation see https://filterpy.readthedocs.org """ N = len(weights) indexes = np.zeros(N, 'i') # take int(N*w) copies of each weight, which ensures particles with the # same weight are drawn uniformly num_copies = (np.floor(N*np.asarray(weights))).astype(int) # num_copies = (N*np.asarray(weights)).astype(int) # scaled_weights = N*np.asarray(weights) k = 0 for i in range(N): # change done here in num_copies[i][0] for _ in range(np.array(num_copies[i][0])): # make n copies indexes[k] = i k += 1 # use multinormal resample on the residual to fill up the rest. This # maximizes the variance of the samples residual = weights - num_copies # get fractional part # residual = scaled_weights - num_copies residual /= sum(residual) # normalize cumulative_sum = np.cumsum(residual) cumulative_sum[-1] = 1. # avoid round-off errors: ensures sum is exactly one indexes[k:N] = np.searchsorted(cumulative_sum, random(N-k)) return indexes if self.arg_resample == "systematic": indixes = systematic_resample(weights) elif self.arg_resample == "stratified": indixes = stratified_resample(weights) elif self.arg_resample == "residual": indixes = residual_resample(weights) elif self.arg_resample == "multinomial": indixes = multinomial_resample(weights) return indixes
[docs] def estimate(self, particles, weights): """Returns the expected particle filter estimation. Args: particles: predicted particles x_{k} weights: particle weigths after update process """ if self.arg_output == "weighted_mean": # Calculate weighted mean estimate self.estimation = np.sum(particles * weights, axis=1) elif self.arg_output == "MAP": # Calculate maximum weight idx = np.argmax(weights, axis=1) self.estimation = particles[:, idx] self.estimation = self.estimation.reshape((6,)) elif self.arg_output == "robust_mean": # Calculate robust mean estimate # sort weights in descendent order weights_sort = -np.sort(-weights) # get index of sorted weights idx_sort = np.argsort(-weights) # normalize sorted weights weights_norm = weights_sort[0, :self.N_robust] / np.sum(weights_sort[0, :self.N_robust]) # sort particles by weight particles_sort = particles[:, idx_sort[0, :self.N_robust]] self.estimation = np.sum(particles_sort * weights_norm, axis=1)
[docs] def visualize(self, particles): """Visualize the estimate resulting from particle filter algorithm. This function is depedent on the model class. Args: particles (array): particles array x_{k} with (size_v, N) dimension """ self.model.visualizations(self.estimation, particles)
[docs] def saveEstimation(self, file_name): """Save particle filter estimates into a .txt file. This function is depedent on the model class. """ self.model.saveEstimation(file_name)
[docs] def saveOutputModel(self): """Save the visualization produced by visualize method into a file. This function is depedent on the model class. """ self.model.saveOutputVideo()
[docs] def get_error(self, gt_file): """Returns the particle filter estimation error. This function is depedent on the model class. Args: gt_file (str): path to ground truth .txt file """ return self.model.calc_error(gt_file)
[docs] def get_estimation(self): """Get all the particle filter estimates.""" return self.model.getEstimation()