Cobra Internal

These are all the objects that can be directly imported from pntos.cobra.internal.

Warning

Users do not normally need access to these objects. Make sure you know what you’re doing if you’re directly accessing these objects.

class pntos.cobra.internal.BuscatMediator(attached_plugin_identifier, attached_plugin_type)

Bases: Mediator

This is a simple Buscat mediator implementation. It was designed to be used in conjunction with the pntos.cobra.BuscatControllerPlugin which is why this controller directly access private members. It has one public member registry that other plugins are allowed to access.

__init__(attached_plugin_identifier, attached_plugin_type)

Buscat Mediator

Parameters:
  • attached_plugin_identifier (str) – The identifier field of the plugin this mediator is assigned to.

  • attached_plugin_type (PluginType | None) – The abstract plugin type of the plugin this mediator is assigned to.

property filter_description_list

Request a list of strings describing the solutions available.

One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:

  • Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).

  • Strings should contain the substring BEST when they represent the primary solution.

  • Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).

  • Strings should include a substring indicating the type of solution returned. This substring should contain the string-equivalent to the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.

Example

If the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.

These conventions allow the user to identify their desired type of solution using substring matching.

Returns:

A list of strings describing the solutions available.

Return type:

list[str]

request_solutions(solution_times, filter_description=None)

Request filtering solutions at the times specified in the array solution_times.

Parameters:
  • solution_times (list[TypeTimestamp]) – The times at which to return solutions.

  • filter_description (str | None, optional) – To select which filter(s) to request solutions from, enter a valid filter description string in filter_description. Valid filter description strings can be obtained by calling filter_description_list. Passing in None will provide a result specific to a particular implementation. When filter_description is None, the implementation should endeavor to return its best solution.

Returns:

An array of messages containing the filter solutions for the requested solution_times. Some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message array may be None if filter_description is invalid.

Return type:

list[Message | None] | None

process_pntos_message(message)

Send a new message to the system for arbitrary processing.

For example, this function is useful for plugins who have just received new sensor data that they wish to relay to the system to be used in a sensor fusion solution.

Parameters:

message (Message)

broadcast_aspn_message(message, transport=None, destination_identifier=None)

Request that pntOS broadcast the provided message out to the network.

Parameters:
  • message (Message)

  • transport (str | None, optional) – The identifier of a transport plugin that the message should be routed to. The transport parameter should match the pntos.api.CommonPlugin.identifier string of a pntos.api.TransportPlugin active in the system. If the transport parameter is None, this indicates that the message should be broadcast to all available transports.

  • destination_identifier (str | None, optional) – A transport-specific identifier that allows transports to determine how to route the message. If the destination transport has the concept of a channel or topic, destination_identifier should be populated by the channel or topic. Otherwise, the identifier is populated in a plugin-specific manner defined by the destination transport. If destination_identifier is None, then the transport should output the message in the “default” output channel/topic and route being used by pntOS.

log_message(level, message)

Log a message.

Send a loggable message to the system, to be logged through the current logging infrastructure enabled (e.g. the console, a logfile, etc.).

Parameters:
class pntos.cobra.internal.BatchUpdate(*, sequence_id, group, keys)

Bases: BaseModel

Communicates the most recent values for any keys that have been updated in a group to the front-end.

model_config = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class pntos.cobra.internal.ChunkUpdate(*, ordered_updates, unordered_updates)

Bases: BaseModel

Communicates a “chunk” of ordered_updates (SubscriptionMode == ALL) and unordered_updates (SubscriptionMode == LAST) to the front-end.

model_config = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class pntos.cobra.internal.KeyUpdate(*, val, subscription_ids, sequence_id)

Bases: BaseModel

Used to notify the front-end of the new value at a certain group/key in the registry. This is part of a BatchUpdate.

model_config = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class pntos.cobra.internal.Snapshot(*, data)

Bases: BaseModel

A snapshot of the values at all currently-subscribed group/keys in the registry.

model_config = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class pntos.cobra.internal.Subscription(*, id, group, key, mode)

Bases: BaseModel

All data necessary to subscribe/unsubscribe to a certain group/key in the registry with a specific mode (stream behavior).

model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class pntos.cobra.internal.Write(*, data, sequence_id)

Bases: BaseModel

A batch write request from the front-end with updated values to write at various group/keys in the registry.

model_config = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class pntos.cobra.internal.DummyMediator(plugins=None)

Bases: Mediator

A Mediator with minimal capability. Not for use in production code.

property filter_description_list

Returns the list of strings describing available solutions.

Returns:

The filter_description_list of the first OrchestrationPlugin found, or an empty list if this instance cannot find such a plugin.

request_solutions(solution_times, filter_description=None)

Returns requested solutions if possible. Ignores filter_description.

Parameters:
  • solution_times (list[TypeTimestamp]) – The times at which to return solutions.

  • filter_description (str | None, optional) – Unused

Returns:

The return of request_solutions() of the first OrchestrationPlugin found, or None if no OrchestrationPlugin is available.

process_pntos_message(message)

Passes message off to all available OrchestrationPlugin s.

Parameters:

message (Message) – Message to process.

broadcast_aspn_message(message, transport=None, destination_identifier=None)

Attempt to broadcast a message over all available transports.

Parameters:
  • message (Message) – The message to broadcast.

  • transport (str | None) – Usually an identifier used to select a specific transport to broadcast over. Unused in this implementation.

  • destination_identifier (str | None) – Additional description of the method of broadcast, usually a channel name or similar. If None will be changed to ‘somewhere’.

log_message(level, message)

Log a message.

Send a loggable message to the system, to be logged through the current logging infrastructure enabled (e.g. the console, a logfile, etc.).

Parameters:
class pntos.cobra.internal.DummyMessageStreamConfig

Bases: MessageStreamConfig

A MessageStreamConfig implementation that does nothing but satisfy the init_orchestration_plugin() parameter requirement.

sequenced_stream_add(message_type, source_identifier=None)

Does nothing.

Parameters:
  • message_type (type[AspnBase]) – Unused.

  • source_identifier (str | None, optional) – Unused.

sequenced_stream_remove(message_type, source_identifier=None)

Does nothing.

Parameters:
  • message_type (type[AspnBase]) – Unused.

  • source_identifier (str | None, optional) – Unused.

sequenced_stream_all(enable)

Does nothing.

Parameters:

enable (bool) – Unused.

immediate_stream_add(message_type, source_identifier=None)

Does nothing.

Parameters:
  • message_type (type[AspnBase]) – Unused.

  • source_identifier (str | None, optional) – Unused.

immediate_stream_remove(message_type, source_identifier=None)

Does nothing.

Parameters:
  • message_type (type[AspnBase]) – Unused.

  • source_identifier (str | None, optional) – Unused.

immediate_stream_all(enable)

Does nothing.

Parameters:

enable (bool) – Unused.

class pntos.cobra.internal.StandardMediator(attached_plugin_identifier, attached_plugin_type)

Bases: Mediator

This is a simple mediator implementation. It was designed to be used in conjunction with the pntos.cobra.StandardControllerPlugin which is why this controller directly access private members. It has one public member registry that other plugins are allowed to access.

__init__(attached_plugin_identifier, attached_plugin_type)

Standard Cobra Mediator

Parameters:
  • attached_plugin_identifier (str) – The identifier field of the plugin this mediator is assigned to.

  • attached_plugin_type (PluginType | None) – The abstract plugin type of the plugin this mediator is assigned to.

property filter_description_list

Request a list of strings describing the solutions available.

One of these description strings may be used when calling request_solutions(). For consistency, these strings should adhere to the following conventions:

  • Strings should be upper case and have words and acronyms separated by underscores (UPPER_SNAKE_CASE).

  • Strings should contain the substring BEST when they represent the primary solution.

  • Strings should contain the substring DEAD_RECKONING when they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more than BEST solutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide a DEAD_RECKONING solution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals between solution_times (but resets applied before all of the solution_times).

  • Strings should include a substring indicating the type of solution returned. This substring should contain the string-equivalent to the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string _ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.

Example

If the primary solution is an ASPN PVA then the string MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATE would fulfill the convention.

These conventions allow the user to identify their desired type of solution using substring matching.

Returns:

A list of strings describing the solutions available.

Return type:

list[str]

request_solutions(solution_times, filter_description=None)

Request filtering solutions at the times specified in the array solution_times.

Parameters:
  • solution_times (list[TypeTimestamp]) – The times at which to return solutions.

  • filter_description (str | None, optional) – To select which filter(s) to request solutions from, enter a valid filter description string in filter_description. Valid filter description strings can be obtained by calling filter_description_list. Passing in None will provide a result specific to a particular implementation. When filter_description is None, the implementation should endeavor to return its best solution.

Returns:

An array of messages containing the filter solutions for the requested solution_times. Some entries may be None if they are unavailable at the corresponding time in solution_times. The returned pntos.api.Message array may be None if filter_description is invalid.

Return type:

list[Message | None] | None

process_pntos_message(message)

Send a new message to the system for arbitrary processing.

For example, this function is useful for plugins who have just received new sensor data that they wish to relay to the system to be used in a sensor fusion solution.

Parameters:

message (Message)

broadcast_aspn_message(message, transport=None, destination_identifier=None)

Request that pntOS broadcast the provided message out to the network.

Parameters:
  • message (Message)

  • transport (str | None, optional) – The identifier of a transport plugin that the message should be routed to. The transport parameter should match the pntos.api.CommonPlugin.identifier string of a pntos.api.TransportPlugin active in the system. If the transport parameter is None, this indicates that the message should be broadcast to all available transports.

  • destination_identifier (str | None, optional) – A transport-specific identifier that allows transports to determine how to route the message. If the destination transport has the concept of a channel or topic, destination_identifier should be populated by the channel or topic. Otherwise, the identifier is populated in a plugin-specific manner defined by the destination transport. If destination_identifier is None, then the transport should output the message in the “default” output channel/topic and route being used by pntOS.

log_message(level, message)

Log a message.

Send a loggable message to the system, to be logged through the current logging infrastructure enabled (e.g. the console, a logfile, etc.).

Parameters:
class pntos.cobra.internal.StandardMessageStreamConfig

Bases: MessageStreamConfig

This is a simple message stream config implementation. As with the StandardMediator it is considered as apart of the pntos.cobra.StandardControllerPlugin implementation which allows the controller to access private members and functions when necessary. All other plugins should adhere to API compliant functions.

__init__()

Standard Cobra Message Stream Config

By default, all messages are immediately streamed.

sequenced_stream_add(message_type, source_identifier=None)

Request messages are streamed in sorted timestamp ordering.

Request messages of the given message_type and optional source_identifier are streamed in sorted timestamp ordering.

Parameters:
  • message_type (type[AspnBase])

  • source_identifier (str | None, optional)

sequenced_stream_remove(message_type, source_identifier=None)

Request messages are no longer streamed in sorted timestamp ordering.

Request messages of the given message_type and optional source_identifier are no longer streamed in sorted timestamp ordering. This will remove a type that was previously added in a call to sequenced_stream_add(), or remove individual messages from the entire list of messages that was added with a previous call to sequenced_stream_all().

Parameters:
  • message_type (type[AspnBase])

  • source_identifier (str | None, optional)

sequenced_stream_all(enable)

Request all messages are streamed in sorted timestamp ordering.

Note that the ability to do this reliably will depend on the length of the buffer used by the pntos.api.Mediator.

Parameters:

enable (bool)

immediate_stream_add(message_type, source_identifier=None)

Request messages are streamed immediately.

Request messages of the given message_type and optional source_identifier are streamed immediately without delay, buffering, or sorting.

