Skip to content

Commit

Permalink
Merge pull request #5 from stanfordnmbl/cleam_com
Browse files Browse the repository at this point in the history
cleam_com
  • Loading branch information
antoinefalisse authored Sep 6, 2023
2 parents 0606525 + 9261f95 commit 0518df5
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 173 deletions.
29 changes: 12 additions & 17 deletions max_centerofmass_vpos/function/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import os
import numpy as np

from utilsKinematics import Kinematics
from utils import download_kinematics
from utilsKinematics import kinematics
from utils import get_trial_id, download_trial


def handler(event, context):
Expand Down Expand Up @@ -53,25 +53,20 @@ def handler(event, context):
specific_trial_names = kwargs['specific_trial_names']

# Specify where to download the data.
data_folder = os.path.join("/tmp/Data", session_id)
sessionDir = os.path.join("/tmp/Data", session_id)

# %% Download data.
trial_names, modelName = download_kinematics(session_id, folder=data_folder, trialNames=specific_trial_names)
trial_id = get_trial_id(session_id,specific_trial_names[0])
trial_name = download_trial(trial_id,sessionDir,session_id=session_id)

# %% Process data.
kinematics, center_of_mass = {}, {}
center_of_mass['values'] = {}
for trial_name in trial_names:
# Create object from class kinematics.
kinematics[trial_name] = Kinematics(
data_folder, trial_name, modelName=modelName, lowpass_cutoff_frequency_for_coordinate_values=10
)
# Get center of mass values, speeds, and accelerations.
center_of_mass['values'][trial_name] = kinematics[trial_name].get_center_of_mass_values(
lowpass_cutoff_frequency=10
)

max_center_of_mass = np.round(np.max(center_of_mass['values'][trial_names[0]]['y']), 2)
# Create object from class kinematics.
kinematics_obj = kinematics(
sessionDir, trial_name, lowpass_cutoff_frequency_for_coordinate_values=10)
# Get center of mass values.
center_of_mass = kinematics_obj.get_center_of_mass_values(lowpass_cutoff_frequency=10)
# Get maximal center of mass vertical position.
max_center_of_mass = np.round(np.max(center_of_mass['y']), 2)
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
Expand Down
52 changes: 48 additions & 4 deletions max_centerofmass_vpos/function/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ def get_created_at(trial):
sessionJson['trials'].sort(key=get_created_at)

return sessionJson

# Returns a list of all sessions of the user.
def get_user_sessions():
sessions = requests.get(
API_URL + "sessions/valid/",
headers = {"Authorization": "Token {}".format(API_TOKEN)}).json()

return sessions

