Merge pull request #908 from kbuma/enhancement/pandas_memory_store
creating PandasMemoryStore for use by OpenDataStore
Jason Munro authored Jan 19, 2024
2 parents 9a89f17 + 9b8ad5c commit 91132a4
Showing 3 changed files with 358 additions and 83 deletions.
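The headline change below is a new PandasMemoryStore whose query criteria take a Pandas query string under a `query` key instead of a PyMongo filter. A minimal usage sketch, assuming the `open_data` extra added in setup.py below is installed; the field names and data are illustrative, not from the diff:

    from maggma.stores.open_data import PandasMemoryStore

    # key is a standard maggma Store argument
    store = PandasMemoryStore(key="task_id")
    store.connect()

    store.update(
        [
            {"task_id": "mp-1", "energy": -1.2},
            {"task_id": "mp-2", "energy": -3.4},
        ]
    )

    # criteria["query"] is passed straight to DataFrame.query()
    docs = list(
        store.query(
            criteria={"query": "energy < -2"},
            properties=["task_id", "energy"],
            sort={"energy": 1},
        )
    )
    print(docs)  # [{'task_id': 'mp-2', 'energy': -3.4}]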
1 change: 1 addition & 0 deletions setup.py
@@ -52,6 +52,7 @@
"montydb": ["montydb>=2.3.12"],
"notebook_runner": ["IPython>=8.11", "nbformat>=5.0", "regex>=2020.6"],
"azure": ["azure-storage-blob>=12.16.0", "azure-identity>=1.12.0"],
"open_data": ["pandas>=2.1.4"],
"testing": [
"pytest",
"pytest-cov",
326 changes: 315 additions & 11 deletions src/maggma/stores/open_data.py
@@ -1,19 +1,325 @@
import gzip
from datetime import datetime
from io import BytesIO
from typing import Any, Callable, Dict, List, Optional
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union

import orjson
import pandas as pd
from boto3 import Session
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import ClientError
from bson import json_util

from maggma.core.store import Sort, Store
from maggma.stores.aws import S3Store
from maggma.stores.mongolike import MemoryStore


class S3IndexStore(MemoryStore):
class PandasMemoryStore(Store):
"""
A store that is backed by Pandas DataFrame.
"""

def __init__(
self,
**kwargs,
):
self._data = None
super().__init__(**kwargs)

@property
def _collection(self):
"""
Returns a handle to the pymongo collection object
Raises:
NotImplementedError: always as this concept does not make sense for this type of store
"""
raise NotImplementedError("Index memory store cannot be used with this property")

@property
def name(self) -> str:
"""
Return a string representing this data source
"""
return "imem://"

def connect(self, force_reset: bool = False):
"""
Connect to the source data.
Not necessary for this type of store but here for compatibility.
Args:
force_reset: whether to reset the connection or not
"""
return

def close(self):
"""
Closes any connections
Not necessary for this type of store but here for compatibility.
"""
return

def count(self, criteria: Optional[Dict] = None) -> int:
"""
Counts the number of documents matching the query criteria
Args:
criteria: the value of the `query` key will be used as the string expression to filter;
a NotImplementedError will be raised if criteria is not None and this key does not exist
Returns:
int: the number of documents matching the query criteria
"""
query_string = None
if criteria and "query" not in criteria:
raise NotImplementedError("Pandas memory store cannot handle PyMongo filters")
if criteria:
query_string = criteria["query"]

if self._data is None:
return 0
if query_string is None:
return len(self._data)
return len(self._data.query(query_string))

def _query(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> pd.DataFrame:
query_string = ""
if criteria and "query" not in criteria:
raise AttributeError("Pandas memory store cannot handle PyMongo filters")
if criteria:
query_string = criteria["query"]

if isinstance(properties, dict):
properties = [key for key, value in properties.items() if value == 1]

if self._data is None:
# an empty frame keeps the annotated return type and downstream iterrows() calls working
return pd.DataFrame()

ret = self._data

if query_string:
ret = ret.query(query_string)

if properties:
ret = ret[properties]

if sort:
sort_keys, sort_ascending = zip(*[(k, v == 1 or v == Sort.Ascending) for k, v in sort.items()])
ret = ret.sort_values(by=list(sort_keys), ascending=list(sort_ascending))

ret = ret[skip:]
if limit > 0:
ret = ret[:limit]
return ret

def query(
self,
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Dict]:
"""
Queries the Store for a set of documents
Args:
criteria: the value of the `query` key will be used as the Pandas string expression to query with;
all other data will be ignored
properties: properties to return in the documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number of documents to skip (from the start of the result set)
limit: limit on total number of documents returned
Returns:
Iterator[Dict]: an iterator over the documents that match the query parameters
Raises:
AttributeError: if criteria exists and does not include a query key
"""
ret = self._query(criteria=criteria, properties=properties, sort=sort, skip=skip, limit=limit)
return (row.to_dict() for _, row in ret.iterrows())

def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = None, clear_first: bool = False):
"""
Update documents into the Store
Args:
docs: the document or list of documents to update
key: field name(s) to determine uniqueness for a
document, can be a list of multiple fields,
a single field, or None if the Store's key
field is to be used
clear_first: if True clears the underlying data first, fully replacing the data with docs;
if False performs an upsert based on the parameters
"""
if isinstance(docs, dict):
# wrap a single document so pd.DataFrame builds a one-row frame
docs = [docs]
df = pd.DataFrame(docs)
if self._data is None or clear_first:
if not df.empty:
self._data = df
return

if key is None:
key = self.key
if isinstance(key, str):
# normalize to a list so the membership tests below check column names, not substrings
key = [key]

merged = self._data.merge(df, on=key, how="left", suffixes=("", "_B"))
for column in df.columns:
if column not in key:
merged[column] = merged[column + "_B"].combine_first(merged[column])
merged = merged[self._data.columns]
non_matching = df[~df.set_index(key).index.isin(self._data.set_index(key).index)]
self._data = pd.concat([merged, non_matching], ignore_index=True)

def ensure_index(self, key: str, unique: bool = False) -> bool:
"""
Tries to create an index and return true if it succeeded
Raises:
NotImplementedError: always as this concept does not make sense for this type of store
"""
raise NotImplementedError("Pandas memory store does not support this function")

def _field_exists(self, key: str) -> bool:
return self._data is not None and key in self._data

def newer_in(self, target: "Store", criteria: Optional[Dict] = None, exhaustive: bool = False) -> List[str]:
"""
Returns the keys of documents that are newer in the target
Store than this Store.
Args:
target: target Store to compare with
criteria: the value of the `query` key will be used as the Pandas string expression to query with;
all other data will be ignored; only valid when exhaustive is True
exhaustive: triggers an item-by-item check vs. checking
the last_updated of the target Store and using
that to filter out new items
Returns:
List[str]: if no criteria is provided a list of the keys of documents in the target store
whose last updated field value is greater than the 'newest' document in this store;
otherwise a list of the keys of documents in the target store that additionally meet the criteria
Raises:
AttributeError: if the key and last updated fields are not both present in this store or
if criteria is provided when exhaustive is not set to True
"""
if not (self._field_exists(self.key) and self._field_exists(self.last_updated_field)):
raise AttributeError("This index store does not contain data with both key and last updated fields")

if criteria is not None and not exhaustive:
raise AttributeError("Criteria is only considered when doing an item-by-item check")

if exhaustive:
# Get our current last_updated dates for each key value
props = {self.key: 1, self.last_updated_field: 1, "_id": 0}
dates = {
d[self.key]: self._lu_func[0](d.get(self.last_updated_field, datetime.max))
for d in self.query(properties=props)
}

# Get the last_updated for the store we're comparing with
props = {target.key: 1, target.last_updated_field: 1, "_id": 0}
target_dates = {
d[target.key]: target._lu_func[0](d.get(target.last_updated_field, datetime.min))
for d in target.query(criteria=criteria, properties=props)
}

new_keys = set(target_dates.keys()) - set(dates.keys())
updated_keys = {key for key, date in dates.items() if target_dates.get(key, datetime.min) > date}

return list(new_keys | updated_keys)

criteria = {"query": f"{self.last_updated_field} > '{self._lu_func[1](self.last_updated)}'"}
return target.distinct(field=self.key, criteria=criteria)

def distinct(self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False) -> List:
"""
Get all distinct values for a field
Args:
field: the field to get distinct values for
criteria: the value of the `query` key will be used as the Pandas string expression to query with;
all other data will be ignored
all_exist: not used; retained for compatibility with the Store interface
Returns:
List: a list of all the distinct values for the provided field (after filtering by the provided criteria)
"""
criteria = criteria or {}

return [group[field] for group, _ in self.groupby(field, properties=[field], criteria=criteria)]

def groupby(
self,
keys: Union[List[str], str],
criteria: Optional[Dict] = None,
properties: Union[Dict, List, None] = None,
sort: Optional[Dict[str, Union[Sort, int]]] = None,
skip: int = 0,
limit: int = 0,
) -> Iterator[Tuple[Dict, List[Dict]]]:
"""
Simple grouping function that will group documents
by keys.
Args:
keys: fields to group documents
criteria: the value of the `query` key will be used as the
Pandas string expression to query with;
all other data will be ignored
properties: properties to return in grouped documents
sort: Dictionary of sort order for fields. Keys are field names and
values are 1 for ascending or -1 for descending.
skip: number of documents to skip
limit: limit on total number of documents returned
Returns:
Iterator[Tuple[Dict, List[Dict]]]: iterator returning tuples of (dict, list of docs)
"""
ret = self._query(criteria=criteria, properties=properties, sort=sort, skip=skip, limit=limit)
keys = [keys] if isinstance(keys, str) else keys
for name, group in ret.groupby(keys):
# pandas yields a scalar group label for a single key and a tuple for multiple keys
name = name if isinstance(name, tuple) else (name,)
yield dict(zip(keys, name)), [row.to_dict() for _, row in group.iterrows()]

def remove_docs(self, criteria: Dict):
"""
Remove docs matching the query dictionary
Args:
criteria: query dictionary to match
Raises:
NotImplementedError: always as this concept is not used
"""
raise NotImplementedError("Not implemented for this store")

def __hash__(self):
"""Hash for the store"""
return hash((self.key, self.last_updated_field))

def __eq__(self, other: object) -> bool:
"""
Check equality for PandasMemoryStore
Args:
other: other PandasMemoryStore to compare with
"""
if not isinstance(other, PandasMemoryStore):
return False

fields = ["key", "last_updated_field"]
return all(getattr(self, f) == getattr(other, f) for f in fields)


class S3IndexStore(PandasMemoryStore):
"""
A store that loads the index of the collection from an S3 file.
@@ -50,24 +356,23 @@ def __init__(
self.session: Any = None
self.s3_session_kwargs = {}
self.manifest_key = manifest_key
self.kwargs = kwargs

kwargs["collection_name"] = collection_name
super().__init__(**kwargs)

def _get_full_key_path(self) -> str:
"""Produces the full path for the index."""
return f"{self.prefix}{self.manifest_key}"

def _retrieve_manifest(self) -> List[Dict]:
def _retrieve_manifest(self) -> pd.DataFrame:
"""Retrieves the contents of the index stored in S3.
Returns:
pd.DataFrame: The index contents with each row representing a document.
"""
try:
response = self.client.get_object(Bucket=self.bucket, Key=self._get_full_key_path())
json_data = orjson.loads(response["Body"].read().decode("utf-8"))
return json_data if isinstance(json_data, list) else [json_data]
return pd.read_json(response["Body"], orient="records")
except ClientError as ex:
if ex.response["Error"]["Code"] == "NoSuchKey":
return pd.DataFrame()
@@ -79,8 +384,7 @@ def _load_index(self, force_reset: bool = False) -> None:
Args:
force_reset: whether to force a reset of the memory store prior to load
"""
super().connect(force_reset=force_reset)
super().update(self._retrieve_manifest())
super().update(self._retrieve_manifest(), clear_first=True)

def store_manifest(self, data: List[Dict]) -> None:
"""Stores the provided data into the index stored in S3.
@@ -95,8 +399,7 @@ def store_manifest(self, data: List[Dict]) -> None:
Body=orjson.dumps(data, default=json_util.default),
Key=self._get_full_key_path(),
)
super().connect(force_reset=True)
super().update(data)
super().update(data, clear_first=True)

def connect(self, force_reset: bool = False):
"""
@@ -190,6 +493,7 @@ def __init__(
kwargs["key"] = key
kwargs["searchable_fields"] = searchable_fields
kwargs["unpack_data"] = True
self.kwargs = kwargs
super().__init__(**kwargs)

def _get_full_key_path(self, id: str) -> str:
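PandasMemoryStore.update above performs its upsert with a left merge: incoming values win via combine_first, and rows whose key is absent from the existing frame are concatenated on. A standalone sketch of that pattern in plain pandas, with illustrative data:

    import pandas as pd

    existing = pd.DataFrame([{"task_id": "mp-1", "energy": -1.2},
                             {"task_id": "mp-2", "energy": -3.4}])
    incoming = pd.DataFrame([{"task_id": "mp-2", "energy": -3.5},   # updates mp-2
                             {"task_id": "mp-3", "energy": -5.0}])  # inserts mp-3

    key = ["task_id"]
    merged = existing.merge(incoming, on=key, how="left", suffixes=("", "_B"))
    for column in incoming.columns:
        if column not in key:
            # prefer the incoming ("_B") value, falling back to the existing one
            merged[column] = merged[column + "_B"].combine_first(merged[column])
    merged = merged[existing.columns]

    new_rows = incoming[~incoming.set_index(key).index.isin(existing.set_index(key).index)]
    print(pd.concat([merged, new_rows], ignore_index=True))
    # task_id mp-1, mp-2, mp-3 with energy -1.2, -3.5, -5.0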
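S3IndexStore now parses the manifest directly into a DataFrame with pd.read_json instead of round-tripping through orjson. A sketch of that read path, with BytesIO standing in for the S3 response body (bucket and key handling omitted):

    from io import BytesIO

    import pandas as pd

    # a manifest is a JSON array of records, one per document
    manifest_bytes = b'[{"task_id": "mp-1", "last_updated": "2024-01-01"}]'
    index_df = pd.read_json(BytesIO(manifest_bytes), orient="records")
    print(index_df)  # one row with task_id and last_updated columns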
