Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[python] Ingest/outgest round-trip improvements #2804

Merged
merged 3 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 82 additions & 15 deletions apis/python/src/tiledbsoma/io/_registration/signatures.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from typing import Dict, Optional
from typing import Dict, Optional, Union

import anndata as ad
import attrs
Expand Down Expand Up @@ -69,25 +69,92 @@ def _string_dict_from_pandas_dataframe(
"""

df = df.head(1).copy() # since reset_index can be expensive on full data
if df.index.name is None or df.index.name == "index":
df.reset_index(inplace=True)
if default_index_name in df:
if "index" in df:
# Avoid the warning:
# "A value is trying to be set on a copy of a slice from a DataFrame"
# which would occur if we did:
# df.drop(columns=["index"], inplace=True)
df = df.drop(columns=["index"])
else:
df.rename(columns={"index": default_index_name}, inplace=True)
else:
df.reset_index(inplace=True)

_prepare_df_for_ingest(df, default_index_name)
arrow_table = df_to_arrow(df)
arrow_schema = arrow_table.schema.remove_metadata()
return _string_dict_from_arrow_schema(arrow_schema)


# Metadata indicating a SOMA DataFrame's original index column name, serialized as a JSON string or `null`.
# SOMA DataFrames are always given a `soma_joinid` index, but we want to be able to outgest a `pd.DataFrame` that is
# identical to the one we ingested, so we store an "original index name" in the DataFrame's metadata.
OriginalIndexMetadata = Union[None, str]


def _prepare_df_for_ingest(
df: pd.DataFrame, id_column_name: Optional[str]
) -> OriginalIndexMetadata:
"""Prepare a `pd.DataFrame` for persisting as a SOMA DataFrame: demote its index to a column (to make way for a
required `soma_joinid` index), and compute and return metadata for restoring the index column and name later (on
outgest).

If `df.index` has a name (and it's not "index", which is taken to be a default/unset value):
- `df.index.name` takes precedence over the `id_column_name` arg: the index will be reset to an eponymous column.
- That original `df.index.name` will be logged as `OriginalIndexMetadata` (for promotion back to index on outgest).

In this case, the overall round trip is basically just:
- `reset_index` on ingest (demote index to eponymous column).
- `set_index` on outgest (restore column to index, with its original name).

Otherwise (index name is `None` or "index"):
- A fallback name (`id_column_name` if provided, "index" otherwise) is used for the column that the index becomes.
- The returned `OriginalIndexMetadata` will be `None`.

There are several edge cases (detailed below and in `test_dataframe_io_roundtrips.py` and
https://github.com/single-cell-data/TileDB-SOMA/issues/2829) where the index, its name, or a specific column are not
restored properly on outgest. For now, all such behavior is preserved, for backwards compatibility, but we should
look into ways of improving these "round-trip mutation" cases. See
https://github.com/single-cell-data/TileDB-SOMA/issues/2829 for more info.
"""
use_existing_index = df.index.name is not None and df.index.name != "index"

original_index_name = None
if use_existing_index:
original_index_name = df.index.name

df.reset_index(inplace=True)
if id_column_name is not None:
if id_column_name in df:
if "index" in df:
# The assumption here is that the column named "index" was previously an unnamed `df.index`, and
# `id_column_name` was already a column (per the grandparent `if` above). In this case, we drop the
# original unnamed `df.index`.
# TODO: This prevents outgesting the same DataFrame we ingested. We should fix it; see
# https://github.com/single-cell-data/TileDB-SOMA/issues/2829.
#
# Also note: if the DataFrame already had columns named "index" and `id_column_name`, the original
# `df.index` will have been "reset" to a column named `level_0`, and we end up just dropping the column
# named "index" here.
#
# Another version of this occurs when the original DataFrame has `df.index.name == id_column_name` and a
# column named "index". In this case, the index will have been "reset" to a column named
# `id_column_name` above, which then satisfies the grendparent `if`'s predicate, and causes us to drop
# the column named "index" here.
df.drop(columns=["index"], inplace=True)
else:
# If `id_column_name` was passed, and is not already a column in the DataFrame, we assume the original index
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your careful analysis here -- I really appreciate it.

The enumeration of problems here should moved from code comments here, to a tracking issue I'm asking you to create. Then that issue's link should be placed here.

Copy link
Member Author

@ryan-williams ryan-williams Aug 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a link to #2829, couldn't tell if you wanted me to remove the rest of the comment here.

I've left it for now, since the explanation here is specific to the nested if/else context, and the issues the rename below specifically introduces given that context.

Copy link
Member

@johnkerl johnkerl Aug 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. What we have now is a bug-report listing as an extended code comment. That is more appropriately written as an issue we can track and schedule (#2829) -- and I asked you to move this bug-listing from here to there. But if you feel it's crucial for understanding of the code to have this list-out duplicated here, I won't reject this PR on that basis.

# was "reset" to a column named "index" (by `reset_index` above), and we rename that column to
# `id_column_name`, so that `id_column_name` matches the name of a column representing the original
# DataFrame's index.
#
# NOTE: the assumption above can break in a few ways:
# 1. The original DataFrame index has a name other than "index" or `id_column_name`…
# a. and there is a column named "index" ⇒ that column will be renamed to `id_column_name`
# b. and there is no column named "index" ⇒ the rename below is a no-op (outgest currently restores the
# original DataFrame in this case)
# 2. The original DataFrame has a column named "index":
# - That column will become `df.index` on outgest, and acquire the original `df.index.name` as its name.
# - The original index will end up as a column, on outgest:
# - If it had a name, the column will have that name.
# - Otherwise, it will end up as a column named e.g. `level_0` (or `level_1`, if a column named
# `level_0` already exists, etc.)
#
# See https://github.com/single-cell-data/TileDB-SOMA/issues/2829 for more info.
df.rename(columns={"index": id_column_name}, inplace=True)

return original_index_name


@attrs.define(kw_only=True)
class Signature:
"""
Expand Down
21 changes: 5 additions & 16 deletions apis/python/src/tiledbsoma/io/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
get_dataframe_values,
signatures,
)
from ._registration.signatures import OriginalIndexMetadata, _prepare_df_for_ingest
from ._util import read_h5ad

_NDArr = TypeVar("_NDArr", bound=NDArray)
Expand Down Expand Up @@ -1149,20 +1150,8 @@ def _write_dataframe(
The caller should have copied anything pointing to a user-provided
adata.obs, adata.var, etc.
"""
original_index_name = None
if df.index is not None and df.index.name is not None and df.index.name != "index":
original_index_name = df.index.name

df.reset_index(inplace=True)
if id_column_name is not None:
if id_column_name in df:
if "index" in df:
df.drop(columns=["index"], inplace=True)
else:
df.rename(columns={"index": id_column_name}, inplace=True)

original_index_metadata = _prepare_df_for_ingest(df, id_column_name)
df[SOMA_JOINID] = np.asarray(axis_mapping.data, dtype=np.int64)

df.set_index(SOMA_JOINID, inplace=True)

return _write_dataframe_impl(
Expand All @@ -1171,7 +1160,7 @@ def _write_dataframe(
id_column_name,
ingestion_params=ingestion_params,
additional_metadata=additional_metadata,
original_index_name=original_index_name,
original_index_metadata=original_index_metadata,
platform_config=platform_config,
context=context,
)
Expand All @@ -1184,7 +1173,7 @@ def _write_dataframe_impl(
*,
ingestion_params: IngestionParams,
additional_metadata: AdditionalMetadata = None,
original_index_name: Optional[str] = None,
original_index_metadata: OriginalIndexMetadata = None,
platform_config: Optional[PlatformConfig] = None,
context: Optional[SOMATileDBContext] = None,
) -> DataFrame:
Expand Down Expand Up @@ -1247,7 +1236,7 @@ def _write_dataframe_impl(
# Save the original index name for outgest. We use JSON for elegant indication of index name
# being None (in Python anyway).
soma_df.metadata[_DATAFRAME_ORIGINAL_INDEX_NAME_JSON] = json.dumps(
original_index_name
original_index_metadata
)
add_metadata(soma_df, additional_metadata)

Expand Down
109 changes: 60 additions & 49 deletions apis/python/src/tiledbsoma/io/outgest.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,64 @@
return data


def _read_dataframe(
df: DataFrame, default_index_name: Optional[str], fallback_index_name: str
) -> pd.DataFrame:
"""Outgest a SOMA DataFrame to Pandas, including restoring the original index{,.name}.

An `OriginalIndexMetadata` attached to the DataFrame, if present, contains a string (or `null`), indicating a
SOMADataFrame column which should be restored as the pd.DataFrame index.

`default_index_name`, if provided, overrides the stored `OriginalIndexMetadata`; a column with this name will be
verified to exist, and set as index of the returned `pd.DataFrame`.

If neither `default_index_name` nor `OriginalIndexMetadata` are provided, the `fallback_index_name` will be used.
`to_anndata` passes "obs_id" / "var_id" for obs/var, matching `from_anndata`'s default `{obs,var}_id_name` values.

NOTE: several edge cases result in the outgested DataFrame not matching the original DataFrame; see
johnkerl marked this conversation as resolved.
Show resolved Hide resolved
`test_dataframe_io_roundtrips.py` / https://github.com/single-cell-data/TileDB-SOMA/issues/2829.
"""
# Read and validate the "original index metadata" stored alongside this SOMA DataFrame.
original_index_metadata = json.loads(
df.metadata.get(_DATAFRAME_ORIGINAL_INDEX_NAME_JSON, "null")
)
if not (
original_index_metadata is None or isinstance(original_index_metadata, str)
):
raise ValueError(

Check warning on line 150 in apis/python/src/tiledbsoma/io/outgest.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/io/outgest.py#L150

Added line #L150 was not covered by tests
f"{df.uri}: invalid {_DATAFRAME_ORIGINAL_INDEX_NAME_JSON} metadata: {original_index_metadata}"
)

pdf: pd.DataFrame = df.read().concat().to_pandas()
# SOMA DataFrames always have a `soma_joinid` added, as part of the ingest process, which we remove on outgest.
pdf.drop(columns=SOMA_JOINID, inplace=True)

default_index_name = default_index_name or original_index_metadata
if default_index_name is not None:
# One or both of the following was true:
# - Original DataFrame had an index name (other than "index") ⇒ that name was written as `OriginalIndexMetadata`
# - `default_index_name` was provided (e.g. `{obs,var}_id_name` args to `to_anndata`)
#
# ⇒ Verify a column with that name exists, and set it as index (keeping its name).
if default_index_name not in pdf.keys():
raise ValueError(
f"Requested ID column name {default_index_name} not found in input: {pdf.keys()}"
)
pdf.set_index(default_index_name, inplace=True)
else:
# The assumption here is that the original index was unnamed, and was given a "fallback name" (e.g. "obs_id",
# "var_id") during ingest that matches the `fallback_index_name` arg here. In this case, we restore that column
# as index, and remove the name.
#
# NOTE: several edge cases result in the outgested DF not matching the original DF; see
johnkerl marked this conversation as resolved.
Show resolved Hide resolved
# https://github.com/single-cell-data/TileDB-SOMA/issues/2829.
if fallback_index_name in pdf:
pdf.set_index(fallback_index_name, inplace=True)
pdf.index.name = None

return pdf


# ----------------------------------------------------------------
def to_anndata(
experiment: Experiment,
Expand Down Expand Up @@ -196,55 +254,8 @@
# * Else if the names used at ingest time are available, use them.
# * Else use the default/fallback name.

# Restore the original index name for outgest. We use JSON for elegant indication of index
# name being None (in Python anyway). It may be 'null' which maps to Pyhton None.
obs_id_name = obs_id_name or json.loads(
experiment.obs.metadata.get(_DATAFRAME_ORIGINAL_INDEX_NAME_JSON, '"obs_id"')
)
var_id_name = var_id_name or json.loads(
measurement.var.metadata.get(_DATAFRAME_ORIGINAL_INDEX_NAME_JSON, '"var_id"')
)

obs_df = experiment.obs.read().concat().to_pandas()
obs_df.drop([SOMA_JOINID], axis=1, inplace=True)
if obs_id_name is not None:
if obs_id_name not in obs_df.keys():
raise ValueError(
f"requested obs IDs column name {obs_id_name} not found in input: {obs_df.keys()}"
)
obs_df.set_index(obs_id_name, inplace=True)
else:
# There are multiple cases to be handled here, all tested in CI.
# This else-block handle this one:
#
# orig.ident nCount_RNA ...
# ATGCCAGAACGACT 0 70.0 ...
# CATGGCCTGTGCAT 0 85.0 ...
# GAACCTGATGAACC 0 87.0 ...
#
# Namely:
# * The input AnnData dataframe had an index with no name
# * In the SOMA experiment we name that column "obs_id" and our index is "soma_joinid"
# * On outgest we drop "soma_joinid"
# * The thing we named "obs_id" needs to become the index again ...
# * ... and it needs to be nameless.
if "obs_id" in obs_df:
obs_df.set_index("obs_id", inplace=True)
obs_df.index.name = None

var_df = measurement.var.read().concat().to_pandas()

var_df.drop([SOMA_JOINID], axis=1, inplace=True)
if var_id_name is not None:
if var_id_name not in var_df.keys():
raise ValueError(
f"requested var IDs column name {var_id_name} not found in input: {var_df.keys()}"
)
var_df.set_index(var_id_name, inplace=True)
else:
if "var_id" in var_df:
var_df.set_index("var_id", inplace=True)
var_df.index.name = None
obs_df = _read_dataframe(experiment.obs, obs_id_name, "obs_id")
var_df = _read_dataframe(measurement.var, var_id_name, "var_id")

nobs = len(obs_df.index)
nvar = len(var_df.index)
Expand Down
Loading
Loading