Cobra Plugins
pntos.cobra should only directly contain plugin imports. These are all the plugins
that can be directly imported from pntos.cobra.
- class pntos.cobra.BuscatControllerPlugin(identifier)
Bases:
ControllerPlugin
This is a simple single-threaded Buscat controller plugin.
The purpose of this plugin is to route one or more data streams from TransportPlugins back out through one or more TransportPlugins, as specified in the supplied BuscatConfig.output_transports. This has the effect of combining multiple data streams and converting all input to the formats supported by the output TransportPlugins. Note that this controller does not do any buffering or sorting of the input data before publishing. It (or more specifically, the BuscatMediator) also does not explicitly pass input data to plugins other than TransportPlugins, and thus will not support sensor fusion without some modification.
Here are the plugins and the expected number of instances of each that this controller looks for:
LoggingPlugin - 1
RegistryPlugin - 1
TransportPlugin - at least 1
UiPlugin - any, but only 1 can require the main thread
It checks for the expected plugins, sets up mediators, passes the mediators to each plugin in pntos.api.CommonPlugin.init_plugin(), then hands off to the _main() function.
Inside the main function, it calls pntos.api.TransportPlugin.start_listening() on all transport plugins, checks if the UI needs to update (and passes it the thread if so), then waits for a ctrl+c to exit pntOS.
- _plugin_resources_location
- __init__(identifier)
Cobra Buscat Controller
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.
- identifier
A string identifier uniquely identifying this plugin.
This string will be used to determine the unique space this plugin receives in the system config.
- take_control(plugins, plugin_resources_locations=None, initial_config=None)
Takes over primary control of the program from the app.
Takes over primary control of the program from the app, using the plugins to process data, generate fused estimates, and ultimately produce and output PNT solutions. take_control() must use the plugins passed to it and construct a full pntOS system. Please see the description of the pntos.api.PlatformIntegrationPlugin (PIP) for a description of the duties of this function compared to the duties of the pntos.api.PlatformIntegrationPlugin.take_control() method (if a PIP is in the plugins list).
- Parameters:
plugins (list[CommonPlugin]) – An array of plugins available to the controller.
plugin_resources_locations (list[str | None] | None, optional) – A list of locations, one for each plugin in plugins, where those plugins may find auxiliary data if needed. The array can be None; otherwise it is the same length as plugins. Each entry may be None, a filesystem path, or a string adhering to some scheme, such as the URI scheme, for defining the resource’s location.
initial_config (str | None, optional) – Represents a source of values to initialize the primary pntOS configuration. This could be None or a str. Examples of possible values for the latter are:
The entire config.
A local file path on systems which support them.
A string adhering to the URI scheme.
- _sort_and_validate_plugins(plugins)
Utility function to ensure plugins contains enough plugins to run pntOS Cobra. Then assigns and dispatches them to the relevant fields on the controller. Raises a RuntimeError if plugins are not as expected.
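The expected plugin counts listed for this controller could be checked along the following lines. This is only a sketch under stated assumptions: the stand-in classes replace the real pntos.api types, and validate_plugins and its return shape are hypothetical names, not the controller's actual implementation.

```python
# Stand-in plugin types (assumptions); the real classes come from pntos.api.
class LoggingPlugin:
    pass


class RegistryPlugin:
    pass


class TransportPlugin:
    pass


class UiPlugin:
    def __init__(self, needs_main_thread=False):
        self._needs_main_thread = needs_main_thread

    def requires_main_thread(self):
        return self._needs_main_thread


def validate_plugins(plugins):
    """Hypothetical check of the documented counts; raises RuntimeError on mismatch."""

    def of_type(cls):
        return [p for p in plugins if isinstance(p, cls)]

    loggers = of_type(LoggingPlugin)
    registries = of_type(RegistryPlugin)
    transports = of_type(TransportPlugin)
    uis = of_type(UiPlugin)

    if len(loggers) != 1:
        raise RuntimeError(f"expected exactly 1 LoggingPlugin, got {len(loggers)}")
    if len(registries) != 1:
        raise RuntimeError(f"expected exactly 1 RegistryPlugin, got {len(registries)}")
    if not transports:
        raise RuntimeError("expected at least 1 TransportPlugin")
    if sum(ui.requires_main_thread() for ui in uis) > 1:
        raise RuntimeError("only 1 UiPlugin may require the main thread")

    # Sorted result that a controller could assign to its own fields.
    return {
        "logging": loggers[0],
        "registry": registries[0],
        "transports": transports,
        "uis": uis,
    }
```

Sorting with isinstance() rather than exact type comparison lets subclasses of a plugin type count toward that type.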
- _log(level, message)
Utility logging method for controller.
NOTE: This is only intended for log messages originating from the controller.
- _main()
The main control of pntOS Cobra.
- class pntos.cobra.CobraUiLogPlayerPlugin(identifier, lcm_url='tcpq://localhost:7700', upload_dir='_static/dist/', group='ui/logplayer')
Bases:
UtilityPlugin
- _mediator
- _file
- _file_found
- _play
- _step
- _lcm_url
- _group
- _log_thread
- _upload_dir
- _run()
- _request_notify_new_file()
- _remove_notify_new_file()
- _initialize_keys()
- _request_notify_run_thread()
- _remove_notify_run_thread()
- _play_pause_callback(group, keys, kv)
- _load_callback(group, keys, kv)
- _step_callback(group, keys, kv)
- _shutdown_current_thread()
- _info(message)
Log info message.
- _warn(message)
Log warning message.
- _debug(message)
Log debug message.
- _error(message)
Log error message.
- class pntos.cobra.ExperimentalCobraUiPlugin(identifier, config_group='config/cobra_ui')
Bases:
UiPlugin
Flask/SocketIO-based UI plugin for pntOS.
Provides WebSocket API for:
Real-time registry subscriptions
Snapshot retrieval
Update notifications
- mediator
- registry_manager
- app
- socket
- config
- _metadata_manager
- static_folder
- identifier
A string identifier uniquely identifying this plugin.
This string will be used to determine the unique space this plugin receives in the system config.
- config_group
- write_buffer
- _shutdown_event
- requires_main_thread()
Check if this plugin needs to run on the main thread.
Some systems require GUI applications to run on the main thread. This method can be used to query whether or not this plugin must be run on the main thread. If this method returns True, then run_main_thread() must be called from the main thread in order to start this plugin.
- Returns:
True if plugin needs to run on the main thread, False otherwise.
- Return type:
bool
- run_main_thread()
Start plugin on the main thread.
This method should only be called if requires_main_thread() returns True. This method should only be called from the main thread.
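The contract between requires_main_thread() and run_main_thread() can be illustrated with a small sketch. FakeUi and hand_off_main_thread are hypothetical stand-ins, not part of pntOS; a real UI plugin would typically block inside run_main_thread().

```python
class FakeUi:
    """Stand-in for a UiPlugin (assumption); records whether it was started."""

    def __init__(self, main_thread):
        self._main_thread = main_thread
        self.started = False

    def requires_main_thread(self):
        return self._main_thread

    def run_main_thread(self):
        # A real UI would run its event loop here and block.
        self.started = True


def hand_off_main_thread(ui_plugins):
    """Hypothetical helper; intended to be called from the main thread only."""
    candidates = [ui for ui in ui_plugins if ui.requires_main_thread()]
    if len(candidates) > 1:
        raise RuntimeError("only one UI plugin may require the main thread")
    if not candidates:
        return None
    # Only the plugin that asked for the main thread is started this way.
    candidates[0].run_main_thread()
    return candidates[0]
```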
- _runtime_assets_exist(static_folder)
- _run_registry_updates_thread()
Pops changes from the registry manager and puts them in the send queue.
- _run_server_thread()
Run the Flask-SocketIO server in a background thread.
- _route_app()
- _route_socket()
Set up WebSocket event handlers.
- _info(message)
Log info message.
- _warn(message)
Log warning message.
- _debug(message)
Log debug message.
- _error(message)
Log error message.
- class pntos.cobra.DummyControllerPlugin(identifier)
Bases:
ControllerPlugin
A ControllerPlugin with minimal capability. Not for use in production code.
This class performs just the activities required to get a bare-bones operable system: basic plugin initialization and shutdown calls, as well as the special initialization requirements of OrchestrationPlugins and TransportPlugins. No other configuration is performed.
This class uses the similarly limited-in-capability DummyMediator to enable callbacks between other plugins.
- _mediator
DummyMediator instance that holds refs to other plugins.
- take_control(plugins, plugin_resources_locations=None, initial_config=None)
Takes over control, initializes all basic plugins and OrchestrationPlugins, and starts all TransportPlugin listener threads.
- Parameters:
plugins (list[CommonPlugin]) – All plugins to include in the operable system.
plugin_resources_locations (list[str | None] | None) – Unused.
initial_config (str | None) – Unused.
- class pntos.cobra.DummyOrchestrationPlugin(identifier)
Bases:
OrchestrationPlugin
A very simple OrchestrationPlugin implementation that mimics a fully functional system by accepting inputs and echoing these inputs as outputs through a Mediator. Though additional plugins may be supplied to this instance, they are not used in any way.
- _plugins
All plugins this instance is managing.
- _last_message
The last message received by process_pntos_message().
- init_orchestration_plugin(plugins, stream_config)
Initialize data structures needed by the orchestration plugin.
This function will be called by the system after pntos.api.CommonPlugin.init_plugin() but before any other call to a pntos.api.OrchestrationPlugin function.
- Parameters:
plugins (list[CommonPlugin] | None) – A set of plugins which should be used by the orchestration plugin. For example, the plugins list may include a pntos.api.StandardFusionEngine, which the orchestration plugin can use to perform fusion of sensor data received. The list may also include a pntos.api.StateModelingPlugin, which the orchestration plugin can use to extract the algorithms needed for parsing sensor data into the data model a fusion engine needs. If the orchestration plugin does not require any plugins, None may be passed.
stream_config (MessageStreamConfig) – A set of configuration options that the orchestration plugin can use to indicate to the pntos.api.ControllerPlugin how it would prefer delivery of messages. When the orchestration plugin receives the stream_config struct, it should call the functions on it to set up how messages will be delivered to it. If it does not, the order of messages’ arrival will be unspecified and at the discretion of the pntos.api.ControllerPlugin.
- process_pntos_message(message, sequenced)
Deliver a new message from a source external to pntOS into the orchestration plugin.
The plugin should utilize the sensor data contained in message by passing it into a fusion engine.
- Parameters:
message (Message) – The pntos.api.Message to be delivered to the pntos.api.OrchestrationPlugin from the pntos.api.ControllerPlugin. From here the pntos.api.OrchestrationPlugin may use it directly or route it to other plugins (like the pntos.api.FusionPlugin, pntos.api.InertialPlugin, or pntos.api.InitializationPlugin).
sequenced (bool) – False if message was the last received pntos.api.Message or True if it was delayed by buffering. See pntos.api.MessageStreamConfig for more details.
- property filter_description_list
Get a list of strings describing the filters available in this pntos.api.OrchestrationPlugin.
One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:
Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).
Strings should contain the substring BEST when they represent the primary solution.
Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurements might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).
Strings should contain the string equivalent of the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.
Example
For example, if the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.
These conventions allow the user to identify their desired type of solution using substring matching.
- Returns:
A list of strings describing the filters available in this pntos.api.OrchestrationPlugin.
- Return type:
list[str]
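The naming conventions for filter description strings are designed for substring matching, which might look like the sketch below. The find_filters helper is hypothetical, and only the first description string comes from the convention example; the other two are invented for illustration.

```python
def find_filters(descriptions, *required_substrings):
    """Return the descriptions that contain every required substring."""
    return [d for d in descriptions if all(s in d for s in required_substrings)]


# Only the first entry appears in the documentation; the rest are made up.
descriptions = [
    "MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE",
    "MY_DEAD_RECKONING_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE",
    "MY_BEST_ASPN_MEASUREMENT_POSITION_ESTIMATE",
]

# Primary PVA solution: combine BEST with the type name plus _ESTIMATE.
best_pva = find_filters(
    descriptions, "BEST", "MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE"
)

# The _ESTIMATE suffix keeps a position-only query from matching PVA strings.
position_only = find_filters(descriptions, "MEASUREMENT_POSITION_ESTIMATE")

# Without the suffix, the shorter type name matches every entry.
too_broad = find_filters(descriptions, "MEASUREMENT_POSITION")
```

Here best_pva and position_only each contain exactly one string, while too_broad contains all three, which is the false-positive case the _ESTIMATE suffix is meant to avoid.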
- request_solutions(solution_times, filter_description=None)
Request filtering solutions at the times specified in the array solution_times.
- Parameters:
solution_times (list[TypeTimestamp]) – The solution times.
filter_description (str | None, optional) – A pntos.api.OrchestrationPlugin may run multiple filters. To select which filter(s) to request solutions from, enter a valid filter description string in filter_description. Valid filter description strings can be obtained by calling filter_description_list. Passing in None will provide a result specific to a particular pntos.api.OrchestrationPlugin implementation. When filter_description is None, the implementation should endeavor to return its best solution.
- Returns:
An array of messages containing the filter solutions for the requested solution_times. The number of solutions should equal the number of times in solution_times, although some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message list may be None if filter_description is invalid.
- Return type:
list[Message | None] | None
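Because both the returned list and its individual entries may be None, callers need to handle two levels of absence. A minimal sketch, assuming integer stand-ins for TypeTimestamp values and string stand-ins for Message objects; usable_solutions is a hypothetical helper, and raising ValueError on a length mismatch is a choice made for this example.

```python
def usable_solutions(solution_times, solutions):
    """Pair each requested time with its solution, skipping unavailable entries."""
    if solutions is None:
        # The whole result is None when filter_description was invalid.
        return []
    if len(solutions) != len(solution_times):
        raise ValueError("expected one entry per requested time")
    return [(t, s) for t, s in zip(solution_times, solutions) if s is not None]


times = [100, 200, 300]                 # stand-ins for TypeTimestamp values
results = ["pva@100", None, "pva@300"]  # stand-ins for Message objects
pairs = usable_solutions(times, results)
```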
- class pntos.cobra.DummyTransportPlugin(identifier)
Bases:
TransportPlugin
A TransportPlugin with minimal capability. Not for use in production code.
This transport simulates inbound data to Cobra by producing fake messages in a loop to pass out through process_pntos_message(). As there is no mechanism for actual publishing, outbound data sent through broadcast_message() die here, eulogized by a logged message containing the channel it would have been published to.
- identifier
A string identifier uniquely identifying this plugin.
This string will be used to determine the unique space this plugin receives in the system config.
- mediator
Mediator used to pass data around and log messages.
- _thread
Main listener thread that produces simulated data.
- start_listening()
Starts the thread that generates data and pushes it into the mediator, simulating data arriving over the network or a similar source.
- stop_listening()
Stops the data generation if it is active.
- broadcast_message(message, channel_name=None)
Send a message back out to the sensor from pntOS.
- Parameters:
message (Message) – The message to send.
channel_name (str | None, optional) – The desired channel. If channel_name is None the implementation may decide where message should be routed, if anywhere. For example, a serial cable might send all messages to a single destination.
- _log(message, level=LoggingLevel.INFO)
Wrapper around log_message() that hard asserts that the mediator exists.
- Parameters:
message – Message to log.
level – Level to log message at.
- class pntos.cobra.StandardControllerPlugin(identifier)
Bases:
ControllerPlugin
This is a simple single-threaded controller plugin.
Here are the plugins and corresponding expected number of instances this controller looks for:
FusionPlugin - at least 1
FusionStrategyPlugin - at least 1
InertialPlugin - at least 1
InitializationPlugin - at least 1
LoggingPlugin - 1
OrchestrationPlugin - 1
RegistryPlugin - 1
TransportPlugin - at least 1
UiPlugin - any, but only 1 can require the main thread
It checks for the expected plugins, sets up mediators, passes the mediators to each plugin in pntos.api.CommonPlugin.init_plugin(), initializes the orchestration plugin, then hands off to the _main() function.
Inside the main function, it calls pntos.api.TransportPlugin.start_listening() on all transport plugins, checks if the UI needs to update (and passes it the thread if so), then waits for a ctrl+c to exit pntOS.
The TransportPlugins will initiate the necessary filtering through their calls to pntos.api.Mediator.process_pntos_message().
- _plugin_resources_location
- __init__(identifier)
Cobra Standard Controller
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.
- identifier
A string identifier uniquely identifying this plugin.
This string will be used to determine the unique space this plugin receives in the system config.
- take_control(plugins, plugin_resources_locations=None, initial_config=None)
Takes over primary control of the program from the app.
Takes over primary control of the program from the app, using the plugins to process data, generate fused estimates, and ultimately produce and output PNT solutions. take_control() must use the plugins passed to it and construct a full pntOS system. Please see the description of the pntos.api.PlatformIntegrationPlugin (PIP) for a description of the duties of this function compared to the duties of the pntos.api.PlatformIntegrationPlugin.take_control() method (if a PIP is in the plugins list).
- Parameters:
plugins (list[CommonPlugin]) – An array of plugins available to the controller.
plugin_resources_locations (list[str | None] | None, optional) – A list of locations, one for each plugin in plugins, where those plugins may find auxiliary data if needed. The array can be None; otherwise it is the same length as plugins. Each entry may be None, a filesystem path, or a string adhering to some scheme, such as the URI scheme, for defining the resource’s location.
initial_config (str | None, optional) – Represents a source of values to initialize the primary pntOS configuration. This could be None or a str. Examples of possible values for the latter are:
The entire config.
A local file path on systems which support them.
A string adhering to the URI scheme.
- _sort_and_validate_plugins(plugins)
Utility function to ensure plugins contains enough plugins to run pntOS Cobra. Then assigns and dispatches them to the relevant fields on the controller. Raises a RuntimeError if plugins are not as expected.
- _log(level, message)
Utility logging method for controller.
NOTE: This is only intended for log messages originating from the controller.
- _main()
The main control of pntOS Cobra.
- class pntos.cobra.DiagnosticLogPlugin(identifier, output_file=None)
Bases:
UtilityPlugin
A plugin to save any values put into the diagnostics group in the registry to an output HDF5 file (OUTPUT_FILE).
If any other plugin wants to store values to the output, all the plugin has to do is write the values to any key within the diagnostics group. However, there are two constraints:
All values assigned to a given key must be of the same type.
If the value for a given key is list[str] or NDArray[float64], the length must not change between updates of the value at that key.
- __init__(identifier, output_file=None)
Diagnostic-Logging Utility Plugin
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.
- _callback(group, keys, kv)
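The two constraints on values written to the diagnostics group can be expressed as a small consistency check. This is a sketch only: DiagnosticsConstraintChecker is a hypothetical class, not part of the plugin, and it handles plain lists; an array type would need the same length comparison.

```python
class DiagnosticsConstraintChecker:
    """Hypothetical helper that remembers the first type and length seen per key."""

    def __init__(self):
        self._signatures = {}  # key -> (type, length or None)

    def check(self, key, value):
        # Length only matters for sequence values; scalars use None.
        length = len(value) if isinstance(value, list) else None
        signature = (type(value), length)
        # The first write for a key fixes its signature.
        expected = self._signatures.setdefault(key, signature)
        if expected != signature:
            raise ValueError(f"{key!r}: expected {expected}, got {signature}")
```

A plugin could run such a check before writing to the registry, so that a changed type or length is caught at the source rather than in the log output.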
- class pntos.cobra.EkfFusionStrategyPlugin(identifier)
Bases:
FusionStrategyPlugin
This is an EKF fusion strategy plugin. It functions as a factory that produces EKF fusion strategies.
- _mediator
- __init__(identifier)
An Extended Kalman Filter Fusion Strategy Plugin
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.
- is_fusion_type_supported(fusion_type)
Check if a particular fusion strategy is supported by new_fusion_strategy().
The new_fusion_strategy() factory method on this class can create one or more types of fusion strategy. However, a pntos.api.FusionStrategyPlugin may not support all types (it must support at least one). Therefore, when a user receives a pntos.api.FusionStrategyPlugin, they should:
Initialize the plugin by calling pntos.api.CommonPlugin.init_plugin() (see pntos.api.CommonPlugin for more information).
Call is_fusion_type_supported() to check that the plugin supports the type that the user wants to use.
If is_fusion_type_supported() returned True, call the new_fusion_strategy() factory method to get a new fusion strategy of the desired type.
Proceed to use the fusion strategy to do estimation.
- Parameters:
fusion_type (type[FusionStrategyType]) – The fusion strategy type we are checking.
- Returns:
Whether or not we support the requested fusion_type.
- Return type:
bool
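The initialize, check, then create sequence described for fusion strategy plugins can be sketched as follows. Everything here is a stand-in: FakeStrategyPlugin, the two strategy classes, create_strategy, and the single mediator argument to init_plugin() are assumptions made for illustration, since the real signatures live in pntos.api.

```python
class StandardFusionStrategy:
    """Stand-in for a supported strategy type (assumption)."""


class UnsupportedStrategy:
    """Stand-in for a type this fake plugin does not provide."""


class FakeStrategyPlugin:
    """Hypothetical plugin that supports exactly one strategy type."""

    def __init__(self):
        self.initialized = False

    def init_plugin(self, mediator):
        self.initialized = True

    def is_fusion_type_supported(self, fusion_type):
        return fusion_type is StandardFusionStrategy

    def new_fusion_strategy(self, fusion_type):
        if not self.is_fusion_type_supported(fusion_type):
            return None
        return fusion_type()


def create_strategy(plugin, fusion_type, mediator=None):
    """Follow the documented order: initialize, check support, then create."""
    plugin.init_plugin(mediator)
    if not plugin.is_fusion_type_supported(fusion_type):
        return None
    return plugin.new_fusion_strategy(fusion_type)
```

The same check-before-create pattern applies to the other factory-style plugins in this module that pair an is_..._supported() method with a new_...() method.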
- new_fusion_strategy(fusion_type)
Create a new fusion strategy of the requested type.
Users must first ensure that the fusion strategy is supported by calling is_fusion_type_supported().
- Parameters:
fusion_type (type[FusionStrategyType]) – The type of fusion strategy that we want returned.
- Returns:
The newly created fusion strategy, which is an implementation of the type specified by fusion_type. For example, if the user calls new_fusion_strategy() with a fusion_type of StandardFusionStrategy, then the returned object will be an implementation of pntos.api.StandardFusionStrategy.
- Return type:
FusionStrategyType | None
- class pntos.cobra.StandardFusionPlugin(identifier)
Bases:
FusionPlugin
A fusion plugin that provides instances of fusion engines.
- _mediator
- __init__(identifier)
A Fusion Plugin
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.
- is_fusion_type_supported(fusion_type)
Check if the plugin supports a given type of fusion.
- Parameters:
fusion_type (type[FusionEngineType])
- Returns:
bool
- new_fusion_engine(fusion_type)
Create a fusion engine.
- Parameters:
fusion_type (type[FusionEngineType]) – This parameter specifies the type of fusion engine that will be returned.
- Returns:
The fusion_type parameter specifies the type of fusion engine that will be returned. For example, if the user passes in pntos.api.StandardFusionEngine, then an implementation of pntos.api.StandardFusionEngine will be returned. Returns None if fusion_type is not supported by this fusion plugin (is_fusion_type_supported() can be used to check the type before calling this method). Otherwise the return is guaranteed to not be None.
- Return type:
FusionEngineType | None
- class pntos.cobra.LcmLogTransportPlugin(identifier)
Bases:
TransportPlugin
A transport plugin which processes LCM messages from a log.
Capable of marshalling ASPN23-LCM to ASPN23-Python.
- mediator
- _log_reader_thread
- _lcm
- _channels_to_process
- _ui
- __init__(identifier)
LCM Log Transport Plugin
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.
- identifier
A string identifier uniquely identifying this plugin.
This string will be used to determine the unique space this plugin receives in the system config.
- _shutdown_threads
- _channels_found
- read_log()
Process messages from LCM log
- start_listening()
Start listening.
Start listening to the transport that this plugin implements, calling the appropriate controller function as data streams in.
- stop_listening()
Disable listening.
Disable listening to the transport that was previously started in a call to start_listening().
- broadcast_message(message, channel_name=None)
Record LCM message to output file
- class pntos.cobra.LcmTransportPlugin(identifier)
Bases:
TransportPlugin
A transport plugin which listens for LCM messages.
Capable of marshalling ASPN23-LCM to ASPN23-Python.
- mediator
- subscription
- _url
- _subscription_regex
- _ui
- __init__(identifier)
ASPN-LCM Transport Plugin
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.
- identifier
A string identifier uniquely identifying this plugin.
This string will be used to determine the unique space this plugin receives in the system config.
- _shutdown_threads
- lcm
- handler
- _sender
- _channels
- _output_queue
- _send_thread()
- _general_handler(channel, data)
- _handler_thread()
Call LCM.handle in a loop.
- start_listening()
Start listening.
Start listening to the transport that this plugin implements, calling the appropriate controller function as data streams in.
- stop_listening()
Disable listening.
Disable listening to the transport that was previously started in a call to start_listening().
- broadcast_message(message, channel_name=None)
Send a message over LCM to a specific channel
- class pntos.cobra.ManualHeadingAlignInitializationPlugin(identifier)
Bases:
InitializationPlugin
A static alignment initialization plugin that provides the internal.ManualHeadingAlign strategy.
- mediator
- __init__(identifier)
Cobra Static Alignment Initialization Plugin
- Parameters:
identifier – A string identifier uniquely identifying this plugin.
- is_initialization_type_supported(initialization_type)
Check if the given InitializationType is supported.
- Parameters:
initialization_type (type[InitializationType])
- Returns:
True if the plugin supports the given type of initialization, False otherwise.
- Return type:
bool
- new_initialization_strategy(initialization_type, config_group=None)
Create an instance of pntos.api.CommonInitializationStrategy.
- Parameters:
initialization_type (type[InitializationType]) – Specifies the type of initializer that the returned value will support. For example, if the user passes in INERTIAL_INITIALIZATION_STRATEGY, then the returned value will be an implementation of pntos.api.InertialInitializationStrategy. If initialization_type is unsupported by the plugin, then None will be returned. Please use is_initialization_type_supported() to check if the type is supported by the plugin.
config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to set up the new initialization strategy. This allows for multiple initialization strategy instances to exist with unique settings.
- Returns:
The new initialization strategy object. Returns None if initialization_type is unsupported by this plugin (this can be checked using is_initialization_type_supported()) or if config_group is invalid.
- Return type:
InitializationType | None
- class pntos.cobra.StandardPreprocessorPlugin(identifier)
Bases:
PreprocessorPlugin
A preprocessor plugin that provides the standard-level set of preprocessors.
The preprocessors this plugin provides are:
DownsamplerPreprocessor - Downsamples messages on a given list of channels.
ImuRotationPreprocessor - Rotates IMU measurements from the IMU frame to the platform frame.
TimeAdjusterPreprocessor - Synthesizes timestamps to compensate for erroneous hardware.
BarometerToAltitudePreprocessor - Converts pressure measurements to altitude measurements.
TimeBiasPreprocessor - Applies an offset to timestamps to correct for a constant time bias.
OutagePreprocessor - Induces an outage on a specified channel.
- mediator
- __init__(identifier)
Constructor.
- Parameters:
identifier (str) – The plugin identifier used to set this plugin’s pntos.api.CommonPlugin.identifier field.
- new_preprocessor(preprocessor_index, config_group=None)
Get a newly created pntos.api.Preprocessor.
- Parameters:
preprocessor_index (int) – Since the pntos.api.PreprocessorPlugin can create a different preprocessor for each element in preprocessor_identifiers, the preprocessor_index parameter is used to select which kind of preprocessor to create a new instance of. The pntos.api.PreprocessorPlugin.preprocessor_identifiers field contains identifying strings for the kinds of preprocessors.
Example
For example, if the plugin can create 45 different preprocessors, the identifier of the last preprocessor that can be created is found in preprocessor_identifiers[44]. An instance of this preprocessor can be created by calling new_preprocessor(44, ...). Note that 0 <= preprocessor_index < length of preprocessor_identifiers.
config_group (str | None, optional) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new preprocessor. If the preprocessor requires no outside configuration, config_group may be None.
- Returns:
A newly created pntos.api.Preprocessor. Returns None if preprocessor_index is greater than or equal to the length of pntos.api.PreprocessorPlugin.preprocessor_identifiers or if config_group is invalid.
- Return type:
Preprocessor | None
- class pntos.cobra.StandardInertialPlugin(identifier)
Bases:
InertialPlugin
An inertial plugin that generates instances of the internal.StandardInertial class.
- mediator
- __init__(identifier)
An Inertial Plugin
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.
- is_inertial_type_supported(inertial_type)
Check if the plugin supports a given type of inertial.
- Parameters:
inertial_type (type[InertialType])
- Returns:
True if inertial type is supported, False otherwise.
- Return type:
bool
- new_inertial(inertial_type, solution, config_group=None)
Create an instance of an implementation of pntos.api.CommonInertial.
- Parameters:
inertial_type (type[InertialType]) – Specifies the type of inertial that the returned value will support. For example, if the user passes in STANDARD_INERTIAL_MECHANIZATION, then the returned value will be an implementation of pntos.api.StandardInertialMechanization. If inertial_type is unsupported by the plugin, then None will be returned. Please use is_inertial_type_supported() to check if the type is supported by the plugin.
solution (Message) – The initial solution (i.e. the alignment) to mechanize from.
config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to initialize the new inertial. This allows for multiple inertial instances to exist with unique settings.
- Returns:
A new inertial object. Returns None if inertial_type is unsupported, solution is invalid, or config_group is invalid. is_inertial_type_supported() can be called to verify inertial_type before calling this method.
- Return type:
InertialType | None
- class pntos.cobra.StandardLoggingPlugin(identifier, colorize=True, global_log_level=LoggingLevel.INFO, date_time_format='%d/%m/%Y %H:%M:%S')
Bases:
LoggingPlugin
A logging plugin that dictates the formatting and handles logging to the console.
- __init__(identifier, colorize=True, global_log_level=LoggingLevel.INFO, date_time_format='%d/%m/%Y %H:%M:%S')
Cobra Logging Plugin
- Parameters:
identifier (str) – Populates the CommonPlugin.identifier field.
colorize (bool) – Prints colored log messages if true, uncolored if false.
global_log_level (LoggingLevel) – Selects which log levels get printed. See log() for more info.
date_time_format (str) – Specifies the date-time format according to the available format specifiers. See time.strftime() for more info on supported formats.
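The default date_time_format in the constructor signature is a day-first format. A short sketch using the standard library's time.strftime() shows what it produces for a fixed instant; the variable names and the alternative format string are chosen for this example only.

```python
import time

# Default date_time_format taken from the constructor signature above.
DEFAULT_FORMAT = "%d/%m/%Y %H:%M:%S"

# A fixed instant (the Unix epoch in UTC) keeps the output reproducible.
epoch = time.gmtime(0)

stamp = time.strftime(DEFAULT_FORMAT, epoch)
# → "01/01/1970 00:00:00"

# An alternative year-first format a caller might pass instead.
iso_like = time.strftime("%Y-%m-%d %H:%M:%S", epoch)
# → "1970-01-01 00:00:00"
```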
- log(source_plugin_type, source_plugin_identifier, level, message)
Log a message.
- This implementation defines the following behavior:
A config defines a global LoggingLevel for the plugin.
A logging request has a LoggingLevel and interacts with the global log level to determine if it will be output.
The rules for the interaction between requested and global level are:
Requested at INFO prints at global level(s): INFO, DEBUG
Requested at WARN prints at global level(s): INFO, WARN, DEBUG
Requested at DEBUG prints at global level(s): DEBUG
Requested at ERROR prints at global level(s): INFO, WARN, DEBUG, ERROR
Or, in other words:
Global level ERROR shows only ERROR
Global level WARN shows ERROR or WARN
Global level INFO shows ERROR, WARN, or INFO
Global level DEBUG shows ERROR, WARN, INFO, or DEBUG
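The interaction between requested and global log levels reduces to a single ordered comparison. The sketch below assumes a severity ordering of DEBUG < INFO < WARN < ERROR; the Level enum and should_print are hypothetical stand-ins, since the numeric values of the real LoggingLevel are not given here.

```python
from enum import IntEnum


class Level(IntEnum):
    """Stand-in for LoggingLevel; the numeric ordering is an assumption."""
    DEBUG = 0
    INFO = 1
    WARN = 2
    ERROR = 3


def should_print(requested, global_level):
    """A message prints when it is at least as severe as the global level."""
    return requested >= global_level


# Reproduce the "in other words" table: which requests show at each global level.
shown = {
    g.name: [r.name for r in Level if should_print(r, g)]
    for g in Level
}
```

With this ordering, shown["WARN"] lists WARN and ERROR only, and shown["DEBUG"] lists every level, which agrees with the rules stated for log().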
- class pntos.cobra.StandardOrchestrationPlugin(identifier)
Bases:
OrchestrationPlugin
The standard orchestration plugin designed to take multiple state blocks and measurement processors.
- mediator
- init_pinson_cov
- fusion_plugin
- fusion_strategy_plugin
- inertial_plugin
- state_modeling_plugins
- initialization_plugin
- initializer
- inertial
- feedback_config
- last_feedback_time_ns
- fusion_engine
- pinson_sb_config
- alignment_channels
- vsb_labels
- __init__(identifier)
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier field.
- initialization_state
- init_solution
- preprocessors
- measurement_channels
- inertial_drift_prop_dt
- cache
- _log(level, message)
Send an informational log message to pntOS.
If no mediator is provided, manually print the message. Otherwise, pass through to the mediator’s log_message method.
- init_orchestration_plugin(plugins, stream_config)
Initialize data structures needed by the orchestration plugin.
This function will be called by the system after pntos.api.CommonPlugin.init_plugin() but before any other call to a pntos.api.OrchestrationPlugin function.
- Parameters:
plugins (list[CommonPlugin] | None) – A set of plugins which should be used by the orchestration plugin. For example, the plugins list may include a pntos.api.StandardFusionEngine, which the orchestration plugin can use to perform fusion of sensor data received. The list may also include a pntos.api.StateModelingPlugin, which the orchestration plugin can use to extract the algorithms needed for parsing sensor data into the data model a fusion engine needs. If the orchestration plugin does not require any plugins, None may be passed.
stream_config (MessageStreamConfig) – A set of configuration options that the orchestration plugin can use to indicate to the pntos.api.ControllerPlugin how it would prefer delivery of messages. When the orchestration plugin receives the stream_config struct, it should call the functions on it to set up how messages will be delivered to it. If it does not, the order of messages’ arrival will be unspecified and at the discretion of the pntos.api.ControllerPlugin.
- _set_up_preprocessors(sorted_plugins, preprocessor_configs)
Finds and creates a list of preprocessors based on the information from preprocessor_configs.
- Parameters:
sorted_plugins (SortedPlugins) – A SortedPlugins instance.
preprocessor_configs (tuple[PreprocessorConfig, ...])
- Returns:
list[Preprocessor]
- _generate_initial_inertial_solution()
Utility function that generates the initial inertial solution.
- _store_config_data(orch_config, controller_config)
Utility function to store the orchestration config fields in easily accessible data structures.
- Parameters:
orch_config (StandardOrchestrationConfig) – The StandardOrchestrationConfig containing the data to be stored.
controller_config (ControllerConfig) – The ControllerConfig containing additional data to be stored.
- _initialize_filter()
Creates a Pinson state block with a covariance calculated during alignment, then adds the block and time to the fusion engine, effectively starting the filter.
- _add_state_block(providers, sb_config)
Utility function to add a state block to the fusion engine.
- _add_measurement_processor(providers, mp_config)
Utility function to add a measurement processor to the fusion engine.
- _add_virtual_state_block(providers, vsb_config)
Utility function to add a virtual state block to the fusion engine.
- _map_vsb_inertial_needs(mp_configs)
Utility function that maps measurement processor labels to VSB labels that need forces and rates or the inertial PVA prior to performing a filter update using this measurement processor.
- _set_up_fusion_engine(sb_configs, mp_configs, vsb_configs)
Utility function to assemble the components of and create a fusion engine.
- _create_state_block_ewc(state_block_id, num_states, ewc=None)
- _preprocess_message(message)
Process the given message by the full chain of preprocessors.
- _propagate_to_time(target_time)
Propagate up to the target time in steps of at most self.max_prop_dt_ns.
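The stepping idea behind `_propagate_to_time` can be illustrated with a small generator. This is a sketch under assumptions, not the plugin's implementation: `propagation_steps` is a hypothetical helper, times are assumed to be integer nanoseconds, and the actual propagation call on the fusion engine is left out.

```python
def propagation_steps(current_time_ns, target_time_ns, max_prop_dt_ns):
    """Yield successive times, each at most max_prop_dt_ns after the previous one."""
    if max_prop_dt_ns <= 0:
        raise ValueError("max_prop_dt_ns must be positive")
    time_ns = current_time_ns
    while time_ns < target_time_ns:
        # Advance by a full step, or by the remainder on the final step.
        time_ns = min(time_ns + max_prop_dt_ns, target_time_ns)
        yield time_ns


print(list(propagation_steps(0, 25, 10)))  # prints [10, 20, 25]
```

A caller would propagate the filter to each yielded time in turn; nothing is yielded when the target time is not ahead of the current time.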
- _publish_solution(solution, group, key)
Publish solution to registry and over transport.
- _get_inertial_forces()
Get inertial forces at the current filter time.
- _send_inertial_aux_to_measurement_processor(mp_label)
Send the current inertial solution and/or forces to the specified measurement processor.
- _send_inertial_aux_to_pinson()
Send the current inertial solution, forces, and rates to the Pinson state block.
- _send_inertial_aux_to_vsbs(mp_label)
Send the current inertial solution, forces, and/or rates to any virtual blocks targeted by the given measurement processor that need it.
- _ready_to_apply_feedback()
- _apply_inertial_feedback()
Correct the inertial solution with the Pinson error states and reset states.
- _perform_measurement_update(message, target_mp)
Send message to the fusion engine to update the filter.
After performing the update, applies feedback to the inertial solution and biases, and resets the Pinson error states.
- _propagate_during_outage()
- _send_message_as_aux_data(message)
- process_pntos_message(message, sequenced)
Deliver a new message from a source external to pntOS into the orchestration plugin.
The plugin should utilize the sensor data contained in message by passing it into a fusion engine.
- Parameters:
message (Message) – The pntos.api.Message to be delivered to the pntos.api.OrchestrationPlugin from the pntos.api.ControllerPlugin. From here the pntos.api.OrchestrationPlugin may use it directly or route it to other plugins (like the pntos.api.FusionPlugin, pntos.api.InertialPlugin, or pntos.api.InitializationPlugin).
sequenced (bool) – False if message was the last received pntos.api.Message or True if it was delayed by buffering. See pntos.api.MessageStreamConfig for more details.
- property filter_description_list
Get a list of strings describing the filters available in this pntos.api.OrchestrationPlugin.
One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:
Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).
Strings should contain the substring BEST when they represent the primary solution.
Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurements might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).
Strings should contain the string equivalent of the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.
Example
For example, if the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.
These conventions allow the user to identify their desired type of solution using substring matching.
- Returns:
A list of strings describing the filters available in this pntos.api.OrchestrationPlugin.
- Return type:
list[str]
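The substring matching these conventions are meant to enable might look like the sketch below. The `find_descriptions` helper is hypothetical, and the second description string is invented for illustration; only the `BEST` example string comes from the text above.

```python
def find_descriptions(descriptions, *required_substrings):
    """Return the descriptions that contain every one of the required substrings."""
    return [
        description
        for description in descriptions
        if all(substring in description for substring in required_substrings)
    ]


# Stand-in for the value of filter_description_list on some plugin.
descriptions = [
    "MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE",
    # Hypothetical second entry, made up for this example.
    "MY_DEAD_RECKONING_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE",
]

best_pva = find_descriptions(
    descriptions, "BEST", "POSITION_VELOCITY_ATTITUDE_ESTIMATE"
)
print(best_pva)
```

Matching on the class name together with the `_ESTIMATE` suffix is what guards against a shorter type name matching inside a longer one.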
- request_solutions(solution_times, filter_description=None)
Request filtering solutions at the times specified in the array solution_times.
- Parameters:
solution_times (list[TypeTimestamp]) – The solution times.
filter_description (str | None, optional) – A pntos.api.OrchestrationPlugin may run multiple filters. To select which filter(s) to request solutions from, enter a valid filter description string in filter_description. Valid filter description strings can be obtained by calling filter_description_list. Passing in None will provide a result specific to a particular pntos.api.OrchestrationPlugin implementation. When filter_description is None, the implementation should endeavor to return its best solution.
- Returns:
An array of messages containing the filter solutions for the requested solution_times. The number of solutions should equal the number of times in solution_times, although some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message list may be None if filter_description is invalid.
- Return type:
list[Message | None] | None
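Because the return value can be `None` as a whole and can also contain `None` entries, callers need to handle both cases. The sketch below shows one way to do that; `pair_available_solutions` is a hypothetical helper, and plain integers and strings stand in for timestamps and messages.

```python
def pair_available_solutions(solution_times, solutions):
    """Pair each requested time with its solution, skipping unavailable entries.

    Returns None when the whole result is None (for example, an invalid
    filter description), mirroring the documented return type.
    """
    if solutions is None:
        return None
    if len(solutions) != len(solution_times):
        raise ValueError("expected one solution entry per requested time")
    return [
        (time, solution)
        for time, solution in zip(solution_times, solutions)
        if solution is not None
    ]


# Stand-in values: the middle solution was unavailable at its requested time.
times = [100, 200, 300]
solutions = ["pva@100", None, "pva@300"]
print(pair_available_solutions(times, solutions))
```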
- class pntos.cobra.StandardRegistryPlugin(identifier, config=None)
Bases:
RegistryPlugin
A registry plugin that creates internal.StandardRegistry instances.
- _plugin_resources_location = None
- mediator
- __init__(identifier, config=None)
Cobra Standard Registry Plugin
- Parameters:
identifier (str) – The plugin identifier used to set this plugin’s identifier field.
config (list[BaseConfig], optional) – A list of configs to store in the registry on instantiation.
- registries
- config
- identifier
A string identifier uniquely identifying this plugin.
This string will be used to determine the unique space this plugin receives in the system config.
- new_registry(initial_config=None)
Create a new registry based on the initial values stored in initial_config.
- Parameters:
initial_config (str | None, optional) – The initial values the new registry should be based on. The format of initial_config is implementation specific, and plugins are free to support any or no format. Possible formats may include:
None, in which case the plugin is free to choose initial values. Choices may include hard-coded in the plugin or none at all.
A str. Examples of possible values the parameter could hold:
The entire config.
A local file path on systems which support them.
A string adhering to the URI scheme.
- Returns:
The newly created registry.
- Return type:
Note
The returned pntos.api.Registry should be capable of producing pntos.api.KeyValueStore structs that are able to be used concurrently. Thus if the user uses the return value of this method to start two batches, one on group “foo” and the other on group “bar”, then concurrent access to both of the resulting pntos.api.KeyValueStore structs for “foo” and “bar” should be supported (i.e. no shared mutable state between them that is not synchronized).
- _log(level, message)
- class pntos.cobra.StandardStateModelingPlugin(identifier)
Bases:
StateModelingPlugin
StateModelingPlugin that generates a pntos.cobra.internal.StandardStateModelProvider.
- _mediator
- new_state_model_provider(type)
Generate a state model provider.
- Parameters:
fusion_type (StateModelProviderType) – Specifies the type of fusion that the returned value will support. For example, if the user passes in STANDARD_MODEL, then the returned value will be an implementation of pntos.api.StandardStateModelProvider.
- Returns:
A state model provider which implements the specified type or None if fusion_type is not supported (is_fusion_type_supported() can be used to check fusion_type).
- Return type:
StateModelProviderType | None
- is_fusion_type_supported(type)
Check if the plugin supports a given type of fusion. See StateModelProviderType.
- Parameters:
fusion_type (StateModelProviderType)
- Returns:
bool
- class pntos.cobra.StaticAlignInitializationPlugin(identifier)
Bases:
InitializationPlugin
A static alignment initialization plugin that provides the internal.StaticAlign strategy.
- mediator
- __init__(identifier)
Cobra Static Alignment Initialization Plugin
- Parameters:
identifier – A string identifier uniquely identifying this plugin.
- is_initialization_type_supported(initialization_type)
Check if the given InitializationType is supported.
- Parameters:
initialization_type (type[InitializationType])
- Returns:
True if the plugin supports the given type of initialization, False otherwise.
- Return type:
bool
- new_initialization_strategy(initialization_type, config_group=None)
Create an instance of pntos.api.CommonInitializationStrategy.
- Parameters:
initialization_type (type[InitializationType]) – Specifies the type of initializer that the returned value will support. For example, if the user passes in INERTIAL_INITIALIZATION_STRATEGY, then the returned value will be an implementation of pntos.api.InertialInitializationStrategy. If initialization_type is unsupported by the plugin, then None will be returned. Please use is_initialization_type_supported() to check if the type is supported by the plugin.
config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to set up the new initialization strategy. This allows for multiple initialization strategy instances to exist with unique settings.
- Returns:
The new initialization strategy object. Returns None if initialization_type is unsupported by this plugin (this can be checked using is_initialization_type_supported()) or if config_group is invalid.
- Return type:
InitializationType | None
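The recommended check-then-create pattern can be sketched as below. Everything here other than the two method names is an assumption made for illustration: `StubInitializationPlugin`, `InertialStrategy`, `OtherStrategy`, and `create_strategy_or_raise` are hypothetical, and the stub only imitates the documented return behavior.

```python
class InertialStrategy:
    """Hypothetical strategy type used only by this example."""


class OtherStrategy:
    """Hypothetical strategy type the stub plugin does not support."""


class StubInitializationPlugin:
    """Hypothetical plugin that supports exactly one initialization type."""

    def __init__(self, supported_type):
        self._supported_type = supported_type

    def is_initialization_type_supported(self, initialization_type):
        return initialization_type is self._supported_type

    def new_initialization_strategy(self, initialization_type, config_group=None):
        # Mirrors the documented contract: None for unsupported types.
        if not self.is_initialization_type_supported(initialization_type):
            return None
        return initialization_type()


def create_strategy_or_raise(plugin, initialization_type, config_group=None):
    """Check support first, then create, turning a None result into an error."""
    if not plugin.is_initialization_type_supported(initialization_type):
        raise ValueError(f"{initialization_type.__name__} is not supported")
    strategy = plugin.new_initialization_strategy(initialization_type, config_group)
    if strategy is None:
        # With the type already checked, None points at the config group.
        raise ValueError(f"config group {config_group!r} is invalid")
    return strategy


plugin = StubInitializationPlugin(InertialStrategy)
strategy = create_strategy_or_raise(plugin, InertialStrategy)
```

Checking support first lets the caller tell an unsupported type apart from an invalid `config_group`, which the bare `None` return cannot do.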
- class pntos.cobra.TutorialPosInsStateModelingPlugin(identifier)
Bases:
StateModelingPlugin
StateModelingPlugin that generates a pntos.cobra.internal.TutorialPosInsStateModelProvider.
- _mediator
- new_state_model_provider(fusion_type)
Generate a state model provider.
- Parameters:
fusion_type (StateModelProviderType) – Specifies the type of fusion that the returned value will support. For example, if the user passes in STANDARD_MODEL, then the returned value will be an implementation of pntos.api.StandardStateModelProvider.
- Returns:
A state model provider which implements the specified type or None if fusion_type is not supported (is_fusion_type_supported() can be used to check fusion_type).
- Return type:
StateModelProviderType | None
- is_fusion_type_supported(fusion_type)
Check if the plugin supports a given type of fusion. See StateModelProviderType.
- Parameters:
fusion_type (StateModelProviderType)
- Returns:
bool
- class pntos.cobra.TutorialInitializationPlugin(identifier)
Bases:
InitializationPlugin
A simple initialization plugin that generates internal.ManualInitialization instances.
- mediator
- __init__(identifier)
Cobra Simple Initialization Plugin
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier() field.
- is_initialization_type_supported(initialization_type)
Check if the given InitializationType is supported.
- Parameters:
initialization_type (type[InitializationType])
- Returns:
True if the plugin supports the given type of initialization, False otherwise.
- Return type:
bool
- new_initialization_strategy(initialization_type, config_group=None)
Create an instance of pntos.api.CommonInitializationStrategy.
- Parameters:
initialization_type (type[InitializationType]) – Specifies the type of initializer that the returned value will support. For example, if the user passes in INERTIAL_INITIALIZATION_STRATEGY, then the returned value will be an implementation of pntos.api.InertialInitializationStrategy. If initialization_type is unsupported by the plugin, then None will be returned. Please use is_initialization_type_supported() to check if the type is supported by the plugin.
config_group (str | None, optional) – An optional parameter which can be used to specify which group in the config should be used to set up the new initialization strategy. This allows for multiple initialization strategy instances to exist with unique settings.
- Returns:
The new initialization strategy object. Returns None if initialization_type is unsupported by this plugin (this can be checked using is_initialization_type_supported()) or if config_group is invalid.
- Return type:
InitializationType | None
- class pntos.cobra.TutorialLcmTransportPlugin(identifier)
Bases:
TransportPlugin
A tutorial transport plugin which processes LCM messages from a log.
Capable of marshalling ASPN23-LCM to ASPN23-Python.
- mediator
- _log_reader_thread
- _shutdown_threads
- _lcm
- __init__(identifier)
LCM Log Transport Plugin
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier() field.
- identifier
A string identifier uniquely identifying this plugin.
This string will be used to determine the unique space this plugin receives in the system config.
- _channels
- start_listening()
Process messages from LCM log
- stop_listening()
Disable listening.
Disable listening to the transport that was previously started in a call to
start_listening().
- broadcast_message(message, channel_name=None)
Record LCM message to output file
- class pntos.cobra.TutorialPosOrchestrationPlugin(identifier)
Bases:
OrchestrationPlugin
- fusion_plugin
- fusion_strategy_plugin
- inertial_plugin
- preprocessor_time_adjust
- preprocessor_time_bias
- preprocessor_imu_rotator
- mediator
- measurement_channels
- inertial_channel
- fusion_engine
- inertial
- __init__(identifier)
Tutorial orchestration plugin.
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier() field.
- init_orchestration_plugin(plugins, stream_config)
Initialize data structures needed by the orchestration plugin.
This function will be called by the system after pntos.api.CommonPlugin.init_plugin() but before any other call to a pntos.api.OrchestrationPlugin function.
- Parameters:
plugins (list[CommonPlugin] | None) – A set of plugins which should be used by the orchestration plugin. For example, the plugins list may include a pntos.api.StandardFusionEngine, which the orchestration plugin can use to perform fusion of sensor data received. The list may also include a pntos.api.StateModelingPlugin, which the orchestration plugin can use to extract the algorithms needed for parsing sensor data into the data model a fusion engine needs. If the orchestration plugin does not require any plugins, None may be passed.
stream_config (MessageStreamConfig) – A set of configuration options that the orchestration plugin can use to indicate to the pntos.api.ControllerPlugin how it would prefer delivery of messages. When the orchestration plugin receives the stream_config struct, it should call the functions on it to set up how messages will be delivered to it. If it does not, the order of messages’ arrival will be unspecified and at the discretion of the pntos.api.ControllerPlugin.
- process_pntos_message(message, sequenced)
Deliver a new message from a source external to pntOS into the orchestration plugin.
The plugin should utilize the sensor data contained in message by passing it into a fusion engine.
- Parameters:
message (Message) – The pntos.api.Message to be delivered to the pntos.api.OrchestrationPlugin from the pntos.api.ControllerPlugin. From here the pntos.api.OrchestrationPlugin may use it directly or route it to other plugins (like the pntos.api.FusionPlugin, pntos.api.InertialPlugin, or pntos.api.InitializationPlugin).
sequenced (bool) – False if message was the last received pntos.api.Message or True if it was delayed by buffering. See pntos.api.MessageStreamConfig for more details.
- property filter_description_list
Get a list of strings describing the filters available in this pntos.api.OrchestrationPlugin.
One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:
Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).
Strings should contain the substring BEST when they represent the primary solution.
Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurements might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).
Strings should contain the string equivalent of the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.
Example
For example, if the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.
These conventions allow the user to identify their desired type of solution using substring matching.
- Returns:
A list of strings describing the filters available in this pntos.api.OrchestrationPlugin.
- Return type:
list[str]
- request_solutions(solution_times, filter_description=None)
Request filtering solutions at the times specified in the array solution_times.
- Parameters:
solution_times (list[TypeTimestamp]) – The solution times.
filter_description (str | None, optional) – A pntos.api.OrchestrationPlugin may run multiple filters. To select which filter(s) to request solutions from, enter a valid filter description string in filter_description. Valid filter description strings can be obtained by calling filter_description_list. Passing in None will provide a result specific to a particular pntos.api.OrchestrationPlugin implementation. When filter_description is None, the implementation should endeavor to return its best solution.
- Returns:
An array of messages containing the filter solutions for the requested solution_times. The number of solutions should equal the number of times in solution_times, although some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message list may be None if filter_description is invalid.
- Return type:
list[Message | None] | None
- _set_up_fusion_engine()
- _initialize_inertial_and_fusion_engine()
- _get_best_solution(time)
Utility function to generate the ‘best’ solution: the raw inertial solution corrected with error estimates.
- _send_inertial_aux_to_pinson()
Send the current inertial solution and forces to the Pinson15 state-block.
- _send_inertial_aux_to_measurement_processor(time, label)
Send the current inertial solution to the position measurement processor.
- _supply_aux(cur_time, label)
- class pntos.cobra.TutorialPosVelOrchestrationPlugin(identifier)
Bases:
OrchestrationPlugin
- fusion_plugin
- fusion_strategy_plugin
- inertial_plugin
- preprocessor_time_adjust
- preprocessor_time_bias
- preprocessor_imu_rotator
- mediator
- measurement_channels
- inertial_channel
- fusion_engine
- inertial
- __init__(identifier)
Tutorial orchestration plugin.
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier() field.
- init_orchestration_plugin(plugins, stream_config)
Initialize data structures needed by the orchestration plugin.
This function will be called by the system after pntos.api.CommonPlugin.init_plugin() but before any other call to a pntos.api.OrchestrationPlugin function.
- Parameters:
plugins (list[CommonPlugin] | None) – A set of plugins which should be used by the orchestration plugin. For example, the plugins list may include a pntos.api.StandardFusionEngine, which the orchestration plugin can use to perform fusion of sensor data received. The list may also include a pntos.api.StateModelingPlugin, which the orchestration plugin can use to extract the algorithms needed for parsing sensor data into the data model a fusion engine needs. If the orchestration plugin does not require any plugins, None may be passed.
stream_config (MessageStreamConfig) – A set of configuration options that the orchestration plugin can use to indicate to the pntos.api.ControllerPlugin how it would prefer delivery of messages. When the orchestration plugin receives the stream_config struct, it should call the functions on it to set up how messages will be delivered to it. If it does not, the order of messages’ arrival will be unspecified and at the discretion of the pntos.api.ControllerPlugin.
- process_pntos_message(message, sequenced)
Deliver a new message from a source external to pntOS into the orchestration plugin.
The plugin should utilize the sensor data contained in message by passing it into a fusion engine.
- Parameters:
message (Message) – The pntos.api.Message to be delivered to the pntos.api.OrchestrationPlugin from the pntos.api.ControllerPlugin. From here the pntos.api.OrchestrationPlugin may use it directly or route it to other plugins (like the pntos.api.FusionPlugin, pntos.api.InertialPlugin, or pntos.api.InitializationPlugin).
sequenced (bool) – False if message was the last received pntos.api.Message or True if it was delayed by buffering. See pntos.api.MessageStreamConfig for more details.
- property filter_description_list
Get a list of strings describing the filters available in this pntos.api.OrchestrationPlugin.
One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:
Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).
Strings should contain the substring BEST when they represent the primary solution.
Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurements might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).
Strings should contain the string equivalent of the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.
Example
For example, if the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.
These conventions allow the user to identify their desired type of solution using substring matching.
- Returns:
A list of strings describing the filters available in this pntos.api.OrchestrationPlugin.
- Return type:
list[str]
- request_solutions(solution_times, filter_description=None)
Request filtering solutions at the times specified in the array solution_times.
- Parameters:
solution_times (list[TypeTimestamp]) – The solution times.
filter_description (str | None, optional) – A pntos.api.OrchestrationPlugin may run multiple filters. To select which filter(s) to request solutions from, enter a valid filter description string in filter_description. Valid filter description strings can be obtained by calling filter_description_list. Passing in None will provide a result specific to a particular pntos.api.OrchestrationPlugin implementation. When filter_description is None, the implementation should endeavor to return its best solution.
- Returns:
An array of messages containing the filter solutions for the requested solution_times. The number of solutions should equal the number of times in solution_times, although some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message list may be None if filter_description is invalid.
- Return type:
list[Message | None] | None
- _set_up_fusion_engine()
- _initialize_inertial_and_fusion_engine()
- _get_best_solution(time)
Utility function to generate the ‘best’ solution: the raw inertial solution corrected with error estimates.
- _send_inertial_aux_to_pinson()
Send the current inertial solution and forces to the Pinson15 state-block.
- _send_inertial_aux_to_measurement_processor(time, label)
Send the current inertial solution to the position measurement processor.
- _supply_aux(cur_time, label)
- class pntos.cobra.UiLogPlottingPlugin(identifier)
Bases:
UiPlugin
A simple UI plugin that plots results from an LCM or ROS log file on shutdown. This plugin ingests UiLogPlottingConfig from config group config/ui_logfile_plotting.
- mediator
- __init__(identifier)
A UI plotting plugin for LCM or ROS log files.
- Parameters:
identifier (str) – The plugin identifier passed to the pntos.api.CommonPlugin.identifier() field.
- identifier
A string identifier uniquely identifying this plugin.
This string will be used to determine the unique space this plugin receives in the system config.
- _harvest_data(channels, truth_channel)
- _plot_results()
- requires_main_thread()
Check if this plugin needs to run on the main thread.
Some systems require GUI applications to run on the main thread. This method can be used to query whether or not this plugin must be run on the main thread. If this method returns True, then run_main_thread() must be called from the main thread in order to start this plugin.
- Returns:
True if the plugin needs to run on the main thread, False otherwise.
- Return type:
bool
- run_main_thread()
Start plugin on the main thread.
This method should only be called if
requires_main_thread()returns True. This method should only be called from the main thread.
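How a controller might honor the contract between `requires_main_thread()` and `run_main_thread()` is sketched below. The `StubUiPlugin` class and `run_ui_on_main_thread` helper are hypothetical. The limit of one main-thread plugin follows the Buscat controller description earlier in this reference and is an assumption for any other controller.

```python
class StubUiPlugin:
    """Hypothetical stand-in exposing the two UiPlugin methods described above."""

    def __init__(self, needs_main_thread):
        self._needs_main_thread = needs_main_thread
        self.started = False

    def requires_main_thread(self):
        return self._needs_main_thread

    def run_main_thread(self):
        # A real plugin would run its GUI loop here.
        self.started = True


def run_ui_on_main_thread(ui_plugins):
    """Start the single plugin that requires the main thread, if there is one.

    This function is itself expected to be called from the main thread.
    """
    candidates = [p for p in ui_plugins if p.requires_main_thread()]
    if len(candidates) > 1:
        raise RuntimeError("at most one UI plugin may require the main thread")
    if not candidates:
        return None
    candidates[0].run_main_thread()
    return candidates[0]


background_ui = StubUiPlugin(needs_main_thread=False)
plotting_ui = StubUiPlugin(needs_main_thread=True)
started = run_ui_on_main_thread([background_ui, plotting_ui])
```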