pntOS Python API

Python API of pntOS.

class pntos.api.CommonPlugin

Bases: ABC

Common definitions that all plugins must provide.

This structure should not be used directly (except in the case of a utility plugin), but instead is composed as the first field on all of the concrete pntOS plugin structures. For example, the transport plugin is specified as:

class TransportPlugin(CommonPlugin, ABC):

    def init_plugin(...):
        ...init_plugin implementation...

    def ...other function implementations...

Thus this class defines a set of functions and variables that all plugins have. The pntos.api.CommonPlugin.init_plugin() function is guaranteed to be called by pntOS when the plugin is first loaded into memory by the system.

Example

When defining a new transport plugin, the plugin writer is responsible for implementing all fields on the pntos.api.TransportPlugin class. Thus, the fields of the pntos.api.CommonPlugin nested on the pntos.api.TransportPlugin are implemented by the plugin writer.

Variables:

identifier (str) – A string identifier uniquely identifying this plugin. This string will be used to determine the unique space this plugin receives in the system config.

abstract init_plugin(plugin_resources_location=None, mediator=None)

The first plugin method called; initializes the plugin.

A function that will be called by pntOS once and only once when it first initializes the plugin before any other functions on the plugin are called. Here the plugin may do dynamic runtime initialization of its members, and is given the full path to the location of a data folder specific to the plugin, in case it needs to acquire additional files. An instance of pntos.api.Mediator will be passed which the plugin should save off for later use. Whenever the plugin needs to make a request of pntOS, it should use one of the fields in the pntos.api.Mediator instance received by the plugin in this function call.

Note

Implementation note: This inversion of control allows the controller to implement the pntos.api.Mediator class, and abstracts away the return communication channel from the plugin to the rest of the system. Thus, the plugin need only implement pntos.api.Mediator by simply saving a copy of the functions that the controller passes into it. Then, when the plugin later needs to make requests of the system, it may call a function in its copy of pntos.api.Mediator, without needing any knowledge of how the controller implemented pntos.api.Mediator. This allows controllers to implement arbitrary concurrency models, including single-threaded, multi-threaded, multi-process, and distributed computing.

Parameters:
  • plugin_resources_location (str | None, optional) – Specifies the location of the plugin’s resources. The location is determined by the controller plugin, and therefore is controller implementation specific. Plugin implementers wishing to provide a resource to their plugin should consult the documentation of the controller to determine which location scheme will be passed into this function.

  • mediator (Mediator | None, optional) – None-able if the plugin type being initialized is a pntos.api.ControllerPlugin. Non-controller plugins may assume that the mediator parameter is not None.

abstract shutdown_plugin()

A function that will be called by pntOS when it is done using the plugin.

Here the plugin should release any resources it has acquired. When this function call returns pntOS may only call the destructor function (it will not call any other functions of this plugin). The plugin may not call any function on any other plugin, mediator, or use any resource that was given to it by pntOS after it returns from this function.

identifier

A string identifier uniquely identifying this plugin.

This string will be used to determine the unique space this plugin receives in the system config.

class pntos.api.EstimateWithCovariance(type, estimate, covariance)

Bases: object

A container for holding an estimate and covariance.

Variables:
  • type (EstimateWithCovarianceType) – Describes how the fields in this struct are used.

  • estimate (NDArray[float64]) – An array of doubles representing an estimate vector. Usage depends on the type field.

  • covariance (NDArray[float64]) – An array of doubles representing a square covariance matrix. Data is stored in row major form. Usage depends on the type field.

estimate

An estimate vector.

covariance

A covariance matrix, describing the errors in the estimate.

class pntos.api.EstimateWithCovarianceType(value)

Bases: IntEnum

Describes how the fields in pntos.api.EstimateWithCovariance are used.

EWC_GENERIC = 0

Contains a mean (estimate) and covariance describing a multivariate Gaussian distribution.

EWC_ATTITUDE_QUAT = 1

Contains a mean (estimate) and covariance describing a rotation modeled by a multivariate Gaussian distribution, but the estimate is in quaternion form and the covariance is in tilt error form.

class pntos.api.KeyValueStore

Bases: AbstractContextManager[KeyValueStore]

A key-value store implemented with a string-pair key.

This key-value store is intended to function as an expanded python dictionary.

Each value can be looked up by an associated key (string). Values can be of any type specified by RegistryValueType/RegistryValueTypeUnion.

Example

For example, to store a string value “foo” in the key-value store under the key “k1”, one would write either of the following:

store.set_value("k1", "foo")
store["k1"] = "foo"

At this point, the key-value store would have recorded the value into its internal data storage. Later a user could call either of these lines:

foo = store.get_value("k1", str)
foo = store["k1"]

to retrieve the value at key “k1”.

The advantage of get_value is that if the conversion is possible, the user can specify a different type to return than the type that was originally stored in the key-value store. For example, these two lines demonstrate a user saving an integer to the key-value store and then retrieving it as a string:

store["k2"] = 42
val = store.get_value("k2", str) # val now equals "42"

In general, a pntos.api.KeyValueStore is generated by a pntos.api.Registry and not directly by other code. The pntos.api.Registry will return key/value stores on demand, utilizing the data backing store chosen by the plugin to store data (either ephemerally in memory or permanently in persistent storage). In general, it is only valid to call the getters/setters on a pntos.api.KeyValueStore during a batch operation. See pntos.api.Registry for more information.

Variables:

data_format (KeyValueStoreDataFormat) – The data format that is used by the set_raw() and get_raw() methods.

abstract keys()

Get the array of keys which currently exist in this store.

Returns:

Returns the keys in the store or None if no keys are present.

Return type:

list[str] | None

abstract __contains__(key)

Check whether or not a given key exists in the store.

Particularly, this function allows the user to use python “if in” statements:

if key in store:
    ...
Parameters:

key (str)

Returns:

bool

abstract get_value(key, value_type)

Get the value stored at key with return type value_type.

Example

For example, to access altitude in a pntos.api.KeyValueStore named kv_store as an integer:

altitude = kv_store.get_value("altitude", int)
Parameters:
Returns:

Returns None if the key is not available. The return is guaranteed to not be None if called with a valid key (which can be checked with pntos.api.KeyValueStore.__contains__()) and if the store can convert the value to the requested type.

Return type:

pntos.api.RegistryValueType | None

abstract __getitem__(key)

Gets an item in the key-value store.

This function enables python bracket indexing to return a value from the store for a given key.

Example

For example:

if key in store:
    foo = store[key] # foo is now the value stored at key
Parameters:

key (str)

Returns:

This is guaranteed to return a value of the same type as get_type() returns for the given key, if the key exists. If get_type() returns None (indicating type information is not available), then this method will only return values as strings or Messages.

Return type:

RegistryValueTypeUnion | None

abstract get_raw(key=None)

Get the value for the given key as an array of bytes.

Parameters:

key (str | None, optional)

Returns:

The return format will conform to the definition in pntos.api.KeyValueStore.data_format. Returns None if the given key is not available. The return is guaranteed to not be None if called with a valid key, which can be checked with pntos.api.KeyValueStore.__contains__():

if key in store:
...

If key is None, then this function will return all of the keys and values in the group passed to pntos.api.Registry.batch_start() and will be formatted to conform to keys and values as defined in pntos.api.KeyValueStore.data_format.

Return type:

bytes | None

abstract set_value(key, value)

Set the given key to the provided value.

Parameters:
abstract __setitem__(key, value)

Set an item in the key-value store.

Same functionality as set_value(), but allows python bracket indexing to set values in the store.

Example

For example:

store["key"] = 42
Parameters:
abstract set_raw(key, bytes)

Set the given key to the provided value.

Parameters:
abstract remove_key(key)

Remove the given key from the registry.

Parameters:

key (str)

Returns:

True if key is successfully removed, and False otherwise. Keys may fail to be removed if the key does not currently exist, or the backend is unable to remove the key.

Return type:

bool

abstract batch_end()

Ends a batch operation.

Ends a batch operation started with a pntos.api.Registry.batch_start() call. After calling this, the user should not use the pntos.api.KeyValueStore they received from pntos.api.Registry.batch_start() again without calling pntos.api.KeyValueStore.batch_restart() on the pntos.api.KeyValueStore.

If keys in the batch were acted upon with set_permanent() turned on, and the plugin supports permanent storage, this call will save changes to permanent storage if set_permanent() is True during the call to batch_end(). Enacts equivalent of set_permanent(self,false) before return. If any request_notify() observers have been added, they will be processed prior to this call returning.

Example

Example 1: Flushing to permanent storage on batch_end():

store = registry.batch_start("group")
...work...
store.set_permanent(true) # if not disabled, flush on batch_end
...work...
store.batch_end()    # will flush values

Example 2: Not flushing to permanent storage on batch_end():

store = registry.batch_start("group")
...work...
store.set_permanent(true)  # tag some values
...work...
store.set_permanent(false) # do not flush on batch_end
store.batch_end()          # will not flush values

In the second example above, values set with “set” methods after the initial set_permanent() call are still stored for potential saving to permanent storage.

abstract batch_restart()

Restarts a batch.

Restarts a batch that was previously started with pntos.api.Registry.batch_start() and subsequently ended with pntos.api.KeyValueStore.batch_end(). This method is likely much more efficient than pntos.api.Registry.batch_start() (depending on the registry implementation) as the pntos.api.Registry.batch_start() method must find the store again given the group name.

Note

While a batch is active, access to the store may be denied to other users. Thus a user should endeavour to call batch_end() as soon as possible after they are done getting/setting values in the returned pntos.api.KeyValueStore.

abstract request_notify(key, callback)

Register a callback which gets called each time a key in the store is updated.

Allows plugins to respond asynchronously to parameter updates. Returns True if the notifier was successfully registered, and False if the store is unable to notify the requester. If key is None, then the callback will be invoked when any key in the batch’s group is modified. Otherwise, the callback will only be invoked when the given key is modified.

Note

The callback must not attempt to set any values inside the pntos.api.KeyValueStore, as the callback is likely being invoked during the processing of another operation. The callback should endeavour to store off the updated keys/values as quickly as possible and return, leaving the processing of the updates to another context or thread when possible. Calling pntos.api.Mediator within the callback may be disallowed by the controller implementation and lead to undefined behavior.

Parameters:
  • key (str | None)

  • callback (Callable[[str, list[str], KeyValueStore], None]) –

    A function with a function definition compatible with:

    def my_callback(group: str, modified_keys: list[str], kv: KeyValueStore) -> None:
        ...
    

Returns:

True if the notifier was successfully registered, and False if the store is unable to notify the requester.

Return type:

bool

abstract remove_notify(key, callback)

Removes a notification as requested by request_notify().

The group and callback must match the parameters passed to request_notify() in order to successfully remove a callback.

Note

This will remove all matching callbacks that have a matching group and callback. If a user registers the same callback twice this will remove both.

Parameters:
  • key (str | None)

  • callback (Callable[[str, list[str], KeyValueStore], None])

Returns:

True if removal was successful and False if it was not. False will be returned if a callback did not exist for the group.

Return type:

bool

abstract __delitem__(key)

Delete the item with the specified key.

Must remove the given key and the associated value from the store, along with any callbacks or permanency settings.