Parameters:
  • message_type (type[AspnBase])

  • source_identifier (str | None, optional)

immediate_stream_remove(message_type, source_identifier=None)

Request messages are no longer streamed immediately.

Request messages of the given message_type and optional source_identifier are no longer streamed immediately. This will remove a type that was previously added in a call to immediate_stream_add(), or remove individual messages from the entire list of messages that was added with a previous call to immediate_stream_all().

Parameters:
  • message_type (type[AspnBase])

  • source_identifier (str | None, optional)

immediate_stream_all(enable)

Request all messages are streamed immediately without delay, buffering, or sorting.

Parameters:

enable (bool)

class pntos.cobra.internal.EkfFusionStrategy(mediator)

Bases: StandardFusionStrategy

This is an Extended Kalman Filter (EKF) sensor fusion strategy.

It is capable of Bayesian inference on a linearized discrete-time system with Gaussian noise inputs.

__init__(mediator)

An Extended Kalman Filter Fusion Strategy

Parameters:

mediator (Mediator) – A pntos.api.Mediator instance.

property num_states

Get the total number of states this filter is estimating.

The count will initially be zero, until add_states() is called.

Returns:

Number of states being estimated in this filter.

Return type:

int

add_states(initial_estimate, initial_covariance, cross_covariance=None)

Add new states to this filter.

Increases number of filter states and set the initial conditions of the new states. Returns index of the first added state. If cross_covariance is None, cross covariance between the existing states and the added states will be set to zeroes.

Parameters:
  • initial_estimate (NDArray[float64]) – The initial estimate to populate the new states with.

  • initial_covariance (NDArray[float64]) – The initial covariance matrix used to initialize the uncertainty of the new states.

  • cross_covariance (NDArray[float64] | None) – A covariance matrix that describes the cross terms between the previous states and the new states. If None, the cross-terms will be set to zero. Otherwise, if there are n existing states and m new states being added, this argument should have shape [n, m].

Returns:

Index of first state being added to this filter (zero indexed).

Return type:

int

remove_states(first_index, count)

Removes a set of states from the filter.

Parameters:
  • first_index (int) – Index of the first state to be removed (zero indexed).

  • count (int) – The number of states to be removed.

property estimate

Get the current internal estimate managed by this strategy.

This class manages a current estimate that is initially populated by add_states() and then is modified iteratively by propagate(), update(), and other method calls. This method returns the current estimate, incorporating all changes made by previous method calls to this strategy.

Returns:

An estimate if available. Returns None if no states have been added yet.

Return type:

NDArray[float64] | None

set_estimate_slice(new_estimate, first_index)

Set a slice of the state estimates to a given set of values.

This class manages a current estimate that is initially populated by add_states() and then is modified iteratively by propagate(), update(), and other method calls. This method allows for manually overriding the current estimate. Sets a block of states to new values, starting with first_index and overwriting a number of states equal to the length of new_estimate.

Parameters:
  • new_estimate (NDArray[float64]) – The new estimate values that will overwrite the previous values.

  • first_index (int) – The index of the first state to overwrite.

property covariance

Get the covariance of the current estimate.

This class manages a current estimate that is initially populated by add_states() and then is modified iteratively by propagate(), update(), and other method calls. In addition to the estimate itself, a covariance of the current estimate is computed. This method returns this covariance, incorporating all changes made by previous method calls to this strategy.

Returns:

The covariance of the current estimate. Returns None if no states have been added yet.

Return type:

NDArray[float64] | None

set_covariance_slice(new_covariance, first_row, first_col=None)

Set a slice of the covariance matrix to a given set of values.

This class manages a current estimate that is initially populated by add_states() and then is modified iteratively by propagate(), update(), and other method calls. In addition to the estimate itself, a covariance of the current estimate is computed.

Allows for manually overriding the current covariance matrix. Sets a block of the covariance matrix to new values. The overwritten values are those in a rectangular area defined by the upper left corner at first_row, first_col and extending down and right to cover an area equal to the size of new_covariance. If first_col is not set or set to None, the value of first_row will be used as a column index as well.

Parameters:
  • new_covariance (NDArray[float64]) – The new covariance values that will overwrite a slice of the previous covariance matrix.

  • first_row (int) – The row of the first value to overwrite.

  • first_col (int | None, optional) – The column of the first value to overwrite.

propagate(dynamics_model)

Propagates the estimate of the state space forward in time.

This method assumes that a state space is already initialized with a set of states via the add_states() method. The dynamics_model parameter includes a description of how the current state estimate can be propagated from the current time to a new time. Note that the actual numerical values of the current/new times are not specified anywhere, as that information is not needed to perform the computation. This method then takes the current state space estimate and the dynamics_model and uses both to compute an estimate at the new time. The new estimate clobbers the old estimate managed by this class, and be acquired by calling estimate.

Parameters:

dynamics_model (StandardDynamicsModel)

update(measurement_model)

Updates the estimate of the state space, incorporating a new measurement.

This method assumes that a state space is already initialized with a set of states via the add_states() method. The measurement_model parameter includes both the measurement itself and a description of how the measurement relates to the state space. This method then takes the current state space estimate and updates it using information from the measurement_model. The updated estimate can be acquired by calling estimate.

Parameters:

measurement_model (StandardMeasurementModel) – The measurement with which to update the filter, as well as a model that describes how the measurement relates to the states this strategy is estimating.

class pntos.cobra.internal.StandardFusionEngine(mediator, save_x_and_p_after_prop, save_x_and_p_after_update)

Bases: StandardFusionEngine

A fusion engine designed to use data from multiple sensors and output a unified state estimate.

__init__(mediator, save_x_and_p_after_prop, save_x_and_p_after_update)
Parameters:
  • mediator (Mediator) – A pntos.api.Mediator instance.

  • save_x_and_p_after_prop – Whether to save state estimate and sigma to registry right after each propagate.

  • save_x_and_p_after_update – Whether to save state estimate and sigma to registry right after each update.

property time

The current time of the filter.

property strategy

The underlying algorithm used for Bayesian inference.

Returns:

The fusion strategy is the type of filter (EKF, UKF, etc.).

Return type:

StandardFusionStrategy | None

property num_states

Get the total number of states currently in the fusion engine.

Virtual state blocks do not affect this result.

Returns:

The total number of states currently in the fusion engine.

Return type:

int

property state_block_labels

Get a list of pntos.api.StandardStateBlock labels that have been added to this fusion engine.

Returns:

A list of the pntos.api.StandardStateBlock labels that have been added to this fusion engine. Returns None if no state blocks have been added. Guaranteed to not return None if num_states is a value other than 0.

Return type:

list[str] | None

add_state_block(block, initial_estimate_covariance, cross_covariances=None)

Add the given pntos.api.StandardStateBlock to the fusion engine.

This will expand the state vector being estimated by the value of num_states.

Parameters:
  • block (StandardStateBlock) – The pntos.api.StandardStateBlock to be added to the fusion engine.

  • initial_estimate_covariance (EstimateWithCovariance) – Contains the initial conditions of the states, with initial_estimate_covariance.estimate being an Nx1 matrix and initial_estimate_covariance.covariance being an NxN matrix, where N is block.num_states.

  • cross_covariances (CrossCovariances | None, optional) – An optional parameter which, if non-None, contains a description of the newly added StateBlock’s cross covariances with respect to a set of StateBlocks which already exist inside the filter (specified by cross_covariances.block_labels). If the cross_covariance parameter is None, cross covariance between the existing states and the added states will be set to zeroes.

get_state_block_estimate(block_label)

Get the estimate associated with a state block.

Find a pntos.api.StandardStateBlock or pntos.api.VirtualStateBlock within the fusion engine matching block_label, and return a copy of its current estimate vector.

Parameters:

block_label (str)

Returns:

A copy of its current estimate vector. If block_label references a virtual state block (VSB) this will return a converted estimate, converted into the VSBs coordinate frame. Returns None if block_label does not correspond to a block that has been added to the fusion engine. Guaranteed to not return None when block_label is in the list returned by state_block_labels and strategy is not None.

Return type:

NDArray[float64] | None

get_state_block_covariance(block_label)

Get the covariance associated with a state block.

Find a pntos.api.StandardStateBlock or pntos.api.VirtualStateBlock within the fusion engine matching block_label, and return a copy of its current covariance matrix.

Parameters:

block_label (str)

Returns:

A copy of its current covariance matrix. If block_label references a virtual state block (VSB) this will return a converted covariance, converted into the VSBs coordinate frame. Returns None if block_label does not correspond to a block that has been added to the fusion engine. Guaranteed to not return None when block_label is in the list returned by state_block_labels and strategy is not None.

Return type:

NDArray[float64] | None

get_state_block_cross_covariance(block_label1, block_label2)

Get the cross covariance between the states associated with two state blocks.

Find the pntos.api.StandardStateBlock s within the fusion engine matching block_label1 and block_label2, and return the cross-covariance matrix between them.

Parameters:
  • block_label1 (str)

  • block_label2 (str)

Returns:

The cross-covariance matrix between block_label1 and block_label2. Returns None if block_label1 or block_label2 do not correspond to blocks that have been added to the fusion engine. Guaranteed to not return None when both block_label1 and block_label2 are in the list returned by state_block_labels and strategy is not None.

Return type:

NDArray[float64] | None

set_state_block_estimate(block_label, estimate)

Update the estimate associated with a given state block.

Find a pntos.api.StandardStateBlock within the fusion engine matching block_label, and change its current estimate vector.

Note

This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.

Parameters:
  • block_label (str)

  • estimate (NDArray[float64])

set_state_block_covariance(block_label, covariance)

Update the covariance associated with a given state block.

Find a pntos.api.StandardStateBlock within the fusion engine matching block_label, and change its current covariance matrix.

Note

This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.

Parameters:
  • block_label (str)

  • covariance (NDArray[float64])

set_state_block_cross_covariance(block_label1, block_label2, covariance)

Update the covariance between two state blocks.

Find the pntos.api.StandardStateBlock s within the fusion engine matching block_label1 and block_label2, and change the current covariance matrix between them.

Note

This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.

Parameters:
  • block_label1 (str)

  • block_label2 (str)

  • covariance (NDArray[float64])

remove_state_block(block_label)

Remove the pntos.api.StandardStateBlock matching block_label.

This will reduce the state vector being estimated by the number of states that the block represents.

Parameters:

block_label (str)

property virtual_state_block_target_labels

Gets a list of the target labels of virtual state blocks that have been added.

A label being returned by this list is not a guarantee that the virtual state block has a valid source. For that, call has_virtual_state_block().

Returns:

A list of the target labels of virtual state blocks that have been added. Returns None if no virtual state blocks have been added to this fusion engine.

Return type:

list[str] | None

has_virtual_state_block(vsb_target_label)

Checks if the fusion engine has a pntos.api.VirtualStateBlock with a matching target label.

Parameters:

vsb_target_label (str)

Returns:

True if the fusion engine has a pntos.api.VirtualStateBlock with a matching target label, False if no virtual state block with matching target label exists or if one exists but is not capable of generating an estimate. That is, the VSB’s source must exist and be in a continuous chain to a concrete state block which also exists in the fusion engine in order to return True.

Return type:

bool

add_virtual_state_block(virtual_state_block)

Add the given pntos.api.VirtualStateBlock to the fusion engine.

A virtual state block (VSB) convert from an underlying block coordinate frame into the VSB coordinate frame.

Parameters:

virtual_state_block (VirtualStateBlock)

remove_virtual_state_block(vsb_target_label)

Remove the pntos.api.VirtualStateBlock matching vsb_target_label.

