Cobra Internal
These are all the objects that can be directly imported from pntos.cobra.internal.
Warning
Users do not normally need access to these objects. Make sure you know what you’re doing if you’re directly accessing these objects.
- class pntos.cobra.internal.BuscatMediator(attached_plugin_identifier, attached_plugin_type)
Bases:
MediatorThis is a simple Buscat mediator implementation. It was designed to be used in conjunction with the
pntos.cobra.BuscatControllerPluginwhich is why this controller directly access private members. It has one public memberregistrythat other plugins are allowed to access.- __init__(attached_plugin_identifier, attached_plugin_type)
Buscat Mediator
- Parameters:
attached_plugin_identifier (str) – The identifier field of the plugin this mediator is assigned to.
attached_plugin_type (PluginType | None) – The abstract plugin type of the plugin this mediator is assigned to.
- property filter_description_list
Request a list of strings describing the solutions available.
One of these description strings may be used when calling
request_solutions(). For consistency, these strings should adhere to the following conventions:Strings should be upper case and have words and acronyms separated by underscores (
UPPER_SNAKE_CASE).Strings should contain the substring
BESTwhen they represent the primary solution.Strings should contain the substring
DEAD_RECKONINGwhen they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more thanBESTsolutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide aDEAD_RECKONINGsolution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals betweensolution_times(but resets applied before all of thesolution_times).Strings should include a substring indicating the type of solution returned. This substring should contain the string-equivalent to the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string
_ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.
Example
If the primary solution is an ASPN PVA then the string
MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATEwould fulfill the convention.These conventions allow the user to identify their desired type of solution using substring matching.
- Returns:
A list of strings describing the solutions available.
- Return type:
list[str]
- request_solutions(solution_times, filter_description=None)
Request filtering solutions at the times specified in the array
solution_times.- Parameters:
solution_times (list[TypeTimestamp]) – The times at which to return solutions.
filter_description (str | None, optional) – To select which filter(s) to request solutions from, enter a valid filter description string in
filter_description. Valid filter description strings can be obtained by callingfilter_description_list. Passing inNonewill provide a result specific to a particular implementation. Whenfilter_descriptionisNone, the implementation should endeavor to return its best solution.
- Returns:
An array of messages containing the filter solutions for the requested
solution_times. Some entries may beNoneif they are unavailable at the corresponding time insolution_times. The returnedpntos.api.Messagearray may beNoneiffilter_descriptionis invalid.- Return type:
list[Message | None] | None
- process_pntos_message(message)
Send a new message to the system for arbitrary processing.
For example, this function is useful for plugins who have just received new sensor data that they wish to relay to the system to be used in a sensor fusion solution.
- Parameters:
message (Message)
- broadcast_aspn_message(message, transport=None, destination_identifier=None)
Request that pntOS broadcast the provided message out to the network.
- Parameters:
message (Message)
transport (str | None, optional) – The identifier of a transport plugin that the message should be routed to. The transport parameter should match the
pntos.api.CommonPlugin.identifierstring of apntos.api.TransportPluginactive in the system. If the transport parameter isNone, this indicates that the message should be broadcast to all available transports.destination_identifier (str | None, optional) – A transport-specific identifier that allows transports to determine how to route the message. If the destination transport has the concept of a channel or topic,
destination_identifiershould be populated by the channel or topic. Otherwise, the identifier is populated in a plugin-specific manner defined by the destination transport. Ifdestination_identifierisNone, then the transport should output the message in the “default” output channel/topic and route being used by pntOS.
- log_message(level, message)
Log a message.
Send a loggable message to the system, to be logged through the current logging infrastructure enabled (e.g. the console, a logfile, etc.).
- Parameters:
level (LoggingLevel)
message (str)
- class pntos.cobra.internal.BatchUpdate(*, sequence_id, group, keys)
Bases:
BaseModelCommunicates the most recent values for any keys that have been updated in a group to the front-end.
- model_config = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pntos.cobra.internal.ChunkUpdate(*, ordered_updates, unordered_updates)
Bases:
BaseModelCommunicates a “chunk” of
ordered_updates(SubscriptionMode == ALL) andunordered_updates(SubscriptionMode == LAST) to the front-end.- model_config = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pntos.cobra.internal.KeyUpdate(*, val, subscription_ids, sequence_id)
Bases:
BaseModelUsed to notify the front-end of the new value at a certain group/key in the registry. This is part of a
BatchUpdate.- model_config = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pntos.cobra.internal.Snapshot(*, data)
Bases:
BaseModelA snapshot of the values at all currently-subscribed group/keys in the registry.
- model_config = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pntos.cobra.internal.Subscription(*, id, group, key, mode)
Bases:
BaseModelAll data necessary to subscribe/unsubscribe to a certain group/key in the registry with a specific mode (stream behavior).
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pntos.cobra.internal.Write(*, data, sequence_id)
Bases:
BaseModelA batch write request from the front-end with updated values to write at various group/keys in the registry.
- model_config = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pntos.cobra.internal.DummyMediator(plugins=None)
Bases:
MediatorA
Mediatorwith minimal capability. Not for use in production code.- property filter_description_list
Returns the list of strings describing available solutions.
- Returns:
The
filter_description_listof the firstOrchestrationPluginfound, or an empty list if this instance cannot find such a plugin.
- request_solutions(solution_times, filter_description=None)
Returns requested solutions if possible. Ignores filter_description.
- Parameters:
solution_times (list[TypeTimestamp]) – The times at which to return solutions.
filter_description (str | None, optional) – Unused
- Returns:
The return of
request_solutions()of the firstOrchestrationPluginfound, or None if noOrchestrationPluginis available.
- process_pntos_message(message)
Passes message off to all available
OrchestrationPlugins.- Parameters:
message (Message) – Message to process.
- broadcast_aspn_message(message, transport=None, destination_identifier=None)
Attempt to broadcast a message over all available transports.
- Parameters:
message (Message) – The message to broadcast.
transport (str | None) – Usually an identifier used to select a specific transport to broadcast over. Unused in this implementation.
destination_identifier (str | None) – Additional description of the method of broadcast, usually a channel name or similar. If None will be changed to ‘somewhere’.
- log_message(level, message)
Log a message.
Send a loggable message to the system, to be logged through the current logging infrastructure enabled (e.g. the console, a logfile, etc.).
- Parameters:
level (LoggingLevel)
message (str)
- class pntos.cobra.internal.DummyMessageStreamConfig
Bases:
MessageStreamConfigA
MessageStreamConfigimplementation that does nothing but satisfy theinit_orchestration_plugin()parameter requirement.- sequenced_stream_add(message_type, source_identifier=None)
Does nothing.
- Parameters:
message_type (type[AspnBase]) – Unused.
source_identifier (str | None, optional) – Unused.
- sequenced_stream_remove(message_type, source_identifier=None)
Does nothing.
- Parameters:
message_type (type[AspnBase]) – Unused.
source_identifier (str | None, optional) – Unused.
- sequenced_stream_all(enable)
Does nothing.
- Parameters:
enable (bool) – Unused.
- immediate_stream_add(message_type, source_identifier=None)
Does nothing.
- Parameters:
message_type (type[AspnBase]) – Unused.
source_identifier (str | None, optional) – Unused.
- immediate_stream_remove(message_type, source_identifier=None)
Does nothing.
- Parameters:
message_type (type[AspnBase]) – Unused.
source_identifier (str | None, optional) – Unused.
- immediate_stream_all(enable)
Does nothing.
- Parameters:
enable (bool) – Unused.
- class pntos.cobra.internal.StandardMediator(attached_plugin_identifier, attached_plugin_type)
Bases:
MediatorThis is a simple mediator implementation. It was designed to be used in conjunction with the
pntos.cobra.StandardControllerPluginwhich is why this controller directly access private members. It has one public memberregistrythat other plugins are allowed to access.- __init__(attached_plugin_identifier, attached_plugin_type)
Standard Cobra Mediator
- Parameters:
attached_plugin_identifier (str) – The identifier field of the plugin this mediator is assigned to.
attached_plugin_type (PluginType | None) – The abstract plugin type of the plugin this mediator is assigned to.
- property filter_description_list
Request a list of strings describing the solutions available.
One of these description strings may be used when calling
request_solutions(). For consistency, these strings should adhere to the following conventions:Strings should be upper case and have words and acronyms separated by underscores (
UPPER_SNAKE_CASE).Strings should contain the substring
BESTwhen they represent the primary solution.Strings should contain the substring
DEAD_RECKONINGwhen they represent a solution suitable for estimating relative motion or rotation over a period of time. This solution may drift more thanBESTsolutions, as the goal is to allow a user to get an estimate of the relative motion between different times. In the calculation of this solution, some sensor measurement might be excluded. For example, a system with an IMU might provide aDEAD_RECKONINGsolution which is the solution from its free-running inertial mechanization, with resets disabled during the time intervals betweensolution_times(but resets applied before all of thesolution_times).Strings should include a substring indicating the type of solution returned. This substring should contain the string-equivalent to the corresponding ASPN message class name, converted to UPPER_SNAKE_CASE, followed by the string
_ESTIMATE. This allows the user to perform substring matching without a risk of getting a false positive match from a type whose string would be a subset of another type.
Example
If the primary solution is an ASPN PVA then the string
MY_BEST_ASPN_MEASUREMENT_POSITION_VELOCITY_ATTITUDE_ESTIMATEwould fulfill the convention.These conventions allow the user to identify their desired type of solution using substring matching.
- Returns:
A list of strings describing the solutions available.
- Return type:
list[str]
- request_solutions(solution_times, filter_description=None)
Request filtering solutions at the times specified in the array
solution_times.- Parameters:
solution_times (list[TypeTimestamp]) – The times at which to return solutions.
filter_description (str | None, optional) – To select which filter(s) to request solutions from, enter a valid filter description string in
filter_description. Valid filter description strings can be obtained by callingfilter_description_list. Passing inNonewill provide a result specific to a particular implementation. Whenfilter_descriptionisNone, the implementation should endeavor to return its best solution.
- Returns:
An array of messages containing the filter solutions for the requested
solution_times. Some entries may beNoneif they are unavailable at the corresponding time insolution_times. The returnedpntos.api.Messagearray may beNoneiffilter_descriptionis invalid.- Return type:
list[Message | None] | None
- process_pntos_message(message)
Send a new message to the system for arbitrary processing.
For example, this function is useful for plugins who have just received new sensor data that they wish to relay to the system to be used in a sensor fusion solution.
- Parameters:
message (Message)
- broadcast_aspn_message(message, transport=None, destination_identifier=None)
Request that pntOS broadcast the provided message out to the network.
- Parameters:
message (Message)
transport (str | None, optional) – The identifier of a transport plugin that the message should be routed to. The transport parameter should match the
pntos.api.CommonPlugin.identifierstring of apntos.api.TransportPluginactive in the system. If the transport parameter isNone, this indicates that the message should be broadcast to all available transports.destination_identifier (str | None, optional) – A transport-specific identifier that allows transports to determine how to route the message. If the destination transport has the concept of a channel or topic,
destination_identifiershould be populated by the channel or topic. Otherwise, the identifier is populated in a plugin-specific manner defined by the destination transport. Ifdestination_identifierisNone, then the transport should output the message in the “default” output channel/topic and route being used by pntOS.
- log_message(level, message)
Log a message.
Send a loggable message to the system, to be logged through the current logging infrastructure enabled (e.g. the console, a logfile, etc.).
- Parameters:
level (LoggingLevel)
message (str)
- class pntos.cobra.internal.StandardMessageStreamConfig
Bases:
MessageStreamConfigThis is a simple message stream config implementation. As with the
StandardMediatorit is considered as apart of thepntos.cobra.StandardControllerPluginimplementation which allows the controller to access private members and functions when necessary. All other plugins should adhere to API compliant functions.- __init__()
Standard Cobra Message Stream Config
By default, all messages are immediately streamed.
- sequenced_stream_add(message_type, source_identifier=None)
Request messages are streamed in sorted timestamp ordering.
Request messages of the given
message_typeand optionalsource_identifierare streamed in sorted timestamp ordering.- Parameters:
message_type (type[AspnBase])
source_identifier (str | None, optional)
- sequenced_stream_remove(message_type, source_identifier=None)
Request messages are no longer streamed in sorted timestamp ordering.
Request messages of the given
message_typeand optionalsource_identifierare no longer streamed in sorted timestamp ordering. This will remove a type that was previously added in a call tosequenced_stream_add(), or remove individual messages from the entire list of messages that was added with a previous call tosequenced_stream_all().- Parameters:
message_type (type[AspnBase])
source_identifier (str | None, optional)
- sequenced_stream_all(enable)
Request all messages are streamed in sorted timestamp ordering.
Note that the ability to do this reliably will depend on the length of the buffer used by the
pntos.api.Mediator.- Parameters:
enable (bool)
- immediate_stream_add(message_type, source_identifier=None)
Request messages are streamed immediately.
Request messages of the given
message_typeand optionalsource_identifierare streamed immediately without delay, buffering, or sorting.- Parameters:
message_type (type[AspnBase])
source_identifier (str | None, optional)
- immediate_stream_remove(message_type, source_identifier=None)
Request messages are no longer streamed immediately.
Request messages of the given
message_typeand optionalsource_identifierare no longer streamed immediately. This will remove a type that was previously added in a call toimmediate_stream_add(), or remove individual messages from the entire list of messages that was added with a previous call toimmediate_stream_all().- Parameters:
message_type (type[AspnBase])
source_identifier (str | None, optional)
- immediate_stream_all(enable)
Request all messages are streamed immediately without delay, buffering, or sorting.
- Parameters:
enable (bool)
- class pntos.cobra.internal.EkfFusionStrategy(mediator)
Bases:
StandardFusionStrategyThis is an Extended Kalman Filter (EKF) sensor fusion strategy.
It is capable of Bayesian inference on a linearized discrete-time system with Gaussian noise inputs.
- __init__(mediator)
An Extended Kalman Filter Fusion Strategy
- Parameters:
mediator (Mediator) – A
pntos.api.Mediatorinstance.
- property num_states
Get the total number of states this filter is estimating.
The count will initially be zero, until
add_states()is called.- Returns:
Number of states being estimated in this filter.
- Return type:
int
- add_states(initial_estimate, initial_covariance, cross_covariance=None)
Add new states to this filter.
Increases number of filter states and set the initial conditions of the new states. Returns index of the first added state. If
cross_covarianceisNone, cross covariance between the existing states and the added states will be set to zeroes.- Parameters:
initial_estimate (NDArray[float64]) – The initial estimate to populate the new states with.
initial_covariance (NDArray[float64]) – The initial covariance matrix used to initialize the uncertainty of the new states.
cross_covariance (NDArray[float64] | None) – A covariance matrix that describes the cross terms between the previous states and the new states. If
None, the cross-terms will be set to zero. Otherwise, if there arenexisting states andmnew states being added, this argument should have shape[n, m].
- Returns:
Index of first state being added to this filter (zero indexed).
- Return type:
int
- remove_states(first_index, count)
Removes a set of states from the filter.
- Parameters:
first_index (int) – Index of the first state to be removed (zero indexed).
count (int) – The number of states to be removed.
- property estimate
Get the current internal estimate managed by this strategy.
This class manages a current estimate that is initially populated by
add_states()and then is modified iteratively bypropagate(),update(), and other method calls. This method returns the current estimate, incorporating all changes made by previous method calls to this strategy.- Returns:
An estimate if available. Returns
Noneif no states have been added yet.- Return type:
NDArray[float64] | None
- set_estimate_slice(new_estimate, first_index)
Set a slice of the state estimates to a given set of values.
This class manages a current estimate that is initially populated by
add_states()and then is modified iteratively bypropagate(),update(), and other method calls. This method allows for manually overriding the current estimate. Sets a block of states to new values, starting withfirst_indexand overwriting a number of states equal to the length ofnew_estimate.- Parameters:
new_estimate (NDArray[float64]) – The new estimate values that will overwrite the previous values.
first_index (int) – The index of the first state to overwrite.
- property covariance
Get the covariance of the current estimate.
This class manages a current estimate that is initially populated by
add_states()and then is modified iteratively bypropagate(),update(), and other method calls. In addition to the estimate itself, a covariance of the current estimate is computed. This method returns this covariance, incorporating all changes made by previous method calls to this strategy.- Returns:
The covariance of the current estimate. Returns
Noneif no states have been added yet.- Return type:
NDArray[float64] | None
- set_covariance_slice(new_covariance, first_row, first_col=None)
Set a slice of the covariance matrix to a given set of values.
This class manages a current estimate that is initially populated by
add_states()and then is modified iteratively bypropagate(),update(), and other method calls. In addition to the estimate itself, a covariance of the current estimate is computed.Allows for manually overriding the current covariance matrix. Sets a block of the covariance matrix to new values. The overwritten values are those in a rectangular area defined by the upper left corner at
first_row,first_coland extending down and right to cover an area equal to the size ofnew_covariance. Iffirst_colis not set or set toNone, the value offirst_rowwill be used as a column index as well.- Parameters:
new_covariance (NDArray[float64]) – The new covariance values that will overwrite a slice of the previous covariance matrix.
first_row (int) – The row of the first value to overwrite.
first_col (int | None, optional) – The column of the first value to overwrite.
- propagate(dynamics_model)
Propagates the estimate of the state space forward in time.
This method assumes that a state space is already initialized with a set of states via the
add_states()method. Thedynamics_modelparameter includes a description of how the current state estimate can be propagated from the current time to a new time. Note that the actual numerical values of the current/new times are not specified anywhere, as that information is not needed to perform the computation. This method then takes the current state space estimate and thedynamics_modeland uses both to compute an estimate at the new time. The new estimate clobbers the old estimate managed by this class, and be acquired by callingestimate.- Parameters:
dynamics_model (StandardDynamicsModel)
- update(measurement_model)
Updates the estimate of the state space, incorporating a new measurement.
This method assumes that a state space is already initialized with a set of states via the
add_states()method. Themeasurement_modelparameter includes both the measurement itself and a description of how the measurement relates to the state space. This method then takes the current state space estimate and updates it using information from themeasurement_model. The updated estimate can be acquired by callingestimate.- Parameters:
measurement_model (StandardMeasurementModel) – The measurement with which to update the filter, as well as a model that describes how the measurement relates to the states this strategy is estimating.
- class pntos.cobra.internal.StandardFusionEngine(mediator, save_x_and_p_after_prop, save_x_and_p_after_update)
Bases:
StandardFusionEngineA fusion engine designed to use data from multiple sensors and output a unified state estimate.
- __init__(mediator, save_x_and_p_after_prop, save_x_and_p_after_update)
- Parameters:
mediator (Mediator) – A
pntos.api.Mediatorinstance.save_x_and_p_after_prop – Whether to save state estimate and sigma to registry right after each propagate.
save_x_and_p_after_update – Whether to save state estimate and sigma to registry right after each update.
- property time
The current time of the filter.
- property strategy
The underlying algorithm used for Bayesian inference.
- Returns:
The fusion strategy is the type of filter (EKF, UKF, etc.).
- Return type:
StandardFusionStrategy | None
- property num_states
Get the total number of states currently in the fusion engine.
Virtual state blocks do not affect this result.
- Returns:
The total number of states currently in the fusion engine.
- Return type:
int
- property state_block_labels
Get a list of
pntos.api.StandardStateBlocklabels that have been added to this fusion engine.- Returns:
A list of the
pntos.api.StandardStateBlocklabels that have been added to this fusion engine. ReturnsNoneif no state blocks have been added. Guaranteed to not returnNoneifnum_statesis a value other than 0.- Return type:
list[str] | None
- add_state_block(block, initial_estimate_covariance, cross_covariances=None)
Add the given
pntos.api.StandardStateBlockto the fusion engine.This will expand the state vector being estimated by the value of
num_states.- Parameters:
block (StandardStateBlock) – The
pntos.api.StandardStateBlockto be added to the fusion engine.initial_estimate_covariance (EstimateWithCovariance) – Contains the initial conditions of the states, with
initial_estimate_covariance.estimatebeing an Nx1 matrix andinitial_estimate_covariance.covariancebeing an NxN matrix, where N isblock.num_states.cross_covariances (CrossCovariances | None, optional) – An optional parameter which, if non-
None, contains a description of the newly added StateBlock’s cross covariances with respect to a set of StateBlocks which already exist inside the filter (specified bycross_covariances.block_labels). If thecross_covarianceparameter isNone, cross covariance between the existing states and the added states will be set to zeroes.
- get_state_block_estimate(block_label)
Get the estimate associated with a state block.
Find a
pntos.api.StandardStateBlockorpntos.api.VirtualStateBlockwithin the fusion engine matchingblock_label, and return a copy of its current estimate vector.- Parameters:
block_label (str)
- Returns:
A copy of its current estimate vector. If
block_labelreferences a virtual state block (VSB) this will return a converted estimate, converted into the VSBs coordinate frame. ReturnsNoneifblock_labeldoes not correspond to a block that has been added to the fusion engine. Guaranteed to not returnNonewhenblock_labelis in the list returned bystate_block_labelsandstrategyis notNone.- Return type:
NDArray[float64] | None
- get_state_block_covariance(block_label)
Get the covariance associated with a state block.
Find a
pntos.api.StandardStateBlockorpntos.api.VirtualStateBlockwithin the fusion engine matchingblock_label, and return a copy of its current covariance matrix.- Parameters:
block_label (str)
- Returns:
A copy of its current covariance matrix. If
block_labelreferences a virtual state block (VSB) this will return a converted covariance, converted into the VSBs coordinate frame. ReturnsNoneifblock_labeldoes not correspond to a block that has been added to the fusion engine. Guaranteed to not returnNonewhenblock_labelis in the list returned bystate_block_labelsandstrategyis notNone.- Return type:
NDArray[float64] | None
- get_state_block_cross_covariance(block_label1, block_label2)
Get the cross covariance between the states associated with two state blocks.
Find the
pntos.api.StandardStateBlocks within the fusion engine matchingblock_label1andblock_label2, and return the cross-covariance matrix between them.- Parameters:
block_label1 (str)
block_label2 (str)
- Returns:
The cross-covariance matrix between
block_label1andblock_label2. ReturnsNoneifblock_label1orblock_label2do not correspond to blocks that have been added to the fusion engine. Guaranteed to not returnNonewhen bothblock_label1andblock_label2are in the list returned bystate_block_labelsandstrategyis notNone.- Return type:
NDArray[float64] | None
- set_state_block_estimate(block_label, estimate)
Update the estimate associated with a given state block.
Find a
pntos.api.StandardStateBlockwithin the fusion engine matchingblock_label, and change its current estimate vector.Note
This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.
- Parameters:
block_label (str)
estimate (NDArray[float64])
- set_state_block_covariance(block_label, covariance)
Update the covariance associated with a given state block.
Find a
pntos.api.StandardStateBlockwithin the fusion engine matchingblock_label, and change its current covariance matrix.Note
This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.
- Parameters:
block_label (str)
covariance (NDArray[float64])
- set_state_block_cross_covariance(block_label1, block_label2, covariance)
Update the covariance between two state blocks.
Find the
pntos.api.StandardStateBlocks within the fusion engine matchingblock_label1andblock_label2, and change the current covariance matrix between them.Note
This function may lead to performance degradation with some implementations and thus its use is discouraged if other options are available.
- Parameters:
block_label1 (str)
block_label2 (str)
covariance (NDArray[float64])
- remove_state_block(block_label)
Remove the
pntos.api.StandardStateBlockmatchingblock_label.This will reduce the state vector being estimated by the number of states that the block represents.
- Parameters:
block_label (str)
- property virtual_state_block_target_labels
Gets a list of the target labels of virtual state blocks that have been added.
A label being returned by this list is not a guarantee that the virtual state block has a valid source. For that, call
has_virtual_state_block().- Returns:
A list of the target labels of virtual state blocks that have been added. Returns
Noneif no virtual state blocks have been added to this fusion engine.- Return type:
list[str] | None
- has_virtual_state_block(vsb_target_label)
Checks if the fusion engine has a
pntos.api.VirtualStateBlockwith a matching target label.- Parameters:
vsb_target_label (str)
- Returns:
Trueif the fusion engine has apntos.api.VirtualStateBlockwith a matching target label,Falseif no virtual state block with matching target label exists or if one exists but is not capable of generating an estimate. That is, the VSB’s source must exist and be in a continuous chain to a concrete state block which also exists in the fusion engine in order to returnTrue.- Return type:
bool
- add_virtual_state_block(virtual_state_block)
Add the given
pntos.api.VirtualStateBlockto the fusion engine.A virtual state block (VSB) convert from an underlying block coordinate frame into the VSB coordinate frame.
- Parameters:
virtual_state_block (VirtualStateBlock)
- remove_virtual_state_block(vsb_target_label)
Remove the
pntos.api.VirtualStateBlockmatchingvsb_target_label.- Parameters:
vsb_target_label (str)
- property measurement_processor_labels
Get a list of the labels of measurement processors that have been added.
- Returns:
list of labels of measurement processors that have been added. Returns
Noneif no measurement processors have been added to this fusion engine.- Return type:
list[str] | None
- add_measurement_processor(processor)
Add a
pntos.api.StandardMeasurementProcessor.This can be used to process future measurements that correspond to
processor.label.- Parameters:
processor (StandardMeasurementProcessor)
- remove_measurement_processor(processor_label)
Remove a
pntos.api.StandardMeasurementProcessorpreviously added to the fusion engine.Assumes a measurement processor was previously added via
add_measurement_processor()with the labelprocessor_label.- Parameters:
processor_label (str)
- propagate(time)
Propagate the filter estimate forward in time.
May be evaluated lazily (when results are requested).
- Parameters:
time (TypeTimestamp)
- update(processor_label, message)
Update the filter with the given measurement.
Will propagate first if needed to reach the time encoded inside the measurement.
- Parameters:
processor_label (str)
message (Message)
- get_real_label(label)
Takes a block label in and returns the real label associated with the block. If label corresponds to a
pntos.api.VirtualStateBlockthe label of the starting block for the virtual transformation will be returned. If label corresponds to apntos.api.StandardStateBlockthen label will be returned as is.
- peek_ahead(time, block_labels)
Calculates the estimate and covariance at a requested time.
Uses the state blocks listed in
block_labels, without changing the state of the fusion engine or its underlying filter. Blocks are assembled in the order that the labels are passed in.If all of the following are true:
timeis equal to or after the filter time (which can be checked withtime).All labels in
block_labelscorrespond to a block that has been added to the fusion engine (which can be checked withstate_block_labels).block_labelshas at least one element.
Then the result returned is guaranteed to not be
None. Otherwise, if any of the above are false then the result will beNone.- Parameters:
time (TypeTimestamp)
block_labels (list[str]) – An array of strings.
- Returns:
EstimateWithCovariance | None
- generate_x_and_p(block_labels)
Generates the current estimate and covariance.
Estimate and covariance are built corresponding to a list of StateBlock labels. Blocks are assembled in the order that the labels are passed in.
If all of the following are true:
All labels in
block_labelscorrespond to a block that has been added to the fusion engine (which can be checked withstate_block_labels).block_labelshas at least one element.
Then the result returned is guaranteed to not be
None. Otherwise, if any of the above are false then the result will beNone.- Parameters:
block_labels (list[str]) – An array of strings.
- Returns:
EstimateWithCovariance | None
- give_state_block_aux_data(block_label, aux)
Route a list of messages of aux data to a
pntos.api.StandardStateBlock.- Parameters:
block_label (str)
aux (list[Message | None])
- give_measurement_processor_aux_data(processor_label, aux)
Route a list of messages of aux data to a
pntos.api.StandardMeasurementProcessor.- Parameters:
processor_label (str)
aux (list[Message | None])
- give_virtual_state_block_aux_data(target_label, aux)
Route a list of messages of aux data to a
pntos.api.VirtualStateBlock.- Parameters:
target_label (str)
aux (list[Message | None])
- class pntos.cobra.internal.VirtualStateBlockManager(mediator)
Bases:
objectA utility class that manages
pntos.api.VirtualStateBlock’s for apntos.api.StandardFusionEngineinstance. This class will create n-ary trees by instantiating Node`s to build and maintain the relationship between a given VSB and another. The roots of these trees represent ‘real’ state blocks. If a `Node has no parent, it is assumed to be a root node but there is no way the manager can know if said Node is a real block or if it’s just a virtual state block that has yet to be added.- __init__(mediator)
- Parameters:
mediator (Mediator) – A
pntos.api.Mediatorinstance.
- add_virtual_state_block(trans)
Registers a
pntos.api.VirtualStateBlockinto the manager. If the VSB’ssourcematches itstargetor thetargetalready exists in the manager, a warning will be logged and the VSB won’t be added. If thesourcedoesn’t exist, a new Node will be created and it will be treated as a root node. As VSBs are added, this function will adjust the relationships between VSBs accordingly.- Parameters:
trans (VirtualStateBlock) – The
pntos.api.VirtualStateBlockto be added.
- convert(orig, start, target, time)
Converts an
pntos.api.EstimateWithCovarianceto a target representation if the conversion is possible.- Parameters:
orig (EstimateWithCovariance) – The estimate and covariance in the starting format.
start (str) – The label of the starting format.
target (str) – The label that refers to the ending format for the estimate and covariance.
time (TypeTimestamp) – The time of validity for the estimate and covariance.
- Returns:
pntos.api.EstimateWithCovarianceif conversion is possible; otherwise returns None.
- convert_estimate(orig, start, target, time)
Converts an estimate to a target representation if the conversion is possible.
- Parameters:
orig (NDArray[float64]) – The estimate in the starting format.
start (str) – The label of the starting format.
target (str) – The label that refers to the ending format for the estimate.
time (TypeTimestamp) – The time of validity for the estimate.
- Returns:
NDArray[float64] if conversion is possible; otherwise returns None.
- convert_H(engine, real_label, label, curr_H)
Given a measurement jacobian, H, calculate the full transformation from a set of real states to a set of measurements.
- Parameters:
engine (StandardFusionEngine) – An instance of the
pntos.api.StandardFusionEnginecalling this method.real_label (str) – The label of the
pntos.api.StandardStateBlockthe virtual transformation starts with.label (str) – The label of the target state, corresponding to a
pntos.api.VirtualStateBlock.curr_H (NDArray[float64]) – The measurement jacobian that maps a set of virtual states to a set of measurements.
- Returns:
NDArray[float64] if the labels provided exist in the engine, else None.
- get_start_block_label(target)
Gets the state block label of the starting node which is assumed to be represent a ‘real’ state block.
- Parameters:
target (str) – The
pntos.api.VirtualStateBlockunique identifier to get the starting label for.- Returns:
str | None
- get_virtual_state_block_labels()
Returns the list of virtual state block labels being tracked by the manager. Returns None if that list is empty.
- give_virtual_state_block_aux_data(target, aux)
Provides the
pntos.api.VirtualStateBlockwith target labeltargetwith the data inaux.- Parameters:
target (str) – The unique label that identifies what
pntos.api.VirtualStateBlockaux (list[Message | None]) – The auxiliary data to give to the
pntos.api.VirtualStateBlock
- jacobian(orig, start, target, time)
Obtains the jacobian of an input estimate in the target representation if possible.
- Parameters:
orig (NDArray[float64]) – The estimate in the starting format.
start (str) – The label of the starting format.
target (str) – The label that refers to the ending format for the jacobian.
time (TypeTimestamp) – The time of validity for the input estimate.
- Returns:
NDArray[float64] if conversion is possible; otherwise returns None.
- remove_virtual_state_block(target)
Removes the
pntos.api.VirtualStateBlockmatchingtargetfrom the manager. If this leaves an orphaned branch of nodes it will prune the branch removing all VSB’s in that branch as well.- Parameters:
target (str) – The unique identifier of the
pntos.api.VirtualStateBlockto remove
- class pntos.cobra.internal.ManualHeadingAlign(config_group, mediator)
Bases:
InertialInitializationStrategyStatic alignment for an inertial, requiring a user-provided heading.
This initialization strategy can be used to produce an initial PVA to initialize an inertial mechanization. It requires both position and IMU measurements, as well as an initial heading provided via config. It is also capable of estimating inertial biases.
- __init__(config_group, mediator)
- Parameters:
config_group (str) – A
pntos.cobra.config.ManualHeadingAlignmentConfigconfig group.mediator (Mediator) – A
pntos.api.Mediatorinstance.
- request_motion_needed()
Check the type of motion (if any) needed.
- Returns:
InitializationMotionNeeded
- request_current_status()
Check the current initialization status.
- Returns:
InitializationStatus
- process_pntos_message(message)
Incorporate a new message into the initialization algorithm.
- Parameters:
message (Message)
- request_solution()
Get the current initial solution.
- Returns:
InitialInertialSolution
- class pntos.cobra.internal.BarometerToAltitudePreprocessor(channel, mediator, alt_sigma)
Bases:
PreprocessorA preprocessor that converts barometer measurements to altitude measurements.
- __init__(channel, mediator, alt_sigma)
Cobra Barometer to Altitude Preprocessor
- process_pntos_message(message)
Process a message.
- Parameters:
message (Message) – A message to be processed.
- Returns:
A list of
pntos.api.Messages. Usually this will be a single message, a modified version ofmessage. It could beNoneifmessageis rejected or dropped. The preprocessor could also accumulate several messages, returningNonefor each one then returning an array with multiple processed messages.- Return type:
list[Message] | None
- class pntos.cobra.internal.DownsamplerPreprocessor(config, mediator)
Bases:
PreprocessorA downsampling preprocessor that periodically discards certain messages.
It collects a list of channels and factors from the registry and allows 1 out of every
Nmessages to pass through. This is done for every channelcand factorN. Wherec = channel[i]andN = factor[i].- __init__(config, mediator)
Cobra Downsampler Preprocessor
- Parameters:
config_group (str) – The
pntos.cobra.config.DownsamplerConfigconfig group.mediator (Mediator) – A
pntos.api.Mediatorinstance.
- process_pntos_message(message)
Process a message.
- Parameters:
message (Message) – A message to be processed.
- Returns:
A list of
pntos.api.Messages. Usually this will be a single message, a modified version ofmessage. It could beNoneifmessageis rejected or dropped. The preprocessor could also accumulate several messages, returningNonefor each one then returning an array with multiple processed messages.- Return type:
list[Message] | None
- class pntos.cobra.internal.ImuRotationPreprocessor(mediator, imu_channel, C_imu_to_platform)
Bases:
Preprocessor- process_pntos_message(message)
Process a message.
- Parameters:
message (Message) – A message to be processed.
- Returns:
A list of
pntos.api.Messages. Usually this will be a single message, a modified version ofmessage. It could beNoneifmessageis rejected or dropped. The preprocessor could also accumulate several messages, returningNonefor each one then returning an array with multiple processed messages.- Return type:
list[Message] | None
- class pntos.cobra.internal.OutagePreprocessor(mediator, outage_config)
Bases:
PreprocessorPreprocessor used to induce an outage on a given channel.
- process_pntos_message(message)
Process a message.
- Parameters:
message (Message) – A message to be processed.
- Returns:
A list of
pntos.api.Messages. Usually this will be a single message, a modified version ofmessage. It could beNoneifmessageis rejected or dropped. The preprocessor could also accumulate several messages, returningNonefor each one then returning an array with multiple processed messages.- Return type:
list[Message] | None
- class pntos.cobra.internal.TimeAdjusterPreprocessor(config, mediator)
Bases:
Preprocessor- process_pntos_message(message)
Process a message.
- Parameters:
message (Message) – A message to be processed.
- Returns:
A list of
pntos.api.Messages. Usually this will be a single message, a modified version ofmessage. It could beNoneifmessageis rejected or dropped. The preprocessor could also accumulate several messages, returningNonefor each one then returning an array with multiple processed messages.- Return type:
list[Message] | None
- class pntos.cobra.internal.TimeBiasPreprocessor(config, mediator)
Bases:
PreprocessorCorrects timestamps for a constant bias.
This preprocessor is useful when a specific sensor produces timestamps with a constant bias. It is configured with a list of channels as well as a constant time bias. Any message whose source identifier matches one of the configured channels will have its timestamp subtracted by the bias amount.
- __init__(config, mediator)
- Parameters:
config_group (str) – The group in the registry which holds config information for this preprocessor.
mediator (Mediator) – Used to get config information and to perform logging.
- process_pntos_message(message)
Process a message.
- Parameters:
message (Message) – A message to be processed.
- Returns:
A list of
pntos.api.Messages. Usually this will be a single message, a modified version ofmessage. It could beNoneifmessageis rejected or dropped. The preprocessor could also accumulate several messages, returningNonefor each one then returning an array with multiple processed messages.- Return type:
list[Message] | None
- class pntos.cobra.internal.StandardInertial(config_group, mediator, solution)
Bases:
StandardInertialMechanizationAn inertial object which mechanizes IMU data to generate a series of inertial solutions.
- __init__(config_group, mediator, solution)
An Inertial Mechanization Object
- Parameters:
config_group (str) – An
pntos.cobra.config.InertialConfigconfig group.mediator (Mediator) – A
pntos.api.Mediatorinstance.solution (Message) – An initial PVA message required for inertial mechanization.
- request_solution_message_type()
Get the solution type.
- Returns:
The message type that will be returned by
request_current_solution(),request_solution(), andrequest_solutions().- Return type:
type[AspnBase]
- request_current_solution()
Get the current inertial solution.
- Returns:
The current inertial solution.
- Return type:
- request_solution(time)
Request solution at a specific time.
- Parameters:
time (TypeTimestamp) – The time at which the returned solution should be valid.
- Returns:
The solution computed by this inertial at
timeiftimeis in the valid range,Noneotherwise (is_time_in_range()can be used to checktimebefore calling this method).- Return type:
Message | None
- request_solutions(times, solution_type)
Request solutions at multiple specific times.
- Parameters:
times (list[TypeTimestamp]) – An array of times at which solutions are requested.
solution_type (InertialSolutionRangeType) – The type of solution requested.
- Returns:
An array of solutions. Returns
Noneifsolution_typeis unsupported by this inertial or every instance oftimesis outside the valid range. Otherwise guaranteed to not beNone.- Return type:
list[Message | None] | None
- is_time_in_range(time)
Check if a solution exists at a given time.
- Parameters:
time (TypeTimestamp) – The query time.
- Returns:
Trueif a solution exists attime,Falseotherwise. This result is only valid until another method (for example,process_pntos_message()) is called.- Return type:
bool
- request_earliest_time()
Get the earliest available time at which a solution or forces and rates can be requested.
This result is only valid until another method (for example,
process_pntos_message()) is called.- Returns:
The earliest available time at which a solution or forces and rates can be requested.
- Return type:
TypeTimestamp
- request_latest_time()
Get the latest available time at which a solution or forces and rates can be requested.
This result is only valid until another method (for example,
process_pntos_message()) is called.- Returns:
The latest available time at which a solution or forces and rates can be requested.
- Return type:
TypeTimestamp
- request_process_pntos_message_types()
Returns an array of message types that are supported by this plugin.
- Returns:
An array of message types that are supported by this plugin as inputs to
process_pntos_message().- Return type:
list[type[AspnBase]]
- process_pntos_message(message)
A new message to be incorporated into the computed inertial solution.
- Parameters:
message (Message)
- request_forces_and_rates(time)
Request forces and rates for a given time.
- Parameters:
time (TypeTimestamp) – The time at which the forces and rates should be valid.
- Returns:
The instantaneous forces and rates at
timeiftimeis in the valid range,Noneotherwise (is_time_in_range()can be used to checktimebefore calling this method).- Return type:
InertialForcesRates | None
- request_average_forces_and_rates(time1, time2)
Request average forces and rates over a time period.
- Parameters:
time1 (TypeTimestamp) – The start of the time range over which the forces and rates should be valid.
time2 (TypeTimestamp) – The end of the time range over which the forces and rates should be valid.
- Returns:
The average forces and rates over the period of time defined by
time1andtime2if at least one of them is in the valid range,Noneotherwise (is_time_in_range()can be used to check both times before calling this method).- Return type:
InertialForcesRates | None
- request_reset_message_types()
Get valid types of reset messages.
- Returns:
An array of message types that are supported by this plugin for resetting the inertial solution, or
Noneif resetting the inertial solution is an unsupported operation by the inertial plugin.- Return type:
list[type[AspnBase]] | None
- reset_solution(message)
Set the solution to the values in
message.For example, if
messageis PVA then the inertial solution will be set to that PVA. Ifmessageis just position, then only the position portion of the inertial solution will be set usingmessage.- Parameters:
message (Message) – A message containing the information necessary to reset the solution. To see the types supported by the implementation, call
request_reset_message_types().
- correct_sensor_errors(time, errors)
Reset the current inertial internal bias values.
Reset the current inertial internal bias values with corrections from an external source, such as a filter or error estimator. The errors passed in here will be adjusted for internally by the inertial when processing incoming data. Thus, if errors are passed into the inertial here they should not be corrected for in an external filter processing the inertial output (which would lead to a double correction).
- Parameters:
time (TypeTimestamp) – The time at which
errorsshould be valid.errors (StandardInertialErrors) – An estimate of the inertial sensor’s errors.
- request_sensor_errors(time)
Request inertial errors for a given time.
- Parameters:
time (TypeTimestamp) – Time at which inertial errors should be valid.
- Returns:
Inertial errors at
timeiftimeis in the valid range (pntos.api.CommonInertial.is_time_in_range()can be used to checktimebefore calling this method),Noneotherwise.- Return type:
StandardInertialErrors | None
- class pntos.cobra.internal.StandardKeyValueStore(group, log_func, plugin_resources_location=None)
Bases:
KeyValueStoreImplementation note: This implementation deviates from the python dictionary in that it heavily favors logging out errors and continuing on over raising exceptions. Exceptions are reserved only for truly fatal and unrecoverable errors. For example, instead of raising a
KeyValueerror for a key that does not exist in the store, theStandardKeyValueStorejust logs out an error message and returns None.- data_format = 1
Defines the underlying format which the data in the key-value store will be stored as.
- __init__(group, log_func, plugin_resources_location=None)
Cobra Standard Key-Value Store
- Parameters:
group (str) – The name of the key-value store so that it can be easily accessed by plugins.
log_func (Callable[[LoggingLevel, str], None]) – The function to use for logging.
plugin_resources_location (str | None) – The path to the directory to save or load permanent keys. If no path is provided,
DEFAULT_PERMANENCY_DIRwill be used.
- type_conversion
A store of type conversion functions.
Usage:
type_conversion[type_in_store][requested_return_type] -> requested_return_type | NoneWill be None is conversion is not supported or conversion failed.
- keys()
Get the array of keys which currently exist in this store.
- Returns:
Returns the keys in the store or
Noneif no keys are present.- Return type:
list[str] | None
- get_value(key, value_type)
Get the value stored at
keywith return typevalue_type.Example
For example, to access altitude in a
pntos.api.KeyValueStorenamedkv_storeas an integer:altitude = kv_store.get_value("altitude", int)
- Parameters:
key (str)
value_type (type[RegistryValueType])
- Returns:
Returns
Noneif the key is not available. The return is guaranteed to not beNoneif called with a valid key (which can be checked withpntos.api.KeyValueStore.__contains__()) and if the store can convert the value to the requested type.- Return type:
pntos.api.RegistryValueType| None
- get_raw(key=None)
Get the value for the given
keyas an array of bytes.- Parameters:
key (str | None, optional)
- Returns:
The return format will conform to the definition in
pntos.api.KeyValueStore.data_format. ReturnsNoneif the given key is not available. The return is guaranteed to not beNoneif called with a valid key, which can be checked withpntos.api.KeyValueStore.__contains__():if key in store: ...
If
keyisNone, then this function will return all of the keys and values in the group passed topntos.api.Registry.batch_start()and will be formatted to conform to keys and values as defined inpntos.api.KeyValueStore.data_format.- Return type:
bytes | None
- set_value(key, value)
Set the given key to the provided value.
- Parameters:
key (str)
value (RegistryValueTypeUnion) – Can be of any type specified by
pntos.api.RegistryValueTypeUnion.
- set_raw(key, bytes)
Set the given key to the provided value.
- Parameters:
key (str | None) – If
keyisNone, then the contents ofbytesmust include both keys and values and must be formatted to conform topntos.api.KeyValueStore.data_format.byteswill then be used to set the corresponding keys and values in the group passed topntos.api.Registry.batch_start().bytes (bytes) – Must be formatted to conform to the definition of a value in
pntos.api.KeyValueStore.data_format.
- remove_key(key)
Remove the given key from the registry.
- Parameters:
key (str)
- Returns:
Trueifkeyis successfully removed, andFalseotherwise. Keys may fail to be removed if the key does not currently exist, or the backend is unable to remove the key.- Return type:
bool
- batch_end()
Ends a batch operation.
Ends a batch operation started with a
pntos.api.Registry.batch_start()call. After calling this, the user should not use thepntos.api.KeyValueStorethey received frompntos.api.Registry.batch_start()again without callingpntos.api.KeyValueStore.batch_restart()on thepntos.api.KeyValueStore.If keys in the batch were acted upon with
set_permanent()turned on, and the plugin supports permanent storage, this call will save changes to permanent storage ifset_permanent()isTrueduring the call tobatch_end(). Enacts equivalent ofset_permanent(self,false)before return. If anyrequest_notify()observers have been added, they will be processed prior to this call returning.Example
Example 1: Flushing to permanent storage on
batch_end():store = registry.batch_start("group") ...work... store.set_permanent(true) # if not disabled, flush on batch_end ...work... store.batch_end() # will flush values
Example 2: Not flushing to permanent storage on
batch_end():store = registry.batch_start("group") ...work... store.set_permanent(true) # tag some values ...work... store.set_permanent(false) # do not flush on batch_end store.batch_end() # will not flush values
In the second example above, values set with “set” methods after the initial
set_permanent()call are still stored for potential saving to permanent storage.
- batch_restart()
Restarts a batch.
Restarts a batch that was previously started with
pntos.api.Registry.batch_start()and subsequently ended withpntos.api.KeyValueStore.batch_end(). This method is likely much more efficient thanpntos.api.Registry.batch_start()(depending on the registry implementation) as thepntos.api.Registry.batch_start()method must find the store again given the group name.Note
While a batch is active, access to the store may be denied to other users. Thus a user should endeavour to call
batch_end()as soon as possible after they are done getting/setting values in the returnedpntos.api.KeyValueStore.
- request_notify(key, callback)
Register a callback which gets called each time a key in the store is updated.
Allows plugins to respond asynchronously to parameter updates. Returns
Trueif the notifier was successfully registered, andFalseif the store is unable to notify the requester. IfkeyisNone, then the callback will be invoked when any key in the batch’s group is modified. Otherwise, the callback will only be invoked when the given key is modified.Note
The callback must not attempt to set any values inside the
pntos.api.KeyValueStore, as the callback is likely being invoked during the processing of another operation. The callback should endeavour to store off the updated keys/values as quickly as possible and return, leaving the processing of the updates to another context or thread when possible. Callingpntos.api.Mediatorwithin the callback may be disallowed by the controller implementation and lead to undefined behavior.- Parameters:
key (str | None)
callback (Callable[[str, list[str], KeyValueStore], None]) –
A function with a function definition compatible with:
def my_callback(group: str, modified_keys: list[str], kv: KeyValueStore) -> None: ...
- Returns:
Trueif the notifier was successfully registered, andFalseif the store is unable to notify the requester.- Return type:
bool
- remove_notify(key, callback)
Removes a notification as requested by
request_notify().The group and callback must match the parameters passed to
request_notify()in order to successfully remove a callback.Note
This will remove all matching callbacks that have a matching group and callback. If a user registers the same callback twice this will remove both.
- Parameters:
key (str | None)
callback (Callable[[str, list[str], KeyValueStore], None])
- Returns:
Trueif removal was successful andFalseif it was not.Falsewill be returned if a callback did not exist for the group.- Return type:
bool
- clear()
Remove all items from key-value store.
- values()
Returns a list of the key-value store’s values.
This method is useful for unloading all values from a key-value store.
Example
For example:
def get_mean_of_kv_store(kv: KeyValueStore) -> float | None: '''Returns the mean of a key-value store if all values are int or float''' values = kv.values() if all(isinstance(x, (int, float)) for x in values): return sum(values) / len(values) return None
- Returns:
A list of all values in the store.
- Return type:
list[RegistryValueTypeUnion]
- items()
Returns a list of the items (key-value pairs) in the store.
This method is useful for when you need to iterate over both keys and values in a dictionary.
Example
For example:
for key, value in my_dict.items() printf('Key: {key}, Value: {value}')
- Returns:
A list of the key-value pairs (as tuples) in the store.
- Return type:
list[tuple[str, RegistryValueTypeUnion]]
- set_permanent(permanent)
Tag values modified with “set” methods as permanently stored.
Configure the
pntos.api.KeyValueStoreto tag values modified with “set” methods as permanently stored (as opposed to ephemerally stored in memory). Only values acted upon with “set” methods whileset_permanent()isTruewill be tagged. Values will be flushed according to registry configuration settings or perbatch_end()API. Returns the value of the permanent storage configuration. Callers should check this to verify if the set was successful.Example
Tagging specific keys to be permanently stored:
store: PntosKeyValueStore = registry.batch_start("group") store.set_value("key1",1234.56) # does not tag this value as permanently stored store.set_permanent(True) # start tagging set calls as permanently stored store.set_value("key1",987.65) store.set_value("key2",123) # both key1 and key2 values tagged store.set_permanent(False) # disable permanent storage store.set_value("key1",456.78) # key1 = 456.78 is value of key1 in store # key1 = 987.65 tagged to be permanently stored # key2 = 123 tagged to be permanently stored
- Parameters:
permanent (bool)
- Returns:
The value of the permanent storage configuration. Callers should check this to verify if the set was successful.
- Return type:
bool
- get_type(key)
Returns the type of a value in the KeyValueStore.
This is helpful for situations when it is advantageous to know the type of a value in the store without need for the value itself.
Example
For example, it might be useful for these situations where different callbacks are needed depending on what type is in the store:
if key in kv_store: val_type = kv_store.get_type(key) if val_type is int: kv_store.request_notify(key, int_callback) elif val_type is Message: kv_store.request_notify(key, message_callback) else: kv_store.request_notify(key, generic_callback)
- Parameters:
key (str)
- Returns:
If key exists in registry, this is guaranteed to return the return type of
pntos.api.KeyValueStore.__getitem__()for the given key. Else, it returnsNoneif type information is not available or key does not exist in registry.- Return type:
type[RegistryValueTypeUnion] | None
- class pntos.cobra.internal.StandardRegistry(log_func, plugin_resources_location=None)
Bases:
RegistryA registry that maps group names to objects storing all the key/values in that group.
- __init__(log_func, plugin_resources_location=None)
Cobra Standard Registry
- Parameters:
log_func (Callable[[LoggingLevel, str], None]) – The function to use for logging.
plugin_resources_location (str | None) – The path to the directory to save or load permanent keys. If no path is provided,
DEFAULT_PERMANENCY_DIRwill be used.
- batch_start(group)
Begin a batch get/set operation.
Begin a batch get/set operation wherein the user may make any number of modifications to the keys/values in the
group. The registry implementation may wait to batch these requests untilpntos.api.KeyValueStore.batch_end()is called for better performance. For example, a lock may be obtained at the beginning of abatch_start()and not released until apntos.api.KeyValueStore.batch_end()call is encountered. Thus, a plugin that callsbatch_start()should endeavour to make its calls to theset_,get_, andregistermethods as quickly as possible and callpntos.api.KeyValueStore.batch_end()immediately, as doing otherwise may be locking other plugins out of access to the registry (depending on the registry plugin implementation). If a plugin supportspntos.api.KeyValueStore.request_notify(), then notifications of updates may be suspended until the batch ends. After a batch is ended, the returnedpntos.api.KeyValueStorecan still be used to access the store viapntos.api.KeyValueStore.batch_restart().Note
While a batch is active, access to the store may be denied to other users. Thus a user should endeavour to call
pntos.api.KeyValueStore.batch_end()as soon as possible after they are done getting/setting values in the returnedpntos.api.KeyValueStore.- Parameters:
group (str)
- Returns:
KeyValueStore
- property group_array
Get the array of groups which currently exist.
- Returns:
The array of groups which currently exists. Returns
Noneif no groups exist.- Return type:
list[str] | None
- has_group(group)
Checks whether or not a given group has had any values added to it (for any key).
- Parameters:
group (str)
- Returns:
bool
- request_notify_new_group(callback)
Register a callback which gets called each time a new group is made in the registry.
- Parameters:
callback (Callable[[str], None])
- Returns:
Trueif the notifier was successfully registered, andFalseif the registry is unable to notify the requester.- Return type:
bool
- class pntos.cobra.internal.AltitudeMeasurementProcessor(label, state_block_labels, mediator)
Bases:
StandardMeasurementProcessorGenerates a model that maps an altitude measurement to a Pinson15 inertial error state block and a 1-state altitude FOGM bias block.
- This measurement processor handles all of the following ASPN message types:
MeasurementAltitude
MeasurementPosition (Geodetic)
MeasurementPositionVelocityAttitude (Geodetic)
- __init__(label, state_block_labels, mediator)
An Altitude Measurement Processor.
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 2-element list of labels of state blocks this processor can update. The first entry should refer to a Pinson-style state block of at least size 3, with a Down position error in meters as the third state. The second entry should refer to a 1-state FOGM state block estimating time-correlated altitude bias.
mediator (Mediator) – A
pntos.api.Mediatorinstance.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- generate_model(message, gen_x_and_p_func)
Generate a
pntos.api.StandardMeasurementModel.- Parameters:
- Returns:
A generated model containing the parameters required for a filter update. Will be
Nonewhen a measurement cannot be produced frommessage(for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).- Return type:
StandardMeasurementModel | None
- class pntos.cobra.internal.ClockBiasStateBlock(label, mediator, h_0, h_neg2, q3)
Bases:
StandardStateBlockModels a clock bias (seconds), drift (seconds/second), and optional drift rate (seconds/second^2).
Uses allan variance parameters to create a clock model for the system dynamics model covariance matrix.
- receive_aux_data(_)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_state_block_aux_data()is called with a label corresponding to this state block’slabel.- Parameters:
aux (list[Message | None])
- generate_dynamics(gen_x_and_p_func, time_from, time_to)
Generate a
pntos.api.StandardDynamicsModel.The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.
- Parameters:
gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.
time_from (TypeTimestamp) – The time to propagate from.
time_to (TypeTimestamp) – The time to propagate to.
- Returns:
The description of how to propagate this state block over the given time interval, or
Noneiftime_fromis later thantime_to. Otherwise guaranteed to not returnNone.- Return type:
StandardDynamicsModel | None
- calc_Hwang_Brown_Q(h_0, h_n2, dt)
Calculate the discrete system noise matrix.
Using the 10.4.1 - 10.4.3 from Introduction to Random Signals And Applied Kalman Filtering, second edition.
Same as Q3 model in navtk block.
- class pntos.cobra.internal.ConstantStateBlock(label, mediator, num_states, Q=None)
Bases:
StandardStateBlockEstimates 1 or more “constant” states, i.e. states with no dynamics.
- receive_aux_data(_)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_state_block_aux_data()is called with a label corresponding to this state block’slabel.- Parameters:
aux (list[Message | None])
- generate_dynamics(gen_x_and_p_func, time_from, time_to)
Return a constant dynamics model, where g(x) = x.
- class pntos.cobra.internal.Direction3DToPointsMeasurementProcessor(label, state_block_labels, mediator, l_ps_p, orientation)
Bases:
StandardMeasurementProcessorGenerates a model that maps a Direction3DToPoints measurement to an inertial error state block.
See
generate_model()for a detailed description of the assumptions and capabilities of this processor.- __init__(label, state_block_labels, mediator, l_ps_p, orientation)
A Direction3DToPoints Measurement Processor
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9.
mediator (Mediator) – a Mediator instance.
l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- generate_model(message, gen_x_and_p_func)
Generates the model mapping state estimates to the provided measurement.
- Parameters:
message (Message) – Measurement to process. message.wrapped_message must be a MeasurementDirection3DToPoints using the SINE_SPACE reference frame.
gen_x_and_p_func (GenXandP) – Callback to get the state estimate and covariance for the pinson-style block this processor is updating. NED position errors in meters are expected in at indices [0:3] and NED tilt errors in radians at indices [6:9].
- Returns:
StandardMeasurementModel if all restrictions on message and gen_x_and_p_func are met and proper aux data is available, None otherwise.
Model Description and Derivation
TODO: Add model description and derivation
- class pntos.cobra.internal.FogmBlock(label, mediator, sigmas, taus)
Bases:
StandardStateBlockA StateBlock that represents ‘n’ first-order Gauss-Markov processes.
- __init__(label, mediator, sigmas, taus)
Constructor
- Parameters:
label (str) – Label for this block.
mediator (Mediator) – A
pntos.api.Mediatorinstance.sigmas (NDArray[float64]) – Nx1 array of FOGM noise sigmas; units will vary.
taus (NDArray[float64]) – Nx1 array of FOGM time constants, in seconds. Must be positive.
- receive_aux_data(aux)
Receive aux data. Unused for this class.
- Parameters:
aux (list[Message | None]) – List of messages.
- generate_dynamics(gen_x_and_p_func, time_from, time_to)
Generate a
pntos.api.StandardDynamicsModel.The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.
- Parameters:
gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.
time_from (TypeTimestamp) – The time to propagate from.
time_to (TypeTimestamp) – The time to propagate to.
- Returns:
The description of how to propagate this state block over the given time interval, or
Noneiftime_fromis later thantime_to. Otherwise guaranteed to not returnNone.- Return type:
StandardDynamicsModel | None
- class pntos.cobra.internal.Pinson15NedBlock(label, mediator, imu_model)
Bases:
StandardStateBlockA 15-state representation of the error model of an inertial navigation system in NED frame.
This block is based upon the model provided in the Titterton and Weston 2nd edition textbook (pg. 345). The 15-state model is created by combining the original 9x9 state block with the 6x6 G*u block that relates the gyro biases and accelerometer biases to the tilt and velocity error states, respectively. Additional changes include the conversion to North, East and Down position errors in meters as opposed to the latitude (radians), longitude (radians) and altitude (meters) error states in the book model. Note that the error states are additive, meaning in general you add the error state to the uncorrected value to get the corrected value. Tilt states require special handling; see below.
Tilt errors: The North, East and Down tilt errors are 3 small angle corrections that when represented in skew-symmetric form and subtracted from an identity matrix may be interpreted as a DCM that rotates a vector from an estimated navigation frame to the ‘true’ navigation frame, to the extent that the error states are correct. A positive tilt error results in a negative right-handed rotation about the axis to which it is attached. For example, if the sensor frame is aligned with the local vertical with 90 degree heading (sensor x axis is aligned with East axis), and the down tilt value is 1 degree, the corrected heading will be approximately 89 degrees.
The AspnBaseVector provided to this class should come from the inertial for which it is providing error estimates. Accel and gyro bias states are generally in the inertial sensor frame, but more precisely, they are in the frame that is related to the navigation frame by the NavSolution::rot_mat provided in AspnBaseVector. This means that if the inertial is mechanizing in the inertial sensor frame, then NavSolution::rot_mat should contain C_nav_to_sensor, or the nav-to-sensor DCM, and the biases will be in the sensor frame. If the inertial is mechanizing in the platform frame (which is very common), then NavSolution::rot_mat should be C_nav_to_platform, the nav-to-platform DCM. The biases will be with respect to that frame.
Note that these definitions only hold when using the ‘additive error state’ formulation (true = estimated + error), the current assumption in all off-the-shelf measurement processors. The opposite formulation will flip the sign on estimated values.
- Order and description of states:
0 - North position error (m). 1 - East position error (m). 2 - Down position error (m). 3 - North velocity error (m/s). 4 - East velocity error (m/s). 5 - Down velocity error (m/s). 6 - North tilt error (rad). 7 - East tilt error (rad). 8 - Down tilt error (rad). 9 - Accel x-axis bias error (m/s^2) (See note on frame above). 10 - Accel y-axis bias error (m/s^2). 11 - Accel z-axis bias error (m/s^2). 12 - Gyro x-axis bias error (rad/s). 13 - Gyro y-axis bias error (rad/s). 14 - Gyro z-axis bias error (rad/s).
- __init__(label, mediator, imu_model)
A Pinson15 NED Standard State Block
- Parameters:
label (str) – An identifier for this state block object.
mediator (Mediator) – A
pntos.api.Mediatorinstance.imu_model (ImuConfig) – The
pntos.cobra.config.ImuConfigto use in FOGM bias estimation.
- receive_aux_data(aux)
Receive inertial PVA and forces as aux data.
- Parameters:
aux (list[Message | None]) – List of messages. Assumed to contain inertial solution in a PVA message and forces in an IMU message.
- generate_dynamics(gen_x_and_p_func, time_from, time_to)
Generate a
pntos.api.StandardDynamicsModel.The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.
- Parameters:
gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.
time_from (TypeTimestamp) – The time to propagate from.
time_to (TypeTimestamp) – The time to propagate to.
- Returns:
The description of how to propagate this state block over the given time interval, or
Noneiftime_fromis later thantime_to. Otherwise guaranteed to not returnNone.- Return type:
StandardDynamicsModel | None
- scale_phi(Phi)
Scale first 15 elements of first 2 columns of phi such to account for change in rad to meter scale factors over propagation time.
- Parameters:
Phi (NDArray[float64]) – Matrix obtained by discretizing the propagation Jacobian. This matrix will be modified to account for the rad to meter scaling.
- generate_f_pinson15()
Generates the continuous time propagation matrix F.
F is the Jacobian of the differential equations governing inertial error growth. This is based upon the model given in Titterton and Weston, 2nd edition.
- Returns:
The F Matrix.
- Return type:
NDArray[float64]
- generate_q_pinson15()
Generates the continuous time process noise covariance matrix Q.
- Returns:
The Q Matrix.
- Return type:
NDArray[float64]
- class pntos.cobra.internal.PinsonBodyVelocityMeasurementProcessor(label, state_block_labels, mediator, l_ps_p, orientation_ps_p)
Bases:
StandardMeasurementProcessorGenerates a model that maps a velocity measurement in the body/sensor frame to an inertial error states block.
- __init__(label, state_block_labels, mediator, l_ps_p, orientation_ps_p)
A Pinson Body/Sensor Velocity Measurement Processor
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block where the first (or only) 15 states must be the same as Pinson15NedBlock.
mediator (Mediator) – a Mediator instance
l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the velocity sensor origin, in the platform frame, in units of meters.
orientation_ps_p (NDArray[float64]) – A 4-element quaternion representing the rotational difference from the platform frame to the sensor frame. The corresponding DCM would be C_platform_to_sensor.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- generate_model(message, gen_x_and_p_func)
Generate a
pntos.api.StandardMeasurementModel.- Parameters:
- Returns:
A generated model containing the parameters required for a filter update. Will be
Nonewhen a measurement cannot be produced frommessage(for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).- Return type:
StandardMeasurementModel | None
- class pntos.cobra.internal.PinsonPositionMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)
Bases:
StandardMeasurementProcessorGenerates a model that maps a position measurement to an inertial error state block.
See
generate_model()for a detailed description of the assumptions and capabilities of this processor.- __init__(label, state_block_labels, mediator, l_ps_p)
A Pinson Position Measurement Processor
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9.
mediator (Mediator) – a Mediator instance.
l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- generate_model(message, gen_x_and_p_func)
Generates the model mapping state estimates to the provided measurement.
- Parameters:
message (Message) – Measurement to process. message.wrapped_message must be a MeasurementPosition using the GEODETIC reference frame.
gen_x_and_p_func (GenXandP) – Callback to get the state estimate and covariance for the pinson-style block this processor is updating. NED position errors in meters are expected in at indices [0:3] and NED tilt errors in radians at indices [6:9].
- Returns:
StandardMeasurementModel if all restrictions on message and gen_x_and_p_func are met and proper aux data is available, None otherwise.
Model Description and Derivation
Measurements accepted by this this processor are modeled as
\(P_s^g = P_p^g + C^g_p l_{p \Rightarrow s}^p + \eta(0, \sigma_z)\)
where
\(P_s^g\) is the measured position vector of the \(s\) sensor frame origin in some global Cartesian frame \(g\).
\(P_p^g\) is the true position of the \(p\) platform frame origin in the \(g\) frame.
\(C^g_p\) is the true rotation from the \(p\) frame to the \(g\) frame.
\(l_{p \Rightarrow s}^p\) is the lever arm, a vector from the \(p\) frame origin to the \(s\) frame origin, expressed in the \(p\) frame.
\(\eta(0, \sigma_z)\) is 0-mean Gaussian white noise.
In other words, the measurement is modeled as perfect aside from white noise, offset from the platform frame by a known lever arm.
The measurement is used to update the estimates of the error in the nominal trajectory of the platform frame, provided through a MeasurementPositionVelocityAttitude (PVA) via the
receive_aux_data()function. This PVA provides the following values of interest:\(\hat{P_p^g}\): The estimated position of the platform frame origin in the \(g\) frame.
\(\hat{C^{ned}_p}\): The estimated orientation of the platform frame with respect to the NED frame.
We wish to use this measurement to update estimates of the following states contained in the ‘Pinson’ state block:
\(\delta P_p^{ned}\): the estimate of the error in the current nominal position of the platform frame, expressed in the North-East-Down (NED) frame. The relationship between the true position, the nominal position, and the error state estimates is \(P = \hat{P} + \delta P\)
\(\delta \psi^{ned}\): The NED frame tilt error estimates. The relationship between the true and estimated rotations is \(C^{ned}_p = C^{ned}_{\hat{ned}}C^{\hat{ned}}_p \approx [I - \delta \psi^{ned} \times ]C^{\hat{ned}}_p\) (where \(\times\) is the skew/cross operator).
The measurement vector provided to the filter \(z\) is the difference between the measurement and the nominal platform frame position (white noise terms dropped, and \(ned\) frame used as g frame):
\(z = P_s^{ned} - \hat{P^{ned}_p}\)
The measurement model \(h(x)\) is found by expanding \(z\) in terms of the states \(x\) (i.e. \(\delta P_p^{ned}\) and \(\delta \psi^{ned}\)):
\(P_s^{ned} - \hat{P^{ned}_p} = P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p - \hat{P}_p^{ned}\)
\(= \hat{P_p^{ned}} + \delta P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p - \hat{P}_p^{ned}\)
\(= \delta P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p\)
\(h(x) \approx \delta P_p^{ned} + [I - \delta \psi^{ned} \times ]C^{\hat{ned}}_p l_{p \Rightarrow s}^p\)
Finally, to generate the measurement Jacobian \(H\) we take derivatives of \(h(x)\):
\(\frac{\delta h(x)}{d \delta P_p^{ned}} = I\)
\(\frac{\delta h(x)}{d \delta \psi^{ned}} = C^{\hat{ned}}_p l_{p \Rightarrow s}^p \times\)
- class pntos.cobra.internal.PinsonPosVelMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)
Bases:
StandardMeasurementProcessorGenerates a model that maps PVA measurements to an inertial error state block.
NOTE: Only the position and velocity fields of the PVA measurement are used, attitude is ignored.
See
generate_model()for a detailed description of the assumptions and capabilities of this processor.- __init__(label, state_block_labels, mediator, l_ps_p)
A Pinson Position and Velocity Measurement Processor
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states, NED velocity errors as states 3:6, and NED tilt errors, in radians, as states 6:9.
mediator (Mediator) – a Mediator instance.
l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- generate_model(message, gen_x_and_p_func)
Generates the model mapping state estimates to the provided measurement.
- Parameters:
message (Message) – Measurement to process. message.wrapped_message must be a MeasurementPositionVelocityAttitude using the GEODETIC reference frame.
gen_x_and_p_func (GenXandP) – Callback to get the current state estimate and covariance for the pinson-style block this processor is updating. NED position errors in meters are expected in at indices [0:3], NED velocity errors at indices [3:6], and NED tilt errors in radians at indices [6:9].
- Returns:
StandardMeasurementModel if all restrictions on message and gen_x_and_p_func are met and proper aux data is available, None otherwise.
Model Description and Derivation
This measurement processes combined position and velocity measurements as a PVA measurement. The model for the position measurement is identical to that of PinsonPositionMeasurementProcessor, and the model for the velocity measurement is identical to that of PinsonVelocityMeasurementProcessor.
- class pntos.cobra.internal.PinsonVelocityMeasurementProcessor(label, state_block_labels, mediator)
Bases:
StandardMeasurementProcessorGenerates a model that maps a velocity measurement to an inertial error state block.
- __init__(label, state_block_labels, mediator)
A Pinson Velocity Measurement Processor
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9.
mediator (Mediator) – A
pntos.api.Mediatorinstance.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- generate_model(message, gen_x_and_p_func)
Generate a
pntos.api.StandardMeasurementModel.- Parameters:
- Returns:
A generated model containing the parameters required for a filter update. Will be
Nonewhen a measurement cannot be produced frommessage(for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).- Return type:
StandardMeasurementModel | None
- class pntos.cobra.internal.PinsonWithLeverArmPositionMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)
Bases:
StandardMeasurementProcessorMaps a position measurement to an inertial error state block, a lever arm offset block, and a sensor error block.
See
generate_model()for a detailed description of the assumptions and capabilities of this processor.- __init__(label, state_block_labels, mediator, l_ps_p)
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 3-length list of labels of state blocks this processor can update. The first entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states
6:9. The second state block entry should refer to a 3-element state block that models the position sensor errors in the NED frame. The third state block entry should refer to a 3-element state block that models the additional platform to position sensor lever arm in the body frame, in meters. The complete lever arm estimate is composed of the sum of these last 3 states andl_ps_p.mediator (Mediator) – a Mediator instance
l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.
- receive_aux_data(aux)
Receive aux data.
This function is used to provide additional data required by
generate_model()beyond what is provided by the measurement and state estimate and covariance. This processor requires aux data that represents the nominal position, velocity and attitude of the platform at the time of measuremet update. The inertial error block referenced ingenerate_model()represents the estimated errors in this nominal PVA.- Parameters:
aux (list[Message | None]) – Aux data to process. Expected to be length 1 and contain a
MeasurementPositionVelocityAttitudewith a valid quaternion.
- generate_model(message, gen_x_and_p_func)
Generates a StandardMeasurementModel relating a position measurement to the states described in
__init__().- Parameters:
message (Message) – Measurement used to update. The
wrapped_messagemember must be of typeaspn23.MeasurementPosition.gen_x_and_p_func (GenXandP) – Callback to get the estimate and covariance of the states referenced by this model. The layout of the states should follow the description in
__init__().
- Returns:
Model relating measurement to states. Will return
Noneif a)state_block_labelsis incorrectly configured b)messageis an unsupported type c) aMeasurementPositionVelocityAttitudewith the estimated PVA at the current measurement time has not been received viareceive_aux_data().
Model Description and Derivation
This processor uses a model very similar to that used described in
pntos.cobra.internal.PinsonWithNedFogmPositionMeasurementProcessor.generate_model(). The primary difference is that instead of assuming the lever arm between the platform frame and the sensor frame is perfectly known, the model incorporates 3 additional states to account for the possible deviation between the assumed lever arm provided to__init__()and the actual lever arm.The model for the measurement is the same as that of
pntos.cobra.internal.PinsonWithNedFogmPositionMeasurementProcessor:\(P_s^g = P_p^g + C^g_p l_{p \Rightarrow s}^p - \delta z^{g} + \eta(0, \sigma_z)\)
but now we can rewrite the true lever arm \(l_{p \Rightarrow s}^p\) in terms of the estimated lever arm \(\hat{l}_{p \Rightarrow s}^p\) plus deviation \(\delta l_{p \Rightarrow s}^p\):
\(P_s^g = P_p^g + C^g_p(\hat{l}_{p \Rightarrow s}^p + \delta l_{p \Rightarrow s}^p) - \delta z^{g} + \eta(0, \sigma_z)\).
Following the procedure outlined in the documentation for the other position processors we form the filter update vector by differencing the measurement and the estimated platform position:
\(z = P_s^{ned} - \hat{P^{ned}_p}\)
\(= P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p - \delta z_{ned} - \hat{P}_p^{ned}\)
\(= \hat{P_p^{ned}} + \delta P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p - \delta z^{ned} - \hat{P}_p^{ned}\)
\(= \delta P_p^{ned} + C^{ned}_p l_{p \Rightarrow s}^p - \delta z^{ned}\)
The measurement prediction function is found by expanding this in terms of state estimates and nominal values:
\(h(x) \approx \delta P_p^{ned} + [I - \delta \psi^{ned} \times ]C^{\hat{ned}}_p (\hat{l}_{p \Rightarrow s}^p + \delta l_{p \Rightarrow s}^p) - \delta z^{ned}\)
Taking the derivative with respect to the states we get the Jacobian elements:
\(\frac{\delta h(x)}{d \delta P_p^{ned}} = I\)
\(\frac{\delta h(x)}{d \delta \psi^{ned}} = C^{\hat{ned}}_p(\hat{l}_{p \Rightarrow s}^p + \delta l_{p \Rightarrow s}^p)\times\)
\(\frac{\delta h(x)}{d \delta l_{p \Rightarrow s}^p} = [I - \delta \psi^{ned} \times ]C^{\hat{ned}}_p\)
\(\frac{\delta h(x)}{d \delta z^{ned}} = -I\)
- class pntos.cobra.internal.PinsonWithNedFogmPositionMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)
Bases:
StandardMeasurementProcessorGenerates a model that maps a position measurement to an inertial error state block and a position measurement error block.
See
generate_model()for a detailed description of the assumptions and capabilities of this processor.- __init__(label, state_block_labels, mediator, l_ps_p)
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 2-length list of labels of state blocks this processor can update. The first entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9. The second state block entry should refer to a 3-element FOGM state block that models the position sensor errors in the NED frame.
mediator (Mediator) – a Mediator instance
l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- generate_model(message, gen_x_and_p_func)
Generates the model mapping state estimates to the provided measurement.
- Parameters:
message (Message) – Measurement to process. message.wrapped_message must be a MeasurementPosition using the GEODETIC reference frame.
gen_x_and_p_func (GenXandP) – Callback to get joint state estimate and covariance for both the pinson-style block and sensor measurement error blocks this processor is updating. NED platform position errors in meters are expected in at indices [0:3] and NED tilt errors in radians at indices [6:9]. Sensor measurement error states in the NED frame are expected to be the last 3 states.
- Returns:
StandardMeasurementModel if all restrictions on message and gen_x_and_p_func are met and proper aux data is available, None otherwise.
Model Description and Derivation
This processor uses a model identical to that used described in
pntos.cobra.internal.PinsonPositionMeasurementProcessor.generate_model(), aside from the addition of error states to track position error present in the sensor measurements. See the linked function for definitions of terms not repeated here.The addition of the new states changes the original sensor measurement model from:
\(P_s^g = P_p^g + C^g_p l_{p \Rightarrow s}^p + \eta(0, \sigma_z)\)
to
\(P_s^g = P_p^g + C^g_p l_{p \Rightarrow s}^p - \delta z^{g} + \eta(0, \sigma_z)\)
where \(\delta z^{g}\) is measurement error in the \(g\) frame.
This results in additional term being added to the prior model \(h(x)\), giving:
\(h(x) \approx \delta P_p^{ned} + [I - \delta \psi^{ned} \times ]C^{\hat{ned}}_p l_{p \Rightarrow s}^p - \delta z^{ned}\)
and an additional entry in the Jacobian matrix \(H\):
\(\frac{\delta h(x)}{d \delta z^{ned}} = -I\)
- class pntos.cobra.internal.PositionMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)
Bases:
StandardMeasurementProcessorGenerates a model that maps a position measurement to an inertial direct state block.
See
generate_model()for a detailed description of the assumptions and capabilities of this processor.- __init__(label, state_block_labels, mediator, l_ps_p)
A Geodetic 3D Position Measurement Processor.
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 2-element list of labels of state blocks this processor can update. The first entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9.
mediator (Mediator) – a Mediator instance.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- generate_model(message, gen_x_and_p_func)
Generates the model mapping state estimates to the provided measurement.
- Parameters:
message (Message) – Measurement to process. message.wrapped_message must be a MeasurementPosition using the GEODETIC reference frame.
gen_x_and_p_func – Callback to get the current joint state estimate and covariance for both the PinsonErrorToStandard virtual state block and sensor measurement error blocks this processor is updating. LLA position in rad, rad, meters respectively are expected at indices [0:3] and RPY tilt in radians are expected at indices [6:9]. Sensor measurement error states in units matching indices [0:3] are expected to be the last 3 states.
- Returns:
StandardMeasurementModel if all restrictions on message and gen_x_and_p_func are met and proper aux data is available, None otherwise.
- class pntos.cobra.internal.StandardStateModelProvider(mediator)
Bases:
StandardStateModelProviderStandardStateModelProvider that offers a 15-state pinson state block, variable-size Fogm Block and various position measurement processors.
- __init__(mediator)
Standard Position and INS State Model Provider
- Parameters:
mediator (Mediator) – A :class:(Mediator) instance.
- new_processor(processor_index, engine, label, state_block_labels, config_group)
Generate a new StandardMeasurementProcessor that describes the relationship between a measurement and a set of states.
- Parameters:
processor_index (int) –
Index into self.processor_identifiers used to select the desired type of measurement processor.
Index 0 corresponds to a
PinsonPositionMeasurementProcessor.Index 1 corresponds to a
PinsonVelocityMeasurementProcessor.Index 2 corresponds to a
PinsonWithNedFogmPositionMeasurementProcessor.Index 3 corresponds to a
AltitudeMeasurementProcessor.Index 4 corresponds to a
PinsonWithLeverArmPositionMeasurementProcessor.Index 5 corresponds to a
PinsonBodyVelocityMeasurementProcessor.Index 6 corresponds to a
PinsonPosVelMeasurementProcessor.Index 7 corresponds to a
PositionMeasurementProcessor.Index 8 corresponds to a
Direction3DToPointsMeasurementProcessor.All other indices will result in a return value of None.
engine (StandardFusionEngine | None) – An optional parameter that may be provided to the new processor, such that the processor may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to
Nonewhen no engine is available for the processor to use.label (str) – A string which will be used to populate the
labelfield of the newly created processor. Thislabelwill be the unique name for the returned instance of a processor, and used to track the processor throughout its lifecycle. Note that it differs frompntos.api.StandardStateModelProvider.processor_identifierswhich is the model’s mechanism for selecting the type of processor to create.state_block_labels (list[str]) – A list of strings which will be used to populate the
state_block_labelsfield of the newly created processor.config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new processor. If the processor requires no outside configuration,
config_groupmay beNone.
- Returns:
The newly created StandardMeasurementProcessor or
Nonewhen no processor can be produced with the givenprocessor_index,engine, andconfig_group.- Return type:
StandardMeasurementProcessor | None
- new_block(block_index, engine, label, config_group)
Generate a new StandardStateBlock that describes a set of states and how they propagate over time.
- Parameters:
block_index (int) –
Index into self.block_identifiers used to select the desired type of state block.
Index 0 corresponds to a
Pinson15NedBlock.Index 1 corresponds to a
FogmBlock.Index 2 corresponds to a
ClockBiasStateBlock.Index 3 corresponds to a
ConstantStateBlock.All other indices will result in a return value of None.
engine (StandardFusionEngine | None) – An optional parameter that may be provided to the new block, such that the block may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to
Nonewhen no engine is available for the block to use.label (str) – A string which will be used to populate the
labelfield of the newly created state block. Thislabelwill be the unique name for the returned instance of a state block, and used to track the state block throughout its lifecycle. Note that it differs frompntos.api.StandardStateModelProvider.block_identifierswhich is the model’s mechanism for selecting the kind of state block to create.config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new state block. If the state block requires no outside configuration,
config_groupmay beNone.
- Returns:
The newly created StandardStateBlock or
Nonewhen no state block can be produced with the givenblock_index,engine, andconfig_group.- Return type:
StandardStateBlock | None
- new_virtual_block(virtual_block_index, source_label, target_label, config_group)
Generate a new VirtualStateBlock that converts a set of states to another representation.
- Parameters:
virtual_block_index (int) –
Index into self.virtual_block_identifiers used to select the desired type of state block.
Index 0 corresponds to a
PinsonErrorToStandardVSB.Index 1 corresponds to a
StateExtractorVSB.All other indices will result in a return value of None.
source_label (str) – A string which will be used to populate the
sourcefield of the newly created virtual state block. Thissource_labelshould correspond to either a different ‘real’ or virtual state block.target_label (str) – A string which will be used to populate the
targetfield of the newly create virtual state block. Thistargetshould be unique, differing from all other targets on the other instances ofpntos.api.VirtualStateBlock.config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new virtual state block. If the state block requires no outside configuration,
config_groupmay beNone.
- Returns:
The newly created VirtualStateBlock or
Nonewhen no virtual state block can be produced with the givenblock_index,engine, andconfig_group.- Return type:
VirtualStateBlock | None
- class pntos.cobra.internal.PinsonErrorToStandard(mediator, source, target)
Bases:
VirtualStateBlockA VirtualStateBlock that maps Pinson-style error states into a ‘Standard’ (or direct) whole-valued representation by combining the error states with the uncorrected reference PVA.
- Output Transformation States in Order:
0 - Latitude (rad)
1 - Longitude (rad)
2 - Altitude HAE (m)
3 - North Velocity (m/s)
4 - East Velocity (m/s)
5 - Down Velocity (m/s)
6 - Roll (rad)
7 - Pitch (rad)
8 - Yaw (rad)
Additional trailing states are retained and unmodified. See Pinson15NedBlock.py for a description of input states.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_virtual_state_block_aux_data()is called with a label corresponding to thispntos.api.VirtualStateBlock‘starget.- Parameters:
aux (list[Message | None])
- convert(estimate_with_covariance, time)
Convert a full estimate/covariance pair.
- Parameters:
estimate_with_covariance (EstimateWithCovariance) – Estimate and covariance to convert.
time (TypeTimestamp) – Time that
estimate_with_covarianceis valid at.
- Returns:
The converted value.
- Return type:
- convert_estimate(estimate, time)
Convert just an estimate vector.
- Parameters:
estimate (NDArray[float64]) – Estimate vector to convert, Nx1.
time (TypeTimestamp) – Time that
estimateis valid at.
- Returns:
The converted vector, Mx1.
- Return type:
NDArray[float64]
- jacobian(estimate, time)
Obtain the Jacobian of the transform performed by this instance.
The Jacobian is calculated at an instance in time, given an estimate to differentiate with respect to.
- Parameters:
estimate (NDArray[float64]) – Estimate vector associated with the return value of
source, Nx1.time (TypeTimestamp) – Time that
estimateis valid at.
- Returns:
An MxN matrix that may be used to pre-multiply
estimateto obtain an M length vector intargetrepresentation (to first order).- Return type:
NDArray[float64]
- class pntos.cobra.internal.StateExtractor(mediator, source, target, incoming_state_size, indices)
Bases:
VirtualStateBlockA VirtualStateBlock that extracts and returns a series of states from an input estimate and covariance.
- __init__(mediator, source, target, incoming_state_size, indices)
- Parameters:
mediator (Mediator) – A class:pntos.api.Mediator instance
source (str) – The label associated with the representation this instance can transform from
target (str) – The label associated with the representation this instance can transform to
incoming_state_size (int) – The number of states in the StateBlock source refers to
indices (list[int]) – The series of indices that correspond to states to extract from source and comprise target
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_virtual_state_block_aux_data()is called with a label corresponding to thispntos.api.VirtualStateBlock‘starget.- Parameters:
aux (list[Message | None])
- convert(estimate_with_covariance, time)
Convert a full estimate/covariance pair.
- Parameters:
estimate_with_covariance (EstimateWithCovariance) – Estimate and covariance to convert.
time (TypeTimestamp) – Time that
estimate_with_covarianceis valid at.
- Returns:
The converted value.
- Return type:
- convert_estimate(estimate, time)
Convert just an estimate vector.
- Parameters:
estimate (NDArray[float64]) – Estimate vector to convert, Nx1.
time (TypeTimestamp) – Time that
estimateis valid at.
- Returns:
The converted vector, Mx1.
- Return type:
NDArray[float64]
- jacobian(estimate, time)
Obtain the Jacobian of the transform performed by this instance.
The Jacobian is calculated at an instance in time, given an estimate to differentiate with respect to.
- Parameters:
estimate (NDArray[float64]) – Estimate vector associated with the return value of
source, Nx1.time (TypeTimestamp) – Time that
estimateis valid at.
- Returns:
An MxN matrix that may be used to pre-multiply
estimateto obtain an M length vector intargetrepresentation (to first order).- Return type:
NDArray[float64]
- class pntos.cobra.internal.StaticAlign(config_group, mediator)
Bases:
InertialInitializationStrategyStatic alignment for an inertial.
This initialization strategy can be used to produce an initial PVA to initialize an inertial mechanization. It requires position and IMU data, performing gyrocompassing to estimate an initial attitude from the IMU data. It does not estimate initial inertial errors.
- __init__(config_group, mediator)
- Parameters:
config_group (str) – A
pntos.cobra.config.StaticAlignmentConfigconfig group.mediator (Mediator) – A
pntos.api.Mediatorinstance.
- request_motion_needed()
Check the type of motion (if any) needed.
- Returns:
InitializationMotionNeeded
- request_current_status()
Check the current initialization status.
- Returns:
InitializationStatus
- process_pntos_message(message)
Incorporate a new message into the initialization algorithm.
- Parameters:
message (Message)
- request_solution()
Get the current initial solution.
- Returns:
InitialInertialSolution
- class pntos.cobra.internal.TutorialFogmBlock(label, mediator, sigmas, taus)
Bases:
StandardStateBlockA StateBlock that represents ‘n’ first-order Gauss-Markov processes.
- __init__(label, mediator, sigmas, taus)
Constructor
- Parameters:
label (str) – Label for this block.
mediator (Mediator) – A
pntos.api.Mediatorinstance.sigmas (NDArray[float64]) – Nx1 array of FOGM noise sigmas; units will vary.
taus (NDArray[float64]) – Nx1 array of FOGM time constants, in seconds. Must be positive.
- receive_aux_data(aux)
Receive aux data. Unused for this class.
- Parameters:
aux (list[Message | None]) – List of messages.
- generate_dynamics(gen_x_and_p_func, time_from, time_to)
Generate a
pntos.api.StandardDynamicsModel.The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.
- Parameters:
gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.
time_from (TypeTimestamp) – The time to propagate from.
time_to (TypeTimestamp) – The time to propagate to.
- Returns:
The description of how to propagate this state block over the given time interval, or
Noneiftime_fromis later thantime_to. Otherwise guaranteed to not returnNone.- Return type:
StandardDynamicsModel | None
- class pntos.cobra.internal.TutorialPinson15NedBlock(label, mediator, imu_model)
Bases:
StandardStateBlockA 15-state representation of the error model of an inertial navigation system in NED frame.
This block is based upon the model provided in the Titterton and Weston 2nd edition textbook (pg. 345). The 15-state model is created by combining the original 9x9 state block with the 6x6 G*u block that relates the gyro biases and accelerometer biases to the tilt and velocity error states, respectively. Additional changes include the conversion to North, East and Down position errors in meters as opposed to the latitude (radians), longitude (radians) and altitude (meters) error states in the book model. Note that the error states are additive, meaning in general you add the error state to the uncorrected value to get the corrected value. Tilt states require special handling; see below.
Tilt errors: The North, East and Down tilt errors are 3 small angle corrections that when represented in skew-symmetric form and subtracted from an identity matrix may be interpreted as a DCM that rotates a vector from an estimated navigation frame to the ‘true’ navigation frame, to the extent that the error states are correct. A positive tilt error results in a negative right-handed rotation about the axis to which it is attached. For example, if the sensor frame is aligned with the local vertical with 90 degree heading (sensor x axis is aligned with East axis), and the down tilt value is 1 degree, the corrected heading will be approximately 89 degrees.
The AspnBaseVector provided to this class should come from the inertial for which it is providing error estimates. Accel and gyro bias states are generally in the inertial sensor frame, but more precisely, they are in the frame that is related to the navigation frame by the NavSolution::rot_mat provided in AspnBaseVector. This means that if the inertial is mechanizing in the inertial sensor frame, then NavSolution::rot_mat should contain C_nav_to_sensor, or the nav-to-sensor DCM, and the biases will be in the sensor frame. If the inertial is mechanizing in the platform frame (which is very common), then NavSolution::rot_mat should be C_nav_to_platform, the nav-to-platform DCM. The biases will be with respect to that frame.
Note that these definitions only hold when using the ‘additive error state’ formulation (true = estimated + error), the current assumption in all off-the-shelf measurement processors. The opposite formulation will flip the sign on estimated values.
- Order and description of states:
0 - North position error (m). 1 - East position error (m). 2 - Down position error (m). 3 - North velocity error (m/s). 4 - East velocity error (m/s). 5 - Down velocity error (m/s). 6 - North tilt error (rad). 7 - East tilt error (rad). 8 - Down tilt error (rad). 9 - Accel x-axis bias error (m/s^2) (See note on frame above). 10 - Accel y-axis bias error (m/s^2). 11 - Accel z-axis bias error (m/s^2). 12 - Gyro x-axis bias error (rad/s). 13 - Gyro y-axis bias error (rad/s). 14 - Gyro z-axis bias error (rad/s).
- __init__(label, mediator, imu_model)
A Pinson15 NED Standard State Block
- Parameters:
label (str) – An identifier for this state block object.
mediator (Mediator) – A
pntos.api.Mediatorinstance.imu_model (ImuConfig) – The
pntos.cobra.config.ImuConfigto use in FOGM bias estimation.
- receive_aux_data(aux)
Receive inertial PVA and forces as aux data.
- Parameters:
aux (list[Message | None]) – List of messages. Assumed to contain inertial solution in a PVA message and forces in an IMU message.
- generate_dynamics(gen_x_and_p_func, time_from, time_to)
Generate a
pntos.api.StandardDynamicsModel.The generated model contains a complete description of how to propagate this state block forward in time. For simple models, this can simply return a set of static matrices that are pre-defined.
- Parameters:
gen_x_and_p_func (GenXandP) – A callback function that the state block can use to get the current estimate and covariance.
time_from (TypeTimestamp) – The time to propagate from.
time_to (TypeTimestamp) – The time to propagate to.
- Returns:
The description of how to propagate this state block over the given time interval, or
Noneiftime_fromis later thantime_to. Otherwise guaranteed to not returnNone.- Return type:
StandardDynamicsModel | None
- generate_f_pinson15()
Generates the continuous time propagation matrix F.
F is the Jacobian of the differential equations governing inertial error growth. This is based upon the model given in Titterton and Weston, 2nd edition.
- Returns:
The F Matrix.
- Return type:
NDArray[float64]
- generate_q_pinson15()
Generates the continuous time process noise covariance matrix Q.
- Returns:
The Q Matrix.
- Return type:
NDArray[float64]
- class pntos.cobra.internal.TutorialPinsonVelocityMeasurementProcessor(label, state_block_labels, mediator)
Bases:
StandardMeasurementProcessorGenerates a model that maps a velocity measurement to an inertial error state block.
- __init__(label, state_block_labels, mediator)
A Pinson Velocity Measurement Processor
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 1-element list of labels of state blocks this processor can update. The single entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9.
mediator (Mediator) – A
pntos.api.Mediatorinstance.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- generate_model(message, gen_x_and_p_func)
Generate a
pntos.api.StandardMeasurementModel.- Parameters:
- Returns:
A generated model containing the parameters required for a filter update. Will be
Nonewhen a measurement cannot be produced frommessage(for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).- Return type:
StandardMeasurementModel | None
- class pntos.cobra.internal.TutorialPinsonWithNedFogmPositionMeasurementProcessor(label, state_block_labels, mediator, l_ps_p)
Bases:
StandardMeasurementProcessorGenerates a model that maps a position measurement to an inertial error state block and a position measurement error block.
- __init__(label, state_block_labels, mediator, l_ps_p)
- Parameters:
label (str) – Name of processor.
state_block_labels (list[str]) – A 2-length list of labels of state blocks this processor can update. The first entry should refer to a Pinson-style state block of at least size 9, with NED position errors in meters as the first three states and NED tilt errors, in radians, as states 6:9. The second state block entry should refer to a 3-element FOGM state block that models the position sensor errors in the NED frame.
mediator (Mediator) – a Mediator instance
l_ps_p (NDArray[float64]) – A 3-element array representing the lever arm from the platform frame origin to the position sensor origin, in the platform frame, in units of meters.
- receive_aux_data(aux)
Receive and use an arbitrary collection of aux data.
This method will be called by the fusion engine when its
pntos.api.StandardFusionEngine.give_measurement_processor_aux_data()is called with a label corresponding to this measurement processor’slabel.- Parameters:
aux (list[Message | None])
- generate_model(message, gen_x_and_p_func)
Generate a
pntos.api.StandardMeasurementModel.- Parameters:
- Returns:
A generated model containing the parameters required for a filter update. Will be
Nonewhen a measurement cannot be produced frommessage(for example, this could happen if the measurement type is unsupported by the measurement processor or if it is rejected due to residual monitoring).- Return type:
StandardMeasurementModel | None
- class pntos.cobra.internal.TutorialPosInsStateModelProvider(mediator)
Bases:
StandardStateModelProviderA tutorial implementation of StandardStateModelProvider that offers a 15-state pinson state block, variable-size FOGM block, a position measurement processor and a velocity measurement processor.
- __init__(mediator)
Tutorial Position and INS State Model Provider
- Parameters:
mediator (Mediator) – A :class:(Mediator) instance.
- virtual_block_identifiers
List of identifiers for virtual state blocks.
- new_processor(processor_index, engine, label, state_block_labels, config_group)
Generate a new StandardMeasurementProcessor that describes the relationship between a measurement and a set of states.
- Parameters:
processor_index (int) – Index into self.processor_identifiers used to select the desired type of measurement processor. - Index 0 corresponds to a
pntos.cobra.internal.TutorialPinsonVelocityMeasurementProcessor. - Index 1 corresponds to apntos.cobra.internal.TutorialPinsonWithNedFogmPositionMeasurementProcessor. - All other indices will result in a return value of None.engine (pntos.api.plugins.fusion.StandardFusionEngine | None) – An optional parameter that may be provided to the new processor, such that the processor may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to
Nonewhen no engine is available for the processor to use.label (str) – A string which will be used to populate the
labelfield of the newly created processor. Thislabelwill be the unique name for the returned instance of a processor, and used to track the processor throughout its lifecycle. Note that it differs frompntos.api.StandardStateModelProvider.processor_identifierswhich is the model’s mechanism for selecting the type of processor to create.state_block_labels (list[str]) – A list of strings which will be used to populate the
state_block_labelsfield of the newly created processor.config_group (str) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new processor. If the processor requires no outside configuration,
config_groupmay beNone.
- Returns:
The newly created StandardMeasurementProcessor or
Nonewhen no processor can be produced with the givenprocessor_index,engine, andconfig_group.- Return type:
StandardMeasurementProcessor | None
- new_block(block_index, engine, label, config_group)
Generate a new StandardStateBlock that describes a set of states and how they propagate over time.
- Parameters:
block_index (int) –
Index into self.block_identifiers used to select the desired type of state block.
Index 0 corresponds to a
pntos.cobra.internal.TutorialPinson15NedBlock.Index 1 corresponds to a
pntos.cobra.internal.TutorialFogmBlock.All other indices will result in a return value of None.
engine (pntos.api.plugins.fusion.StandardFusionEngine | None) – An optional parameter that may be provided to the new block, such that the block may interact with the fusion engine it is being used in (for example, to add/remove states). Set it to
Nonewhen no engine is available for the block to use.label (str) – A string which will be used to populate the
labelfield of the newly created state block. Thislabelwill be the unique name for the returned instance of a state block, and used to track the state block throughout its lifecycle. Note that it differs frompntos.api.StandardStateModelProvider.block_identifierswhich is the model’s mechanism for selecting the kind of state block to create.config_group (str) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new state block. If the state block requires no outside configuration,
config_groupmay beNone.
- Returns:
The newly created StandardStateBlock or
Nonewhen no state block can be produced with the givenblock_index,engine, andconfig_group.- Return type:
StandardStateBlock | None
- new_virtual_block(virtual_block_index, source_label, target_label, config_group)
Generate a newly created
pntos.api.VirtualStateBlock.This virtual state block is used to convert a set of states from one representation to another.
- Parameters:
virtual_block_index (int) – Since the
pntos.api.StandardStateModelProvidercan create different kinds of virtual state blocks, thevirtual_block_index parameteris used to select which kind of virtual state block to create a new instance of. Thepntos.api.StandardStateModelProvider.virtual_block_identifiersfield contains identifying strings for the kinds of virtual state blocks. For example, if the model can create 45 different virtual state blocks, the identifier of the last virtual state block that can be created is found invirtual_block_identifiers[44]. An instance of this virtual state block can be created by callingnew_virtual_block(self, 44, ...). Note that0 <= virtual_block_index < len(virtual_block_identifiers).source_label (str) – The label of the state block or virtual state block whose states this virtual state block transforms.
target_label (str) – A unique identifier for this virtual state block.
config_group (str | None) – Indicates which (if any) parameter group in the registry may be used to obtain additional configuration values to generate the new virtual state block. If the virtual state block requires no outside configuration,
config_groupmay beNone.
- Returns:
The newly created
pntos.api.VirtualStateBlockorNonewhen no virtual state block can be produced with the givenvirtual_block_indexandconfig_group.- Return type:
VirtualStateBlock | None
- class pntos.cobra.internal.ManualInitialization(config_group, mediator)
Bases:
InertialInitializationStrategyA simple initialization strategy that generates an initial solution from an alignment config in the registry. This implementation is designed to work with a manual alignment.
- __init__(config_group, mediator)
A Simple Initialization Strategy
- Parameters:
config_group (str) – A
pntos.cobra.config.ManualAlignmentConfigconfig group.mediator (Mediator) – A
pntos.api.Mediatorinstance.
- request_motion_needed()
Check the type of motion (if any) needed.
- Returns:
InitializationMotionNeeded
- request_current_status()
Check the current initialization status.
- Returns:
InitializationStatus
- process_pntos_message(message)
Incorporate a new message into the initialization algorithm.
- Parameters:
message (Message)
- request_solution()
Get the current initial solution.
- Returns:
InitialInertialSolution