This enables the python del operator.

Example

For example:

for key in keys_to_remove:
    del store[key]
Parameters:

key (str)

abstract __iter__()

Return an iterator over the keys in the store.

This allows for easy iteration over the key-value store.

Example

For example:

for key in store:
    ...
Returns:

Iterator

abstract __len__()

Return the number of items in the store.

This allows the use of python’s len function.

Example

For example:

num_elements = len(key_val_store)
Returns:

int

abstract clear()

Remove all items from key-value store.

abstract set_permanent(permanent)

Tag values modified with “set” methods as permanently stored.

Configure the pntos.api.KeyValueStore to tag values modified with “set” methods as permanently stored (as opposed to ephemerally stored in memory). Only values acted upon with “set” methods while set_permanent() is True will be tagged. Values will be flushed according to registry configuration settings or per batch_end() API. Returns the value of the permanent storage configuration. Callers should check this to verify if the set was successful.

Example

Tagging specific keys to be permanently stored:

store: PntosKeyValueStore = registry.batch_start("group")
store.set_value("key1",1234.56) # does not tag this value as permanently stored
store.set_permanent(True)       # start tagging set calls as permanently stored
store.set_value("key1",987.65)
store.set_value("key2",123)     # both key1 and key2 values tagged
store.set_permanent(False)      # disable permanent storage
store.set_value("key1",456.78)  # key1 = 456.78 is value of key1 in store
                                # key1 = 987.65 tagged to be permanently stored
                                # key2 = 123    tagged to be permanently stored
Parameters:

permanent (bool)

Returns:

The value of the permanent storage configuration. Callers should check this to verify if the set was successful.

Return type:

bool

abstract values()

Returns a list of the key-value store’s values.

This method is useful for unloading all values from a key-value store.

Example

For example:

def get_mean_of_kv_store(kv: KeyValueStore) -> float | None:
    '''Returns the mean of a key-value store if all values are int or float'''
    values = kv.values()
    if all(isinstance(x, (int, float)) for x in values):
        return sum(values) / len(values)
    return None
Returns:

A list of all values in the store.

Return type:

list[RegistryValueTypeUnion]

abstract items()

Returns a list of the items (key-value pairs) in the store.

This method is useful for when you need to iterate over both keys and values in a dictionary.

Example

For example:

for key, value in my_dict.items()
    printf('Key: {key}, Value: {value}')
Returns:

A list of the key-value pairs (as tuples) in the store.

Return type:

list[tuple[str, RegistryValueTypeUnion]]

abstract get_type(key)

Returns the type of a value in the KeyValueStore.

This is helpful for situations when it is advantageous to know the type of a value in the store without need for the value itself.

Example

For example, it might be useful for these situations where different callbacks are needed depending on what type is in the store:

if key in kv_store:
    val_type = kv_store.get_type(key)
    if val_type is int:
        kv_store.request_notify(key, int_callback)
    elif val_type is Message:
        kv_store.request_notify(key, message_callback)
    else:
        kv_store.request_notify(key, generic_callback)
Parameters:

key (str)

Returns:

If key exists in registry, this is guaranteed to return the return type of pntos.api.KeyValueStore.__getitem__() for the given key. Else, it returns None if type information is not available or key does not exist in registry.

Return type:

type[RegistryValueTypeUnion] | None

data_format

Defines the underlying format which the data in the key-value store will be stored as.

class pntos.api.KeyValueStoreDataFormat(value)

Bases: IntEnum

The format of data returned or expected.

An enum that specifies the format of data returned/expected in the pntos.api.KeyValueStore.get_raw() and pntos.api.KeyValueStore.set_raw() methods. This value is otherwise unused when querying a key-value store.

INI = 0

Keys and their corresponding values are returned according to the INI file format specification.

UNSPECIFIED = 1

An opaque type that is undefined by the implementer.

class pntos.api.LoggingLevel(value)

Bases: IntEnum

An enumeration of the types of log outs that are available in pntOS.

ERROR = 0

This output indicates the program has entered an error state, and likely needs to be inspected to discover what went wrong.

WARN = 1

This output is designed to warn of a possibly unintended state that may be harmless or be indicative of a bug.

INFO = 2

This output is designed to be informational, and may indicate correct operation.

DEBUG = 3

This output is designed to assist in debugging plugins by providing additional information about state and behavior which would be otherwise unnecessary.

class pntos.api.Mediator

Bases: ABC

A set of callbacks which are handed to a pntOS plugin upon initialization.

When a plugin is first initialized into pntOS, it is guaranteed that the plugin will be passed an instance of this struct via an invocation of pntos.api.CommonPlugin.init_plugin() (See pntos.api.CommonPlugin for more information). The plugin may then use the set of function calls in this class to make requests of the pntos.api.ControllerPlugin.

All of the functions on this class (and any returned values from those functions) are guaranteed to be thread-safe for use by all plugins. Thus, after a pntOS plugin has received a copy of a pntos.api.Mediator it can freely call the functions contained therein without doing any explicit locking. This thread safety is implemented by the pntos.api.ControllerPlugin when it creates the mediator before passing them to other plugins.

Callers must still take care to only call functions in pntos.api.Mediator which they are not themselves responsible for implementing. The details of which plugins are used in the implementation of any particular function on this struct is decided by the pntos.api.ControllerPlugin, and thus is implementation specific to the pntos.api.ControllerPlugin used.

Variables:

registry (Registry) – A pntos.api.Registry object that can be used to update keys/values in the pntOS global registry.

abstract property filter_description_list

Request a list of strings describing the solutions available.

One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:

  • Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).

  • Strings should contain the substring BEST when they represent the primary solution.

  • Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).

  • Strings should include a substring indicating the type of solution returned. This substring should contain the string-equivalent to the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.

Example

If the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.

These conventions allow the user to identify their desired type of solution using substring matching.

Returns:

A list of strings describing the solutions available.

Return type:

list[str]

abstract request_solutions(solution_times, filter_description=None)

Request filtering solutions at the times specified in the array solution_times.

Parameters:
  • solution_times (list[TypeTimestamp]) – The times at which to return solutions.

  • filter_description (str | None, optional) – To select which filter(s) to request solutions from, enter a valid filter description string in filter_description. Valid filter description strings can be obtained by calling filter_description_list. Passing in None will provide a result specific to a particular implementation. When filter_description is None, the implementation should endeavor to return its best solution.

Returns:

An array of messages containing the filter solutions for the requested solution_times. Some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message array may be None if filter_description is invalid.

Return type:

list[Message | None] | None

abstract process_pntos_message(message)

Send a new message to the system for arbitrary processing.

For example, this function is useful for plugins who have just received new sensor data that they wish to relay to the system to be used in a sensor fusion solution.

Parameters:

message (Message)

abstract broadcast_aspn_message(message, transport=None, destination_identifier=None)

Request that pntOS broadcast the provided message out to the network.

Parameters:
  • message (Message)

  • transport (str | None, optional) – The identifier of a transport plugin that the message should be routed to. The transport parameter should match the pntos.api.CommonPlugin.identifier string of a pntos.api.TransportPlugin active in the system. If the transport parameter is None, this indicates that the message should be broadcast to all available transports.

  • destination_identifier (str | None, optional) – A transport-specific identifier that allows transports to determine how to route the message. If the destination transport has the concept of a channel or topic, destination_identifier should be populated by the channel or topic. Otherwise, the identifier is populated in a plugin-specific manner defined by the destination transport. If destination_identifier is None, then the transport should output the message in the “default” output channel/topic and route being used by pntOS.

abstract log_message(level, message)

Log a message.

Send a loggable message to the system, to be logged through the current logging infrastructure enabled (e.g. the console, a logfile, etc.).

Parameters:
class pntos.api.Message(wrapped_message, source_identifier)

Bases: object

A container for an ASPN message.

This container may contain either proper ASPN messages which are part of the ASPN data model, or extension messages specific to pntOS which augment ASPN.

Variables:
  • wrapped_message (AspnBase) – Either an ASPN message or a pntOS ASPN Extension message.

  • source_identifier (str) – Indicates where this message came from. If the message originated from a pntos.api.TransportPlugin and the underlying transport has the concept of a channel or topic, this field should be populated by the channel or topic. Otherwise, the identifier is populated in a plugin-specific manner by the originating plugin that created the message.

class pntos.api.Registry

Bases: ABC

A registry of key/value data which is organized by (string) groups.

In order to get/set a key in the registry, one must call pntos.api.Registry.batch_start() with the group the key is stored under and then use the resulting pntos.api.KeyValueStore to get/set the key/value pair. When one is done accessing keys in the pntos.api.KeyValueStore, they must call pntos.api.KeyValueStore.batch_end(). It is not permitted to access any member inside the pntos.api.KeyValueStore after a batch has ended. If a user has ended a batch and then desires to access the pntos.api.KeyValueStore again, they may use the pntos.api.KeyValueStore.batch_restart() method.

abstract batch_start(group)

Begin a batch get/set operation.

Begin a batch get/set operation wherein the user may make any number of modifications to the keys/values in the group. The registry implementation may wait to batch these requests until pntos.api.KeyValueStore.batch_end() is called for better performance. For example, a lock may be obtained at the beginning of a batch_start() and not released until a pntos.api.KeyValueStore.batch_end() call is encountered. Thus, a plugin that calls batch_start() should endeavour to make its calls to the set_, get_, and register methods as quickly as possible and call pntos.api.KeyValueStore.batch_end() immediately, as doing otherwise may be locking other plugins out of access to the registry (depending on the registry plugin implementation). If a plugin supports pntos.api.KeyValueStore.request_notify(), then notifications of updates may be suspended until the batch ends. After a batch is ended, the returned pntos.api.KeyValueStore can still be used to access the store via pntos.api.KeyValueStore.batch_restart().

Note

While a batch is active, access to the store may be denied to other users. Thus a user should endeavour to call pntos.api.KeyValueStore.batch_end() as soon as possible after they are done getting/setting values in the returned pntos.api.KeyValueStore.

Parameters:

group (str)

Returns:

KeyValueStore

abstract property group_array

Get the array of groups which currently exist.

Returns:

The array of groups which currently exists. Returns None if no groups exist.

Return type:

list[str] | None

abstract has_group(group)

Checks whether or not a given group has had any values added to it (for any key).

Parameters:

group (str)

Returns:

bool

abstract request_notify_new_group(callback)

Register a callback which gets called each time a new group is made in the registry.

Parameters:

callback (Callable[[str], None])

Returns:

True if the notifier was successfully registered, and False if the registry is unable to notify the requester.

Return type:

bool

class pntos.api.ControllerPlugin

Bases: CommonPlugin, ABC

Controller plugin.

An implementation of a primary controller in charge of defining the usage of all other pntOS plugins.

In ordinary operation, an app will import plugins, initialize a controller plugin, and then call pntos.api.ControllerPlugin.take_control() on the controller plugin, passing in a list of plugins that it imported.

