Cobra Plugins

pntos.cobra should only directly contain plugin imports. These are all the plugins that can be directly imported from pntos.cobra.

class pntos.cobra.BuscatControllerPlugin(identifier)

Bases: ControllerPlugin

This is a simple single-threaded Buscat controller plugin.

The purpose of this plugin is to route one or more data streams from TransportPlugins back out through one or more TransportPlugins, as specified in the supplied BuscatConfig.output_transports. This has the effect of combining multiple data streams and converting all input to the formats supported by the output TransportPlugins. Note that this controller does not do any buffering or sorting of the input data before publishing. It (or more specifically, the BuscatMediator) also does not explicitly pass input data to plugins other than TransportPlugins, and thus will not support sensor fusion without some modification.
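The routing behavior described above can be pictured with a minimal sketch. It makes no claim about how the real BuscatMediator is written: FanOutMediator, RecordingTransport, and the route() method are hypothetical stand-ins, and only the broadcast_message(message, channel_name=None) signature is taken from this page.

```python
class RecordingTransport:
    """Hypothetical stand-in for an output TransportPlugin."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    def broadcast_message(self, message, channel_name=None):
        # A real transport would convert the message to its own format here.
        self.sent.append((channel_name, message))


class FanOutMediator:
    """Hypothetical mediator that forwards each inbound message to every output."""

    def __init__(self, output_transports):
        self.output_transports = list(output_transports)

    def route(self, message, channel_name=None):
        # No buffering or sorting: messages leave in arrival order.
        for transport in self.output_transports:
            transport.broadcast_message(message, channel_name)


outputs = [RecordingTransport("a"), RecordingTransport("b")]
mediator = FanOutMediator(outputs)
for msg in ("imu@2", "gps@1"):  # deliberately out of time order
    mediator.route(msg, "demo")
```

Both outputs receive the two messages in the order they arrived, which mirrors the note that no buffering or sorting takes place before publishing.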

Here are the plugins and corresponding expected number of instances this controller looks for:

  • LoggingPlugin - 1

  • RegistryPlugin - 1

  • TransportPlugin - at least 1

  • UiPlugin - any, but only 1 can require the main thread

It checks for the expected plugins, sets up mediators, and then passes the mediators to each plugin in pntos.api.CommonPlugin.init_plugin(), then passes off to the _main() function.

Inside the main function, the controller calls pntos.api.TransportPlugin.start_listening() on all transport plugins, checks if the UI needs to update (and passes it the thread if so), then waits for Ctrl+C to exit pntOS.

_plugin_resources_location
__init__(identifier)

Cobra Buscat Controller

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

identifier

A string identifier uniquely identifying this plugin.

This string will be used to determine the unique space this plugin receives in the system config.

take_control(plugins, plugin_resources_locations=None, initial_config=None)

Takes over primary control of the program from the app.

Takes over primary control of the program from the app, using the plugins to process data, generate fused estimates, and ultimately produce and output PNT solutions. take_control() must use the plugins passed to it and construct a full pntOS system. Please see the description of the pntos.api.PlatformIntegrationPlugin (PIP) for a description of duties of this function compared to the duties of the pntos.api.PlatformIntegrationPlugin.take_control() method (if a PIP is in the plugins list).

Parameters:
  • plugins (list[CommonPlugin]) – An array of plugins available to the controller.

  • plugin_resources_locations (list[str | None] | None, optional) – A list of locations, one for each plugin in plugins, where those plugins may find auxiliary data if needed. The list may be None; otherwise it must be the same length as plugins. Each entry may be None, a filesystem path, or a string adhering to some scheme, such as the URI scheme, that defines the resource’s location.

  • initial_config (str | None, optional) –

    Represents a source of values to initialize the primary pntOS configuration. This could be None or a str. Examples of possible values for the latter are:

    1. The entire config.

    2. A local file path on systems which support them.

    3. A string adhering to the URI scheme.
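As an illustration of the three kinds of initial_config value listed above, the following sketch guesses which kind a given string is. The function name classify_initial_config and the detection rules (a "://" check for URIs, an existence check for file paths) are assumptions made for this example; the controller's actual handling is not described here.

```python
import os
from urllib.parse import urlparse


def classify_initial_config(initial_config):
    """Guess which kind of source an initial_config value is (illustrative only)."""
    if initial_config is None:
        return "none"
    # Assumption: anything shaped like "scheme://..." is treated as a URI.
    if "://" in initial_config and urlparse(initial_config).scheme:
        return "uri"
    # Assumption: a string naming an existing file is a local file path.
    if os.path.isfile(initial_config):
        return "file"
    # Anything else is assumed to be the entire config as text.
    return "inline"
```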

_sort_and_validate_plugins(plugins)

Utility function that ensures plugins contains enough plugins to run pntOS Cobra, then assigns and dispatches them to the relevant fields on the controller. Raises a RuntimeError if the plugins are not as expected.
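A count-based validation of this kind could look like the sketch below. The three empty classes stand in for the real plugin base classes, and EXPECTED and sort_and_validate are hypothetical names; only the expected counts and the RuntimeError come from the description above.

```python
class LoggingPlugin:
    """Stand-in for the real LoggingPlugin base class."""


class RegistryPlugin:
    """Stand-in for the real RegistryPlugin base class."""


class TransportPlugin:
    """Stand-in for the real TransportPlugin base class."""


# (type, minimum, maximum); a maximum of None means "no upper bound".
EXPECTED = [
    (LoggingPlugin, 1, 1),
    (RegistryPlugin, 1, 1),
    (TransportPlugin, 1, None),
]


def sort_and_validate(plugins):
    """Group plugins by type and raise RuntimeError on an unexpected count."""
    sorted_plugins = {}
    for plugin_type, minimum, maximum in EXPECTED:
        matches = [p for p in plugins if isinstance(p, plugin_type)]
        too_few = len(matches) < minimum
        too_many = maximum is not None and len(matches) > maximum
        if too_few or too_many:
            raise RuntimeError(
                f"unexpected number of {plugin_type.__name__}: {len(matches)}"
            )
        sorted_plugins[plugin_type] = matches
    return sorted_plugins
```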

_log(level, message)

Utility logging method for controller.

NOTE: This is only intended for log messages originating from the controller.

_main()

The main control of pntOS Cobra.

class pntos.cobra.CobraUiLogPlayerPlugin(identifier, lcm_url='tcpq://localhost:7700', upload_dir='_static/dist/', group='ui/logplayer')

Bases: UtilityPlugin

_mediator
_file
_file_found
_play
_step
_lcm_url
_group
_log_thread
_upload_dir
_run()
_request_notify_new_file()
_remove_notify_new_file()
_initialize_keys()
_request_notify_run_thread()
_remove_notify_run_thread()
_play_pause_callback(group, keys, kv)
_load_callback(group, keys, kv)
_step_callback(group, keys, kv)
_shutdown_current_thread()
_info(message)

Log info message.

_warn(message)

Log warning message.

_debug(message)

Log debug message.

_error(message)

Log error message.

class pntos.cobra.ExperimentalCobraUiPlugin(identifier, config_group='config/cobra_ui')

Bases: UiPlugin

Flask/SocketIO-based UI plugin for pntOS.

Provides a WebSocket API for:

  • Real-time registry subscriptions

  • Snapshot retrieval

  • Update notifications

mediator
registry_manager
app
socket
config
_metadata_manager
static_folder
identifier

A string identifier uniquely identifying this plugin.

This string will be used to determine the unique space this plugin receives in the system config.

config_group
write_buffer
_shutdown_event
requires_main_thread()

Check if this plugin needs to run on the main thread.

Some systems require GUI applications to run on the main thread. This method can be used to query whether or not this plugin must be run on the main thread. If this method returns True, then run_main_thread() must be called from the main thread in order to start this plugin.

Returns:

True if plugin needs to run on the main thread, False otherwise.

Return type:

bool

run_main_thread()

Start plugin on the main thread.

This method should only be called if requires_main_thread() returns True. This method should only be called from the main thread.
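From a controller's point of view, the two methods above might be used as in this sketch. FakeUi and run_main_thread_ui are hypothetical; the sketch only relies on the requires_main_thread() and run_main_thread() methods documented here and on the rule that at most one UI plugin may require the main thread.

```python
class FakeUi:
    """Hypothetical UI plugin exposing only the two documented methods."""

    def __init__(self, needs_main):
        self._needs_main = needs_main
        self.ran = False

    def requires_main_thread(self):
        return self._needs_main

    def run_main_thread(self):
        # A real UI would block here and run its event loop.
        self.ran = True


def run_main_thread_ui(ui_plugins):
    """Run the single UI that needs the main thread, if there is one.

    This function is meant to be called from the main thread.
    """
    needing_main = [ui for ui in ui_plugins if ui.requires_main_thread()]
    if len(needing_main) > 1:
        raise RuntimeError("only one UI plugin can require the main thread")
    if not needing_main:
        return None
    needing_main[0].run_main_thread()
    return needing_main[0]
```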

_runtime_assets_exist(static_folder)
_run_registry_updates_thread()

Pops changes from the registry manager and puts them in the send queue.

_run_server_thread()

Run the Flask-SocketIO server in a background thread.

_route_app()
_route_socket()

Set up WebSocket event handlers.

_info(message)

Log info message.

_warn(message)

Log warning message.

_debug(message)

Log debug message.

_error(message)

Log error message.

class pntos.cobra.DummyControllerPlugin(identifier)

Bases: ControllerPlugin

A ControllerPlugin with minimal capability. Not for use in production code.

This class performs just the activities required to get a bare-bones operable system: basic plugin initialization and shutdown calls, as well as the special initialization requirements of OrchestrationPlugins and TransportPlugins. No other configuration is performed.

This class uses the similarly limited-in-capability DummyMediator to enable callbacks between other plugins.

_mediator

DummyMediator instance that holds refs to other plugins.

take_control(plugins, plugin_resources_locations=None, initial_config=None)

Takes over control, initializes all basic plugins and OrchestrationPlugins, and starts all TransportPlugin listener threads.

Parameters:
  • plugins (list[CommonPlugin]) – All plugins to include in the operable system.

  • plugin_resources_locations (list[str | None] | None) – Unused.

  • initial_config (str | None) – Unused.

class pntos.cobra.DummyOrchestrationPlugin(identifier)

Bases: OrchestrationPlugin

A very simple OrchestrationPlugin implementation that mimics a fully functional system by accepting inputs and echoing these inputs as outputs through a Mediator. Though additional plugins may be supplied to this instance they are not used in any way.

_plugins

All plugins this instance is managing.

_last_message

The last message received by process_pntos_message().

mediator

Mediator instance used to handle callbacks.

init_orchestration_plugin(plugins, stream_config)

Initialize the data structures needed by the orchestration plugin.

This function will be called by the system after pntos.api.CommonPlugin.init_plugin() but before any other call to a pntos.api.OrchestrationPlugin function.

Parameters:
  • plugins (list[CommonPlugin] | None) – A set of plugins which should be used by the orchestration plugin. For example, the plugins list may include a pntos.api.StandardFusionEngine, which the orchestration plugin can use to perform fusion of sensor data received. The list may also include a pntos.api.StateModelingPlugin, which the orchestration plugin can use to extract the algorithms needed for parsing sensor data into the data model a fusion engine needs. If the orchestration plugin does not require any plugins, None may be passed.

  • stream_config (MessageStreamConfig) – A set of configuration options that the orchestration plugin can use to indicate to the pntos.api.ControllerPlugin how it would prefer delivery of messages. When the orchestration plugin receives the stream_config struct, it should call the functions on it to set up how messages will be delivered to it. If it does not, the order of messages’ arrival will be unspecified and at the discretion of the pntos.api.ControllerPlugin.

process_pntos_message(message, sequenced)

Deliver a new message from a source external to pntOS into the orchestration plugin.

The plugin should utilize this sensor data contained in message by passing it into a fusion engine.

Parameters:
property filter_description_list

Get a list of strings describing the filters available in this pntos.api.OrchestrationPlugin.

One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:

  • Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).

  • Strings should contain the substring BEST when they represent the primary solution.

  • Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).

  • Strings should contain the string equivalent of the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without risking a false positive match from a type whose string is a substring of another type’s string.

Example

For example, if the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.

These conventions allow the user to identify their desired type of solution using substring matching.
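The substring matching these conventions enable can be sketched as follows. The helper find_filter_descriptions is hypothetical, and every entry in available except the first (which is the example string above) is invented for illustration, including the shorter ASPN_MEASUREMENT_POSITION token.

```python
def find_filter_descriptions(descriptions, kind, message_class_token):
    """Return descriptions containing both the kind and '<TOKEN>_ESTIMATE'."""
    suffix = f"{message_class_token}_ESTIMATE"
    return [d for d in descriptions if kind in d and suffix in d]


available = [
    "MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE",
    # The next two strings are made up for this example.
    "MY_DEAD_RECKONING_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE",
    "MY_BEST_ASPN_MEASUREMENT_POSITION_ESTIMATE",
]

best_pva = find_filter_descriptions(
    available, "BEST", "ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE"
)
best_position = find_filter_descriptions(
    available, "BEST", "ASPN_MEASUREMENT_POSITION"
)
```

Because the search string ends in _ESTIMATE, looking for the shorter position type does not accidentally match the position-velocity-attitude entry.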

Returns:

A list of strings describing the filters available in this pntos.api.OrchestrationPlugin.

Return type:

list[str]

request_solutions(solution_times, filter_description=None)

Request filtering solutions at the times specified in the array solution_times.

Parameters:
  • solution_times (list[TypeTimestamp]) – The solution times.

  • filter_description (str | None, optional) – A pntos.api.OrchestrationPlugin may run multiple filters. To select which filter(s) to request solutions from, pass a valid filter description string in filter_description. Valid filter description strings can be obtained from filter_description_list. Passing in None will provide a result specific to a particular pntos.api.OrchestrationPlugin implementation. When filter_description is None, the implementation should endeavor to return its best solution.

Returns:

An array of messages containing the filter solutions for the requested solution_times. The number of solutions should equal the number of times in solution_times, although some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message list may be None if filter_description is invalid.

Return type:

list[Message | None] | None
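A caller has to handle both the whole-list None and the per-entry None cases. The sketch below shows one way to do that; pair_solutions is a hypothetical helper, plain integers and strings stand in for TypeTimestamp and Message objects, and raising ValueError on a length mismatch is an assumption of this example rather than documented behavior.

```python
def pair_solutions(solution_times, solutions):
    """Pair each requested time with its solution, skipping unavailable ones."""
    if solutions is None:
        # The filter description was invalid.
        return None
    if len(solutions) != len(solution_times):
        raise ValueError("expected one solution entry per requested time")
    return [(t, s) for t, s in zip(solution_times, solutions) if s is not None]
```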

class pntos.cobra.DummyTransportPlugin(identifier)

Bases: TransportPlugin

A TransportPlugin with minimal capability. Not for use in production code.

This transport simulates inbound data to Cobra by producing fake messages in a loop to pass out through process_pntos_message(). As there is no mechanism for actual publishing, outbound data sent through broadcast_message() die here, eulogized by a logged message containing the channel it would have been published to.

identifier

A string identifier uniquely identifying this plugin.

This string will be used to determine the unique space this plugin receives in the system config.

mediator

Mediator used to pass data around and log messages.

_thread

Main listener thread that produces simulated data.

_listening

Flag that indicates if _thread should be active.

start_listening()

Starts the thread that generates data and pushes into the mediator, simulating data arriving over the network or similar source.

stop_listening()

Stops the data generation if it is active.
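The start/stop pattern described for this transport can be sketched with a flag-controlled thread. FakeListener is a hypothetical class, not the plugin's code; it uses a threading.Event for the listening flag and a callback in place of the mediator.

```python
import threading
import time


class FakeListener:
    """Hypothetical listener producing fake messages while a flag is set."""

    def __init__(self, on_message, period_s=0.01):
        self._on_message = on_message
        self._period_s = period_s
        self._listening = threading.Event()
        self._thread = None

    def start_listening(self):
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        self._listening.set()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop_listening(self):
        self._listening.clear()
        if self._thread is not None:
            self._thread.join()
            self._thread = None

    def _run(self):
        count = 0
        while self._listening.is_set():
            self._on_message(f"fake-{count}")
            count += 1
            time.sleep(self._period_s)


received = []
listener = FakeListener(received.append)
listener.start_listening()
deadline = time.monotonic() + 2.0
while not received and time.monotonic() < deadline:
    time.sleep(0.01)  # wait until at least one fake message has arrived
listener.stop_listening()
```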

broadcast_message(message, channel_name=None)

Send a message back out to the sensor from pntOS.

Parameters:
  • message (Message) – The message to send.

  • channel_name (str | None, optional) – The desired channel. If channel_name is None the implementation may decide where message should be routed, if anywhere. For example, a serial cable might send all messages to a single destination.

_log(message, level=LoggingLevel.INFO)

Wrapper around log_message() that hard asserts that the mediator exists.

Parameters:
  • message – Message to log.

  • level – Level to log message at.

class pntos.cobra.StandardControllerPlugin(identifier)

Bases: ControllerPlugin

This is a simple single-threaded controller plugin.

Here are the plugins and corresponding expected number of instances this controller looks for:

  • FusionPlugin - at least 1

  • FusionStrategyPlugin - at least 1

  • InertialPlugin - at least 1

  • InitializationPlugin - at least 1

  • LoggingPlugin - 1

  • OrchestrationPlugin - 1

  • RegistryPlugin - 1

  • TransportPlugin - at least 1

  • UiPlugin - any, but only 1 can require the main thread

It checks for the expected plugins, sets up mediators, and then passes the mediators to each plugin in pntos.api.CommonPlugin.init_plugin(), initializes the orchestration plugin, then passes off to the _main() function.

Inside the main function, the controller calls pntos.api.TransportPlugin.start_listening() on all transport plugins, checks if the UI needs to update (and passes it the thread if so), then waits for Ctrl+C to exit pntOS.

The TransportPlugins will initiate the necessary filtering through their calls to pntos.api.Mediator.process_pntos_message().

_plugin_resources_location
__init__(identifier)

Cobra Standard Controller

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

identifier

A string identifier uniquely identifying this plugin.

This string will be used to determine the unique space this plugin receives in the system config.

take_control(plugins, plugin_resources_locations=None, initial_config=None)

Takes over primary control of the program from the app.

Takes over primary control of the program from the app, using the plugins to process data, generate fused estimates, and ultimately produce and output PNT solutions. take_control() must use the plugins passed to it and construct a full pntOS system. Please see the description of the pntos.api.PlatformIntegrationPlugin (PIP) for a description of duties of this function compared to the duties of the pntos.api.PlatformIntegrationPlugin.take_control() method (if a PIP is in the plugins list).

Parameters:
  • plugins (list[CommonPlugin]) – An array of plugins available to the controller.

  • plugin_resources_locations (list[str | None] | None, optional) – A list of locations, one for each plugin in plugins, where those plugins may find auxiliary data if needed. The list may be None; otherwise it must be the same length as plugins. Each entry may be None, a filesystem path, or a string adhering to some scheme, such as the URI scheme, that defines the resource’s location.

  • initial_config (str | None, optional) –

    Represents a source of values to initialize the primary pntOS configuration. This could be None or a str. Examples of possible values for the latter are:

    1. The entire config.

    2. A local file path on systems which support them.

    3. A string adhering to the URI scheme.
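The relationship between plugins and plugin_resources_locations (either None, or one entry per plugin) can be illustrated with a small sketch. The helper name pair_plugins_with_locations and the ValueError on a length mismatch are assumptions for this example; strings stand in for plugin objects.

```python
def pair_plugins_with_locations(plugins, plugin_resources_locations=None):
    """Pair every plugin with its resource location, or None if it has none."""
    if plugin_resources_locations is None:
        return [(plugin, None) for plugin in plugins]
    if len(plugin_resources_locations) != len(plugins):
        raise ValueError("plugin_resources_locations must match plugins in length")
    return list(zip(plugins, plugin_resources_locations))
```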

_sort_and_validate_plugins(plugins)

Utility function that ensures plugins contains enough plugins to run pntOS Cobra, then assigns and dispatches them to the relevant fields on the controller. Raises a RuntimeError if the plugins are not as expected.

_log(level, message)

Utility logging method for controller.

NOTE: This is only intended for log messages originating from the controller.

_main()

The main control of pntOS Cobra.

class pntos.cobra.DiagnosticLogPlugin(identifier, output_file=None)

Bases: UtilityPlugin

A plugin to save any values put into the diagnostics group in the registry to an output HDF5 file (OUTPUT_FILE).

If any other plugin wants to store values to the output, it only has to write the values to any key within the diagnostics group. However, there are two constraints:

  • All values assigned to a given key must be of the same type.

  • If the value for a given key is list[str] or NDArray[float64], its length must not change between updates of the value at that key.
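A plugin that writes diagnostics may want to check these two constraints on its own side before writing. The sketch below is one hypothetical way to do so; DiagnosticShapeChecker is not part of pntos, and plain Python lists stand in for both list[str] and NDArray[float64] to keep the example dependency-free.

```python
class DiagnosticShapeChecker:
    """Hypothetical guard that remembers the first type and length per key."""

    def __init__(self):
        self._seen = {}  # key -> (type, length or None for non-list values)

    def check(self, key, value):
        length = len(value) if isinstance(value, list) else None
        signature = (type(value), length)
        # The first value written to a key defines what later values must match.
        previous = self._seen.setdefault(key, signature)
        if previous[0] is not signature[0]:
            raise TypeError(f"type of '{key}' changed between updates")
        if previous[1] != signature[1]:
            raise ValueError(f"length of '{key}' changed between updates")
```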

__init__(identifier, output_file=None)

Diagnostic-Logging Utility Plugin

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

_callback(group, keys, kv)

class pntos.cobra.EkfFusionStrategyPlugin(identifier)

Bases: FusionStrategyPlugin

This is an EKF fusion strategy plugin. It functions as a factory that produces EKF fusion strategies.

_mediator
__init__(identifier)

An Extended Kalman Filter Fusion Strategy Plugin

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

is_fusion_type_supported(fusion_type)

Check if a particular fusion strategy is supported by new_fusion_strategy().

The new_fusion_strategy() factory method on this class can create one or more types of fusion strategy. However, a pntos.api.FusionStrategyPlugin may not support all types (it must support at least one). Therefore, when a user receives a pntos.api.FusionStrategyPlugin, they should:

  1. Initialize the plugin by calling pntos.api.CommonPlugin.init_plugin() (see pntos.api.CommonPlugin for more information).

  2. Call is_fusion_type_supported() to check that the plugin supports the type that the user wants to use.

  3. If is_fusion_type_supported() returned True, call the new_fusion_strategy() factory method to get a new fusion strategy of the desired type.

  4. Proceed to use the fusion strategy to do estimation.
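The steps above can be sketched as follows. FakeStrategyPlugin, StandardStrategy, OtherStrategy, and make_strategy are hypothetical stand-ins, and passing a mediator to init_plugin() is an assumption of this example; the real types live in pntos.api.

```python
class StandardStrategy:
    """Hypothetical fusion strategy type supported by the fake plugin."""


class OtherStrategy:
    """Hypothetical fusion strategy type the fake plugin does not support."""


class FakeStrategyPlugin:
    """Stand-in exposing the three methods used in the steps above."""

    def __init__(self):
        self.initialized = False

    def init_plugin(self, mediator):
        self.initialized = True

    def is_fusion_type_supported(self, fusion_type):
        return fusion_type is StandardStrategy

    def new_fusion_strategy(self, fusion_type):
        if not self.is_fusion_type_supported(fusion_type):
            return None
        return fusion_type()


def make_strategy(plugin, mediator, fusion_type):
    plugin.init_plugin(mediator)                          # step 1
    if not plugin.is_fusion_type_supported(fusion_type):  # step 2
        return None
    return plugin.new_fusion_strategy(fusion_type)        # step 3
```

The caller then proceeds with estimation (step 4) only when make_strategy() returns something other than None.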

Parameters:

fusion_type (type[FusionStrategyType]) – The fusion strategy type we are checking.

Returns:

Whether or not we support the requested fusion_type.

Return type:

bool

new_fusion_strategy(fusion_type)

Create a new fusion strategy of the requested type.

Users must first ensure that the fusion strategy is supported by calling is_fusion_type_supported().

Parameters:
  • fusion_type (type[FusionStrategyType]) – The type of fusion strategy that we want returned.

Returns:

The newly created fusion strategy, which is an implementation of the type specified by fusion_type. For example, if the user calls new_fusion_strategy() with a fusion_type of StandardFusionStrategy, then the returned object will be an implementation of pntos.api.StandardFusionStrategy.

Return type:

FusionStrategyType | None

class pntos.cobra.StandardFusionPlugin(identifier)

Bases: FusionPlugin

A fusion plugin that provides instances of fusion engines.

_mediator
__init__(identifier)

A Fusion Plugin

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

is_fusion_type_supported(fusion_type)

Check if the plugin supports a given type of fusion.

Parameters:

fusion_type (type[FusionEngineType])

Returns:

bool

new_fusion_engine(fusion_type)

Create a fusion engine.

Parameters:

fusion_type (type[FusionEngineType]) – This parameter specifies the type of fusion engine that will be returned.

Returns:

The fusion_type parameter specifies the type of fusion engine that will be returned. For example, if the user passes in pntos.api.StandardFusionEngine, then an implementation of pntos.api.StandardFusionEngine will be returned. Returns None if fusion_type is not supported by this fusion plugin (is_fusion_type_supported() can be used to check the type before calling this method). Otherwise the return is guaranteed to not be None.

Return type:

FusionEngineType | None

class pntos.cobra.LcmLogTransportPlugin(identifier)

Bases: TransportPlugin

A transport plugin which processes LCM messages from a log.

Capable of marshalling ASPN23-LCM to ASPN23-Python.

mediator
_log_reader_thread
_lcm
_channels_to_process
_ui
__init__(identifier)

LCM Log Transport Plugin

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

identifier

A string identifier uniquely identifying this plugin.

This string will be used to determine the unique space this plugin receives in the system config.

_shutdown_threads
_channels_found
read_log()

Process messages from LCM log

start_listening()

Start listening.

Start listening to the transport that this plugin implements, calling the appropriate controller function as data streams in.

stop_listening()

Disable listening.

Disable listening to the transport that was previously started in a call to start_listening().

broadcast_message(message, channel_name=None)

Record LCM message to output file

class pntos.cobra.LcmTransportPlugin(identifier)

Bases: TransportPlugin

A transport plugin which listens for LCM messages.

Capable of marshalling ASPN23-LCM to ASPN23-Python.

mediator
subscription
_url
_subscription_regex
_ui
__init__(identifier)

ASPN-LCM Transport Plugin

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

identifier

A string identifier uniquely identifying this plugin.

This string will be used to determine the unique space this plugin receives in the system config.

_shutdown_threads
lcm
handler
_sender
_channels
_output_queue
_send_thread()
_general_handler(channel, data)
_handler_thread()

Call LCM.handle in a loop.

start_listening()

Start listening.

Start listening to the transport that this plugin implements, calling the appropriate controller function as data streams in.

stop_listening()

Disable listening.

Disable listening to the transport that was previously started in a call to start_listening().

broadcast_message(message, channel_name=None)

Send a message over LCM to a specific channel

class pntos.cobra.ManualHeadingAlignInitializationPlugin(identifier)

Bases: InitializationPlugin

A static alignment initialization plugin that provides the internal.ManualHeadingAlign strategy.

mediator
__init__(identifier)

Cobra Static Alignment Initialization Plugin

Parameters:

identifier – A string identifier uniquely identifying this plugin.

is_initialization_type_supported(initialization_type)

Check if given InitializationType is supported.

Parameters:

initialization_type (type[InitializationType])

Returns:

True if the plugin supports the given initialization type, False otherwise.

Return type:

bool

new_initialization_strategy(initialization_type, config_group=None)

Create an instance of pntos.api.CommonInitializationStrategy.

Parameters:
  • initialization_type (type[InitializationType]) – Specifies the type of initializer that the returned value will support. For example, if the user passes in INERTIAL_INITIALIZATION_STRATEGY, then the returned value will be an implementation of pntos.api.InertialInitializationStrategy. If initialization_type is unsupported by the plugin, then None will be returned. Please use is_initialization_type_supported() to check if the type is supported by the plugin.

  • config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to set up the new initialization strategy. This allows for multiple initialization strategy instances to exist with unique settings.

Returns:

The new initialization strategy object. Returns None if initialization_type is unsupported by this plugin (this can be checked using is_initialization_type_supported()) or if config_group is invalid.

Return type:

InitializationType | None

class pntos.cobra.StandardPreprocessorPlugin(identifier)

Bases: PreprocessorPlugin

A preprocessor plugin that provides the standard-level set of preprocessors.

The preprocessors this plugin provides are:

  1. DownsamplerPreprocessor - Downsamples messages on a given list of channels.

  2. ImuRotationPreprocessor - Rotates IMU measurements from the IMU frame to the platform frame.

  3. TimeAdjusterPreprocessor - Synthesizes timestamps to compensate for erroneous hardware.

  4. BarometerToAltitudePreprocessor - Converts pressure measurements to altitude measurements.

  5. TimeBiasPreprocessor - Applies an offset to timestamps to correct for a constant time bias.

  6. OutagePreprocessor - Induces an outage on a specified channel.

mediator
__init__(identifier)

Constructor.

Parameters:

identifier (str) – The plugin identifier used to set this plugin’s pntos.api.CommonPlugin.identifier field.

new_preprocessor(preprocessor_index, config_group=None)

Get a newly created pntos.api.Preprocessor.

Parameters:

preprocessor_index (int) – Since the pntos.api.PreprocessorPlugin can create a different preprocessor for each element in preprocessor_identifiers, the preprocessor_index parameter is used to select which kind of preprocessor to create a new instance of. The pntos.api.PreprocessorPlugin.preprocessor_identifiers field contains identifying strings for the kinds of preprocessors.

Example

For example, if the plugin can create 45 different preprocessors, the identifier of the last preprocessor that can be created is found in preprocessor_identifiers[44]. An instance of this preprocessor can be created by calling new_preprocessor(44, ...). Note that 0 <= preprocessor_index < length of preprocessor_identifiers.

config_group (str | None, optional) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new preprocessor. If the preprocessor requires no outside configuration, config_group may be None.

Returns:

A newly created pntos.api.Preprocessor. Returns None if preprocessor_index is greater than or equal to the length of pntos.api.PreprocessorPlugin.preprocessor_identifiers or if config_group is invalid.

Return type:

Preprocessor
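Since new_preprocessor() selects by index, a caller that knows an identifier string first has to find its position in preprocessor_identifiers. The sketch below shows that lookup; new_preprocessor_by_identifier and FakePreprocessorPlugin are hypothetical, and the identifier strings are invented for the example.

```python
def new_preprocessor_by_identifier(plugin, identifier, config_group=None):
    """Look up the index for an identifier, then create the preprocessor."""
    try:
        index = plugin.preprocessor_identifiers.index(identifier)
    except ValueError:
        return None  # the plugin does not offer this preprocessor
    return plugin.new_preprocessor(index, config_group)


class FakePreprocessorPlugin:
    """Stand-in plugin; a tuple takes the place of a real Preprocessor."""

    preprocessor_identifiers = ["downsampler", "time_bias"]

    def new_preprocessor(self, preprocessor_index, config_group=None):
        if not 0 <= preprocessor_index < len(self.preprocessor_identifiers):
            return None
        return (self.preprocessor_identifiers[preprocessor_index], config_group)
```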

class pntos.cobra.StandardInertialPlugin(identifier)

Bases: InertialPlugin

An inertial plugin that generates instances of the internal.StandardInertial class.

mediator
__init__(identifier)

An Inertial Plugin

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

is_inertial_type_supported(inertial_type)

Check if the plugin supports a given type of inertial.

Parameters:

inertial_type (type[InertialType])

Returns:

True if inertial type is supported, False otherwise.

Return type:

bool

new_inertial(inertial_type, solution, config_group=None)

Create an instance of an implementation of pntos.api.CommonInertial.

Parameters:
  • inertial_type (type[InertialType]) – Specifies the type of inertial that the returned value will support. For example, if the user passes in STANDARD_INERTIAL_MECHANIZATION, then the returned value will be an implementation of pntos.api.StandardInertialMechanization. If inertial_type is unsupported by the plugin, then None will be returned. Please use is_inertial_type_supported() to check if the type is supported by the plugin.

  • solution (Message) – The initial solution (i.e. the alignment) to mechanize from.

  • config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to initialize the new inertial. This allows for multiple inertial instances to exist with unique settings.

Returns:

A new inertial object. Returns None if inertial_type is unsupported, solution is invalid, or config_group is invalid. is_inertial_type_supported() can be called to verify inertial_type before calling this method.

Return type:

InertialType | None

class pntos.cobra.StandardLoggingPlugin(identifier, colorize=True, global_log_level=LoggingLevel.INFO, date_time_format='%d/%m/%Y %H:%M:%S')

Bases: LoggingPlugin

A logging plugin that dictates the formatting and handles logging to the console.

__init__(identifier, colorize=True, global_log_level=LoggingLevel.INFO, date_time_format='%d/%m/%Y %H:%M:%S')

Cobra Logging Plugin

Parameters:
  • identifier (str) – Populates the CommonPlugin.identifier field.

  • colorize (bool) – Prints colored log messages if true, uncolored if false.

  • global_log_level (LoggingLevel) – Selects which log levels get printed. See log() for more info.

  • date_time_format (str) – Specifies the date-time format according to the available format specifiers. See time.strftime() for more info on supported formats.
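To see what the default date_time_format produces, it can be passed directly to time.strftime() from the Python standard library. The alternative format shown second is only an example of a value a caller might choose.

```python
import time

DEFAULT_FORMAT = "%d/%m/%Y %H:%M:%S"  # the default shown in the signature above

# Format the Unix epoch in UTC so the result does not depend on the local timezone.
stamp = time.strftime(DEFAULT_FORMAT, time.gmtime(0))
iso_like = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(0))

print(stamp)     # 01/01/1970 00:00:00
print(iso_like)  # 1970-01-01 00:00:00
```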

log(source_plugin_type, source_plugin_identifier, level, message)

Log a message.

This implementation defines the following behavior:
  1. A config defines a global LoggingLevel for the plugin

  2. A logging request has a LoggingLevel and interacts with the global log level to determine if it will be output.

The rules for the interaction between requested and global level are:

  • Requested at INFO should print at global level(s): INFO, DEBUG

  • Requested at WARN should print at global level(s): INFO, WARN, DEBUG

  • Requested at DEBUG should print at global level(s): DEBUG

  • Requested at ERROR should print at global level(s): INFO, WARN, DEBUG, ERROR

Or, in other words:

  • Global level ERROR: only shows ERROR

  • Global level WARN: shows ERROR or WARN

  • Global level INFO: shows ERROR, WARN, or INFO

  • Global level DEBUG: shows ERROR, WARN, INFO, or DEBUG
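These rules amount to a severity ordering: a message prints when its requested level is at least as severe as the global level. The sketch below expresses that with a hypothetical Level enum; the numeric values are an assumption of this example and need not match the real LoggingLevel.

```python
from enum import IntEnum


class Level(IntEnum):
    """Hypothetical severity ordering, least to most severe."""

    DEBUG = 0
    INFO = 1
    WARN = 2
    ERROR = 3


def should_print(requested, global_level):
    """Return True if a message at `requested` is shown at `global_level`."""
    return requested >= global_level


shown_at_warn = [level.name for level in Level if should_print(level, Level.WARN)]
```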

class pntos.cobra.StandardOrchestrationPlugin(identifier)

Bases: OrchestrationPlugin

The standard orchestration plugin designed to take multiple state blocks and measurement processors.

mediator
init_pinson_cov
fusion_plugin
fusion_strategy_plugin
inertial_plugin
state_modeling_plugins
initialization_plugin
initializer
inertial
feedback_config
last_feedback_time_ns
fusion_engine
pinson_sb_config
alignment_channels
vsb_labels
__init__(identifier)
Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

initialization_state
init_solution
preprocessors
measurement_channels
inertial_drift_prop_dt
cache
_log(level, message)

Send an informational log message to pntOS.

If no mediator is provided, manually print the message. Otherwise, pass through to the mediator’s log_message method.

init_orchestration_plugin(plugins, stream_config)

Initialize the data structures needed by the orchestration plugin.

This function will be called by the system after pntos.api.CommonPlugin.init_plugin() but before any other call to a pntos.api.OrchestrationPlugin function.

Parameters:
  • plugins (list[CommonPlugin] | None) – A set of plugins which should be used by the orchestration plugin. For example, the plugins list may include a pntos.api.StandardFusionEngine, which the orchestration plugin can use to perform fusion of sensor data received. The list may also include a pntos.api.StateModelingPlugin, which the orchestration plugin can use to extract the algorithms needed for parsing sensor data into the data model a fusion engine needs. If the orchestration plugin does not require any plugins, None may be passed.

  • stream_config (MessageStreamConfig) – A set of configuration options that the orchestration plugin can use to indicate to the pntos.api.ControllerPlugin how it would prefer delivery of messages. When the orchestration plugin receives the stream_config struct, it should call the functions on it to set up how messages will be delivered to it. If it does not, the order of messages’ arrival will be unspecified and at the discretion of the pntos.api.ControllerPlugin.

_set_up_preprocessors(sorted_plugins, preprocessor_configs)

Finds and creates a list of preprocessors based on the information from preprocessor_configs.

Parameters:
Returns:

list[Preprocessor]

_generate_initial_inertial_solution()

Utility function that generates the initial inertial solution.

_store_config_data(orch_config, controller_config)

Utility function to store the orchestration config fields in easily accessible data structures.

Parameters:
  • orch_config (StandardOrchestrationConfig) – The StandardOrchestrationConfig containing the data to be stored.

  • controller_config (ControllerConfig) – The ControllerConfig containing additional data to be stored.

_initialize_filter()

Creates a pinson state block with a covariance calculated during alignment, then adds the block and time to the fusion engine, effectively starting the filter.

_add_state_block(providers, sb_config)

Utility function to add a state block to the fusion engine.

_add_measurement_processor(providers, mp_config)

Utility function to add a measurement processor to the fusion engine.

_add_virtual_state_block(providers, vsb_config)

Utility function to add a virtual state block to the fusion engine.

_map_vsb_inertial_needs(mp_configs)

Utility function that maps measurement processor labels to VSB labels that need forces and rates or the inertial PVA prior to performing a filter update using this measurement processor.

_set_up_fusion_engine(sb_configs, mp_configs, vsb_configs)

Utility function to assemble the components of and create a fusion engine.

_create_state_block_ewc(state_block_id, num_states, ewc=None)
_preprocess_message(message)

Process the given message by the full chain of preprocessors.

Parameters:

message (Message) – The message to process.

Returns:

The output messages, or None if one of the preprocessors dropped the input message.

Return type:

list[Message] | None
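
As an illustration of the chaining behavior described for _preprocess_message, here is a minimal sketch. It assumes each preprocessor is a callable that takes one message and returns either a list of output messages or None to drop it; the names run_chain, add_time_bias, and drop_negative_time are hypothetical, and plain dicts stand in for Message objects. The actual Preprocessor interface may differ.

```python
from typing import Callable, Optional

# Hypothetical preprocessor signature: one message in, a list of messages out,
# or None to signal that the message was dropped.
Preprocessor = Callable[[dict], Optional[list]]


def run_chain(message: dict, preprocessors: list) -> Optional[list]:
    """Pass a message through every preprocessor in order."""
    messages = [message]
    for preprocessor in preprocessors:
        next_messages = []
        for msg in messages:
            out = preprocessor(msg)
            if out is None:
                # Assumption: a drop anywhere in the chain drops the whole input.
                return None
            next_messages.extend(out)
        messages = next_messages
    return messages


def add_time_bias(msg: dict) -> Optional[list]:
    # Shift the timestamp by a fixed, made-up bias.
    return [{**msg, "t": msg["t"] + 5}]


def drop_negative_time(msg: dict) -> Optional[list]:
    # Drop any message whose timestamp is negative.
    return None if msg["t"] < 0 else [msg]


chain = [add_time_bias, drop_negative_time]
kept = run_chain({"t": 10}, chain)
dropped = run_chain({"t": -10}, chain)
```

With these stand-ins, kept holds a single message with the biased timestamp, and dropped is None because the second stage rejects it.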

_propagate_to_time(target_time)

Propagate up to the target time in steps of at most self.max_prop_dt_ns.
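
The stepping described for _propagate_to_time can be sketched as follows. This only computes the intermediate times and performs no filter propagation; the propagation_steps function and its integer-nanosecond arguments are assumptions made for illustration.

```python
def propagation_steps(current_ns: int, target_ns: int, max_prop_dt_ns: int) -> list:
    """Return the times to propagate to, each at most max_prop_dt_ns after the last."""
    if max_prop_dt_ns <= 0:
        raise ValueError("max_prop_dt_ns must be positive")
    times = []
    t = current_ns
    while t < target_ns:
        # Take a full step unless that would overshoot the target.
        t = min(t + max_prop_dt_ns, target_ns)
        times.append(t)
    return times


steps = propagation_steps(current_ns=0, target_ns=250, max_prop_dt_ns=100)
print(steps)
```

The final step is shortened so that the last propagation lands exactly on the target time.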

_publish_solution(solution, group, key)

Publish solution to registry and over transport.

_get_inertial_forces()

Get inertial forces at the current filter time.

_send_inertial_aux_to_measurement_processor(mp_label)

Send the current inertial solution and/or forces to the specified measurement processor.

_send_inertial_aux_to_pinson()

Send the current inertial solution, forces, and rates to the pinson state block.

_send_inertial_aux_to_vsbs(mp_label)

Send the current inertial solution, forces, and/or rates to any virtual blocks targeted by the given measurement processor that need it.

_ready_to_apply_feedback()
_apply_inertial_feedback()

Correct inertial solution with pinson error states and reset states.

_perform_measurement_update(message, target_mp)

Send message to the fusion engine to update the filter.

After performing update, applies feedback to inertial solution and biases, and resets pinson error states.

_propagate_during_outage()
_send_message_as_aux_data(message)
process_pntos_message(message, sequenced)

Deliver a new message from a source external to pntOS into the orchestration plugin.

The plugin should use the sensor data contained in message by passing it into a fusion engine.

Parameters:
property filter_description_list

Get a list of strings describing the filters available in this pntos.api.OrchestrationPlugin.

One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:

  • Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).

  • Strings should contain the substring BEST when they represent the primary solution.

  • Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).

  • Strings should contain the string equivalent of the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without risking a false positive match when one type’s string is a substring of another’s.

Example

For example, if the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.

These conventions allow the user to identify their desired type of solution using substring matching.

Returns:

A list of strings describing the filters available in this pntos.api.OrchestrationPlugin.

Return type:

list[str]
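
The substring-matching conventions above could be used as in the sketch below. The find_descriptions helper and the example description strings are hypothetical; in practice the list would come from filter_description_list.

```python
def find_descriptions(descriptions, *required_substrings):
    """Return the descriptions that contain every required substring."""
    return [d for d in descriptions if all(s in d for s in required_substrings)]


# Hypothetical descriptions that follow the naming conventions.
available = [
    "MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE",
    "MY_DEAD_RECKONING_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE",
    "MY_BEST_ASPN_MEASUREMENT_POSITION_ESTIMATE",
]

best_pva = find_descriptions(
    available, "BEST", "ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE"
)
# The _ESTIMATE suffix keeps the position-only query from matching the PVA entry.
best_pos = find_descriptions(available, "BEST", "ASPN_MEASUREMENT_POSITION_ESTIMATE")
# Without the suffix, the shorter type name also matches the PVA entry.
ambiguous = find_descriptions(available, "BEST", "ASPN_MEASUREMENT_POSITION")
```

Here best_pva and best_pos each select exactly one description, while the suffix-free query in ambiguous matches both BEST entries. That ambiguity is the false positive the _ESTIMATE convention is meant to avoid.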

request_solutions(solution_times, filter_description=None)

Request filtering solutions at the times specified in the array solution_times.

Parameters:
  • solution_times (list[TypeTimestamp]) – The solution times.

  • filter_description (str | None, optional) – A pntos.api.OrchestrationPlugin may run multiple filters. To select which filter(s) to request solutions from, pass a valid filter description string in filter_description. Valid filter description strings can be obtained from filter_description_list. Passing in None will provide a result specific to a particular pntos.api.OrchestrationPlugin implementation. When filter_description is None, the implementation should endeavor to return its best solution.

Returns:

An array of messages containing the filter solutions for the requested solution_times. The number of solutions should equal the number of times in solution_times, although some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message list may be None if filter_description is invalid.

Return type:

list[Message | None] | None
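
Because both the returned list and its individual entries may be None, callers need two checks. A minimal sketch of that handling follows, using integers in place of TypeTimestamp and strings in place of Message; pair_solutions is a hypothetical helper, not part of the API.

```python
from typing import Optional


def pair_solutions(solution_times: list, solutions: Optional[list]) -> list:
    """Pair each requested time with its solution, skipping unavailable entries."""
    if solutions is None:
        # The whole result may be None when filter_description is invalid.
        return []
    return [(t, s) for t, s in zip(solution_times, solutions) if s is not None]


times = [100, 200, 300]                # stand-ins for TypeTimestamp
result = ["sol@100", None, "sol@300"]  # stand-ins for Message; 200 is unavailable
pairs = pair_solutions(times, result)
```

The zip relies on the documented expectation that the result has one entry per requested time.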

class pntos.cobra.StandardRegistryPlugin(identifier, config=None)

Bases: RegistryPlugin

A registry plugin that creates internal.StandardRegistry instances.

_plugin_resources_location = None
mediator
__init__(identifier, config=None)

Cobra Standard Registry Plugin

Parameters:
  • identifier (str) – The plugin identifier used to set this plugin’s identifier field.

  • config (list[BaseConfig], optional) – A list of configs to store in the registry upon instantiation.

registries
config
identifier

A string identifier uniquely identifying this plugin.

This string will be used to determine the unique space this plugin receives in the system config.

new_registry(initial_config=None)

Create a new registry based on the initial values stored in initial_config.

Parameters:

initial_config (str | None, optional) –

The initial values the new registry should be based on. The format of initial_config is implementation specific, and plugins are free to support any or no format. Possible formats may include:

  • None, in which case the plugin is free to choose initial values. Choices may include values hard-coded in the plugin or no initial values at all.

  • A str. Examples of possible values the parameter could hold:

    1. The entire config.

    2. A local file path on systems which support them.

    3. A string adhering to the URI scheme.

Returns:

The newly created registry.

Return type:

Registry

Note

The returned pntos.api.Registry should be capable of producing pntos.api.KeyValueStore structs that are able to be used concurrently. Thus if the user uses the return value of this method to start two batches, one on group “foo” and the other on group “bar”, then concurrent access to both of the resulting pntos.api.KeyValueStore structs for “foo” and “bar” should be supported (i.e. no shared mutable state between them that is not synchronized).
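
One way to meet the concurrency requirement in the note is to give each group its own store with its own lock, so that stores share no unsynchronized mutable state. The sketch below illustrates this with stand-in classes; StubRegistry, StubKeyValueStore, store_for, put, and size are hypothetical names and do not reflect the actual pntos.api.Registry or pntos.api.KeyValueStore interfaces.

```python
import threading


class StubKeyValueStore:
    """Hypothetical store that owns its data and lock and shares neither."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def put(self, key, value):
        with self._lock:
            self._data[key] = value

    def size(self):
        with self._lock:
            return len(self._data)


class StubRegistry:
    """Hypothetical registry handing out one independent store per group."""

    def __init__(self):
        self._stores = {}
        self._lock = threading.Lock()  # guards only the group-to-store map

    def store_for(self, group):
        with self._lock:
            return self._stores.setdefault(group, StubKeyValueStore())


def fill(store, count):
    for i in range(count):
        store.put(i, i)


registry = StubRegistry()
foo = registry.store_for("foo")
bar = registry.store_for("bar")

# Access both stores concurrently from separate threads.
threads = [
    threading.Thread(target=fill, args=(foo, 1000)),
    threading.Thread(target=fill, args=(bar, 500)),
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

The only state shared between groups is the registry's map, which is protected by its own lock.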

_log(level, message)
class pntos.cobra.StandardStateModelingPlugin(identifier)

Bases: StateModelingPlugin

StateModelingPlugin that generates a pntos.cobra.internal.StandardStateModelProvider.

_mediator
new_state_model_provider(type)

Generate a state model provider.

Parameters:

fusion_type (StateModelProviderType) – Specifies the type of fusion that the returned value will support. For example, if the user passes in STANDARD_MODEL, then the returned value will be an implementation of pntos.api.StandardStateModelProvider.

Returns:

A state model provider which implements the specified type or None if fusion_type is not supported (is_fusion_type_supported() can be used to check fusion_type).

Return type:

StateModelProviderType | None

is_fusion_type_supported(type)

Check if the plugin supports a given type of fusion. See StateModelProviderType.

Parameters:

fusion_type (StateModelProviderType)

Returns:

bool

class pntos.cobra.StaticAlignInitializationPlugin(identifier)

Bases: InitializationPlugin

A static alignment initialization plugin that provides the internal.StaticAlign strategy.

mediator
__init__(identifier)

Cobra Static Alignment Initialization Plugin

Parameters:

identifier – A string identifier uniquely identifying this plugin.

is_initialization_type_supported(initialization_type)

Check if given InitializationType is supported.

Parameters:

initialization_type (type[InitializationType])

Returns:

True if the plugin supports the given type of initialization, False otherwise.

Return type:

bool

new_initialization_strategy(initialization_type, config_group=None)

Create an instance of pntos.api.CommonInitializationStrategy.

Parameters:
  • initialization_type (type[InitializationType]) – Specifies the type of initializer that the returned value will support. For example, if the user passes in INERTIAL_INITIALIZATION_STRATEGY, then the returned value will be an implementation of pntos.api.InertialInitializationStrategy. If initialization_type is unsupported by the plugin, then None will be returned. Please use is_initialization_type_supported() to check if the type is supported by the plugin.

  • config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to set up the new initialization strategy. This allows for multiple initialization strategy instances to exist with unique settings.

Returns:

The new initialization strategy object. Returns None if initialization_type is unsupported by this plugin (this can be checked using is_initialization_type_supported()) or if config_group is invalid.

Return type:

InitializationType | None
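
The check-then-create pattern described above might look like the following sketch. The stub plugin and strategy classes are hypothetical stand-ins, and passing the strategy class itself as initialization_type is an assumption based on the type[InitializationType] annotation. Only the two method names are taken from the documentation.

```python
class InertialInitializationStrategy:
    """Hypothetical stand-in for a supported strategy type."""


class UnsupportedStrategy:
    """Hypothetical stand-in for a type the plugin does not support."""


class StubInitializationPlugin:
    """Hypothetical plugin exposing the two documented methods."""

    _supported = (InertialInitializationStrategy,)

    def is_initialization_type_supported(self, initialization_type):
        return initialization_type in self._supported

    def new_initialization_strategy(self, initialization_type, config_group=None):
        if not self.is_initialization_type_supported(initialization_type):
            return None
        return initialization_type()


def make_strategy(plugin, initialization_type, config_group=None):
    """Create a strategy, raising ValueError instead of returning None."""
    if not plugin.is_initialization_type_supported(initialization_type):
        raise ValueError(f"unsupported type: {initialization_type.__name__}")
    strategy = plugin.new_initialization_strategy(initialization_type, config_group)
    if strategy is None:
        # The type is supported, so None points to an invalid config_group.
        raise ValueError(f"invalid config_group: {config_group!r}")
    return strategy


plugin = StubInitializationPlugin()
strategy = make_strategy(plugin, InertialInitializationStrategy)
```

Checking support first lets the caller tell an unsupported type apart from an invalid config_group, since both otherwise yield None.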

class pntos.cobra.TutorialPosInsStateModelingPlugin(identifier)

Bases: StateModelingPlugin

StateModelingPlugin that generates a pntos.cobra.internal.TutorialPosInsStateModelProvider.

_mediator
new_state_model_provider(fusion_type)

Generate a state model provider.

Parameters:

fusion_type (StateModelProviderType) – Specifies the type of fusion that the returned value will support. For example, if the user passes in STANDARD_MODEL, then the returned value will be an implementation of pntos.api.StandardStateModelProvider.

Returns:

A state model provider which implements the specified type or None if fusion_type is not supported (is_fusion_type_supported() can be used to check fusion_type).

Return type:

StateModelProviderType | None

is_fusion_type_supported(fusion_type)

Check if the plugin supports a given type of fusion. See StateModelProviderType.

Parameters:

fusion_type (StateModelProviderType)

Returns:

bool

class pntos.cobra.TutorialInitializationPlugin(identifier)

Bases: InitializationPlugin

A simple initialization plugin that generates internal.ManualInitialization instances.

mediator
__init__(identifier)

Cobra Simple Initialization Plugin

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

is_initialization_type_supported(initialization_type)

Check if given InitializationType is supported.

Parameters:

initialization_type (type[InitializationType])

Returns:

True if the plugin supports the given type of initialization, False otherwise.

Return type:

bool

new_initialization_strategy(initialization_type, config_group=None)

Create an instance of pntos.api.CommonInitializationStrategy.

Parameters:
  • initialization_type (type[InitializationType]) – Specifies the type of initializer that the returned value will support. For example, if the user passes in INERTIAL_INITIALIZATION_STRATEGY, then the returned value will be an implementation of pntos.api.InertialInitializationStrategy. If initialization_type is unsupported by the plugin, then None will be returned. Please use is_initialization_type_supported() to check if the type is supported by the plugin.

  • config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to set up the new initialization strategy. This allows for multiple initialization strategy instances to exist with unique settings.

Returns:

The new initialization strategy object. Returns None if initialization_type is unsupported by this plugin (this can be checked using is_initialization_type_supported()) or if config_group is invalid.

Return type:

InitializationType | None

class pntos.cobra.TutorialLcmTransportPlugin(identifier)

Bases: TransportPlugin

A tutorial transport plugin which processes LCM messages from a log.

Capable of marshalling ASPN23-LCM to ASPN23-Python.

mediator
_log_reader_thread
_shutdown_threads
_lcm
__init__(identifier)

LCM Log Transport Plugin

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

identifier

A string identifier uniquely identifying this plugin.

This string will be used to determine the unique space this plugin receives in the system config.

_channels
start_listening()

Process messages from the LCM log.

stop_listening()

Disable listening.

Disable listening to the transport that was previously started in a call to start_listening().

broadcast_message(message, channel_name=None)

Record the LCM message to the output file.

class pntos.cobra.TutorialPosOrchestrationPlugin(identifier)

Bases: OrchestrationPlugin

fusion_plugin
fusion_strategy_plugin
inertial_plugin
preprocessor_time_adjust
preprocessor_time_bias
preprocessor_imu_rotator
mediator
measurement_channels
inertial_channel
fusion_engine
inertial
__init__(identifier)

Tutorial orchestration plugin.

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

init_orchestration_plugin(plugins, stream_config)

Initialize the data structures needed by the orchestration plugin.

This function will be called by the system after pntos.api.CommonPlugin.init_plugin() but before any other call to a pntos.api.OrchestrationPlugin function.

Parameters:
  • plugins (list[CommonPlugin] | None) – A set of plugins which should be used by the orchestration plugin. For example, the plugins list may include a pntos.api.StandardFusionEngine, which the orchestration plugin can use to perform fusion of sensor data received. The list may also include a pntos.api.StateModelingPlugin, which the orchestration plugin can use to extract the algorithms needed for parsing sensor data into the data model a fusion engine needs. If the orchestration plugin does not require any plugins, None may be passed.

  • stream_config (MessageStreamConfig) – A set of configuration options that the orchestration plugin can use to indicate to the pntos.api.ControllerPlugin how it would prefer delivery of messages. When the orchestration plugin receives the stream_config struct, it should call the functions on it to set up how messages will be delivered to it. If it does not, the order of messages’ arrival will be unspecified and at the discretion of the pntos.api.ControllerPlugin.

process_pntos_message(message, sequenced)

Deliver a new message from a source external to pntOS into the orchestration plugin.

The plugin should use the sensor data contained in message by passing it into a fusion engine.

Parameters:
property filter_description_list

Get a list of strings describing the filters available in this pntos.api.OrchestrationPlugin.

One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:

  • Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).

  • Strings should contain the substring BEST when they represent the primary solution.

  • Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).

  • Strings should contain the string equivalent of the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without risking a false positive match when one type’s string is a substring of another’s.

Example

For example, if the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.

These conventions allow the user to identify their desired type of solution using substring matching.

Returns:

A list of strings describing the filters available in this pntos.api.OrchestrationPlugin.

Return type:

list[str]

request_solutions(solution_times, filter_description=None)

Request filtering solutions at the times specified in the array solution_times.

Parameters:
  • solution_times (list[TypeTimestamp]) – The solution times.

  • filter_description (str | None, optional) – A pntos.api.OrchestrationPlugin may run multiple filters. To select which filter(s) to request solutions from, pass a valid filter description string in filter_description. Valid filter description strings can be obtained from filter_description_list. Passing in None will provide a result specific to a particular pntos.api.OrchestrationPlugin implementation. When filter_description is None, the implementation should endeavor to return its best solution.

Returns:

An array of messages containing the filter solutions for the requested solution_times. The number of solutions should equal the number of times in solution_times, although some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message list may be None if filter_description is invalid.

Return type:

list[Message | None] | None

_set_up_fusion_engine()
_initialize_inertial_and_fusion_engine()
_get_best_solution(time)

Utility function to generate the ‘best’ solution: the raw inertial solution corrected with error estimates.

_send_inertial_aux_to_pinson()

Send the current inertial solution and forces to the Pinson15 state-block.

_send_inertial_aux_to_measurement_processor(time, label)

Send the current inertial solution to the position measurement processor.

_supply_aux(cur_time, label)
class pntos.cobra.TutorialPosVelOrchestrationPlugin(identifier)

Bases: OrchestrationPlugin

fusion_plugin
fusion_strategy_plugin
inertial_plugin
preprocessor_time_adjust
preprocessor_time_bias
preprocessor_imu_rotator
mediator
measurement_channels
inertial_channel
fusion_engine
inertial
__init__(identifier)

Tutorial orchestration plugin.

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

init_orchestration_plugin(plugins, stream_config)

Initialize the data structures needed by the orchestration plugin.

This function will be called by the system after pntos.api.CommonPlugin.init_plugin() but before any other call to a pntos.api.OrchestrationPlugin function.

Parameters:
  • plugins (list[CommonPlugin] | None) – A set of plugins which should be used by the orchestration plugin. For example, the plugins list may include a pntos.api.StandardFusionEngine, which the orchestration plugin can use to perform fusion of sensor data received. The list may also include a pntos.api.StateModelingPlugin, which the orchestration plugin can use to extract the algorithms needed for parsing sensor data into the data model a fusion engine needs. If the orchestration plugin does not require any plugins, None may be passed.

  • stream_config (MessageStreamConfig) – A set of configuration options that the orchestration plugin can use to indicate to the pntos.api.ControllerPlugin how it would prefer delivery of messages. When the orchestration plugin receives the stream_config struct, it should call the functions on it to set up how messages will be delivered to it. If it does not, the order of messages’ arrival will be unspecified and at the discretion of the pntos.api.ControllerPlugin.

process_pntos_message(message, sequenced)

Deliver a new message from a source external to pntOS into the orchestration plugin.

The plugin should use the sensor data contained in message by passing it into a fusion engine.

Parameters:
property filter_description_list

Get a list of strings describing the filters available in this pntos.api.OrchestrationPlugin.

One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:

  • Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).

  • Strings should contain the substring BEST when they represent the primary solution.

  • Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).

  • Strings should contain the string equivalent of the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without risking a false positive match when one type’s string is a substring of another’s.

Example

For example, if the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.

These conventions allow the user to identify their desired type of solution using substring matching.

Returns:

A list of strings describing the filters available in this pntos.api.OrchestrationPlugin.

Return type:

list[str]

request_solutions(solution_times, filter_description=None)

Request filtering solutions at the times specified in the array solution_times.

Parameters:
  • solution_times (list[TypeTimestamp]) – The solution times.

  • filter_description (str | None, optional) – A pntos.api.OrchestrationPlugin may run multiple filters. To select which filter(s) to request solutions from, pass a valid filter description string in filter_description. Valid filter description strings can be obtained from filter_description_list. Passing in None will provide a result specific to a particular pntos.api.OrchestrationPlugin implementation. When filter_description is None, the implementation should endeavor to return its best solution.

Returns:

An array of messages containing the filter solutions for the requested solution_times. The number of solutions should equal the number of times in solution_times, although some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message list may be None if filter_description is invalid.

Return type:

list[Message | None] | None

_set_up_fusion_engine()
_initialize_inertial_and_fusion_engine()
_get_best_solution(time)

Utility function to generate the ‘best’ solution: the raw inertial solution corrected with error estimates.

_send_inertial_aux_to_pinson()

Send the current inertial solution and forces to the Pinson15 state-block.

_send_inertial_aux_to_measurement_processor(time, label)

Send the current inertial solution to the position measurement processor.

_supply_aux(cur_time, label)
class pntos.cobra.UiLogPlottingPlugin(identifier)

Bases: UiPlugin

A simple UI plugin that plots results from an LCM or ROS log file on shutdown. This plugin ingests UiLogPlottingConfig from config group config/ui_logfile_plotting.

mediator
__init__(identifier)

A UI plotting plugin for LCM or ROS log files.

Parameters:

identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.

identifier

A string identifier uniquely identifying this plugin.

This string will be used to determine the unique space this plugin receives in the system config.

_harvest_data(channels, truth_channel)
_plot_results()
requires_main_thread()

Check if this plugin needs to run on the main thread.

Some systems require GUI applications to run on the main thread. This method can be used to query whether or not this plugin must be run on the main thread. If this method returns True, then run_main_thread() must be called from the main thread in order to start this plugin.

Returns:

True if plugin needs to run on the main thread, False otherwise.

Return type:

bool

run_main_thread()

Start plugin on the main thread.

This method should only be called if requires_main_thread() returns True. This method should only be called from the main thread.
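
A controller might combine requires_main_thread() and run_main_thread() as in the sketch below. StubUi and hand_off_main_thread are hypothetical names, and the limit of one main-thread UI plugin follows the constraint stated for the Buscat controller. The sketch is meant to be called from the main thread.

```python
class StubUi:
    """Hypothetical UI plugin exposing the two documented methods."""

    def __init__(self, needs_main):
        self._needs_main = needs_main
        self.ran = False

    def requires_main_thread(self):
        return self._needs_main

    def run_main_thread(self):
        # A real plugin would run its GUI loop here and block until shutdown.
        self.ran = True


def hand_off_main_thread(ui_plugins):
    """Give the main thread to the single UI plugin that requires it, if any."""
    needing = [ui for ui in ui_plugins if ui.requires_main_thread()]
    if len(needing) > 1:
        raise RuntimeError("only one UI plugin can require the main thread")
    if not needing:
        return None
    needing[0].run_main_thread()
    return needing[0]


plugins = [StubUi(needs_main=False), StubUi(needs_main=True)]
owner = hand_off_main_thread(plugins)
```

Plugins that do not require the main thread are never passed to run_main_thread(), in line with the rule that it should only be called when requires_main_thread() returns True.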