def get_trial_json(trial_id):
trialJson = requests.get(
Expand Down Expand Up @@ -128,6 +136,17 @@ def get_model_and_metadata(session_id, session_path):

return modelName

def get_model_name_from_metadata(sessionFolder,appendText='_scaled'):
metadataPath = os.path.join(sessionFolder,'sessionMetadata.yaml')

if os.path.exists(metadataPath):
metadata = import_metadata(os.path.join(sessionFolder,'sessionMetadata.yaml'))
modelName = metadata['openSimModel'] + appendText + '.osim'
else:
raise Exception('Session metadata not found, could not identify OpenSim model.')

return modelName


def get_motion_data(trial_id, session_path):
trial = get_trial_json(trial_id)
Expand All @@ -151,8 +170,7 @@ def get_motion_data(trial_id, session_path):
download_file(ikURL, ikPath)


def get_geometries(session_path,
modelName='LaiUhlrich2022_scaled'):
def get_geometries(session_path, modelName='LaiUhlrich2022_scaled'):

geometryFolder = os.path.join(session_path, 'OpenSimData', 'Model', 'Geometry')
try:
Expand Down Expand Up @@ -238,6 +256,32 @@ def download_kinematics(session_id, folder=None, trialNames=None):

return loadedTrialNames, modelName

# Download pertinent trial data.
def download_trial(trial_id, folder, session_id=None):

trial = get_trial_json(trial_id)
if session_id is None:
session_id = trial['session_id']

os.makedirs(folder,exist_ok=True)

# download model
get_model_and_metadata(session_id, folder)

# download trc and mot
get_motion_data(trial_id,folder)

return trial['name']


# Get trial ID from name.
def get_trial_id(session_id,trial_name):
session = get_session_json(session_id)

trial_id = [t['id'] for t in session['trials'] if t['name'] == trial_name]

return trial_id[0]

# %% Storage file to numpy array.
def storage_to_numpy(storage_file, excess_header_entries=0):
"""Returns the data from a storage file in a numpy format. Skips all lines
Expand Down Expand Up @@ -352,7 +396,6 @@ def numpy_to_storage(labels, data, storage_file, datatype=None):

f.close()


def download_videos_from_server(session_id,trial_id,
isCalibration=False, isStaticPose=False,
trial_name= None, session_path = None):
Expand Down Expand Up @@ -588,4 +631,5 @@ def zipdir(path, ziph):
# Write zip as a result to last trial for now
if writeToDB:
post_file_to_trial(session_zip,dynamic_ids[-1],tag='session_zip',
device_id='all')
device_id='all')

88 changes: 62 additions & 26 deletions max_centerofmass_vpos/function/utilsKinematics.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,46 @@

import os
import opensim
import utils
import numpy as np
import pandas as pd
import scipy.interpolate as interpolate


from utilsProcessing import lowPassFilter
from utilsTRC import trc_2_dict


class Kinematics:
class kinematics:

def __init__(self, dataDir, trialName,
modelName='LaiUhlrich2022_scaled',
def __init__(self, sessionDir, trialName,
modelName=None,
lowpass_cutoff_frequency_for_coordinate_values=-1):

self.lowpass_cutoff_frequency_for_coordinate_values = (
lowpass_cutoff_frequency_for_coordinate_values)

# Model.
opensim.Logger.setLevelString('error')
modelPath = os.path.join(dataDir, 'OpenSimData', 'Model',

modelBasePath = os.path.join(sessionDir, 'OpenSimData', 'Model')
# Load model if specified, otherwise load the one that was on server
if modelName is None:
modelName = utils.get_model_name_from_metadata(sessionDir)
modelPath = os.path.join(modelBasePath,modelName)
else:
modelPath = os.path.join(modelBasePath,
'{}.osim'.format(modelName))

# make sure model exists
if not os.path.exists(modelPath):
raise Exception('Model path: ' + modelPath + ' does not exist.')

self.model = opensim.Model(modelPath)
self.model.initSystem()

# Motion file with coordinate values.
motionPath = os.path.join(dataDir, 'OpenSimData', 'Kinematics',
motionPath = os.path.join(sessionDir, 'OpenSimData', 'Kinematics',
'{}.mot'.format(trialName))

# Create time-series table with coordinate values.
Expand All @@ -53,6 +69,9 @@ def __init__(self, dataDir, trialName,
tableProcessor.append(opensim.TabOpUseAbsoluteStateNames())
self.time = np.asarray(self.table.getIndependentColumn())

# Initialize the state trajectory. We will set it in other functions
# if it is needed.
self._stateTrajectory = None

# Filter coordinate values.
if lowpass_cutoff_frequency_for_coordinate_values > 0:
Expand Down Expand Up @@ -101,11 +120,7 @@ def __init__(self, dataDir, trialName,
if not stateVariableNameStr in existingLabels:
vec_0 = opensim.Vector([0] * self.table.getNumRows())
self.table.appendColumn(stateVariableNameStr, vec_0)

# Set state trajectory
self.stateTrajectory = opensim.StatesTrajectory.createFromStatesTable(
self.model, self.table)


# Number of muscles.
self.nMuscles = 0
self.forceSet = self.model.getForceSet()
Expand All @@ -120,15 +135,13 @@ def __init__(self, dataDir, trialName,
self.coordinates = [self.coordinateSet.get(i).getName()
for i in range(self.nCoordinates)]

# TODO: hard coded
# Translational coordinates.
columnTrLabels = [
'pelvis_tx', 'pelvis_ty', 'pelvis_tz']
# Find rotational and translational coordinates.
self.idxColumnTrLabels = [
self.columnLabels.index(i) for i in columnTrLabels]
self.columnLabels.index(i) for i in self.coordinates if \
self.coordinateSet.get(i).getMotionType() == 2]
self.idxColumnRotLabels = [
self.columnLabels.index(i) for i in self.columnLabels
if not i in columnTrLabels]
self.columnLabels.index(i) for i in self.coordinates if \
self.coordinateSet.get(i).getMotionType() == 1]

# TODO: hard coded
self.rootCoordinates = [
Expand All @@ -142,7 +155,30 @@ def __init__(self, dataDir, trialName,
'elbow_flex_r', 'pro_sup_r',
'arm_flex_l', 'arm_add_l', 'arm_rot_l',
'elbow_flex_l', 'pro_sup_l']


# Only set the state trajectory when needed because it is slow.
def stateTrajectory(self):
if self._stateTrajectory is None:
self._stateTrajectory = (
opensim.StatesTrajectory.createFromStatesTable(
self.model, self.table))
return self._stateTrajectory

def get_marker_dict(self, session_dir, trial_name,
lowpass_cutoff_frequency=-1):

trcFilePath = os.path.join(session_dir,
'MarkerData',
'{}.trc'.format(trial_name))

markerDict = trc_2_dict(trcFilePath)
if lowpass_cutoff_frequency > 0:
markerDict['markers'] = {
marker_name: lowPassFilter(self.time, data, lowpass_cutoff_frequency)
for marker_name, data in markerDict['markers'].items()}

return markerDict

def get_coordinate_values(self, in_degrees=True,
lowpass_cutoff_frequency=-1):

Expand Down Expand Up @@ -224,14 +260,14 @@ def get_muscle_tendon_lengths(self, lowpass_cutoff_frequency=-1):
# Compute muscle-tendon lengths.
lMT = np.zeros((self.table.getNumRows(), self.nMuscles))
for i in range(self.table.getNumRows()):
self.model.realizePosition(self.stateTrajectory[i])
self.model.realizePosition(self.stateTrajectory()[i])
if i == 0:
muscleNames = []
for m in range(self.forceSet.getSize()):
c_force_elt = self.forceSet.get(m)
if 'Muscle' in c_force_elt.getConcreteClassName():
cObj = opensim.Muscle.safeDownCast(c_force_elt)
lMT[i,m] = cObj.getLength(self.stateTrajectory[i])
lMT[i,m] = cObj.getLength(self.stateTrajectory()[i])
if i == 0:
muscleNames.append(c_force_elt.getName())

Expand All @@ -253,7 +289,7 @@ def get_moment_arms(self, lowpass_cutoff_frequency=-1):
dM = np.zeros((self.table.getNumRows(), self.nMuscles,
self.nCoordinates))
for i in range(self.table.getNumRows()):
self.model.realizePosition(self.stateTrajectory[i])
self.model.realizePosition(self.stateTrajectory()[i])
if i == 0:
muscleNames = []
for m in range(self.forceSet.getSize()):
Expand All @@ -280,7 +316,7 @@ def get_moment_arms(self, lowpass_cutoff_frequency=-1):
coordinate = self.coordinateSet.get(
self.coordinates.index(coord))
dM[i, m, c] = cObj.computeMomentArm(
self.stateTrajectory[i], coordinate)
self.stateTrajectory()[i], coordinate)

# Clean numerical artefacts (ie, moment arms smaller than 1e-5 m).
dM[np.abs(dM) < 1e-5] = 0
Expand All @@ -307,11 +343,11 @@ def compute_center_of_mass(self):
self.com_values = np.zeros((self.table.getNumRows(),3))
self.com_speeds = np.zeros((self.table.getNumRows(),3))
for i in range(self.table.getNumRows()):
self.model.realizeVelocity(self.stateTrajectory[i])
self.model.realizeVelocity(self.stateTrajectory()[i])
self.com_values[i,:] = self.model.calcMassCenterPosition(
self.stateTrajectory[i]).to_numpy()
self.stateTrajectory()[i]).to_numpy()
self.com_speeds[i,:] = self.model.calcMassCenterVelocity(
self.stateTrajectory[i]).to_numpy()
self.stateTrajectory()[i]).to_numpy()

def get_center_of_mass_values(self, lowpass_cutoff_frequency=-1):

Expand Down Expand Up @@ -370,4 +406,4 @@ def get_center_of_mass_accelerations(self, lowpass_cutoff_frequency=-1):
columns = ['time'] + ['x','y','z']
com_accelerations = pd.DataFrame(data=data, columns=columns)

return com_accelerations
return com_accelerations
Loading

0 comments on commit 0518df5

Please sign in to comment.