From that point forward, the controller is responsible for all activity. It may use any or none of the plugins in the plugins list as desired. In general, it may do anything it wants, within the following guidelines:

  • The controller should not load new plugins - it should work with the plugins passed to it in the take_control() parameters.

  • The controller must call pntos.api.CommonPlugin.init_plugin() on any plugin it wishes to use prior to using any other functionality on that plugin. The controller must pass a pntos.api.Mediator that the plugin may use to communicate back to the controller. This callback design allows the controller to abstract away the concurrency model - in particular, a callback function may be anything from a direct invocation of a Python function to a shim that utilizes IPC channels to communicate to process-separated or machine-separated plugins.

Outside of these requirements, the controller defines any and all I/O it supports, which pntOS plugins are loaded / used, and the type of fusion being done. The controller can be hard-coded to support only a specific sensor configuration or written generically to support arbitrary run-time environment sensing. Outside of some initialization in the app, the controller is the conceptual “main” function.

When the controller is provided a pntos.api.PlatformIntegrationPlugin as one of the plugins in the plugins list passed to take_control(), that indicates to the controller that platform-specific control logic exists and must be used. In this case, the controller’s primary objective’s are to:

  • Pick the concurrency model being used (multiprocessed, multithreaded, coroutines, single-threaded, etc.).

  • Spin up the concurrency primitives to implement the chosen concurrency type.

  • Provide a pntos.api.Mediator to all plugins in their pntos.api.CommonPlugin.init_plugin() call that enables inter-plugin communication.

After that task is accomplished, the controller should call the pntos.api.PlatformIntegrationPlugin.take_control() function and allow the pntos.api.PlatformIntegrationPlugin (PIP) to actively call functions on the plugins list. Once the pntos.api.PlatformIntegrationPlugin.take_control() has been called, the controller must lock/synchronize the controller’s and the PIP’s access to function calls in the plugins list, such that any action the PIP might take will not interfere with the controller’s actions (or, equivalently, actions that the mediator provided by the controller is taking). One simple approach to synchronizing access to the plugins list across the controller and PIP is for the plugins list provided to the PIP to be a facade that actually routes messages through to the mediator, such that the mediator handles all requests and can synchronously dispatch them. More efficient approaches exist, however, it is the responsibility of the controller to make sure that all accesses the PIP makes to resources passed to it are synchronized with respect to the controller’s chosen concurrency primitive.

abstract take_control(plugins, plugin_resources_locations=None, initial_config=None)

Takes over primary control of the program from the app.

Takes over primary control of the program from the app, using the plugins to process data, generate fused estimates, and ultimately produce and output PNT solutions. take_control() must use the plugins passed to it and construct a full pntOS system. Please see the description of the pntos.api.PlatformIntegrationPlugin (PIP) for a description of duties of this function compared to the duties of the pntos.api.PlatformIntegrationPlugin.take_control() method (if a PIP is in the plugins list).

Parameters:
  • plugins (list[CommonPlugin]) – An array of plugins available to the controller.

  • plugin_resources_locations (list[str | None] | None, optional) – A list of strings which represent a list of locations, one for each plugin in plugins, where those plugins may find auxiliary data if needed. The array can be None, otherwise the array is of the same length as plugins. Each string may be None, filesystem paths, or adhere to some scheme, such as the URI scheme, for defining the resource’s location.

  • initial_config (str | None, optional) –

    Represents a source of values to initialize the primary pntOS configuration. This could be None or a str. Examples of possible values for the latter are:

    1. The entire config.

    2. A local file path on systems which support them.

    3. A string adhering to the URI scheme.

class pntos.api.CrossCovariances(block_labels, cross_covariances)

Bases: object

A container for a set of covariances relating a StateBlock to one or more other StateBlocks.

For example, suppose that some StateBlock named A existed containing a states. Then this structure could define the cross covariance of A with respect to other StateBlocks named B (with b states) and C (with c states). In that case, block_labels would be an array of 2 strings B and C, and cross_covariances would be an array of two matrices: the cross-covariance matrix of B and A, with shape [b, a] and the cross-covariance matrix of C and A with shape [c, a].

Variables:
  • block_labels (list[str]) – A list of labels of the pntos.api.StandardStateBlock this structure contains the cross-covariances for.

  • cross_covariances (list[NDArray[float64]]) – A list of cross-covariance matrices between a single StateBlock and the set of StateBlocks listed in block_labels.

block_labels

The labels of the state blocks that the entries in cross_covariances relate to.

cross_covariances

The covariances between the state blocks.

class pntos.api.FusionPlugin

Bases: CommonPlugin, ABC

Plugin that provides a fusion engine.

A fusion engine allows data from multiple sensors to be integrated into a unified state estimate.

abstract is_fusion_type_supported(fusion_type)

Check if the plugin supports a given type of fusion.

Parameters:

fusion_type (type[FusionEngineType])

Returns:

bool

abstract new_fusion_engine(fusion_type)

Create a fusion engine.

Parameters:

fusion_type (type[FusionEngineType]) – This parameter specifies the type of fusion engine that will be returned.

Returns:

The fusion_type parameter specifies the type of fusion engine that will be returned. For example, if the user passes in pntos.api.StandardFusionEngine, then an implementation of pntos.api.StandardFusionEngine will be returned. Returns None if fusion_type is not supported by this fusion plugin (is_fusion_type_supported() can be used to check the type before calling this method). Otherwise the return is guaranteed to not be None.

Return type:

FusionEngineType | None

class pntos.api.StandardFusionEngine

Bases: ABC

An implementation of a fusion engine that supports the standard fusion model.

Assumes the system is described by discrete-time matrices and noise inputs are zero-mean white Gaussian. In addition, all covariance matrices / mean vectors are descriptions of jointly-Gaussian multivariate distributions. All noise sources are jointly-Gaussian distributed.

This object requires a pntos.api.StandardFusionStrategy to work. Some implementations may be able to provide their own. Others will require a strategy to be provided by setting the StandardFusionEngine.strategy field. It is possible to check whether a fusion engine needs to be provided a fusion strategy by checking the StandardFusionEngine.strategy field (if it is None then this fusion engine needs to be provided a strategy). While StandardFusionEngine.strategy is None, all other methods are unsafe to be called.

Note

This class must have an operational __deepcopy__ method. For most classes, the default __deepcopy__ method provided by Python will be sufficient. However, if the class has a field which does not properly implement its own __deepcopy__ (such as a C object wrapped to Python), then the class will need to implement a custom __deepcopy__ which properly copies all fields.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

abstract property time

Get the current time of the filter.

Returns:

TypeTimestamp

abstract property strategy

The underlying algorithm used for Bayesian inference.

Returns:

The fusion strategy is the type of filter (EKF, UKF, etc.).

Return type:

StandardFusionStrategy | None

abstract property num_states

Get the total number of states currently in the fusion engine.

Virtual state blocks do not affect this result.

Returns:

The total number of states currently in the fusion engine.

Return type:

int

abstract property state_block_labels

Get a list of pntos.api.StandardStateBlock labels that have been added to this fusion engine.

Returns:

A list of the pntos.api.StandardStateBlock labels that have been added to this fusion engine. Returns None if no state blocks have been added. Guaranteed to not return None if num_states is a value other than 0.

Return type:

list[str] | None

abstract add_state_block(block, initial_estimate_covariance, cross_covariances=None)

Add the given pntos.api.StandardStateBlock to the fusion engine.

This will expand the state vector being estimated by the value of num_states.

Parameters:
  • block (StandardStateBlock) – The pntos.api.StandardStateBlock to be added to the fusion engine.

  • initial_estimate_covariance (EstimateWithCovariance) – Contains the initial conditions of the states, with initial_estimate_covariance.estimate being an Nx1 matrix and initial_estimate_covariance.covariance being an NxN matrix, where N is block.num_states.

  • cross_covariances (CrossCovariances | None, optional) – An optional parameter which, if non-None, contains a description of the newly added StateBlock’s cross covariances with respect to a set of StateBlocks which already exist inside the filter (specified by cross_covariances.block_labels). If the cross_covariance parameter is None, cross covariance between the existing states and the added states will be set to zeroes.

abstract get_state_block_estimate(block_label)

Get the estimate associated with a state block.

Find a pntos.api.StandardStateBlock or pntos.api.VirtualStateBlock within the fusion engine matching block_label, and return a copy of its current estimate vector.

Parameters:

block_label (str)

Returns:

A copy of its current estimate vector. If block_label references a virtual state block (VSB) this will return a converted estimate, converted into the VSBs coordinate frame. Returns None if block_label does not correspond to a block that has been added to the fusion engine. Guaranteed to not return None when block_label is in the list returned by state_block_labels and strategy is not None.

Return type:

NDArray[float64] | None

abstract get_state_block_covariance(block_label)

Get the covariance associated with a state block.

Find a pntos.api.StandardStateBlock or pntos.api.VirtualStateBlock within the fusion engine matching block_label, and return a copy of its current covariance matrix.

Parameters:

block_label (str)

Returns:

A copy of its current covariance matrix. If block_label references a virtual state block (VSB) this will return a converted covariance, converted into the VSBs coordinate frame. Returns None if block_label does not correspond to a block that has been added to the fusion engine. Guaranteed to not return None when block_label is in the list returned by state_block_labels and strategy is not None.

Return type:

NDArray[float64] | None

abstract get_state_block_cross_covariance(block_label1, block_label2)

Get the cross covariance between the states associated with two state blocks.

Find the pntos.api.StandardStateBlock s within the fusion engine matching block_label1 and block_label2, and return the cross-covariance matrix between them.

Parameters:
  • block_label1 (str)

  • block_label2 (str)

Returns:

The cross-covariance matrix between block_label1 and block_label2. Returns None if block_label1 or block_label2 do not correspond to blocks that have been added to the fusion engine. Guaranteed to not return None when both block_label1 and block_label2 are in the list returned by state_block_labels and strategy is not None.

Return type:

NDArray[float64] | None

abstract set_state_block_estimate(block_label, estimate)

Update the estimate associated with a given state block.

Find a pntos.api.StandardStateBlock within the fusion engine matching block_label, and change its current estimate vector.

Note

This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.

Parameters:
  • block_label (str)

  • estimate (NDArray[float64])

abstract set_state_block_covariance(block_label, covariance)

Update the covariance associated with a given state block.

Find a pntos.api.StandardStateBlock within the fusion engine matching block_label, and change its current covariance matrix.

Note

This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.

Parameters:
  • block_label (str)

  • covariance (NDArray[float64])

abstract set_state_block_cross_covariance(block_label1, block_label2, covariance)

Update the covariance between two state blocks.

Find the pntos.api.StandardStateBlock s within the fusion engine matching block_label1 and block_label2, and change the current covariance matrix between them.

Note

This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.

Parameters:
  • block_label1 (str)

  • block_label2 (str)

  • covariance (NDArray[float64])

abstract remove_state_block(block_label)

Remove the pntos.api.StandardStateBlock matching block_label.

This will reduce the state vector being estimated by the number of states that the block represents.

Parameters:

block_label (str)

abstract property virtual_state_block_target_labels

Gets a list of the target labels of virtual state blocks that have been added.

A label being returned by this list is not a guarantee that the virtual state block has a valid source. For that, call has_virtual_state_block().

Returns:

A list of the target labels of virtual state blocks that have been added. Returns None if no virtual state blocks have been added to this fusion engine.

Return type:

list[str] | None

abstract has_virtual_state_block(vsb_target_label)

