Cobra Utils

These are all the objects and functions that can be directly imported from pntos.cobra.utils.

pntos.cobra.utils.is_symmetric(mat, mediator, rtol=1e-5, atol=1e-8)

This function will compare a matrix with its transpose and determine if the two are equivalent within a provided tolerance. If the two are equivalent, the matrix is symmetric and this function returns True, else returns False.

NOTE: Symmetry requires a matrix to be square, if mat is not a 2-D square matrix this function will log an error and return False.

Parameters:
  • mat (NDArray[float64]) – The matrix to be examined.

  • rtol (float) – A coefficient multiplied with every value in the matrix to generate a relative tolerance.

  • atol (float) – An absolute value added directly the relative tolerance which creates an overall tolerance.

Returns:

bool

pntos.cobra.utils.validate_array(arr, mediator, name, dims=None, rows=None, cols=None, err=False)

Validate the dimensionality and/or length of a 1-D or 2-D numpy array.

Sends error log messages through the mediator if validation fails.

Parameters:
  • arr (NDArray[float64]) – The array to validate.

  • mediator (Mediator) – Mediator to use for logging any error messages.

  • name (str) – Name of array to be included in any error messages.

  • dims (int | None, optional) – Expected number of dimensions, or None to ignore dimensions. Defaults to None.

  • rows (int | None, optional) – Expected number of rows, or None to ignore rows. Defaults to None.

  • cols (int | None, optional) – Expected number of cols, or None to ignore cols. Defaults to None.

  • err (bool) – When True, raise a ValueError on a failed check.

pntos.cobra.utils.convert_header_from_cpp(header)

Convert from ASPN-C++ header to ASPN-Python header.

Parameters:

header (aspn23_xtensor.TypeHeader) – The header to convert.

Returns:

TypeHeader

pntos.cobra.utils.convert_header_to_cpp(header, message_type)

Convert from ASPN-Python header to ASPN-C++ header.

Parameters:
  • header (TypeHeader) – The measurement header to convert.

  • message_type (aspn23_xtensor.AspnMessageType) – The type of measurement from which the ASPN measurement originates.

Returns:

aspn23_xtensor.TypeHeader

pntos.cobra.utils.convert_imu_from_cpp(imu)

Convert from ASPN-C++ IMU measurement to ASPN-Python IMU measurement.

Parameters:

imu (aspn23_xtensor.MeasurementImu) – The IMU measurement to convert.

Returns:

aspn23.MeausurementImu

pntos.cobra.utils.convert_imu_to_cpp(imu)

Convert from ASPN-Python IMU measurement to ASPN-C++ IMU measurement.

Parameters:

imu (MeasurementImu) – The IMU measurement to convert.

Returns:

aspn23_xtensor.MeasurementImu

pntos.cobra.utils.convert_imu_type_from_cpp(imu_type)

Convert from ASPN-C++ IMU type to ASPN-Python IMU type. If the type cannot be matched, this function will default and return MeasurementImuImuType.INTEGRATED.

Parameters:

imu_type (aspn23_xtensor.AspnMeasurementImuImuType) – The IMU type to convert.

Returns:

MeasurementImuImuType

pntos.cobra.utils.convert_imu_type_to_cpp(imu_type)

Convert from ASPN-Python IMU type to ASPN-C++ IMU type.

Parameters:

imu_type (MeasurementImuImuType) – The IMU type to convert.

Returns:

aspn23_xtensor.AspnMeasurementImuImuType

pntos.cobra.utils.convert_message(message)

Convert from ASPN-Python message to ASPN-C++ message. Currently only supports MeasurementImu and aspn23.MeasurementPosition messages.

Parameters:

message (AspnBase) – The message to convert.

Returns:

aspn23_xtensor.TypeHeader | None

pntos.cobra.utils.convert_ndarray_to_list(arr, target_type)

Convert from an NDArray with a numerical dtype to a list of target_type. Multi-dimensional arrays will be converted and their structure will be preserved in the output list.

Parameters:
  • arr (NDArray[int | float]) – The array to convert.

  • target_type (type[Any]) – The data type to convert to.

Returns:

list[target_type]

pntos.cobra.utils.convert_ndarray_to_tuple(arr, target_type)

Convert from an NDArray with a numerical dtype to a tuple of target_type. Multi-dimensional arrays will be converted and their structure will be preserved in the output tuple.