Parameters:

vsb_target_label (str)

property measurement_processor_labels

Get a list of the labels of measurement processors that have been added.

Returns:

list of labels of measurement processors that have been added. Returns None if no measurement processors have been added to this fusion engine.

Return type:

list[str] | None

add_measurement_processor(processor)

Add a pntos.api.StandardMeasurementProcessor.

This can be used to process future measurements that correspond to processor.label.

Parameters:

processor (StandardMeasurementProcessor)

remove_measurement_processor(processor_label)

Remove a pntos.api.StandardMeasurementProcessor previously added to the fusion engine.

Assumes a measurement processor was previously added via add_measurement_processor() with the label processor_label.

Parameters:

processor_label (str)

propagate(time)

Propagate the filter estimate forward in time.

May be evaluated lazily (when results are requested).

Parameters:

time (TypeTimestamp)

update(processor_label, message)

Update the filter with the given measurement.

Will propagate first if needed to reach the time encoded inside the measurement.

Parameters:
  • processor_label (str)

  • message (Message)

get_real_label(label)

Takes a block label in and returns the real label associated with the block. If label corresponds to a pntos.api.VirtualStateBlock the label of the starting block for the virtual transformation will be returned. If label corresponds to a pntos.api.StandardStateBlock then label will be returned as is.

peek_ahead(time, block_labels)

Calculates the estimate and covariance at a requested time.

Uses the state blocks listed in block_labels, without changing the state of the fusion engine or its underlying filter. Blocks are assembled in the order that the labels are passed in.

If all of the following are true:

  • time is equal to or after the filter time (which can be checked with time).

  • All labels in block_labels correspond to a block that has been added to the fusion engine (which can be checked with state_block_labels).

  • block_labels has at least one element.

Then the result returned is guaranteed to not be None. Otherwise, if any of the above are false then the result will be None.

Parameters:
  • time (TypeTimestamp)

  • block_labels (list[str]) – An array of strings.

Returns:

EstimateWithCovariance | None

generate_x_and_p(block_labels)

Generates the current estimate and covariance.

Estimate and covariance are built corresponding to a list of StateBlock labels. Blocks are assembled in the order that the labels are passed in.

If all of the following are true:

  • All labels in block_labels correspond to a block that has been added to the fusion engine (which can be checked with state_block_labels).

  • block_labels has at least one element.

Then the result returned is guaranteed to not be None. Otherwise, if any of the above are false then the result will be None.

Parameters:

block_labels (list[str]) – An array of strings.

Returns:

EstimateWithCovariance | None

give_state_block_aux_data(block_label, aux)

Route a list of messages of aux data to a pntos.api.StandardStateBlock.

Parameters:
  • block_label (str)

  • aux (list[Message | None])

give_measurement_processor_aux_data(processor_label, aux)

Route a list of messages of aux data to a pntos.api.StandardMeasurementProcessor.

Parameters:
  • processor_label (str)

  • aux (list[Message | None])

give_virtual_state_block_aux_data(target_label, aux)

Route a list of messages of aux data to a pntos.api.VirtualStateBlock.

Parameters:
  • target_label (str)

  • aux (list[Message | None])

class pntos.cobra.internal.VirtualStateBlockManager(mediator)

Bases: object

A utility class that manages pntos.api.VirtualStateBlock’s for a pntos.api.StandardFusionEngine instance. This class will create n-ary trees by instantiating Node`s to build and maintain the relationship between a given VSB and another. The roots of these trees represent ‘real’ state blocks. If a `Node has no parent, it is assumed to be a root node but there is no way the manager can know if said Node is a real block or if it’s just a virtual state block that has yet to be added.

__init__(mediator)
Parameters:

mediator (Mediator) – A pntos.api.Mediator instance.

add_virtual_state_block(trans)

Registers a pntos.api.VirtualStateBlock into the manager. If the VSB’s source matches its target or the target already exists in the manager, a warning will be logged and the VSB won’t be added. If the source doesn’t exist, a new Node will be created and it will be treated as a root node. As VSBs are added, this function will adjust the relationships between VSBs accordingly.

Parameters:

trans (VirtualStateBlock) – The pntos.api.VirtualStateBlock to be added.

convert(orig, start, target, time)

Converts an pntos.api.EstimateWithCovariance to a target representation if the conversion is possible.

Parameters:
  • orig (EstimateWithCovariance) – The estimate and covariance in the starting format.

  • start (str) – The label of the starting format.

  • target (str) – The label that refers to the ending format for the estimate and covariance.

  • time (TypeTimestamp) – The time of validity for the estimate and covariance.

Returns:

pntos.api.EstimateWithCovariance if conversion is possible; otherwise returns None.

convert_estimate(orig, start, target, time)

Converts an estimate to a target representation if the conversion is possible.

Parameters:
  • orig (NDArray[float64]) – The estimate in the starting format.

  • start (str) – The label of the starting format.

  • target (str) – The label that refers to the ending format for the estimate.

  • time (TypeTimestamp) – The time of validity for the estimate.

Returns:

NDArray[float64] if conversion is possible; otherwise returns None.

convert_H(engine, real_label, label, curr_H)

Given a measurement jacobian, H, calculate the full transformation from a set of real states to a set of measurements.

Parameters:
Returns:

NDArray[float64] if the labels provided exist in the engine, else None.

get_start_block_label(target)

Gets the state block label of the starting node which is assumed to be represent a ‘real’ state block.

Parameters:

target (str) – The pntos.api.VirtualStateBlock unique identifier to get the starting label for.

Returns:

str | None

get_virtual_state_block_labels()

Returns the list of virtual state block labels being tracked by the manager. Returns None if that list is empty.

give_virtual_state_block_aux_data(target, aux)

Provides the pntos.api.VirtualStateBlock with target label target with the data in aux.

Parameters:
jacobian(orig, start, target, time)

Obtains the jacobian of an input estimate in the target representation if possible.

Parameters:
  • orig (NDArray[float64]) – The estimate in the starting format.

  • start (str) – The label of the starting format.

  • target (str) – The label that refers to the ending format for the jacobian.

  • time (TypeTimestamp) – The time of validity for the input estimate.

Returns:

NDArray[float64] if conversion is possible; otherwise returns None.

remove_virtual_state_block(target)

Removes the pntos.api.VirtualStateBlock matching target from the manager. If this leaves an orphaned branch of nodes it will prune the branch removing all VSB’s in that branch as well.

Parameters:

target (str) – The unique identifier of the pntos.api.VirtualStateBlock to remove

class pntos.cobra.internal.ManualHeadingAlign(config_group, mediator)

Bases: InertialInitializationStrategy

Static alignment for an inertial, requiring a user-provided heading.

This initialization strategy can be used to produce an initial PVA to initialize an inertial mechanization. It requires both position and IMU measurements, as well as an initial heading provided via config. It is also capable of estimating inertial biases.

__init__(config_group, mediator)
Parameters:
request_motion_needed()

Check the type of motion (if any) needed.

Returns:

InitializationMotionNeeded

request_current_status()

Check the current initialization status.

Returns:

InitializationStatus

process_pntos_message(message)

Incorporate a new message into the initialization algorithm.

Parameters:

message (Message)

request_solution()

Get the current initial solution.

Returns:

InitialInertialSolution

class pntos.cobra.internal.BarometerToAltitudePreprocessor(channel, mediator, alt_sigma)

Bases: Preprocessor

A preprocessor that converts barometer measurements to altitude measurements.

__init__(channel, mediator, alt_sigma)

Cobra Barometer to Altitude Preprocessor

process_pntos_message(message)

Process a message.

Parameters:

message (Message) – A message to be processed.

Returns:

A list of pntos.api.Message s. Usually this will be a single message, a modified version of message. It could be None if message is rejected or dropped. The preprocessor could also accumulate several messages, returning None for each one then returning an array with multiple processed messages.

Return type:

list[Message] | None

class pntos.cobra.internal.DownsamplerPreprocessor(config, mediator)

Bases: Preprocessor

A downsampling preprocessor that periodically discards certain messages.

It collects a list of channels and factors from the registry and allows 1 out of every N messages to pass through. This is done for every channel c and factor N. Where c = channel[i] and N = factor[i].

__init__(config, mediator)

Cobra Downsampler Preprocessor

Parameters:
process_pntos_message(message)

Process a message.

Parameters:

message (Message) – A message to be processed.

Returns:

A list of pntos.api.Message s. Usually this will be a single message, a modified version of message. It could be None if message is rejected or dropped. The preprocessor could also accumulate several messages, returning None for each one then returning an array with multiple processed messages.

Return type:

list[Message] | None

class pntos.cobra.internal.ImuRotationPreprocessor(mediator, imu_channel, C_imu_to_platform)

Bases: Preprocessor

process_pntos_message(message)

Process a message.

Parameters:

message (Message) – A message to be processed.

Returns:

A list of pntos.api.Message s. Usually this will be a single message, a modified version of message. It could be None if message is rejected or dropped. The preprocessor could also accumulate several messages, returning None for each one then returning an array with multiple processed messages.

Return type:

list[Message] | None

class pntos.cobra.internal.OutagePreprocessor(mediator, outage_config)

Bases: Preprocessor

Preprocessor used to induce an outage on a given channel.

process_pntos_message(message)

Process a message.

Parameters:

message (Message) – A message to be processed.

Returns:

A list of pntos.api.Message s. Usually this will be a single message, a modified version of message. It could be None if message is rejected or dropped. The preprocessor could also accumulate several messages, returning None for each one then returning an array with multiple processed messages.

Return type:

list[Message] | None

class pntos.cobra.internal.TimeAdjusterPreprocessor(config, mediator)

Bases: Preprocessor

process_pntos_message(message)

Process a message.

Parameters:

message (Message) – A message to be processed.

Returns:

A list of pntos.api.Message s. Usually this will be a single message, a modified version of message. It could be None if message is rejected or dropped. The preprocessor could also accumulate several messages, returning None for each one then returning an array with multiple processed messages.

Return type:

list[Message] | None

class pntos.cobra.internal.TimeBiasPreprocessor(config, mediator)

Bases: Preprocessor

Corrects timestamps for a constant bias.

This preprocessor is useful when a specific sensor produces timestamps with a constant bias. It is configured with a list of channels as well as a constant time bias. Any message whose source identifier matches one of the configured channels will have its timestamp subtracted by the bias amount.

__init__(config, mediator)
Parameters:
  • config_group (str) – The group in the registry which holds config information for this preprocessor.

  • mediator (Mediator) – Used to get config information and to perform logging.

process_pntos_message(message)

Process a message.

Parameters:

message (Message) – A message to be processed.

Returns:

A list of pntos.api.Message s. Usually this will be a single message, a modified version of message. It could be None if message is rejected or dropped. The preprocessor could also accumulate several messages, returning None for each one then returning an array with multiple processed messages.

Return type:

list[Message] | None

class pntos.cobra.internal.StandardInertial(config_group, mediator, solution)

Bases: StandardInertialMechanization

An inertial object which mechanizes IMU data to generate a series of inertial solutions.

__init__(config_group, mediator, solution)

An Inertial Mechanization Object

Parameters:
request_solution_message_type()

Get the solution type.

Returns:

The message type that will be returned by request_current_solution(), request_solution(), and request_solutions().

Return type:

type[AspnBase]

request_current_solution()

Get the current inertial solution.

Returns:

The current inertial solution.

Return type:

Message

request_solution(time)

Request solution at a specific time.

Parameters:

time (TypeTimestamp) – The time at which the returned solution should be valid.

Returns:

The solution computed by this inertial at time if time is in the valid range, None otherwise (is_time_in_range() can be used to check time before calling this method).

Return type:

Message | None

request_solutions(times, solution_type)

Request solutions at multiple specific times.

Parameters:
  • times (list[TypeTimestamp]) – An array of times at which solutions are requested.

  • solution_type (InertialSolutionRangeType) – The type of solution requested.

Returns:

An array of solutions. Returns None if solution_type is unsupported by this inertial or every instance of times is outside the valid range. Otherwise guaranteed to not be None.

Return type:

list[Message | None] | None

is_time_in_range(time)

Check if a solution exists at a given time.

Parameters:

time (TypeTimestamp) – The query time.

Returns:

True if a solution exists at time, False otherwise. This result is only valid until another method (for example, process_pntos_message()) is called.

Return type:

bool

request_earliest_time()

Get the earliest available time at which a solution or forces and rates can be requested.

This result is only valid until another method (for example, process_pntos_message()) is called.

Returns:

The earliest available time at which a solution or forces and rates can be requested.

Return type:

TypeTimestamp

request_latest_time()

Get the latest available time at which a solution or forces and rates can be requested.

This result is only valid until another method (for example, process_pntos_message()) is called.

Returns:

The latest available time at which a solution or forces and rates can be requested.

Return type:

TypeTimestamp

request_process_pntos_message_types()

Returns an array of message types that are supported by this plugin.

Returns:

An array of message types that are supported by this plugin as inputs to process_pntos_message().

Return type:

list[type[AspnBase]]

process_pntos_message(message)

A new message to be incorporated into the computed inertial solution.

Parameters:

message (Message)

request_forces_and_rates(time)

Request forces and rates for a given time.

Parameters:

time (TypeTimestamp) – The time at which the forces and rates should be valid.

Returns:

The instantaneous forces and rates at time if time is in the valid range, None otherwise (is_time_in_range() can be used to check time before calling this method).

Return type:

InertialForcesRates | None

request_average_forces_and_rates(time1, time2)

Request average forces and rates over a time period.

Parameters:
  • time1 (TypeTimestamp) – The start of the time range over which the forces and rates should be valid.

  • time2 (TypeTimestamp) – The end of the time range over which the forces and rates should be valid.

Returns:

The average forces and rates over the period of time defined by time1 and time2 if at least one of them is in the valid range, None otherwise (is_time_in_range() can be used to check both times before calling this method).

Return type:

InertialForcesRates | None

request_reset_message_types()

Get valid types of reset messages.

Returns:

An array of message types that are supported by this plugin for resetting the inertial solution, or None if resetting the inertial solution is an unsupported operation by the inertial plugin.

Return type:

list[type[AspnBase]] | None

reset_solution(message)

Set the solution to the values in message.

For example, if message is PVA then the inertial solution will be set to that PVA. If message is just position, then only the position portion of the inertial solution will be set using message.

Parameters:

message (Message) – A message containing the information necessary to reset the solution. To see the types supported by the implementation, call request_reset_message_types().

correct_sensor_errors(time, errors)

Reset the current inertial internal bias values.

Reset the current inertial internal bias values with corrections from an external source, such as a filter or error estimator. The errors passed in here will be adjusted for internally by the inertial when processing incoming data. Thus, if errors are passed into the inertial here they should not be corrected for in an external filter processing the inertial output (which would lead to a double correction).

Parameters:
  • time (TypeTimestamp) – The time at which errors should be valid.

  • errors (StandardInertialErrors) – An estimate of the inertial sensor’s errors.

request_sensor_errors(time)

Request inertial errors for a given time.

Parameters:

time (TypeTimestamp) – Time at which inertial errors should be valid.

Returns:

Inertial errors at time if time is in the valid range (pntos.api.CommonInertial.is_time_in_range() can be used to check time before calling this method), None otherwise.

Return type:

StandardInertialErrors | None

class pntos.cobra.internal.StandardKeyValueStore(group, log_func, plugin_resources_location=None)

Bases: KeyValueStore

Implementation note: This implementation deviates from the python dictionary in that it heavily favors logging out errors and continuing on over raising exceptions. Exceptions are reserved only for truly fatal and unrecoverable errors. For example, instead of raising a KeyValue error for a key that does not exist in the store, the StandardKeyValueStore just logs out an error message and returns None.

data_format = 1

Defines the underlying format which the data in the key-value store will be stored as.

__init__(group, log_func, plugin_resources_location=None)

Cobra Standard Key-Value Store

Parameters:
  • group (str) – The name of the key-value store so that it can be easily accessed by plugins.

  • log_func (Callable[[LoggingLevel, str], None]) – The function to use for logging.

  • plugin_resources_location (str | None) – The path to the directory to save or load permanent keys. If no path is provided, DEFAULT_PERMANENCY_DIR will be used.

type_conversion

A store of type conversion functions.

Usage: type_conversion[type_in_store][requested_return_type] -> requested_return_type | None

Will be None is conversion is not supported or conversion failed.

keys()

Get the array of keys which currently exist in this store.

Returns:

Returns the keys in the store or None if no keys are present.

Return type:

list[str] | None

get_value(key, value_type)

Get the value stored at key with return type value_type.

Example

For example, to access altitude in a pntos.api.KeyValueStore named kv_store as an integer:

altitude = kv_store.get_value("altitude", int)
Parameters:
Returns:

Returns None if the key is not available. The return is guaranteed to not be None if called with a valid key (which can be checked with pntos.api.KeyValueStore.__contains__()) and if the store can convert the value to the requested type.

Return type:

pntos.api.RegistryValueType | None

get_raw(key=None)

Get the value for the given key as an array of bytes.

Parameters:

key (str | None, optional)

Returns:

The return format will conform to the definition in pntos.api.KeyValueStore.data_format. Returns None if the given key is not available. The return is guaranteed to not be None if called with a valid key, which can be checked with pntos.api.KeyValueStore.__contains__():

if key in store:
...

If key is None, then this function will return all of the keys and values in the group passed to pntos.api.Registry.batch_start() and will be formatted to conform to keys and values as defined in pntos.api.KeyValueStore.data_format.

Return type:

bytes | None

set_value(key, value)

Set the given key to the provided value.

Parameters:
set_raw(key, bytes)

Set the given key to the provided value.

Parameters:
remove_key(key)

Remove the given key from the registry.

Parameters:

key (str)

Returns:

True if key is successfully removed, and False otherwise. Keys may fail to be removed if the key does not currently exist, or the backend is unable to remove the key.

Return type:

bool

batch_end()

Ends a batch operation.

Ends a batch operation started with a pntos.api.Registry.batch_start() call. After calling this, the user should not use the pntos.api.KeyValueStore they received from pntos.api.Registry.batch_start() again without calling pntos.api.KeyValueStore.batch_restart() on the pntos.api.KeyValueStore.

If keys in the batch were acted upon with set_permanent() turned on, and the plugin supports permanent storage, this call will save changes to permanent storage if set_permanent() is True during the call to batch_end(). Enacts equivalent of set_permanent(self,false) before return. If any request_notify() observers have been added, they will be processed prior to this call returning.

Example

Example 1: Flushing to permanent storage on batch_end():

store = registry.batch_start("group")
...work...
store.set_permanent(true) # if not disabled, flush on batch_end
...work...
store.batch_end()    # will flush values

Example 2: Not flushing to permanent storage on batch_end():

store = registry.batch_start("group")
...work...
store.set_permanent(true)  # tag some values
...work...
store.set_permanent(false) # do not flush on batch_end
store.batch_end()          # will not flush values

In the second example above, values set with “set” methods after the initial set_permanent() call are still stored for potential saving to permanent storage.

batch_restart()

Restarts a batch.

Restarts a batch that was previously started with pntos.api.Registry.batch_start() and subsequently ended with pntos.api.KeyValueStore.batch_end(). This method is likely much more efficient than pntos.api.Registry.batch_start() (depending on the registry implementation) as the pntos.api.Registry.batch_start() method must find the store again given the group name.

Note

While a batch is active, access to the store may be denied to other users. Thus a user should endeavour to call batch_end() as soon as possible after they are done getting/setting values in the returned pntos.api.KeyValueStore.

request_notify(key, callback)

Register a callback which gets called each time a key in the store is updated.

Allows plugins to respond asynchronously to parameter updates. Returns True if the notifier was successfully registered, and False if the store is unable to notify the requester. If key is None, then the callback will be invoked when any key in the batch’s group is modified. Otherwise, the callback will only be invoked when the given key is modified.

Note

The callback must not attempt to set any values inside the pntos.api.KeyValueStore, as the callback is likely being invoked during the processing of another operation. The callback should endeavour to store off the updated keys/values as quickly as possible and return, leaving the processing of the updates to another context or thread when possible. Calling pntos.api.Mediator within the callback may be disallowed by the controller implementation and lead to undefined behavior.

Parameters:
  • key (str | None)

  • callback (Callable[[str, list[str], KeyValueStore], None]) –

    A function with a function definition compatible with:

    def my_callback(group: str, modified_keys: list[str], kv: KeyValueStore) -> None:
        ...
    

Returns:

True if the notifier was successfully registered, and False if the store is unable to notify the requester.

Return type:

bool

remove_notify(key, callback)

Removes a notification as requested by request_notify().

The group and callback must match the parameters passed to request_notify() in order to successfully remove a callback.

Note

This will remove all matching callbacks that have a matching group and callback. If a user registers the same callback twice this will remove both.

Parameters:
  • key (str | None)

  • callback (Callable[[str, list[str], KeyValueStore], None])

Returns:

True if removal was successful and False if it was not. False will be returned if a callback did not exist for the group.

Return type:

bool

clear()

Remove all items from key-value store.

values()

Returns a list of the key-value store’s values.

This method is useful for unloading all values from a key-value store.

Example

For example:

def get_mean_of_kv_store(kv: KeyValueStore) -> float | None:
    '''Returns the mean of a key-value store if all values are int or float'''
    values = kv.values()
    if all(isinstance(x, (int, float)) for x in values):
        return sum(values) / len(values)
    return None
Returns:

A list of all values in the store.

Return type:

list[RegistryValueTypeUnion]

items()

Returns a list of the items (key-value pairs) in the store.

This method is useful for when you need to iterate over both keys and values in a dictionary.

Example

For example:

for key, value in my_dict.items()
    printf('Key: {key}, Value: {value}')
Returns:

A list of the key-value pairs (as tuples) in the store.

Return type:

list[tuple[str, RegistryValueTypeUnion]]

set_permanent(permanent)

Tag values modified with “set” methods as permanently stored.

Configure the pntos.api.KeyValueStore to tag values modified with “set” methods as permanently stored (as opposed to ephemerally stored in memory). Only values acted upon with “set” methods while set_permanent() is True will be tagged. Values will be flushed according to registry configuration settings or per batch_end() API. Returns the value of the permanent storage configuration. Callers should check this to verify if the set was successful.

Example

Tagging specific keys to be permanently stored:

store: PntosKeyValueStore = registry.batch_start("group")
store.set_value("key1",1234.56) # does not tag this value as permanently stored
store.set_permanent(True)       # start tagging set calls as permanently stored
store.set_value("key1",987.65)
store.set_value("key2",123)     # both key1 and key2 values tagged
store.set_permanent(False)      # disable permanent storage
store.set_value("key1",456.78)  # key1 = 456.78 is value of key1 in store
                                # key1 = 987.65 tagged to be permanently stored
                                # key2 = 123    tagged to be permanently stored
Parameters:

permanent (bool)

Returns:

The value of the permanent storage configuration. Callers should check this to verify if the set was successful.

Return type:

bool

get_type(key)

Returns the type of a value in the KeyValueStore.

This is helpful for situations when it is advantageous to know the type of a value in the store without need for the value itself.

Example

For example, it might be useful for these situations where different callbacks are needed depending on what type is in the store:

if key in kv_store:
    val_type = kv_store.get_type(key)
    if val_type is int:
        kv_store.request_notify(key, int_callback)
    elif val_type is Message:
        kv_store.request_notify(key, message_callback)
    else:
        kv_store.request_notify(key, generic_callback)
Parameters:

key (str)

Returns:

If key exists in registry, this is guaranteed to return the return type of pntos.api.KeyValueStore.__getitem__() for the given key. Else, it returns None if type information is not available or key does not exist in registry.

Return type:

type[RegistryValueTypeUnion] | None

class pntos.cobra.internal.StandardRegistry(log_func, plugin_resources_location=None)

Bases: Registry

A registry that maps group names to objects storing all the key/values in that group.

__init__(log_func, plugin_resources_location=None)

Cobra Standard Registry

Parameters:
  • log_func (Callable[[LoggingLevel, str], None]) – The function to use for logging.

  • plugin_resources_location (str | None) – The path to the directory to save or load permanent keys. If no path is provided, DEFAULT_PERMANENCY_DIR will be used.

batch_start(group)

Begin a batch get/set operation.

Begin a batch get/set operation wherein the user may make any number of modifications to the keys/values in the group. The registry implementation may wait to batch these requests until pntos.api.KeyValueStore.batch_end() is called for better performance. For example, a lock may be obtained at the beginning of a batch_start() and not released until a pntos.api.KeyValueStore.batch_end() call is encountered. Thus, a plugin that calls batch_start() should endeavour to make its calls to the set_, get_, and register methods as quickly as possible and call pntos.api.KeyValueStore.batch_end() immediately, as doing otherwise may be locking other plugins out of access to the registry (depending on the registry plugin implementation). If a plugin supports pntos.api.KeyValueStore.request_notify(), then notifications of updates may be suspended until the batch ends. After a batch is ended, the returned pntos.api.KeyValueStore can still be used to access the store via pntos.api.KeyValueStore.batch_restart().

Note

While a batch is active, access to the store may be denied to other users. Thus a user should endeavour to call pntos.api.KeyValueStore.batch_end() as soon as possible after they are done getting/setting values in the returned pntos.api.KeyValueStore.

Parameters:

group (str)

Returns:

KeyValueStore

property group_array

Get the array of groups which currently exist.

Returns:

The array of groups which currently exists. Returns None if no groups exist.

Return type:

list[str] | None

has_group(group)

Checks whether or not a given group has had any values added to it (for any key).

Parameters:

group (str)

Returns:

bool

request_notify_new_group(callback)

Register a callback which gets called each time a new group is made in the registry.

Parameters:

callback (Callable[[str], None])

Returns:

True if the notifier was successfully registered, and False if the registry is unable to notify the requester.

Return type:

bool

class pntos.cobra.internal.AltitudeMeasurementProcessor(label, state_block_labels, mediator)

Bases: StandardMeasurementProcessor

Generates a model that maps an altitude measurement to a Pinson15 inertial error state block and a 1-state altitude FOGM bias block.

This measurement processor handles all of the following ASPN message types:
  • MeasurementAltitude

  • MeasurementPosition (Geodetic)

  • MeasurementPositionVelocityAttitude (Geodetic)

__init__(label, state_block_labels, mediator)

An Altitude Measurement Processor.

Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 2-element list of labels of state blocks this processor can update. The first entry should refer to a Pinson-style state block of at least size 3, with a Down position error in meters as the third state. The second entry should refer to a 1-state FOGM state block estimating time-correlated altitude bias.

  • mediator (Mediator) – A pntos.api.Mediator instance.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

generate_model(message, gen_x_and_p_func)

Generate a pntos.api.StandardMeasurementModel.

Parameters:
  • message (Message) – The measurement/observation to process.

  • gen_x_and_p_func (GenXandP) – A callback function that can be used to retrieve the current estimate and covariance for the state blocks this measurement processor targets.

Returns:

A generated model containing the parameters required for a filter update. Will be None when a measurement cannot be produced from message (for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).

Return type:

StandardMeasurementModel | None

class pntos.cobra.internal.ClockBiasStateBlock(label, mediator, h_0, h_neg2, q3)

Bases: StandardStateBlock

Models a clock bias (seconds), drift (seconds/second), and optional drift rate (seconds/second^2).

Uses allan variance parameters to create a clock model for the system dynamics model covariance matrix.

receive_aux_data(_)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_state_block_aux_data() is called with a label corresponding to this state block’s label.

Parameters:

aux (list[Message | None])

generate_dynamics(gen_x_and_p_func, time_from, time_to)

Generate a pntos.api.StandardDynamicsModel.

The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.

Parameters:
  • gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.

  • time_from (TypeTimestamp) – The time to propagate from.

  • time_to (TypeTimestamp) – The time to propagate to.

Returns:

The description of how to propagate this state block over the given time interval, or None if time_from is later than time_to. Otherwise guaranteed to not return None.

Return type:

StandardDynamicsModel | None

calc_Hwang_Brown_Q(h_0, h_n2, dt)

Calculate the discrete system noise matrix.

Using the 10.4.1 - 10.4.3 from Introduction to Random Signals And Applied Kalman Filtering, second edition.

Same as Q3 model in navtk block.

class pntos.cobra.internal.ConstantStateBlock(label, mediator, num_states, Q=None)

Bases: StandardStateBlock

Estimates 1 or more “constant” states, i.e. states with no dynamics.

receive_aux_data(_)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_state_block_aux_data() is called with a label corresponding to this state block’s label.

Parameters:

aux (list[Message | None])

generate_dynamics(gen_x_and_p_func, time_from, time_to)

Return a constant dynamics model, where g(x) = x.

class pntos.cobra.internal.Direction3DToPointsMeasurementProcessor(label, state_block_labels, mediator, l_ps_p, orientation)

Bases: StandardMeasurementProcessor

Generates a model that maps a Direction3DToPoints measurement to an inertial error state block.

See generate_model() for a detailed description of the assumptions and capabilities of this processor.

__init__(label, state_block_labels, mediator, l_ps_p, orientation)

A Direction3DToPoints Measurement Processor

Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9.

  • mediator (Mediator) – a Mediator instance.

  • l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

generate_model(message, gen_x_and_p_func)

Generates the model mapping state estimates to the provided measurement.

Parameters:
  • message (Message) – Measurement to process. message.wrapped_message must be a MeasurementDirection3DToPoints using the SINE_SPACE reference frame.

  • gen_x_and_p_func (GenXandP) – Callback to get the state estimate and covariance for the pinson-style block this processor is updating. NED position errors in meters are expected in at indices [0:3] and NED tilt errors in radians at indices [6:9].

Returns:

StandardMeasurementModel if all restrictions on message and gen_x_and_p_func are met and proper aux data is available, None otherwise.

Model Description and Derivation

TODO: Add model description and derivation

class pntos.cobra.internal.FogmBlock(label, mediator, sigmas, taus)

Bases: StandardStateBlock

A StateBlock that represents ‘n’ first-order Gauss-Markov processes.

__init__(label, mediator, sigmas, taus)

Constructor

Parameters:
  • label (str) – Label for this block.

  • mediator (Mediator) – A pntos.api.Mediator instance.

  • sigmas (NDArray[float64]) – Nx1 array of FOGM noise sigmas; units will vary.

  • taus (NDArray[float64]) – Nx1 array of FOGM time constants, in seconds. Must be positive.

receive_aux_data(aux)

Receive aux data. Unused for this class.

Parameters:

aux (list[Message | None]) – List of messages.

generate_dynamics(gen_x_and_p_func, time_from, time_to)

Generate a pntos.api.StandardDynamicsModel.

The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.

Parameters:
  • gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.

  • time_from (TypeTimestamp) – The time to propagate from.

  • time_to (TypeTimestamp) – The time to propagate to.

Returns:

The description of how to propagate this state block over the given time interval, or None if time_from is later than time_to. Otherwise guaranteed to not return None.

Return type:

StandardDynamicsModel | None

class pntos.cobra.internal.Pinson15NedBlock(label, mediator, imu_model)

Bases: StandardStateBlock

A 15-state representation of the error model of an inertial navigation system in NED frame.

This block is based upon the model provided in the Titterton and Weston 2nd edition textbook (pg. 345). The 15-state model is created by combining the original 9x9 state block with the 6x6 G*u block that relates the gyro biases and accelerometer biases to the tilt and velocity error states, respectively. Additional changes include the conversion to North, East and Down position errors in meters as opposed to the latitude (radians), longitude (radians) and altitude (meters) error states in the book model. Note that the error states are additive, meaning in general you add the error state to the uncorrected value to get the corrected value. Tilt states require special handling; see below.

Tilt errors: The North, East and Down tilt errors are 3 small angle corrections that when represented in skew-symmetric form and subtracted from an identity matrix may be interpreted as a DCM that rotates a vector from an estimated navigation frame to the ‘true’ navigation frame, to the extent that the error states are correct. A positive tilt error results in a negative right-handed rotation about the axis to which it is attached. For example, if the sensor frame is aligned with the local vertical with 90 degree heading (sensor x axis is aligned with East axis), and the down tilt value is 1 degree, the corrected heading will be approximately 89 degrees.

The AspnBaseVector provided to this class should come from the inertial for which it is providing error estimates. Accel and gyro bias states are generally in the inertial sensor frame, but more precisely, they are in the frame that is related to the navigation frame by the NavSolution::rot_mat provided in AspnBaseVector. This means that if the inertial is mechanizing in the inertial sensor frame, then NavSolution::rot_mat should contain C_nav_to_sensor, or the nav-to-sensor DCM, and the biases will be in the sensor frame. If the inertial is mechanizing in the platform frame (which is very common), then NavSolution::rot_mat should be C_nav_to_platform, the nav-to-platform DCM. The biases will be with respect to that frame.

Note that these definitions only hold when using the ‘additive error state’ formulation (true = estimated + error), the current assumption in all off-the-shelf measurement processors. The opposite formulation will flip the sign on estimated values.

Order and description of states:

0 - North position error (m). 1 - East position error (m). 2 - Down position error (m). 3 - North velocity error (m/s). 4 - East velocity error (m/s). 5 - Down velocity error (m/s). 6 - North tilt error (rad). 7 - East tilt error (rad). 8 - Down tilt error (rad). 9 - Accel x-axis bias error (m/s^2) (See note on frame above). 10 - Accel y-axis bias error (m/s^2). 11 - Accel z-axis bias error (m/s^2). 12 - Gyro x-axis bias error (rad/s). 13 - Gyro y-axis bias error (rad/s). 14 - Gyro z-axis bias error (rad/s).

__init__(label, mediator, imu_model)

A Pinson15 NED Standard State Block

Parameters:
receive_aux_data(aux)

Receive inertial PVA and forces as aux data.

Parameters:

aux (list[Message | None]) – List of messages. Assumed to contain inertial solution in a PVA message and forces in an IMU message.

generate_dynamics(gen_x_and_p_func, time_from, time_to)

Generate a pntos.api.StandardDynamicsModel.

The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.

Parameters:
  • gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.

  • time_from (TypeTimestamp) – The time to propagate from.

  • time_to (TypeTimestamp) – The time to propagate to.

Returns:

The description of how to propagate this state block over the given time interval, or None if time_from is later than time_to. Otherwise guaranteed to not return None.

Return type:

StandardDynamicsModel | None