Checks if the fusion engine has a pntos.api.VirtualStateBlock with a matching target label.

Parameters:

vsb_target_label (str)

Returns:

True if the fusion engine has a pntos.api.VirtualStateBlock with a matching target label, False if no virtual state block with matching target label exists or if one exists but is not capable of generating an estimate. That is, the VSB’s source must exist and be in a continuous chain to a concrete state block which also exists in the fusion engine in order to return True.

Return type:

bool

abstract add_virtual_state_block(virtual_state_block)

Add the given pntos.api.VirtualStateBlock to the fusion engine.

A virtual state block (VSB) convert from an underlying block coordinate frame into the VSB coordinate frame.

Parameters:

virtual_state_block (VirtualStateBlock)

abstract remove_virtual_state_block(vsb_target_label)

Remove the pntos.api.VirtualStateBlock matching vsb_target_label.

Parameters:

vsb_target_label (str)

abstract property measurement_processor_labels

Get a list of the labels of measurement processors that have been added.

Returns:

list of labels of measurement processors that have been added. Returns None if no measurement processors have been added to this fusion engine.

Return type:

list[str] | None

abstract add_measurement_processor(processor)

Add a pntos.api.StandardMeasurementProcessor.

This can be used to process future measurements that correspond to processor.label.

Parameters:

processor (StandardMeasurementProcessor)

abstract remove_measurement_processor(processor_label)

Remove a pntos.api.StandardMeasurementProcessor previously added to the fusion engine.

Assumes a measurement processor was previously added via add_measurement_processor() with the label processor_label.

Parameters:

processor_label (str)

abstract propagate(time)

Propagate the filter estimate forward in time.

May be evaluated lazily (when results are requested).

Parameters:

time (TypeTimestamp)

abstract update(processor_label, message)

Update the filter with the given measurement.

Will propagate first if needed to reach the time encoded inside the measurement.

Parameters:
  • processor_label (str)

  • message (Message)

abstract peek_ahead(time, block_labels)

Calculates the estimate and covariance at a requested time.

Uses the state blocks listed in block_labels, without changing the state of the fusion engine or its underlying filter. Blocks are assembled in the order that the labels are passed in.

If all of the following are true:

  • time is equal to or after the filter time (which can be checked with time).

  • All labels in block_labels correspond to a block that has been added to the fusion engine (which can be checked with state_block_labels).

  • block_labels has at least one element.

Then the result returned is guaranteed to not be None. Otherwise, if any of the above are false then the result will be None.

Parameters:
  • time (TypeTimestamp)

  • block_labels (list[str]) – An array of strings.

Returns:

EstimateWithCovariance | None

abstract generate_x_and_p(block_labels)

Generates the current estimate and covariance.

Estimate and covariance are built corresponding to a list of StateBlock labels. Blocks are assembled in the order that the labels are passed in.

If all of the following are true:

  • All labels in block_labels correspond to a block that has been added to the fusion engine (which can be checked with state_block_labels).

  • block_labels has at least one element.

Then the result returned is guaranteed to not be None. Otherwise, if any of the above are false then the result will be None.

Parameters:

block_labels (list[str]) – An array of strings.

Returns:

EstimateWithCovariance | None

abstract give_state_block_aux_data(block_label, aux)

Route a list of messages of aux data to a pntos.api.StandardStateBlock.

Parameters:
  • block_label (str)

  • aux (list[Message | None])

abstract give_measurement_processor_aux_data(processor_label, aux)

Route a list of messages of aux data to a pntos.api.StandardMeasurementProcessor.

Parameters:
  • processor_label (str)

  • aux (list[Message | None])

abstract give_virtual_state_block_aux_data(target_label, aux)

Route a list of messages of aux data to a pntos.api.VirtualStateBlock.

Parameters:
  • target_label (str)

  • aux (list[Message | None])

class pntos.api.FusionStrategyPlugin

Bases: CommonPlugin, ABC

A plugin that provides computational engines for estimation.

At the high level, a fusion strategy is a computation engine that knows how to estimating one or more states, given a set of observations/measurements. For example, the EKF equations are an implementation of a fusion strategy. This plugin is a factory that produces fusion strategies on demand, which is useful for multi-filter approaches where the system may need several fusion strategies running simultaneously.

There are many ways to model sensor fusion, including very simple (linear Kalman filter) and complex (neural networks, factor graphs, etc.). This plugin aims to allow any and all of those models to be represented. It achieves this by having multiple fusion strategies, so that the user may select what kind of fusion they want. Currently, only the pntos.api.StandardFusionStrategy is implemented, which is suitable for EKFs and filters that have similar interfaces to an EKF (such as a UKF). However, more strategy types are planned to be added in the future.

abstract is_fusion_type_supported(fusion_type)

Check if a particular fusion strategy is supported by new_fusion_strategy().

The new_fusion_strategy() factory method on this class can create one or more types of fusion strategy. However, pntos.api.FusionStrategyPlugin may not support all types (they must support at least one). Therefore, when a user receives a pntos.api.FusionStrategyPlugin, they should:

  1. Initialize the plugin by calling pntos.api.CommonPlugin.init_plugin() (see pntos.api.CommonPlugin for more information).

  2. Call is_fusion_type_supported() to check that the plugin supports the type that the user wants to use.

  3. If is_fusion_type_supported() returned True, call the new_fusion_strategy() factory method to get a new fusion strategy of the desired fusion strategy.

  4. Proceed to use the fusion strategy to do estimation.

Parameters:

fusion_type (type[FusionStrategyType]) – The fusion strategy type we are checking.

Returns:

Whether or not we support the requested fusion_type.

Return type:

bool

abstract new_fusion_strategy(fusion_type)

Create a new fusion strategy of the requested type.

Users must first ensure that the fusion strategy is supported by calling is_fusion_type_supported().

Parameters:
  • fusion_type (type[FusionStrategyType]) – The type of fusion strategy that we want

  • returned.

Returns:

The newly created fusion strategy, which is an implementation of the type specified by fusion_type. For example, if the user calls new_fusion_strategy() with a fusion_type of StandardFusionStrategy, then the returned object will be an implementation of pntos.api.StandardFusionStrategy.

Return type:

FusionStrategyType | None

class pntos.api.StandardFusionStrategy

Bases: ABC

A Fusion strategy making linearized Bayesian assumptions.

An implementation of the standard fusion strategy that is capable of Bayesian inference on a linearized discrete-time system with Gaussian noise inputs (e.g. an EKF).

At a fundamental level, this class manages an estimate of a state space as it propagates (changes over time) and updates (incorporates new measurements and observations). An estimate of a set of values (states) is stored and maintained within this object. The estimate is then mutated and updated over time by the various method calls.

A typical usage pattern of this class is as follows:

  1. The user adds a set of states by calling the add_states() method. As part of this step, the user all passes in initial conditions for the estimate. The fusion strategy is now storing an estimate of the states, set to the initial conditions.

  2. The user propagates this estimate forward in time by calling the propagate() method. For example, if the initial conditions were the estimates of the states at time 0.0s, but we now want to know the estimate of the values at time 5.0s, the user would call propagate() with a dynamics model parameter that specifies how to take an estimate at time 0.0 and use it to compute the estimate at time 5.0.

  3. The user updates this estimate by using observations of the states at the current time. For example, if the filter is currently propagated to time 5.0s and a measurement is received that observes the states’ values at time 5.0s, the user would call update() with a measurement model parameter that describes how to take the current estimate at 5.0s and incorporate the new information from the measurement.

  4. At any point, when the user wants to know the latest estimate given all the propagate/updates that have occurred, they may call estimate.

Note

This class must have an operational __deepcopy__ method. For most classes, the default __deepcopy__ method provided by Python will be sufficient. However, if the class has a field which does not properly implement its own __deepcopy__ (such as a C object wrapped to Python), then the class will need to implement a custom __deepcopy__ which properly copies all fields.

abstract property num_states

Get the total number of states this filter is estimating.

The count will initially be zero, until add_states() is called.

Returns:

Number of states being estimated in this filter.

Return type:

int

abstract add_states(initial_estimate, initial_covariance, cross_covariance=None)

Add new states to this filter.

Increases number of filter states and set the initial conditions of the new states. Returns index of the first added state. If cross_covariance is None, cross covariance between the existing states and the added states will be set to zeroes.

Parameters:
  • initial_estimate (NDArray[float64]) – The initial estimate to populate the new states with.

  • initial_covariance (NDArray[float64]) – The initial covariance matrix used to initialize the uncertainty of the new states.

  • cross_covariance (NDArray[float64] | None) – A covariance matrix that describes the cross terms between the previous states and the new states. If None, the cross-terms will be set to zero. Otherwise, if there are n existing states and m new states being added, this argument should have shape [n, m].

Returns:

Index of first state being added to this filter (zero indexed).

Return type:

int

abstract remove_states(first_index, count)

Removes a set of states from the filter.

Parameters:
  • first_index (int) – Index of the first state to be removed (zero indexed).

  • count (int) – The number of states to be removed.

abstract property estimate

Get the current internal estimate managed by this strategy.

This class manages a current estimate that is initially populated by add_states() and then is modified iteratively by propagate(), update(), and other method calls. This method returns the current estimate, incorporating all changes made by previous method calls to this strategy.

Returns:

An estimate if available. Returns None if no states have been added yet.

Return type:

NDArray[float64] | None

abstract set_estimate_slice(new_estimate, first_index)

Set a slice of the state estimates to a given set of values.

This class manages a current estimate that is initially populated by add_states() and then is modified iteratively by propagate(), update(), and other method calls. This method allows for manually overriding the current estimate. Sets a block of states to new values, starting with first_index and overwriting a number of states equal to the length of new_estimate.

Parameters:
  • new_estimate (NDArray[float64]) – The new estimate values that will overwrite the previous values.

  • first_index (int) – The index of the first state to overwrite.

abstract property covariance

Get the covariance of the current estimate.

This class manages a current estimate that is initially populated by add_states() and then is modified iteratively by propagate(), update(), and other method calls. In addition to the estimate itself, a covariance of the current estimate is computed. This method returns this covariance, incorporating all changes made by previous method calls to this strategy.

Returns:

The covariance of the current estimate. Returns None if no states have been added yet.

Return type:

NDArray[float64] | None

abstract set_covariance_slice(new_covariance, first_row, first_col=None)

Set a slice of the covariance matrix to a given set of values.

This class manages a current estimate that is initially populated by add_states() and then is modified iteratively by propagate(), update(), and other method calls. In addition to the estimate itself, a covariance of the current estimate is computed.

Allows for manually overriding the current covariance matrix. Sets a block of the covariance matrix to new values. The overwritten values are those in a rectangular area defined by the upper left corner at first_row, first_col and extending down and right to cover an area equal to the size of new_covariance. If first_col is not set or set to None, the value of first_row will be used as a column index as well.

Parameters:
  • new_covariance (NDArray[float64]) – The new covariance values that will overwrite a slice of the previous covariance matrix.

  • first_row (int) – The row of the first value to overwrite.

  • first_col (int | None, optional) – The column of the first value to overwrite.

abstract propagate(dynamics_model)

Propagates the estimate of the state space forward in time.