Parameters:
  • arr (NDArray[int | float]) – The array to convert.

  • target_type (type[Any]) – The data type to convert to.

Returns:

tuple[target_type]

pntos.cobra.utils.convert_pva_from_cpp(pva, covariance=None)

Convert from ASPN-C++ PVA measurement to ASPN-Python PVA measurement. If covariance is None, this function will use the covariance stored in the pva parameter.

Parameters:
  • pva (aspn23_xtensor.MeasurementPositionVelocityAttitude) – The pva to convert.

  • covariance (NDArray | None) – The covariance to associate with the pva measurement.

Returns:

MeasurementPositionVelocityAttitude

pntos.cobra.utils.convert_pva_to_cpp(pva)

Convert from ASPN-Python PVA measurement to ASPN-C++ PVA measurement.

Parameters:

pva (MeasurementPositionVelocityAttitude) – The pva to convert.

Returns:

aspn23_xtensor.MeasurementPositionVelocityAttitude

pntos.cobra.utils.convert_status(status, mediator)

Convert a NavToolkit AlignmentStatus to its corresponding InitializationStatus.

Parameters:
  • status (AlignBase.AlignmentStatus) – The NavToolkit status to convert.

  • mediator (Mediator) – A mediator used to log errors, if necessary.

Returns:

InitializationStatus

pntos.cobra.utils.convert_timestamp_from_cpp(timestamp)

Convert from ASPN-C++ timestamp to ASPN-Python timestamp.

Parameters:

timestamp (aspn23_xtensor.TypeTimestamp) – The timestamp to convert.

Returns:

TypeTimestamp

pntos.cobra.utils.convert_timestamp_to_cpp(timestamp)

Convert from ASPN-Python timestamp to ASPN-C++ timestamp.

Parameters:

timestamp (TypeTimestamp) – The timestamp to convert.

Returns:

aspn23_xtensor.TypeTimestamp

pntos.cobra.utils.load_from_hdf5_file(file, log_func)

Utility function for loading data from an HDF5 file into python.

This function is intended to unpack an HDF5 file that was packed by save_to_hdf5_file() into a dictionary of keys and lists of logged values. See save_to_hdf5_file() for more information.

Parameters:
Returns:

dict[str, list[RegistryValueTypeUnion]]

pntos.cobra.utils.save_to_hdf5_file(file, store, mediator)

Utility function to store a dictionary into an HDF5 file.

This function is designed to take in a dictionary containing string keys corresponding to lists of registry values. The intended use-case for this would be a scenario where each key in store corresponds to a key in a registry group, and each time the value changes at a given key in the registry, the changed value is appended to the corresponding list in the store.

This functions makes a few assumptions: - Each list contains only one type in RegistryValueTypesUnion. - If the type for a given key is list[list[str]] or list[NDArray[float64]], each element of the outer list is the same length.

Note

While most datasets in the output HDF5 file will be understandable outside of Python (say, MATLAB), datasets for list[Message] types are stored as pickle dump objects, packed inside a numpy.void() object.

Parameters:
pntos.cobra.utils.decode_aspn_lcm_msg(data)

Decodes a set of bytes into an ASPN-LCM message. Uses the first 8 bytes to determine the type of message, if the type cannot be determined this function will return None.

Parameters:

data (bytes) – The set of bytes to decode.

Returns:

Aspn23LcmMsg | None

pntos.cobra.utils.marshal_from_lcm(msg)

Converts from ASPN-LCM message to ASPN23 message. If the input message cannot be converted, this function will return None.

Parameters:

msg (Aspn23LcmMsg) – The message to convert.

Returns:

Aspn23Msg | None

pntos.cobra.utils.marshal_to_aspn23_lcm(msg)

Convert from ASPN23 message to ASPN23-LCM message. If the input message cannot be converted, this function will return None.

Parameters:

msg (AspnBase) – The message to convert.

Returns:

Aspn23LcmMsg | None

pntos.cobra.utils.process_lcm_message(mediator, channel, data, channels)

Marshal LCM message to ASPN-Python and send to the mediator for processing.

Parameters:
  • mediator (Mediator) – Mediator instance used for logging and processing message.

  • channel (str) – The channel name the data originates from.

  • data (bytes) – A message represented in binary.

  • channels (set[str]) – Set of channels found so far.

