Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Daft Catalog API #3036

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ common-scan-info = {path = "src/common/scan-info", default-features = false}
common-system-info = {path = "src/common/system-info", default-features = false}
common-tracing = {path = "src/common/tracing", default-features = false}
common-version = {path = "src/common/version", default-features = false}
daft-catalog = {path = "src/daft-catalog", default-features = false}
daft-catalog-python-catalog = {path = "src/daft-catalog/python-catalog", optional = true}
daft-compression = {path = "src/daft-compression", default-features = false}
daft-connect = {path = "src/daft-connect", optional = true}
daft-core = {path = "src/daft-core", default-features = false}
Expand Down Expand Up @@ -52,6 +54,8 @@ python = [
"common-display/python",
"common-resource-request/python",
"common-system-info/python",
"daft-catalog/python",
"daft-catalog-python-catalog/python",
"daft-connect/python",
"daft-core/python",
"daft-csv/python",
Expand All @@ -75,6 +79,11 @@ python = [
"daft-writers/python",
"daft-table/python",
"dep:daft-connect",
"common-daft-config/python",
"common-system-info/python",
"common-display/python",
"common-resource-request/python",
"dep:daft-catalog-python-catalog",
"dep:pyo3",
"dep:pyo3-log"
]
Expand Down Expand Up @@ -140,6 +149,7 @@ members = [
"src/common/scan-info",
"src/common/system-info",
"src/common/treenode",
"src/daft-catalog",
"src/daft-core",
"src/daft-csv",
"src/daft-dsl",
Expand Down
3 changes: 3 additions & 0 deletions daft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
# Daft top-level imports
###

from daft.catalog import read_table, register_table

Check warning on line 61 in daft/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/__init__.py#L61

Added line #L61 was not covered by tests
from daft.context import set_execution_config, set_planning_config, execution_config_ctx, planning_config_ctx
from daft.convert import (
from_arrow,
Expand Down Expand Up @@ -129,6 +130,8 @@
"set_execution_config",
"planning_config_ctx",
"execution_config_ctx",
"read_table",
"register_table",
"sql",
"sql_expr",
"to_struct",
Expand Down
151 changes: 151 additions & 0 deletions daft/catalog/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""The `daft.catalog` module contains functionality for Data Catalogs.

A Data Catalog can be understood as a system/service for users to discover, access and query their data.
Most commonly, users' data is represented as a "table". Some more modern Data Catalogs such as Unity Catalog
also expose other types of data including files, ML models, registered functions and more.

Examples of Data Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog.

Daft manages Data Catalogs by registering them in an internal meta-catalog, called the "DaftMetaCatalog". This
is simple a collection of data catalogs, which Daft will attempt to detect from a users' current environment.

**Data Catalog**

Daft recognizes a default catalog which it will attempt to use when no specific catalog name is provided.

```python
# This will hit the default catalog
daft.read_table("my_db.my_namespace.my_table")
```

**Named Tables**

Daft allows also the registration of named tables, which have no catalog associated with them.

Note that named tables take precedence over the default catalog's table names when resolving names.

```python
df = daft.from_pydict({"foo": [1, 2, 3]})

daft.catalog.register_named_table(
"my_table",
df,
)

# Your table is now accessible from Daft-SQL, or Daft's `read_table`
df1 = daft.read_table("my_table")
df2 = daft.sql("SELECT * FROM my_table")
```
"""

from __future__ import annotations

from daft.daft import catalog as native_catalog
from daft.logical.builder import LogicalPlanBuilder

from daft.dataframe import DataFrame

_PYICEBERG_AVAILABLE = False
try:
from pyiceberg.catalog import Catalog as PyIcebergCatalog

_PYICEBERG_AVAILABLE = True
except ImportError:
pass

Check warning on line 54 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L53-L54

Added lines #L53 - L54 were not covered by tests

_UNITY_AVAILABLE = False
try:
from daft.unity_catalog import UnityCatalog

_UNITY_AVAILABLE = True

Check warning on line 60 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L60

Added line #L60 was not covered by tests
except ImportError:
pass

__all__ = [
"read_table",
"register_python_catalog",
"unregister_catalog",
"register_table",
]

# Forward imports from the native catalog which don't require Python wrappers
unregister_catalog = native_catalog.unregister_catalog


def read_table(name: str) -> DataFrame:
"""Finds a table with the specified name and reads it as a DataFrame

The provided name can be any of the following, and Daft will return them with the following order of priority:

1. Name of a registered dataframe/SQL view (manually registered using `daft.register_table`): `"my_registered_table"`
2. Name of a table within the default catalog (without inputting the catalog name) for example: `"my.table.name"`
3. Name of a fully-qualified table path with the catalog name for example: `"my_catalog.my.table.name"`

Args:
name: The identifier for the table to read

Returns:
A DataFrame containing the data from the specified table.
"""
native_logical_plan_builder = native_catalog.read_table(name)
return DataFrame(LogicalPlanBuilder(native_logical_plan_builder))

Check warning on line 91 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L90-L91

Added lines #L90 - L91 were not covered by tests


def register_table(name: str, dataframe: DataFrame) -> str:
"""Register a DataFrame as a named table.

This function registers a DataFrame as a named table, making it accessible
via Daft-SQL or Daft's `read_table` function.

Args:
name (str): The name to register the table under.
dataframe (daft.DataFrame): The DataFrame to register as a table.

Returns:
str: The name of the registered table.

Example:
>>> df = daft.from_pydict({"foo": [1, 2, 3]})
>>> daft.catalog.register_table("my_table", df)
>>> daft.read_table("my_table")
"""
return native_catalog.register_table(name, dataframe._builder._builder)

Check warning on line 112 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L112

Added line #L112 was not covered by tests


def register_python_catalog(catalog: PyIcebergCatalog | UnityCatalog, name: str | None = None) -> str:
"""Registers a Python catalog with Daft

Currently supports:

* [PyIceberg Catalogs](https://py.iceberg.apache.org/api/)
* [Unity Catalog](https://www.getdaft.io/projects/docs/en/latest/user_guide/integrations/unity-catalog.html)

Args:
catalog (PyIcebergCatalog | UnityCatalog): The Python catalog to register.
name (str | None, optional): The name to register the catalog under. If None, this catalog is registered as the default catalog.

Returns:
str: The name of the registered catalog.

Raises:
ValueError: If an unsupported catalog type is provided.

Example:
>>> from pyiceberg.catalog import load_catalog
>>> catalog = load_catalog("my_catalog")
>>> daft.catalog.register_python_catalog(catalog, "my_daft_catalog")

"""
python_catalog: PyIcebergCatalog
if _PYICEBERG_AVAILABLE and isinstance(catalog, PyIcebergCatalog):
from daft.catalog.pyiceberg import PyIcebergCatalogAdaptor

Check warning on line 141 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L140-L141

Added lines #L140 - L141 were not covered by tests

python_catalog = PyIcebergCatalogAdaptor(catalog)
elif _UNITY_AVAILABLE and isinstance(catalog, UnityCatalog):
from daft.catalog.unity import UnityCatalogAdaptor

Check warning on line 145 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L143-L145

Added lines #L143 - L145 were not covered by tests

python_catalog = UnityCatalogAdaptor(catalog)

Check warning on line 147 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L147

Added line #L147 was not covered by tests
else:
raise ValueError(f"Unsupported catalog type: {type(catalog)}")

Check warning on line 149 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L149

Added line #L149 was not covered by tests

return native_catalog.register_python_catalog(python_catalog, name)

Check warning on line 151 in daft/catalog/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/__init__.py#L151

Added line #L151 was not covered by tests
32 changes: 32 additions & 0 deletions daft/catalog/pyiceberg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import annotations

Check warning on line 1 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L1

Added line #L1 was not covered by tests

from typing import TYPE_CHECKING

Check warning on line 3 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L3

Added line #L3 was not covered by tests

if TYPE_CHECKING:
from pyiceberg.catalog import Catalog as PyIcebergCatalog
from pyiceberg.table import Table as PyIcebergTable

Check warning on line 7 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L5-L7

Added lines #L5 - L7 were not covered by tests

from daft.dataframe import DataFrame

Check warning on line 9 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L9

Added line #L9 was not covered by tests

from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable

Check warning on line 11 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L11

Added line #L11 was not covered by tests


class PyIcebergCatalogAdaptor(PythonCatalog):
def __init__(self, pyiceberg_catalog: PyIcebergCatalog):
self._catalog = pyiceberg_catalog

Check warning on line 16 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L14-L16

Added lines #L14 - L16 were not covered by tests

def list_tables(self, prefix: str) -> list[str]:
return [".".join(tup) for tup in self._catalog.list_tables(prefix)]

Check warning on line 19 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L18-L19

Added lines #L18 - L19 were not covered by tests

def load_table(self, name: str) -> PyIcebergTableAdaptor:
return PyIcebergTableAdaptor(self._catalog.load_table(name))

Check warning on line 22 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L21-L22

Added lines #L21 - L22 were not covered by tests


class PyIcebergTableAdaptor(PythonCatalogTable):
def __init__(self, pyiceberg_table: PyIcebergTable):
self._table = pyiceberg_table

Check warning on line 27 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L25-L27

Added lines #L25 - L27 were not covered by tests

def to_dataframe(self) -> DataFrame:
import daft

Check warning on line 30 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L29-L30

Added lines #L29 - L30 were not covered by tests

return daft.read_iceberg(self._table)

Check warning on line 32 in daft/catalog/pyiceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/pyiceberg.py#L32

Added line #L32 was not covered by tests
24 changes: 24 additions & 0 deletions daft/catalog/python_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

Check warning on line 1 in daft/catalog/python_catalog.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/python_catalog.py#L1

Added line #L1 was not covered by tests

from abc import abstractmethod
from typing import TYPE_CHECKING

Check warning on line 4 in daft/catalog/python_catalog.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/python_catalog.py#L3-L4

Added lines #L3 - L4 were not covered by tests

if TYPE_CHECKING:
from daft.dataframe import DataFrame

Check warning on line 7 in daft/catalog/python_catalog.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/python_catalog.py#L6-L7

Added lines #L6 - L7 were not covered by tests


class PythonCatalog:

Check warning on line 10 in daft/catalog/python_catalog.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/python_catalog.py#L10

Added line #L10 was not covered by tests
"""Wrapper class for various Python implementations of Data Catalogs"""

@abstractmethod
def list_tables(self, prefix: str) -> list[str]: ...

Check warning on line 14 in daft/catalog/python_catalog.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/python_catalog.py#L13-L14

Added lines #L13 - L14 were not covered by tests

@abstractmethod
def load_table(self, name: str) -> PythonCatalogTable: ...

Check warning on line 17 in daft/catalog/python_catalog.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/python_catalog.py#L16-L17

Added lines #L16 - L17 were not covered by tests


class PythonCatalogTable:

Check warning on line 20 in daft/catalog/python_catalog.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/python_catalog.py#L20

Added line #L20 was not covered by tests
"""Wrapper class for various Python implementations of Data Catalog Tables"""

@abstractmethod
def to_dataframe(self) -> DataFrame: ...

Check warning on line 24 in daft/catalog/python_catalog.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/python_catalog.py#L23-L24

Added lines #L23 - L24 were not covered by tests
49 changes: 49 additions & 0 deletions daft/catalog/unity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

Check warning on line 1 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L1

Added line #L1 was not covered by tests

from typing import TYPE_CHECKING

Check warning on line 3 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L3

Added line #L3 was not covered by tests

if TYPE_CHECKING:
from daft.dataframe import DataFrame
from daft.unity_catalog import UnityCatalog, UnityCatalogTable

Check warning on line 7 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L5-L7

Added lines #L5 - L7 were not covered by tests

from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable

Check warning on line 9 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L9

Added line #L9 was not covered by tests


class UnityCatalogAdaptor(PythonCatalog):
def __init__(self, unity_catalog: UnityCatalog):
self._catalog = unity_catalog

Check warning on line 14 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L12-L14

Added lines #L12 - L14 were not covered by tests

def list_tables(self, prefix: str) -> list[str]:
num_namespaces = prefix.count(".")
if prefix == "":
return [

Check warning on line 19 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L16-L19

Added lines #L16 - L19 were not covered by tests
tbl
for cat in self._catalog.list_catalogs()
for schema in self._catalog.list_schemas(cat)
for tbl in self._catalog.list_tables(schema)
]
elif num_namespaces == 0:
catalog_name = prefix
return [

Check warning on line 27 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L25-L27

Added lines #L25 - L27 were not covered by tests
tbl for schema in self._catalog.list_schemas(catalog_name) for tbl in self._catalog.list_tables(schema)
]
elif num_namespaces == 1:
schema_name = prefix
return [tbl for tbl in self._catalog.list_tables(schema_name)]

Check warning on line 32 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L30-L32

Added lines #L30 - L32 were not covered by tests
else:
raise ValueError(

Check warning on line 34 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L34

Added line #L34 was not covered by tests
f"Unrecognized catalog name or schema name, expected a '.'-separated namespace but received: {prefix}"
)

def load_table(self, name: str) -> UnityTableAdaptor:
return UnityTableAdaptor(self._catalog.load_table(name))

Check warning on line 39 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L38-L39

Added lines #L38 - L39 were not covered by tests


class UnityTableAdaptor(PythonCatalogTable):
def __init__(self, unity_table: UnityCatalogTable):
self._table = unity_table

Check warning on line 44 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L42-L44

Added lines #L42 - L44 were not covered by tests

def to_dataframe(self) -> DataFrame:
import daft

Check warning on line 47 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L46-L47

Added lines #L46 - L47 were not covered by tests

return daft.read_deltalake(self._table)

Check warning on line 49 in daft/catalog/unity.py

View check run for this annotation

Codecov / codecov/patch

daft/catalog/unity.py#L49

Added line #L49 was not covered by tests
11 changes: 11 additions & 0 deletions daft/daft/catalog.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import TYPE_CHECKING

from daft.daft import LogicalPlanBuilder as PyLogicalPlanBuilder

if TYPE_CHECKING:
from daft.catalog.python_catalog import PythonCatalog

def read_table(name: str) -> PyLogicalPlanBuilder: ...
def register_table(name: str, plan_builder: PyLogicalPlanBuilder) -> str: ...
def register_python_catalog(catalog: PythonCatalog, catalog_name: str | None) -> str: ...
def unregister_catalog(catalog_name: str | None) -> bool: ...
9 changes: 9 additions & 0 deletions daft/io/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ class DataCatalogTable:
table_name: str
catalog_id: Optional[str] = None

def __post_init__(self):
import warnings

warnings.warn(
"This API will soon be deprecated. Users should use the new functionality in daft.catalog.",
DeprecationWarning,
stacklevel=2,
)

def table_uri(self, io_config: IOConfig) -> str:
"""
Get the URI of the table in the data catalog.
Expand Down
2 changes: 2 additions & 0 deletions src/common/error/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub enum DaftError {
FromUtf8Error(#[from] std::string::FromUtf8Error),
#[error("Not Yet Implemented: {0}")]
NotImplemented(String),
#[error("DaftError::CatalogError {0}")]
CatalogError(String),
}

impl DaftError {
Expand Down
Loading
Loading