This method assumes that a state space is already initialized with a set of states via the add_states() method. The dynamics_model parameter includes a description of how the current state estimate can be propagated from the current time to a new time. Note that the actual numerical values of the current/new times are not specified anywhere, as that information is not needed to perform the computation. This method then takes the current state space estimate and the dynamics_model and uses both to compute an estimate at the new time. The new estimate clobbers the old estimate managed by this class, and be acquired by calling estimate.

Parameters:

dynamics_model (StandardDynamicsModel)

abstract update(measurement_model)

Updates the estimate of the state space, incorporating a new measurement.

This method assumes that a state space is already initialized with a set of states via the add_states() method. The measurement_model parameter includes both the measurement itself and a description of how the measurement relates to the state space. This method then takes the current state space estimate and updates it using information from the measurement_model. The updated estimate can be acquired by calling estimate.

Parameters:

measurement_model (StandardMeasurementModel) – The measurement with which to update the filter, as well as a model that describes how the measurement relates to the states this strategy is estimating.

class pntos.api.CommonInertial

Bases: ABC

A common base type for an inertial.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

abstract request_solution_message_type()

Get the solution type.

Returns:

The message type that will be returned by request_current_solution(), request_solution(), and request_solutions().

Return type:

type[AspnBase]

abstract request_current_solution()

Get the current inertial solution.

Returns:

The current inertial solution.

Return type:

Message

abstract request_solution(time)

Request solution at a specific time.

Parameters:

time (TypeTimestamp) – The time at which the returned solution should be valid.

Returns:

The solution computed by this inertial at time if time is in the valid range, None otherwise (is_time_in_range() can be used to check time before calling this method).

Return type:

Message | None

abstract request_solutions(times, solution_type)

Request solutions at multiple specific times.

Parameters:
  • times (list[TypeTimestamp]) – An array of times at which solutions are requested.

  • solution_type (InertialSolutionRangeType) – The type of solution requested.

Returns:

An array of solutions. Returns None if solution_type is unsupported by this inertial or every instance of times is outside the valid range. Otherwise guaranteed to not be None.

Return type:

list[Message | None] | None

abstract is_time_in_range(time)

Check if a solution exists at a given time.

Parameters:

time (TypeTimestamp) – The query time.

Returns:

True if a solution exists at time, False otherwise. This result is only valid until another method (for example, process_pntos_message()) is called.

Return type:

bool

abstract request_earliest_time()

Get the earliest available time at which a solution or forces and rates can be requested.

This result is only valid until another method (for example, process_pntos_message()) is called.

Returns:

The earliest available time at which a solution or forces and rates can be requested.

Return type:

TypeTimestamp

abstract request_latest_time()

Get the latest available time at which a solution or forces and rates can be requested.

This result is only valid until another method (for example, process_pntos_message()) is called.

Returns:

The latest available time at which a solution or forces and rates can be requested.

Return type:

TypeTimestamp

abstract request_process_pntos_message_types()

Returns an array of message types that are supported by this plugin.

Returns:

An array of message types that are supported by this plugin as inputs to process_pntos_message().

Return type:

list[type[AspnBase]]

abstract process_pntos_message(message)

A new message to be incorporated into the computed inertial solution.

Parameters:

message (Message)

abstract request_forces_and_rates(time)

Request forces and rates for a given time.

Parameters:

time (TypeTimestamp) – The time at which the forces and rates should be valid.

Returns:

The instantaneous forces and rates at time if time is in the valid range, None otherwise (is_time_in_range() can be used to check time before calling this method).

Return type:

InertialForcesRates | None

abstract request_average_forces_and_rates(time1, time2)

Request average forces and rates over a time period.

Parameters:
  • time1 (TypeTimestamp) – The start of the time range over which the forces and rates should be valid.

  • time2 (TypeTimestamp) – The end of the time range over which the forces and rates should be valid.

Returns:

The average forces and rates over the period of time defined by time1 and time2 if at least one of them is in the valid range, None otherwise (is_time_in_range() can be used to check both times before calling this method).

Return type:

InertialForcesRates | None

pntos.api.ExternalInertial

alias of CommonInertial

class pntos.api.InertialForcesRates(forces_and_rates, frame)

Bases: object

A struct containing specific forces and rotation rates from the inertial.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

Variables:
  • forces_and_rates (MeasurementImu) – An ASPN IMU message which has been repurposed to hold specific forces (the meas_accel field) and rotation rates (the meas_gyro field) in a different frame (see frame below).

  • frame (InertialFrame) – Specifies the frame of the above forces and rates.

frame

The frame at which the forces and rates are valid.

class pntos.api.InertialFrame(value)

Bases: IntEnum

An enumeration that specifies frame.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

INERTIAL_FRAME_NED = 0

Force vectors in this frame are north-east-down in \(m/s^2\).