pntos.cobra.utils.run_pntos_with_log_transport(app, args=None, validate=False)

Spin up app, process log, then shut down.

Parameters:
  • app (pathlib.Path) – Path to app to run.

  • args (list[str] | None) – Optional command-line arguments to pass to app (e.g. output log).

  • validate (bool) – Whether to validate the app’s output, ensuring there are no warnings or errors. Defaults to False.

Returns:

Return code of app. Will be 0 if app ran and terminated successfully.

pntos.cobra.utils.run_pntos_with_network_transport(app, input_log, output_log, args=None, validate=False)

Spin up app and network tools necessary to run it, process log, then shut down.

Parameters:
  • app (pathlib.Path) – Path to app to run.

  • input_log (pathlib.Path) – LCM log containing the measurements to be processed.

  • output_log (pathlib.Path) – LCM log to which output should be recorded.

  • args (list[str] | None) – Optional command-line arguments to pass to app.

  • validate (bool) – Whether to validate the app’s output, ensuring there are no warnings or errors. Defaults to False.

Returns:

Return code of app. Will be 0 if app ran and terminated successfully.

pntos.cobra.utils.print_message(level, plugin_id, message, colorize=True, date_time_format='%d/%m/%Y %H:%M:%S')

Print a formatted message to the console.

The printed message will be in the form:

‘<Time> <Plugin ID> <Log Level> <Message>’

Parameters:
  • level (LoggingLevel) – Log-level associated with the message.

  • plugin_id (str) – ID identifying the type of plugin from which the message comes.

  • message (str) – The message to log.

  • colorize (bool) – Whether to add colorization to the logged message. Defaults to True.

  • date_time_format (str) – Format string for the logged timestamp.

class pntos.cobra.utils.Cache

Bases: object

A container for storing entries of type CacheEntry.

__init__()

Constructor.

set(key, entry)

Store cache entry.

Parameters:
  • key (str) – Key associated with cache entry.

  • entry (CacheEntry) – The entry to store.

get(key)

Get a cache entry, recalculating it if necessary.

Parameters:

key (str) – Key associated with entry to get.

Raises:

KeyError – If key does not exist in cache.

Returns:

The entry associated with key.

Return type:

Any

clear(key)

Clear a cache entry.

Note that this doesn’t remove the entry from the cache, but clears it such that the stored value must be recalculated the next time it is requested.

Parameters:

key (str) – The key associated with the entry to clear.

Raises:

KeyError – If key does not exist in cache.

class pntos.cobra.utils.CacheEntry(fusion_engine)

Bases: object

An entry in a cache storing a filter value at a specific time.

__init__(fusion_engine)

Constructor.

Parameters:

fusion_engine (pntos.api.StandardFusionEngine) – Filter instance used to get current time.

is_valid()

Check if cache entry is valid.

Returns:

True if entry exists at the current filter time, False otherwise.

abstract recalculate(cache)

Recalculate the current cache entry.

Parameters:

cache (Cache) – Cache in which this entry is stored. Can be used to grab entries that this entry is dependent on.

clear()

Clear the current cache entry.

Clearing an entry forces the stored value to be recalculated the next time it is requested.

get(cache)

Get the current cache entry, recalculating if needed.

Parameters:

cache (Cache) – Cache in which this entry is stored. Can be used to grab entries that this entry is dependent on.

Returns:

The current cache entry if available, otherwise None.

class pntos.cobra.utils.EstimateWithCovarianceEntry(fusion_engine, sb_label)

Bases: CacheEntry

Cache entry for state block estimate and covariance.

__init__(fusion_engine, sb_label)

Constructor.

Parameters:
  • fusion_engine (pntos.api.StandardFusionEngine) – Filter instance used to get current time and state block EstimateWithCovariance.

  • sb_label (str) – Label associated with the state block.

recalculate(cache)

Calculate and store the state block estimate at the given time.

Parameters:

cache (Cache) – Cache in which this entry is stored. Not needed for calculating estimate.

class pntos.cobra.utils.FilterSolutionEntry(fusion_engine, solution_channel, inertial_solution_key, pinson_x_and_p_key)

Bases: CacheEntry

Cache entry for filter solution.

__init__(fusion_engine, solution_channel, inertial_solution_key, pinson_x_and_p_key)

Constructor.

