Merge pull request #1089 from lsst/tickets/DM-46575
DM-46575: Update parquet formatter to use fsspec.
erykoff authored Oct 2, 2024
2 parents 907638f + 618d7ba commit e90bb0f
Showing 5 changed files with 192 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
@@ -64,6 +64,7 @@ jobs:
run: |
pip install uv
uv pip install -r requirements.txt
uv pip install fsspec s3fs
# We have two cores so we can speed up the testing with xdist
- name: Install pytest packages
1 change: 1 addition & 0 deletions doc/changes/DM-46575.feature.md
@@ -0,0 +1 @@
Update the parquet formatter to use fsspec, which allows direct access to columns of parquet files stored in S3, WebDAV, etc.
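
For illustration, a minimal sketch of the access pattern this enables (assuming fsspec and s3fs are installed; the bucket and key are hypothetical): pyarrow can read the schema and individual columns through an fsspec filesystem, fetching only the file footer and the requested column chunks instead of downloading the whole file.

import fsspec
import pyarrow.parquet as pq

# Hypothetical location; any fsspec-supported protocol (s3, webdav, ...) works.
fs, path = fsspec.core.url_to_fs("s3://example-bucket/data/table.parq")

# Reading the schema only fetches the parquet footer.
schema = pq.read_schema(path, filesystem=fs)

# Reading selected columns only fetches the byte ranges for those columns.
table = pq.read_table(path, columns=[schema.names[0]], filesystem=fs, use_threads=False)
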
3 changes: 3 additions & 0 deletions mypy.ini
@@ -30,6 +30,9 @@ ignore_missing_imports = True
[mypy-botocore.*]
ignore_missing_imports = True

[mypy-fsspec.*]
ignore_missing_imports = True

[mypy-urllib3.*]
ignore_missing_imports = True

36 changes: 35 additions & 1 deletion python/lsst/daf/butler/formatters/parquet.py
@@ -50,6 +50,7 @@
import collections.abc
import itertools
import json
import logging
import re
from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING, Any, cast
@@ -62,11 +63,20 @@
from lsst.utils.introspection import get_full_type_name
from lsst.utils.iteration import ensure_iterable

log = logging.getLogger(__name__)

if TYPE_CHECKING:
import astropy.table as atable
import numpy as np
import pandas as pd

try:
import fsspec
from fsspec.spec import AbstractFileSystem
except ImportError:
fsspec = None
AbstractFileSystem = type

TARGET_ROW_GROUP_BYTES = 1_000_000_000


@@ -76,15 +86,37 @@ class ParquetFormatter(FormatterV2):
"""

default_extension = ".parq"
can_read_from_uri = True
can_read_from_local_file = True

def can_accept(self, in_memory_dataset: Any) -> bool:
# Docstring inherited.
return _checkArrowCompatibleType(in_memory_dataset) is not None

def read_from_uri(self, uri: ResourcePath, component: str | None = None, expected_size: int = -1) -> Any:
# Docstring inherited from Formatter.read.
try:
fs, path = uri.to_fsspec()
except ImportError:
log.debug("fsspec not available; falling back to local file access.")
# This signals to the formatter to use the read_from_local_file
# code path.
return NotImplemented

return self._read_parquet(path=path, fs=fs, component=component, expected_size=expected_size)

def read_from_local_file(self, path: str, component: str | None = None, expected_size: int = -1) -> Any:
# Docstring inherited from Formatter.read.
return self._read_parquet(path=path, component=component, expected_size=expected_size)

def _read_parquet(
self,
path: str,
fs: AbstractFileSystem | None = None,
component: str | None = None,
expected_size: int = -1,
) -> Any:
schema = pq.read_schema(path, filesystem=fs)

schema_names = ["ArrowSchema", "DataFrameSchema", "ArrowAstropySchema", "ArrowNumpySchema"]

@@ -99,6 +131,7 @@ def read_from_local_file(self, path: str, component: str | None = None, expected

temp_table = pq.read_table(
path,
filesystem=fs,
columns=[schema.names[0]],
use_threads=False,
use_pandas_metadata=False,
@@ -140,6 +173,7 @@ def read_from_local_file(self, path: str, component: str | None = None, expected
metadata = schema.metadata if schema.metadata is not None else {}
arrow_table = pq.read_table(
path,
filesystem=fs,
columns=par_columns,
use_threads=False,
use_pandas_metadata=(b"pandas" in metadata),
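
Condensed, the read path above follows a simple fallback contract (a sketch of the pattern only; the signatures and the _read_parquet helper are as in the diff): if fsspec cannot be imported, read_from_uri returns NotImplemented, which tells the FormatterV2 machinery to retry via read_from_local_file, so installations without fsspec/s3fs keep working.

def read_from_uri(self, uri, component=None, expected_size=-1):
    try:
        # ResourcePath.to_fsspec() returns an (AbstractFileSystem, path) pair.
        fs, path = uri.to_fsspec()
    except ImportError:
        # fsspec is not available: signal the framework to fall back to
        # the read_from_local_file code path.
        return NotImplemented
    return self._read_parquet(path=path, fs=fs, component=component, expected_size=expected_size)
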
152 changes: 152 additions & 0 deletions tests/test_parquet.py
@@ -32,6 +32,8 @@

import datetime
import os
import posixpath
import shutil
import unittest

try:
@@ -52,6 +54,29 @@
except ImportError:
pd = None

try:
import boto3
import botocore
from lsst.resources.s3utils import clean_test_environment_for_s3

try:
from moto import mock_aws # v5
except ImportError:
from moto import mock_s3 as mock_aws
except ImportError:
boto3 = None

try:
import fsspec
except ImportError:
fsspec = None

try:
import s3fs
except ImportError:
s3fs = None


from lsst.daf.butler import (
Butler,
Config,
@@ -61,6 +86,7 @@
StorageClassConfig,
StorageClassFactory,
)
from lsst.resources import ResourcePath

try:
from lsst.daf.butler.delegates.arrowtable import ArrowTableDelegate
@@ -2123,6 +2149,132 @@ class InMemoryArrowSchemaDelegateTestCase(ParquetFormatterArrowSchemaTestCase):
configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")


@unittest.skipUnless(pa is not None, "Cannot test S3 without pyarrow.")
@unittest.skipUnless(boto3 is not None, "Cannot test S3 without boto3.")
@unittest.skipUnless(fsspec is not None, "Cannot test S3 without fsspec.")
@unittest.skipUnless(s3fs is not None, "Cannot test S3 without s3fs.")
class ParquetFormatterArrowTableS3TestCase(unittest.TestCase):
"""Tests for arrow table/parquet with S3."""

# Code is adapted from test_butler.py
configFile = os.path.join(TESTDIR, "config/basic/butler-s3store.yaml")
fullConfigKey = None
validationCanFail = True

bucketName = "anybucketname"

root = "butlerRoot/"

datastoreStr = [f"datastore={root}"]

datastoreName = ["FileDatastore@s3://{bucketName}/{root}"]

registryStr = "/gen3.sqlite3"

mock_aws = mock_aws()

def setUp(self):
self.root = makeTestTempDir(TESTDIR)

config = Config(self.configFile)
uri = ResourcePath(config[".datastore.datastore.root"])
self.bucketName = uri.netloc

# Enable S3 mocking of tests.
self.enterContext(clean_test_environment_for_s3())
self.mock_aws.start()

rooturi = f"s3://{self.bucketName}/{self.root}"
config.update({"datastore": {"datastore": {"root": rooturi}}})

# Need a local folder to store the registry database.
self.reg_dir = makeTestTempDir(TESTDIR)
config["registry", "db"] = f"sqlite:///{self.reg_dir}/gen3.sqlite3"

# Moto needs to know that we expect the bucket to exist
# (previously the class attribute bucketName).
s3 = boto3.resource("s3")
s3.create_bucket(Bucket=self.bucketName)

self.datastoreStr = [f"datastore='{rooturi}'"]
self.datastoreName = [f"FileDatastore@{rooturi}"]
Butler.makeRepo(rooturi, config=config, forceConfigRoot=False)
self.tmpConfigFile = posixpath.join(rooturi, "butler.yaml")

self.butler = Butler(self.tmpConfigFile, writeable=True, run="test_run")

# No dimensions in dataset type so we don't have to worry about
# inserting dimension data or defining data IDs.
self.datasetType = DatasetType(
"data", dimensions=(), storageClass="ArrowTable", universe=self.butler.dimensions
)
self.butler.registry.registerDatasetType(self.datasetType)

def tearDown(self):
s3 = boto3.resource("s3")
bucket = s3.Bucket(self.bucketName)
try:
bucket.objects.all().delete()
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "404":
# the key was not reachable - pass
pass
else:
raise

bucket = s3.Bucket(self.bucketName)
bucket.delete()

# Stop the S3 mock.
self.mock_aws.stop()

if self.reg_dir is not None and os.path.exists(self.reg_dir):
shutil.rmtree(self.reg_dir, ignore_errors=True)

if os.path.exists(self.root):
shutil.rmtree(self.root, ignore_errors=True)

def testArrowTableS3(self):
tab1 = _makeSimpleArrowTable(include_multidim=True, include_masked=True)

self.butler.put(tab1, self.datasetType, dataId={})

# Read the whole Table.
tab2 = self.butler.get(self.datasetType, dataId={})
# Convert to numpy arrays so we can use the numpy testing
# framework, which handles NaN comparisons.
self.assertEqual(tab1.schema, tab2.schema)
tab1_np = arrow_to_numpy(tab1)
tab2_np = arrow_to_numpy(tab2)
for col in tab1.column_names:
np.testing.assert_array_equal(tab2_np[col], tab1_np[col])
# Read the columns.
columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
self.assertEqual(len(columns2), len(tab1.schema.names))
for i, name in enumerate(tab1.schema.names):
self.assertEqual(columns2[i], name)
# Read the rowcount.
rowcount = self.butler.get(self.datasetType.componentTypeName("rowcount"), dataId={})
self.assertEqual(rowcount, len(tab1))
# Read the schema.
schema = self.butler.get(self.datasetType.componentTypeName("schema"), dataId={})
self.assertEqual(schema, tab1.schema)
# Read just some columns a few different ways.
tab3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
self.assertEqual(tab3, tab1.select(("a", "c")))
tab4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
self.assertEqual(tab4, tab1.select(("a",)))
tab5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
self.assertEqual(tab5, tab1.select(("index", "a")))
tab6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
self.assertEqual(tab6, tab1.select(("ddd",)))
tab7 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "a"]})
self.assertEqual(tab7, tab1.select(("a",)))
# Passing an unrecognized column should be a ValueError.
with self.assertRaises(ValueError):
self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})


@unittest.skipUnless(np is not None, "Cannot test compute_row_group_size without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test compute_row_group_size without pyarrow.")
class ComputeRowGroupSizeTestCase(unittest.TestCase):