Rotation rate vectors in this frame are of an inertial sensor with respect to inertial frame, in the inertial sensor frame (\(rad/s\)). Sometimes represented as \(w^\text{s}_\text{is}\) (or \(w^\text{b}_\text{ib}\) if the body frame is aligned with the inertial sensor frame.

class pntos.api.InertialPlugin

Bases: CommonPlugin, ABC

An implementation of an inertial plugin.

This plugin generates pntos.api.CommonInertial instances which may be used to generate INS solutions from raw IMU data.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

abstract is_inertial_type_supported(inertial_type)

Check if the plugin supports a given type of inertial.

Parameters:

inertial_type (type[InertialType])

Returns:

True if inertial type is supported, False otherwise.

Return type:

bool

abstract new_inertial(inertial_type, solution, config_group=None)

Create an instance of an implementation of pntos.api.CommonInertial.

Parameters:
  • inertial_type (type[InertialType]) – Specifies the type of inertial that the returned value will support. For example, if the user passes in STANDARD_INERTIAL_MECHANIZATION, then the returned value will be an implementation of pntos.api.StandardInertialMechanization. If inertial_type is unsupported by the plugin, then None will be returned. Please use is_inertial_type_supported() to check if the type is supported by the plugin.

  • solution (Message) – The initial solution (i.e. the alignment) to mechanize from.

  • config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to initialize the new inertial. This allows for multiple inertial instances to exist with unique settings.

Returns:

A new inertial object. Returns None if inertial_type is unsupported, solution is invalid, or config_group is invalid. is_inertial_type_supported() can be called to verify inertial_type before calling this method.

Return type:

InertialType | None

class pntos.api.InertialSolutionRangeType(value)

Bases: IntEnum

Solution type to request from an inertial.

An enumeration that allows the user to decide if the solution they request is the best available solution or one that has no discontinuities in it due to resets.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

class pntos.api.StandardInertialErrors(accel_biases, gyro_biases, accel_scale_factors, gyro_scale_factors)

Bases: object

A structure representing the biases on a set of 3-axis gyros and 3-axis accelerometers.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

Variables:
  • accel_biases (NDArray[float64]) – A 1D vector of length 3 containing biases for a 3-axis accelerometer in the sensor’s (X-Y-Z) frame, expressed in \(m/s^2\).

  • gyro_biases (NDArray[float64]) – A 1D vector of length 3 containing biases for a 3-axis gyro in the sensor’s (X-Y-Z) frame, expressed in \(rad/s\).

  • accel_scale_factors (NDArray[float64]) – A 1D vector of length 3 containing scale factor errors for a 3-axis accelerometer in the sensor’s (X-Y-Z) frame, unitless.

  • gyro_scale_factors (NDArray[float64]) – A 1D vector of length 3 containing scale factor errors for a 3-axis gyroscope in the sensor’s (X-Y-Z) frame, unitless.

class pntos.api.StandardInertialMechanization

Bases: CommonInertial, ABC

A struct produced by a pntos.api.InertialPlugin. It generates solutions from raw IMU data.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

abstract request_reset_message_types()

Get valid types of reset messages.

Returns:

An array of message types that are supported by this plugin for resetting the inertial solution, or None if resetting the inertial solution is an unsupported operation by the inertial plugin.

Return type:

list[type[AspnBase]] | None

abstract reset_solution(message)

Set the solution to the values in message.

For example, if message is PVA then the inertial solution will be set to that PVA. If message is just position, then only the position portion of the inertial solution will be set using message.

Parameters:

message (Message) – A message containing the information necessary to reset the solution. To see the types supported by the implementation, call request_reset_message_types().

abstract correct_sensor_errors(time, errors)

Reset the current inertial internal bias values.

Reset the current inertial internal bias values with corrections from an external source, such as a filter or error estimator. The errors passed in here will be adjusted for internally by the inertial when processing incoming data. Thus, if errors are passed into the inertial here they should not be corrected for in an external filter processing the inertial output (which would lead to a double correction).

Parameters:
  • time (TypeTimestamp) – The time at which errors should be valid.

  • errors (StandardInertialErrors) – An estimate of the inertial sensor’s errors.

abstract request_sensor_errors(time)

Request inertial errors for a given time.

Parameters:

time (TypeTimestamp) – Time at which inertial errors should be valid.

Returns:

Inertial errors at time if time is in the valid range (pntos.api.CommonInertial.is_time_in_range() can be used to check time before calling this method), None otherwise.

Return type:

StandardInertialErrors | None

class pntos.api.CommonInitializationStrategy

Bases: ABC

A common base type for initialization algorithms.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

abstract request_motion_needed()

Check the type of motion (if any) needed.

Returns:

InitializationMotionNeeded

abstract request_current_status()

Check the current initialization status.

Returns:

InitializationStatus

abstract process_pntos_message(message)

Incorporate a new message into the initialization algorithm.

Parameters:

message (Message)

class pntos.api.EwcInitializationStrategy

Bases: CommonInitializationStrategy

Generates an initial estimate-with-covariance (EWC) solution from sensor data.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

abstract request_solution()

Get the current initial solution.

Returns:

The current initial solution. Will be None if the initialization strategy has not yet finished. Use pntos.api.CommonInitializationStrategy.request_current_status() to check current status of the strategy. If the status is INITIALIZING_FINE or INITIALIZED_GOOD, then the result is guaranteed to not be None.

Return type:

InitialEstimateWithCovariance | None

class pntos.api.InertialInitializationStrategy

Bases: CommonInitializationStrategy, ABC

Generates an initial ASPN solution from sensor data.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

abstract request_solution()

Get the current initial solution.

Returns:

InitialInertialSolution

class pntos.api.InitialEstimateWithCovariance(time, estimate_with_covariance, status)

Bases: object

Holds both the current estimate and its associated covariance as well as the current status.

Coupling these avoids time-of-check to time-of-use (TOCTOU) issues.

Variables:
  • time (TypeTimestamp) – The time at which estimate_with_covariance is valid.

  • estimate_with_covariance (EstimateWithCovariance | None) – The current estimate of the initial solution. Check status for its validity (can be None if status is anything other than INITIALIZED_GOOD).

  • status (InitializationStatus) – Indicates the current initialization status. Should be checked before using estimate_with_covariance.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

class pntos.api.InitialInertialSolution(solution, inertial_errors, inertial_error_covariance, status)

Bases: object

Holds both the current solution, inertial errors (and their covariance), and the current status.

Coupling these avoids time-of-check to time-of-use (TOCTOU) issues.

Variables:
  • solution (Message | None) – The initial solution.

  • inertial_errors (StandardInertialErrors | None) – The inertial errors.

  • inertial_error_covariance (NDArray[float64] | None) – The covariance matrix associated with the terms in inertial_errors.

  • status (InitializationStatus) – Indicates the current initialization status. Should be checked before using any of the other fields.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

class pntos.api.InitializationMotionNeeded(value)

Bases: IntEnum

An enumeration that specifies what type of motion is required by the initialization strategy.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

NO_MOTION = 0

Stationary data is needed.

MOTION_NEEDED = 1

Dynamic data is needed.

ANY_MOTION = 2

No particular type of motion is required.

class pntos.api.InitializationPlugin

Bases: CommonPlugin, ABC

An implementation of an initialization plugin.

This plugin generates pntos.api.CommonInitializationStrategy instances which may be used to generate an initial solution from additional external sensor data, such as IMU data.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

abstract is_initialization_type_supported(initialization_type)

Check if given InitializationType is supported.

Parameters:

initialization_type (type[InitializationType])

Returns:

True if the plugin supports a given type of mechanization, False otherwise.

Return type:

bool

abstract new_initialization_strategy(initialization_type, config_group=None)

Create an instance of pntos.api.CommonInitializationStrategy.

Parameters:
  • initialization_type (type[InitializationType]) – Specifies the type of initializer that the returned value will support. For example, if the user passes in INERTIAL_INITIALIZATION_STRATEGY, then the returned value will be an implementation of pntos.api.InertialInitializationStrategy. If initialization_type is unsupported by the plugin, then None will be returned. Please use is_initialization_type_supported() to check if the type is supported by the plugin.

  • config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to set up the new initialization strategy. This allows for multiple initialization strategy instances to exist with unique settings.

Returns:

The new initialization strategy object. Returns None if initialization_type is unsupported by this plugin (this can be checked using is_initialization_type_supported()) or if config_group is invalid.

Return type:

InitializationType | None

class pntos.api.InitializationStatus(value)

Bases: IntEnum

An enumeration that allows the user to know the initialization status.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

WAITING = 0

Waiting to start initialization process.

INITIALIZING_COARSE = 1

Attempting to initialize and produce a navigation solution.

INITIALIZING_FINE = 2

A coarse initialization has been calculated by the algorithm, and the initialization is being tested or adjusted before producing a navigation solution.

INITIALIZED_GOOD = 3

We have a good initialization.

The provided solution can be used to kickoff inertial and fusion.

INITIALIZATION_FAILED = 4

The initialization process failed in some way, and may attempt to restart.

class pntos.api.LoggingPlugin

Bases: CommonPlugin, ABC

Logging plugin.

A plugin for logging out data to an arbitrary sink (e.g. console, file, network, etc.).

abstract log(source_plugin_type, source_plugin_identifier, level, message)

Log a string to the logging plugin’s sink.

Parameters:
  • source_plugin_type (PluginType) – Information on the plugin that sent the logout.

  • source_plugin_identifier (str) – Information on the plugin that sent the logout.

  • level (LoggingLevel) – The event severity.

  • message (str) – The string contents to be logged.

class pntos.api.MessageStreamConfig

Bases: ABC

Message stream configuration.

This class configures the buffering, delay, and sorting characteristics of messages that are streamed into the pntos.api.OrchestrationPlugin. The pntOS system will deliver messages to the orchestration plugin as it receives them. However, there is a fundamental tradeoff between latency and those messages being in-order. In particular, to guarantee that messages are sorted by timestamp, it is necessary to build a buffer and delay delivery, such that a sorting function may be applied. This structure allows the plugin to choose which messages are buffered and which are not.

abstract sequenced_stream_add(message_type, source_identifier=None)

Request messages are streamed in sorted timestamp ordering.

Request messages of the given message_type and optional source_identifier are streamed in sorted timestamp ordering.

Parameters:
  • message_type (type[AspnBase])

  • source_identifier (str | None, optional)

abstract sequenced_stream_remove(message_type, source_identifier=None)

Request messages are no longer streamed in sorted timestamp ordering.

Request messages of the given message_type and optional source_identifier are no longer streamed in sorted timestamp ordering. This will remove a type that was previously added in a call to sequenced_stream_add(), or remove individual messages from the entire list of messages that was added with a previous call to sequenced_stream_all().

Parameters:
  • message_type (type[AspnBase])

  • source_identifier (str | None, optional)

abstract sequenced_stream_all(enable)

Request all messages are streamed in sorted timestamp ordering.

Note that the ability to do this reliably will depend on the length of the buffer used by the pntos.api.Mediator.

Parameters:

enable (bool)

abstract immediate_stream_add(message_type, source_identifier=None)

Request messages are streamed immediately.

Request messages of the given message_type and optional source_identifier are streamed immediately without delay, buffering, or sorting.

Parameters:
  • message_type (type[AspnBase])

  • source_identifier (str | None, optional)

abstract immediate_stream_remove(message_type, source_identifier=None)

Request messages are no longer streamed immediately.

Request messages of the given message_type and optional source_identifier are no longer streamed immediately. This will remove a type that was previously added in a call to immediate_stream_add(), or remove individual messages from the entire list of messages that was added with a previous call to immediate_stream_all().

Parameters:
  • message_type (type[AspnBase])

  • source_identifier (str | None, optional)

abstract immediate_stream_all(enable)

Request all messages are streamed immediately without delay, buffering, or sorting.

Parameters:

enable (bool)

class pntos.api.OrchestrationPlugin

Bases: CommonPlugin, ABC

Orchestration plugin.

The pntOS orchestration plugin is responsible for orchestrating one or more fusion engines, state model providers and other plugins in order to perform sensor fusion. The orchestration plugin is sent (sorted, buffered) ASPN messages from the pntos.api.ControllerPlugin, and is responsible for computing a solution for the system, as well as estimating any other quantities of interest.

In order to achieve this task, the orchestration plugin may be passed a set of other plugins during the call to init_orchestration_plugin(). If so, the orchestration plugin then, as the name suggests, configures and orchestrates these plugins to work together to perform sensor fusion.

Example

For example, the orchestration plugin may set up a fusion engine it received in the call to init_orchestration_plugin(), then add state blocks or measurement processors to that fusion engine from a pntos.api.StateModelingPlugin it also received, and process inertial data from an pntos.api.InertialPlugin it received. The request_solutions() function will be called by the system when pntOS needs to know the current filtering solutions. Other quantities which need to be estimated by the orchestration engine can be returned to the system by registry updates.

abstract init_orchestration_plugin(plugins, stream_config)

Initial data structures needed by the orchestration plugin.

This function will be called by the system after the pntos.api.CommonPlugin.init_plugin() but before any other call to an pntos.api.OrchestrationPlugin function.

Parameters:
  • plugins (list[CommonPlugin] | None) – A set of plugins which should be used by the orchestration plugin. For example, the plugins list may include a pntos.api.StandardFusionEngine, which the orchestration plugin can use to perform fusion of sensor data received. The list may also include a pntos.api.StateModelingPlugin, which the orchestration plugin can use to extract the algorithms needed for parsing sensor data into the data model a fusion engine needs. If the orchestration plugin does not require any plugins, None may be passed.

  • stream_config (MessageStreamConfig) – A set of configuration options that the orchestration plugin can use to indicate to the pntos.api.ControllerPlugin how it would prefer delivery of messages. When the orchestration plugin receives the stream_config struct, it should call the functions on it to set up how messages will be delivered to it. If it does not, the order of messages’ arrival will be unspecified and at the discretion of the pntos.api.ControllerPlugin.

abstract process_pntos_message(message, sequenced)

Deliver a new message from an external to pntOS source into the orchestration plugin.

The plugin should utilize this sensor data contained in message by passing it into a fusion engine.

Parameters:
abstract property filter_description_list

Get a list of strings describing the filters available in this pntos.api.OrchestrationPlugin.

One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:

  • Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).

  • Strings should contain the substring BEST when they represent the primary solution.

  • Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).

  • This substring should contain the string-equivalent to the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.

Example

For example, if the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.

These conventions allow the user to identify their desired type of solution using substring matching.

Returns:

A list of strings describing the filters available in this pntos.api.OrchestrationPlugin.

Return type:

list[str]

abstract request_solutions(solution_times, filter_description=None)

Request filtering solutions at the times specified in the array solution_times.

Parameters:
  • solution_times (list[TypeTimestamp]) – The solution times.

  • filter_description (str | None, optional) – An pntos.api.OrchestrationPlugin may run multiple filters. To select which filter(s) to request solutions from, enter a valid filter description string in filter_description. Valid filter description strings can be obtained by calling filter_description_list. Passing in None will provide a result specific to a particular pntos.api.OrchestrationPlugin implementation. When filter_description is None, the implementation should endeavor to return its best solution.

Returns:

An array of messages containing the filter solutions for the requested solution_times. The number of solutions should equal the number of times in solution_times, although some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message list may be None if filter_description is invalid.

Return type:

list[Message | None] | None

class pntos.api.PlatformIntegrationPlugin

Bases: CommonPlugin, ABC

Platform integration plugin.

A plugin for command, control, solution output, and other behavior of the system which is specific to a particular platform. Works closely with the pntos.api.ControllerPlugin to fully define the overall behavior of the system.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

abstract take_control(plugins, plugin_resources_locations=None, initial_config=None)

Takes over secondary control from the pntos.api.ControllerPlugin.

When pntOS first boots, it passes control over to the pntos.api.ControllerPlugin. After the pntos.api.ControllerPlugin has initialized the plugins it wants to run, it calls this plugin’s take_control() to allow for platform specific control behavior to run. The pntos.api.PlatformIntegrationPlugin (PIP) is not responsible for calling the pntos.api.CommonPlugin.init_plugin() call on any of the plugins passed in its plugins list, and thus the list of plugins that is passed to the PIP should be pruned to only those plugins the pntos.api.ControllerPlugin initialized. The PIP is consequently not responsible for the pntos.api.Mediator construction or message routing - those responsibilities fall on the pntos.api.ControllerPlugin. Instead, the PIP is responsible for doing any additional logic that may be platform specific.

Example

For example, the PIP may decide to output solutions at a particular rate, or to have the pntos.api.TransportPlugin passed in its plugins list start/stop listening in response to moding commands, or inform the pntos.api.OrchestrationPlugin that it should not use a particular sensor at runtime (via a registry convention).

In general, the goal of the PIP is to implement the platform-specific needs, whereas the pntos.api.ControllerPlugin is designed to be the generic portion of the code. The pntos.api.ControllerPlugin should be designed to be generic and re-useable, but work hand-in-hand with the PIP to fully define the control behavior of the system.

pntos.api.ControllerPlugin responsibilities:
  • Defining concurrency model.

  • Initializing plugins (and constructing/passing in pntos.api.Mediator).

  • Routing data from transport plugin to orchestration/initialization/inertial plugins.

  • Routing requests for registry data to registry plugins.

pntos.api.PlatformIntegrationPlugin responsibilities:
  • Platform specific outputs.

  • Responding to moding commands from platform.

  • Routing situational awareness information to other pntOS plugins (via registry convention).