Parameters:
  • fusion_engine (pntos.api.StandardFusionEngine) – Filter instance used to get current time.

  • solution_channel (str) – Source identifier to attach to filter solution.

  • inertial_solution_key (str) – Key associated with inertial solution cache entry.

  • pinson_x_and_p_key (str) – Key associated with pinson state block EstimateWithCovariance entry.

recalculate(cache)

Calculate and store the filter solution at the given time.

Parameters:

cache (Cache) – Cache in which this entry is stored. Used to grab inertial solution and pinson state block estimate, which are used to calculate the filter solution.

class pntos.cobra.utils.InertialSolutionEntry(fusion_engine, inertial, solution_channel, log_func)

Bases: CacheEntry

Cache entry for inertial solution.

__init__(fusion_engine, inertial, solution_channel, log_func)

Constructor.

Parameters:
recalculate(cache)

Calculate and store the inertial solution at the current filter time.

Parameters:

cache (Cache) – Cache in which this entry is stored. Not needed for calculating inertial solution.

pntos.cobra.utils.apply_error_states(pva, x)

Correct the inertial’s PVA message with the fusion engine’s error estimate.

Parameters:
  • pva (MeasurementPositionVelocityAttitude) – The PVA message originating from the inertial solution.

  • x (NDArray[float64]) – The error estimate originating from the fusion engine’s state block.

pntos.cobra.utils.get_best_solution(fusion_engine, inertial, time, sb_label, best_sol_chan, log_func)

Utility function to request the best fusion strategy solution. Returns None if the fusion engine is unable to provide a solution for the requested time.

pntos.cobra.utils.get_dead_reckoning_solution(inertial, time, imu_sol_chan, log_func)

Utility function to request the IMU-only dead-reckoning solution. Returns None if the inertial is unable to provide a solution for the requested time.

pntos.cobra.utils.has_valid_time(init_solution, fusion_engine, message, log_func)

Utility function which returns true if the message’s time of validity is greater than the current fusion engine time.

pntos.cobra.utils.initialization_ready(initialization_state, initializer)

Utility function to poll the state of the init strategy plugin.

Populates initialization_state with the relevant pntos.api.InitializationStatus.

pntos.cobra.utils.set_up_inertial_mechanization(initializer, inertial_plugin, inertial_group, log_func)

Get initial inertial solution and use it to set up the inertial.

pntos.cobra.utils.set_up_initializer(initialization_plugin, alignment_config_group, log_func)

Set up inertial initialization strategy, and initialize filter solution if initializer is immediately ready.

pntos.cobra.utils.plot_llh(llh, truth_llh, llh_sig, time, truth_time, t0, leg, plot_dir=None)

Plot LLH position in units of radians, radians, meters, respectively.

Parameters:
  • llh (NDArray[float64]) – LLH position associated with each point in time.

  • truth_llh (NDArray[float64]) – True LLH position associated with each point in truth_time.

  • llh_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in llh.

  • time (NDArray[float64]) – Timestamps associated with llh position.

  • truth_time (NDArray[float64]) – Timestamps associated with truth_llh position.

  • t0 – Initial time (in seconds). Timestamps of time and truth_time are relative to this.

  • leg (list[str]) – Legend to use for plot.

  • plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.

pntos.cobra.utils.plot_ned(ned, truth_ned, ned_sig, time, truth_time, t0, leg, plot_dir=None)

Plot NED position in meters.

Parameters:
  • ned (NDArray[float64]) – NED position associated with each point in time.

  • truth_ned (NDArray[float64]) – True NED position associated with each point in truth_time.

  • ned_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in ned.

  • time (NDArray[float64]) – Timestamps associated with ned position.

  • truth_time (NDArray[float64]) – Timestamps associated with truth_ned position.

  • t0 – Initial time (in seconds). Timestamps of time and truth_time are relative to this.

  • leg (list[str]) – Legend to use for plot.

  • plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.

pntos.cobra.utils.plot_ned_err(ned_err, ned_sig, time, t0, solution_label, plot_dir=None)

Plot NED position error in meters.

Parameters:
  • ned_err (NDArray[float64]) – NED position error associated with each point in time.

  • ned_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in ned_err.

  • time (NDArray[float64]) – Timestamps associated with ned_err.

  • t0 – Initial time (in seconds). Timestamps of time are relative to this.

  • solution_label (str) – Label of solution for which error was calculated. Used for plot title.

  • plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.

