import logging
from typing import overload
import numpy as np
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
[docs]
def conditionNumber(M, threshold=1e-5):
"""
Computes the condition number of a matrix with a check for SVD convergence.
Args:
- M : The input matrix.
- threshold : The condition number threshold to check against.
"""
try:
cond_number = np.linalg.cond(M)
return cond_number < threshold
except np.linalg.LinAlgError as e:
logger.error(f"Linear Algebra error: {e}")
return False
[docs]
def discreteTimeIntegral(vector, time_step):
"""
Compute the discrete-time integral of a sampled vector with a given time step.
Args:
vector (array-like): 1-D input vector.
time_step (float) : Sampling frequency with which this vector was recorded.
Returns:
int_vector (numpy.ndarray): 1-D integrated vector.
"""
if not isinstance(time_step, (int, float)) or time_step <= 0:
logger.error("Input validation failed: 'time step' must be numeric, positive scalar.")
if time_step > 1:
logger.warning("Warning: time step is too high: results may be inaccurate!")
if time_step < 1e-6:
logger.warning("Warning: time step is too small: results may be inaccurate!")
vector = np.asarray(vector)
int_vector = np.zeros_like(vector)
for i in range(len(vector)):
if i == 0:
int_vector[i] = vector[i]
else:
int_vector[i] = int_vector[i - 1] + 2 * vector[i]
int_vector = (time_step / 2) * int_vector
return int_vector
@overload
def discreteTimeDerivative(ut, ut_1, dt):
"""
Compute the discrete-time derivative in a single time step.
Args:
ut (float or array-like): Current value at time t.
ut_1 (float or array-like): Previous value at time t-1.
dt (float): Time step.
Returns:
upt (float or array-like): Discrete-time derivative.
"""
if dt == 0:
logger.error("dt must be a non-zero value")
upt = (ut - ut_1) / dt
return upt
@overload
def discreteTimeDerivative(vector:np.ndarray, time_step:float, init_value=0):
"""
Compute the discrete-time derivative.
"""
if time_step ==0:
logger.error("time_step must be a non-zero value")
der_vector = np.zeros_like(vector)
der_vector[0] = init_value
for i in range(1,len(vector)):
der_vector[i] = discreteTimeDerivative(vector[i],vector[i-1],time_step)
return der_vector
[docs]
def RMSE(array1: np.ndarray, array2: np.ndarray=None,axis=0) -> np.ndarray:
"""
Compute the RMSE between 2 arrays across all samples.
Args:
array1 (Nsamples, ndof).
array2 (Nsamples, ndof).
Returns:
np.ndarray: (Nsamples, ndof)
"""
if array1.shape != array2.shape:
logger.error("Input arrays must have the same shape.")
if array2 is None:
differences = array1
else:
differences = array1 - array2
squared_diff = differences ** 2
mean_squared_diff = np.mean(squared_diff, axis)
rmse = np.sqrt(mean_squared_diff)
return rmse
[docs]
def cumulativeRMSE(array1: np.ndarray, array2: np.ndarray) -> np.ndarray:
"""
Compute the RMSE between columns of two arrays, considering
the error committed for each joint at the previous time step.
Args:
array1 (Nsamples, ndof).
array2 (Nsamples, ndof).
Returns:
np.ndarray: (Nsamples, ndof)
"""
differences = array1 - array2
squared_diff = differences ** 2
cumulative_error = np.cumsum(squared_diff, axis=0)
mean_squared_diff = cumulative_error / (np.arange(1, array1.shape[0] + 1)[:, np.newaxis])
rmse = np.sqrt(mean_squared_diff)
return rmse
[docs]
def computeCorrelation(array: np.ndarray) -> np.ndarray:
"""
Compute the correlation factor between the columns of a numpy array.
Args:
array (Nsamples, ndof).
Returns:
np.ndarray: (ndof, ndof).
"""
return np.corrcoef(array, rowvar=False)
[docs]
def computeCumulativeCorrelation(array:np.ndarray)->np.ndarray:
"""
Compute the correlation matrix.
Args:
- array : (Nsamples, ndof).
Returns:
- numpy-ndarry: (Nsamples,ndof, ndof)
"""
rows, cols = array.shape
correlation_matrix = np.zeros(rows, cols, cols)
for i in range(1, rows):
correlation_matrix[i,:,:]= np.corrcoef(array[0:i,:], rowvar=False)
return correlation_matrix
[docs]
def MAE(array: np.ndarray, window_size: int) -> np.ndarray:
"""
Implement a simple moving average estimator.
Args:
array (np.ndarray): Array of values at each timestep.
window_size (int): Number of previous values to consider.
Returns:
np.ndarray: expected values.
"""
expected_values = np.zeros_like(array)
for i in range(len(array)):
if i < window_size:
expected_values[i] = np.mean(array[:i+1])
else:
expected_values[i] = np.mean(array[i-window_size+1:i+1])
return expected_values