pntOS Python API
Python API of pntOS.
- class pntos.api.CommonPlugin
Bases:
ABCCommon definitions that all plugins must provide.
This structure should not be used directly (except in the case of a utility plugin), but instead is composed as the first field on all of the concrete pntOS plugin structures. For example, the transport plugin is specified as:
class TransportPlugin(CommonPlugin, ABC): def init_plugin(...): ...init_plugin implementation... def ...other function implementations...
Thus this class defines a set of functions and variables that all plugins have. The
pntos.api.CommonPlugin.init_plugin()function is guaranteed to be called by pntOS when the plugin is first loaded into memory by the system.Example
When defining a new transport plugin, the plugin writer is responsible for implementing all fields on the
pntos.api.TransportPluginclass. Thus, the fields of thepntos.api.CommonPluginnested on thepntos.api.TransportPluginare implemented by the plugin writer.- Variables:
identifier (str) – A string identifier uniquely identifying this plugin. This string will be used to determine the unique space this plugin receives in the system config.
- abstract init_plugin(plugin_resources_location=None, mediator=None)
The first plugin method called; initializes the plugin.
A function that will be called by pntOS once and only once when it first initializes the plugin before any other functions on the plugin are called. Here the plugin may do dynamic runtime initialization of its members, and is given the full path to the location of a data folder specific to the plugin, in case it needs to acquire additional files. An instance of
pntos.api.Mediatorwill be passed which the plugin should save off for later use. Whenever the plugin needs to make a request of pntOS, it should use one of the fields in thepntos.api.Mediatorinstance received by the plugin in this function call.Note
Implementation note: This inversion of control allows the controller to implement the
pntos.api.Mediatorclass, and abstracts away the return communication channel from the plugin to the rest of the system. Thus, the plugin need only implementpntos.api.Mediatorby simply saving a copy of the functions that the controller passes into it. Then, when the plugin later needs to make requests of the system, it may call a function in its copy ofpntos.api.Mediator, without needing any knowledge of how the controller implementedpntos.api.Mediator. This allows controllers to implement arbitrary concurrency models, including single-threaded, multi-threaded, multi-process, and distributed computing.- Parameters:
plugin_resources_location (str | None, optional) – Specifies the location of the plugin’s resources. The location is determined by the controller plugin, and therefore is controller implementation specific. Plugin implementers wishing to provide a resource to their plugin should consult the documentation of the controller to determine which location scheme will be passed into this function.
mediator (Mediator | None, optional) –
None-able if the plugin type being initialized is apntos.api.ControllerPlugin. Non-controller plugins may assume that the mediator parameter is notNone.
- abstract shutdown_plugin()
A function that will be called by pntOS when it is done using the plugin.
Here the plugin should release any resources it has acquired. When this function call returns pntOS may only call the destructor function (it will not call any other functions of this plugin). The plugin may not call any function on any other plugin, mediator, or use any resource that was given to it by pntOS after it returns from this function.
- identifier
A string identifier uniquely identifying this plugin.
This string will be used to determine the unique space this plugin receives in the system config.
- class pntos.api.EstimateWithCovariance(type, estimate, covariance)
Bases:
objectA container for holding an estimate and covariance.
- Variables:
type (EstimateWithCovarianceType) – Describes how the fields in this struct are used.
estimate (NDArray[float64]) – An array of doubles representing an estimate vector. Usage depends on the
typefield.covariance (NDArray[float64]) – An array of doubles representing a square covariance matrix. Data is stored in row major form. Usage depends on the
typefield.
- estimate
An estimate vector.
- covariance
A covariance matrix, describing the errors in the estimate.
- class pntos.api.EstimateWithCovarianceType(value)
Bases:
IntEnumDescribes how the fields in
pntos.api.EstimateWithCovarianceare used.- EWC_GENERIC = 0
Contains a mean (estimate) and covariance describing a multivariate Gaussian distribution.
EstimateWithCovariance.estimateis size Nx1 where N is the length field.EstimateWithCovariance.covarianceis size NxN where N is the length field.
- EWC_ATTITUDE_QUAT = 1
Contains a mean (estimate) and covariance describing a rotation modeled by a multivariate Gaussian distribution, but the estimate is in quaternion form and the covariance is in tilt error form.
EstimateWithCovariance.estimateis size 4x1.EstimateWithCovariance.covarianceis size 3x3, in \(\text{radians}^2\).
- class pntos.api.KeyValueStore
Bases:
AbstractContextManager[KeyValueStore]A key-value store implemented with a string-pair key.
This key-value store is intended to function as an expanded python dictionary.
Each value can be looked up by an associated key (string). Values can be of any type specified by
RegistryValueType/RegistryValueTypeUnion.Example
For example, to store a string value “foo” in the key-value store under the key “k1”, one would write either of the following:
store.set_value("k1", "foo") store["k1"] = "foo"
At this point, the key-value store would have recorded the value into its internal data storage. Later a user could call either of these lines:
foo = store.get_value("k1", str) foo = store["k1"]
to retrieve the value at key “k1”.
The advantage of
get_valueis that if the conversion is possible, the user can specify a different type to return than the type that was originally stored in the key-value store. For example, these two lines demonstrate a user saving an integer to the key-value store and then retrieving it as a string:store["k2"] = 42 val = store.get_value("k2", str) # val now equals "42"
In general, a
pntos.api.KeyValueStoreis generated by apntos.api.Registryand not directly by other code. Thepntos.api.Registrywill return key/value stores on demand, utilizing the data backing store chosen by the plugin to store data (either ephemerally in memory or permanently in persistent storage). In general, it is only valid to call the getters/setters on apntos.api.KeyValueStoreduring a batch operation. Seepntos.api.Registryfor more information.- Variables:
data_format (KeyValueStoreDataFormat) – The data format that is used by the
set_raw()andget_raw()methods.
- abstract keys()
Get the array of keys which currently exist in this store.
- Returns:
Returns the keys in the store or
Noneif no keys are present.- Return type:
list[str] | None
- abstract __contains__(key)
Check whether or not a given key exists in the store.
Particularly, this function allows the user to use python “if in” statements:
if key in store: ...
- Parameters:
key (str)
- Returns:
bool
- abstract get_value(key, value_type)
Get the value stored at
keywith return typevalue_type.Example
For example, to access altitude in a
pntos.api.KeyValueStorenamedkv_storeas an integer:altitude = kv_store.get_value("altitude", int)
- Parameters:
key (str)
value_type (type[RegistryValueType])
- Returns:
Returns
Noneif the key is not available. The return is guaranteed to not beNoneif called with a valid key (which can be checked withpntos.api.KeyValueStore.__contains__()) and if the store can convert the value to the requested type.- Return type:
pntos.api.RegistryValueType| None
- abstract __getitem__(key)
Gets an item in the key-value store.
This function enables python bracket indexing to return a value from the store for a given key.
Example
For example:
if key in store: foo = store[key] # foo is now the value stored at key
- Parameters:
key (str)
- Returns:
This is guaranteed to return a value of the same type as
get_type()returns for the given key, if the key exists. Ifget_type()returnsNone(indicating type information is not available), then this method will only return values as strings or Messages.- Return type:
RegistryValueTypeUnion| None
- abstract get_raw(key=None)
Get the value for the given
keyas an array of bytes.- Parameters:
key (str | None, optional)
- Returns:
The return format will conform to the definition in
pntos.api.KeyValueStore.data_format. ReturnsNoneif the given key is not available. The return is guaranteed to not beNoneif called with a valid key, which can be checked withpntos.api.KeyValueStore.__contains__():if key in store: ...
If
keyisNone, then this function will return all of the keys and values in the group passed topntos.api.Registry.batch_start()and will be formatted to conform to keys and values as defined inpntos.api.KeyValueStore.data_format.- Return type:
bytes | None
- abstract set_value(key, value)
Set the given key to the provided value.
- Parameters:
key (str)
value (RegistryValueTypeUnion) – Can be of any type specified by
pntos.api.RegistryValueTypeUnion.
- abstract __setitem__(key, value)
Set an item in the key-value store.
Same functionality as
set_value(), but allows python bracket indexing to set values in the store.Example
For example:
store["key"] = 42
- Parameters:
key (str)
value (RegistryValueTypeUnion) – Can be of any type specified by
pntos.api.RegistryValueTypeUnion.
- abstract set_raw(key, bytes)
Set the given key to the provided value.
- Parameters:
key (str | None) – If
keyisNone, then the contents ofbytesmust include both keys and values and must be formatted to conform topntos.api.KeyValueStore.data_format.byteswill then be used to set the corresponding keys and values in the group passed topntos.api.Registry.batch_start().bytes (bytes) – Must be formatted to conform to the definition of a value in
pntos.api.KeyValueStore.data_format.
- abstract remove_key(key)
Remove the given key from the registry.
- Parameters:
key (str)
- Returns:
Trueifkeyis successfully removed, andFalseotherwise. Keys may fail to be removed if the key does not currently exist, or the backend is unable to remove the key.- Return type:
bool
- abstract batch_end()
Ends a batch operation.
Ends a batch operation started with a
pntos.api.Registry.batch_start()call. After calling this, the user should not use thepntos.api.KeyValueStorethey received frompntos.api.Registry.batch_start()again without callingpntos.api.KeyValueStore.batch_restart()on thepntos.api.KeyValueStore.If keys in the batch were acted upon with
set_permanent()turned on, and the plugin supports permanent storage, this call will save changes to permanent storage ifset_permanent()isTrueduring the call tobatch_end(). Enacts equivalent ofset_permanent(self,false)before return. If anyrequest_notify()observers have been added, they will be processed prior to this call returning.Example
Example 1: Flushing to permanent storage on
batch_end():store = registry.batch_start("group") ...work... store.set_permanent(true) # if not disabled, flush on batch_end ...work... store.batch_end() # will flush values
Example 2: Not flushing to permanent storage on
batch_end():store = registry.batch_start("group") ...work... store.set_permanent(true) # tag some values ...work... store.set_permanent(false) # do not flush on batch_end store.batch_end() # will not flush values
In the second example above, values set with “set” methods after the initial
set_permanent()call are still stored for potential saving to permanent storage.
- abstract batch_restart()
Restarts a batch.
Restarts a batch that was previously started with
pntos.api.Registry.batch_start()and subsequently ended withpntos.api.KeyValueStore.batch_end(). This method is likely much more efficient thanpntos.api.Registry.batch_start()(depending on the registry implementation) as thepntos.api.Registry.batch_start()method must find the store again given the group name.Note
While a batch is active, access to the store may be denied to other users. Thus a user should endeavour to call
batch_end()as soon as possible after they are done getting/setting values in the returnedpntos.api.KeyValueStore.
- abstract request_notify(key, callback)
Register a callback which gets called each time a key in the store is updated.
Allows plugins to respond asynchronously to parameter updates. Returns
Trueif the notifier was successfully registered, andFalseif the store is unable to notify the requester. IfkeyisNone, then the callback will be invoked when any key in the batch’s group is modified. Otherwise, the callback will only be invoked when the given key is modified.Note
The callback must not attempt to set any values inside the
pntos.api.KeyValueStore, as the callback is likely being invoked during the processing of another operation. The callback should endeavour to store off the updated keys/values as quickly as possible and return, leaving the processing of the updates to another context or thread when possible. Callingpntos.api.Mediatorwithin the callback may be disallowed by the controller implementation and lead to undefined behavior.- Parameters:
key (str | None)
callback (Callable[[str, list[str], KeyValueStore], None]) –
A function with a function definition compatible with:
def my_callback(group: str, modified_keys: list[str], kv: KeyValueStore) -> None: ...
- Returns:
Trueif the notifier was successfully registered, andFalseif the store is unable to notify the requester.- Return type:
bool
- abstract remove_notify(key, callback)
Removes a notification as requested by
request_notify().The group and callback must match the parameters passed to
request_notify()in order to successfully remove a callback.Note
This will remove all matching callbacks that have a matching group and callback. If a user registers the same callback twice this will remove both.
- Parameters:
key (str | None)
callback (Callable[[str, list[str], KeyValueStore], None])
- Returns:
Trueif removal was successful andFalseif it was not.Falsewill be returned if a callback did not exist for the group.- Return type:
bool
- abstract __delitem__(key)
Delete the item with the specified key.
Must remove the given key and the associated value from the store, along with any callbacks or permanency settings.
This enables the python
deloperator.Example
For example:
for key in keys_to_remove: del store[key]
- Parameters:
key (str)
- abstract __iter__()
Return an iterator over the keys in the store.
This allows for easy iteration over the key-value store.
Example
For example:
for key in store: ...
- Returns:
Iterator
- abstract __len__()
Return the number of items in the store.
This allows the use of python’s
lenfunction.Example
For example:
num_elements = len(key_val_store)
- Returns:
int
- abstract clear()
Remove all items from key-value store.
- abstract set_permanent(permanent)
Tag values modified with “set” methods as permanently stored.
Configure the
pntos.api.KeyValueStoreto tag values modified with “set” methods as permanently stored (as opposed to ephemerally stored in memory). Only values acted upon with “set” methods whileset_permanent()isTruewill be tagged. Values will be flushed according to registry configuration settings or perbatch_end()API. Returns the value of the permanent storage configuration. Callers should check this to verify if the set was successful.Example
Tagging specific keys to be permanently stored:
store: PntosKeyValueStore = registry.batch_start("group") store.set_value("key1",1234.56) # does not tag this value as permanently stored store.set_permanent(True) # start tagging set calls as permanently stored store.set_value("key1",987.65) store.set_value("key2",123) # both key1 and key2 values tagged store.set_permanent(False) # disable permanent storage store.set_value("key1",456.78) # key1 = 456.78 is value of key1 in store # key1 = 987.65 tagged to be permanently stored # key2 = 123 tagged to be permanently stored
- Parameters:
permanent (bool)
- Returns:
The value of the permanent storage configuration. Callers should check this to verify if the set was successful.
- Return type:
bool
- abstract values()
Returns a list of the key-value store’s values.
This method is useful for unloading all values from a key-value store.
Example
For example:
def get_mean_of_kv_store(kv: KeyValueStore) -> float | None: '''Returns the mean of a key-value store if all values are int or float''' values = kv.values() if all(isinstance(x, (int, float)) for x in values): return sum(values) / len(values) return None
- Returns:
A list of all values in the store.
- Return type:
list[RegistryValueTypeUnion]
- abstract items()
Returns a list of the items (key-value pairs) in the store.
This method is useful for when you need to iterate over both keys and values in a dictionary.
Example
For example:
for key, value in my_dict.items() printf('Key: {key}, Value: {value}')
- Returns:
A list of the key-value pairs (as tuples) in the store.
- Return type:
list[tuple[str, RegistryValueTypeUnion]]
- abstract get_type(key)
Returns the type of a value in the KeyValueStore.
This is helpful for situations when it is advantageous to know the type of a value in the store without need for the value itself.
Example
For example, it might be useful for these situations where different callbacks are needed depending on what type is in the store:
if key in kv_store: val_type = kv_store.get_type(key) if val_type is int: kv_store.request_notify(key, int_callback) elif val_type is Message: kv_store.request_notify(key, message_callback) else: kv_store.request_notify(key, generic_callback)
- Parameters:
key (str)
- Returns:
If key exists in registry, this is guaranteed to return the return type of
pntos.api.KeyValueStore.__getitem__()for the given key. Else, it returnsNoneif type information is not available or key does not exist in registry.- Return type:
type[RegistryValueTypeUnion] | None
- data_format
Defines the underlying format which the data in the key-value store will be stored as.
- class pntos.api.KeyValueStoreDataFormat(value)
Bases:
IntEnumThe format of data returned or expected.
An enum that specifies the format of data returned/expected in the
pntos.api.KeyValueStore.get_raw()andpntos.api.KeyValueStore.set_raw()methods. This value is otherwise unused when querying a key-value store.- INI = 0
Keys and their corresponding values are returned according to the INI file format specification.
- UNSPECIFIED = 1
An opaque type that is undefined by the implementer.
- class pntos.api.LoggingLevel(value)
Bases:
IntEnumAn enumeration of the types of log outs that are available in pntOS.
- ERROR = 0
This output indicates the program has entered an error state, and likely needs to be inspected to discover what went wrong.
- WARN = 1
This output is designed to warn of a possibly unintended state that may be harmless or be indicative of a bug.
- INFO = 2
This output is designed to be informational, and may indicate correct operation.
- DEBUG = 3
This output is designed to assist in debugging plugins by providing additional information about state and behavior which would be otherwise unnecessary.
- class pntos.api.Mediator
Bases:
ABCA set of callbacks which are handed to a pntOS plugin upon initialization.
When a plugin is first initialized into pntOS, it is guaranteed that the plugin will be passed an instance of this struct via an invocation of
pntos.api.CommonPlugin.init_plugin()(Seepntos.api.CommonPluginfor more information). The plugin may then use the set of function calls in this class to make requests of thepntos.api.ControllerPlugin.All of the functions on this class (and any returned values from those functions) are guaranteed to be thread-safe for use by all plugins. Thus, after a pntOS plugin has received a copy of a
pntos.api.Mediatorit can freely call the functions contained therein without doing any explicit locking. This thread safety is implemented by thepntos.api.ControllerPluginwhen it creates the mediator before passing them to other plugins.Callers must still take care to only call functions in
pntos.api.Mediatorwhich they are not themselves responsible for implementing. The details of which plugins are used in the implementation of any particular function on this struct is decided by thepntos.api.ControllerPlugin, and thus is implementation specific to thepntos.api.ControllerPluginused.- Variables:
registry (Registry) – A
pntos.api.Registryobject that can be used to update keys/values in the pntOS global registry.
- abstract property filter_description_list
Request a list of strings describing the solutions available.
One of these description strings may be used when calling
request_solutions(). For consistency, these strings should adhere to the following conventions:Strings should be upper case and have words and acronyms separated by underscores (
UPPER_SNAKE_CASE).Strings should contain the substring
BESTwhen they represent the primary solution.Strings should contain the substring
DEAD_RECKONINGwhen they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more thanBESTsolutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide aDEAD_RECKONINGsolution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals betweensolution_times(but resets applied before all of thesolution_times).Strings should include a substring indicating the type of solution returned. This substring should contain the string-equivalent to the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string
_ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.
Example
If the primary solution is an ASPN PVA then the string
MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATEwould fulfill the convention.These conventions allow the user to identify their desired type of solution using substring matching.
- Returns:
A list of strings describing the solutions available.
- Return type:
list[str]
- abstract request_solutions(solution_times, filter_description=None)
Request filtering solutions at the times specified in the array
solution_times.- Parameters:
solution_times (list[TypeTimestamp]) – The times at which to return solutions.
filter_description (str | None, optional) – To select which filter(s) to request solutions from, enter a valid filter description string in
filter_description. Valid filter description strings can be obtained by callingfilter_description_list. Passing inNonewill provide a result specific to a particular implementation. Whenfilter_descriptionisNone, the implementation should endeavor to return its best solution.
- Returns:
An array of messages containing the filter solutions for the requested
solution_times. Some entries may beNoneif they are unavailable at the corresponding time insolution_times. The returnedpntos.api.Messagearray may beNoneiffilter_descriptionis invalid.- Return type:
list[Message | None] | None
- abstract process_pntos_message(message)
Send a new message to the system for arbitrary processing.
For example, this function is useful for plugins who have just received new sensor data that they wish to relay to the system to be used in a sensor fusion solution.
- Parameters:
message (Message)
- abstract broadcast_aspn_message(message, transport=None, destination_identifier=None)
Request that pntOS broadcast the provided message out to the network.
- Parameters:
message (Message)
transport (str | None, optional) – The identifier of a transport plugin that the message should be routed to. The transport parameter should match the
pntos.api.CommonPlugin.identifierstring of apntos.api.TransportPluginactive in the system. If the transport parameter isNone, this indicates that the message should be broadcast to all available transports.destination_identifier (str | None, optional) – A transport-specific identifier that allows transports to determine how to route the message. If the destination transport has the concept of a channel or topic,
destination_identifiershould be populated by the channel or topic. Otherwise, the identifier is populated in a plugin-specific manner defined by the destination transport. Ifdestination_identifierisNone, then the transport should output the message in the “default” output channel/topic and route being used by pntOS.
- abstract log_message(level, message)
Log a message.
Send a loggable message to the system, to be logged through the current logging infrastructure enabled (e.g. the console, a logfile, etc.).
- Parameters:
level (LoggingLevel)
message (str)
- class pntos.api.Message(wrapped_message, source_identifier)
Bases:
objectA container for an ASPN message.
This container may contain either proper ASPN messages which are part of the ASPN data model, or extension messages specific to pntOS which augment ASPN.
- Variables:
wrapped_message (AspnBase) – Either an ASPN message or a pntOS ASPN Extension message.
source_identifier (str) – Indicates where this message came from. If the message originated from a
pntos.api.TransportPluginand the underlying transport has the concept of a channel or topic, this field should be populated by the channel or topic. Otherwise, the identifier is populated in a plugin-specific manner by the originating plugin that created the message.
- class pntos.api.Registry
Bases:
ABCA registry of key/value data which is organized by (string) groups.
In order to get/set a key in the registry, one must call
pntos.api.Registry.batch_start()with the group the key is stored under and then use the resultingpntos.api.KeyValueStoreto get/set the key/value pair. When one is done accessing keys in thepntos.api.KeyValueStore, they must callpntos.api.KeyValueStore.batch_end(). It is not permitted to access any member inside thepntos.api.KeyValueStoreafter a batch has ended. If a user has ended a batch and then desires to access thepntos.api.KeyValueStoreagain, they may use thepntos.api.KeyValueStore.batch_restart()method.- abstract batch_start(group)
Begin a batch get/set operation.
Begin a batch get/set operation wherein the user may make any number of modifications to the keys/values in the
group. The registry implementation may wait to batch these requests untilpntos.api.KeyValueStore.batch_end()is called for better performance. For example, a lock may be obtained at the beginning of abatch_start()and not released until apntos.api.KeyValueStore.batch_end()call is encountered. Thus, a plugin that callsbatch_start()should endeavour to make its calls to theset_,get_, andregistermethods as quickly as possible and callpntos.api.KeyValueStore.batch_end()immediately, as doing otherwise may be locking other plugins out of access to the registry (depending on the registry plugin implementation). If a plugin supportspntos.api.KeyValueStore.request_notify(), then notifications of updates may be suspended until the batch ends. After a batch is ended, the returnedpntos.api.KeyValueStorecan still be used to access the store viapntos.api.KeyValueStore.batch_restart().Note
While a batch is active, access to the store may be denied to other users. Thus a user should endeavour to call
pntos.api.KeyValueStore.batch_end()as soon as possible after they are done getting/setting values in the returnedpntos.api.KeyValueStore.- Parameters:
group (str)
- Returns:
KeyValueStore
- abstract property group_array
Get the array of groups which currently exist.
- Returns:
The array of groups which currently exists. Returns
Noneif no groups exist.- Return type:
list[str] | None
- abstract has_group(group)
Checks whether or not a given group has had any values added to it (for any key).
- Parameters:
group (str)
- Returns:
bool
- abstract request_notify_new_group(callback)
Register a callback which gets called each time a new group is made in the registry.
- Parameters:
callback (Callable[[str], None])
- Returns:
Trueif the notifier was successfully registered, andFalseif the registry is unable to notify the requester.- Return type:
bool
- class pntos.api.ControllerPlugin
Bases:
CommonPlugin,ABCController plugin.
An implementation of a primary controller in charge of defining the usage of all other pntOS plugins.
In ordinary operation, an app will import plugins, initialize a controller plugin, and then call
pntos.api.ControllerPlugin.take_control()on the controller plugin, passing in a list ofpluginsthat it imported.From that point forward, the controller is responsible for all activity. It may use any or none of the plugins in the
pluginslist as desired. In general, it may do anything it wants, within the following guidelines:The controller should not load new plugins - it should work with the
pluginspassed to it in thetake_control()parameters.The controller must call
pntos.api.CommonPlugin.init_plugin()on any plugin it wishes to use prior to using any other functionality on that plugin. The controller must pass apntos.api.Mediatorthat the plugin may use to communicate back to the controller. This callback design allows the controller to abstract away the concurrency model - in particular, a callback function may be anything from a direct invocation of a Python function to a shim that utilizes IPC channels to communicate to process-separated or machine-separated plugins.
Outside of these requirements, the controller defines any and all I/O it supports, which pntOS plugins are loaded / used, and the type of fusion being done. The controller can be hard-coded to support only a specific sensor configuration or written generically to support arbitrary run-time environment sensing. Outside of some initialization in the app, the controller is the conceptual “main” function.
When the controller is provided a
pntos.api.PlatformIntegrationPluginas one of the plugins in thepluginslist passed totake_control(), that indicates to the controller that platform-specific control logic exists and must be used. In this case, the controller’s primary objective’s are to:Pick the concurrency model being used (multiprocessed, multithreaded, coroutines, single-threaded, etc.).
Spin up the concurrency primitives to implement the chosen concurrency type.
Provide a
pntos.api.Mediatorto all plugins in theirpntos.api.CommonPlugin.init_plugin()call that enables inter-plugin communication.
After that task is accomplished, the controller should call the
pntos.api.PlatformIntegrationPlugin.take_control()function and allow thepntos.api.PlatformIntegrationPlugin(PIP) to actively call functions on thepluginslist. Once thepntos.api.PlatformIntegrationPlugin.take_control()has been called, the controller must lock/synchronize the controller’s and the PIP’s access to function calls in thepluginslist, such that any action the PIP might take will not interfere with the controller’s actions (or, equivalently, actions that the mediator provided by the controller is taking). One simple approach to synchronizing access to the plugins list across the controller and PIP is for the plugins list provided to the PIP to be a facade that actually routes messages through to the mediator, such that the mediator handles all requests and can synchronously dispatch them. More efficient approaches exist, however, it is the responsibility of the controller to make sure that all accesses the PIP makes to resources passed to it are synchronized with respect to the controller’s chosen concurrency primitive.- abstract take_control(plugins, plugin_resources_locations=None, initial_config=None)
Takes over primary control of the program from the app.
Takes over primary control of the program from the app, using the
pluginsto process data, generate fused estimates, and ultimately produce and output PNT solutions.take_control()must use the plugins passed to it and construct a full pntOS system. Please see the description of thepntos.api.PlatformIntegrationPlugin(PIP) for a description of duties of this function compared to the duties of thepntos.api.PlatformIntegrationPlugin.take_control()method (if a PIP is in thepluginslist).- Parameters:
plugins (list[CommonPlugin]) – An array of plugins available to the controller.
plugin_resources_locations (list[str | None] | None, optional) – A list of strings which represent a list of locations, one for each plugin in
plugins, where those plugins may find auxiliary data if needed. The array can beNone, otherwise the array is of the same length asplugins. Each string may beNone, filesystem paths, or adhere to some scheme, such as the URI scheme, for defining the resource’s location.initial_config (str | None, optional) –
Represents a source of values to initialize the primary pntOS configuration. This could be
Noneor astr. Examples of possible values for the latter are:The entire config.
A local file path on systems which support them.
A string adhering to the URI scheme.
- class pntos.api.CrossCovariances(block_labels, cross_covariances)
Bases:
objectA container for a set of covariances relating a StateBlock to one or more other StateBlocks.
For example, suppose that some StateBlock named
Aexisted containingastates. Then this structure could define the cross covariance ofAwith respect to other StateBlocks namedB(withbstates) andC(withcstates). In that case,block_labelswould be an array of 2 stringsBandC, andcross_covarianceswould be an array of two matrices: the cross-covariance matrix ofBandA, with shape[b, a]and the cross-covariance matrix ofCandAwith shape[c, a].- Variables:
block_labels (list[str]) – A list of labels of the
pntos.api.StandardStateBlockthis structure contains the cross-covariances for.cross_covariances (list[NDArray[float64]]) – A list of cross-covariance matrices between a single StateBlock and the set of StateBlocks listed in
block_labels.
- block_labels
The labels of the state blocks that the entries in
cross_covariancesrelate to.
- cross_covariances
The covariances between the state blocks.
- class pntos.api.FusionPlugin
Bases:
CommonPlugin,ABCPlugin that provides a fusion engine.
A fusion engine allows data from multiple sensors to be integrated into a unified state estimate.
- abstract is_fusion_type_supported(fusion_type)
Check if the plugin supports a given type of fusion.
- Parameters:
fusion_type (type[FusionEngineType])
- Returns:
bool
- abstract new_fusion_engine(fusion_type)
Create a fusion engine.
- Parameters:
fusion_type (type[FusionEngineType]) – This parameter specifies the type of fusion engine that will be returned.
- Returns:
The
fusion_typeparameter specifies the type of fusion engine that will be returned. For example, if the user passes inpntos.api.StandardFusionEngine, then an implementation ofpntos.api.StandardFusionEnginewill be returned. ReturnsNoneiffusion_typeis not supported by this fusion plugin (is_fusion_type_supported()can be used to check the type before calling this method). Otherwise the return is guaranteed to not beNone.- Return type:
FusionEngineType | None
- class pntos.api.StandardFusionEngine
Bases:
ABCAn implementation of a fusion engine that supports the standard fusion model.
Assumes the system is described by discrete-time matrices and noise inputs are zero-mean white Gaussian. In addition, all covariance matrices / mean vectors are descriptions of jointly-Gaussian multivariate distributions. All noise sources are jointly-Gaussian distributed.
This object requires a
pntos.api.StandardFusionStrategyto work. Some implementations may be able to provide their own. Others will require a strategy to be provided by setting theStandardFusionEngine.strategyfield. It is possible to check whether a fusion engine needs to be provided a fusion strategy by checking theStandardFusionEngine.strategyfield (if it isNonethen this fusion engine needs to be provided a strategy). WhileStandardFusionEngine.strategyisNone, all other methods are unsafe to be called.Note
This class must have an operational
__deepcopy__method. For most classes, the default__deepcopy__method provided by Python will be sufficient. However, if the class has a field which does not properly implement its own__deepcopy__(such as a C object wrapped to Python), then the class will need to implement a custom__deepcopy__which properly copies all fields.Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- abstract property time
Get the current time of the filter.
- Returns:
TypeTimestamp
- abstract property strategy
The underlying algorithm used for Bayesian inference.
- Returns:
The fusion strategy is the type of filter (EKF, UKF, etc.).
- Return type:
StandardFusionStrategy | None
- abstract property num_states
Get the total number of states currently in the fusion engine.
Virtual state blocks do not affect this result.
- Returns:
The total number of states currently in the fusion engine.
- Return type:
int
- abstract property state_block_labels
Get a list of
pntos.api.StandardStateBlocklabels that have been added to this fusion engine.- Returns:
A list of the
pntos.api.StandardStateBlocklabels that have been added to this fusion engine. ReturnsNoneif no state blocks have been added. Guaranteed to not returnNoneifnum_statesis a value other than 0.- Return type:
list[str] | None
- abstract add_state_block(block, initial_estimate_covariance, cross_covariances=None)
Add the given
pntos.api.StandardStateBlockto the fusion engine.This will expand the state vector being estimated by the value of
num_states.- Parameters:
block (StandardStateBlock) – The
pntos.api.StandardStateBlockto be added to the fusion engine.initial_estimate_covariance (EstimateWithCovariance) – Contains the initial conditions of the states, with
initial_estimate_covariance.estimatebeing an Nx1 matrix andinitial_estimate_covariance.covariancebeing an NxN matrix, where N isblock.num_states.cross_covariances (CrossCovariances | None, optional) – An optional parameter which, if non-
None, contains a description of the newly added StateBlock’s cross covariances with respect to a set of StateBlocks which already exist inside the filter (specified bycross_covariances.block_labels). If thecross_covarianceparameter isNone, cross covariance between the existing states and the added states will be set to zeroes.
- abstract get_state_block_estimate(block_label)
Get the estimate associated with a state block.
Find a
pntos.api.StandardStateBlockorpntos.api.VirtualStateBlockwithin the fusion engine matchingblock_label, and return a copy of its current estimate vector.- Parameters:
block_label (str)
- Returns:
A copy of its current estimate vector. If
block_labelreferences a virtual state block (VSB) this will return a converted estimate, converted into the VSBs coordinate frame. ReturnsNoneifblock_labeldoes not correspond to a block that has been added to the fusion engine. Guaranteed to not returnNonewhenblock_labelis in the list returned bystate_block_labelsandstrategyis notNone.- Return type:
NDArray[float64] | None
- abstract get_state_block_covariance(block_label)
Get the covariance associated with a state block.
Find a
pntos.api.StandardStateBlockorpntos.api.VirtualStateBlockwithin the fusion engine matchingblock_label, and return a copy of its current covariance matrix.- Parameters:
block_label (str)
- Returns:
A copy of its current covariance matrix. If
block_labelreferences a virtual state block (VSB) this will return a converted covariance, converted into the VSBs coordinate frame. ReturnsNoneifblock_labeldoes not correspond to a block that has been added to the fusion engine. Guaranteed to not returnNonewhenblock_labelis in the list returned bystate_block_labelsandstrategyis notNone.- Return type:
NDArray[float64] | None
- abstract get_state_block_cross_covariance(block_label1, block_label2)
Get the cross covariance between the states associated with two state blocks.
Find the
pntos.api.StandardStateBlocks within the fusion engine matchingblock_label1andblock_label2, and return the cross-covariance matrix between them.- Parameters:
block_label1 (str)
block_label2 (str)
- Returns:
The cross-covariance matrix between
block_label1andblock_label2. ReturnsNoneifblock_label1orblock_label2do not correspond to blocks that have been added to the fusion engine. Guaranteed to not returnNonewhen bothblock_label1andblock_label2are in the list returned bystate_block_labelsandstrategyis notNone.- Return type:
NDArray[float64] | None
- abstract set_state_block_estimate(block_label, estimate)
Update the estimate associated with a given state block.
Find a
pntos.api.StandardStateBlockwithin the fusion engine matchingblock_label, and change its current estimate vector.Note
This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.
- Parameters:
block_label (str)
estimate (NDArray[float64])
- abstract set_state_block_covariance(block_label, covariance)
Update the covariance associated with a given state block.
Find a
pntos.api.StandardStateBlockwithin the fusion engine matchingblock_label, and change its current covariance matrix.Note
This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.
- Parameters:
block_label (str)
covariance (NDArray[float64])
- abstract set_state_block_cross_covariance(block_label1, block_label2, covariance)
Update the covariance between two state blocks.
Find the
pntos.api.StandardStateBlocks within the fusion engine matchingblock_label1andblock_label2, and change the current covariance matrix between them.Note
This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.
- Parameters:
block_label1 (str)
block_label2 (str)
covariance (NDArray[float64])
- abstract remove_state_block(block_label)
Remove the
pntos.api.StandardStateBlockmatchingblock_label.This will reduce the state vector being estimated by the number of states that the block represents.
- Parameters:
block_label (str)
- abstract property virtual_state_block_target_labels
Gets a list of the target labels of virtual state blocks that have been added.
A label being returned by this list is not a guarantee that the virtual state block has a valid source. For that, call
has_virtual_state_block().- Returns:
A list of the target labels of virtual state blocks that have been added. Returns
Noneif no virtual state blocks have been added to this fusion engine.- Return type:
list[str] | None
- abstract has_virtual_state_block(vsb_target_label)
Checks if the fusion engine has a
pntos.api.VirtualStateBlockwith a matching target label.- Parameters:
vsb_target_label (str)
- Returns:
Trueif the fusion engine has apntos.api.VirtualStateBlockwith a matching target label,Falseif no virtual state block with matching target label exists or if one exists but is not capable of generating an estimate. That is, the VSB’s source must exist and be in a continuous chain to a concrete state block which also exists in the fusion engine in order to returnTrue.- Return type:
bool
- abstract add_virtual_state_block(virtual_state_block)
Add the given
pntos.api.VirtualStateBlockto the fusion engine.A virtual state block (VSB) convert from an underlying block coordinate frame into the VSB coordinate frame.
- Parameters:
virtual_state_block (VirtualStateBlock)
- abstract remove_virtual_state_block(vsb_target_label)
Remove the
pntos.api.VirtualStateBlockmatchingvsb_target_label.- Parameters:
vsb_target_label (str)
- abstract property measurement_processor_labels
Get a list of the labels of measurement processors that have been added.
- Returns:
list of labels of measurement processors that have been added. Returns
Noneif no measurement processors have been added to this fusion engine.- Return type:
list[str] | None
- abstract add_measurement_processor(processor)
Add a
pntos.api.StandardMeasurementProcessor.This can be used to process future measurements that correspond to
processor.label.- Parameters:
processor (StandardMeasurementProcessor)
- abstract remove_measurement_processor(processor_label)
Remove a
pntos.api.StandardMeasurementProcessorpreviously added to the fusion engine.Assumes a measurement processor was previously added via
add_measurement_processor()with the labelprocessor_label.- Parameters:
processor_label (str)
- abstract propagate(time)
Propagate the filter estimate forward in time.
May be evaluated lazily (when results are requested).
- Parameters:
time (TypeTimestamp)
- abstract update(processor_label, message)
Update the filter with the given measurement.
Will propagate first if needed to reach the time encoded inside the measurement.
- Parameters:
processor_label (str)
message (Message)
- abstract peek_ahead(time, block_labels)
Calculates the estimate and covariance at a requested time.
Uses the state blocks listed in
block_labels, without changing the state of the fusion engine or its underlying filter. Blocks are assembled in the order that the labels are passed in.If all of the following are true:
timeis equal to or after the filter time (which can be checked withtime).All labels in
block_labelscorrespond to a block that has been added to the fusion engine (which can be checked withstate_block_labels).block_labelshas at least one element.
Then the result returned is guaranteed to not be
None. Otherwise, if any of the above are false then the result will beNone.- Parameters:
time (TypeTimestamp)
block_labels (list[str]) – An array of strings.
- Returns:
EstimateWithCovariance | None
- abstract generate_x_and_p(block_labels)
Generates the current estimate and covariance.
Estimate and covariance are built corresponding to a list of StateBlock labels. Blocks are assembled in the order that the labels are passed in.
If all of the following are true:
All labels in
block_labelscorrespond to a block that has been added to the fusion engine (which can be checked withstate_block_labels).block_labelshas at least one element.
Then the result returned is guaranteed to not be
None. Otherwise, if any of the above are false then the result will beNone.- Parameters:
block_labels (list[str]) – An array of strings.
- Returns:
EstimateWithCovariance | None
- abstract give_state_block_aux_data(block_label, aux)
Route a list of messages of aux data to a
pntos.api.StandardStateBlock.- Parameters:
block_label (str)
aux (list[Message | None])
- abstract give_measurement_processor_aux_data(processor_label, aux)
Route a list of messages of aux data to a
pntos.api.StandardMeasurementProcessor.- Parameters:
processor_label (str)
aux (list[Message | None])
- abstract give_virtual_state_block_aux_data(target_label, aux)
Route a list of messages of aux data to a
pntos.api.VirtualStateBlock.- Parameters:
target_label (str)
aux (list[Message | None])
- class pntos.api.FusionStrategyPlugin
Bases:
CommonPlugin,ABCA plugin that provides computational engines for estimation.
At the high level, a fusion strategy is a computation engine that knows how to estimating one or more states, given a set of observations/measurements. For example, the EKF equations are an implementation of a fusion strategy. This plugin is a factory that produces fusion strategies on demand, which is useful for multi-filter approaches where the system may need several fusion strategies running simultaneously.
There are many ways to model sensor fusion, including very simple (linear Kalman filter) and complex (neural networks, factor graphs, etc.). This plugin aims to allow any and all of those models to be represented. It achieves this by having multiple fusion strategies, so that the user may select what kind of fusion they want. Currently, only the
pntos.api.StandardFusionStrategyis implemented, which is suitable for EKFs and filters that have similar interfaces to an EKF (such as a UKF). However, more strategy types are planned to be added in the future.- abstract is_fusion_type_supported(fusion_type)
Check if a particular fusion strategy is supported by
new_fusion_strategy().The
new_fusion_strategy()factory method on this class can create one or more types of fusion strategy. However,pntos.api.FusionStrategyPluginmay not support all types (they must support at least one). Therefore, when a user receives apntos.api.FusionStrategyPlugin, they should:Initialize the plugin by calling
pntos.api.CommonPlugin.init_plugin()(seepntos.api.CommonPluginfor more information).Call
is_fusion_type_supported()to check that the plugin supports the type that the user wants to use.If
is_fusion_type_supported()returned True, call thenew_fusion_strategy()factory method to get a new fusion strategy of the desired fusion strategy.Proceed to use the fusion strategy to do estimation.
- Parameters:
fusion_type (type[FusionStrategyType]) – The fusion strategy type we are checking.
- Returns:
Whether or not we support the requested
fusion_type.- Return type:
bool
- abstract new_fusion_strategy(fusion_type)
Create a new fusion strategy of the requested type.
Users must first ensure that the fusion strategy is supported by calling
is_fusion_type_supported().- Parameters:
fusion_type (type[FusionStrategyType]) – The type of fusion strategy that we want
returned.
- Returns:
The newly created fusion strategy, which is an implementation of the type specified by
fusion_type. For example, if the user callsnew_fusion_strategy()with afusion_typeofStandardFusionStrategy, then the returned object will be an implementation ofpntos.api.StandardFusionStrategy.- Return type:
FusionStrategyType | None
- class pntos.api.StandardFusionStrategy
Bases:
ABCA Fusion strategy making linearized Bayesian assumptions.
An implementation of the standard fusion strategy that is capable of Bayesian inference on a linearized discrete-time system with Gaussian noise inputs (e.g. an EKF).
At a fundamental level, this class manages an estimate of a state space as it propagates (changes over time) and updates (incorporates new measurements and observations). An estimate of a set of values (states) is stored and maintained within this object. The estimate is then mutated and updated over time by the various method calls.
A typical usage pattern of this class is as follows:
The user adds a set of states by calling the
add_states()method. As part of this step, the user all passes in initial conditions for the estimate. The fusion strategy is now storing an estimate of the states, set to the initial conditions.The user propagates this estimate forward in time by calling the
propagate()method. For example, if the initial conditions were the estimates of the states at time 0.0s, but we now want to know the estimate of the values at time 5.0s, the user would callpropagate()with a dynamics model parameter that specifies how to take an estimate at time 0.0 and use it to compute the estimate at time 5.0.The user updates this estimate by using observations of the states at the current time. For example, if the filter is currently propagated to time 5.0s and a measurement is received that observes the states’ values at time 5.0s, the user would call
update()with a measurement model parameter that describes how to take the current estimate at 5.0s and incorporate the new information from the measurement.At any point, when the user wants to know the latest estimate given all the propagate/updates that have occurred, they may call
estimate.
Note
This class must have an operational
__deepcopy__method. For most classes, the default__deepcopy__method provided by Python will be sufficient. However, if the class has a field which does not properly implement its own__deepcopy__(such as a C object wrapped to Python), then the class will need to implement a custom__deepcopy__which properly copies all fields.- abstract property num_states
Get the total number of states this filter is estimating.
The count will initially be zero, until
add_states()is called.- Returns:
Number of states being estimated in this filter.
- Return type:
int
- abstract add_states(initial_estimate, initial_covariance, cross_covariance=None)
Add new states to this filter.
Increases number of filter states and set the initial conditions of the new states. Returns index of the first added state. If
cross_covarianceisNone, cross covariance between the existing states and the added states will be set to zeroes.- Parameters:
initial_estimate (NDArray[float64]) – The initial estimate to populate the new states with.
initial_covariance (NDArray[float64]) – The initial covariance matrix used to initialize the uncertainty of the new states.
cross_covariance (NDArray[float64] | None) – A covariance matrix that describes the cross terms between the previous states and the new states. If
None, the cross-terms will be set to zero. Otherwise, if there arenexisting states andmnew states being added, this argument should have shape[n, m].
- Returns:
Index of first state being added to this filter (zero indexed).
- Return type:
int
- abstract remove_states(first_index, count)
Removes a set of states from the filter.
- Parameters:
first_index (int) – Index of the first state to be removed (zero indexed).
count (int) – The number of states to be removed.
- abstract property estimate
Get the current internal estimate managed by this strategy.
This class manages a current estimate that is initially populated by
add_states()and then is modified iteratively bypropagate(),update(), and other method calls. This method returns the current estimate, incorporating all changes made by previous method calls to this strategy.- Returns:
An estimate if available. Returns
Noneif no states have been added yet.- Return type:
NDArray[float64] | None
- abstract set_estimate_slice(new_estimate, first_index)
Set a slice of the state estimates to a given set of values.
This class manages a current estimate that is initially populated by
add_states()and then is modified iteratively bypropagate(),update(), and other method calls. This method allows for manually overriding the current estimate. Sets a block of states to new values, starting withfirst_indexand overwriting a number of states equal to the length ofnew_estimate.- Parameters:
new_estimate (NDArray[float64]) – The new estimate values that will overwrite the previous values.
first_index (int) – The index of the first state to overwrite.
- abstract property covariance
Get the covariance of the current estimate.
This class manages a current estimate that is initially populated by
add_states()and then is modified iteratively bypropagate(),update(), and other method calls. In addition to the estimate itself, a covariance of the current estimate is computed. This method returns this covariance, incorporating all changes made by previous method calls to this strategy.- Returns:
The covariance of the current estimate. Returns
Noneif no states have been added yet.- Return type:
NDArray[float64] | None
- abstract set_covariance_slice(new_covariance, first_row, first_col=None)
Set a slice of the covariance matrix to a given set of values.
This class manages a current estimate that is initially populated by
add_states()and then is modified iteratively bypropagate(),update(), and other method calls. In addition to the estimate itself, a covariance of the current estimate is computed.Allows for manually overriding the current covariance matrix. Sets a block of the covariance matrix to new values. The overwritten values are those in a rectangular area defined by the upper left corner at
first_row,first_coland extending down and right to cover an area equal to the size ofnew_covariance. Iffirst_colis not set or set toNone, the value offirst_rowwill be used as a column index as well.- Parameters:
new_covariance (NDArray[float64]) – The new covariance values that will overwrite a slice of the previous covariance matrix.
first_row (int) – The row of the first value to overwrite.
first_col (int | None, optional) – The column of the first value to overwrite.
- abstract propagate(dynamics_model)
Propagates the estimate of the state space forward in time.
This method assumes that a state space is already initialized with a set of states via the
add_states()method. Thedynamics_modelparameter includes a description of how the current state estimate can be propagated from the current time to a new time. Note that the actual numerical values of the current/new times are not specified anywhere, as that information is not needed to perform the computation. This method then takes the current state space estimate and thedynamics_modeland uses both to compute an estimate at the new time. The new estimate clobbers the old estimate managed by this class, and be acquired by callingestimate.- Parameters:
dynamics_model (StandardDynamicsModel)
- abstract update(measurement_model)
Updates the estimate of the state space, incorporating a new measurement.
This method assumes that a state space is already initialized with a set of states via the
add_states()method. Themeasurement_modelparameter includes both the measurement itself and a description of how the measurement relates to the state space. This method then takes the current state space estimate and updates it using information from themeasurement_model. The updated estimate can be acquired by callingestimate.- Parameters:
measurement_model (StandardMeasurementModel) – The measurement with which to update the filter, as well as a model that describes how the measurement relates to the states this strategy is estimating.
- class pntos.api.CommonInertial
Bases:
ABCA common base type for an inertial.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- abstract request_solution_message_type()
Get the solution type.
- Returns:
The message type that will be returned by
request_current_solution(),request_solution(), andrequest_solutions().- Return type:
type[AspnBase]
- abstract request_current_solution()
Get the current inertial solution.
- Returns:
The current inertial solution.
- Return type:
- abstract request_solution(time)
Request solution at a specific time.
- Parameters:
time (TypeTimestamp) – The time at which the returned solution should be valid.
- Returns:
The solution computed by this inertial at
timeiftimeis in the valid range,Noneotherwise (is_time_in_range()can be used to checktimebefore calling this method).- Return type:
Message | None
- abstract request_solutions(times, solution_type)
Request solutions at multiple specific times.
- Parameters:
times (list[TypeTimestamp]) – An array of times at which solutions are requested.
solution_type (InertialSolutionRangeType) – The type of solution requested.
- Returns:
An array of solutions. Returns
Noneifsolution_typeis unsupported by this inertial or every instance oftimesis outside the valid range. Otherwise guaranteed to not beNone.- Return type:
list[Message | None] | None
- abstract is_time_in_range(time)
Check if a solution exists at a given time.
- Parameters:
time (TypeTimestamp) – The query time.
- Returns:
Trueif a solution exists attime,Falseotherwise. This result is only valid until another method (for example,process_pntos_message()) is called.- Return type:
bool
- abstract request_earliest_time()
Get the earliest available time at which a solution or forces and rates can be requested.
This result is only valid until another method (for example,
process_pntos_message()) is called.- Returns:
The earliest available time at which a solution or forces and rates can be requested.
- Return type:
TypeTimestamp
- abstract request_latest_time()
Get the latest available time at which a solution or forces and rates can be requested.
This result is only valid until another method (for example,
process_pntos_message()) is called.- Returns:
The latest available time at which a solution or forces and rates can be requested.
- Return type:
TypeTimestamp
- abstract request_process_pntos_message_types()
Returns an array of message types that are supported by this plugin.
- Returns:
An array of message types that are supported by this plugin as inputs to
process_pntos_message().- Return type:
list[type[AspnBase]]
- abstract process_pntos_message(message)
A new message to be incorporated into the computed inertial solution.
- Parameters:
message (Message)
- abstract request_forces_and_rates(time)
Request forces and rates for a given time.
- Parameters:
time (TypeTimestamp) – The time at which the forces and rates should be valid.
- Returns:
The instantaneous forces and rates at
timeiftimeis in the valid range,Noneotherwise (is_time_in_range()can be used to checktimebefore calling this method).- Return type:
InertialForcesRates | None
- abstract request_average_forces_and_rates(time1, time2)
Request average forces and rates over a time period.
- Parameters:
time1 (TypeTimestamp) – The start of the time range over which the forces and rates should be valid.
time2 (TypeTimestamp) – The end of the time range over which the forces and rates should be valid.
- Returns:
The average forces and rates over the period of time defined by
time1andtime2if at least one of them is in the valid range,Noneotherwise (is_time_in_range()can be used to check both times before calling this method).- Return type:
InertialForcesRates | None
- pntos.api.ExternalInertial
alias of
CommonInertial
- class pntos.api.InertialForcesRates(forces_and_rates, frame)
Bases:
objectA struct containing specific forces and rotation rates from the inertial.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- Variables:
forces_and_rates (MeasurementImu) – An ASPN IMU message which has been repurposed to hold specific forces (the
meas_accelfield) and rotation rates (themeas_gyrofield) in a different frame (seeframebelow).frame (InertialFrame) – Specifies the frame of the above forces and rates.
- frame
The frame at which the forces and rates are valid.
- class pntos.api.InertialFrame(value)
Bases:
IntEnumAn enumeration that specifies frame.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- INERTIAL_FRAME_NED = 0
Force vectors in this frame are north-east-down in \(m/s^2\).
Rotation rate vectors in this frame are of an inertial sensor with respect to inertial frame, in the inertial sensor frame (\(rad/s\)). Sometimes represented as \(w^\text{s}_\text{is}\) (or \(w^\text{b}_\text{ib}\) if the body frame is aligned with the inertial sensor frame.
- class pntos.api.InertialPlugin
Bases:
CommonPlugin,ABCAn implementation of an inertial plugin.
This plugin generates
pntos.api.CommonInertialinstances which may be used to generate INS solutions from raw IMU data.Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- abstract is_inertial_type_supported(inertial_type)
Check if the plugin supports a given type of inertial.
- Parameters:
inertial_type (type[InertialType])
- Returns:
Trueif inertial type is supported,Falseotherwise.- Return type:
bool
- abstract new_inertial(inertial_type, solution, config_group=None)
Create an instance of an implementation of
pntos.api.CommonInertial.- Parameters:
inertial_type (type[InertialType]) – Specifies the type of inertial that the returned value will support. For example, if the user passes in
STANDARD_INERTIAL_MECHANIZATION, then the returned value will be an implementation ofpntos.api.StandardInertialMechanization. Ifinertial_typeis unsupported by the plugin, thenNonewill be returned. Please useis_inertial_type_supported()to check if the type is supported by the plugin.solution (Message) – The initial solution (i.e. the alignment) to mechanize from.
config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to initialize the new inertial. This allows for multiple inertial instances to exist with unique settings.
- Returns:
A new inertial object. Returns
Noneifinertial_typeis unsupported,solutionis invalid, orconfig_groupis invalid.is_inertial_type_supported()can be called to verifyinertial_typebefore calling this method.- Return type:
InertialType | None
- class pntos.api.InertialSolutionRangeType(value)
Bases:
IntEnumSolution type to request from an inertial.
An enumeration that allows the user to decide if the solution they request is the best available solution or one that has no discontinuities in it due to resets.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- class pntos.api.StandardInertialErrors(accel_biases, gyro_biases, accel_scale_factors, gyro_scale_factors)
Bases:
objectA structure representing the biases on a set of 3-axis gyros and 3-axis accelerometers.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- Variables:
accel_biases (NDArray[float64]) – A 1D vector of length 3 containing biases for a 3-axis accelerometer in the sensor’s (X-Y-Z) frame, expressed in \(m/s^2\).
gyro_biases (NDArray[float64]) – A 1D vector of length 3 containing biases for a 3-axis gyro in the sensor’s (X-Y-Z) frame, expressed in \(rad/s\).
accel_scale_factors (NDArray[float64]) – A 1D vector of length 3 containing scale factor errors for a 3-axis accelerometer in the sensor’s (X-Y-Z) frame, unitless.
gyro_scale_factors (NDArray[float64]) – A 1D vector of length 3 containing scale factor errors for a 3-axis gyroscope in the sensor’s (X-Y-Z) frame, unitless.
- class pntos.api.StandardInertialMechanization
Bases:
CommonInertial,ABCA struct produced by a
pntos.api.InertialPlugin. It generates solutions from raw IMU data.Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- abstract request_reset_message_types()
Get valid types of reset messages.
- Returns:
An array of message types that are supported by this plugin for resetting the inertial solution, or
Noneif resetting the inertial solution is an unsupported operation by the inertial plugin.- Return type:
list[type[AspnBase]] | None
- abstract reset_solution(message)
Set the solution to the values in
message.For example, if
messageis PVA then the inertial solution will be set to that PVA. Ifmessageis just position, then only the position portion of the inertial solution will be set usingmessage.- Parameters:
message (Message) – A message containing the information necessary to reset the solution. To see the types supported by the implementation, call
request_reset_message_types().
- abstract correct_sensor_errors(time, errors)
Reset the current inertial internal bias values.
Reset the current inertial internal bias values with corrections from an external source, such as a filter or error estimator. The errors passed in here will be adjusted for internally by the inertial when processing incoming data. Thus, if errors are passed into the inertial here they should not be corrected for in an external filter processing the inertial output (which would lead to a double correction).
- Parameters:
time (TypeTimestamp) – The time at which
errorsshould be valid.errors (StandardInertialErrors) – An estimate of the inertial sensor’s errors.
- abstract request_sensor_errors(time)
Request inertial errors for a given time.
- Parameters:
time (TypeTimestamp) – Time at which inertial errors should be valid.
- Returns:
Inertial errors at
timeiftimeis in the valid range (pntos.api.CommonInertial.is_time_in_range()can be used to checktimebefore calling this method),Noneotherwise.- Return type:
StandardInertialErrors | None
- class pntos.api.CommonInitializationStrategy
Bases:
ABCA common base type for initialization algorithms.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- abstract request_motion_needed()
Check the type of motion (if any) needed.
- Returns:
InitializationMotionNeeded
- abstract request_current_status()
Check the current initialization status.
- Returns:
InitializationStatus
- class pntos.api.EwcInitializationStrategy
Bases:
CommonInitializationStrategyGenerates an initial estimate-with-covariance (EWC) solution from sensor data.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- abstract request_solution()
Get the current initial solution.
- Returns:
The current initial solution. Will be
Noneif the initialization strategy has not yet finished. Usepntos.api.CommonInitializationStrategy.request_current_status()to check current status of the strategy. If the status isINITIALIZING_FINEorINITIALIZED_GOOD, then the result is guaranteed to not beNone.- Return type:
- class pntos.api.InertialInitializationStrategy
Bases:
CommonInitializationStrategy,ABCGenerates an initial ASPN solution from sensor data.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- abstract request_solution()
Get the current initial solution.
- Returns:
InitialInertialSolution
- class pntos.api.InitialEstimateWithCovariance(time, estimate_with_covariance, status)
Bases:
objectHolds both the current estimate and its associated covariance as well as the current status.
Coupling these avoids time-of-check to time-of-use (TOCTOU) issues.
- Variables:
time (TypeTimestamp) – The time at which
estimate_with_covarianceis valid.estimate_with_covariance (EstimateWithCovariance | None) – The current estimate of the initial solution. Check
statusfor its validity (can beNoneifstatusis anything other thanINITIALIZED_GOOD).status (InitializationStatus) – Indicates the current initialization status. Should be checked before using
estimate_with_covariance.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- class pntos.api.InitialInertialSolution(solution, inertial_errors, inertial_error_covariance, status)
Bases:
objectHolds both the current solution, inertial errors (and their covariance), and the current status.
Coupling these avoids time-of-check to time-of-use (TOCTOU) issues.
- Variables:
solution (Message | None) – The initial solution.
inertial_errors (StandardInertialErrors | None) – The inertial errors.
inertial_error_covariance (NDArray[float64] | None) – The covariance matrix associated with the terms in
inertial_errors.status (InitializationStatus) – Indicates the current initialization status. Should be checked before using any of the other fields.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- class pntos.api.InitializationMotionNeeded(value)
Bases:
IntEnumAn enumeration that specifies what type of motion is required by the initialization strategy.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- NO_MOTION = 0
Stationary data is needed.
- MOTION_NEEDED = 1
Dynamic data is needed.
- ANY_MOTION = 2
No particular type of motion is required.
- class pntos.api.InitializationPlugin
Bases:
CommonPlugin,ABCAn implementation of an initialization plugin.
This plugin generates
pntos.api.CommonInitializationStrategyinstances which may be used to generate an initial solution from additional external sensor data, such as IMU data.Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- abstract is_initialization_type_supported(initialization_type)
Check if given
InitializationTypeis supported.- Parameters:
initialization_type (type[InitializationType])
- Returns:
Trueif the plugin supports a given type of mechanization,Falseotherwise.- Return type:
bool
- abstract new_initialization_strategy(initialization_type, config_group=None)
Create an instance of
pntos.api.CommonInitializationStrategy.- Parameters:
initialization_type (type[InitializationType]) – Specifies the type of initializer that the returned value will support. For example, if the user passes in
INERTIAL_INITIALIZATION_STRATEGY, then the returned value will be an implementation ofpntos.api.InertialInitializationStrategy. Ifinitialization_typeis unsupported by the plugin, thenNonewill be returned. Please useis_initialization_type_supported()to check if the type is supported by the plugin.config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to set up the new initialization strategy. This allows for multiple initialization strategy instances to exist with unique settings.
- Returns:
The new initialization strategy object. Returns
Noneifinitialization_typeis unsupported by this plugin (this can be checked usingis_initialization_type_supported()) or ifconfig_groupis invalid.- Return type:
InitializationType | None
- class pntos.api.InitializationStatus(value)
Bases:
IntEnumAn enumeration that allows the user to know the initialization status.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- WAITING = 0
Waiting to start initialization process.
- INITIALIZING_COARSE = 1
Attempting to initialize and produce a navigation solution.
- INITIALIZING_FINE = 2
A coarse initialization has been calculated by the algorithm, and the initialization is being tested or adjusted before producing a navigation solution.
- INITIALIZED_GOOD = 3
We have a good initialization.
The provided solution can be used to kickoff inertial and fusion.
- INITIALIZATION_FAILED = 4
The initialization process failed in some way, and may attempt to restart.
- class pntos.api.LoggingPlugin
Bases:
CommonPlugin,ABCLogging plugin.
A plugin for logging out data to an arbitrary sink (e.g. console, file, network, etc.).
- abstract log(source_plugin_type, source_plugin_identifier, level, message)
Log a string to the logging plugin’s sink.
- Parameters:
source_plugin_type (PluginType) – Information on the plugin that sent the logout.
source_plugin_identifier (str) – Information on the plugin that sent the logout.
level (LoggingLevel) – The event severity.
message (str) – The string contents to be logged.
- class pntos.api.MessageStreamConfig
Bases:
ABCMessage stream configuration.
This class configures the buffering, delay, and sorting characteristics of messages that are streamed into the
pntos.api.OrchestrationPlugin. The pntOS system will deliver messages to the orchestration plugin as it receives them. However, there is a fundamental tradeoff between latency and those messages being in-order. In particular, to guarantee that messages are sorted by timestamp, it is necessary to build a buffer and delay delivery, such that a sorting function may be applied. This structure allows the plugin to choose which messages are buffered and which are not.- abstract sequenced_stream_add(message_type, source_identifier=None)
Request messages are streamed in sorted timestamp ordering.
Request messages of the given
message_typeand optionalsource_identifierare streamed in sorted timestamp ordering.- Parameters:
message_type (type[AspnBase])
source_identifier (str | None, optional)
- abstract sequenced_stream_remove(message_type, source_identifier=None)
Request messages are no longer streamed in sorted timestamp ordering.
Request messages of the given
message_typeand optionalsource_identifierare no longer streamed in sorted timestamp ordering. This will remove a type that was previously added in a call tosequenced_stream_add(), or remove individual messages from the entire list of messages that was added with a previous call tosequenced_stream_all().- Parameters:
message_type (type[AspnBase])
source_identifier (str | None, optional)
- abstract sequenced_stream_all(enable)
Request all messages are streamed in sorted timestamp ordering.
Note that the ability to do this reliably will depend on the length of the buffer used by the
pntos.api.Mediator.- Parameters:
enable (bool)
- abstract immediate_stream_add(message_type, source_identifier=None)
Request messages are streamed immediately.
Request messages of the given
message_typeand optionalsource_identifierare streamed immediately without delay, buffering, or sorting.- Parameters:
message_type (type[AspnBase])
source_identifier (str | None, optional)
- abstract immediate_stream_remove(message_type, source_identifier=None)
Request messages are no longer streamed immediately.
Request messages of the given
message_typeand optionalsource_identifierare no longer streamed immediately. This will remove a type that was previously added in a call toimmediate_stream_add(), or remove individual messages from the entire list of messages that was added with a previous call toimmediate_stream_all().- Parameters:
message_type (type[AspnBase])
source_identifier (str | None, optional)
- abstract immediate_stream_all(enable)
Request all messages are streamed immediately without delay, buffering, or sorting.
- Parameters:
enable (bool)
- class pntos.api.OrchestrationPlugin
Bases:
CommonPlugin,ABCOrchestration plugin.
The pntOS orchestration plugin is responsible for orchestrating one or more fusion engines, state model providers and other plugins in order to perform sensor fusion. The orchestration plugin is sent (sorted, buffered) ASPN messages from the
pntos.api.ControllerPlugin, and is responsible for computing a solution for the system, as well as estimating any other quantities of interest.In order to achieve this task, the orchestration plugin may be passed a set of other plugins during the call to
init_orchestration_plugin(). If so, the orchestration plugin then, as the name suggests, configures and orchestrates these plugins to work together to perform sensor fusion.Example
For example, the orchestration plugin may set up a fusion engine it received in the call to
init_orchestration_plugin(), then add state blocks or measurement processors to that fusion engine from apntos.api.StateModelingPluginit also received, and process inertial data from anpntos.api.InertialPluginit received. Therequest_solutions()function will be called by the system when pntOS needs to know the current filtering solutions. Other quantities which need to be estimated by the orchestration engine can be returned to the system by registry updates.- abstract init_orchestration_plugin(plugins, stream_config)
Initial data structures needed by the orchestration plugin.
This function will be called by the system after the
pntos.api.CommonPlugin.init_plugin()but before any other call to anpntos.api.OrchestrationPluginfunction.- Parameters:
plugins (list[CommonPlugin] | None) – A set of plugins which should be used by the orchestration plugin. For example, the plugins list may include a
pntos.api.StandardFusionEngine, which the orchestration plugin can use to perform fusion of sensor data received. The list may also include apntos.api.StateModelingPlugin, which the orchestration plugin can use to extract the algorithms needed for parsing sensor data into the data model a fusion engine needs. If the orchestration plugin does not require any plugins,Nonemay be passed.stream_config (MessageStreamConfig) – A set of configuration options that the orchestration plugin can use to indicate to the
pntos.api.ControllerPluginhow it would prefer delivery of messages. When the orchestration plugin receives thestream_configstruct, it should call the functions on it to set up how messages will be delivered to it. If it does not, the order of messages’ arrival will be unspecified and at the discretion of thepntos.api.ControllerPlugin.
- abstract process_pntos_message(message, sequenced)
Deliver a new message from an external to pntOS source into the orchestration plugin.
The plugin should utilize this sensor data contained in message by passing it into a fusion engine.
- Parameters:
message (Message) – The
pntos.api.Messageto be delivered to thepntos.api.OrchestrationPluginfrom thepntos.api.ControllerPlugin. From here thepntos.api.OrchestrationPluginmay use it directly or route it to other plugins (like thepntos.api.FusionPlugin,pntos.api.InertialPlugin, orpntos.api.InitializationPlugin).sequenced (bool) –
Falseifmessagewas the last receivedpntos.api.MessageorTrueif it was delayed by buffering. Seepntos.api.MessageStreamConfigfor more details.
- abstract property filter_description_list
Get a list of strings describing the filters available in this
pntos.api.OrchestrationPlugin.One of these description strings may be used when calling
request_solutions(). For consistency, these strings should adhere to the following conventions:Strings should be upper case and have words and acronyms separated by underscores (
UPPER_SNAKE_CASE).Strings should contain the substring
BESTwhen they represent the primary solution.Strings should contain the substring
DEAD_RECKONINGwhen they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more thanBESTsolutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide aDEAD_RECKONINGsolution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals betweensolution_times(but resets applied before all of thesolution_times).This substring should contain the string-equivalent to the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string
_ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.
Example
For example, if the primary solution is an ASPN PVA then the string
MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATEwould fulfill the convention.These conventions allow the user to identify their desired type of solution using substring matching.
- Returns:
A list of strings describing the filters available in this
pntos.api.OrchestrationPlugin.- Return type:
list[str]
- abstract request_solutions(solution_times, filter_description=None)
Request filtering solutions at the times specified in the array
solution_times.- Parameters:
solution_times (list[TypeTimestamp]) – The solution times.
filter_description (str | None, optional) – An
pntos.api.OrchestrationPluginmay run multiple filters. To select which filter(s) to request solutions from, enter a valid filter description string infilter_description. Valid filter description strings can be obtained by callingfilter_description_list. Passing inNonewill provide a result specific to a particularpntos.api.OrchestrationPluginimplementation. Whenfilter_descriptionisNone, the implementation should endeavor to return its best solution.
- Returns:
An array of messages containing the filter solutions for the requested
solution_times. The number of solutions should equal the number of times insolution_times, although some entries may beNoneif they are unavailable at the corresponding time insolution_times. The returnedpntos.api.Messagelist may beNoneiffilter_descriptionis invalid.- Return type:
list[Message | None] | None
- class pntos.api.PlatformIntegrationPlugin
Bases:
CommonPlugin,ABCPlatform integration plugin.
A plugin for command, control, solution output, and other behavior of the system which is specific to a particular platform. Works closely with the
pntos.api.ControllerPluginto fully define the overall behavior of the system.Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- abstract take_control(plugins, plugin_resources_locations=None, initial_config=None)
Takes over secondary control from the
pntos.api.ControllerPlugin.When pntOS first boots, it passes control over to the
pntos.api.ControllerPlugin. After thepntos.api.ControllerPluginhas initialized the plugins it wants to run, it calls this plugin’stake_control()to allow for platform specific control behavior to run. Thepntos.api.PlatformIntegrationPlugin(PIP) is not responsible for calling thepntos.api.CommonPlugin.init_plugin()call on any of the plugins passed in its plugins list, and thus the list of plugins that is passed to the PIP should be pruned to only those plugins thepntos.api.ControllerPlugininitialized. The PIP is consequently not responsible for thepntos.api.Mediatorconstruction or message routing - those responsibilities fall on thepntos.api.ControllerPlugin. Instead, the PIP is responsible for doing any additional logic that may be platform specific.Example
For example, the PIP may decide to output solutions at a particular rate, or to have the
pntos.api.TransportPluginpassed in its plugins list start/stop listening in response to moding commands, or inform thepntos.api.OrchestrationPluginthat it should not use a particular sensor at runtime (via a registry convention).In general, the goal of the PIP is to implement the platform-specific needs, whereas the
pntos.api.ControllerPluginis designed to be the generic portion of the code. Thepntos.api.ControllerPluginshould be designed to be generic and re-useable, but work hand-in-hand with the PIP to fully define the control behavior of the system.pntos.api.ControllerPluginresponsibilities:Defining concurrency model.
Initializing plugins (and constructing/passing in
pntos.api.Mediator).Routing data from transport plugin to orchestration/initialization/inertial plugins.
Routing requests for registry data to registry plugins.
pntos.api.PlatformIntegrationPluginresponsibilities:Platform specific outputs.
Responding to moding commands from platform.
Routing situational awareness information to other pntOS plugins (via registry convention).
The parameters to
pntos.api.PlatformIntegrationPlugin.take_control()should be identical to those passed to thepntos.api.ControllerPlugin.take_control(), with the exception of thepluginslist being a subset of thepluginspassed to thepntos.api.ControllerPlugin. Which plugins are passed to the PIP is implementation specific and decided by thepntos.api.ControllerPlugin.- Parameters:
plugins (list[CommonPlugin]) – A subset of the
pluginspassed to thepntos.api.ControllerPlugin. Which plugins are passed to the PIP is implementation specific and decided by thepntos.api.ControllerPlugin.plugin_resources_locations (list[str | None] | None, optional) – Should be identical to what was passed to
pntos.api.ControllerPlugin.take_control().initial_config (str | None, optional) – Should be identical to what was passed to
pntos.api.ControllerPlugin.take_control().
- class pntos.api.Preprocessor
Bases:
ABCA preprocessor.
Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- abstract process_pntos_message(message)
Process a message.
- Parameters:
message (Message) – A message to be processed.
- Returns:
A list of
pntos.api.Messages. Usually this will be a single message, a modified version ofmessage. It could beNoneifmessageis rejected or dropped. The preprocessor could also accumulate several messages, returningNonefor each one then returning an array with multiple processed messages.- Return type:
list[Message] | None
- class pntos.api.PreprocessorPlugin
Bases:
CommonPlugin,ABCAn implementation of a preprocessor plugin.
This plugin generates
pntos.api.Preprocessorinstances which may be used to process incoming messages before being distributed to other plugins.Caution
Unstable: This feature is unstable and is not yet considered part of the stable pntOS API. Usage of this feature is highly discouraged in non-experimental code, and its definition may change at any time.
- Variables:
preprocessor_identifiers (list[str]) – A list of identifying strings for each kind of
pntos.api.Preprocessorthat thispntos.api.PreprocessorPlugincan create instances of. Thepreprocessor_indexparameter ofnew_preprocessor()is an index into this array.
- preprocessor_identifiers
Strings describing the preprocessors the provider can create.
- abstract new_preprocessor(preprocessor_index, config_group=None)
Get a newly created
pntos.api.Preprocessor.- Parameters:
preprocessor_index (int) – Since the
pntos.api.PreprocessorPlugincan create a different preprocessor for each element inpreprocessor_identifiers, thepreprocessor_indexparameter is used to select which kind of preprocessor to create a new instance of. Thepntos.api.PreprocessorPlugin.preprocessor_identifiersfield contains identifying strings for the kinds of preprocessors.
Example
For example, if the plugin can create 45 different preprocessors, the identifier of the last preprocessor that can be created is found in
preprocessor_identifiers[44]. An instance of this preprocessor can be created by callingnew_preprocessor(44, ...). Note that0 <= preprocessor_index < length of preprocessor_identifiers.- config_group (str | None, optional): Indicates which (if any) parameter group in the
registry may be used to obtain additional configuration values to generate the new preprocessor. If the preprocessor requires no outside configuration,
config_groupmay beNone.
- Returns:
A newly created
pntos.api.Preprocessor. ReturnsNoneifpreprocessor_indexis greater than or equal to the length ofpntos.api.PreprocessorPlugin.preprocessor_identifiersor ifconfig_groupis invalid.- Return type:
- class pntos.api.RegistryPlugin
Bases:
CommonPlugin,ABCRegistry plugin.
A plugin for a global key-value registry. See the pntOS Registry page in the Internals section for more information on the goal of this plugin.
- abstract new_registry(initial_config=None)
Create a new registry based on the initial values stored in
initial_config.- Parameters:
initial_config (str | None, optional) –
The initial values the new registry should be based on. The format of
initial_configis implementation specific, and plugins are free to support any or no format. Possible formats may include:None, in which case the plugin is free to choose initial values. Choices may include hard-coded in the plugin or none at all.A
str. Examples of possible values the parameter could hold:The entire config.
A local file path on systems which support them.
A string adhering to the URI scheme.
- Returns:
The newly created registry.
- Return type:
Note
The returned
pntos.api.Registryshould be capable of producingpntos.api.KeyValueStorestructs that are able to be used concurrently. Thus if the user uses the return value of this method to start two batches, one on group “foo” and the other on group “bar”, then concurrent access to both of the resultingpntos.api.KeyValueStorestructs for “foo” and “bar” should be supported (i.e. no shared mutable state between them that is not synchronized).
- class pntos.api.StandardDynamicsModel(g, Phi, Qd)
Bases:
objectA description of the propagation dynamics for a set of states.
This model assumes that the state space \(x\) can be propagated forward in time by the equation:
\[x_k = g(x_{k-1}) + w_k\]where \(x_k\) is the set of states at time \(k\), \(g\) is an arbitrary function, and \(w_k\) is additive white Gaussian noise.
- Variables:
g (Callable[[NDArray[float64]], NDArray[float64]]) – A function that propagates forward in time a set of states.
Phi (NDArray[float64]) – The first-order Taylor series expansion (Jacobian) of the function \(g\).
Qd (NDArray[float64]) – The covariance matrix of \(w_k\).
- class pntos.api.StandardMeasurementModel(z, h, H, R)
Bases:
objectA description of how a measurement relates to a state space.
This model assumes that the relationship between the measurement and state vector is well modeled by the equation:
\[z=h(x) + v\]where \(z\) is the measurement itself, \(x\) is the set of states being estimated, \(h\) is an arbitrary function, and \(v\) is additive white Gaussian noise.
- Variables:
z (NDArray[float64]) – A column vector containing the measurement itself.
h (Callable[[NDArray[float64]], NDArray[float64]]) – A function that maps the state space to measurement space.
H (NDArray[float64]) – The first-order Taylor series expansion (i.e. Jacobian) of the function h.
R (NDArray[float64]) – The covariance matrix of \(v\).
- class pntos.api.StandardMeasurementProcessor
Bases:
ABCA class that processes raw measurements/observations.
The measurements are used to calculate estimated states suitable for a linear or linearized filter to use. Each type of measurement should correspond to a
pntos.api.StandardMeasurementProcessorthat is supplied to the fusion engine. Incoming measurements received by the fusion engine will be routed to the corresponding measurement processor (by label) and callgenerate_model()to process the measurement. The resultingpntos.api.StandardMeasurementModelwill be used by the fusion engine to call the underlyingpntos.api.StandardFusionStrategy.update()method to update the filter estimate/error covariance.- Variables:
label (str) – A unique name for this measurement processor. This value will be used to select a measurement processor to handle new measurements that the strategy receives.
state_block_labels (list[str]) – A list of unique state block labels associated with measurements received by this processor. The estimate and covariance matrices passed into
generate_model()will be composed of the states associated with these state blocks, and the returned StandardMeasurementModel.h and StandardMeasurementModel.H must respect these states. Note:state_block_labels[i]is the identifier for theith state block this processor relates to.
Note
This class must have an operational
__deepcopy__method. For most classes, the default__deepcopy__method provided by Python will be sufficient. However, if the class has a field which does not properly implement its own__deepcopy__(such as a C object wrapped to Python), then the class will need to implement a custom__deepcopy__which properly copies all fields.- label
A unique name for this measurement processor. This value will be used to select a measurement processor to handle new measurements that the strategy receives
- state_block_labels
A list of unique state block labels associated with measurements received by this processor. The estimate and covariance matrices passed into
generate_model()will be composed of the states associated with these state blocks, and the returned StandardMeasurementModel.h and StandardMeasurementModel.H must respect these states. Note:state_block_labels[i]is the identifier for theith state block this processor relates to
- abstract receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- abstract generate_model(message, gen_x_and_p_func)
Generate a
pntos.api.StandardMeasurementModel.- Parameters:
- Returns:
A generated model containing the parameters required for a filter update. Will be
Nonewhen a measurement cannot be produced frommessage(for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).- Return type:
StandardMeasurementModel | None
- class pntos.api.StandardStateBlock
Bases:
ABCA description of a set of states and their dynamics.
- Variables:
label (str) – The unique name for this state block.
num_states (int) – The number of states represented by this state block.
Note
This class must have an operational
__deepcopy__method. For most classes, the default__deepcopy__method provided by Python will be sufficient. However, if the class has a field which does not properly implement its own__deepcopy__(such as a C object wrapped to Python), then the class will need to implement a custom__deepcopy__which properly copies all fields.- label
The unique name for this state block.
- num_states
The number of states represented by this state block.
- abstract receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_state_block_aux_data()is called with a label corresponding to this state block’slabel.- Parameters:
aux (list[Message | None])
- abstract generate_dynamics(gen_x_and_p_func, time_from, time_to)
Generate a
pntos.api.StandardDynamicsModel.The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.
- Parameters:
gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.
time_from (TypeTimestamp) – The time to propagate from.
time_to (TypeTimestamp) – The time to propagate to.
- Returns:
The description of how to propagate this state block over the given time interval, or
Noneiftime_fromis later thantime_to. Otherwise guaranteed to not returnNone.- Return type:
StandardDynamicsModel | None
- class pntos.api.StandardStateModelProvider
Bases:
ABCA collection of tools for modeling states and measurements.
These tools are used to model the propagation and innovation of state spaces using pntOS’s standard fusion model. Specifically, a
pntos.api.StandardStateModelProviderprovides three types of tools:State Blocks - Define a set of states and a model for propagating those states.
Virtual State Blocks - Relate two statespaces to each other.
Measurement Processors - Relate measurements to a statespace.
A
pntos.api.StandardStateModelProviderconceptually models a set of zero or morepntos.api.StandardStateBlocks and a set of zero or morepntos.api.StandardMeasurementProcessors which together model the phenomenology of sensor data that is being brought into a fusion engine. The first type, state blocks, describe how a set of states propagates forward through time. The second type, measurement processors, describe how a measurement relates to a set of state blocks.Each
pntos.api.StandardStateModelProviderconsists of factory methods which generate instances of the state blocks and measurement processors it provides. Thepntos.api.StandardStateModelProvider.new_block()method is a factory method that returns a newly created state block on each invocation. Because thepntos.api.StandardStateModelProvidercan provide more than one kind of state block, thepntos.api.StandardStateModelProvider.new_block()method takes ablock_indexparameter which allows the user to request which kind of state block is created by the factory.block_identifiers[i]gives a description of theith kind of state block returned whenblock_index=i.Similarly,
pntos.api.StandardStateModelProvider.new_processor()is a factory method for returning new measurement processors andprocessor_identifiersis a set of identifiers for each available kind of measurement processor that can be returned by the factory.- Variables:
processor_identifiers (list[str] | None) – A list of identifying strings for each kind of measurement processor that this
pntos.api.StandardStateModelProvidercan create instances of. Theprocessor_indexparameter ofnew_processor()is an index into this array. This field will beNonewhen this state model provider does not provide any measurement processors.block_identifiers (list[str] | None) – A list of identifying strings for each kind of state block that this
pntos.api.StandardStateModelProvidercan create instances of. Theblock_indexparameter ofnew_block()is an index into this array. This field will beNonewhen this state model provider does not provide any state blocks.virtual_block_identifiers (list[str] | None) – A list of identifying strings for each kind of virtual state block that this
pntos.api.StandardStateModelProvidercan create instances of. Thevirtual_block_indexparameter ofnew_virtual_block()is an index into this array. This field will beNonewhen this state model provider does not provide any virtual state blocks.
- processor_identifiers
Strings describing the measurement processors the provider can create.
- block_identifiers
Strings describing the state blocks the provider can create.
- virtual_block_identifiers
Strings describing the virtual state blocks the provider can create.
- abstract new_processor(processor_index, engine, label, state_block_labels, config_group)
Generate a newly created
pntos.api.StandardMeasurementProcessor.This measurement processor describes the relationship between a measurement and a set of state blocks.
- Parameters:
processor_index (int) – Since the
pntos.api.StandardStateModelProvidercan create different kinds of measurement processors, theprocessor_indexparameter is used to select which kind of measurement processor to create a new instance of. Theprocessor_identifiersfield contains identifying strings for the kinds of processors. For example, if the model can create 45 different processors, the identifier of the last processor that can be created is found inprocessor_identifiers[44]. An instance of this processor can be created by callingnew_processor(self, 44, ...). Note that0 <= processor_index < len(processor_identifiers).engine (StandardFusionEngine | None) – An optional parameter that may be provided to the new processor, such that the processor may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to
Nonewhen no engine is available for the processor to use.label (str) – A string which will be used to populate the
labelfield of the newly created processor. Thislabelwill be the unique name for the returned instance of a processor, and used to track the processor throughout its lifecycle. Note that it differs fromprocessor_identifierswhich is the model’s mechanism for selecting the kind of processor to create.state_block_labels (list[str]) – A list of strings which will be used to populate the
state_block_labelsfield of the newly created processor.config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new processor. If the processor requires no outside configuration,
config_groupmay beNone.
- Returns:
The newly created
pntos.api.StandardMeasurementProcessororNonewhen no measurement processor can be produced with the givenprocessor_index,engine, andconfig_group.- Return type:
StandardMeasurementProcessor | None
- abstract new_block(block_index, engine, label, config_group)
Generate a newly created
pntos.api.StandardStateBlock.This state block describes a set of states and how they propagate over time.
- Parameters:
block_index (int) – Since the
pntos.api.StandardStateModelProvidercan create different kinds of state blocks, theblock_indexparameter is used to select which kind of state block to create a new instance of. Theblock_identifiersfield contains identifying strings for the kinds of state blocks. For example, if the model can create 45 different state blocks, the identifier of the last state block that can be created is found inblock_identifiers[44]. An instance of this state block can be created by callingnew_block(self, 44, ...). Note that0 <= block_index < len(block_identifiers).engine (StandardFusionEngine | None) – An optional parameter that may be provided to the new block, such that the block may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to
Nonewhen no engine is available for the block to use.label (str) – A string which will be used to populate the
labelfield of the newly created state block. Thislabelwill be the unique name for the returned instance of a state block, and used to track the state block throughout its lifecycle. Note that it differs fromblock_identifierswhich is the model’s mechanism for selecting the kind of state block to create.config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new state block. If the state block requires no outside configuration,
config_groupmay beNone.
- Returns:
The newly created
pntos.api.StandardStateBlockorNonewhen no state block can be produced with the givenblock_index,engine, andconfig_group.- Return type:
StandardStateBlock | None
- abstract new_virtual_block(virtual_block_index, source_label, target_label, config_group)
Generate a newly created
pntos.api.VirtualStateBlock.This virtual state block is used to convert a set of states from one representation to another.
- Parameters:
virtual_block_index (int) – Since the
pntos.api.StandardStateModelProvidercan create different kinds of virtual state blocks, thevirtual_block_index parameteris used to select which kind of virtual state block to create a new instance of. Thepntos.api.StandardStateModelProvider.virtual_block_identifiersfield contains identifying strings for the kinds of virtual state blocks. For example, if the model can create 45 different virtual state blocks, the identifier of the last virtual state block that can be created is found invirtual_block_identifiers[44]. An instance of this virtual state block can be created by callingnew_virtual_block(self, 44, ...). Note that0 <= virtual_block_index < len(virtual_block_identifiers).source_label (str) – The label of the state block or virtual state block whose states this virtual state block transforms.
target_label (str) – A unique identifier for this virtual state block.
config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new virtual state block. If the virtual state block requires no outside configuration,
config_groupmay beNone.
- Returns:
The newly created
pntos.api.VirtualStateBlockorNonewhen no virtual state block can be produced with the givenvirtual_block_indexandconfig_group.- Return type:
VirtualStateBlock | None
- class pntos.api.StateModelingPlugin
Bases:
CommonPlugin,ABCA
pntos.api.CommonPluginsubclass that generates state model providers.- abstract is_fusion_type_supported(fusion_type)
Check if the plugin supports a given type of fusion. See
StateModelProviderType.- Parameters:
fusion_type (StateModelProviderType)
- Returns:
bool
- abstract new_state_model_provider(fusion_type)
Generate a state model provider.
- Parameters:
fusion_type (StateModelProviderType) – Specifies the type of fusion that the returned value will support. For example, if the user passes in
STANDARD_MODEL, then the returned value will be an implementation ofpntos.api.StandardStateModelProvider.- Returns:
A state model provider which implements the specified type or None if
fusion_typeis not supported (is_fusion_type_supported()can be used to checkfusion_type).- Return type:
StateModelProviderType | None
- class pntos.api.VirtualStateBlock
Bases:
ABCA class used to convert a set of states from one representation to another.
States are converted using a mapping function \(f\) to convert estimates, and the Jacobian of \(f()\) to map covariances (note that this implies that the order/units of terms in the estimate vector and covariance matrix are the same). Each instance is associated with two labels,
sourceandtarget, wheresourceis the label attached to the quantity to be transformed, andtargetis the label attached to the result. Typically used with apntos.api.StandardFusionEnginewheresourcerefers to a realpntos.api.StandardStateBlockandtargetrefers to some representation that is advantageous for some other element, such as apntos.api.StandardMeasurementProcessor, to use.- Variables:
source (str) – The label associated with the representation this instance can transform from.
target (str) – The label associated with the representation this instance can transform to.
Note
This class must have an operational
__deepcopy__method. For most classes, the default__deepcopy__method provided by Python will be sufficient. However, if the class has a field which does not properly implement its own__deepcopy__(such as a C object wrapped to Python), then the class will need to implement a custom__deepcopy__which properly copies all fields.- abstract receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_virtual_state_block_aux_data()is called with a label corresponding to thispntos.api.VirtualStateBlock‘starget.- Parameters:
aux (list[Message | None])
- abstract convert(estimate_with_covariance, time)
Convert a full estimate/covariance pair.
- Parameters:
estimate_with_covariance (EstimateWithCovariance) – Estimate and covariance to convert.
time (TypeTimestamp) – Time that
estimate_with_covarianceis valid at.
- Returns:
The converted value.
- Return type:
- abstract convert_estimate(estimate, time)
Convert just an estimate vector.
- Parameters:
estimate (NDArray[float64]) – Estimate vector to convert, Nx1.
time (TypeTimestamp) – Time that
estimateis valid at.
- Returns:
The converted vector, Mx1.
- Return type:
NDArray[float64]
- abstract jacobian(estimate, time)
Obtain the Jacobian of the transform performed by this instance.
The Jacobian is calculated at an instance in time, given an estimate to differentiate with respect to.
- Parameters:
estimate (NDArray[float64]) – Estimate vector associated with the return value of
source, Nx1.time (TypeTimestamp) – Time that
estimateis valid at.
- Returns:
An MxN matrix that may be used to pre-multiply
estimateto obtain an M length vector intargetrepresentation (to first order).- Return type:
NDArray[float64]
- class pntos.api.TransportPlugin
Bases:
CommonPlugin,ABCTransport plugin.
A plugin that abstracts a network transport. listening for sensor data off the wire and sending data back to the sensors as needed.
- abstract start_listening()
Start listening.
Start listening to the transport that this plugin implements, calling the appropriate controller function as data streams in.
- abstract stop_listening()
Disable listening.
Disable listening to the transport that was previously started in a call to
start_listening().
- abstract broadcast_message(message, channel_name=None)
Send a message back out to the sensor from pntOS.
- Parameters:
message (Message) – The message to send.
channel_name (str | None, optional) – The desired channel. If
channel_nameisNonethe implementation may decide wheremessageshould be routed, if anywhere. For example, a serial cable might send all messages to a single destination.
- class pntos.api.UiPlugin
Bases:
CommonPlugin,ABCA plugin for a UI that is integrated directly into pntOS.
While it is always possible to write a GUI that listens to pntOS outputs and interacts with it externally, this plugin allows users to write a GUI that has direct access to pntOS via the plugin API. This allows for low latency and high performance GUI/UIs to be generated. Note that this plugin is designed for developer/research style UIs and not production environments. A user display in a production environment is better modeled as a
pntos.api.PlatformIntegrationPlugin, as that is designed to represent requests from the system and not simply status updates.- abstract requires_main_thread()
Check if this plugin needs to run on the main thread.
Some systems require GUI applications to run on the main thread. This method can be used to query whether or not this plugin must be run on the main thread. If this method returns True, then
run_main_thread()must be called from the main thread in order to start this plugin.- Returns:
Trueif plugin needs to run on the main thread,Falseotherwise.- Return type:
bool
- abstract run_main_thread()
Start plugin on the main thread.
This method should only be called if
requires_main_thread()returns True. This method should only be called from the main thread.
- class pntos.api.UtilityPlugin
Bases:
CommonPlugin,ABCA plugin that performs a generic utility function.
A utility plugin performs implementation-specific functions that may require access to pntOS resources (such as the registry) via the
pntos.api.CommonPluginAPI. Otherwise, this plugin has no other API-defined functionality.
- class pntos.api.RegistryValueType
A
TypeVarof the types allowed inpntos.api.KeyValueStore.A
TypeVaris particularly for cases where a method needs to guarantee that the type on an input is the same as the returned type.- Example:
For example,
pntos.api.KeyValueStore.get_value()needs to guarantee that the input and the return types are the same. Thus,pntos.api.KeyValueStore.get_value()would be a good place to useRegistryValueTypein the type description:def get_value( self, key: str, value_type: type[RegistryValueType] ) -> RegistryValueType | None
- class pntos.api.RegistryValueTypeUnion
This is a union of all types allowed in
pntos.api.KeyValueStore.This is particularly for cases where a method does not need to guarantee that the type on an input is the same as the returned type.
- Example:
For example,
pntos.api.KeyValueStore.set_value()does not need to guarantee that the input and the return type are the same since it returns None. Thus,pntos.api.KeyValueStore.set_value()would be a good place to useRegistryValueTypeUnionin the type description:def set_value(self, key: str, value: RegistryValueTypeUnion) -> None
- class pntos.api.FusionEngineType
Enumerates the types of fusion engines.
Currently only StandardFusionEngine is defined, but this TypeVar also includes “Any” in the type list for future compatibility.
- class pntos.api.FusionStrategyType
Enumerates the types of fusion strategies.
Currently only StandardFusionStrategy is defined, but this TypeVar also includes “Any” in the type list for future compatibility.
- class pntos.api.PluginType
An union of all the types of plugins.
Can be used by the logging plugin to print which plugin the message originated from.
- class pntos.api.InertialType
An enumeration of the types of inertials an inertial plugin could provide.
“Any” is included for future compatibility.
- class pntos.api.InitializationType
An enumeration of the types of initializers an initializer plugin could provide.
“Any” is included for future compatibility.
- class pntos.api.StateModelProviderType
An enumeration of the types of state model providers a state modeling plugin could provide.
“Any” is included for future compatibility.
- class pntos.api.GenXandP
- A function of type: ``Callable[[list[str]], EstimateWithCovariance | None]``
Returns the estimate and covariance associated with the states of
block_labelswithin a particular measurement processor or state block. This is used to lazily evaluate estimate and covariance.- Args:
- block_labels (list[str]): Labels for state blocks to generate estimate and
covariance for.
- Returns:
Estimate and covariance of the provided block_labels. Returns
Noneif any label inblock_labelsdoes not correspond to a valid block.