pntos.cobra.utils.plot_pva(pva, truth_pva, t0, save_dir=None)

Generate position, velocity and attitude plots, as well as error plots.

Parameters:
  • pva – Main PVA to plot. Assumed to have timestamps relative to t0.

  • truth_pva – Reference PVA to plot. Also used to calculate and plot errors of pva. Assumed to have timestamps relative to t0.

  • t0 – Initial time (in seconds). Timestamps of pva and truth_pva are relative to this.

  • save_dir – Directory to save plots to, if desired. If None, will not save plots.

pntos.cobra.utils.plot_rpy(rpy, truth_rpy, tilt_sig, time, truth_time, t0, leg, plot_dir=None)

Plot RPY attitude about NED frame over time in degrees.

Parameters:
  • rpy (NDArray[float64]) – RPY associated with each point in time.

  • truth_rpy (NDArray[float64]) – True RPY associated with each point in truth_time.

  • tilt_sig (NDArray[float64]) – 1-sigma NED tilt uncertainty associated with each point in rpy.

  • time (NDArray[float64]) – Timestamps associated with rpy.

  • truth_time (NDArray[float64]) – Timestamps associated with truth_rpy.

  • t0 – Initial time (in seconds). Timestamps of time and truth_time are relative to this.

  • leg (list[str]) – Legend to use for plot.

  • plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.

pntos.cobra.utils.plot_tilt_err(tilts, tilt_sig, time, t0, solution_label, plot_dir=None)

Plot NED tilt error in degrees.

Parameters:
  • tilt_err (NDArray[float64]) – NED tilt error associated with each point in time.

  • tilt_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in tilt_err.

  • time (NDArray[float64]) – Timestamps associated with tilt_err.

  • t0 – Initial time (in seconds). Timestamps of time are relative to this.

  • solution_label (str) – Label of solution for which error was calculated. Used for plot title.

  • plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.

pntos.cobra.utils.plot_trajectory(ned, truth_ned, time, truth_time, leg, drms_str, plot_dir=None)

Plot trajectory as Northing vs Easting in meters.

Parameters:
  • ned (NDArray[float64]) – NED position associated with each point in time.

  • truth_ned (NDArray[float64]) – True NED position associated with each point in truth_time.

  • time (NDArray[float64]) – Timestamps associated with ned position.

  • truth_time (NDArray[float64]) – Timestamps associated with truth_ned position.

  • leg (list[str]) – Legend to use for plot.

  • drms_str (str) – String containing 2DRMS error of ned solution, for displaying on plot.

  • plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.

pntos.cobra.utils.plot_vel(vel, truth_vel, vel_sig, time, truth_time, t0, leg, plot_dir=None)

Plot NED velocity in m/s.

Parameters:
  • vel (NDArray[float64]) – NED velocity associated with each point in time.

  • truth_vel (NDArray[float64]) – True NED velocity associated with each point in truth_time.

  • vel_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in vel.

  • time (NDArray[float64]) – Timestamps associated with vel.

  • truth_time (NDArray[float64]) – Timestamps associated with truth_vel.

  • t0 – Initial time (in seconds). Timestamps of time and truth_time are relative to this.

  • leg (list[str]) – Legend to use for plot.

  • plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.

pntos.cobra.utils.plot_vel_err(vel_err, vel_sig, time, t0, solution_label, plot_dir=None)

Plot NED velocity error in m/s.

Parameters:
  • vel_err (NDArray[float64]) – NED velocity error associated with each point in time.

  • vel_sig (NDArray[float64]) – 1-sigma uncertainty associated with each point in vel_err.

  • time (NDArray[float64]) – Timestamps associated with vel_err.

  • t0 – Initial time (in seconds). Timestamps of time are relative to this.

  • solution_label (str) – Label of solution for which error was calculated. Used for plot title.

  • plot_dir (pathlib.Path | None, optional) – Optional directory to which plot should be saved, if desired. Defaults to None.

pntos.cobra.utils.plot_x_and_p(time, t0, state_labels, estimate, sigma, plot_dir=None)

Plot state estimate and covariance over time.

