Skip to content

Commit

Permalink
commit before lab meeting - problem in 'put skel on ground'
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmatthis committed Nov 2, 2023
1 parent 63f8556 commit 8086fb5
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 129 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from copy import deepcopy
from typing import Dict, Any
from typing import Dict, Any, Tuple

import numpy as np

Expand All @@ -13,7 +13,7 @@


def enforce_rigid_bones(handler: FreemocapDataHandler,
bones: Dict[str, Dict[str, Any]] = BONE_DEFINITIONS):
bones: Dict[str, Dict[str, Any]] = BONE_DEFINITIONS) -> FreemocapDataHandler:
print('Enforcing rigid bones - altering bone lengths to ensure they are the same length on each frame...')
original_trajectories = handler.trajectories
updated_trajectories = deepcopy(original_trajectories)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import numpy as np

from .operations.estimate_good_frame import estimate_good_frame
from ..empties.creation.create_virtual_trajectories import calculate_virtual_trajectories
from ..freemocap_data_handler.helpers.saver import FreemocapDataSaver
from ..freemocap_data_handler.helpers.transformer import FreemocapDataTransformer
Expand Down Expand Up @@ -183,8 +184,8 @@ def number_of_face_trajectories(self):
@property
def number_of_hand_trajectories(self):
if not self.number_of_right_hand_trajectories == self.number_of_left_hand_trajectories:
logger.warning(f"Number of right hand trajectories ({self.number_of_right_hand_trajectories}) "
f"does not match number of left hand trajectories ({self.number_of_left_hand_trajectories}).")
print(f"Number of right hand trajectories ({self.number_of_right_hand_trajectories}) "
f"does not match number of left hand trajectories ({self.number_of_left_hand_trajectories}).")
return self.number_of_right_hand_trajectories + self.number_of_left_hand_trajectories

@property
Expand All @@ -199,6 +200,15 @@ def number_of_trajectories(self):
self.number_of_face_trajectories +
self.number_of_other_trajectories)

def estimate_good_clean_frame(self):
return estimate_good_frame(trajectories_with_error=self.get_trajectories(with_error=True))

def estimate_height(self):
if "bones" in self.metadata:
raise AssertionError(
"Cannot estimate height before the 'bones' metadata has been set ( in the `enforce_rigid_bones` step).")
left_leg_length = 9

