From 2f0faa3321746704282aecc2faadc2379a45da63 Mon Sep 17 00:00:00 2001 From: CBroz1 Date: Fri, 6 Oct 2023 13:06:24 -0500 Subject: [PATCH] Refactor position helpers --- src/spyglass/common/common_position.py | 381 +++++++++++------- .../position/v1/position_trodes_position.py | 352 ++-------------- 2 files changed, 257 insertions(+), 476 deletions(-) diff --git a/src/spyglass/common/common_position.py b/src/spyglass/common/common_position.py index d7db69985..0d4433b84 100644 --- a/src/spyglass/common/common_position.py +++ b/src/spyglass/common/common_position.py @@ -26,7 +26,7 @@ from ..settings import raw_dir from ..utils.dj_helper_fn import fetch_nwb from .common_behav import RawPosition, VideoFile -from .common_interval import IntervalList +from .common_interval import IntervalList # noqa F401 from .common_nwbfile import AnalysisNwbfile schema = dj.schema("common_position") @@ -34,21 +34,23 @@ @schema class PositionInfoParameters(dj.Lookup): - """Parameters for extracting the smoothed head position, oriention and - velocity.""" + """ + Parameters for extracting the smoothed position, orientation and velocity. + """ definition = """ position_info_param_name : varchar(80) # name for this set of parameters --- max_separation = 9.0 : float # max distance (in cm) between head LEDs max_speed = 300.0 : float # max speed (in cm / s) of animal - position_smoothing_duration = 0.125 : float # size of moving window (in seconds) - speed_smoothing_std_dev = 0.100 : float # smoothing standard deviation (in seconds) - head_orient_smoothing_std_dev = 0.001 : float # smoothing std deviation (in seconds) - led1_is_front = 1 : int # first LED is front LED and second is back LED, else first LED is back + position_smoothing_duration = 0.125 : float # size of moving window (s) + speed_smoothing_std_dev = 0.100 : float # smoothing standard deviation (s) + head_orient_smoothing_std_dev = 0.001 : float # smoothing std deviation (s) + led1_is_front = 1 : int # 1 if 1st LED is front LED, else 1st LED is back is_upsampled = 0 : int # upsample the position to higher sampling rate upsampling_sampling_rate = NULL : float # The rate to be upsampled to - upsampling_interpolation_method = linear : varchar(80) # see pandas.DataFrame.interpolation for list of methods + upsampling_interpolation_method = linear : varchar(80) # see + # pandas.DataFrame.interpolation for list of methods """ @@ -81,122 +83,179 @@ class IntervalPositionInfo(dj.Computed): def make(self, key): print(f"Computing position for: {key}") - key["analysis_file_name"] = AnalysisNwbfile().create( - key["nwb_file_name"] - ) - raw_position = ( - RawPosition() - & { - "nwb_file_name": key["nwb_file_name"], - "interval_list_name": key["interval_list_name"], - } - ).fetch_nwb()[0] - position_info_parameters = ( - PositionInfoParameters() - & {"position_info_param_name": key["position_info_param_name"]} - ).fetch1() - head_position = pynwb.behavior.Position() - head_orientation = pynwb.behavior.CompassDirection() - head_velocity = pynwb.behavior.BehavioralTimeSeries() + analysis_file_name = AnalysisNwbfile().create(key["nwb_file_name"]) - METERS_PER_CM = 0.01 + raw_position = RawPosition.PosObject & key + spatial_series = raw_position.fetch_nwb()[0]["raw_position"] + spatial_df = raw_position.fetch1_dataframe() - try: - # calculate the processed position - spatial_series = raw_position["raw_position"] - position_info = self.calculate_position_info_from_spatial_series( - spatial_series, - position_info_parameters["max_separation"], - position_info_parameters["max_speed"], - position_info_parameters["speed_smoothing_std_dev"], - position_info_parameters["position_smoothing_duration"], - position_info_parameters["head_orient_smoothing_std_dev"], - position_info_parameters["led1_is_front"], - position_info_parameters["is_upsampled"], - position_info_parameters["upsampling_sampling_rate"], - position_info_parameters["upsampling_interpolation_method"], - ) + position_info_parameters = (PositionInfoParameters() & key).fetch1() - # create nwb objects for insertion into analysis nwb file - head_position.create_spatial_series( - name="head_position", - timestamps=position_info["time"], - conversion=METERS_PER_CM, - data=position_info["head_position"], - reference_frame=spatial_series.reference_frame, - comments=spatial_series.comments, - description="head_x_position, head_y_position", - ) - - head_orientation.create_spatial_series( - name="head_orientation", - timestamps=position_info["time"], - conversion=1.0, - data=position_info["head_orientation"], - reference_frame=spatial_series.reference_frame, - comments=spatial_series.comments, - description="head_orientation", - ) + position_info = self.calculate_position_info( + spatial_df=spatial_df, + meters_to_pixels=spatial_series.conversion, + **position_info_parameters, + ) - head_velocity.create_timeseries( - name="head_velocity", - timestamps=position_info["time"], - conversion=METERS_PER_CM, - unit="m/s", - data=np.concatenate( - ( - position_info["velocity"], - position_info["speed"][:, np.newaxis], - ), - axis=1, + key.update( + dict( + analysis_file_name=analysis_file_name, + **self.generate_pos_components( + spatial_series=spatial_series, + position_info=position_info, + analysis_fname=analysis_file_name, ), - comments=spatial_series.comments, - description="head_x_velocity, head_y_velocity, head_speed", ) - except ValueError as e: - print(e) + ) - # Insert into analysis nwb file - nwb_analysis_file = AnalysisNwbfile() + AnalysisNwbfile().add(key["nwb_file_name"], analysis_file_name) + + self.insert1(key) + + @staticmethod + def generate_pos_components( + spatial_series, + position_info, + analysis_fname, + prefix="head_", + add_frame_ind=False, + video_frame_ind=None, + ): + """Generate position, orientation and velocity components.""" + METERS_PER_CM = 0.01 - key["head_position_object_id"] = nwb_analysis_file.add_nwb_object( - key["analysis_file_name"], head_position + position = pynwb.behavior.Position() + orientation = pynwb.behavior.CompassDirection() + velocity = pynwb.behavior.BehavioralTimeSeries() + + # NOTE: CBroz1 removed a try/except ValueError that surrounded all + # .create_X_series methods. dpeg22 could not recall purpose + + time_comments = dict( + comments=spatial_series.comments, + timestamps=position_info["time"], ) - key["head_orientation_object_id"] = nwb_analysis_file.add_nwb_object( - key["analysis_file_name"], head_orientation + time_comments_ref = dict( + **time_comments, + reference_frame=spatial_series.reference_frame, ) - key["head_velocity_object_id"] = nwb_analysis_file.add_nwb_object( - key["analysis_file_name"], head_velocity + + # create nwb objects for insertion into analysis nwb file + position.create_spatial_series( + name=f"{prefix}position", + conversion=METERS_PER_CM, + data=position_info[f"{prefix}position"], + description=f"{prefix}x_position, {prefix}y_position", + **time_comments_ref, ) - AnalysisNwbfile().add(key["nwb_file_name"], key["analysis_file_name"]) + orientation.create_spatial_series( + name=f"{prefix}orientation", + conversion=1.0, + data=position_info[f"{prefix}orientation"], + description=f"{prefix}orientation", + **time_comments_ref, + ) - self.insert1(key) + velocity.create_timeseries( + name=f"{prefix}velocity", + conversion=METERS_PER_CM, + unit="m/s", + data=np.concatenate( + ( + position_info["velocity"], + position_info["speed"][:, np.newaxis], + ), + axis=1, + ), + description=f"{prefix}x_velocity, {prefix}y_velocity, " + + f"{prefix}speed", + **time_comments, + ) + + if add_frame_ind: + if video_frame_ind: + velocity.create_timeseries( + name="video_frame_ind", + unit="index", + data=video_frame_ind.to_numpy(), + description="video_frame_ind", + **time_comments, + ) + else: + print( + "No video frame index found. Assuming all camera frames " + + "are present." + ) + velocity.create_timeseries( + name="video_frame_ind", + unit="index", + data=np.arange(len(position_info["time"])), + description="video_frame_ind", + **time_comments, + ) + + # Insert into analysis nwb file + nwba = AnalysisNwbfile() + + return { + f"{prefix}position_object_id": nwba.add_nwb_object( + analysis_fname, position + ), + f"{prefix}orientation_object_id": nwba.add_nwb_object( + analysis_fname, orientation + ), + f"{prefix}velocity_object_id": nwba.add_nwb_object( + analysis_fname, velocity + ), + } @staticmethod - def calculate_position_info_from_spatial_series( - spatial_series, - max_LED_separation, - max_plausible_speed, - speed_smoothing_std_dev, + def calculate_position_info( + spatial_df: pd.DataFrame, + meters_to_pixels: float, position_smoothing_duration, - head_orient_smoothing_std_dev, + orient_smoothing_std_dev, led1_is_front, is_upsampled, upsampling_sampling_rate, upsampling_interpolation_method, + speed_smoothing_std_dev=None, + max_LED_separation=None, + max_plausible_speed=None, + **kwargs, ): CM_TO_METERS = 100 + if not speed_smoothing_std_dev: + speed_smoothing_std_dev = kwargs.get("head_speed_smoothing_std_dev") + if not max_LED_separation: + max_LED_separation = kwargs.get("max_separation") + if not max_plausible_speed: + max_plausible_speed = kwargs.get("max_speed") + if not all( + [speed_smoothing_std_dev, max_LED_separation, max_plausible_speed] + ): + raise ValueError("Missing required parameters") + + # Accepts x/y 'loc' or 'loc1' format for first pos. Renames to 'loc' + DEFAULT_COLS = ["xloc", "yloc", "xloc2", "yloc2", "xloc1", "yloc1"] + + cols = list(spatial_df.columns) + if len(cols) != 4 or not all([c in DEFAULT_COLS for c in cols]): + choice = dj.utils.user_choice( + "Unexpected columns in raw position. Assume " + + f"{DEFAULT_COLS[:4]}?\n{spatial_df}\n" + ) + if choice.lower() not in ["yes", "y"]: + raise ValueError(f"Unexpected columns in raw position: {cols}") + # rename first 4 columns, keep rest. Rest dropped below + spatial_df.columns = DEFAULT_COLS[:4] + cols[4:] + # Get spatial series properties - time = np.asarray(spatial_series.timestamps) # seconds - position = np.asarray( - pd.DataFrame( - spatial_series.data, - columns=spatial_series.description.split(", "), - ).loc[:, ["xloc", "yloc", "xloc2", "yloc2"]] - ) # meters + time = np.asarray(spatial_df.index) # seconds + position = np.asarray(spatial_df.iloc[:, :4]) # meters # remove NaN times is_nan_time = np.isnan(time) @@ -205,7 +264,6 @@ def calculate_position_info_from_spatial_series( dt = np.median(np.diff(time)) sampling_rate = 1 / dt - meters_to_pixels = spatial_series.conversion # Define LEDs if led1_is_front: @@ -306,28 +364,26 @@ def calculate_position_info_from_spatial_series( sampling_rate = upsampling_sampling_rate - # Calculate head position, head orientation, velocity, speed - head_position = get_centriod(back_LED, front_LED) # cm + # Calculate position, orientation, velocity, speed + position = get_centriod(back_LED, front_LED) # cm - head_orientation = get_angle(back_LED, front_LED) # radians - is_nan = np.isnan(head_orientation) + orientation = get_angle(back_LED, front_LED) # radians + is_nan = np.isnan(orientation) # Unwrap orientation before smoothing - head_orientation[~is_nan] = np.unwrap(head_orientation[~is_nan]) - head_orientation[~is_nan] = gaussian_smooth( - head_orientation[~is_nan], - head_orient_smoothing_std_dev, + orientation[~is_nan] = np.unwrap(orientation[~is_nan]) + orientation[~is_nan] = gaussian_smooth( + orientation[~is_nan], + orient_smoothing_std_dev, sampling_rate, axis=0, truncate=8, ) # convert back to between -pi and pi - head_orientation[~is_nan] = np.angle( - np.exp(1j * head_orientation[~is_nan]) - ) + orientation[~is_nan] = np.angle(np.exp(1j * orientation[~is_nan])) velocity = get_velocity( - head_position, + position, time=time, sigma=speed_smoothing_std_dev, sampling_frequency=sampling_rate, @@ -336,8 +392,8 @@ def calculate_position_info_from_spatial_series( return { "time": time, - "head_position": head_position, - "head_orientation": head_orientation, + "position": position, + "orientation": orientation, "velocity": velocity, "speed": speed, } @@ -348,55 +404,73 @@ def fetch_nwb(self, *attrs, **kwargs): ) def fetch1_dataframe(self): - nwb_data = self.fetch_nwb()[0] - index = pd.Index( - np.asarray( - nwb_data["head_position"].get_spatial_series().timestamps - ), - name="time", - ) + return self._data_to_df(self.fetch_nwb()[0]) + + @staticmethod + def _data_to_df(data, prefix="head_", add_frame_ind=False): + pos, ori, vel = [ + prefix + c for c in ["position", "orientation", "velocity"] + ] + COLUMNS = [ - "head_position_x", - "head_position_y", - "head_orientation", - "head_velocity_x", - "head_velocity_y", - "head_speed", + f"{pos}_x", + f"{pos}_y", + ori, + f"{vel}_x", + f"{vel}_y", + f"{prefix}speed", ] - return pd.DataFrame( + + df = pd.DataFrame( np.concatenate( ( - np.asarray( - nwb_data["head_position"].get_spatial_series().data - ), - np.asarray( - nwb_data["head_orientation"].get_spatial_series().data - )[:, np.newaxis], - np.asarray( - nwb_data["head_velocity"] - .time_series["head_velocity"] - .data - ), + np.asarray(data[pos].get_spatial_series().data), + np.asarray(data[ori].get_spatial_series().data)[ + :, np.newaxis + ], + np.asarray(data[vel].time_series[vel].data), ), axis=1, ), columns=COLUMNS, - index=index, + index=pd.Index( + np.asarray(data[pos].get_spatial_series().timestamps), + name="time", + ), ) + if add_frame_ind: + df.insert( + 0, + "video_frame_ind", + np.asarray( + data[vel].time_series["video_frame_ind"].data, + dtype=int, + ), + ) + + return df + @schema class LinearizationParameters(dj.Lookup): - """Choose whether to use an HMM to linearize position. This can help when - the eucledian distances between separate arms are too close and the previous - position has some information about which arm the animal is on.""" + """Choose whether to use an HMM to linearize position. + + This can help when the euclidean distances between separate arms are too + close and the previous position has some information about which arm the + animal is on. + + route_euclidean_distance_scaling: How much to prefer route distances between + successive time points that are closer to the euclidean distance. Smaller + numbers mean the route distance is more likely to be close to the euclidean + distance. + """ definition = """ linearization_param_name : varchar(80) # name for this set of parameters --- use_hmm = 0 : int # use HMM to determine linearization - # How much to prefer route distances between successive time points that are closer to the euclidean distance. Smaller numbers mean the route distance is more likely to be close to the euclidean distance. - route_euclidean_distance_scaling = 1.0 : float + route_euclidean_distance_scaling = 1.0 : float # Preference for euclidean. sensor_std_dev = 5.0 : float # Uncertainty of position sensor (in cm). # Biases the transition matrix to prefer the current track segment. diagonal_bias = 0.5 : float @@ -406,16 +480,18 @@ class LinearizationParameters(dj.Lookup): @schema class TrackGraph(dj.Manual): """Graph representation of track representing the spatial environment. - Used for linearizing position.""" + + Used for linearizing position. + """ definition = """ track_graph_name : varchar(80) ---- - environment : varchar(80) # Type of Environment - node_positions : blob # 2D position of track_graph nodes, shape (n_nodes, 2) - edges: blob # shape (n_edges, 2) - linear_edge_order : blob # order of track graph edges in the linear space, shape (n_edges, 2) - linear_edge_spacing : blob # amount of space between edges in the linear space, shape (n_edges,) + environment : varchar(80) # Type of Environment + node_positions : blob # 2D position of nodes, (n_nodes, 2) + edges: blob # shape (n_edges, 2) + linear_edge_order : blob # order of edges in linear space, (n_edges, 2) + linear_edge_spacing : blob # space btwn edges in linear space, (n_edges,) """ def get_networkx_track_graph(self, track_graph_parameters=None): @@ -581,7 +657,8 @@ def __init__( ax.imshow(frame, picker=True) ax.set_title( "Left click to place node.\nRight click to remove node." - "\nShift+Left click to clear nodes.\nCntrl+Left click two nodes to place an edge" + "\nShift+Left click to clear nodes." + "\nCntrl+Left click two nodes to place an edge" ) self.connect() diff --git a/src/spyglass/position/v1/position_trodes_position.py b/src/spyglass/position/v1/position_trodes_position.py index 02ae065c1..01f027fb9 100644 --- a/src/spyglass/position/v1/position_trodes_position.py +++ b/src/spyglass/position/v1/position_trodes_position.py @@ -2,27 +2,15 @@ import os from pathlib import Path -import bottleneck import cv2 import datajoint as dj import numpy as np -import pandas as pd -import pynwb -import pynwb.behavior from datajoint.utils import to_camel_case -from position_tools import ( - get_angle, - get_centriod, - get_distance, - get_speed, - get_velocity, - interpolate_nan, -) -from position_tools.core import gaussian_smooth from tqdm import tqdm as tqdm from ...common.common_behav import RawPosition from ...common.common_nwbfile import AnalysisNwbfile +from ...common.common_positon import IntervalPositionInfo from ...utils.dj_helper_fn import fetch_nwb from .dlc_utils import check_videofile, get_video_path @@ -170,124 +158,40 @@ class TrodesPosV1(dj.Computed): """ def make(self, key): - METERS_PER_CM = 0.01 - print(f"Computing position for: {key}") - orig_key = copy.deepcopy(key) - key["analysis_file_name"] = AnalysisNwbfile().create( - key["nwb_file_name"] - ) - - position_info_parameters = (TrodesPosParams() & key).fetch1("params") - - # Rename params to match specificity discussed in #628 - position_info_parameters[ - "max_LED_separation" - ] = position_info_parameters["max_separation"] - position_info_parameters[ - "max_plausible_speed" - ] = position_info_parameters["max_speed"] - spatial_series = (RawPosition.PosObject & key).fetch_nwb()[0][ - "raw_position" - ] - spatial_df = (RawPosition.PosObject & key).fetch1_dataframe() - video_frame_ind = getattr(spatial_df, "video_frame_ind", None) + analysis_file_name = AnalysisNwbfile().create(key["nwb_file_name"]) - position = pynwb.behavior.Position() - orientation = pynwb.behavior.CompassDirection() - velocity = pynwb.behavior.BehavioralTimeSeries() + raw_position = RawPosition.PosObject & key + spatial_series = raw_position.fetch_nwb()[0]["raw_position"] + spatial_df = raw_position.fetch1_dataframe() - # NOTE: CBroz1 removed a try/except ValueError that surrounded all - # .create_X_series methods. dpeg22 could not recall purpose - - position_info = self.calculate_position_info_from_spatial_series( + position_info_parameters = (TrodesPosParams() & key).fetch1("params") + position_info = self.calculate_position_info( spatial_df=spatial_df, meters_to_pixels=spatial_series.conversion, **position_info_parameters, ) - time_comments = dict( - comments=spatial_series.comments, - timestamps=position_info["time"], - ) - time_comments_ref = dict( - **time_comments, - reference_frame=spatial_series.reference_frame, - ) - - # create nwb objects for insertion into analysis nwb file - position.create_spatial_series( - name="position", - conversion=METERS_PER_CM, - data=position_info["position"], - description="x_position, y_position", - **time_comments_ref, - ) - - orientation.create_spatial_series( - name="orientation", - conversion=1.0, - data=position_info["orientation"], - description="orientation", - **time_comments_ref, - ) - - velocity.create_timeseries( - name="velocity", - conversion=METERS_PER_CM, - unit="m/s", - data=np.concatenate( - ( - position_info["velocity"], - position_info["speed"][:, np.newaxis], - ), - axis=1, - ), - description="x_velocity, y_velocity, speed", - **time_comments, - ) - - if video_frame_ind: - velocity.create_timeseries( - name="video_frame_ind", - unit="index", - data=spatial_df.video_frame_ind.to_numpy(), - description="video_frame_ind", - **time_comments, - ) - else: - print( - "No video frame index found. Assuming all camera frames " - + "are present." - ) - velocity.create_timeseries( - name="video_frame_ind", - unit="index", - data=np.arange(len(position_info["time"])), - description="video_frame_ind", - **time_comments, - ) - - # Insert into analysis nwb file - nwb_analysis_file = AnalysisNwbfile() - key.update( dict( - position_object_id=nwb_analysis_file.add_nwb_object( - key["analysis_file_name"], position - ), - orientation_object_id=nwb_analysis_file.add_nwb_object( - key["analysis_file_name"], orientation - ), - velocity_object_id=nwb_analysis_file.add_nwb_object( - key["analysis_file_name"], velocity + analysis_file_name=analysis_file_name, + **self.generate_pos_components( + spatial_series=spatial_series, + spatial_df=spatial_df, + position_info=position_info, + analysis_fname=analysis_file_name, + prefix="", + add_frame_ind=True, + video_frame_ind=getattr( + spatial_df, "video_frame_ind", None + ), ), ) ) - AnalysisNwbfile().add(key["nwb_file_name"], key["analysis_file_name"]) + AnalysisNwbfile().add(key["nwb_file_name"], analysis_file_name) self.insert1(key) @@ -301,180 +205,13 @@ def make(self, key): ) @staticmethod - def calculate_position_info_from_spatial_series( - spatial_df: pd.DataFrame, - meters_to_pixels: float, - max_LED_separation, - max_plausible_speed, - speed_smoothing_std_dev, - position_smoothing_duration, - orient_smoothing_std_dev, - led1_is_front, - is_upsampled, - upsampling_sampling_rate, - upsampling_interpolation_method, - **kwargs, - ): - """Calculate position info from 2D spatial series.""" - CM_TO_METERS = 100 - # Accepts x/y 'loc' or 'loc1' format for first pos. Renames to 'loc' - DEFAULT_COLS = ["xloc", "yloc", "xloc2", "yloc2", "xloc1", "yloc1"] - - cols = list(spatial_df.columns) - if len(cols) != 4 or not all([c in DEFAULT_COLS for c in cols]): - choice = dj.utils.user_choice( - f"Unexpected columns in raw position. Assume " - + f"{DEFAULT_COLS[:4]}?\n{spatial_df}\n" - ) - if choice.lower() not in ["yes", "y"]: - raise ValueError(f"Unexpected columns in raw position: {cols}") - # rename first 4 columns, keep rest. Rest dropped below - spatial_df.columns = DEFAULT_COLS[:4] + cols[4:] - - # Get spatial series properties - time = np.asarray(spatial_df.index) # seconds - position = np.asarray(spatial_df.iloc[:, :4]) # meters - - # remove NaN times - is_nan_time = np.isnan(time) - position = position[~is_nan_time] - time = time[~is_nan_time] - - dt = np.median(np.diff(time)) - sampling_rate = 1 / dt - - # Define LEDs - if led1_is_front: - front_LED = position[:, [0, 1]].astype(float) - back_LED = position[:, [2, 3]].astype(float) - else: - back_LED = position[:, [0, 1]].astype(float) - front_LED = position[:, [2, 3]].astype(float) - - # Convert to cm - back_LED *= meters_to_pixels * CM_TO_METERS - front_LED *= meters_to_pixels * CM_TO_METERS - - # Set points to NaN where the front and back LEDs are too separated - dist_between_LEDs = get_distance(back_LED, front_LED) - is_too_separated = dist_between_LEDs >= max_LED_separation - - back_LED[is_too_separated] = np.nan - front_LED[is_too_separated] = np.nan - - # Calculate speed - front_LED_speed = get_speed( - front_LED, - time, - sigma=speed_smoothing_std_dev, - sampling_frequency=sampling_rate, - ) - back_LED_speed = get_speed( - back_LED, - time, - sigma=speed_smoothing_std_dev, - sampling_frequency=sampling_rate, - ) + def generate_pos_components(*args, **kwargs): + return IntervalPositionInfo.generate_pos_components(*args, **kwargs) - # Set to points to NaN where the speed is too fast - is_too_fast = (front_LED_speed > max_plausible_speed) | ( - back_LED_speed > max_plausible_speed - ) - back_LED[is_too_fast] = np.nan - front_LED[is_too_fast] = np.nan - - # Interpolate the NaN points - back_LED = interpolate_nan(back_LED) - front_LED = interpolate_nan(front_LED) - - # Smooth - moving_average_window = int(position_smoothing_duration * sampling_rate) - back_LED = bottleneck.move_mean( - back_LED, window=moving_average_window, axis=0, min_count=1 - ) - front_LED = bottleneck.move_mean( - front_LED, window=moving_average_window, axis=0, min_count=1 - ) - - if is_upsampled: - position_df = pd.DataFrame( - { - "time": time, - "back_LED_x": back_LED[:, 0], - "back_LED_y": back_LED[:, 1], - "front_LED_x": front_LED[:, 0], - "front_LED_y": front_LED[:, 1], - } - ).set_index("time") - - upsampling_start_time = time[0] - upsampling_end_time = time[-1] - - n_samples = ( - int( - np.ceil( - (upsampling_end_time - upsampling_start_time) - * upsampling_sampling_rate - ) - ) - + 1 - ) - new_time = np.linspace( - upsampling_start_time, upsampling_end_time, n_samples - ) - new_index = pd.Index( - np.unique(np.concatenate((position_df.index, new_time))), - name="time", - ) - position_df = ( - position_df.reindex(index=new_index) - .interpolate(method=upsampling_interpolation_method) - .reindex(index=new_time) - ) - - time = np.asarray(position_df.index) - back_LED = np.asarray( - position_df.loc[:, ["back_LED_x", "back_LED_y"]] - ) - front_LED = np.asarray( - position_df.loc[:, ["front_LED_x", "front_LED_y"]] - ) - - sampling_rate = upsampling_sampling_rate - - # Calculate position, orientation, velocity, speed - position = get_centriod(back_LED, front_LED) # cm - - orientation = get_angle(back_LED, front_LED) # radians - is_nan = np.isnan(orientation) - - # Unwrap orientation before smoothing - orientation[~is_nan] = np.unwrap(orientation[~is_nan]) - orientation[~is_nan] = gaussian_smooth( - orientation[~is_nan], - orient_smoothing_std_dev, - sampling_rate, - axis=0, - truncate=8, - ) - # convert back to between -pi and pi - orientation[~is_nan] = np.angle(np.exp(1j * orientation[~is_nan])) - - velocity = get_velocity( - position, - time=time, - sigma=speed_smoothing_std_dev, - sampling_frequency=sampling_rate, - ) # cm/s - speed = np.sqrt(np.sum(velocity**2, axis=1)) # cm/s - - return { - "time": time, - "position": position, - "orientation": orientation, - "velocity": velocity, - "speed": speed, - } + @staticmethod + def calculate_position_info(*args, **kwargs): + """Calculate position info from 2D spatial series.""" + return IntervalPositionInfo.calculate_position_info(*args, **kwargs) def fetch_nwb(self, *attrs, **kwargs): return fetch_nwb( @@ -482,41 +219,8 @@ def fetch_nwb(self, *attrs, **kwargs): ) def fetch1_dataframe(self): - nwb_data = self.fetch_nwb()[0] - index = pd.Index( - np.asarray(nwb_data["position"].get_spatial_series().timestamps), - name="time", - ) - COLUMNS = [ - "video_frame_ind", - "position_x", - "position_y", - "orientation", - "velocity_x", - "velocity_y", - "speed", - ] - return pd.DataFrame( - np.concatenate( - ( - np.asarray( - nwb_data["velocity"] - .time_series["video_frame_ind"] - .data, - dtype=int, - )[:, np.newaxis], - np.asarray(nwb_data["position"].get_spatial_series().data), - np.asarray( - nwb_data["orientation"].get_spatial_series().data - )[:, np.newaxis], - np.asarray( - nwb_data["velocity"].time_series["velocity"].data - ), - ), - axis=1, - ), - columns=COLUMNS, - index=index, + return IntervalPositionInfo._data_to_df( + self.fetch_nwb()[0], prefix="", add_frame_ind=True ) @@ -529,7 +233,7 @@ class TrodesPosVideo(dj.Computed): definition = """ -> TrodesPosV1 - --- + --- has_video : bool """