Parameters:
  • time – K-length array of timestamps (seconds)

  • t0 – Initial time (in seconds). Timestamps of pva and truth_pva are relative to this.

  • estimate – KxN array of estimates at each time step, where N is the number of states.

  • sigma – KxN array of state 1-sigmas at each time step, where N is the number of states.

  • plot_dir – Directory to save plots to, if desired. If None, will not save plots.

class pntos.cobra.utils.SortedPlugins(controller_plugins: list[pntos.api.plugins.controller.ControllerPlugin] = <factory>, fusion_plugins: list[pntos.api.plugins.fusion.FusionPlugin] = <factory>, fusion_strategy_plugins: list[pntos.api.plugins.fusion_strategy.FusionStrategyPlugin] = <factory>, inertial_plugins: list[pntos.api.plugins.inertial.InertialPlugin] = <factory>, initialization_plugins: list[pntos.api.plugins.initialization.InitializationPlugin] = <factory>, logging_plugins: list[pntos.api.plugins.logging.LoggingPlugin] = <factory>, orchestration_plugins: list[pntos.api.plugins.orchestration.OrchestrationPlugin] = <factory>, platform_integration_plugins: list[pntos.api.plugins.platform_integration.PlatformIntegrationPlugin] = <factory>, preprocessor_plugins: list[pntos.api.plugins.preprocessor.PreprocessorPlugin] = <factory>, registry_plugins: list[pntos.api.plugins.registry.RegistryPlugin] = <factory>, state_modeling_plugins: list[pntos.api.plugins.state_modeling.StateModelingPlugin] = <factory>, transport_plugins: list[pntos.api.plugins.transport.TransportPlugin] = <factory>, ui_plugins: list[pntos.api.plugins.ui.UiPlugin] = <factory>, utility_plugins: list[pntos.api.plugins.utility.UtilityPlugin] = <factory>)

Bases: object

pntos.cobra.utils.camel_to_snake(name)

Utility function to go from class name to SortedPlugins data field name.

Example

This is particularly useful for iterating through a list of plugin types when paired with getattr and setattr on a controller or orchestration plugin:

def _sort_and_validate_plugins(self, plugins: list[CommonPlugin]) -> None:
    sorted_plugins: SortedPlugins = sort_plugins_dataclass(plugins)
    expected_plugin_types = [LoggingPlugin, OrchestrationPlugin, ...]
    for t in expected_plugin_types:
        t_snake = camel_to_snake(t.__name__)
        plugins_of_type_t = getattr(sorted_plugins, t_snake + 's')
        n_plugins_of_type_t = len(plugins_of_type_t)
        if n_plugins_of_type_t != 1:
            log_func(
                LoggingLevel.ERROR,
                f'Expected one {t.__name__}, but received {n_plugins_of_type_t}.',
            )
            return
        setattr(self, t_snake, plugins_of_type_t[0])
pntos.cobra.utils.find_base_plugin_type(plugin)

Utility function to determine the base type of the plugin parameter. Will raise a TypeError if the base type cannot be determined.

Parameters:

plugin (CommonPlugin) – Any type of plugin.

pntos.cobra.utils.sort_plugins_dataclass(plugins)

Utility function to alphabetically sort all of the plugins manually.

plugins (list[CommonPlugin]): The list of plugins to sort.

Returns:

SortedPlugins

pntos.cobra.utils.validate_plugins(sorted_plugins, log_func, **kwargs)

A utility function that (for each type) verifies the number of expected plugins against the plugin counts in sorted_plugins. Accepted keyword arguments are in the formatting [num|min]_[plugin_type] (e.g. num_fusion_plugins, min_fusion_plugins). The num_* parameters specify an exact match, whereas the min_* specify a minimum number of plugins. Only one should be used for any given plugin type.

Parameters:
  • sorted_plugins (SortedPlugins) – A SortedPlugins instance containing fields of plugins to validate.

  • log_func (Callable[[LoggingLevel, str], None]) – The logging function to use within this method.

  • **kwargs – Keyword arguments mapping plugin type names (as strings) to an expected number of plugins. At least one plugin type must be specified.

Returns: bool: True if all expected plugin counts match the actual counts; False otherwise.

class pntos.cobra.utils.BufferedMutableValueView(registry, group, key, type=None)

Bases: MutableValueView[ValueType], BufferedValueView[ValueType], Generic[ValueType]

A write-enabled ValueView with buffering for a key in the registry.