scale_phi(Phi)

Scale first 15 elements of first 2 columns of phi such to account for change in rad to meter scale factors over propagation time.

Parameters:

Phi (NDArray[float64]) – Matrix obtained by discretizing the propagation Jacobian. This matrix will be modified to account for the rad to meter scaling.

generate_f_pinson15()

Generates the continuous time propagation matrix F.

F is the Jacobian of the differential equations governing inertial error growth. This is based upon the model given in Titterton and Weston, 2nd edition.

Returns:

The F Matrix.

Return type:

NDArray[float64]

generate_q_pinson15()

Generates the continuous time process noise covariance matrix Q.

Returns:

The Q Matrix.

Return type:

NDArray[float64]

class pntos.cobra.internal.PinsonBodyVelocityMeasurementProcessor(label, state_block_labels, mediator, l_ps_p, orientation_ps_p)

Bases: StandardMeasurementProcessor

Generates a model that maps a velocity measurement in the body/sensor frame to an inertial error states block.

__init__(label, state_block_labels, mediator, l_ps_p, orientation_ps_p)

A Pinson Body/Sensor Velocity Measurement Processor

Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block where the first (or only) 15 states must be the same as Pinson15NedBlock.

  • mediator (Mediator) – a Mediator instance

  • l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the velocity sensor origin, in the platform frame, in units of meters.

  • orientation_ps_p (NDArray[float64]) – A 4-element quaternion representing the rotational difference from the platform frame to the sensor frame. The corresponding DCM would be C_platform_to_sensor.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

generate_model(message, gen_x_and_p_func)

Generate a pntos.api.StandardMeasurementModel.

Parameters:
  • message (Message) – The measurement/observation to process.

  • gen_x_and_p_func (GenXandP) – A callback function that can be used to retrieve the current estimate and covariance for the state blocks this measurement processor targets.

Returns:

A generated model containing the parameters required for a filter update. Will be None when a measurement cannot be produced from message (for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).

Return type:

StandardMeasurementModel | None

class pntos.cobra.internal.PinsonPositionMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)

Bases: StandardMeasurementProcessor

Generates a model that maps a position measurement to an inertial error state block.

See generate_model() for a detailed description of the assumptions and capabilities of this processor.

__init__(label, state_block_labels, mediator, l_ps_p)

A Pinson Position Measurement Processor

Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9.

  • mediator (Mediator) – a Mediator instance.

  • l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

generate_model(message, gen_x_and_p_func)

Generates the model mapping state estimates to the provided measurement.

Parameters:
  • message (Message) – Measurement to process. message.wrapped_message must be a MeasurementPosition using the GEODETIC reference frame.

  • gen_x_and_p_func (GenXandP) – Callback to get the state estimate and covariance for the pinson-style block this processor is updating. NED position errors in meters are expected in at indices [0:3] and NED tilt errors in radians at indices [6:9].

Returns:

StandardMeasurementModel if all restrictions on message and gen_x_and_p_func are met and proper aux data is available, None otherwise.

Model Description and Derivation

Measurements accepted by this this processor are modeled as

\(P_s^g = P_p^g + C^g_p l_{p \Rightarrow s}^p + \eta(0, \sigma_z)\)

where

\(P_s^g\) is the measured position vector of the \(s\) sensor frame origin in some global Cartesian frame \(g\).

\(P_p^g\) is the true position of the \(p\) platform frame origin in the \(g\) frame.

\(C^g_p\) is the true rotation from the \(p\) frame to the \(g\) frame.

\(l_{p \Rightarrow s}^p\) is the lever arm, a vector from the \(p\) frame origin to the \(s\) frame origin, expressed in the \(p\) frame.

\(\eta(0, \sigma_z)\) is 0-mean Gaussian white noise.

In other words, the measurement is modeled as perfect aside from white noise, offset from the platform frame by a known lever arm.

The measurement is used to update the estimates of the error in the nominal trajectory of the platform frame, provided through a MeasurementPositionVelocityAttitude (PVA) via the receive_aux_data() function. This PVA provides the following values of interest:

\(\hat{P_p^g}\): The estimated position of the platform frame origin in the \(g\) frame.

\(\hat{C^{ned}_p}\): The estimated orientation of the platform frame with respect to the NED frame.

We wish to use this measurement to update estimates of the following states contained in the ‘Pinson’ state block:

\(\delta P_p^{ned}\): the estimate of the error in the current nominal position of the platform frame, expressed in the North-East-Down (NED) frame. The relationship between the true position, the nominal position, and the error state estimates is \(P = \hat{P} + \delta P\)

\(\delta \psi^{ned}\): The NED frame tilt error estimates. The relationship between the true and estimated rotations is \(C^{ned}_p = C^{ned}_{\hat{ned}}C^{\hat{ned}}_p \approx [I - \delta \psi^{ned} \times ]C^{\hat{ned}}_p\) (where \(\times\) is the skew/cross operator).

The measurement vector provided to the filter \(z\) is the difference between the measurement and the nominal platform frame position (white noise terms dropped, and \(ned\) frame used as g frame):

\(z = P_s^{ned} - \hat{P^{ned}_p}\)

The measurement model \(h(x)\) is found by expanding \(z\) in terms of the states \(x\) (i.e. \(\delta P_p^{ned}\) and \(\delta \psi^{ned}\)):

\(P_s^{ned} - \hat{P^{ned}_p} = P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p - \hat{P}_p^{ned}\)

\(= \hat{P_p^{ned}} + \delta P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p - \hat{P}_p^{ned}\)

\(= \delta P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p\)

\(h(x) \approx \delta P_p^{ned} + [I - \delta \psi^{ned} \times ]C^{\hat{ned}}_p l_{p \Rightarrow s}^p\)

Finally, to generate the measurement Jacobian \(H\) we take derivatives of \(h(x)\):

\(\frac{\delta h(x)}{d \delta P_p^{ned}} = I\)

\(\frac{\delta h(x)}{d \delta \psi^{ned}} = C^{\hat{ned}}_p l_{p \Rightarrow s}^p \times\)

class pntos.cobra.internal.PinsonPosVelMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)

Bases: StandardMeasurementProcessor

Generates a model that maps PVA measurements to an inertial error state block.

NOTE: Only the position and velocity fields of the PVA measurement are used, attitude is ignored.

See generate_model() for a detailed description of the assumptions and capabilities of this processor.

__init__(label, state_block_labels, mediator, l_ps_p)

A Pinson Position and Velocity Measurement Processor

Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states, NED velocity errors as states 3:6, and NED tilt errors, in radians, as states 6:9.

  • mediator (Mediator) – a Mediator instance.

  • l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

generate_model(message, gen_x_and_p_func)

Generates the model mapping state estimates to the provided measurement.

Parameters:
  • message (Message) – Measurement to process. message.wrapped_message must be a MeasurementPositionVelocityAttitude using the GEODETIC reference frame.

  • gen_x_and_p_func (GenXandP) – Callback to get the current state estimate and covariance for the pinson-style block this processor is updating. NED position errors in meters are expected in at indices [0:3], NED velocity errors at indices [3:6], and NED tilt errors in radians at indices [6:9].

Returns:

StandardMeasurementModel if all restrictions on message and gen_x_and_p_func are met and proper aux data is available, None otherwise.

Model Description and Derivation

This measurement processes combined position and velocity measurements as a PVA measurement. The model for the position measurement is identical to that of PinsonPositionMeasurementProcessor, and the model for the velocity measurement is identical to that of PinsonVelocityMeasurementProcessor.

class pntos.cobra.internal.PinsonVelocityMeasurementProcessor(label, state_block_labels, mediator)

Bases: StandardMeasurementProcessor

Generates a model that maps a velocity measurement to an inertial error state block.

__init__(label, state_block_labels, mediator)

A Pinson Velocity Measurement Processor

Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9.

  • mediator (Mediator) – A pntos.api.Mediator instance.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

generate_model(message, gen_x_and_p_func)

Generate a pntos.api.StandardMeasurementModel.

Parameters:
  • message (Message) – The measurement/observation to process.

  • gen_x_and_p_func (GenXandP) – A callback function that can be used to retrieve the current estimate and covariance for the state blocks this measurement processor targets.

Returns:

A generated model containing the parameters required for a filter update. Will be None when a measurement cannot be produced from message (for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).

Return type:

StandardMeasurementModel | None

class pntos.cobra.internal.PinsonWithLeverArmPositionMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)

Bases: StandardMeasurementProcessor

Maps a position measurement to an inertial error state block, a lever arm offset block, and a sensor error block.

See generate_model() for a detailed description of the assumptions and capabilities of this processor.

__init__(label, state_block_labels, mediator, l_ps_p)
Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 3-length list of labels of state blocks this processor can update. The first entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9. The second state block entry should refer to a 3-element state block that models the position sensor errors in the NED frame. The third state block entry should refer to a 3-element state block that models the additional platform to position sensor lever arm in the body frame, in meters. The complete lever arm estimate is composed of the sum of these last 3 states and l_ps_p.

  • mediator (Mediator) – a Mediator instance

  • l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.

receive_aux_data(aux)

Receive aux data.

This function is used to provide additional data required by generate_model() beyond what is provided by the measurement and state estimate and covariance. This processor requires aux data that represents the nominal position, velocity and attitude of the platform at the time of measuremet update. The inertial error block referenced in generate_model() represents the estimated errors in this nominal PVA.

Parameters:

aux (list[Message | None]) – Aux data to process. Expected to be length 1 and contain a MeasurementPositionVelocityAttitude with a valid quaternion.

generate_model(message, gen_x_and_p_func)

Generates a StandardMeasurementModel relating a position measurement to the states described in __init__().

Parameters:
  • message (Message) – Measurement used to update. The wrapped_message member must be of type aspn23.MeasurementPosition.

  • gen_x_and_p_func (GenXandP) – Callback to get the estimate and covariance of the states referenced by this model. The layout of the states should follow the description in __init__().

Returns:

Model relating measurement to states. Will return None if a) state_block_labels is incorrectly configured b) message is an unsupported type c) a MeasurementPositionVelocityAttitude with the estimated PVA at the current measurement time has not been received via receive_aux_data().

Model Description and Derivation

This processor uses a model very similar to that used described in pntos.cobra.internal.PinsonWithNedFogmPositionMeasurementProcessor.generate_model(). The primary difference is that instead of assuming the lever arm between the platform frame and the sensor frame is perfectly known, the model incorporates 3 additional states to account for the possible deviation between the assumed lever arm provided to __init__() and the actual lever arm.

The model for the measurement is the same as that of pntos.cobra.internal.PinsonWithNedFogmPositionMeasurementProcessor:

\(P_s^g = P_p^g + C^g_p l_{p \Rightarrow s}^p - \delta z^{g} + \eta(0, \sigma_z)\)

but now we can rewrite the true lever arm \(l_{p \Rightarrow s}^p\) in terms of the estimated lever arm \(\hat{l}_{p \Rightarrow s}^p\) plus deviation \(\delta l_{p \Rightarrow s}^p\):

\(P_s^g = P_p^g + C^g_p(\hat{l}_{p \Rightarrow s}^p + \delta l_{p \Rightarrow s}^p) - \delta z^{g} + \eta(0, \sigma_z)\).

Following the procedure outlined in the documentation for the other position processors we form the filter update vector by differencing the measurement and the estimated platform position:

\(z = P_s^{ned} - \hat{P^{ned}_p}\)

\(= P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p - \delta z_{ned} - \hat{P}_p^{ned}\)

\(= \hat{P_p^{ned}} + \delta P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p - \delta z^{ned} - \hat{P}_p^{ned}\)