def add_trajectory(self,
trajectory: np.ndarray,
trajectory_name: str,
Expand Down Expand Up @@ -262,9 +272,14 @@ def add_trajectories(self,
source=source,
group_name=group_name)

def get_trajectories(self, trajectory_names: List[str], components=None, with_error: bool = False) -> Union[
def get_trajectories(self,
trajectory_names: List[str] = None,
components=None,
with_error: bool = False) -> Union[
Dict[str, np.ndarray], Dict[str, Dict[str, np.ndarray]]]:

if trajectory_names is None:
trajectory_names = [self.body_names, self.right_hand_names, self.left_hand_names]
if not isinstance(trajectory_names, list):
trajectory_names = [trajectory_names]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,41 @@
import logging
import math
from typing import Dict

import numpy as np

import sys


def estimate_good_frame(trajectories_with_error: Dict[str, Dict[str, np.ndarray]], ignore_first_n_frames: int = 30):
names = list(trajectories_with_error.keys())
def estimate_good_frame(trajectories_with_error: Dict[str, np.ndarray],
velocity_threshold: float = 0.1,
ignore_first_n_frames: int = 30):
trajectory_names = list(trajectories_with_error.keys())
all_good_frames = None
all_velocities = {}
all_errors = {}

for name in names:
velocities = np.diff(trajectories_with_error[name]['trajectory'], axis=0)
velocities = np.insert(velocities, 0, np.nan, axis=0)
velocities_mag = np.sqrt(np.sum(velocities ** 2, axis=1))
velocities = np.nan_to_num(velocities, nan=np.inf)
for trajectory_name in trajectory_names:
trajectory_velocity_frame_xyz = np.diff(trajectories_with_error[trajectory_name]['trajectory'], axis=0)
trajectory_velocity_frame_xyz = np.insert(trajectory_velocity_frame_xyz, 0, np.nan, axis=0)
trajectory_velocity_frame_magnitude = np.sqrt(np.sum(trajectory_velocity_frame_xyz ** 2, axis=1))
all_velocities[trajectory_name] = trajectory_velocity_frame_magnitude

errors = trajectories_with_error[name]['error']
trajectory_reprojection_error = trajectories_with_error[trajectory_name]['error']
all_errors[trajectory_name] = trajectory_reprojection_error

# define threshold for 'standing still'
velocity_threshold = np.nanpercentile(velocities_mag, 10)
velocity_threshold = np.nanpercentile(trajectory_velocity_frame_magnitude,
q=velocity_threshold * 100)

# Get the indices of the good frames
good_frames_indices = np.where((velocities_mag <= velocity_threshold) & (~np.isnan(errors)))[0]
velocity_above_threshold = [index for index, velocity in enumerate(trajectory_velocity_frame_magnitude) if velocity > velocity_threshold]
reprojection_error_not_nan = [index for index, error in enumerate(trajectory_reprojection_error) if not math.isnan(error)]
floating_point_error = math.ulp(1.0)*10 # 10 times the floating point error, any velocity below this is considered zero
velocity_not_zero = [index for index, velocity in enumerate(trajectory_velocity_frame_magnitude) if velocity > floating_point_error]

# To get the good_frames_indices, we need the intersection of the other three lists:
good_frames_indices = list(set(velocity_above_threshold) & set(reprojection_error_not_nan) & set(velocity_not_zero))

# If no good frames found, skip this trajectory
if good_frames_indices.size == 0:
if len(good_frames_indices)== 0:
continue

# intersect good_frames_indices with all_good_frames
Expand All @@ -38,13 +48,18 @@ def estimate_good_frame(trajectories_with_error: Dict[str, Dict[str, np.ndarray]
if not all_good_frames:
raise Exception("No good frames found! Please check your data.")

# Convert the set back to a numpy array
all_good_frames = np.array(list(all_good_frames))
# Convert the set to a list
all_good_frames = list(all_good_frames)

all_velocities_on_good_frames = np.array([all_velocities[name][all_good_frames] for name in trajectory_names])
reprojection_errors_on_good_frames = np.array([all_errors[name][all_good_frames] for name in trajectory_names])

# Get the velocities of the good frames
good_frames_velocities = velocities_mag[all_good_frames]
# normalize the values by their minimum (best) value
good_frames_velocities_normalized = all_velocities_on_good_frames / np.min(all_velocities_on_good_frames)
good_frames_errors_normalized = reprojection_errors_on_good_frames / np.min(reprojection_errors_on_good_frames)

# Find the index of the good frame with the lowest velocity
best_frame = all_good_frames[np.argmin(good_frames_velocities)]
# Find the index of the good frame with the lowest velocity and lowest error
combined_error = good_frames_velocities_normalized + good_frames_errors_normalized
best_frame = all_good_frames[np.argmin(combined_error)]

return best_frame
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@


def put_skeleton_on_ground(handler: 'FreemocapDataHandler'):
print(
f"Putting freemocap data in inertial reference frame...")
print(f"Putting freemocap data in inertial reference frame...")

ground_reference_trajectories_with_error = handler.get_trajectories(
trajectory_names=["right_heel", "left_heel", "right_foot_index", "left_foot_index"],
Expand Down Expand Up @@ -105,7 +104,7 @@ def get_body_trajectories_closest_to_the_ground(handler: 'FreemocapDataHandler')

for trajectory_name, trajectory in part_trajectories.items():
if np.isnan(trajectory).all():
logger.warning(
print(
f"Trajectory {trajectory_name} is all nan. Removing from lowest body trajectories.")
del part_trajectories[trajectory_name]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def load_videos(recording_path: str,
elif Path(recording_path / "synchronized_videos").is_dir():
videos_path = Path(recording_path / "synchronized_videos")
else:
logger.warning("Did not find an `annotated_videos` or `synchronized_videos` folder in the recording path")
print("Did not find an `annotated_videos` or `synchronized_videos` folder in the recording path")
videos_path = None

if videos_path is not None:
Expand Down
Loading

0 comments on commit 8086fb5

Please sign in to comment.