Adding State/Sensor Support (Python)

While it is possible to add support for new states and measurements directly within the NavToolkit C++ source code, sometimes it is more convenient to develop such modules separately in Python. NavToolkit includes Python bindings for many of its classes.

Custom Python Classes

NavToolkit allows Python classes to inherit from several C++ classes.

To add support for a new sensor, the user can define a new StateBlock to add new states to the filter, and a new MeasurementProcessor to support the measurement inputs. A new StateBlock must implement the methods that describe a set of one or more states and how they propagate through time, while a new MeasurementProcessor must implement the methods that describe how a measurement relates to one or more StateBlocks.

Creating a Python StateBlock

In order to create the Python equivalent to the BiasBlock.hpp we created in Adding State/Sensor Support (C++), BiasBlock.py, we’ll write:

from navtk.filtering import StateBlock, StandardDynamicsModel

from aspn23_xtensor import to_seconds


class BiasBlock(StateBlock):
    def __init__(self, label):
        StateBlock.__init__(self, num_states=1, label=label)

    def clone(self):
        return BiasBlock(self.label)

    # This function takes in the current state estimate and time
    # and produces the needed dynamics equation parameters (g, Phi, Qd)
    def generate_dynamics(self, xhat, time_from, time_to):

        # Define the g(x) propagation function as g(x_(k)) = x_(k-1)
        def g(x):
            return x

        delta_time = to_seconds(time_to - time_from)

        Phi = [[1]]
        Qd = [[delta_time]]

        return StandardDynamicsModel(g, Phi, Qd)

Creating a Python MeasurementProcessor

Note that the location of the NavToolkit shared library needs to be added to the path in order to import NavToolkit modules. In this case we’re assuming the library resides in the subdirectory build.

So far we’ve created a new StateBlock that can be added to a fusion engine like any other StateBlock can. The next step is to implement a MeasurementProcessor which uses our new StateBlock. We can look at the interface for MeasurementProcessor and follow similar steps as in Adding State/Sensor Support (C++) to create BiasMeasurementProcessor.py:

from navtk.filtering import MeasurementProcessor, StandardMeasurementModel


class BiasMeasurementProcessor(MeasurementProcessor):
    def __init__(self, label, state_block_label):
        MeasurementProcessor.__init__(self, label, state_block_label)
        self.measurement_matrix = [[1.0]]

    def generate_model(self, meas, gen_X_and_P_func):
        z = meas.estimate
        H = self.measurement_matrix
        R = meas.covariance
        return StandardMeasurementModel(z, H, R)

    def clone(self):
        BiasMeasurementProcessor(self.label, self.state_block_labels[0])

Using the StateBlock and MeasurementProcessor with a Fusion Engine

Then we can create a script which uses both the StateBlock and the MeasurementProcessor in a fusion engine. We’ll create bias_example_with_update.py:

#!/usr/bin/env python3

from BiasBlock import BiasBlock
from BiasMeasurementProcessor import BiasMeasurementProcessor

from navtk.filtering import StandardFusionEngine, GaussianVectorData

from aspn23_xtensor import to_type_timestamp


def bias_example_with_update():
    '''
    Example showing use of custom state block and measurement processor
    for a simple bias-estimating situation.
    '''
    # Create our state block, measurement processor, and fusion engine
    # instance. All together these act as a navigation filter.
    block = BiasBlock("mybiasblock")
    processor = BiasMeasurementProcessor("myprocessor", "mybiasblock")
    engine = StandardFusionEngine()

    # Add the block and processor to the engine/filter
    engine.add_state_block(block)
    engine.add_measurement_processor(processor)

    # Get and print the initial state estimate and covariance
    out = engine.get_state_block_estimate("mybiasblock")
    out_cov = engine.get_state_block_covariance("mybiasblock")
    print("Initial:")
    print("The state estimate is {}".format(out))
    print("The state covariance is {}\n".format(out_cov))

    # Propagate to 0.1 seconds
    engine.propagate(to_type_timestamp(0.1))

    # Get and print the state estimate and covariance after propagation
    out = engine.get_state_block_estimate("mybiasblock")
    out_cov = engine.get_state_block_covariance("mybiasblock")
    print("After propagation:")
    print("The state estimate is {}".format(out))
    print("The state covariance is {}\n".format(out_cov))

    # Make a measurement at 0.1 seconds of sensed value 1.0 with cov 1.001
    time_validity = to_type_timestamp(0.1)
    measurement_data = [1.0]
    measurement_covariance = [[1.001]]
    raw_meas = GaussianVectorData(
        time_validity, measurement_data, measurement_covariance
    )

    # Update the filter estimate with the measurement
    engine.update("myprocessor", raw_meas)

    # Get and print the state estimate and covariance after the update
    out = engine.get_state_block_estimate("mybiasblock")
    out_cov = engine.get_state_block_covariance("mybiasblock")
    print("After update:")
    print("The state estimate is {}".format(out))
    print("The state covariance is {}".format(out_cov))


if __name__ == "__main__":
    bias_example_with_update()

If you’d like to try this example out you can find it (as well as the StateBlock and MeasurementProcessor) in the examples/bias_with_update directory. It can be run from the NavToolkit root folder via the command:

examples/bias_with_update/bias_example_with_update.py