The parameters to pntos.api.PlatformIntegrationPlugin.take_control() should be identical to those passed to the pntos.api.ControllerPlugin.take_control(), with the exception of the plugins list being a subset of the plugins passed to the pntos.api.ControllerPlugin. Which plugins are passed to the PIP is implementation specific and decided by the pntos.api.ControllerPlugin.

Parameters:
class pntos.api.Preprocessor

Bases: ABC

A preprocessor.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

abstract process_pntos_message(message)

Process a message.

Parameters:

message (Message) – A message to be processed.

Returns:

A list of pntos.api.Message s. Usually this will be a single message, a modified version of message. It could be None if message is rejected or dropped. The preprocessor could also accumulate several messages, returning None for each one then returning an array with multiple processed messages.

Return type:

list[Message] | None

class pntos.api.PreprocessorPlugin

Bases: CommonPlugin, ABC

An implementation of a preprocessor plugin.

This plugin generates pntos.api.Preprocessor instances which may be used to process incoming messages before being distributed to other plugins.

Caution

Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.

Variables:

preprocessor_identifiers (list[str]) – A list of identifying strings for each kind of pntos.api.Preprocessor that this pntos.api.PreprocessorPlugin can create instances of. The preprocessor_index parameter of new_preprocessor() is an index into this array.

preprocessor_identifiers

Strings describing the preprocessors the provider can create.

abstract new_preprocessor(preprocessor_index, config_group=None)

Get a newly created pntos.api.Preprocessor.

Parameters:

preprocessor_index (int) – Since the pntos.api.PreprocessorPlugin can create a different preprocessor for each element in preprocessor_identifiers, the preprocessor_index parameter is used to select which kind of preprocessor to create a new instance of. The pntos.api.PreprocessorPlugin.preprocessor_identifiers field contains identifying strings for the kinds of preprocessors.

Example

For example, if the plugin can create 45 different preprocessors, the identifier of the last preprocessor that can be created is found in preprocessor_identifiers[44]. An instance of this preprocessor can be created by calling new_preprocessor(44, ...). Note that 0 <= preprocessor_index < length of preprocessor_identifiers.

config_group (str | None, optional): Indicates which (if any) parameter group in the

registry may be used to obtain additional configuration values to generate the new preprocessor. If the preprocessor requires no outside configuration, config_group may be None.

Returns:

A newly created pntos.api.Preprocessor. Returns None if preprocessor_index is greater than or equal to the length of pntos.api.PreprocessorPlugin.preprocessor_identifiers or if config_group is invalid.

Return type:

Preprocessor

class pntos.api.RegistryPlugin

Bases: CommonPlugin, ABC

Registry plugin.

A plugin for a global key-value registry. See the pntOS Registry page in the Internals section for more information on the goal of this plugin.

abstract new_registry(initial_config=None)

Create a new registry based on the initial values stored in initial_config.

Parameters:

initial_config (str | None, optional) –

The initial values the new registry should be based on. The format of initial_config is implementation specific, and plugins are free to support any or no format. Possible formats may include:

  • None, in which case the plugin is free to choose initial values. Choices may include hard-coded in the plugin or none at all.

  • A str. Examples of possible values the parameter could hold:

    1. The entire config.

    2. A local file path on systems which support them.

    3. A string adhering to the URI scheme.

Returns:

The newly created registry.

Return type:

Registry

Note

The returned pntos.api.Registry should be capable of producing pntos.api.KeyValueStore structs that are able to be used concurrently. Thus if the user uses the return value of this method to start two batches, one on group “foo” and the other on group “bar”, then concurrent access to both of the resulting pntos.api.KeyValueStore structs for “foo” and “bar” should be supported (i.e. no shared mutable state between them that is not synchronized).

class pntos.api.StandardDynamicsModel(g, Phi, Qd)

Bases: object

A description of the propagation dynamics for a set of states.

This model assumes that the state space \(x\) can be propagated forward in time by the equation:

\[x_k = g(x_{k-1}) + w_k\]

where \(x_k\) is the set of states at time \(k\), \(g\) is an arbitrary function, and \(w_k\) is additive white Gaussian noise.

Variables:
  • g (Callable[[NDArray[float64]], NDArray[float64]]) – A function that propagates forward in time a set of states.

  • Phi (NDArray[float64]) – The first-order Taylor series expansion (Jacobian) of the function \(g\).

  • Qd (NDArray[float64]) – The covariance matrix of \(w_k\).

class pntos.api.StandardMeasurementModel(z, h, H, R)

Bases: object

A description of how a measurement relates to a state space.

This model assumes that the relationship between the measurement and state vector is well modeled by the equation:

\[z=h(x) + v\]

where \(z\) is the measurement itself, \(x\) is the set of states being estimated, \(h\) is an arbitrary function, and \(v\) is additive white Gaussian noise.

Variables:
  • z (NDArray[float64]) – A column vector containing the measurement itself.

  • h (Callable[[NDArray[float64]], NDArray[float64]]) – A function that maps the state space to measurement space.

  • H (NDArray[float64]) – The first-order Taylor series expansion (i.e. Jacobian) of the function h.

  • R (NDArray[float64]) – The covariance matrix of \(v\).

class pntos.api.StandardMeasurementProcessor

Bases: ABC

A class that processes raw measurements/observations.

The measurements are used to calculate estimated states suitable for a linear or linearized filter to use. Each type of measurement should correspond to a pntos.api.StandardMeasurementProcessor that is supplied to the fusion engine. Incoming measurements received by the fusion engine will be routed to the corresponding measurement processor (by label) and call generate_model() to process the measurement. The resulting pntos.api.StandardMeasurementModel will be used by the fusion engine to call the underlying pntos.api.StandardFusionStrategy.update() method to update the filter estimate/error covariance.

Variables:
  • label (str) – A unique name for this measurement processor. This value will be used to select a measurement processor to handle new measurements that the strategy receives.

  • state_block_labels (list[str]) – A list of unique state block labels associated with measurements received by this processor. The estimate and covariance matrices passed into generate_model() will be composed of the states associated with these state blocks, and the returned StandardMeasurementModel.h and StandardMeasurementModel.H must respect these states. Note: state_block_labels[i] is the identifier for the i th state block this processor relates to.

Note

This class must have an operational __deepcopy__ method. For most classes, the default __deepcopy__ method provided by Python will be sufficient. However, if the class has a field which does not properly implement its own __deepcopy__ (such as a C object wrapped to Python), then the class will need to implement a custom __deepcopy__ which properly copies all fields.

label

A unique name for this measurement processor. This value will be used to select a measurement processor to handle new measurements that the strategy receives

state_block_labels

A list of unique state block labels associated with measurements received by this processor. The estimate and covariance matrices passed into generate_model() will be composed of the states associated with these state blocks, and the returned StandardMeasurementModel.h and StandardMeasurementModel.H must respect these states. Note: state_block_labels[i] is the identifier for the i th state block this processor relates to

abstract receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

abstract generate_model(message, gen_x_and_p_func)

Generate a pntos.api.StandardMeasurementModel.

Parameters:
  • message (Message) – The measurement/observation to process.

  • gen_x_and_p_func (GenXandP) – A callback function that can be used to retrieve the current estimate and covariance for the state blocks this measurement processor targets.

Returns:

A generated model containing the parameters required for a filter update. Will be None when a measurement cannot be produced from message (for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).

Return type:

StandardMeasurementModel | None

class pntos.api.StandardStateBlock

Bases: ABC

A description of a set of states and their dynamics.

Variables:
  • label (str) – The unique name for this state block.

  • num_states (int) – The number of states represented by this state block.

Note

This class must have an operational __deepcopy__ method. For most classes, the default __deepcopy__ method provided by Python will be sufficient. However, if the class has a field which does not properly implement its own __deepcopy__ (such as a C object wrapped to Python), then the class will need to implement a custom __deepcopy__ which properly copies all fields.

label

The unique name for this state block.

num_states

The number of states represented by this state block.

abstract receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_state_block_aux_data() is called with a label corresponding to this state block’s label.

Parameters:

aux (list[Message | None])

abstract generate_dynamics(gen_x_and_p_func, time_from, time_to)

Generate a pntos.api.StandardDynamicsModel.

The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.

Parameters:
  • gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.

  • time_from (TypeTimestamp) – The time to propagate from.

  • time_to (TypeTimestamp) – The time to propagate to.

Returns:

The description of how to propagate this state block over the given time interval, or None if time_from is later than time_to. Otherwise guaranteed to not return None.

Return type:

StandardDynamicsModel | None

class pntos.api.StandardStateModelProvider

Bases: ABC

A collection of tools for modeling states and measurements.

These tools are used to model the propagation and innovation of state spaces using pntOS’s standard fusion model. Specifically, a pntos.api.StandardStateModelProvider provides three types of tools:

  1. State Blocks - Define a set of states and a model for propagating those states.

  2. Virtual State Blocks - Relate two statespaces to each other.

  3. Measurement Processors - Relate measurements to a statespace.

A pntos.api.StandardStateModelProvider conceptually models a set of zero or more pntos.api.StandardStateBlock s and a set of zero or more pntos.api.StandardMeasurementProcessor s which together model the phenomenology of sensor data that is being brought into a fusion engine. The first type, state blocks, describe how a set of states propagates forward through time. The second type, measurement processors, describe how a measurement relates to a set of state blocks.

Each pntos.api.StandardStateModelProvider consists of factory methods which generate instances of the state blocks and measurement processors it provides. The pntos.api.StandardStateModelProvider.new_block() method is a factory method that returns a newly created state block on each invocation. Because the pntos.api.StandardStateModelProvider can provide more than one kind of state block, the pntos.api.StandardStateModelProvider.new_block() method takes a block_index parameter which allows the user to request which kind of state block is created by the factory. block_identifiers[i] gives a description of the i th kind of state block returned when block_index=i.

Similarly, pntos.api.StandardStateModelProvider.new_processor() is a factory method for returning new measurement processors and processor_identifiers is a set of identifiers for each available kind of measurement processor that can be returned by the factory.

Variables:
  • processor_identifiers (list[str] | None) – A list of identifying strings for each kind of measurement processor that this pntos.api.StandardStateModelProvider can create instances of. The processor_index parameter of new_processor() is an index into this array. This field will be None when this state model provider does not provide any measurement processors.

  • block_identifiers (list[str] | None) – A list of identifying strings for each kind of state block that this pntos.api.StandardStateModelProvider can create instances of. The block_index parameter of new_block() is an index into this array. This field will be None when this state model provider does not provide any state blocks.

  • virtual_block_identifiers (list[str] | None) – A list of identifying strings for each kind of virtual state block that this pntos.api.StandardStateModelProvider can create instances of. The virtual_block_index parameter of new_virtual_block() is an index into this array. This field will be None when this state model provider does not provide any virtual state blocks.

processor_identifiers

Strings describing the measurement processors the provider can create.

block_identifiers

Strings describing the state blocks the provider can create.

virtual_block_identifiers

Strings describing the virtual state blocks the provider can create.

abstract new_processor(processor_index, engine, label, state_block_labels, config_group)

Generate a newly created pntos.api.StandardMeasurementProcessor.

This measurement processor describes the relationship between a measurement and a set of state blocks.