This class inherits both the buffered behavior from BufferedValueView (e.g. buffer property and pop()) as well as set_value() and clear_value() from MutableValueView.

Note that the buffer will catch all set_value() calls.

class pntos.cobra.utils.BufferedValueView(registry, group, key, type=None)

Bases: ValueView[ValueType], Generic[ValueType]

A ValueView that buffers all values at a given group and key in the registry.

property buffer

Returns a deepcopy of the current buffer.

pop()

Pops all values from this group and key since last pop().

class pntos.cobra.utils.GroupsView(registry)

Bases: object

Utility object to give a passive view of all groups in the registry.

class pntos.cobra.utils.MutableValueView(registry, group, key, type=None)

Bases: ValueView[ValueType], Generic[ValueType]

A write-enabled ValueView.

This simply adds a setter. If no key value store is provided in the setter, a whole batch operation will be performed (inefficient for high-datarate values). If a KeyValueStore is provided, it is assumed that the KeyValueStore is already live (batch_start() has been called), and the value is set directly.

_batch_start(kv=None)

Utility method to handle batch start.

Allows sub-classes to easily intercept this step.

_batch_end(kv)

Utility method to handle batch end.

Allows sub-classes to easily intercept this step.

set_value(new_value, kv=None)

Sets new_value in the registry.

Parameters:
  • new_value (RegistryValueType) – New value to write in the registry.

  • kv (KeyValueStore | None) – If provided, writes to this store. Otherwise, performs a batch operation. Default: None

clear_value(kv=None)

Removes the key from the registry if it exists.

Parameters:

kv (KeyValueStore | None) – If provided, value will be removed from kv. Otherwise, performs a full batch operation. Default: None

class pntos.cobra.utils.ValueView(registry, group, key, type=None)

Bases: Generic[ValueType]

__init__(registry, group, key, type=None)

Utility object to expose an automatically-updating registry value.

On instantiation, this object registers a callback for the value at group/key in the registry. This callback simply updates the value internally, which allows the self.value property to always be a synced “view” of the value at group/key.

When this object is deleted, it will remove the callback from the registry.

Parameters:
  • registry (Registry) – Registry to view

  • group (str) – The group in the registry

  • key (str) – The key in the registry

  • type (type[ValueType] | None) – The desired/expected type of the value at group/key in the registry. If None, the view will use union mode and return values as RegistryValueTypeUnion. If a specific type is provided, the view will use specific type mode and return values as that type. Default: None

property group

self.value is the view of self.key in this group.

property key

self.value is the view of this key in self.group.

property type

Expected/desired type at self.group/self.key.

property value

The most recent value at self.group/self.key in the registry.

pntos.cobra.utils.run_pntos_with_ros_transport(app, input_log, output_log, validate=False)

Spin up app and network tools necessary to run it, process log, then shut down.

Parameters:
  • app (pathlib.Path) – Path to app to run.

  • input_log (pathlib.Path) – ROS log containing the measurements to be processed.

  • output_log (pathlib.Path) – ROS log to which output should be recorded.

  • validate (bool) – Whether to validate the app’s output, ensuring there are no warnings or errors. Defaults to False.

Returns:

Return code of app. Will be 0 if app ran and terminated successfully.

class pntos.cobra.utils.AspnBaseWithTOV(*args, **kwargs)

Bases: AspnBase, Protocol

class pntos.cobra.utils.ChannelView(registry, channel, update_interval)

Bases: object

Utility object to track all registry <-> UI interactions for a single channel.

update_channel_info(message)

Thread-safe function for the mediator to update this channel.

_update_channel_info()

Performs actual channel info registry update.

property channel

The channel for which this object is a view.

property group

The group in the registry that contains the metadata for this channel.

property mediator_enabled

False if UI requests that mediator block this channel, else True.

property source_enabled

False if UI requests that sources block this channel, else True.

property message_count

The most recent message count on this channel.

property rate

The most recent measured messages/second on this channel.

property type

The most recent measurement type as a string on this channel.

property jitter

The most recent measured timestamp jitter in seconds on this channel.

property tov_last_message

The most recent time of validity in elapsed nano-seconds on this channel.

property bandwidth

The most recent bandwidth value in bytes per second on this channel.

class pntos.cobra.utils.SourceChannelView(registry, channel)

Bases: object

A Utility object to track registry <-> UI interactions for a source channel.

property channel

