Skip to content

Commit

Permalink
Fix names and add exclusion to mapping (#59)
Browse files Browse the repository at this point in the history
* add more progress bars; adjust variable names; move temporal binning step earlier to save memory

* fix

* fix

* fix

* fix

* Update README.md

* add include

* fix

* add note

* undo cache dumping per iteration

* Update README.md

* Update README.md

* restrict to dandisets

* remove object type

* add parallel note

* fix cache

* fix

---------

Co-authored-by: CodyCBakerPhD <[email protected]>
  • Loading branch information
CodyCBakerPhD and CodyCBakerPhD authored Aug 26, 2024
1 parent 5244c4c commit 4895c32
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 55 deletions.
22 changes: 18 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ To map:
map_binned_s3_logs_to_dandisets \
--binned_s3_logs_folder_path < binned S3 logs folder path > \
--mapped_s3_logs_folder_path < mapped Dandiset logs folder > \
--object_type < blobs or zarr >
--excluded_dandisets < comma-separated list of six-digit IDs to exclude > \
--restrict_to_dandisets < comma-separated list of six-digit IDs to restrict mapping to >
```

For example, on Drogon:
Expand All @@ -148,12 +149,25 @@ For example, on Drogon:
map_binned_s3_logs_to_dandisets \
--binned_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-binned \
--mapped_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-mapped \
--object_type blobs
--excluded_dandisets 000108
```

In the summer of 2024, this `blobs` process took less than 6 hours to run (with caches; 8 hours without caches) with one worker. The process could easily be parallelized if requested.
In the summer of 2024, this blobs process took less than 8 hours to complete (with caches; 10 hours without caches) with one worker.

Mapping the `zarr` objects takes much longer (more than 12 hours, and is more memory intensive), but the general process is the same.
Some Dandisets may take disproportionately longer than others to process. For this reason, the command also accepts `--excluded_dandisets` and `--restrict_to_dandisets`.

Using these options is strongly suggested: skip `000108` in the main run and process it separately (possibly on a different CRON cycle altogether).

```bash
map_binned_s3_logs_to_dandisets \
--binned_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-binned \
--mapped_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-mapped \
--restrict_to_dandisets 000108
```

In the summer of 2024, this took ?? hours to complete.

The mapping process can theoretically be designed to work in parallel (and thus much faster), but this would take some effort to design. If interested, please open an issue to request this feature.



Expand Down
28 changes: 20 additions & 8 deletions src/dandi_s3_log_parser/_command_line_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import collections
import pathlib
from typing import Literal

import click

Expand Down Expand Up @@ -135,28 +134,41 @@ def _bin_all_reduced_s3_logs_by_object_key_cli(
type=click.Path(writable=False),
)
@click.option(
"--object_type",
help="The type of objects to map the logs to, as determined by the parents of the object keys.",
required=True,
type=click.Choice(["blobs", "zarr"]),
"--excluded_dandisets",
help="A comma-separated list of Dandiset IDs to exclude from processing.",
required=False,
type=str,
default=None,
)
@click.option(
"--restrict_to_dandisets",
help="A comma-separated list of Dandiset IDs to exclusively process.",
required=False,
type=str,
default=None,
)
@click.option(
"--dandiset_limit",
help="The maximum number of Dandisets to process per call.",
help="The maximum number of Dandisets to process per call. Useful for quick testing.",
required=False,
type=int,
default=None,
)
def _map_binned_s3_logs_to_dandisets_cli(
binned_s3_logs_folder_path: pathlib.Path,
mapped_s3_logs_folder_path: pathlib.Path,
object_type: Literal["blobs", "zarr"],
excluded_dandisets: str | None,
restrict_to_dandisets: str | None,
dandiset_limit: int | None,
) -> None:
split_excluded_dandisets = excluded_dandisets.split(",") if excluded_dandisets is not None else None
split_restrict_to_dandisets = restrict_to_dandisets.split(",") if restrict_to_dandisets is not None else None

map_binned_s3_logs_to_dandisets(
binned_s3_logs_folder_path=binned_s3_logs_folder_path,
mapped_s3_logs_folder_path=mapped_s3_logs_folder_path,
object_type=object_type,
excluded_dandisets=split_excluded_dandisets,
restrict_to_dandisets=split_restrict_to_dandisets,
dandiset_limit=dandiset_limit,
)

Expand Down
117 changes: 80 additions & 37 deletions src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import pathlib
from typing import Literal

import dandi.dandiapi
import natsort
Expand All @@ -15,7 +14,8 @@
def map_binned_s3_logs_to_dandisets(
binned_s3_logs_folder_path: DirectoryPath,
mapped_s3_logs_folder_path: DirectoryPath,
object_type: Literal["blobs", "zarr"],
excluded_dandisets: list[str] | None = None,
restrict_to_dandisets: list[str] | None = None,
dandiset_limit: int | None = None,
) -> None:
"""
Expand All @@ -31,10 +31,13 @@ def map_binned_s3_logs_to_dandisets(
The path to the folder containing the reduced S3 log files.
mapped_s3_logs_folder_path : DirectoryPath
The path to the folder where the mapped logs will be saved.
object_type : one of "blobs" or "zarr"
The type of objects to map the logs to, as determined by the parents of the object keys.
excluded_dandisets : list of str, optional
A list of Dandiset IDs to exclude from processing.
restrict_to_dandisets : list of str, optional
A list of Dandiset IDs to exclusively process.
dandiset_limit : int, optional
The maximum number of Dandisets to process per call.
Useful for quick testing.
"""
if "IPINFO_CREDENTIALS" not in os.environ:
message = "The environment variable 'IPINFO_CREDENTIALS' must be set to import `dandi_s3_log_parser`!"
Expand All @@ -48,80 +51,114 @@ def map_binned_s3_logs_to_dandisets(
)
raise ValueError(message) # pragma: no cover

# TODO: cache all applicable DANDI API calls
if excluded_dandisets is not None and restrict_to_dandisets is not None:
message = "Only one of `exclude_dandisets` or `restrict_to_dandisets` can be passed, not both!"
raise ValueError(message)

excluded_dandisets = excluded_dandisets or []
restrict_to_dandisets = restrict_to_dandisets or []

# TODO: add mtime record for binned files to determine if update is needed

client = dandi.dandiapi.DandiAPIClient()

ip_hash_to_region = _load_ip_hash_cache(name="region")
ip_hash_not_in_services = _load_ip_hash_cache(name="services")
current_dandisets = list(client.get_dandisets())[:dandiset_limit]

if len(restrict_to_dandisets) != 0:
current_dandisets = [client.get_dandiset(dandiset_id=dandiset_id) for dandiset_id in restrict_to_dandisets]
else:
current_dandisets = [
dandiset for dandiset in client.get_dandisets() if dandiset.identifier not in excluded_dandisets
]
current_dandisets = current_dandisets[:dandiset_limit]

for dandiset in tqdm.tqdm(
iterable=current_dandisets,
total=len(current_dandisets),
desc="Mapping reduced logs to Dandisets...",
position=0,
leave=True,
mininterval=5.0,
smoothing=0,
):
_map_binneded_logs_to_dandiset(
_map_binned_logs_to_dandiset(
dandiset=dandiset,
binneded_s3_logs_folder_path=binned_s3_logs_folder_path,
binned_s3_logs_folder_path=binned_s3_logs_folder_path,
dandiset_logs_folder_path=mapped_s3_logs_folder_path,
object_type=object_type,
client=client,
ip_hash_to_region=ip_hash_to_region,
ip_hash_not_in_services=ip_hash_not_in_services,
)

_save_ip_hash_cache(name="region", ip_cache=ip_hash_to_region)
_save_ip_hash_cache(name="services", ip_cache=ip_hash_not_in_services)
_save_ip_hash_cache(name="region", ip_cache=ip_hash_to_region)
_save_ip_hash_cache(name="services", ip_cache=ip_hash_not_in_services)

return None


def _map_binneded_logs_to_dandiset(
def _map_binned_logs_to_dandiset(
dandiset: dandi.dandiapi.RemoteDandiset,
binneded_s3_logs_folder_path: pathlib.Path,
binned_s3_logs_folder_path: pathlib.Path,
dandiset_logs_folder_path: pathlib.Path,
object_type: Literal["blobs", "zarr"],
client: dandi.dandiapi.DandiAPIClient,
ip_hash_to_region: dict[str, str],
ip_hash_not_in_services: dict[str, bool],
) -> None:
dandiset_id = dandiset.identifier
dandiset_log_folder_path = dandiset_logs_folder_path / dandiset_id

for version in dandiset.get_versions():
dandiset_versions = list(dandiset.get_versions())
for version in tqdm.tqdm(
iterable=dandiset_versions,
total=len(dandiset_versions),
desc=f"Mapping Dandiset {dandiset_id} versions...",
position=1,
leave=False,
mininterval=5.0,
smoothing=0,
):
version_id = version.identifier
dandiset_version_log_folder_path = dandiset_log_folder_path / version_id

dandiset_version = client.get_dandiset(dandiset_id=dandiset_id, version_id=version_id)

all_activity_for_version = []
for asset in dandiset_version.get_assets():
dandiset_version_assets = list(dandiset_version.get_assets())
for asset in tqdm.tqdm(
iterable=dandiset_version_assets,
total=len(dandiset_version_assets),
desc="Mapping assets...",
position=2,
leave=False,
mininterval=5.0,
smoothing=0,
):
asset_as_path = pathlib.Path(asset.path)
asset_suffixes = asset_as_path.suffixes
dandi_filename = asset_as_path.name.removesuffix("".join(asset_suffixes))

is_asset_zarr = ".zarr" in asset_suffixes
if is_asset_zarr and object_type == "blobs":
continue
if not is_asset_zarr and object_type == "zarr":
continue
# Removing suffixes works fine on NWB Dandisets
# But the BIDS standard allows files of the same stems to have different suffixes
# Thus we must keep the suffix information to disambiguate the mapped TSV files
dandi_filename = asset_as_path.name.replace(".", "_")

is_asset_zarr = ".zarr" in asset_suffixes
if is_asset_zarr:
blob_id = asset.zarr
reduced_s3_log_file_path = binneded_s3_logs_folder_path / "zarr" / f"{blob_id}.tsv"
binned_s3_log_file_path = binned_s3_logs_folder_path / "zarr" / f"{blob_id}.tsv"
else:
blob_id = asset.blob
reduced_s3_log_file_path = (
binneded_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
binned_s3_log_file_path = (
binned_s3_logs_folder_path / "blobs" / blob_id[:3] / blob_id[3:6] / f"{blob_id}.tsv"
)

if not reduced_s3_log_file_path.exists():
# TODO: Could add a step here to track which object IDs have been processed, and if encountered again
# Just copy the file over instead of reprocessing

if not binned_s3_log_file_path.exists():
continue # No reduced logs found (possible asset was never accessed); skip to next asset

reduced_s3_log_binned_by_blob_id = pandas.read_table(filepath_or_buffer=reduced_s3_log_file_path, header=0)
reduced_s3_log_binned_by_blob_id = pandas.read_table(filepath_or_buffer=binned_s3_log_file_path, header=0)

reduced_s3_log_binned_by_blob_id["region"] = [
get_region_from_ip_address(
Expand All @@ -144,21 +181,27 @@ def _map_binneded_logs_to_dandiset(
path_or_buf=version_asset_file_path, mode="w", sep="\t", header=True, index=True
)

all_activity_for_version.append(reordered_reduced_s3_log)
reordered_reduced_s3_log["date"] = [entry[:10] for entry in reordered_reduced_s3_log["timestamp"]]

if len(all_activity_for_version) == 0:
continue # No reduced logs found (possible dandiset version was never accessed); skip to next version
reordered_reduced_s3_log_aggregated = reordered_reduced_s3_log.groupby("date", as_index=False)[
"bytes_sent"
].agg([list, "sum"])
reordered_reduced_s3_log_aggregated.rename(columns={"sum": "bytes_sent"}, inplace=True)

mapped_log = pandas.concat(objs=all_activity_for_version, ignore_index=True)
mapped_log["date"] = [entry[:10] for entry in mapped_log["timestamp"]]
reordered_reduced_s3_log_binned_per_day = reordered_reduced_s3_log_aggregated.reindex(
columns=("date", "bytes_sent")
)
reordered_reduced_s3_log_binned_per_day.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)

all_activity_for_version.append(reordered_reduced_s3_log_binned_per_day)

mapped_log_aggregated = mapped_log.groupby("date", as_index=False)["bytes_sent"].agg([list, "sum"])
mapped_log_aggregated.rename(columns={"sum": "bytes_sent"}, inplace=True)
if len(all_activity_for_version) == 0:
continue # No reduced logs found (possible dandiset version was never accessed); skip to next version

mapped_log_binned_per_day = mapped_log_aggregated.reindex(columns=("date", "bytes_sent"))
mapped_log_binned_per_day.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
summary_logs = pandas.concat(objs=all_activity_for_version, ignore_index=True)
summary_logs.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)

summary_file_path = dandiset_version_log_folder_path / "summary.tsv"
mapped_log_binned_per_day.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True)
summary_logs.to_csv(path_or_buf=summary_file_path, mode="w", sep="\t", header=True)

return None
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ def test_map_all_reduced_s3_logs_to_dandisets(tmpdir: py.path.local):
dandi_s3_log_parser.map_binned_s3_logs_to_dandisets(
binned_s3_logs_folder_path=example_binned_s3_logs_folder_path,
mapped_s3_logs_folder_path=test_mapped_s3_logs_folder_path,
object_type="blobs",
)
dandi_s3_log_parser.map_binned_s3_logs_to_dandisets(
binned_s3_logs_folder_path=example_binned_s3_logs_folder_path,
mapped_s3_logs_folder_path=test_mapped_s3_logs_folder_path,
object_type="zarr",
)

test_file_paths = {
Expand Down

0 comments on commit 4895c32

Please sign in to comment.