\(= \delta P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p - \delta z^{ned}\)

The measurement prediction function is found by expanding this in terms of state estimates and nominal values:

\(h(x) \approx \delta P_p^{ned} + [I - \delta \psi^{ned} \times ]C^{\hat{ned}}_p (\hat{l}_{p \Rightarrow s}^p + \delta l_{p \Rightarrow s}^p) - \delta z^{ned}\)

Taking the derivative with respect to the states we get the Jacobian elements:

\(\frac{\delta h(x)}{d \delta P_p^{ned}} = I\)

\(\frac{\delta h(x)}{d \delta \psi^{ned}} = C^{\hat{ned}}_p(\hat{l}_{p \Rightarrow s}^p + \delta l_{p \Rightarrow s}^p)\times\)

\(\frac{\delta h(x)}{d \delta l_{p \Rightarrow s}^p} = [I - \delta \psi^{ned} \times ]C^{\hat{ned}}_p\)

\(\frac{\delta h(x)}{d \delta z^{ned}} = -I\)

class pntos.cobra.internal.PinsonWithNedFogmPositionMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)

Bases: StandardMeasurementProcessor

Generates a model that maps a position measurement to an inertial error state block and a position measurement error block.

See generate_model() for a detailed description of the assumptions and capabilities of this processor.

__init__(label, state_block_labels, mediator, l_ps_p)
Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 2-length list of labels of state blocks this processor can update. The first entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9. The second state block entry should refer to a 3-element FOGM state block that models the position sensor errors in the NED frame.

  • mediator (Mediator) – a Mediator instance

  • l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

generate_model(message, gen_x_and_p_func)

Generates the model mapping state estimates to the provided measurement.

Parameters:
  • message (Message) – Measurement to process. message.wrapped_message must be a MeasurementPosition using the GEODETIC reference frame.

  • gen_x_and_p_func (GenXandP) – Callback to get joint state estimate and covariance for both the pinson-style block and sensor measurement error blocks this processor is updating. NED platform position errors in meters are expected in at indices [0:3] and NED tilt errors in radians at indices [6:9]. Sensor measurement error states in the NED frame are expected to be the last 3 states.

Returns:

StandardMeasurementModel if all restrictions on message and gen_x_and_p_func are met and proper aux data is available, None otherwise.

Model Description and Derivation

This processor uses a model identical to that used described in pntos.cobra.internal.PinsonPositionMeasurementProcessor.generate_model(), aside from the addition of error states to track position error present in the sensor measurements. See the linked function for definitions of terms not repeated here.

The addition of the new states changes the original sensor measurement model from:

\(P_s^g = P_p^g + C^g_p l_{p \Rightarrow s}^p + \eta(0, \sigma_z)\)

to

\(P_s^g = P_p^g + C^g_p l_{p \Rightarrow s}^p - \delta z^{g} + \eta(0, \sigma_z)\)

where \(\delta z^{g}\) is measurement error in the \(g\) frame.

This results in additional term being added to the prior model \(h(x)\), giving:

\(h(x) \approx \delta P_p^{ned} + [I - \delta \psi^{ned} \times ]C^{\hat{ned}}_p l_{p \Rightarrow s}^p - \delta z^{ned}\)

and an additional entry in the Jacobian matrix \(H\):

\(\frac{\delta h(x)}{d \delta z^{ned}} = -I\)

class pntos.cobra.internal.PositionMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)

Bases: StandardMeasurementProcessor

Generates a model that maps a position measurement to an inertial direct state block.

See generate_model() for a detailed description of the assumptions and capabilities of this processor.

__init__(label, state_block_labels, mediator, l_ps_p)

A Geodetic 3D Position Measurement Processor.

Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 2-element list of labels of state blocks this processor can update. The first entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9.

  • mediator (Mediator) – a Mediator instance.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

generate_model(message, gen_x_and_p_func)

Generates the model mapping state estimates to the provided measurement.

Parameters:
  • message (Message) – Measurement to process. message.wrapped_message must be a MeasurementPosition using the GEODETIC reference frame.

  • gen_x_and_p_func – Callback to get the current joint state estimate and covariance for both the PinsonErrorToStandard virtual state block and sensor measurement error blocks this processor is updating. LLA position in rad, rad, meters respectively are expected at indices [0:3] and RPY tilt in radians are expected at indices [6:9]. Sensor measurement error states in units matching indices [0:3] are expected to be the last 3 states.

Returns:

StandardMeasurementModel if all restrictions on message and gen_x_and_p_func are met and proper aux data is available, None otherwise.

class pntos.cobra.internal.StandardStateModelProvider(mediator)

Bases: StandardStateModelProvider

StandardStateModelProvider that offers a 15-state pinson state block, variable-size Fogm Block and various position measurement processors.

__init__(mediator)

Standard Position and INS State Model Provider

Parameters:

mediator (Mediator) – A :class:(Mediator) instance.

new_processor(processor_index, engine, label, state_block_labels, config_group)

Generate a new StandardMeasurementProcessor that describes the relationship between a measurement and a set of states.

Parameters:
Returns:

The newly created StandardMeasurementProcessor or None when no processor can be produced with the given processor_index, engine, and config_group.

Return type:

StandardMeasurementProcessor | None

new_block(block_index, engine, label, config_group)

Generate a new StandardStateBlock that describes a set of states and how they propagate over time.

Parameters:
  • block_index (int) –

    Index into self.block_identifiers used to select the desired type of state block.

  • engine (StandardFusionEngine | None) – An optional parameter that may be provided to the new block, such that the block may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to None when no engine is available for the block to use.

  • label (str) – A string which will be used to populate the label field of the newly created state block. This label will be the unique name for the returned instance of a state block, and used to track the state block throughout its lifecycle. Note that it differs from pntos.api.StandardStateModelProvider.block_identifiers which is the model’s mechanism for selecting the kind of state block to create.

  • config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new state block. If the state block requires no outside configuration, config_group may be None.

Returns:

The newly created StandardStateBlock or None when no state block can be produced with the given block_index, engine, and config_group.

Return type:

StandardStateBlock | None

new_virtual_block(virtual_block_index, source_label, target_label, config_group)

Generate a new VirtualStateBlock that converts a set of states to another representation.

Parameters:
  • virtual_block_index (int) –

    Index into self.virtual_block_identifiers used to select the desired type of state block.

  • source_label (str) – A string which will be used to populate the source field of the newly created virtual state block. This source_label should correspond to either a different ‘real’ or virtual state block.

  • target_label (str) – A string which will be used to populate the target field of the newly create virtual state block. This target should be unique, differing from all other targets on the other instances of pntos.api.VirtualStateBlock.

  • config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new virtual state block. If the state block requires no outside configuration, config_group may be None.

Returns:

The newly created VirtualStateBlock or None when no virtual state block can be produced with the given block_index, engine, and config_group.

Return type:

VirtualStateBlock | None

class pntos.cobra.internal.PinsonErrorToStandard(mediator, source, target)

Bases: VirtualStateBlock

A VirtualStateBlock that maps Pinson-style error states into a ‘Standard’ (or direct) whole-valued representation by combining the error states with the uncorrected reference PVA.

Output Transformation States in Order:

0 - Latitude (rad)

1 - Longitude (rad)

2 - Altitude HAE (m)

3 - North Velocity (m/s)

4 - East Velocity (m/s)

5 - Down Velocity (m/s)

6 - Roll (rad)

7 - Pitch (rad)

8 - Yaw (rad)

Additional trailing states are retained and unmodified. See Pinson15NedBlock.py for a description of input states.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_virtual_state_block_aux_data() is called with a label corresponding to this pntos.api.VirtualStateBlock ‘s target.

Parameters:

aux (list[Message | None])

convert(estimate_with_covariance, time)

Convert a full estimate/covariance pair.

Parameters:
  • estimate_with_covariance (EstimateWithCovariance) – Estimate and covariance to convert.

  • time (TypeTimestamp) – Time that estimate_with_covariance is valid at.

Returns:

The converted value.

Return type:

EstimateWithCovariance

convert_estimate(estimate, time)

Convert just an estimate vector.

Parameters:
  • estimate (NDArray[float64]) – Estimate vector to convert, Nx1.

  • time (TypeTimestamp) – Time that estimate is valid at.

Returns:

The converted vector, Mx1.

Return type:

NDArray[float64]

jacobian(estimate, time)

Obtain the Jacobian of the transform performed by this instance.

The Jacobian is calculated at an instance in time, given an estimate to differentiate with respect to.

Parameters:
  • estimate (NDArray[float64]) – Estimate vector associated with the return value of source, Nx1.

  • time (TypeTimestamp) – Time that estimate is valid at.

Returns:

An MxN matrix that may be used to pre-multiply estimate to obtain an M length vector in target representation (to first order).

Return type:

NDArray[float64]

class pntos.cobra.internal.StateExtractor(mediator, source, target, incoming_state_size, indices)

Bases: VirtualStateBlock

A VirtualStateBlock that extracts and returns a series of states from an input estimate and covariance.

__init__(mediator, source, target, incoming_state_size, indices)
Parameters:
  • mediator (Mediator) – A class:pntos.api.Mediator instance

  • source (str) – The label associated with the representation this instance can transform from

  • target (str) – The label associated with the representation this instance can transform to

  • incoming_state_size (int) – The number of states in the StateBlock source refers to

  • indices (list[int]) – The series of indices that correspond to states to extract from source and comprise target

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_virtual_state_block_aux_data() is called with a label corresponding to this pntos.api.VirtualStateBlock ‘s target.

Parameters:

aux (list[Message | None])

convert(estimate_with_covariance, time)

Convert a full estimate/covariance pair.

Parameters:
  • estimate_with_covariance (EstimateWithCovariance) – Estimate and covariance to convert.

  • time (TypeTimestamp) – Time that estimate_with_covariance is valid at.

Returns:

The converted value.

Return type:

EstimateWithCovariance

convert_estimate(estimate, time)

Convert just an estimate vector.

Parameters:
  • estimate (NDArray[float64]) – Estimate vector to convert, Nx1.

  • time (TypeTimestamp) – Time that estimate is valid at.

Returns:

The converted vector, Mx1.

Return type:

NDArray[float64]

jacobian(estimate, time)

Obtain the Jacobian of the transform performed by this instance.

The Jacobian is calculated at an instance in time, given an estimate to differentiate with respect to.

Parameters:
  • estimate (NDArray[float64]) – Estimate vector associated with the return value of source, Nx1.

  • time (TypeTimestamp) – Time that estimate is valid at.

Returns:

An MxN matrix that may be used to pre-multiply estimate to obtain an M length vector in target representation (to first order).

Return type:

NDArray[float64]

class pntos.cobra.internal.StaticAlign(config_group, mediator)

Bases: InertialInitializationStrategy

Static alignment for an inertial.

This initialization strategy can be used to produce an initial PVA to initialize an inertial mechanization. It requires position and IMU data, performing gyrocompassing to estimate an initial attitude from the IMU data. It does not estimate initial inertial errors.

__init__(config_group, mediator)
Parameters:
request_motion_needed()

Check the type of motion (if any) needed.

Returns:

InitializationMotionNeeded

request_current_status()

Check the current initialization status.

Returns:

InitializationStatus

process_pntos_message(message)

Incorporate a new message into the initialization algorithm.

Parameters:

message (Message)

request_solution()

Get the current initial solution.

Returns:

InitialInertialSolution

class pntos.cobra.internal.TutorialFogmBlock(label, mediator, sigmas, taus)

Bases: StandardStateBlock

A StateBlock that represents ‘n’ first-order Gauss-Markov processes.