The channel for which this object is a view.

property group

The group in the registry that contains the metadata for this channel.

property source_enabled

False if UI requests that sources block this channel, else True.

class pntos.cobra.utils.UiMediatorInterface(registry, update_interval=0.5)

Bases: object

Interface between a pntos.api.Mediator and the UI via the registry.

__init__(registry, update_interval=0.5)
Parameters:
  • registry (Registry) – Registry reference for UI communications.

  • update_interval (float) – Minimum time in seconds between UI updates per-channel. Small intervals could incur performance degradation. Default is 0.1 seconds.

_ensure_channel_view(channel)

Gets the ChannelView for a given channel, creating one if necessary.

new_mediator_message(message)

Update the UI with a new message from the mediator.

It is up to the controller/mediator implementation to decide what a “new” message means as pertains to buffering messages. It could be that “new” refers to messages from process_pntos_message() calls, or it could be post-buffer messages, or some combination.

Parameters:

message (Message) – The message from the Mediator.

Returns:

False if UI user requests that the mediator block this source ID, else

True.

Return type:

bool

class pntos.cobra.utils.UiMetadataInterface(registry)

Bases: GroupsView

Utility object to populate the keys in the UI_GROUP_METADATA group.

class pntos.cobra.utils.UiSourceInterface(registry)

Bases: object

Interface between a source (e.g. pntos.api.TransportPlugin) and the UI via the registry.

__init__(registry)
Parameters:

registry (Registry) – Registry reference for UI communications.

_ensure_channel_view(channel)

Gets the SourceChannelView for a given channel, creating one if necessary.

new_message(source_identifier)

Update the UI with a new message from this source.

A source (e.g. TransportPlugin) should call this method on it’s UiSourceInterface as soon as it has a source_identifier. Note that the source is not required to provide a whole pntos.api.Message. This provides for some potential performance gains - when the user wishes to block this source ID, the source does not need to allocate and populate a whole pntos.api.Message.

Parameters:

source_identifier (str) – The source ID of the new message.

Returns:

False if UI user requests that the source block this source ID, else

True.

Return type:

bool

pntos.cobra.utils.delta_lat_to_north(delta_lat, approx_lat, altitude)

Convert delta-latitude in radians to north distance in meters.

Parameters:
  • delta_lat – A single delta-latitude or an N-length array of delta-latitudes

  • approx_lat – Approximate latitude at which to calculate conversion (radians)

  • altitude – Approximate altitude at which to calculate conversion (meters)

Returns:

N-length array of distances in the north direction.

pntos.cobra.utils.delta_lon_to_east(delta_lon, approx_lat, altitude)

Convert delta-longitude in radians to east distance in meters.

Parameters:
  • delta_lon – A single delta-longitude or an N-length array of delta-longitudes

  • approx_lat – Approximate latitude at which to calculate conversion (radians)

  • altitude – Approximate altitude at which to calculate conversion (meters)

Returns:

A single east distance or an N-length array of distances in the east direction.

pntos.cobra.utils.east_to_delta_lon(east_distance, approx_lat, altitude)

Convert east distance in meters to delta-longitude in radians.

Parameters:
  • east_distance – A single east distance or N-length array of east distances

  • approx_lat – Approximate latitude at which to calculate conversion (radians)

  • altitude – Approximate altitude at which to calculate conversion (meters)

Returns:

A single delta-longitude or an N-length array of delta-longitudes.

pntos.cobra.utils.ecef_to_llh(ecef)

Converts from ECEF to LLH.

Parameters:

ecef (NDArray[float64]) – A vector in the ECEF frame.

Returns:

The equivalent vector in the LLH frame.

Return type:

NDArray[float64]

pntos.cobra.utils.north_to_delta_lat(north_distance, approx_lat, altitude)

Convert north distance in meters to delta-latitude in radians.

Parameters:
  • north_distance – A single north distance or an N-length array of north distances

  • approx_lat – Approximate latitude at which to calculate conversion (radians)

  • altitude – Approximate altitude at which to calculate conversion (meters)

Returns:

A single delta-latitude or an N-length array of delta-latitudes.

pntos.cobra.utils.ValueType: TypeVar

A TypeVar bound to pntos.api.RegistryValueTypeUnion. This allows fields of type ValueType to be of type pntos.api.RegistryValueTypeUnion, or any subset of that union.