From b42432f0884ef3e990e111c53102a64529598ae5 Mon Sep 17 00:00:00 2001
From: Chris Brozdowski
Date: Tue, 30 Jan 2024 18:31:49 -0600
Subject: [PATCH] More robust `delete_downstream_merge` (#806)

* WIP: fix for #791
* WIP: #791, pt 2
* WIP: #791, needs testing
* Faster tree search with networkx
* Blackify
* Blackify 2
* Update changelog/docs
* Update notebooks
* Overwrite . Mixin add cached_property decorator
* Cleanup docstrings and type annotations
---
 CHANGELOG.md                            |  10 +-
 docs/src/misc/merge_tables.md           |  13 +-
 notebooks/01_Insert_Data.ipynb          |  63 ++--
 notebooks/03_Merge_Tables.ipynb         | 169 ++++++++---
 notebooks/py_scripts/01_Insert_Data.py  |  45 +--
 notebooks/py_scripts/03_Merge_Tables.py |  35 ++-
 notebooks/py_scripts/11_Curation.py     |   2 +-
 src/spyglass/common/common_usage.py     |  23 ++
 src/spyglass/settings.py                |   2 +-
 src/spyglass/utils/database_settings.py |  20 +-
 src/spyglass/utils/dj_chains.py         | 168 +++++++++++
 src/spyglass/utils/dj_merge_tables.py   | 157 +---------
 src/spyglass/utils/dj_mixin.py          | 386 +++++++++++++++---------
 13 files changed, 681 insertions(+), 412 deletions(-)
 create mode 100644 src/spyglass/common/common_usage.py
 create mode 100644 src/spyglass/utils/dj_chains.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bbf2ca515..9b638945d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,19 +8,23 @@
 - Clean up following pre-commit checks. #688
 - Add Mixin class to centralize `fetch_nwb` functionality. #692, #734
 - Refactor restriction use in `delete_downstream_merge` #703
-- Add `cautious_delete` to Mixin class, initial implementation. #711, #762
+- Add `cautious_delete` to Mixin class
+  - Initial implementation. #711, #762
+  - More robust caching of join to downstream tables. #806
+  - Overwrite datajoint `delete` method to use `cautious_delete`. #806
 - Add `deprecation_factory` to facilitate table migration. #717
 - Add Spyglass logger. #730
 - IntervalList: Add secondary key `pipeline` #742
 - Increase pytest coverage for `common`, `lfp`, and `utils`. #743
 - Update docs to reflect new notebooks. #776
 - Add overview of Spyglass to docs. #779
-- LFPV1: Fix error for multiple lfp settings on same data #775

 ### Pipelines

 - Spike sorting: Add SpikeSorting V1 pipeline. #651
-- LFP: Minor fixes to LFPBandV1 populator and `make`. #706, #795
+- LFP:
+  - Minor fixes to LFPBandV1 populator and `make`. #706, #795
+  - LFPV1: Fix error for multiple lfp settings on same data #775
 - Linearization:
   - Minor fixes to LinearizedPositionV1 pipeline #695
   - Rename `position_linearization` -> `linearization`. #717
diff --git a/docs/src/misc/merge_tables.md b/docs/src/misc/merge_tables.md
index 981ea40f7..1cd4b000b 100644
--- a/docs/src/misc/merge_tables.md
+++ b/docs/src/misc/merge_tables.md
@@ -15,8 +15,17 @@
 deleting a part entry before the master. To circumvent this, you can add
 `force_parts=True` to the
 [`delete` function](https://datajoint.com/docs/core/datajoint-python/0.14/api/datajoint/__init__/#datajoint.table.Table.delete)
 call, but this will leave an orphaned primary key in the master. Instead, use
-`spyglass.utils.dj_merge_tables.delete_downstream_merge` to delete master/part
-pairs.
+`(YourTable & restriction).delete_downstream_merge()` to delete master/part
+pairs. If errors persist, identify and import the offending part table and rerun
+`delete_downstream_merge` with `reload_cache=True`. This process will be faster
+for subsequent calls if you reassign your table after importing.
+ +```python +from spyglass.common import Nwbfile + +nwbfile = Nwbfile() +(nwbfile & "nwb_file_name LIKE 'Name%'").delete_downstream_merge() +``` ## What diff --git a/notebooks/01_Insert_Data.ipynb b/notebooks/01_Insert_Data.ipynb index f0d89cdfa..de31ea7c8 100644 --- a/notebooks/01_Insert_Data.ipynb +++ b/notebooks/01_Insert_Data.ipynb @@ -45,8 +45,8 @@ "name": "stderr", "output_type": "stream", "text": [ - "[2023-10-05 11:48:12,292][INFO]: Connecting root@localhost:3306\n", - "[2023-10-05 11:48:12,302][INFO]: Connected root@localhost:3306\n" + "[2024-01-29 16:24:30,933][INFO]: Connecting root@localhost:3309\n", + "[2024-01-29 16:24:30,942][INFO]: Connected root@localhost:3309\n" ] } ], @@ -719,9 +719,9 @@ "\n", "- `minirec20230622.nwb`, .3 GB: minimal recording,\n", " [Link](https://ucsf.box.com/s/k3sgql6z475oia848q1rgms4zdh4rkjn)\n", - "- `mediumnwb20230802.nwb`, 32 GB: full-featured dataset, \n", - " [Link](https://ucsf.box.com/s/2qbhxghzpttfam4b7q7j8eg0qkut0opa) \n", - "- `montague20200802.nwb`, 8 GB: full experimental recording, \n", + "- `mediumnwb20230802.nwb`, 32 GB: full-featured dataset,\n", + " [Link](https://ucsf.box.com/s/2qbhxghzpttfam4b7q7j8eg0qkut0opa)\n", + "- `montague20200802.nwb`, 8 GB: full experimental recording,\n", " [Link](https://ucsf.box.com/s/26je2eytjpqepyznwpm92020ztjuaomb)\n", "- For those in the UCSF network, these and many others on `/stelmo/nwb/raw`\n", "\n", @@ -747,7 +747,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Spyglass will create a copy with this name." + "Spyglass will create a copy with this name.\n" ] }, { @@ -1072,7 +1072,6 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "\n", "`spyglass.data_import.insert_sessions` helps take the many fields of data\n", "present in an NWB file and insert them into various tables across Spyglass. If\n", "the NWB file is properly composed, this includes...\n", @@ -1082,8 +1081,8 @@ "- neural activity (extracellular recording of multiple brain areas)\n", "- etc.\n", "\n", - "_Note:_ this may take time as Spyglass creates the copy. You may see a prompt \n", - "about inserting device information." + "_Note:_ this may take time as Spyglass creates the copy. You may see a prompt\n", + "about inserting device information.\n" ] }, { @@ -2053,21 +2052,20 @@ "metadata": {}, "source": [ "`IntervalList` has an additional secondary key `pipeline` which can describe the origin of the data.\n", - "Because it is a _secondary_ key, it is not required to uniquely identify an entry. \n", + "Because it is a _secondary_ key, it is not required to uniquely identify an entry.\n", "Current values for this key from spyglass pipelines are:\n", "\n", - "| pipeline | Source|\n", - "| --- | --- |\n", - "| position | sg.common.PositionSource |\n", - "| lfp_v0 | sg.common.LFP |\n", - "| lfp_v1 | sg.lfp.v1.LFPV1 |\n", - "| lfp_band | sg.common.LFPBand,
sg.lfp.analysis.v1.LFPBandV1 |\n", - "| lfp_artifact | sg.lfp.v1.LFPArtifactDetection |\n", - "| spikesorting_artifact_v0 | sg.spikesorting.ArtifactDetection |\n", - "| spikesorting_artifact_v1 | sg.spikesorting.v1.ArtifactDetection |\n", - "| spikesorting_recording_v0 | sg.spikesorting.SpikeSortingRecording |\n", - "| spikesorting_recording_v1 | sg.spikesorting.v1.SpikeSortingRecording |\n", - "\n" + "| pipeline | Source |\n", + "| ------------------------- | --------------------------------------------------- |\n", + "| position | sg.common.PositionSource |\n", + "| lfp_v0 | sg.common.LFP |\n", + "| lfp_v1 | sg.lfp.v1.LFPV1 |\n", + "| lfp_band | sg.common.LFPBand,
sg.lfp.analysis.v1.LFPBandV1 |\n", + "| lfp_artifact | sg.lfp.v1.LFPArtifactDetection |\n", + "| spikesorting_artifact_v0 | sg.spikesorting.ArtifactDetection |\n", + "| spikesorting_artifact_v1 | sg.spikesorting.v1.ArtifactDetection |\n", + "| spikesorting_recording_v0 | sg.spikesorting.SpikeSortingRecording |\n", + "| spikesorting_recording_v1 | sg.spikesorting.v1.SpikeSortingRecording |\n" ] }, { @@ -2086,9 +2084,9 @@ "with _cascading deletes_. For example, if we delete our `Session` entry, all\n", "associated downstream entries are also deleted (e.g. `Raw`, `IntervalList`).\n", "\n", - "_Note_: The deletion process can be complicated by \n", + "_Note_: The deletion process can be complicated by\n", "[Merge Tables](https://lorenfranklab.github.io/spyglass/0.4/misc/merge_tables/)\n", - "when the entry is referenced by a part table. To demo deletion in these cases, \n", + "when the entry is referenced by a part table. To demo deletion in these cases,\n", "run the hidden code below.\n", "\n", "
\n", @@ -2113,20 +2111,23 @@ "lfp.v1.LFPSelection.insert1(lfp_key, skip_duplicates=True)\n", "lfp.v1.LFPV1().populate(lfp_key)\n", "```\n", + "\n", "
\n", "
\n", "Deleting Merge Entries\n", "\n", "```python\n", - "from spyglass.utils.dj_merge_tables import delete_downstream_merge\n", + "nwbfile = sgc.Nwbfile()\n", "\n", - "delete_downstream_merge(\n", - " sgc.Nwbfile(),\n", - " restriction={\"nwb_file_name\": nwb_copy_file_name},\n", + "(nwbfile & {\"nwb_file_name\": nwb_copy_file_name}).delete_downstream_merge(\n", " dry_run=False, # True will show Merge Table entries that would be deleted\n", - ") \n", + ")\n", "```\n", - "
" + "\n", + "Please see the [next notebook](./03_Merge_Tables.ipynb) for a more detailed\n", + "explanation.\n", + "\n", + "\n" ] }, { @@ -2659,7 +2660,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Up Next" + "## Up Next\n" ] }, { diff --git a/notebooks/03_Merge_Tables.ipynb b/notebooks/03_Merge_Tables.ipynb index 04cc6ba13..2d76867d8 100644 --- a/notebooks/03_Merge_Tables.ipynb +++ b/notebooks/03_Merge_Tables.ipynb @@ -66,8 +66,8 @@ "name": "stderr", "output_type": "stream", "text": [ - "[2023-10-12 11:15:17,864][INFO]: Connecting root@localhost:3306\n", - "[2023-10-12 11:15:17,873][INFO]: Connected root@localhost:3306\n" + "[2024-01-29 16:15:00,903][INFO]: Connecting root@localhost:3309\n", + "[2024-01-29 16:15:00,912][INFO]: Connected root@localhost:3309\n" ] } ], @@ -328,7 +328,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "['merge_delete', 'merge_delete_parent', 'merge_fetch', 'merge_get_parent', 'merge_get_part', 'merge_html', 'merge_populate', 'merge_restrict', 'merge_view']\n" + "['merge_delete', 'merge_delete_parent', 'merge_fetch', 'merge_get_parent', 'merge_get_parent_class', 'merge_get_part', 'merge_html', 'merge_populate', 'merge_restrict', 'merge_restrict_class', 'merge_view']\n" ] } ], @@ -386,7 +386,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 6, "metadata": {}, "outputs": [ { @@ -415,7 +415,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 7, "metadata": {}, "outputs": [ { @@ -497,7 +497,7 @@ " (Total: 1)" ] }, - "execution_count": 8, + "execution_count": 7, "metadata": {}, "output_type": "execute_result" } @@ -510,7 +510,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 8, "metadata": {}, "outputs": [ { @@ -521,11 +521,11 @@ " 'target_interval_list_name': '01_s1',\n", " 'filter_name': 'LFP 0-400 Hz',\n", " 'filter_sampling_rate': 30000,\n", - " 'analysis_file_name': 'minirec20230622_JOV02AWW09.nwb',\n", + " 'analysis_file_name': 'minirec20230622_R5DWQ6S53S.nwb',\n", " 'interval_list_name': 'lfp_test_01_s1_valid times',\n", - " 'lfp_object_id': '340b9a0b-626b-40ca-8b48-e033be72570a',\n", + " 'lfp_object_id': 'ffb893d1-a31e-41d3-aec7-8dc8936c8898',\n", " 'lfp_sampling_rate': 1000.0,\n", - " 'lfp': filtered data pynwb.ecephys.ElectricalSeries at 0x139910624563552\n", + " 'lfp': filtered data pynwb.ecephys.ElectricalSeries at 0x129602752674544\n", " Fields:\n", " comments: no comments\n", " conversion: 1.0\n", @@ -540,7 +540,7 @@ " unit: volts}]" ] }, - "execution_count": 9, + "execution_count": 8, "metadata": {}, "output_type": "execute_result" } @@ -552,7 +552,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 9, "metadata": {}, "outputs": [ { @@ -567,7 +567,7 @@ " 'filter_sampling_rate': 30000}" ] }, - "execution_count": 10, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } @@ -579,7 +579,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 10, "metadata": {}, "outputs": [ { @@ -588,7 +588,7 @@ "True" ] }, - "execution_count": 12, + "execution_count": 10, "metadata": {}, "output_type": "execute_result" } @@ -616,7 +616,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 11, "metadata": {}, "outputs": [ { @@ -718,7 +718,7 @@ " (Total: 1)" ] }, - "execution_count": 14, + "execution_count": 11, "metadata": {}, "output_type": "execute_result" } @@ -730,7 +730,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 12, "metadata": {}, "outputs": [ 
{ @@ -824,9 +824,9 @@ "01_s1\n", "LFP 0-400 Hz\n", "30000\n", - "minirec20230622_JOV02AWW09.nwb\n", + "minirec20230622_R5DWQ6S53S.nwb\n", "lfp_test_01_s1_valid times\n", - "340b9a0b-626b-40ca-8b48-e033be72570a\n", + "ffb893d1-a31e-41d3-aec7-8dc8936c8898\n", "1000.0 \n", " \n", " \n", @@ -837,11 +837,11 @@ "FreeTable(`lfp_v1`.`__l_f_p_v1`)\n", "*nwb_file_name *lfp_electrode *target_interv *filter_name *filter_sampli analysis_file_ interval_list_ lfp_object_id lfp_sampling_r\n", "+------------+ +------------+ +------------+ +------------+ +------------+ +------------+ +------------+ +------------+ +------------+\n", - "minirec2023062 test 01_s1 LFP 0-400 Hz 30000 minirec2023062 lfp_test_01_s1 340b9a0b-626b- 1000.0 \n", + "minirec2023062 test 01_s1 LFP 0-400 Hz 30000 minirec2023062 lfp_test_01_s1 ffb893d1-a31e- 1000.0 \n", " (Total: 1)" ] }, - "execution_count": 15, + "execution_count": 12, "metadata": {}, "output_type": "execute_result" } @@ -861,7 +861,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 13, "metadata": {}, "outputs": [ { @@ -870,7 +870,7 @@ "array([1000.])" ] }, - "execution_count": 16, + "execution_count": 13, "metadata": {}, "output_type": "execute_result" } @@ -890,7 +890,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": 14, "metadata": {}, "outputs": [ { @@ -900,7 +900,7 @@ " array(['minirec20230622_.nwb'], dtype=object)]" ] }, - "execution_count": 19, + "execution_count": 14, "metadata": {}, "output_type": "execute_result" } @@ -912,7 +912,7 @@ }, { "cell_type": "code", - "execution_count": 20, + "execution_count": 15, "metadata": {}, "outputs": [ { @@ -926,7 +926,7 @@ " 'filter_sampling_rate': 30000}" ] }, - "execution_count": 20, + "execution_count": 15, "metadata": {}, "output_type": "execute_result" } @@ -955,8 +955,8 @@ "2. use `merge_delete_parent` to delete from the parent sources, getting rid of\n", " the entries in the source table they came from.\n", "\n", - "3. use `delete_downstream_merge` to find Merge Tables downstream and get rid\n", - " full entries, avoiding orphaned master table entries.\n", + "3. use `delete_downstream_merge` to find Merge Tables downstream of any other\n", + " table and get rid full entries, avoiding orphaned master table entries.\n", "\n", "The two latter cases can be destructive, so we include an extra layer of\n", "protection with `dry_run`. 
When true (by default), these functions return\n", @@ -965,16 +965,100 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 16, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "[2024-01-29 16:15:23,054][INFO]: Deleting 1 rows from `lfp_merge`.`l_f_p_output__l_f_p_v1`\n", + "[2024-01-29 16:15:23,058][INFO]: Deleting 1 rows from `lfp_merge`.`l_f_p_output`\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "[2024-01-29 16:15:24,953][WARNING]: Deletes cancelled\n" + ] + } + ], + "source": [ + "LFPOutput.merge_delete(nwb_file_dict) # Delete from merge table" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[FreeTable(`lfp_v1`.`__l_f_p_v1`)\n", + " *nwb_file_name *lfp_electrode *target_interv *filter_name *filter_sampli analysis_file_ interval_list_ lfp_object_id lfp_sampling_r\n", + " +------------+ +------------+ +------------+ +------------+ +------------+ +------------+ +------------+ +------------+ +------------+\n", + " minirec2023062 test 01_s1 LFP 0-400 Hz 30000 minirec2023062 lfp_test_01_s1 ffb893d1-a31e- 1000.0 \n", + " (Total: 1)]" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "LFPOutput.merge_delete_parent(restriction=nwb_file_dict, dry_run=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "`delete_downstream_merge` is available from any other table in the pipeline,\n", + "but it does take some time to find the links downstream. If you're using this,\n", + "you can save time by reassigning your table to a variable, which will preserve\n", + "a copy of the previous search.\n", + "\n", + "Because the copy is stored, this function may not see additional merge tables\n", + "you've imported. 
To refresh this copy, set `reload_cache=True`\n" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "[16:15:37][INFO] Spyglass: Building merge cache for nwbfile.\n", + "\tFound 3 downstream merge tables\n" + ] + }, + { + "data": { + "text/plain": [ + "dict_values([[*nwb_file_name *analysis_file *lfp_electrode *target_interv *filter_name *filter_sampli *merge_id nwb_file_a analysis_f analysis_file_ analysis_p interval_list_ lfp_object_id lfp_sampling_r\n", + "+------------+ +------------+ +------------+ +------------+ +------------+ +------------+ +------------+ +--------+ +--------+ +------------+ +--------+ +------------+ +------------+ +------------+\n", + "minirec2023062 minirec2023062 test 01_s1 LFP 0-400 Hz 30000 c34f98c5-7de7- =BLOB= =BLOB= =BLOB= lfp_test_01_s1 ffb893d1-a31e- 1000.0 \n", + " (Total: 1)\n", + "]])" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ - "LFPOutput.merge_delete(nwb_file_dict) # Delete from merge table\n", - "LFPOutput.merge_delete_parent(restriction=nwb_file_dict, dry_run=True)\n", - "delete_downstream_merge(\n", - " table=LFPV1,\n", - " restriction=nwb_file_dict,\n", + "nwbfile = sgc.Nwbfile()\n", + "\n", + "(nwbfile & nwb_file_dict).delete_downstream_merge(\n", " dry_run=True,\n", + " reload_cache=False, # if still encountering errors, try setting this to True\n", ")" ] }, @@ -982,8 +1066,8 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "To delete all merge table entries associated with an NWB file, use\n", - "`delete_downstream_merge` with the `Nwbfile` table.\n" + "This function is run automatically whin you use `cautious_delete`, which\n", + "checks team permissions before deleting.\n" ] }, { @@ -992,12 +1076,7 @@ "metadata": {}, "outputs": [], "source": [ - "delete_downstream_merge(\n", - " table=sgc.Nwbfile,\n", - " restriction={\"nwb_file_name\": nwb_copy_file_name},\n", - " dry_run=True,\n", - " recurse_level=3, # for long pipelines with many tables\n", - ")" + "(nwbfile & nwb_file_dict).cautious_delete()" ] }, { diff --git a/notebooks/py_scripts/01_Insert_Data.py b/notebooks/py_scripts/01_Insert_Data.py index 908c93491..c1fec99a9 100644 --- a/notebooks/py_scripts/01_Insert_Data.py +++ b/notebooks/py_scripts/01_Insert_Data.py @@ -128,6 +128,7 @@ # - # Spyglass will create a copy with this name. +# nwb_copy_file_name @@ -155,9 +156,9 @@ # sgc.LabMember.LabMemberInfo.insert( - [ # Full name, Google email address, DataJoint username - ["Firstname Lastname", "example1@gmail.com", "example1"], - ["Firstname2 Lastname2", "example2@gmail.com", "example2"], + [ # Full name, Google email address, DataJoint username, admin + ["Firstname Lastname", "example1@gmail.com", "example1", 0], + ["Firstname2 Lastname2", "example2@gmail.com", "example2", 0], ], skip_duplicates=True, ) @@ -187,7 +188,6 @@ # ## Inserting from NWB # -# # `spyglass.data_import.insert_sessions` helps take the many fields of data # present in an NWB file and insert them into various tables across Spyglass. If # the NWB file is properly composed, this includes... @@ -199,6 +199,7 @@ # # _Note:_ this may take time as Spyglass creates the copy. You may see a prompt # about inserting device information. +# sgi.insert_sessions(nwb_file_name) @@ -306,18 +307,17 @@ # Because it is a _secondary_ key, it is not required to uniquely identify an entry. 
# Current values for this key from spyglass pipelines are: # -# | pipeline | Source| -# | --- | --- | -# | position | sg.common.PositionSource | -# | lfp_v0 | sg.common.LFP | -# | lfp_v1 | sg.lfp.v1.LFPV1 | -# | lfp_band | sg.common.LFPBand,
sg.lfp.analysis.v1.LFPBandV1 | -# | lfp_artifact | sg.lfp.v1.LFPArtifactDetection | -# | spikesorting_artifact_v0 | sg.spikesorting.ArtifactDetection | -# | spikesorting_artifact_v1 | sg.spikesorting.v1.ArtifactDetection | -# | spikesorting_recording_v0 | sg.spikesorting.SpikeSortingRecording | -# | spikesorting_recording_v1 | sg.spikesorting.v1.SpikeSortingRecording | -# +# | pipeline | Source | +# | ------------------------- | --------------------------------------------------- | +# | position | sg.common.PositionSource | +# | lfp_v0 | sg.common.LFP | +# | lfp_v1 | sg.lfp.v1.LFPV1 | +# | lfp_band | sg.common.LFPBand,
sg.lfp.analysis.v1.LFPBandV1 | +# | lfp_artifact | sg.lfp.v1.LFPArtifactDetection | +# | spikesorting_artifact_v0 | sg.spikesorting.ArtifactDetection | +# | spikesorting_artifact_v1 | sg.spikesorting.v1.ArtifactDetection | +# | spikesorting_recording_v0 | sg.spikesorting.SpikeSortingRecording | +# | spikesorting_recording_v1 | sg.spikesorting.v1.SpikeSortingRecording | # # ## Deleting data @@ -355,20 +355,24 @@ # lfp.v1.LFPSelection.insert1(lfp_key, skip_duplicates=True) # lfp.v1.LFPV1().populate(lfp_key) # ``` +# # #
# Deleting Merge Entries # # ```python -# from spyglass.utils.dj_merge_tables import delete_downstream_merge +# nwbfile = sgc.Nwbfile() # -# delete_downstream_merge( -# sgc.Nwbfile(), -# restriction={"nwb_file_name": nwb_copy_file_name}, +# (nwbfile & {"nwb_file_name": nwb_copy_file_name}).delete_downstream_merge( # dry_run=False, # True will show Merge Table entries that would be deleted # ) # ``` +# +# Please see the [next notebook](./03_Merge_Tables.ipynb) for a more detailed +# explanation. +# #
+# session_entry = sgc.Session & {"nwb_file_name": nwb_copy_file_name} session_entry @@ -418,6 +422,7 @@ # !ls $SPYGLASS_BASE_DIR/raw # ## Up Next +# # In the [next notebook](./02_Data_Sync.ipynb), we'll explore tools for syncing. # diff --git a/notebooks/py_scripts/03_Merge_Tables.py b/notebooks/py_scripts/03_Merge_Tables.py index c4c0abb48..33b8e9a0e 100644 --- a/notebooks/py_scripts/03_Merge_Tables.py +++ b/notebooks/py_scripts/03_Merge_Tables.py @@ -192,8 +192,8 @@ # 2. use `merge_delete_parent` to delete from the parent sources, getting rid of # the entries in the source table they came from. # -# 3. use `delete_downstream_merge` to find Merge Tables downstream and get rid -# full entries, avoiding orphaned master table entries. +# 3. use `delete_downstream_merge` to find Merge Tables downstream of any other +# table and get rid full entries, avoiding orphaned master table entries. # # The two latter cases can be destructive, so we include an extra layer of # protection with `dry_run`. When true (by default), these functions return @@ -201,23 +201,32 @@ # LFPOutput.merge_delete(nwb_file_dict) # Delete from merge table + LFPOutput.merge_delete_parent(restriction=nwb_file_dict, dry_run=True) -delete_downstream_merge( - table=LFPV1, - restriction=nwb_file_dict, - dry_run=True, -) -# To delete all merge table entries associated with an NWB file, use -# `delete_downstream_merge` with the `Nwbfile` table. +# `delete_downstream_merge` is available from any other table in the pipeline, +# but it does take some time to find the links downstream. If you're using this, +# you can save time by reassigning your table to a variable, which will preserve +# a copy of the previous search. # +# Because the copy is stored, this function may not see additional merge tables +# you've imported. To refresh this copy, set `reload_cache=True` +# + +# + +nwbfile = sgc.Nwbfile() -delete_downstream_merge( - table=sgc.Nwbfile, - restriction={"nwb_file_name": nwb_copy_file_name}, +(nwbfile & nwb_file_dict).delete_downstream_merge( dry_run=True, - recurse_level=3, # for long pipelines with many tables + reload_cache=False, # if still encountering errors, try setting this to True ) +# - + +# This function is run automatically whin you use `cautious_delete`, which +# checks team permissions before deleting. +# + +(nwbfile & nwb_file_dict).cautious_delete() # ## Up Next # diff --git a/notebooks/py_scripts/11_Curation.py b/notebooks/py_scripts/11_Curation.py index 8b75a9c76..25eb698ad 100644 --- a/notebooks/py_scripts/11_Curation.py +++ b/notebooks/py_scripts/11_Curation.py @@ -5,7 +5,7 @@ # extension: .py # format_name: light # format_version: '1.5' -# jupytext_version: 1.15.2 +# jupytext_version: 1.16.0 # kernelspec: # display_name: base # language: python diff --git a/src/spyglass/common/common_usage.py b/src/spyglass/common/common_usage.py new file mode 100644 index 000000000..8b110cbc2 --- /dev/null +++ b/src/spyglass/common/common_usage.py @@ -0,0 +1,23 @@ +"""A schema to store the usage of advanced Spyglass features. + +Records show usage of features such as table chains, which will be used to +determine which features are used, how often, and by whom. This will help +plan future development of Spyglass. 
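+
+For example, `SpyglassMixin._log_use` records each `cautious_delete` call with
+an entry like the following (a sketch; field values vary per call):
+
+    CautiousDelete.insert1(
+        dict(
+            dj_user="user",
+            duration=1.23,
+            origin="`common_session`.`session`",
+            restriction="nwb_file_name LIKE 'mini%'",
+        )
+    )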
+""" + +import datajoint as dj + +schema = dj.schema("common_usage") + + +@schema +class CautiousDelete(dj.Manual): + definition = """ + id: int auto_increment + --- + dj_user: varchar(64) + duration: float + origin: varchar(64) + restriction: varchar(64) + merge_deletes = null: blob + """ diff --git a/src/spyglass/settings.py b/src/spyglass/settings.py index 68fe1e528..007ec9160 100644 --- a/src/spyglass/settings.py +++ b/src/spyglass/settings.py @@ -7,7 +7,7 @@ import yaml from pymysql.err import OperationalError -from spyglass.utils import logger +from spyglass.utils.logging import logger class SpyglassConfig: diff --git a/src/spyglass/utils/database_settings.py b/src/spyglass/utils/database_settings.py index 5a634c69c..da65914fa 100755 --- a/src/spyglass/utils/database_settings.py +++ b/src/spyglass/utils/database_settings.py @@ -14,6 +14,16 @@ CREATE_USR = "CREATE USER IF NOT EXISTS " TEMP_PASS = " IDENTIFIED BY 'temppass';" ESC = r"\_%" +SHARED_MODULES = [ + "common", + "spikesorting", + "decoding", + "position", + "position_linearization", + "ripple", + "lfp", + "waveform", +] class DatabaseSettings: @@ -40,15 +50,7 @@ def __init__( target_database : str, optional Default is mysql. Can also be docker container id """ - self.shared_modules = [ - f"common{ESC}", - f"spikesorting{ESC}", - f"decoding{ESC}", - f"position{ESC}", - f"position_linearization{ESC}", - f"ripple{ESC}", - f"lfp{ESC}", - ] + self.shared_modules = [f"{m}{ESC}" for m in SHARED_MODULES] self.user = user_name or dj.config["database.user"] self.host = ( host_name or dj.config["database.host"] or "lmf-db.cin.ucsf.edu" diff --git a/src/spyglass/utils/dj_chains.py b/src/spyglass/utils/dj_chains.py new file mode 100644 index 000000000..b76132551 --- /dev/null +++ b/src/spyglass/utils/dj_chains.py @@ -0,0 +1,168 @@ +from functools import cached_property +from typing import List + +import datajoint as dj +import networkx as nx +from datajoint.expression import QueryExpression +from datajoint.table import Table +from datajoint.utils import get_master + +from spyglass.utils.dj_merge_tables import RESERVED_PRIMARY_KEY as MERGE_PK +from spyglass.utils.logging import logger + + +class TableChains: + """Class for representing chains from parent to Merge table via parts. + + Functions as a plural version of TableChain, allowing a single `join` + call across all chains from parent -> Merge table. + """ + + def __init__(self, parent, child, connection=None): + self.parent = parent + self.child = child + self.connection = connection or parent.connection + parts = child.parts(as_objects=True) + self.part_names = [part.full_table_name for part in parts] + self.chains = [TableChain(parent, part) for part in parts] + self.has_link = any([chain.has_link for chain in self.chains]) + + def __repr__(self): + return "\n".join([str(chain) for chain in self.chains]) + + def __len__(self): + return len([c for c in self.chains if c.has_link]) + + def join(self, restriction=None) -> List[QueryExpression]: + """Return list of joins for each chain in self.chains.""" + restriction = restriction or self.parent.restriction or True + joins = [] + for chain in self.chains: + if joined := chain.join(restriction): + joins.append(joined) + return joins + + +class TableChain: + """Class for representing a chain of tables. + + A chain is a sequence of tables from parent to child identified by + networkx.shortest_path. Parent -> Merge should use TableChains instead to + handle multiple paths to the respective parts of the Merge table. 
+ + Attributes + ---------- + parent : Table + Parent or origin of chain. + child : Table + Child or destination of chain. + _connection : datajoint.Connection, optional + Connection to database used to create FreeTable objects. Defaults to + parent.connection. + _link_symbol : str + Symbol used to represent the link between parent and child. Hardcoded + to " -> ". + _has_link : bool + Cached attribute to store whether parent is linked to child. False if + child is not in parent.descendants or nx.NetworkXNoPath is raised by + nx.shortest_path. + names : List[str] + List of full table names in chain. Generated by networkx.shortest_path. + objects : List[dj.FreeTable] + List of FreeTable objects for each table in chain. + + Methods + ------- + __str__() + Return string representation of chain: parent -> child. + __repr__() + Return full representation of chain: parent -> {links} -> child. + __len__() + Return number of tables in chain. + join(restriction: str = None) + Return join of tables in chain with restriction applied to parent. + """ + + def __init__(self, parent: Table, child: Table, connection=None): + self._connection = connection or parent.connection + if not self._connection.dependencies._loaded: + self._connection.dependencies.load() + + if ( # if child is a merge table + get_master(child.full_table_name) == "" + and MERGE_PK in child.heading.names + ): + logger.error("Child is a merge table. Use TableChains instead.") + + self._link_symbol = " -> " + self.parent = parent + self.child = child + self._has_link = child.full_table_name in parent.descendants() + + def __str__(self): + """Return string representation of chain: parent -> child.""" + if not self._has_link: + return "No link" + return ( + "Chain: " + + self.parent.table_name + + self._link_symbol + + self.child.table_name + ) + + def __repr__(self): + """Return full representation of chain: parent -> {links} -> child.""" + return ( + "Chain: " + + self._link_symbol.join([t.table_name for t in self.objects]) + if self.names + else "No link" + ) + + def __len__(self): + """Return number of tables in chain.""" + return len(self.names) + + @property + def has_link(self) -> bool: + """Return True if parent is linked to child. + + Cached as hidden attribute _has_link to set False if nx.NetworkXNoPath + is raised by nx.shortest_path. + """ + return self._has_link + + @cached_property + def names(self) -> List[str]: + """Return list of full table names in chain. + + Uses networkx.shortest_path. 
+ """ + if not self._has_link: + return None + try: + return nx.shortest_path( + self.parent.connection.dependencies, + self.parent.full_table_name, + self.child.full_table_name, + ) + except nx.NetworkXNoPath: + self._has_link = False + return None + + @cached_property + def objects(self) -> List[dj.FreeTable]: + """Return list of FreeTable objects for each table in chain.""" + return ( + [dj.FreeTable(self._connection, name) for name in self.names] + if self.names + else None + ) + + def join(self, restricton: str = None) -> dj.expression.QueryExpression: + """Return join of tables in chain with restriction applied to parent.""" + restriction = restricton or self.parent.restriction or True + join = self.objects[0] & restriction + for table in self.objects[1:]: + join = join * table + return join if join else None diff --git a/src/spyglass/utils/dj_merge_tables.py b/src/spyglass/utils/dj_merge_tables.py index 0e0681782..b748267ad 100644 --- a/src/spyglass/utils/dj_merge_tables.py +++ b/src/spyglass/utils/dj_merge_tables.py @@ -501,7 +501,7 @@ def merge_delete_parent( def fetch_nwb( self, - restriction: str = True, + restriction: str = None, multi_source=False, disable_warning=False, *attrs, @@ -521,10 +521,7 @@ def fetch_nwb( """ if isinstance(self, dict): raise ValueError("Try replacing Merge.method with Merge().method") - if restriction is True and self.restriction: - if not disable_warning: - _warn_on_restriction(self, restriction) - restriction = self.restriction + restriction = restriction or self.restriction or True return self.merge_restrict_class(restriction).fetch_nwb() @@ -805,8 +802,6 @@ def delete_downstream_merge( dry_run: bool Default True. If true, return list of tuples, merge/part tables downstream of table input. Otherwise, delete merge/part table entries. - recurse_level: int - Default 2. Depth to recurse into table descendants. disable_warning: bool Default False. If True, don't warn about restrictions on table object. kwargs: dict @@ -817,148 +812,12 @@ def delete_downstream_merge( List[Tuple[dj.Table, dj.Table]] Entries in merge/part tables downstream of table input. 
""" - if not disable_warning: - _warn_on_restriction(table, restriction) + from spyglass.utils.dj_mixin import SpyglassMixin - if not restriction: - restriction = True + if not isinstance(table, SpyglassMixin): + raise ValueError("Input must be a Spyglass Table.") + table = table if isinstance(table, dj.Table) else table() - descendants = _unique_descendants(table, recurse_level) - merge_table_pairs = _master_table_pairs( - table_list=descendants, - restricted_parent=(table & restriction), + return table.delete_downstream_merge( + restriction=restriction, dry_run=dry_run, **kwargs ) - - # restrict the merge table based on uuids in part - # don't need part for del, but show on dry_run - merge_pairs = [ - (merge & part.fetch(RESERVED_PRIMARY_KEY, as_dict=True), part) - for merge, part in merge_table_pairs - ] - - if dry_run: - return merge_pairs - - for merge_table, _ in merge_pairs: - merge_table.delete(**kwargs) - - -def _warn_on_restriction(table: dj.Table, restriction: str = None): - """Warn if restriction on table object differs from input restriction""" - if restriction is None and table.restriction: - logger.warn( - f"Warning: ignoring table restriction: {table().restriction}.\n\t" - + "Please pass restrictions as an arg" - ) - - -def _unique_descendants( - table: dj.Table, - recurse_level: int = 2, - return_names: bool = False, - attribute=None, -) -> list: - """Recurisively find unique descendants of a given table - - Parameters - ---------- - table: dj.Table - The node in the tree from which to find descendants. - recurse_level: int - The maximum level of descendants to find. - return_names: bool - If True, return names of descendants found. Else return Table objects. - attribute: str, optional - If provided, only return descendants that have this attribute. - - Returns - ------- - List[dj.Table, str] - List descendants found when recurisively called to recurse_level - """ - - if recurse_level == 0: - return [] - - if attribute is None: - skip_attr_check = True - else: - skip_attr_check = False - - descendants = {} - - def recurse_descendants(sub_table, level): - for descendant in sub_table.descendants(as_objects=True): - if descendant.full_table_name not in descendants and ( - skip_attr_check or attribute in descendant.heading.attributes - ): - descendants[descendant.full_table_name] = descendant - if level > 1: - recurse_descendants(descendant, level - 1) - - recurse_descendants(table, recurse_level) - - return ( - list(descendants.keys()) if return_names else list(descendants.values()) - ) - - -def _master_table_pairs( - table_list: list, - restricted_parent: dj.expression.QueryExpression = True, - connection: dj.connection.Connection = None, -) -> list: - """ - Given list of tables, return a list of master table pairs. - - Returns a list of tuples, with master and part. Part will have restriction - applied. If restriction yield empty list, skip. - - Parameters - ---------- - table_list : List[dj.Table] - A list of datajoint tables. - restricted_parent : dj.expression.QueryExpression - Parent table restricted, to be joined with master and part. Default - True, no restriction. - connection : datajoint.connection.Connection - A database connection. Default None, use connection from first table. - - Returns - ------- - List[Tuple[dj.Table, dj.Table]] - A list of master table pairs. 
- """ - conn = connection or table_list[0].connection - - master_table_pairs = [] - unique_parts = [] - - # Adapted from Spyglass PR 535 - for table in table_list: - table_name = table.full_table_name - if table_name in unique_parts: # then repeat in list - continue - - master_name = get_master(table_name) - if not master_name: # then it's not a part table - continue - - master = dj.FreeTable(conn, master_name) - if RESERVED_PRIMARY_KEY not in master.heading.attributes.keys(): - continue # then it's not a merge table - - restricted_join = restricted_parent * table - if not restricted_join: # No entries relevant to restriction in part - continue - - unique_parts.append(table_name) - master_table_pairs.append( - ( - master, - table - & restricted_join.fetch(RESERVED_PRIMARY_KEY, as_dict=True), - ) - ) - - return master_table_pairs diff --git a/src/spyglass/utils/dj_mixin.py b/src/spyglass/utils/dj_mixin.py index 00163b605..03f0ec08b 100644 --- a/src/spyglass/utils/dj_mixin.py +++ b/src/spyglass/utils/dj_mixin.py @@ -1,8 +1,16 @@ +from functools import cached_property +from time import time +from typing import Dict, List, Union + import datajoint as dj -from datajoint.table import logger as dj_logger -from datajoint.utils import user_choice +from datajoint.expression import QueryExpression +from datajoint.logging import logger as dj_logger +from datajoint.table import Table +from datajoint.utils import get_master, user_choice +from spyglass.utils.dj_chains import TableChain, TableChains from spyglass.utils.dj_helper_fn import fetch_nwb +from spyglass.utils.dj_merge_tables import RESERVED_PRIMARY_KEY as MERGE_PK from spyglass.utils.logging import logger @@ -19,6 +27,14 @@ class SpyglassMixin: Fetch NWBFile object from relevant table. Uses either a foreign key to a NWBFile table (including AnalysisNwbfile) or a _nwb_table attribute to determine which table to use. + delte_downstream_merge(restriction=None, dry_run=True, reload_cache=False) + Delete downstream merge table entries associated with restricton. + Requires caching of merge tables and links, which is slow on first call. + `restriction` can be set to a string to restrict the delete. `dry_run` + can be set to False to commit the delete. `reload_cache` can be set to + True to reload the merge cache. + ddm(*args, **kwargs) + Alias for delete_downstream_merge. cautious_delete(force_permission=False, *args, **kwargs) Check user permissions before deleting table rows. Permission is granted to users listed as admin in LabMember table or to users on a team with @@ -31,158 +47,213 @@ class SpyglassMixin: Alias for cautious_delete. """ - _nwb_table_dict = {} # Dict mapping NWBFile table to path attribute name. # _nwb_table = None # NWBFile table class, defined at the table level - _nwb_table_resolved = None # NWBFiletable class, resolved here from above - _delete_dependencies = [] # Session, LabMember, LabTeam, delay import - _merge_delete_func = None # delete_downstream_merge, delay import - # pks for delete permission check, assumed to be on field + + # pks for delete permission check, assumed to be one field for each _session_pk = None # Session primary key. Mixin is ambivalent to Session pk _member_pk = None # LabMember primary key. Mixin ambivalent table structure # ------------------------------- fetch_nwb ------------------------------- - @property - def _table_dict(self): - """Dict mapping NWBFile table to path attribute name. - - Used to delay import of NWBFile tables until needed, avoiding circular - imports. 
- """ - if not self._nwb_table_dict: - from spyglass.common.common_nwbfile import ( # noqa F401 - AnalysisNwbfile, - Nwbfile, - ) - - self._nwb_table_dict = { - AnalysisNwbfile: "analysis_file_abs_path", - Nwbfile: "nwb_file_abs_path", - } - return self._nwb_table_dict - - @property - def _nwb_table_tuple(self): + @cached_property + def _nwb_table_tuple(self) -> tuple: """NWBFile table class. Used to determine fetch_nwb behavior. Also used in Merge.fetch_nwb. - Multiple copies for different purposes. + Implemented as a cached_property to avoid circular imports.""" + from spyglass.common.common_nwbfile import ( + AnalysisNwbfile, + Nwbfile, + ) # noqa F401 + + table_dict = { + AnalysisNwbfile: "analysis_file_abs_path", + Nwbfile: "nwb_file_abs_path", + } + + resolved = getattr(self, "_nwb_table", None) or ( + AnalysisNwbfile + if "-> AnalysisNwbfile" in self.definition + else Nwbfile if "-> Nwbfile" in self.definition else None + ) - - _nwb_table may be user-set. Don't overwrite. - - _nwb_table_resolved is set here from either _nwb_table or definition. - - _nwb_table_tuple is used to cache result of _nwb_table_resolved and - return the appropriate path_attr from _table_dict above. - """ - if not self._nwb_table_resolved: - from spyglass.common.common_nwbfile import ( # noqa F401 - AnalysisNwbfile, - Nwbfile, + if not resolved: + raise NotImplementedError( + f"{self.__class__.__name__} does not have a " + "(Analysis)Nwbfile foreign key or _nwb_table attribute." ) - if hasattr(self, "_nwb_table"): - self._nwb_table_resolved = self._nwb_table - - if not hasattr(self, "_nwb_table"): - self._nwb_table_resolved = ( - AnalysisNwbfile - if "-> AnalysisNwbfile" in self.definition - else Nwbfile if "-> Nwbfile" in self.definition else None - ) - - if getattr(self, "_nwb_table_resolved", None) is None: - raise NotImplementedError( - f"{self.__class__.__name__} does not have a " - "(Analysis)Nwbfile foreign key or _nwb_table attribute." - ) - return ( - self._nwb_table_resolved, - self._table_dict[self._nwb_table_resolved], + resolved, + table_dict[resolved], ) def fetch_nwb(self, *attrs, **kwargs): """Fetch NWBFile object from relevant table. - Implementing class must have a foreign key to Nwbfile or - AnalysisNwbfile or a _nwb_table attribute. + Implementing class must have a foreign key reference to Nwbfile or + AnalysisNwbfile (i.e., "-> (Analysis)Nwbfile" in definition) + or a _nwb_table attribute. If both are present, the attribute takes + precedence. + """ + return fetch_nwb(self, self._nwb_table_tuple, *attrs, **kwargs) + + # ------------------------ delete_downstream_merge ------------------------ - A class that does not have with either '-> Nwbfile' or - '-> AnalysisNwbfile' in its definition can use a _nwb_table attribute to - specify which table to use. + @cached_property + def _merge_tables(self) -> Dict[str, dj.FreeTable]: + """Dict of merge tables downstream of self: {full_table_name: FreeTable}. + + Cache of items in parents of self.descendants(as_objects=True). Both + descendant and parent must have the reserved primary key 'merge_id'. 
""" - nwb_table, path_attr = self._nwb_table_tuple - return fetch_nwb(self, (nwb_table, path_attr), *attrs, **kwargs) + self.connection.dependencies.load() + merge_tables = {} + for desc in self.descendants(as_objects=True): + if MERGE_PK not in desc.heading.names or not ( + master_name := get_master(desc.full_table_name) + ): + continue + master = dj.FreeTable(self.connection, master_name) + if MERGE_PK in master.heading.names: + merge_tables[master_name] = master + + logger.info( + f"Building merge cache for {self.table_name}.\n\t" + + f"Found {len(merge_tables)} downstream merge tables" + ) - # -------------------------------- delete --------------------------------- + return merge_tables - @property - def _delete_deps(self) -> list: - """List of tables required for delete permission check. + @cached_property + def _merge_chains(self) -> Dict[str, List[dj.FreeTable]]: + """Dict of chains to merges downstream of self - Used to delay import of tables until needed, avoiding circular imports. + Format: {full_table_name: TableChains}. + + For each merge table found in _merge_tables, find the path from self to + merge via merge parts. If the path is valid, add it to the dict. Cache + prevents need to recompute whenever delete_downstream_merge is called + with a new restriction. To recompute, add `reload_cache=True` to call. """ - if not self._delete_dependencies: - from spyglass.common import LabMember, LabTeam, Session # noqa F401 + merge_chains = {} + for name, merge_table in self._merge_tables.items(): + chains = TableChains(self, merge_table, connection=self.connection) + if len(chains): + merge_chains[name] = chains + return merge_chains + + def _commit_merge_deletes( + self, merge_join_dict: Dict[str, List[QueryExpression]], **kwargs + ) -> None: + """Commit merge deletes. - self._delete_dependencies = [LabMember, LabTeam, Session] - self._session_pk = Session.primary_key[0] - self._member_pk = LabMember.primary_key[0] - return self._delete_dependencies + Parameters + ---------- + merge_join_dict : Dict[str, List[QueryExpression]] + Dictionary of merge tables and their joins. Uses 'merge_id' primary + key to restrict delete. - @property - def _merge_del_func(self) -> callable: - """Callable: delete_downstream_merge function. + Extracted for use in cautious_delete and delete_downstream_merge.""" + for table_name, part_restr in merge_join_dict.items(): + table = self._merge_tables[table_name] + keys = [part.fetch(MERGE_PK, as_dict=True) for part in part_restr] + (table & keys).delete(**kwargs) - Used to delay import of func until needed, avoiding circular imports. + def delete_downstream_merge( + self, + restriction: str = None, + dry_run: bool = True, + reload_cache: bool = False, + disable_warning: bool = False, + return_parts: bool = True, + **kwargs, + ) -> Union[List[QueryExpression], Dict[str, List[QueryExpression]]]: + """Delete downstream merge table entries associated with restricton. + + Requires caching of merge tables and links, which is slow on first call. + + Parameters + ---------- + restriction : str, optional + Restriction to apply to merge tables. Default None. Will attempt to + use table restriction if None. + dry_run : bool, optional + If True, return list of merge part entries to be deleted. Default + True. + reload_cache : bool, optional + If True, reload merge cache. Default False. + disable_warning : bool, optional + If True, do not warn if no merge tables found. Default False. 
+ return_parts : bool, optional + If True, return list of merge part entries to be deleted. Default + True. If False, return dictionary of merge tables and their joins. + **kwargs : Any + Passed to datajoint.table.Table.delete. """ - if not self._merge_delete_func: - from spyglass.utils.dj_merge_tables import ( # noqa F401 - delete_downstream_merge, + if reload_cache: + del self._merge_tables + del self._merge_chains + + restriction = restriction or self.restriction or True + + merge_join_dict = {} + for name, chain in self._merge_chains.items(): + join = chain.join(restriction) + if join: + merge_join_dict[name] = join + + if not merge_join_dict and not disable_warning: + logger.warning( + f"No merge tables found downstream of {self.full_table_name}." + + "\n\tIf this is unexpected, try running with `reload_cache`." ) - self._merge_delete_func = delete_downstream_merge - return self._merge_delete_func + if dry_run: + return merge_join_dict.values() if return_parts else merge_join_dict - def _find_session_link( + self._commit_merge_deletes(merge_join_dict, **kwargs) + + def ddm( self, - table: dj.user_tables.UserTable, - search_limit: int = 2, - ) -> dj.expression.QueryExpression: - """Find Session table associated with table. + restriction: str = None, + dry_run: bool = True, + reload_cache: bool = False, + disable_warning: bool = False, + return_parts: bool = True, + *args, + **kwargs, + ) -> Union[List[QueryExpression], Dict[str, List[QueryExpression]]]: + """Alias for delete_downstream_merge.""" + return self.delete_downstream_merge( + restriction=restriction, + dry_run=dry_run, + reload_cache=reload_cache, + disable_warning=disable_warning, + return_parts=return_parts, + *args, + **kwargs, + ) - Parameters - ---------- - table : datajoint.user_tables.UserTable - Table to search for Session ancestor. - Session : datajoint.user_tables.UserTable - Session table to search for. Passed as arg to prevent re-import. - search_limit : int, optional - Number of levels of children of target table to search. Default 2. + # ---------------------------- cautious_delete ---------------------------- - Returns - ------- - datajoint.expression.QueryExpression or None - Join of table link with Session table if found, else None. - """ - Session = self._delete_deps[-1] - # TODO: check search_limit default is enough for any table in spyglass - if self._session_pk in table.primary_key: - # joinable with Session - return table * Session + @cached_property + def _delete_deps(self) -> List[Table]: + """List of tables required for delete permission check. - elif search_limit > 0: - for child in table.children(as_objects=True): - table = self._find_session_link(child, search_limit - 1) - if table: # table is link, will valid join to Session - return table + LabMember, LabTeam, and Session are required for delete permission. - elif not table or search_limit < 1: # if none found and limit reached - return # Err kept in parent func to centralize permission logic + Used to delay import of tables until needed, avoiding circular imports. + Each of these tables inheits SpyglassMixin. + """ + from spyglass.common import LabMember, LabTeam, Session # noqa F401 - return table * Session + self._session_pk = Session.primary_key[0] + self._member_pk = LabMember.primary_key[0] + return [LabMember, LabTeam, Session] - def _get_exp_summary(self, sess_link: dj.expression.QueryExpression): + def _get_exp_summary(self): """Get summary of experimenters for session(s), including NULL. 
Parameters @@ -196,18 +267,34 @@ def _get_exp_summary(self, sess_link: dj.expression.QueryExpression): Summary of experimenters for session(s). """ Session = self._delete_deps[-1] + SesExp = Session.Experimenter + empty_pk = {self._member_pk: "NULL"} format = dj.U(self._session_pk, self._member_pk) - exp_missing = format & (sess_link - Session.Experimenter).proj( - **{self._member_pk: "NULL"} - ) - exp_present = ( - format & (sess_link * Session.Experimenter - exp_missing).proj() - ) + sess_link = self._session_connection.join(self.restriction) + + exp_missing = format & (sess_link - SesExp).proj(**empty_pk) + exp_present = format & (sess_link * SesExp - exp_missing).proj() + return exp_missing + exp_present + @cached_property + def _session_connection(self) -> Union[TableChain, bool]: + """Path from Session table to self. False if no connection found.""" + connection = TableChain(parent=self._delete_deps[-1], child=self) + return connection if connection.has_link else False + + @cached_property + def _test_mode(self) -> bool: + """Return True if in test mode. + + Avoids circular import. Prevents prompt on delete.""" + from spyglass.settings import test_mode + + return test_mode + def _check_delete_permission(self) -> None: - """Check user name against lab team assoc. w/ self * Session. + """Check user name against lab team assoc. w/ self -> Session. Returns ------- @@ -226,20 +313,18 @@ def _check_delete_permission(self) -> None: if dj_user in LabMember().admin: # bypass permission check for admin return - sess_link = self._find_session_link(table=self) - if not sess_link: # Permit delete if not linked to a session - logger.warn( + if not self._session_connection: + logger.warn( # Permit delete if no session connection "Could not find lab team associated with " + f"{self.__class__.__name__}." + "\nBe careful not to delete others' data." ) return - sess_summary = self._get_exp_summary( - sess_link.restrict(self.restriction) - ) + sess_summary = self._get_exp_summary() experimenters = sess_summary.fetch(self._member_pk) if None in experimenters: + # TODO: Check if allow delete of remainder? raise PermissionError( "Please ensure all Sessions have an experimenter in " + f"SessionExperimenter:\n{sess_summary}" @@ -262,7 +347,25 @@ def _check_delete_permission(self) -> None: ) logger.info(f"Queueing delete for session(s):\n{sess_summary}") - # Rename to `delete` when we're ready to use it + @cached_property + def _usage_table(self): + """Temporary inclusion for usage tracking.""" + from spyglass.common.common_usage import CautiousDelete + + return CautiousDelete + + def _log_use(self, start, merge_deletes=None): + """Log use of cautious_delete.""" + self._usage_table.insert1( + dict( + duration=time() - start, + dj_user=dj.config["database.user"], + origin=self.full_table_name, + restriction=self.restriction, + merge_deletes=merge_deletes, + ) + ) + # TODO: Intercept datajoint delete confirmation prompt for merge deletes def cautious_delete(self, force_permission: bool = False, *args, **kwargs): """Delete table rows after checking user permission. @@ -280,15 +383,15 @@ def cautious_delete(self, force_permission: bool = False, *args, **kwargs): *args, **kwargs : Any Passed to datajoint.table.Table.delete. 
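+
+        Example
+        -------
+        A sketch (assuming `Session` is imported and the current user has
+        delete permission for the restricted entry)::
+
+            restricted = Session & {"nwb_file_name": "minirec20230622_.nwb"}
+            restricted.cautious_delete()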
""" + start = time() if not force_permission: self._check_delete_permission() - merge_deletes = self._merge_del_func( - self, - restriction=self.restriction if self.restriction else None, + merge_deletes = self.delete_downstream_merge( dry_run=True, disable_warning=True, + return_parts=False, ) safemode = ( @@ -298,21 +401,28 @@ def cautious_delete(self, force_permission: bool = False, *args, **kwargs): ) if merge_deletes: - for table, _ in merge_deletes: - count, name = len(table), table.full_table_name - dj_logger.info(f"Merge: Deleting {count} rows from {name}") + for table, content in merge_deletes.items(): + count = sum([len(part) for part in content]) + dj_logger.info(f"Merge: Deleting {count} rows from {table}") if ( - not safemode + not self._test_mode + or not safemode or user_choice("Commit deletes?", default="no") == "yes" ): - for merge_table, _ in merge_deletes: - merge_table.delete({**kwargs, "safemode": False}) + self._commit_merge_deletes(merge_deletes, **kwargs) else: logger.info("Delete aborted.") + self._log_use(start) return super().delete(*args, **kwargs) # Additional confirm here + self._log_use(start=start, merge_deletes=merge_deletes) + def cdel(self, *args, **kwargs): """Alias for cautious_delete.""" self.cautious_delete(*args, **kwargs) + + def delete(self, *args, **kwargs): + """Alias for cautious_delete, overwrites datajoint.table.Table.delete""" + self.cautious_delete(*args, **kwargs)