Parameters:
  • processor_index (int) – Since the pntos.api.StandardStateModelProvider can create different kinds of measurement processors, the processor_index parameter is used to select which kind of measurement processor to create a new instance of. The processor_identifiers field contains identifying strings for the kinds of processors. For example, if the model can create 45 different processors, the identifier of the last processor that can be created is found in processor_identifiers[44]. An instance of this processor can be created by calling new_processor(self, 44, ...). Note that 0 <= processor_index < len(processor_identifiers).

  • engine (StandardFusionEngine | None) – An optional parameter that may be provided to the new processor, such that the processor may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to None when no engine is available for the processor to use.

  • label (str) – A string which will be used to populate the label field of the newly created processor. This label will be the unique name for the returned instance of a processor, and used to track the processor throughout its lifecycle. Note that it differs from processor_identifiers which is the model’s mechanism for selecting the kind of processor to create.

  • state_block_labels (list[str]) – A list of strings which will be used to populate the state_block_labels field of the newly created processor.

  • config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new processor. If the processor requires no outside configuration, config_group may be None.

Returns:

The newly created pntos.api.StandardMeasurementProcessor or None when no measurement processor can be produced with the given processor_index, engine, and config_group.

Return type:

StandardMeasurementProcessor | None

abstract new_block(block_index, engine, label, config_group)

Generate a newly created pntos.api.StandardStateBlock.

This state block describes a set of states and how they propagate over time.

Parameters:
  • block_index (int) – Since the pntos.api.StandardStateModelProvider can create different kinds of state blocks, the block_index parameter is used to select which kind of state block to create a new instance of. The block_identifiers field contains identifying strings for the kinds of state blocks. For example, if the model can create 45 different state blocks, the identifier of the last state block that can be created is found in block_identifiers[44]. An instance of this state block can be created by calling new_block(self, 44, ...). Note that 0 <= block_index < len(block_identifiers).

  • engine (StandardFusionEngine | None) – An optional parameter that may be provided to the new block, such that the block may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to None when no engine is available for the block to use.

  • label (str) – A string which will be used to populate the label field of the newly created state block. This label will be the unique name for the returned instance of a state block, and used to track the state block throughout its lifecycle. Note that it differs from block_identifiers which is the model’s mechanism for selecting the kind of state block to create.

  • config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new state block. If the state block requires no outside configuration, config_group may be None.

Returns:

The newly created pntos.api.StandardStateBlock or None when no state block can be produced with the given block_index, engine, and config_group.

Return type:

StandardStateBlock | None

abstract new_virtual_block(virtual_block_index, source_label, target_label, config_group)

Generate a newly created pntos.api.VirtualStateBlock.

This virtual state block is used to convert a set of states from one representation to another.

Parameters:
  • virtual_block_index (int) – Since the pntos.api.StandardStateModelProvider can create different kinds of virtual state blocks, the virtual_block_index parameter is used to select which kind of virtual state block to create a new instance of. The pntos.api.StandardStateModelProvider.virtual_block_identifiers field contains identifying strings for the kinds of virtual state blocks. For example, if the model can create 45 different virtual state blocks, the identifier of the last virtual state block that can be created is found in virtual_block_identifiers[44]. An instance of this virtual state block can be created by calling new_virtual_block(self, 44, ...). Note that 0 <= virtual_block_index < len(virtual_block_identifiers).

  • source_label (str) – The label of the state block or virtual state block whose states this virtual state block transforms.

  • target_label (str) – A unique identifier for this virtual state block.

  • config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new virtual state block. If the virtual state block requires no outside configuration, config_group may be None.

Returns:

The newly created pntos.api.VirtualStateBlock or None when no virtual state block can be produced with the given virtual_block_index and config_group.

Return type:

VirtualStateBlock | None

class pntos.api.StateModelingPlugin

Bases: CommonPlugin, ABC

A pntos.api.CommonPlugin subclass that generates state model providers.

abstract is_fusion_type_supported(fusion_type)

Check if the plugin supports a given type of fusion. See StateModelProviderType.

Parameters:

fusion_type (StateModelProviderType)

Returns:

bool

abstract new_state_model_provider(fusion_type)

Generate a state model provider.

Parameters:

fusion_type (StateModelProviderType) – Specifies the type of fusion that the returned value will support. For example, if the user passes in STANDARD_MODEL, then the returned value will be an implementation of pntos.api.StandardStateModelProvider.

Returns:

A state model provider which implements the specified type or None if fusion_type is not supported (is_fusion_type_supported() can be used to check fusion_type).

Return type:

StateModelProviderType | None

class pntos.api.VirtualStateBlock

Bases: ABC

A class used to convert a set of states from one representation to another.

States are converted using a mapping function \(f\) to convert estimates, and the Jacobian of \(f()\) to map covariances (note that this implies that the order/units of terms in the estimate vector and covariance matrix are the same). Each instance is associated with two labels, source and target, where source is the label attached to the quantity to be transformed, and target is the label attached to the result. Typically used with a pntos.api.StandardFusionEngine where source refers to a real pntos.api.StandardStateBlock and target refers to some representation that is advantageous for some other element, such as a pntos.api.StandardMeasurementProcessor, to use.

Variables:
  • source (str) – The label associated with the representation this instance can transform from.

  • target (str) – The label associated with the representation this instance can transform to.

Note

This class must have an operational __deepcopy__ method. For most classes, the default __deepcopy__ method provided by Python will be sufficient. However, if the class has a field which does not properly implement its own __deepcopy__ (such as a C object wrapped to Python), then the class will need to implement a custom __deepcopy__ which properly copies all fields.

abstract receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_virtual_state_block_aux_data() is called with a label corresponding to this pntos.api.VirtualStateBlock ‘s target.

Parameters:

aux (list[Message | None])

abstract convert(estimate_with_covariance, time)

Convert a full estimate/covariance pair.

Parameters:
  • estimate_with_covariance (EstimateWithCovariance) – Estimate and covariance to convert.

  • time (TypeTimestamp) – Time that estimate_with_covariance is valid at.

Returns:

The converted value.

Return type:

EstimateWithCovariance

abstract convert_estimate(estimate, time)

Convert just an estimate vector.

Parameters:
  • estimate (NDArray[float64]) – Estimate vector to convert, Nx1.

  • time (TypeTimestamp) – Time that estimate is valid at.

Returns:

The converted vector, Mx1.

Return type:

NDArray[float64]

abstract jacobian(estimate, time)

Obtain the Jacobian of the transform performed by this instance.

The Jacobian is calculated at an instance in time, given an estimate to differentiate with respect to.

Parameters:
  • estimate (NDArray[float64]) – Estimate vector associated with the return value of source, Nx1.

  • time (TypeTimestamp) – Time that estimate is valid at.

Returns:

An MxN matrix that may be used to pre-multiply estimate to obtain an M length vector in target representation (to first order).

Return type:

NDArray[float64]

class pntos.api.TransportPlugin

Bases: CommonPlugin, ABC

Transport plugin.

A plugin that abstracts a network transport. listening for sensor data off the wire and sending data back to the sensors as needed.

abstract start_listening()

Start listening.

Start listening to the transport that this plugin implements, calling the appropriate controller function as data streams in.

abstract stop_listening()

Disable listening.

Disable listening to the transport that was previously started in a call to start_listening().

abstract broadcast_message(message, channel_name=None)

Send a message back out to the sensor from pntOS.

Parameters:
  • message (Message) – The message to send.

  • channel_name (str | None, optional) – The desired channel. If channel_name is None the implementation may decide where message should be routed, if anywhere. For example, a serial cable might send all messages to a single destination.

class pntos.api.UiPlugin

Bases: CommonPlugin, ABC

A plugin for a UI that is integrated directly into pntOS.

While it is always possible to write a GUI that listens to pntOS outputs and interacts with it externally, this plugin allows users to write a GUI that has direct access to pntOS via the plugin API. This allows for low latency and high performance GUI/UIs to be generated. Note that this plugin is designed for developer/research style UIs and not production environments. A user display in a production environment is better modeled as a pntos.api.PlatformIntegrationPlugin, as that is designed to represent requests from the system and not simply status updates.

abstract requires_main_thread()

Check if this plugin needs to run on the main thread.

Some systems require GUI applications to run on the main thread. This method can be used to query whether or not this plugin must be run on the main thread. If this method returns True, then run_main_thread() must be called from the main thread in order to start this plugin.

Returns:

True if plugin needs to run on the main thread, False otherwise.

Return type:

bool

abstract run_main_thread()

Start plugin on the main thread.

This method should only be called if requires_main_thread() returns True. This method should only be called from the main thread.

class pntos.api.UtilityPlugin

Bases: CommonPlugin, ABC

A plugin that performs a generic utility function.

A utility plugin performs implementation-specific functions that may require access to pntOS resources (such as the registry) via the pntos.api.CommonPlugin API. Otherwise, this plugin has no other API-defined functionality.

class pntos.api.RegistryValueType

A TypeVar of the types allowed in pntos.api.KeyValueStore.

A TypeVar is particularly for cases where a method needs to guarantee that the type on an input is the same as the returned type.

Example:

For example, pntos.api.KeyValueStore.get_value() needs to guarantee that the input and the return types are the same. Thus, pntos.api.KeyValueStore.get_value() would be a good place to use RegistryValueType in the type description:

def get_value(
    self, key: str, value_type: type[RegistryValueType]
) -> RegistryValueType | None
class pntos.api.RegistryValueTypeUnion

This is a union of all types allowed in pntos.api.KeyValueStore.

This is particularly for cases where a method does not need to guarantee that the type on an input is the same as the returned type.

Example:

For example, pntos.api.KeyValueStore.set_value() does not need to guarantee that the input and the return type are the same since it returns None. Thus, pntos.api.KeyValueStore.set_value() would be a good place to use RegistryValueTypeUnion in the type description:

def set_value(self, key: str, value: RegistryValueTypeUnion) -> None
class pntos.api.FusionEngineType

Enumerates the types of fusion engines.

Currently only StandardFusionEngine is defined, but this TypeVar also includes “Any” in the type list for future compatibility.

class pntos.api.FusionStrategyType

Enumerates the types of fusion strategies.

Currently only StandardFusionStrategy is defined, but this TypeVar also includes “Any” in the type list for future compatibility.

class pntos.api.PluginType

An union of all the types of plugins.

Can be used by the logging plugin to print which plugin the message originated from.

class pntos.api.InertialType

An enumeration of the types of inertials an inertial plugin could provide.

“Any” is included for future compatibility.

class pntos.api.InitializationType

An enumeration of the types of initializers an initializer plugin could provide.

“Any” is included for future compatibility.

class pntos.api.StateModelProviderType

An enumeration of the types of state model providers a state modeling plugin could provide.

“Any” is included for future compatibility.

class pntos.api.GenXandP
A function of type: ``Callable[[list[str]], EstimateWithCovariance | None]``

Returns the estimate and covariance associated with the states of block_labels within a particular measurement processor or state block. This is used to lazily evaluate estimate and covariance.

Args:
block_labels (list[str]): Labels for state blocks to generate estimate and

covariance for.

Returns:

Estimate and covariance of the provided block_labels. Returns None if any label in block_labels does not correspond to a valid block.