Cobra Utils
These are all the objects and functions that can be directly imported from pntos.cobra.utils.
- pntos.cobra.utils.is_symmetric(mat, mediator, rtol=1e-5, atol=1e-8)
This function compares a matrix with its transpose and determines if the two are equivalent within a provided tolerance. If the two are equivalent, the matrix is symmetric and this function returns True; otherwise it returns False.
NOTE: Symmetry requires a matrix to be square. If mat is not a 2-D square matrix, this function will log an error and return False.
- Parameters:
mat (NDArray[float64]) – The matrix to be examined.
mediator (Mediator) – Mediator used to log the error when mat is not a 2-D square matrix.
rtol (float) – A coefficient multiplied with every value in the matrix to generate a relative tolerance. Defaults to 1e-5.
atol (float) – An absolute value added directly to the relative tolerance to create the overall tolerance. Defaults to 1e-8.
- Returns:
bool
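The tolerance rule for is_symmetric can be illustrated with a short NumPy sketch. This is not the pntos implementation: is_symmetric_sketch is a hypothetical name, the mediator logging is left out, and the use of numpy.allclose (whose default rtol and atol happen to match the defaults in the signature) is an assumption.

```python
import numpy as np


def is_symmetric_sketch(mat, rtol=1e-5, atol=1e-8):
    """Hypothetical stand-in for is_symmetric, without mediator logging."""
    mat = np.asarray(mat, dtype=np.float64)
    # Symmetry is only defined for 2-D square matrices.
    if mat.ndim != 2 or mat.shape[0] != mat.shape[1]:
        return False
    # allclose tests |a - b| <= atol + rtol * |b| element-wise.
    return bool(np.allclose(mat, mat.T, rtol=rtol, atol=atol))


symmetric = np.array([[2.0, 1.0], [1.0, 3.0]])
lopsided = np.array([[2.0, 1.0], [5.0, 3.0]])
results = (is_symmetric_sketch(symmetric), is_symmetric_sketch(lopsided))
```

With these inputs, the first matrix passes the check and the second does not.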
- pntos.cobra.utils.validate_array(arr, mediator, name, dims=None, rows=None, cols=None, err=False)
Validate the dimensionality and/or length of a 1-D or 2-D numpy array.
Sends error log messages through the mediator if validation fails.
- Parameters:
arr (NDArray[float64]) – The array to validate.
mediator (Mediator) – Mediator to use for logging any error messages.
name (str) – Name of array to be included in any error messages.
dims (int | None, optional) – Expected number of dimensions, or None to ignore dimensions. Defaults to None.
rows (int | None, optional) – Expected number of rows, or None to ignore rows. Defaults to None.
cols (int | None, optional) – Expected number of cols, or None to ignore cols. Defaults to None.
err (bool) – When True, raise a ValueError on a failed check. Defaults to False.
- pntos.cobra.utils.convert_header_from_cpp(header)
Convert from ASPN-C++ header to ASPN-Python header.
- Parameters:
header (aspn23_xtensor.TypeHeader) – The header to convert.
- Returns:
TypeHeader
- pntos.cobra.utils.convert_header_to_cpp(header, message_type)
Convert from ASPN-Python header to ASPN-C++ header.
- Parameters:
header (TypeHeader) – The measurement header to convert.
message_type (aspn23_xtensor.AspnMessageType) – The type of measurement from which the ASPN measurement originates.
- Returns:
aspn23_xtensor.TypeHeader
- pntos.cobra.utils.convert_imu_from_cpp(imu)
Convert from ASPN-C++ IMU measurement to ASPN-Python IMU measurement.
- Parameters:
imu (aspn23_xtensor.MeasurementImu) – The IMU measurement to convert.
- Returns:
aspn23.MeasurementImu
- pntos.cobra.utils.convert_imu_to_cpp(imu)
Convert from ASPN-Python IMU measurement to ASPN-C++ IMU measurement.
- Parameters:
imu (MeasurementImu) – The IMU measurement to convert.
- Returns:
aspn23_xtensor.MeasurementImu
- pntos.cobra.utils.convert_imu_type_from_cpp(imu_type)
Convert from ASPN-C++ IMU type to ASPN-Python IMU type. If the type cannot be matched, this function will default and return MeasurementImuImuType.INTEGRATED.
- Parameters:
imu_type (aspn23_xtensor.AspnMeasurementImuImuType) – The IMU type to convert.
- Returns:
MeasurementImuImuType
- pntos.cobra.utils.convert_imu_type_to_cpp(imu_type)
Convert from ASPN-Python IMU type to ASPN-C++ IMU type.
- Parameters:
imu_type (MeasurementImuImuType) – The IMU type to convert.
- Returns:
aspn23_xtensor.AspnMeasurementImuImuType
- pntos.cobra.utils.convert_message(message)
Convert from ASPN-Python message to ASPN-C++ message. Currently only supports MeasurementImu and aspn23.MeasurementPosition messages.
- Parameters:
message (AspnBase) – The message to convert.
- Returns:
aspn23_xtensor.TypeHeader | None
- pntos.cobra.utils.convert_ndarray_to_list(arr, target_type)
Convert from an NDArray with a numerical dtype to a list of target_type. Multi-dimensional arrays will be converted and their structure will be preserved in the output list.
- Parameters:
arr (NDArray[int | float]) – The array to convert.
target_type (type[Any]) – The data type to convert to.
- Returns:
list[target_type]
- pntos.cobra.utils.convert_ndarray_to_tuple(arr, target_type)
Convert from an NDArray with a numerical dtype to a tuple of target_type. Multi-dimensional arrays will be converted and their structure will be preserved in the output tuple.
- Parameters:
arr (NDArray[int | float]) – The array to convert.
target_type (type[Any]) – The data type to convert to.
- Returns:
tuple[target_type]
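The structure-preserving behavior of convert_ndarray_to_list and convert_ndarray_to_tuple can be pictured with the sketch below. The helper names to_nested_list and to_nested_tuple are hypothetical, and the recursion over numpy.ndarray.tolist() is only one plausible way to get this result, not necessarily how pntos does it.

```python
import numpy as np


def to_nested_list(arr, target_type):
    """Hypothetical helper: cast every element, keep the nesting."""
    def convert(item):
        if isinstance(item, list):
            return [convert(sub) for sub in item]
        return target_type(item)

    # tolist() yields nested Python lists mirroring the array shape.
    return convert(np.asarray(arr).tolist())


def to_nested_tuple(arr, target_type):
    """Hypothetical helper: same idea, but with tuples at every level."""
    def convert(item):
        if isinstance(item, list):
            return tuple(convert(sub) for sub in item)
        return target_type(item)

    return convert(np.asarray(arr).tolist())


grid = np.array([[1.0, 2.0], [3.0, 4.0]])
as_list = to_nested_list(grid, int)
as_tuple = to_nested_tuple(grid, int)
```

Here a 2x2 float array becomes a list of two lists (or a tuple of two tuples) of int.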
- pntos.cobra.utils.convert_pva_from_cpp(pva, covariance=None)
Convert from ASPN-C++ PVA measurement to ASPN-Python PVA measurement. If covariance is None, this function will use the covariance stored in the pva parameter.
- Parameters:
pva (aspn23_xtensor.MeasurementPositionVelocityAttitude) – The pva to convert.
covariance (NDArray | None) – The covariance to associate with the pva measurement.
- Returns:
MeasurementPositionVelocityAttitude
- pntos.cobra.utils.convert_pva_to_cpp(pva)
Convert from ASPN-Python PVA measurement to ASPN-C++ PVA measurement.
- Parameters:
pva (MeasurementPositionVelocityAttitude) – The pva to convert.
- Returns:
aspn23_xtensor.MeasurementPositionVelocityAttitude
- pntos.cobra.utils.convert_status(status, mediator)
Convert a NavToolkit AlignmentStatus to its corresponding InitializationStatus.
- Parameters:
status (AlignBase.AlignmentStatus) – The NavToolkit status to convert.
mediator (Mediator) – A mediator used to log errors, if necessary.
- Returns:
InitializationStatus
- pntos.cobra.utils.convert_timestamp_from_cpp(timestamp)
Convert from ASPN-C++ timestamp to ASPN-Python timestamp.
- Parameters:
timestamp (aspn23_xtensor.TypeTimestamp) – The timestamp to convert.
- Returns:
TypeTimestamp
- pntos.cobra.utils.convert_timestamp_to_cpp(timestamp)
Convert from ASPN-Python timestamp to ASPN-C++ timestamp.
- Parameters:
timestamp (TypeTimestamp) – The timestamp to convert.
- Returns:
aspn23_xtensor.TypeTimestamp
- pntos.cobra.utils.load_from_hdf5_file(file, log_func)
Utility function for loading data from an HDF5 file into python.
This function is intended to unpack an HDF5 file that was packed by save_to_hdf5_file() into a dictionary of keys and lists of logged values. See save_to_hdf5_file() for more information.
- Parameters:
file (pathlib.Path) – Path to .hdf5 file.
mediator (Mediator) – A pntos.api.Mediator instance.
- Returns:
dict[str, list[RegistryValueTypeUnion]]
- pntos.cobra.utils.save_to_hdf5_file(file, store, mediator)
Utility function to store a dictionary into an HDF5 file.
This function is designed to take in a dictionary containing string keys corresponding to lists of registry values. The intended use case is a scenario where each key in store corresponds to a key in a registry group, and each time the value changes at a given key in the registry, the changed value is appended to the corresponding list in the store.
This function makes a few assumptions:
- Each list contains only one type in RegistryValueTypeUnion.
- If the type for a given key is list[list[str]] or list[NDArray[float64]], each element of the outer list is the same length.
Note
While most datasets in the output HDF5 file will be understandable outside of Python (say, MATLAB), datasets for list[Message] types are stored as pickle dump objects, packed inside a numpy.void() object.
- Parameters:
file (pathlib.Path) – Path to .hdf5 file.
store (dict[str, list[RegistryValueTypeUnion]]) – The dictionary to write to the HDF5 file, subject to the above assumptions.
mediator (Mediator) – A pntos.api.Mediator instance.
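The store layout and the pickle-in-numpy.void packing mentioned for save_to_hdf5_file can be sketched without touching an HDF5 file. The names record_change, pack_opaque and unpack_opaque are hypothetical, and the registry keys are made up for illustration; only pickle and numpy.void come from the description itself.

```python
import pickle

import numpy as np

# One list per registry key; a value is appended each time the key changes.
store: dict[str, list] = {}


def record_change(store, key, value):
    """Hypothetical helper: append a changed registry value to its list."""
    store.setdefault(key, []).append(value)


record_change(store, "heading", 1.5)
record_change(store, "heading", 1.6)
record_change(store, "labels", ["north", "east"])


def pack_opaque(obj):
    """Pickle an arbitrary object and wrap the bytes in numpy.void."""
    return np.void(pickle.dumps(obj))


def unpack_opaque(blob):
    """Reverse of pack_opaque: recover the bytes, then unpickle."""
    return pickle.loads(blob.tobytes())


round_tripped = unpack_opaque(pack_opaque({"channel": "imu", "count": 3}))
```

Wrapping the bytes in numpy.void keeps them opaque, which is why such datasets are not readable outside of Python.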
- pntos.cobra.utils.decode_aspn_lcm_msg(data)
Decodes a set of bytes into an ASPN-LCM message. Uses the first 8 bytes to determine the type of message. If the type cannot be determined, this function will return None.
- Parameters:
data (bytes) – The set of bytes to decode.
- Returns:
Aspn23LcmMsg | None
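Dispatching on the first 8 bytes can be sketched as follows, assuming the usual LCM convention that an encoded message starts with an 8-byte big-endian type fingerprint. The DECODERS table, the fingerprint value and the decoder in it are all invented for illustration; real fingerprints come from the generated LCM message classes.

```python
import struct

# Hypothetical fingerprint -> decoder table (values invented for this sketch).
DECODERS = {
    0x1122334455667788: lambda payload: ("ExampleImu", payload),
}


def decode_by_fingerprint(data: bytes):
    """Return a decoded message, or None if the type is unknown."""
    if len(data) < 8:
        return None
    # The first 8 bytes are read as an unsigned big-endian integer.
    (fingerprint,) = struct.unpack(">Q", data[:8])
    decoder = DECODERS.get(fingerprint)
    if decoder is None:
        return None
    return decoder(data[8:])


packet = struct.pack(">Q", 0x1122334455667788) + b"abc"
decoded = decode_by_fingerprint(packet)
unknown = decode_by_fingerprint(b"\x00" * 8)
```

An unknown or truncated fingerprint gives None, mirroring the documented fallback.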
- pntos.cobra.utils.marshal_from_lcm(msg)
Converts from ASPN-LCM message to ASPN23 message. If the input message cannot be converted, this function will return None.
- Parameters:
msg (Aspn23LcmMsg) – The message to convert.
- Returns:
Aspn23Msg | None
- pntos.cobra.utils.marshal_to_aspn23_lcm(msg)
Convert from ASPN23 message to ASPN23-LCM message. If the input message cannot be converted, this function will return None.
- Parameters:
msg (AspnBase) – The message to convert.
- Returns:
Aspn23LcmMsg | None
- pntos.cobra.utils.process_lcm_message(mediator, channel, data, channels)
Marshal LCM message to ASPN-Python and send to the mediator for processing.
- Parameters:
mediator (Mediator) – Mediator instance used for logging and processing message.
channel (str) – The channel name the data originates from.
data (bytes) – A message represented in binary.
channels (set[str]) – Set of channels found so far.
- pntos.cobra.utils.run_pntos_with_log_transport(app, args=None, validate=False)
Spin up app, process log, then shut down.
- Parameters:
app (pathlib.Path) – Path to app to run.
args (list[str] | None) – Optional command-line arguments to pass to app (e.g. output log).
validate (bool) – Whether to validate the app’s output, ensuring there are no warnings or errors. Defaults to False.
- Returns:
Return code of app. Will be 0 if app ran and terminated successfully.
- pntos.cobra.utils.run_pntos_with_network_transport(app, input_log, output_log, args=None, validate=False)
Spin up app and network tools necessary to run it, process log, then shut down.
- Parameters:
app (pathlib.Path) – Path to app to run.
input_log (pathlib.Path) – LCM log containing the measurements to be processed.
output_log (pathlib.Path) – LCM log to which output should be recorded.
args (list[str] | None) – Optional command-line arguments to pass to app.
validate (bool) – Whether to validate the app’s output, ensuring there are no warnings or errors. Defaults to False.
- Returns:
Return code of app. Will be 0 if app ran and terminated successfully.
- pntos.cobra.utils.print_message(level, plugin_id, message, colorize=True, date_time_format='%d/%m/%Y %H:%M:%S')
Print a formatted message to the console.
The printed message will be in the form:
‘<Time> <Plugin ID> <Log Level> <Message>’
- Parameters:
level (LoggingLevel) – Log-level associated with the message.
plugin_id (str) – ID identifying the type of plugin from which the message comes.
message (str) – The message to log.
colorize (bool) – Whether to add colorization to the logged message. Defaults to True.
date_time_format (str) – Format string for the logged timestamp. Defaults to '%d/%m/%Y %H:%M:%S'.
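The line layout and the default date_time_format can be tried out with the sketch below. format_log_line is a hypothetical helper that only builds the string; it takes the level as plain text rather than a LoggingLevel, ignores colorization, and accepts a fixed now value so the result is reproducible.

```python
from datetime import datetime


def format_log_line(level, plugin_id, message,
                    date_time_format="%d/%m/%Y %H:%M:%S", now=None):
    """Hypothetical helper: '<Time> <Plugin ID> <Log Level> <Message>'."""
    stamp = (now or datetime.now()).strftime(date_time_format)
    return f"{stamp} {plugin_id} {level} {message}"


line = format_log_line(
    "INFO", "fusion", "started", now=datetime(2024, 1, 31, 9, 5, 7)
)
# line == "31/01/2024 09:05:07 fusion INFO started"
```

Note that the default format puts the day before the month.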
- class pntos.cobra.utils.Cache
Bases: object
A container for storing entries of type CacheEntry.
- __init__()
Constructor.
- set(key, entry)
Store cache entry.
- Parameters:
key (str) – Key associated with cache entry.
entry (CacheEntry) – The entry to store.
- get(key)
Get a cache entry, recalculating it if necessary.
- Parameters:
key (str) – Key associated with entry to get.
- Raises:
KeyError – If key does not exist in cache.
- Returns:
The entry associated with key.
- Return type:
Any
- clear(key)
Clear a cache entry.
Note that this doesn’t remove the entry from the cache, but clears it such that the stored value must be recalculated the next time it is requested.
- Parameters:
key (str) – The key associated with the entry to clear.
- Raises:
KeyError – If key does not exist in cache.
- class pntos.cobra.utils.CacheEntry(fusion_engine)
Bases: object
An entry in a cache storing a filter value at a specific time.
- __init__(fusion_engine)
Constructor.
- Parameters:
fusion_engine (pntos.api.StandardFusionEngine) – Filter instance used to get current time.
- is_valid()
Check if cache entry is valid.
- Returns:
True if entry exists at the current filter time, False otherwise.
- abstract recalculate(cache)
Recalculate the current cache entry.
- Parameters:
cache (Cache) – Cache in which this entry is stored. Can be used to grab entries that this entry is dependent on.
- clear()
Clear the current cache entry.
Clearing an entry forces the stored value to be recalculated the next time it is requested.
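The interplay between Cache and CacheEntry (validity tied to the current filter time, lazy recalculation on get, and clear forcing a recalculation) can be sketched in plain Python. Everything here is a hypothetical stand-in: Clock replaces the fusion engine, the private attributes and the store helper are invented, and returning the stored value from get is an assumption.

```python
from abc import ABC, abstractmethod


class Clock:
    """Hypothetical stand-in for the fusion engine's current time."""
    def __init__(self):
        self.now = 0.0


class EntrySketch(ABC):
    def __init__(self, clock):
        self._clock = clock
        self._time = None  # Time at which the stored value was computed.
        self.value = None

    def is_valid(self):
        # Valid only if a value exists for the current filter time.
        return self._time is not None and self._time == self._clock.now

    def clear(self):
        # Keep the entry but force a recalculation on the next request.
        self._time = None

    def store(self, value):
        self.value = value
        self._time = self._clock.now

    @abstractmethod
    def recalculate(self, cache):
        """Compute and store a fresh value; may read other entries."""


class CacheSketch:
    def __init__(self):
        self._entries = {}

    def set(self, key, entry):
        self._entries[key] = entry

    def get(self, key):
        entry = self._entries[key]  # Raises KeyError for unknown keys.
        if not entry.is_valid():
            entry.recalculate(self)
        return entry.value

    def clear(self, key):
        self._entries[key].clear()  # Raises KeyError for unknown keys.


class DoubledTime(EntrySketch):
    calls = 0

    def recalculate(self, cache):
        DoubledTime.calls += 1
        self.store(2 * self._clock.now)


clock = Clock()
cache = CacheSketch()
cache.set("doubled", DoubledTime(clock))
clock.now = 3.0
first = cache.get("doubled")   # Triggers a recalculation.
second = cache.get("doubled")  # Served from the cache.
```

A dependent entry would call cache.get on its prerequisites inside recalculate, which is why the cache is passed in.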
- class pntos.cobra.utils.EstimateWithCovarianceEntry(fusion_engine, sb_label)
Bases: CacheEntry
Cache entry for state block estimate and covariance.
- __init__(fusion_engine, sb_label)
Constructor.
- Parameters:
fusion_engine (pntos.api.StandardFusionEngine) – Filter instance used to get current time and state block EstimateWithCovariance.
sb_label (str) – Label associated with the state block.
- class pntos.cobra.utils.FilterSolutionEntry(fusion_engine, solution_channel, inertial_solution_key, pinson_x_and_p_key)
Bases: CacheEntry
Cache entry for filter solution.
- __init__(fusion_engine, solution_channel, inertial_solution_key, pinson_x_and_p_key)
Constructor.
- Parameters:
fusion_engine (pntos.api.StandardFusionEngine) – Filter instance used to get current time.
solution_channel (str) – Source identifier to attach to filter solution.
inertial_solution_key (str) – Key associated with inertial solution cache entry.
pinson_x_and_p_key (str) – Key associated with pinson state block EstimateWithCovariance entry.
- class pntos.cobra.utils.InertialSolutionEntry(fusion_engine, inertial, solution_channel, log_func)
Bases: CacheEntry
Cache entry for inertial solution.
- __init__(fusion_engine, inertial, solution_channel, log_func)
Constructor.
- Parameters:
fusion_engine (pntos.api.StandardFusionEngine) – Filter instance used to get current time.
inertial (pntos.api.StandardInertialMechanization) – Inertial instance used to get inertial solution at a specific time.
solution_channel (str) – Source identifier to attach to inertial solution.
log_func (Callable[[LoggingLevel, str], None]) – Function used for logging messages.
- pntos.cobra.utils.apply_error_states(pva, x)
Correct the inertial’s PVA message with the fusion engine’s error estimate.
- Parameters:
pva (MeasurementPositionVelocityAttitude) – The PVA message originating from the inertial solution.
x (NDArray[float64]) – The error estimate originating from the fusion engine’s state block.
- pntos.cobra.utils.get_best_solution(fusion_engine, inertial, time, sb_label, best_sol_chan, log_func)
Utility function to request the best fusion strategy solution. Returns None if the fusion engine is unable to provide a solution for the requested time.
- pntos.cobra.utils.get_dead_reckoning_solution(inertial, time, imu_sol_chan, log_func)
Utility function to request the IMU-only dead-reckoning solution. Returns None if the inertial is unable to provide a solution for the requested time.
- pntos.cobra.utils.has_valid_time(init_solution, fusion_engine, message, log_func)
Utility function which returns true if the message’s time of validity is greater than the current fusion engine time.
- pntos.cobra.utils.initialization_ready(initialization_state, initializer)
Utility function to poll the state of the init strategy plugin.
Populates initialization_state with the relevant pntos.api.InitializationStatus.
- pntos.cobra.utils.set_up_inertial_mechanization(initializer, inertial_plugin, inertial_group, log_func)
Get initial inertial solution and use it to set up the inertial.
- pntos.cobra.utils.set_up_initializer(initialization_plugin, alignment_config_group, log_func)
Set up inertial initialization strategy, and initialize filter solution if initializer is immediately ready.
- pntos.cobra.utils.plot_llh(llh, truth_llh, llh_sig, time, truth_time, t0, leg, plot_dir=None)
Plot LLH position in units of radians, radians, meters, respectively.
- Parameters:
llh (NDArray[float64]) – LLH position associated with each point in time.
truth_llh (NDArray[float64]) – True LLH position associated with each point in truth_time.
llh_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in llh.
time (NDArray[float64]) – Timestamps associated with llh position.
truth_time (NDArray[float64]) – Timestamps associated with truth_llh position.
t0 – Initial time (in seconds). Timestamps of time and truth_time are relative to this.
leg (list[str]) – Legend to use for plot.
plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.
- pntos.cobra.utils.plot_ned(ned, truth_ned, ned_sig, time, truth_time, t0, leg, plot_dir=None)
Plot NED position in meters.
- Parameters:
ned (NDArray[float64]) – NED position associated with each point in time.
truth_ned (NDArray[float64]) – True NED position associated with each point in truth_time.
ned_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in ned.
time (NDArray[float64]) – Timestamps associated with ned position.
truth_time (NDArray[float64]) – Timestamps associated with truth_ned position.
t0 – Initial time (in seconds). Timestamps of time and truth_time are relative to this.
leg (list[str]) – Legend to use for plot.
plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.
- pntos.cobra.utils.plot_ned_err(ned_err, ned_sig, time, t0, solution_label, plot_dir=None)
Plot NED position error in meters.
- Parameters:
ned_err (NDArray[float64]) – NED position error associated with each point in time.
ned_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in ned_err.
time (NDArray[float64]) – Timestamps associated with ned_err.
t0 – Initial time (in seconds). Timestamps of time are relative to this.
solution_label (str) – Label of solution for which error was calculated. Used for plot title.
plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.
- pntos.cobra.utils.plot_pva(pva, truth_pva, t0, save_dir=None)
Generate position, velocity and attitude plots, as well as error plots.
- Parameters:
pva – Main PVA to plot. Assumed to have timestamps relative to t0.
truth_pva – Reference PVA to plot. Also used to calculate and plot errors of pva. Assumed to have timestamps relative to t0.
t0 – Initial time (in seconds). Timestamps of pva and truth_pva are relative to this.
save_dir – Directory to save plots to, if desired. If None, will not save plots.
- pntos.cobra.utils.plot_rpy(rpy, truth_rpy, tilt_sig, time, truth_time, t0, leg, plot_dir=None)
Plot RPY attitude about NED frame over time in degrees.
- Parameters:
rpy (NDArray[float64]) – RPY associated with each point in time.
truth_rpy (NDArray[float64]) – True RPY associated with each point in truth_time.
tilt_sig (NDArray[float64]) – 1-sigma NED tilt uncertainty associated with each point in rpy.
time (NDArray[float64]) – Timestamps associated with rpy.
truth_time (NDArray[float64]) – Timestamps associated with truth_rpy.
t0 – Initial time (in seconds). Timestamps of time and truth_time are relative to this.
leg (list[str]) – Legend to use for plot.
plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.
- pntos.cobra.utils.plot_tilt_err(tilts, tilt_sig, time, t0, solution_label, plot_dir=None)
Plot NED tilt error in degrees.
- Parameters:
tilts (NDArray[float64]) – NED tilt error associated with each point in time.
tilt_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in tilts.
time (NDArray[float64]) – Timestamps associated with tilts.
t0 – Initial time (in seconds). Timestamps of time are relative to this.
solution_label (str) – Label of solution for which error was calculated. Used for plot title.
plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.
- pntos.cobra.utils.plot_trajectory(ned, truth_ned, time, truth_time, leg, drms_str, plot_dir=None)
Plot trajectory as Northing vs Easting in meters.
- Parameters:
ned (NDArray[float64]) – NED position associated with each point in time.
truth_ned (NDArray[float64]) – True NED position associated with each point in truth_time.
time (NDArray[float64]) – Timestamps associated with ned position.
truth_time (NDArray[float64]) – Timestamps associated with truth_ned position.
leg (list[str]) – Legend to use for plot.
drms_str (str) – String containing 2DRMS error of ned solution, for displaying on plot.
plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.
- pntos.cobra.utils.plot_vel(vel, truth_vel, vel_sig, time, truth_time, t0, leg, plot_dir=None)
Plot NED velocity in m/s.
- Parameters:
vel (NDArray[float64]) – NED velocity associated with each point in time.
truth_vel (NDArray[float64]) – True NED velocity associated with each point in truth_time.
vel_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in vel.
time (NDArray[float64]) – Timestamps associated with vel.
truth_time (NDArray[float64]) – Timestamps associated with truth_vel.
t0 – Initial time (in seconds). Timestamps of time and truth_time are relative to this.
leg (list[str]) – Legend to use for plot.
plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.
- pntos.cobra.utils.plot_vel_err(vel_err, vel_sig, time, t0, solution_label, plot_dir=None)
Plot NED velocity error in m/s.
- Parameters:
vel_err (NDArray[float64]) – NED velocity error associated with each point in time.
vel_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in vel_err.
time (NDArray[float64]) – Timestamps associated with vel_err.
t0 – Initial time (in seconds). Timestamps of time are relative to this.
solution_label (str) – Label of solution for which error was calculated. Used for plot title.
plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.
- pntos.cobra.utils.plot_x_and_p(time, t0, state_labels, estimate, sigma, plot_dir=None)
Plot state estimate and covariance over time.
- Parameters:
time – K-length array of timestamps (seconds)
t0 – Initial time (in seconds). Timestamps of time are relative to this.
estimate – KxN array of estimates at each time step, where N is the number of states.
sigma – KxN array of state 1-sigmas at each time step, where N is the number of states.
plot_dir – Directory to save plots to, if desired. If None, will not save plots.
- class pntos.cobra.utils.SortedPlugins(controller_plugins: list[pntos.api.plugins.controller.ControllerPlugin] = <factory>, fusion_plugins: list[pntos.api.plugins.fusion.FusionPlugin] = <factory>, fusion_strategy_plugins: list[pntos.api.plugins.fusion_strategy.FusionStrategyPlugin] = <factory>, inertial_plugins: list[pntos.api.plugins.inertial.InertialPlugin] = <factory>, initialization_plugins: list[pntos.api.plugins.initialization.InitializationPlugin] = <factory>, logging_plugins: list[pntos.api.plugins.logging.LoggingPlugin] = <factory>, orchestration_plugins: list[pntos.api.plugins.orchestration.OrchestrationPlugin] = <factory>, platform_integration_plugins: list[pntos.api.plugins.platform_integration.PlatformIntegrationPlugin] = <factory>, preprocessor_plugins: list[pntos.api.plugins.preprocessor.PreprocessorPlugin] = <factory>, registry_plugins: list[pntos.api.plugins.registry.RegistryPlugin] = <factory>, state_modeling_plugins: list[pntos.api.plugins.state_modeling.StateModelingPlugin] = <factory>, transport_plugins: list[pntos.api.plugins.transport.TransportPlugin] = <factory>, ui_plugins: list[pntos.api.plugins.ui.UiPlugin] = <factory>, utility_plugins: list[pntos.api.plugins.utility.UtilityPlugin] = <factory>)
Bases: object
- pntos.cobra.utils.camel_to_snake(name)
Utility function to go from class name to SortedPlugins data field name.
Example
This is particularly useful for iterating through a list of plugin types when paired with getattr and setattr on a controller or orchestration plugin:
    def _sort_and_validate_plugins(self, plugins: list[CommonPlugin]) -> None:
        sorted_plugins: SortedPlugins = sort_plugins_dataclass(plugins)
        expected_plugin_types = [LoggingPlugin, OrchestrationPlugin, ...]
        for t in expected_plugin_types:
            t_snake = camel_to_snake(t.__name__)
            plugins_of_type_t = getattr(sorted_plugins, t_snake + 's')
            n_plugins_of_type_t = len(plugins_of_type_t)
            if n_plugins_of_type_t != 1:
                log_func(
                    LoggingLevel.ERROR,
                    f'Expected one {t.__name__}, but received {n_plugins_of_type_t}.',
                )
                return
            setattr(self, t_snake, plugins_of_type_t[0])
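For readers who want to see the name mapping itself, here is a minimal regex-based sketch of a CamelCase to snake_case conversion. camel_to_snake_sketch is a hypothetical name and the regular expression is a common idiom, not necessarily what pntos uses; it does reproduce the SortedPlugins field names once an 's' is appended.

```python
import re


def camel_to_snake_sketch(name: str) -> str:
    """Insert '_' before each capital (except the first), then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


fields = [
    camel_to_snake_sketch(n) + "s"
    for n in ("LoggingPlugin", "FusionStrategyPlugin", "UiPlugin")
]
# fields == ["logging_plugins", "fusion_strategy_plugins", "ui_plugins"]
```

This simple pattern would split runs of capitals letter by letter, so acronym-style class names would need a different rule.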
- pntos.cobra.utils.find_base_plugin_type(plugin)
Utility function to determine the base type of the plugin parameter. Will raise a TypeError if the base type cannot be determined.
- Parameters:
plugin (CommonPlugin) – Any type of plugin.
- pntos.cobra.utils.sort_plugins_dataclass(plugins)
Utility function to alphabetically sort all of the plugins manually.
- Parameters:
plugins (list[CommonPlugin]) – The list of plugins to sort.
- Returns:
SortedPlugins
- pntos.cobra.utils.validate_plugins(sorted_plugins, log_func, **kwargs)
A utility function that (for each type) verifies the number of expected plugins against the plugin counts in sorted_plugins. Accepted keyword arguments take the form [num|min]_[plugin_type] (e.g. num_fusion_plugins, min_fusion_plugins). The num_* parameters specify an exact match, whereas the min_* parameters specify a minimum number of plugins. Only one should be used for any given plugin type.
- Parameters:
sorted_plugins (SortedPlugins) – A SortedPlugins instance containing fields of plugins to validate.
log_func (Callable[[LoggingLevel, str], None]) – The logging function to use within this method.
**kwargs – Keyword arguments mapping plugin type names (as strings) to an expected number of plugins. At least one plugin type must be specified.
- Returns:
True if all expected plugin counts match the actual counts; False otherwise.
- Return type:
bool
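The num_*/min_* keyword convention can be sketched with a small dataclass. PluginsSketch is a hypothetical two-field stand-in for SortedPlugins, validate_counts is a hypothetical function, the logger takes a plain string level, and the handling of unrecognized keywords is an assumption rather than documented behavior.

```python
from dataclasses import dataclass, field


@dataclass
class PluginsSketch:
    """Hypothetical two-field stand-in for SortedPlugins."""
    fusion_plugins: list = field(default_factory=list)
    logging_plugins: list = field(default_factory=list)


def validate_counts(sorted_plugins, log, **kwargs):
    """Check num_<field> (exact) and min_<field> (lower bound) counts."""
    if not kwargs:
        log("ERROR", "At least one plugin type must be specified.")
        return False
    ok = True
    for name, expected in kwargs.items():
        # 'num_fusion_plugins' -> mode 'num', field 'fusion_plugins'.
        mode, _, field_name = name.partition("_")
        if mode not in ("num", "min") or not hasattr(sorted_plugins, field_name):
            log("ERROR", f"Unrecognized keyword argument {name!r}.")
            ok = False
            continue
        actual = len(getattr(sorted_plugins, field_name))
        passed = actual == expected if mode == "num" else actual >= expected
        if not passed:
            log("ERROR", f"{name}={expected}, but found {actual}.")
            ok = False
    return ok


messages = []
plugins = PluginsSketch(fusion_plugins=[object(), object()],
                        logging_plugins=[object()])
passed = validate_counts(plugins, lambda lvl, msg: messages.append(msg),
                         num_logging_plugins=1, min_fusion_plugins=1)
```

Using num_* where min_* was intended is the easy mistake here: two fusion plugins satisfy min_fusion_plugins=1 but not num_fusion_plugins=1.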
- class pntos.cobra.utils.BufferedMutableValueView(registry, group, key, type=None)
Bases: MutableValueView[ValueType], BufferedValueView[ValueType], Generic[ValueType]
A write-enabled ValueView with buffering for a key in the registry.
This class inherits both the buffered behavior from BufferedValueView (e.g. the buffer property and pop()) as well as set_value() and clear_value() from MutableValueView.
Note that the buffer will catch all set_value() calls.
- class pntos.cobra.utils.BufferedValueView(registry, group, key, type=None)
Bases: ValueView[ValueType], Generic[ValueType]
A ValueView that buffers all values at a given group and key in the registry.
- property buffer
Returns a deepcopy of the current buffer.
- pop()
Pops all values from this group and key since last pop().
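The difference between reading buffer and calling pop() can be sketched as below. BufferSketch and its on_value hook are hypothetical, there is no registry or locking involved, and having pop() return the drained values is an assumption based on the wording of the description.

```python
import copy


class BufferSketch:
    """Hypothetical buffer: collects values until they are popped."""

    def __init__(self):
        self._buffer = []

    def on_value(self, value):
        # Stand-in for the registry callback that delivers each new value.
        self._buffer.append(value)

    @property
    def buffer(self):
        # A deep copy, so callers cannot mutate the internal state.
        return copy.deepcopy(self._buffer)

    def pop(self):
        # Hand back everything received since the last pop() and reset.
        values, self._buffer = self._buffer, []
        return values


view = BufferSketch()
view.on_value([1.0, 2.0])
view.on_value([3.0, 4.0])
snapshot = view.buffer
snapshot[0].append(99.0)  # Only the copy is modified.
drained = view.pop()
```

Reading buffer is non-destructive, while pop() empties the buffer for the next batch.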
- class pntos.cobra.utils.GroupsView(registry)
Bases: object
Utility object to give a passive view of all groups in the registry.
- class pntos.cobra.utils.MutableValueView(registry, group, key, type=None)
Bases: ValueView[ValueType], Generic[ValueType]
A write-enabled ValueView.
This simply adds a setter. If no key value store is provided in the setter, a whole batch operation will be performed (inefficient for high-datarate values). If a KeyValueStore is provided, it is assumed that the KeyValueStore is already live (batch_start() has been called), and the value is set directly.
- _batch_start(kv=None)
Utility method to handle batch start.
Allows sub-classes to easily intercept this step.
- _batch_end(kv)
Utility method to handle batch end.
Allows sub-classes to easily intercept this step.
- set_value(new_value, kv=None)
Sets new_value in the registry.
- Parameters:
new_value (RegistryValueType) – New value to write in the registry.
kv (KeyValueStore | None) – If provided, writes to this store. Otherwise, performs a batch operation. Default: None
- clear_value(kv=None)
Removes the key from the registry if it exists.
- Parameters:
kv (KeyValueStore | None) – If provided, value will be removed from kv. Otherwise, performs a full batch operation. Default: None
- class pntos.cobra.utils.ValueView(registry, group, key, type=None)
Bases: Generic[ValueType]
- __init__(registry, group, key, type=None)
Utility object to expose an automatically-updating registry value.
On instantiation, this object registers a callback for the value at group/key in the registry. This callback simply updates the value internally, which allows the self.value property to always be a synced “view” of the value at group/key.
When this object is deleted, it will remove the callback from the registry.
- Parameters:
registry (Registry) – Registry to view
group (str) – The group in the registry
key (str) – The key in the registry
type (type[ValueType] | None) – The desired/expected type of the value at group/key in the registry. If None, the view will use union mode and return values as RegistryValueTypeUnion. If a specific type is provided, the view will use specific type mode and return values as that type. Default: None
- property group
self.value is the view of self.key in this group.
- property key
self.value is the view of this key in self.group.
- property type
Expected/desired type at self.group/self.key.
- property value
The most recent value at self.group/self.key in the registry.
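The callback mechanism behind ValueView can be pictured with a toy registry. TinyRegistry and ViewSketch are hypothetical and do not mirror the pntos.api.Registry interface; in particular, the explicit close() method stands in for the cleanup that the real class performs on deletion.

```python
class TinyRegistry:
    """Hypothetical in-memory registry with per-key callbacks."""

    def __init__(self):
        self._values = {}
        self._callbacks = {}

    def add_callback(self, group, key, callback):
        self._callbacks.setdefault((group, key), []).append(callback)

    def remove_callback(self, group, key, callback):
        self._callbacks[(group, key)].remove(callback)

    def set(self, group, key, value):
        self._values[(group, key)] = value
        # Notify every view registered for this group/key.
        for callback in list(self._callbacks.get((group, key), [])):
            callback(value)


class ViewSketch:
    """Hypothetical view whose value tracks one registry key."""

    def __init__(self, registry, group, key):
        self._registry = registry
        self.group = group
        self.key = key
        self.value = None
        registry.add_callback(group, key, self._on_change)

    def _on_change(self, value):
        self.value = value

    def close(self):
        # Stop tracking; later registry writes no longer reach this view.
        self._registry.remove_callback(self.group, self.key, self._on_change)


registry = TinyRegistry()
view = ViewSketch(registry, "nav", "heading")
registry.set("nav", "heading", 1.5)
synced = view.value
view.close()
registry.set("nav", "heading", 2.0)
after_close = view.value
```

Reading view.value never queries the registry, which keeps reads cheap even for values that change often.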
- pntos.cobra.utils.run_pntos_with_ros_transport(app, input_log, output_log, validate=False)
Spin up app and network tools necessary to run it, process log, then shut down.
- Parameters:
app (pathlib.Path) – Path to app to run.
input_log (pathlib.Path) – ROS log containing the measurements to be processed.
output_log (pathlib.Path) – ROS log to which output should be recorded.
validate (bool) – Whether to validate the app’s output, ensuring there are no warnings or errors. Defaults to False.
- Returns:
Return code of app. Will be 0 if app ran and terminated successfully.
- class pntos.cobra.utils.AspnBaseWithTOV(*args, **kwargs)
Bases: AspnBase, Protocol
- class pntos.cobra.utils.ChannelView(registry, channel, update_interval)
Bases: object
Utility object to track all registry <-> UI interactions for a single channel.
- update_channel_info(message)
Thread-safe function for the mediator to update this channel.
- _update_channel_info()
Performs actual channel info registry update.
- property channel
The channel for which this object is a view.
- property group
The group in the registry that contains the metadata for this channel.
- property mediator_enabled
False if UI requests that mediator block this channel, else True.
- property source_enabled
False if UI requests that sources block this channel, else True.
- property message_count
The most recent message count on this channel.
- property rate
The most recent measured messages/second on this channel.
- property type
The most recent measurement type as a string on this channel.
- property jitter
The most recent measured timestamp jitter in seconds on this channel.
- property tov_last_message
The most recent time of validity in elapsed nanoseconds on this channel.
- property bandwidth
The most recent bandwidth value in bytes per second on this channel.
- class pntos.cobra.utils.SourceChannelView(registry, channel)
Bases: object
A utility object to track registry <-> UI interactions for a source channel.
- property channel
The channel for which this object is a view.
- property group
The group in the registry that contains the metadata for this channel.
- property source_enabled
False if UI requests that sources block this channel, else True.
- class pntos.cobra.utils.UiMediatorInterface(registry, update_interval=0.5)
Bases: object
Interface between a pntos.api.Mediator and the UI via the registry.
- __init__(registry, update_interval=0.5)
- Parameters:
registry (Registry) – Registry reference for UI communications.
update_interval (float) – Minimum time in seconds between UI updates per-channel. Small intervals could incur performance degradation. Default is 0.5 seconds.
- _ensure_channel_view(channel)
Gets the ChannelView for a given channel, creating one if necessary.
- new_mediator_message(message)
Update the UI with a new message from the mediator.
It is up to the controller/mediator implementation to decide what a “new” message means as pertains to buffering messages. It could be that “new” refers to messages from process_pntos_message() calls, or it could be post-buffer messages, or some combination.
- Parameters:
message (Message) – The message from the Mediator.
- Returns:
False if UI user requests that the mediator block this source ID, else True.
- Return type:
bool
- class pntos.cobra.utils.UiMetadataInterface(registry)
Bases: GroupsView
Utility object to populate the keys in the UI_GROUP_METADATA group.
- class pntos.cobra.utils.UiSourceInterface(registry)
Bases: object
Interface between a source (e.g. pntos.api.TransportPlugin) and the UI via the registry.
- _ensure_channel_view(channel)
Gets the SourceChannelView for a given channel, creating one if necessary.
- new_message(source_identifier)
Update the UI with a new message from this source.
A source (e.g. TransportPlugin) should call this method on its UiSourceInterface as soon as it has a source_identifier. Note that the source is not required to provide a whole pntos.api.Message. This provides for some potential performance gains: when the user wishes to block this source ID, the source does not need to allocate and populate a whole pntos.api.Message.
- Parameters:
source_identifier (str) – The source ID of the new message.
- Returns:
False if UI user requests that the source block this source ID, else True.
- Return type:
bool
- pntos.cobra.utils.delta_lat_to_north(delta_lat, approx_lat, altitude)
Convert delta-latitude in radians to north distance in meters.
- Parameters:
delta_lat – A single delta-latitude or an N-length array of delta-latitudes
approx_lat – Approximate latitude at which to calculate conversion (radians)
altitude – Approximate altitude at which to calculate conversion (meters)
- Returns:
A single north distance or an N-length array of distances in the north direction.
- pntos.cobra.utils.delta_lon_to_east(delta_lon, approx_lat, altitude)
Convert delta-longitude in radians to east distance in meters.
- Parameters:
delta_lon – A single delta-longitude or an N-length array of delta-longitudes
approx_lat – Approximate latitude at which to calculate conversion (radians)
altitude – Approximate altitude at which to calculate conversion (meters)
- Returns:
A single east distance or an N-length array of distances in the east direction.
- pntos.cobra.utils.east_to_delta_lon(east_distance, approx_lat, altitude)
Convert east distance in meters to delta-longitude in radians.
- Parameters:
east_distance – A single east distance or N-length array of east distances
approx_lat – Approximate latitude at which to calculate conversion (radians)
altitude – Approximate altitude at which to calculate conversion (meters)
- Returns:
A single delta-longitude or an N-length array of delta-longitudes.
- pntos.cobra.utils.ecef_to_llh(ecef)
Converts from ECEF to LLH.
- Parameters:
ecef (NDArray[float64]) – A vector in the ECEF frame.
- Returns:
The equivalent vector in the LLH frame.
- Return type:
NDArray[float64]
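One standard way to perform an ECEF to LLH conversion is the fixed-point iteration sketched below, shown together with the closed-form inverse so it can be round-tripped. This is an illustration only: the WGS-84 constants, the radians/radians/meters output convention, the function names and the choice of algorithm are all assumptions, and the iteration is not suitable right at the poles, where cos(lat) approaches zero.

```python
import math

# WGS-84 semi-major axis (m) and first eccentricity squared (assumed datum).
A = 6378137.0
E2 = 6.69437999014e-3


def llh_to_ecef_sketch(lat, lon, h):
    """Closed-form LLH (rad, rad, m) to ECEF (m)."""
    n = A / math.sqrt(1.0 - E2 * math.sin(lat) ** 2)
    x = (n + h) * math.cos(lat) * math.cos(lon)
    y = (n + h) * math.cos(lat) * math.sin(lon)
    z = (n * (1.0 - E2) + h) * math.sin(lat)
    return x, y, z


def ecef_to_llh_sketch(x, y, z, iterations=10):
    """Iterative ECEF (m) to LLH (rad, rad, m); avoid the poles."""
    lon = math.atan2(y, x)
    p = math.hypot(x, y)
    # Initial guess: exact for a point on the ellipsoid surface.
    lat = math.atan2(z, p * (1.0 - E2))
    h = 0.0
    for _ in range(iterations):
        n = A / math.sqrt(1.0 - E2 * math.sin(lat) ** 2)
        h = p / math.cos(lat) - n
        lat = math.atan2(z, p * (1.0 - E2 * n / (n + h)))
    return lat, lon, h


lat, lon, h = ecef_to_llh_sketch(*llh_to_ecef_sketch(0.7, -1.2, 1000.0))
```

The round trip should recover the starting latitude, longitude and height to well below a millimeter for points away from the poles.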
- pntos.cobra.utils.north_to_delta_lat(north_distance, approx_lat, altitude)
Convert north distance in meters to delta-latitude in radians.
- Parameters:
north_distance – A single north distance or an N-length array of north distances
approx_lat – Approximate latitude at which to calculate conversion (radians)
altitude – Approximate altitude at which to calculate conversion (meters)
- Returns:
A single delta-latitude or an N-length array of delta-latitudes.
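The four delta-latitude/longitude conversions are consistent with the usual small-angle relations north = delta_lat * (R_M + h) and east = delta_lon * (R_N + h) * cos(lat), where R_M and R_N are the meridian and prime-vertical radii of curvature. The sketch below assumes those relations and the WGS-84 ellipsoid; the function names are hypothetical, and it handles scalars only (array inputs would need NumPy).

```python
import math

# WGS-84 semi-major axis (m) and first eccentricity squared (assumed datum).
WGS84_A = 6378137.0
WGS84_E2 = 6.69437999014e-3


def _radii(lat):
    """Meridian (R_M) and prime-vertical (R_N) radii at latitude lat (rad)."""
    denom = 1.0 - WGS84_E2 * math.sin(lat) ** 2
    meridian = WGS84_A * (1.0 - WGS84_E2) / denom ** 1.5
    prime_vertical = WGS84_A / math.sqrt(denom)
    return meridian, prime_vertical


def delta_lat_to_north_sketch(delta_lat, approx_lat, altitude):
    meridian, _ = _radii(approx_lat)
    return delta_lat * (meridian + altitude)


def north_to_delta_lat_sketch(north, approx_lat, altitude):
    meridian, _ = _radii(approx_lat)
    return north / (meridian + altitude)


def delta_lon_to_east_sketch(delta_lon, approx_lat, altitude):
    _, prime_vertical = _radii(approx_lat)
    return delta_lon * (prime_vertical + altitude) * math.cos(approx_lat)


def east_to_delta_lon_sketch(east, approx_lat, altitude):
    _, prime_vertical = _radii(approx_lat)
    return east / ((prime_vertical + altitude) * math.cos(approx_lat))


# One microradian of longitude at the equator, at zero altitude.
east_m = delta_lon_to_east_sketch(1e-6, 0.0, 0.0)
north_m = delta_lat_to_north_sketch(1e-6, 0.0, 0.0)
```

At the equator, a microradian of longitude spans slightly more ground than a microradian of latitude, because R_N exceeds R_M there.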
- pntos.cobra.utils.ValueType: TypeVar
A TypeVar bound to pntos.api.RegistryValueTypeUnion. This allows fields of type ValueType to be of type pntos.api.RegistryValueTypeUnion, or any subset of that union.