import sys
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import logging
from scipy.signal import butter, filtfilt
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from utils import computeCorrelation
[docs]
class RobotData:
"""
Initialize the RobotData object by loading and processing data from a CSV file.
Args:
- data_file_path (str): Path to the CSV data file.
- ndof (int) : number of freedom degree of the manipulator.
- time_step (float) : step time beteewn tow conscutive sample of data
- intIndex (int): Initial index for data slicing.
- stepIndex (int): Step index for periodic data selection.
- fnlIndex (int): Final index for data slicing.
"""
def __init__(self, data_file_path,ndof=7,time_step=1e-3, intIndex=None, stepIndex=1, fnlIndex=None):
try:
dataTable = pd.read_csv(data_file_path)
except Exception as e:
logger.error(f"Error loading data: {e}")
self.ndof = ndof
if intIndex is None:
intIndex = 0
if fnlIndex is None:
fnlIndex = dataTable.shape[0]
self.numRows = dataTable.shape[0]
self.numCols = dataTable.shape[1]
self.time = dataTable.iloc[intIndex:fnlIndex:stepIndex, 0].to_numpy()
self.velocity = dataTable.iloc[intIndex:fnlIndex:stepIndex, 8:15].to_numpy()
self.torque = dataTable.iloc[intIndex:fnlIndex:stepIndex, 22:29].to_numpy() # torque_sm : torque recorded from torque sensor
self.torque_cur = dataTable.iloc[intIndex:fnlIndex:stepIndex, 29:36].to_numpy() # torque : recorded using current
self.torque_rne = dataTable.iloc[intIndex:fnlIndex:stepIndex, 15:22].to_numpy() # torque : from blast
self.current = dataTable.iloc[intIndex:fnlIndex:stepIndex, 43:50].to_numpy()
self.position = dataTable.iloc[intIndex:fnlIndex:stepIndex, 1:8].to_numpy()
self.temperature = dataTable.iloc[intIndex:fnlIndex:stepIndex, 36:43].to_numpy()
self.desiredVelocity = dataTable.iloc[intIndex:fnlIndex:stepIndex, 57:64].to_numpy()
self.desiredPosition = dataTable.iloc[intIndex:fnlIndex:stepIndex, 50:57].to_numpy()
self.desiredAcceleration = dataTable.iloc[intIndex:fnlIndex:stepIndex, 64:71].to_numpy()
self.timeUnit = 'milliseconds'
self.timeStep = time_step
self.samplingRate = 1/self.timeStep
self.duration = (len(self.time) - 1) * self.timeStep
self.maxJointVelocity = np.max(self.velocity, axis=0)
self.maxJointTorque = np.max(self.torque, axis=0)
self.maxJointCurrent = np.max(self.current, axis=0)
self.maxJointTemperature = np.max(self.temperature, axis=0)
self.maxJointPosition = np.max(self.position, axis=0)
self.maxDesiredJointAcceleration = np.max(self.desiredAcceleration, axis=0)
self.minJointVelocity = np.min(self.velocity, axis=0)
self.minJointTorque = np.min(self.torque, axis=0)
self.minJointCurrent = np.min(self.current, axis=0)
self.minJointTemperature = np.min(self.temperature, axis=0)
self.minJointPosition = np.min(self.position, axis=0)
self.minDesiredJointAcceleration = np.min(self.desiredAcceleration, axis=0)
self.corrJointPosition = np.corrcoef(self.position, rowvar=False)
self.corrJointVelocity = np.corrcoef(self.velocity, rowvar=False)
self.corrJointTorque = np.corrcoef(self.torque, rowvar=False)
self.corrJointCurrent = np.corrcoef(self.current, rowvar=False)
self.corrJointAcceleration = np.corrcoef(self.desiredAcceleration, rowvar=False)
[docs]
def updateSamplingRate(self, new_sampling_rate):
self.samplingRate = new_sampling_rate
self.timeStep = 1 / self.samplingRate
self.duration = (len(self.time) - 1) * self.timeStep
[docs]
def lowPassfilter(self, cutoff_frequency):
"""Filtring robot data using a butter low pass filter"""
nyquist_freq = 0.5 * self.samplingRate
normal_cutoff = cutoff_frequency / nyquist_freq
b, a = butter(4, normal_cutoff, btype='low', analog=False)
smoothed_data = {}
smoothed_data['velocity'] = filtfilt(b, a, self.velocity, axis=0)
smoothed_data['torque'] = filtfilt(b, a, self.torque, axis=0)
smoothed_data['torque_rne'] = filtfilt(b,a,self.torque_rne,axis=0)
smoothed_data['torque_cur'] =filtfilt(b,a,self.torque_cur,axis=0)
smoothed_data['current'] = filtfilt(b, a, self.current, axis=0)
smoothed_data['position'] = filtfilt(b, a, self.position, axis=0)
smoothed_data['temperature'] = filtfilt(b, a, self.temperature, axis=0)
smoothed_data['desiredVelocity'] = filtfilt(b, a, self.desiredVelocity, axis=0)
smoothed_data['desiredPosition'] = filtfilt(b, a, self.desiredPosition, axis=0)
smoothed_data['desiredAcceleration'] = filtfilt(b, a, self.desiredAcceleration, axis=0)
return smoothed_data
[docs]
def computeTorqueNoise(self,variable='torque',method='std')->float:
"""
Estimates the Noise level in torque sensor mesurements.
The noise model is assumed to be a white gaussian noise.
"""
if method =='std':
if variable =='torque_cur':
noise_level = np.std(self.torque_cur)
elif variable == 'torque_rne':
noise_level = np.std(self.torque_rne)
else:
noise_level =np.std(self.torque)
elif method=='mad':
if variable =='torque_cur':
mad = np.median(np.abs(self.torque_cur-np.median(self.torque_cur,axis=0)),axis=0)
noise_level = np.mean(mad)*1.4826
elif variable == 'torque_rne':
mad = np.median(np.abs(self.torque_rne-np.median(self.torque_rne,axis=0)),axis=0)
noise_level = np.mean(mad)*1.4826
else:
mad = np.median(np.abs(self.torque-np.median(self.torque,axis=0)),axis=0)
noise_level = np.mean(mad)*1.4826
return noise_level
[docs]
def computePositionNoise(self,method='std')->tuple:
"""
Estimates the Noise level in torque sensor mesurements.
The noise model is assumed to be a white gaussian noise.
"""
if method =='std':
noise_level = np.std(self.position)
elif method=='mad':
mad = np.median(np.abs(self.position-np.median(self.position,axis=0)),axis=0)
noise_level = np.mean(mad)*1.4826
return noise_level
[docs]
def visualizeCorrelation(self,variable='torque'):
"""
Plot the correlation graph between joints variable data given by
variable parameter
"""
if variable =='torque_cur':
correlation_matrix = computeCorrelation(self.torque_cur)
title = 'Joints Recorded Torques-Current Correlations'
elif variable == 'torque_rne':
correlation_matrix = computeCorrelation(self.torque_rne)
title = 'Joints Blast RNEA Torques Correlations'
elif variable =='torque':
correlation_matrix = computeCorrelation(self.torque)
title = 'Joints Sensor Recorded Torques Correlations'
else:
logger.error('variable type not supported yet')
plt.figure(figsize=(12, 6))
sns.heatmap(correlation_matrix, annot=True, fmt=".3f", cmap='YlGnBu', linewidths=0.5, \
linecolor='white', cbar_kws={"shrink": .8})
plt.title(title,fontsize=9)
[docs]
def visualizeVelocity(self)->None:
"""Plot the joints velocity recorded by the sensors."""
sns.set(style="whitegrid")
fig, axes = plt.subplots(3, 3, figsize=(12, 6))
for i in range(self.ndof):
ax = axes[i // 3, i % 3]
sns.lineplot(ax=ax, x=np.arange(len(self.velocity[:, i])), y= self.velocity[:, i],linewidth=0.5)
ax.set_xlabel("Time (ms)")
ax.set_ylabel("Velocity")
ax.set_title(f'Joint {i+1}')
fig.suptitle('Joints Recorded Velocity', fontsize=11)
[docs]
def visualizePosition(self)->None:
"""Plot the joints position recorded by the sensors."""
sns.set(style="whitegrid")
fig, axes = plt.subplots(3, 3, figsize=(12, 6))
for i in range(self.ndof):
ax = axes[i // 3, i % 3]
sns.lineplot(ax = ax, x = np.arange(len(self.position[:, i])), y=self.position[:, i],linewidth=0.5)
ax.set_xlabel("Time (ms)")
ax.set_ylabel("Position")
ax.set_title(f'Joint {i+1}')
fig.suptitle('Joints Recorded Position', fontsize=10)
fig.tight_layout(rect = [0, 0, 1, 0.95])
[docs]
def visualizeTorque(self)->None:
"""Plot the joints torque"""
sns.set(style="whitegrid")
fig, axes = plt.subplots(3, 3, figsize=(12, 6))
for i in range(self.ndof):
ax = axes[i // 3, i % 3]
sns.lineplot(ax=ax, x=np.arange(len(self.torque[:, i])), y=self.torque[:, i],linewidth=0.5)
ax.set_xlabel("Time (ms)")
ax.set_ylabel("Torque (N.m)")
ax.set_title(f'Joint {i+1}')
fig.suptitle('Joints Recorded Torque', fontsize=11)
fig.tight_layout(rect=[0, 0, 1, 0.95])
[docs]
def visualizeCurrent(self)-> None:
"""Plot the joints current """
sns.set(style="whitegrid")
fig, axes = plt.subplots(3, 3, figsize=(12, 6))
for i in range(self.ndof):
ax = axes[i // 3, i % 3]
sns.lineplot(ax=ax, x=np.arange(len(self.current[:, i])), y=self.current[:, i],linewidth=0.5)
ax.set_xlabel("Time (ms)")
ax.set_ylabel("Current (mA)")
ax.set_title(f'Joint {i+1}')
fig.suptitle('Joints Recorded Current', fontsize=11)
fig.tight_layout(rect=[0, 0, 1, 0.95])
[docs]
def visualizeAcceleration(self)->None:
"""Plot the joints desired accleration"""
sns.set(style="whitegrid")
fig, axes = plt.subplots(3, 3, figsize=(12, 6))
for i in range(self.ndof):
ax = axes[i // 3, i % 3]
sns.lineplot(ax=ax, x=np.arange(len(self.desiredAcceleration[:, i])),\
y=self.desiredAcceleration[:, i],linewidth=0.5)
ax.set_xlabel("Time (ms)")
ax.set_ylabel("Acceleration")
ax.set_title(f'Joint {i+1}')
fig.suptitle('Joints desired acceleration', fontsize=11)
fig.tight_layout(rect=[0, 0, 1, 0.95])