__init__(label, mediator, sigmas, taus)

Constructor

Parameters:
  • label (str) – Label for this block.

  • mediator (Mediator) – A pntos.api.Mediator instance.

  • sigmas (NDArray[float64]) – Nx1 array of FOGM noise sigmas; units will vary.

  • taus (NDArray[float64]) – Nx1 array of FOGM time constants, in seconds. Must be positive.

receive_aux_data(aux)

Receive aux data. Unused for this class.

Parameters:

aux (list[Message | None]) – List of messages.

generate_dynamics(gen_x_and_p_func, time_from, time_to)

Generate a pntos.api.StandardDynamicsModel.

The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.

Parameters:
  • gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.

  • time_from (TypeTimestamp) – The time to propagate from.

  • time_to (TypeTimestamp) – The time to propagate to.

Returns:

The description of how to propagate this state block over the given time interval, or None if time_from is later than time_to. Otherwise guaranteed to not return None.

Return type:

StandardDynamicsModel | None

class pntos.cobra.internal.TutorialPinson15NedBlock(label, mediator, imu_model)

Bases: StandardStateBlock

A 15-state representation of the error model of an inertial navigation system in NED frame.

This block is based upon the model provided in the Titterton and Weston 2nd edition textbook (pg. 345). The 15-state model is created by combining the original 9x9 state block with the 6x6 G*u block that relates the gyro biases and accelerometer biases to the tilt and velocity error states, respectively. Additional changes include the conversion to North, East and Down position errors in meters as opposed to the latitude (radians), longitude (radians) and altitude (meters) error states in the book model. Note that the error states are additive, meaning in general you add the error state to the uncorrected value to get the corrected value. Tilt states require special handling; see below.

Tilt errors: The North, East and Down tilt errors are 3 small angle corrections that when represented in skew-symmetric form and subtracted from an identity matrix may be interpreted as a DCM that rotates a vector from an estimated navigation frame to the ‘true’ navigation frame, to the extent that the error states are correct. A positive tilt error results in a negative right-handed rotation about the axis to which it is attached. For example, if the sensor frame is aligned with the local vertical with 90 degree heading (sensor x axis is aligned with East axis), and the down tilt value is 1 degree, the corrected heading will be approximately 89 degrees.

The AspnBaseVector provided to this class should come from the inertial for which it is providing error estimates. Accel and gyro bias states are generally in the inertial sensor frame, but more precisely, they are in the frame that is related to the navigation frame by the NavSolution::rot_mat provided in AspnBaseVector. This means that if the inertial is mechanizing in the inertial sensor frame, then NavSolution::rot_mat should contain C_nav_to_sensor, or the nav-to-sensor DCM, and the biases will be in the sensor frame. If the inertial is mechanizing in the platform frame (which is very common), then NavSolution::rot_mat should be C_nav_to_platform, the nav-to-platform DCM. The biases will be with respect to that frame.

Note that these definitions only hold when using the ‘additive error state’ formulation (true = estimated + error), the current assumption in all off-the-shelf measurement processors. The opposite formulation will flip the sign on estimated values.

Order and description of states:

0 - North position error (m). 1 - East position error (m). 2 - Down position error (m). 3 - North velocity error (m/s). 4 - East velocity error (m/s). 5 - Down velocity error (m/s). 6 - North tilt error (rad). 7 - East tilt error (rad). 8 - Down tilt error (rad). 9 - Accel x-axis bias error (m/s^2) (See note on frame above). 10 - Accel y-axis bias error (m/s^2). 11 - Accel z-axis bias error (m/s^2). 12 - Gyro x-axis bias error (rad/s). 13 - Gyro y-axis bias error (rad/s). 14 - Gyro z-axis bias error (rad/s).

__init__(label, mediator, imu_model)

A Pinson15 NED Standard State Block

Parameters:
receive_aux_data(aux)

Receive inertial PVA and forces as aux data.

Parameters:

aux (list[Message | None]) – List of messages. Assumed to contain inertial solution in a PVA message and forces in an IMU message.

generate_dynamics(gen_x_and_p_func, time_from, time_to)

Generate a pntos.api.StandardDynamicsModel.

The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.

Parameters:
  • gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.

  • time_from (TypeTimestamp) – The time to propagate from.

  • time_to (TypeTimestamp) – The time to propagate to.

Returns:

The description of how to propagate this state block over the given time interval, or None if time_from is later than time_to. Otherwise guaranteed to not return None.

Return type:

StandardDynamicsModel | None

generate_f_pinson15()

Generates the continuous time propagation matrix F.

F is the Jacobian of the differential equations governing inertial error growth. This is based upon the model given in Titterton and Weston, 2nd edition.

Returns:

The F Matrix.

Return type:

NDArray[float64]

generate_q_pinson15()

Generates the continuous time process noise covariance matrix Q.

Returns:

The Q Matrix.

Return type:

NDArray[float64]

class pntos.cobra.internal.TutorialPinsonVelocityMeasurementProcessor(label, state_block_labels, mediator)

Bases: StandardMeasurementProcessor

Generates a model that maps a velocity measurement to an inertial error state block.

__init__(label, state_block_labels, mediator)

A Pinson Velocity Measurement Processor

Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9.

  • mediator (Mediator) – A pntos.api.Mediator instance.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

generate_model(message, gen_x_and_p_func)

Generate a pntos.api.StandardMeasurementModel.

Parameters:
  • message (Message) – The measurement/observation to process.

  • gen_x_and_p_func (GenXandP) – A callback function that can be used to retrieve the current estimate and covariance for the state blocks this measurement processor targets.

Returns:

A generated model containing the parameters required for a filter update. Will be None when a measurement cannot be produced from message (for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).

Return type:

StandardMeasurementModel | None

class pntos.cobra.internal.TutorialPinsonWithNedFogmPositionMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)

Bases: StandardMeasurementProcessor

Generates a model that maps a position measurement to an inertial error state block and a position measurement error block.

__init__(label, state_block_labels, mediator, l_ps_p)
Parameters:
  • label (str) – Name of processor.

  • state_block_labels (list[str]) – A 2-length list of labels of state blocks this processor can update. The first entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9. The second state block entry should refer to a 3-element FOGM state block that models the position sensor errors in the NED frame.

  • mediator (Mediator) – a Mediator instance

  • l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.

receive_aux_data(aux)

Receive and use an arbitrary collection of aux data.

This method will be called by the fusion engine when its pntos.api.StandardFusionEngine.give_measurement_processor_aux_data() is called with a label corresponding to this measurement processor’s label.

Parameters:

aux (list[Message | None])

generate_model(message, gen_x_and_p_func)

Generate a pntos.api.StandardMeasurementModel.

Parameters:
  • message (Message) – The measurement/observation to process.

  • gen_x_and_p_func (GenXandP) – A callback function that can be used to retrieve the current estimate and covariance for the state blocks this measurement processor targets.

Returns:

A generated model containing the parameters required for a filter update. Will be None when a measurement cannot be produced from message (for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).

Return type:

StandardMeasurementModel | None

class pntos.cobra.internal.TutorialPosInsStateModelProvider(mediator)

Bases: StandardStateModelProvider

A tutorial implementation of StandardStateModelProvider that offers a 15-state pinson state block, variable-size FOGM block, a position measurement processor and a velocity measurement processor.

__init__(mediator)

Tutorial Position and INS State Model Provider

Parameters:

mediator (Mediator) – A :class:(Mediator) instance.

virtual_block_identifiers

List of identifiers for virtual state blocks.

new_processor(processor_index, engine, label, state_block_labels, config_group)

Generate a new StandardMeasurementProcessor that describes the relationship between a measurement and a set of states.

Parameters:
  • processor_index (int) – Index into self.processor_identifiers used to select the desired type of measurement processor. - Index 0 corresponds to a pntos.cobra.internal.TutorialPinsonVelocityMeasurementProcessor. - Index 1 corresponds to a pntos.cobra.internal.TutorialPinsonWithNedFogmPositionMeasurementProcessor. - All other indices will result in a return value of None.

  • engine (pntos.api.plugins.fusion.StandardFusionEngine | None) – An optional parameter that may be provided to the new processor, such that the processor may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to None when no engine is available for the processor to use.

  • label (str) – A string which will be used to populate the label field of the newly created processor. This label will be the unique name for the returned instance of a processor, and used to track the processor throughout its lifecycle. Note that it differs from pntos.api.StandardStateModelProvider.processor_identifiers which is the model’s mechanism for selecting the type of processor to create.

  • state_block_labels (list[str]) – A list of strings which will be used to populate the state_block_labels field of the newly created processor.

  • config_group (str) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new processor. If the processor requires no outside configuration, config_group may be None.

Returns:

The newly created StandardMeasurementProcessor or None when no processor can be produced with the given processor_index, engine, and config_group.

Return type:

StandardMeasurementProcessor | None

new_block(block_index, engine, label, config_group)

Generate a new StandardStateBlock that describes a set of states and how they propagate over time.

Parameters:
  • block_index (int) –

    Index into self.block_identifiers used to select the desired type of state block.

  • engine (pntos.api.plugins.fusion.StandardFusionEngine | None) – An optional parameter that may be provided to the new block, such that the block may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to None when no engine is available for the block to use.

  • label (str) – A string which will be used to populate the label field of the newly created state block. This label will be the unique name for the returned instance of a state block, and used to track the state block throughout its lifecycle. Note that it differs from pntos.api.StandardStateModelProvider.block_identifiers which is the model’s mechanism for selecting the kind of state block to create.

  • config_group (str) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new state block. If the state block requires no outside configuration, config_group may be None.

Returns:

The newly created StandardStateBlock or None when no state block can be produced with the given block_index, engine, and config_group.

Return type:

StandardStateBlock | None

new_virtual_block(virtual_block_index, source_label, target_label, config_group)

Generate a newly created pntos.api.VirtualStateBlock.

This virtual state block is used to convert a set of states from one representation to another.

Parameters:
  • virtual_block_index (int) – Since the pntos.api.StandardStateModelProvider can create different kinds of virtual state blocks, the virtual_block_index parameter is used to select which kind of virtual state block to create a new instance of. The pntos.api.StandardStateModelProvider.virtual_block_identifiers field contains identifying strings for the kinds of virtual state blocks. For example, if the model can create 45 different virtual state blocks, the identifier of the last virtual state block that can be created is found in virtual_block_identifiers[44]. An instance of this virtual state block can be created by calling new_virtual_block(self, 44, ...). Note that 0 <= virtual_block_index < len(virtual_block_identifiers).

  • source_label (str) – The label of the state block or virtual state block whose states this virtual state block transforms.

  • target_label (str) – A unique identifier for this virtual state block.

  • config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new virtual state block. If the virtual state block requires no outside configuration, config_group may be None.

Returns:

The newly created pntos.api.VirtualStateBlock or None when no virtual state block can be produced with the given virtual_block_index and config_group.

Return type:

VirtualStateBlock | None

class pntos.cobra.internal.ManualInitialization(config_group, mediator)

Bases: InertialInitializationStrategy

A simple initialization strategy that generates an initial solution from an alignment config in the registry. This implementation is designed to work with a manual alignment.

__init__(config_group, mediator)

A Simple Initialization Strategy

Parameters:
request_motion_needed()

Check the type of motion (if any) needed.

Returns:

InitializationMotionNeeded

request_current_status()

Check the current initialization status.

Returns:

InitializationStatus

process_pntos_message(message)

Incorporate a new message into the initialization algorithm.

Parameters:

message (Message)

request_solution()

Get the current initial solution.

Returns:

InitialInertialSolution