Skip to content

Commit

Permalink
Implement python catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Nov 4, 2024
1 parent 8611aa7 commit e6d0350
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 71 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ common-system-info = {path = "src/common/system-info", default-features = false}
common-tracing = {path = "src/common/tracing", default-features = false}
common-version = {path = "src/common/version", default-features = false}
daft-catalog = {path = "src/daft-catalog", default-features = false}
daft-catalog-pyiceberg = {path = "src/daft-catalog/pyiceberg", optional = true}
daft-catalog-python-catalog = {path = "src/daft-catalog/python-catalog", optional = true}
daft-compression = {path = "src/daft-compression", default-features = false}
daft-core = {path = "src/daft-core", default-features = false}
daft-csv = {path = "src/daft-csv", default-features = false}
Expand Down Expand Up @@ -47,7 +47,7 @@ python = [
"common-resource-request/python",
"common-system-info/python",
"daft-catalog/python",
"daft-catalog-pyiceberg/python",
"daft-catalog-python-catalog/python",
"daft-core/python",
"daft-csv/python",
"daft-dsl/python",
Expand All @@ -73,7 +73,7 @@ python = [
"common-system-info/python",
"common-display/python",
"common-resource-request/python",
"dep:daft-catalog-pyiceberg",
"dep:daft-catalog-python-catalog",
"dep:pyo3",
"dep:pyo3-log"
]
Expand Down
43 changes: 42 additions & 1 deletion daft/catalog.py → daft/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,26 @@
from __future__ import annotations

from daft.daft import catalog as native_catalog
from daft.dataframe import DataFrame
from daft.logical.builder import LogicalPlanBuilder
from daft.catalog.python_catalog import PythonCatalog

from daft.dataframe import DataFrame

_PYICEBERG_AVAILABLE = False
try:
from pyiceberg.catalog import Catalog as PyIcebergCatalog

_PYICEBERG_AVAILABLE = True
except ImportError:
pass

_UNITY_AVAILABLE = False
try:
from daft.unity_catalog import UnityCatalog

_UNITY_AVAILABLE = True
except ImportError:
pass


def read_table(name: str) -> DataFrame:
Expand Down Expand Up @@ -82,3 +100,26 @@ def register_table(name: str, dataframe: DataFrame) -> str:
>>> daft.read_table("my_table")
"""
return native_catalog.register_table(name, dataframe._builder._builder)


def register_python_catalog(catalog: PyIcebergCatalog | UnityCatalog) -> str:
"""Registers a Python catalog with Daft
Currently supports:
* [PyIceberg Catalogs](https://py.iceberg.apache.org/api/)
* [Unity Catalog](https://www.getdaft.io/projects/docs/en/latest/user_guide/integrations/unity-catalog.html)
"""
python_catalog: PyIcebergCatalog
if _PYICEBERG_AVAILABLE and isinstance(catalog, PyIcebergCatalog):
from daft.catalog.pyiceberg import PyIcebergCatalogAdaptor

python_catalog = PyIcebergCatalogAdaptor(catalog)
elif _UNITY_AVAILABLE and isinstance(catalog, UnityCatalog):
from daft.catalog.unity import UnityCatalogAdaptor

python_catalog = UnityCatalogAdaptor(catalog)
else:
raise ValueError(f"Unsupported catalog type: {type(catalog)}")

return native_catalog.register_python_catalog(python_catalog)
32 changes: 32 additions & 0 deletions daft/catalog/pyiceberg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from pyiceberg.catalog import Catalog as PyIcebergCatalog
from pyiceberg.table import Table as PyIcebergTable

from daft.dataframe import DataFrame

from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable


class PyIcebergCatalogAdaptor(PythonCatalog):
def __init__(self, pyiceberg_catalog: PyIcebergCatalog):
self._catalog = pyiceberg_catalog

def list_tables(self, prefix: str) -> list[str]:
return [".".join(tup) for tup in self._catalog.list_tables(prefix)]

def get_table(self, name: str) -> PyIcebergTableAdaptor:
return PyIcebergTableAdaptor(self._catalog.load_table(name))


class PyIcebergTableAdaptor(PythonCatalogTable):
def __init__(self, pyiceberg_table: PyIcebergTable):
self._table = pyiceberg_table

def to_dataframe(self) -> DataFrame:
import daft

return daft.read_iceberg(self._table)
24 changes: 24 additions & 0 deletions daft/catalog/python_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from daft.dataframe import DataFrame


class PythonCatalog:
"""Wrapper class for various Python implementations of Data Catalogs"""

@abstractmethod
def list_tables(self, prefix: str) -> list[str]: ...

@abstractmethod
def get_table(self, name: str) -> PythonCatalogTable: ...


class PythonCatalogTable:
"""Wrapper class for various Python implementations of Data Catalog Tables"""

@abstractmethod
def to_dataframe(self) -> DataFrame: ...
49 changes: 49 additions & 0 deletions daft/catalog/unity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from daft.dataframe import DataFrame
from daft.unity_catalog import UnityCatalog, UnityCatalogTable

from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable


class UnityCatalogAdaptor(PythonCatalog):
def __init__(self, unity_catalog: UnityCatalog):
self._catalog = unity_catalog

def list_tables(self, prefix: str) -> list[str]:
num_namespaces = prefix.count(".")
if prefix == "":
return [
tbl
for cat in self._catalog.list_catalogs()
for schema in self._catalog.list_schemas(cat)
for tbl in self._catalog.list_tables(schema)
]
elif num_namespaces == 0:
catalog_name = prefix
return [
tbl for schema in self._catalog.list_schemas(catalog_name) for tbl in self._catalog.list_tables(schema)
]
elif num_namespaces == 1:
schema_name = prefix
return [tbl for tbl in self._catalog.list_tables(schema_name)]
else:
raise ValueError(
f"Unrecognized catalog name or schema name, expected a '.'-separated namespace but received: {prefix}"
)

def get_table(self, name: str) -> UnityTableAdaptor:
return UnityTableAdaptor(self._catalog.load_table(name))


class UnityTableAdaptor(PythonCatalogTable):
def __init__(self, unity_table: UnityCatalogTable):
self._table = unity_table

def to_dataframe(self) -> DataFrame:
import daft

return daft.read_deltalake(self._table)
7 changes: 6 additions & 1 deletion daft/daft/catalog.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from typing import TYPE_CHECKING

from daft.daft import LogicalPlanBuilder as PyLogicalPlanBuilder

if TYPE_CHECKING:
from daft.catalog.python_catalog import PythonCatalog

def read_table(name: str) -> PyLogicalPlanBuilder: ...
def register_table(name: str, plan_builder: PyLogicalPlanBuilder) -> str: ...
def register_aws_glue() -> str: ...
def register_python_catalog(catalog: PythonCatalog) -> str: ...
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ snafu = {workspace = true}
python = ["daft-catalog/python", "daft-plan/python"]

[package]
description = "PyIceberg implementations of Daft DataCatalogTable (backed by a PyIceberg table)"
name = "daft-catalog-pyiceberg"
description = "Python implementations of Daft DataCatalogTable (backed by a PythonCatalog abstract class)"
name = "daft-catalog-python-catalog"
edition.workspace = true
version.workspace = true
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,43 @@ impl From<Error> for DaftCatalogError {
match value {
Error::ListTables { source } => DaftCatalogError::PythonError {
source,
context: "listing tables with PyIceberg library".to_string(),
context: "listing tables".to_string(),
},
Error::GetTable { source, table_name } => DaftCatalogError::PythonError {
source,
context: format!("getting PyIceberg table `{}`", table_name),
context: format!("getting table `{}`", table_name),
},
Error::ReadTable { source, table_name } => DaftCatalogError::PythonError {
source,
context: format!("reading PyIceberg table `{}`", table_name),
context: format!("reading table `{}`", table_name),
},
}
}
}

/// Wrapper around PyIceberg, or None if PyIceberg is not installed
pub struct PyIcebergTable {
pub struct PythonTable {
table_name: String,
pyiceberg_table_pyobj: PyObject,
table_pyobj: PyObject,
}

impl PyIcebergTable {
pub fn new(table_name: String, pyiceberg_table_pyobj: PyObject) -> Self {
impl PythonTable {
pub fn new(table_name: String, table_pyobj: PyObject) -> Self {
Self {
table_name,
pyiceberg_table_pyobj,
table_pyobj,
}
}
}

impl DataCatalogTable for PyIcebergTable {
impl DataCatalogTable for PythonTable {
fn to_logical_plan_builder(&self) -> daft_catalog::errors::Result<LogicalPlanBuilder> {
Python::with_gil(|py| {
let dataframe = py
.import_bound("daft")
.and_then(|daft_module| daft_module.getattr("read_iceberg"))
.and_then(|read_iceberg_fn| {
read_iceberg_fn.call1((self.pyiceberg_table_pyobj.clone(),))
})
let dataframe = self
.table_pyobj
.bind(py)
.getattr("to_dataframe")
.and_then(|to_dataframe_method| to_dataframe_method.call0())
.with_context(|_| ReadTableSnafu {
table_name: self.table_name.clone(),
})?;
Expand All @@ -87,24 +86,24 @@ impl DataCatalogTable for PyIcebergTable {
}

/// Wrapper around PyIceberg, or None if PyIceberg is not installed
pub struct PyIcebergCatalog {
pyiceberg_catalog_pyobj: PyObject,
pub struct PythonCatalog {
python_catalog_pyobj: PyObject,
}

impl PyIcebergCatalog {
pub fn new(pyiceberg_catalog_pyobj: PyObject) -> Self {
impl PythonCatalog {
pub fn new(python_catalog_pyobj: PyObject) -> Self {
Self {
pyiceberg_catalog_pyobj,
python_catalog_pyobj,
}
}
}

impl DataCatalog for PyIcebergCatalog {
impl DataCatalog for PythonCatalog {
fn list_tables(&self, prefix: &str) -> Result<Vec<String>> {
Python::with_gil(|py| {
let pyiceberg_catalog = self.pyiceberg_catalog_pyobj.bind(py);
let python_catalog = self.python_catalog_pyobj.bind(py);

Ok(pyiceberg_catalog
Ok(python_catalog
.getattr("list_tables")
.and_then(|list_tables_method| list_tables_method.call1((prefix,)))
.and_then(|tables| tables.extract::<Vec<String>>())
Expand All @@ -114,9 +113,9 @@ impl DataCatalog for PyIcebergCatalog {

fn get_table(&self, name: &str) -> Result<Option<Box<dyn DataCatalogTable>>> {
Python::with_gil(|py| {
let pyiceberg_catalog = self.pyiceberg_catalog_pyobj.bind(py);
let python_catalog = self.python_catalog_pyobj.bind(py);
let list_tables_method =
pyiceberg_catalog
python_catalog
.getattr("load_table")
.with_context(|_| GetTableSnafu {
table_name: name.to_string(),
Expand All @@ -126,50 +125,22 @@ impl DataCatalog for PyIcebergCatalog {
.with_context(|_| GetTableSnafu {
table_name: name.to_string(),
})?;
let pyiceberg_table = PyIcebergTable::new(name.to_string(), table.unbind());
Ok(Some(Box::new(pyiceberg_table) as Box<dyn DataCatalogTable>))
let python_table = PythonTable::new(name.to_string(), table.unbind());
Ok(Some(Box::new(python_table) as Box<dyn DataCatalogTable>))
})
}
}

impl PyIcebergCatalog {
pub fn new_glue() -> Self {
todo!("import the pyiceberg library and initialize a GlueCatalog");
}

pub fn new_rest() -> Self {
todo!("import the pyiceberg library and initialize a RESTCatalog");
}

pub fn new_sqlite() -> Self {
todo!("import the pyiceberg library and initialize a SQLite Catalog");
}

pub fn load_table(&self) -> PyIcebergTable {
todo!("Load a PyIcebergTable from the inner catalog object")
}
}

/// Registers an PyIceberg catalog instance
/// Registers an PythonCatalog instance
#[pyfunction]
#[pyo3(name = "register_pyiceberg_catalog")]
pub fn register_pyiceberg(py: Python, pyiceberg_catalog_obj: PyObject) -> PyResult<()> {
let pyiceberg_catalog_module = py.import_bound("pyiceberg.catalog")?;
let catalog_class = pyiceberg_catalog_module.getattr("Catalog")?;
let pyiceberg_catalog = pyiceberg_catalog_obj.bind(py);
if !pyiceberg_catalog.is_instance(&catalog_class)? {
return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(format!(
"Expected object to be a PyIceberg Catalog, but received: {}",
pyiceberg_catalog.get_type()
)));
}

let catalog = PyIcebergCatalog::new(pyiceberg_catalog_obj);
#[pyo3(name = "register_python_catalog")]
pub fn register_python_catalog(python_catalog_obj: PyObject) -> PyResult<String> {
let catalog = PythonCatalog::new(python_catalog_obj);
daft_catalog::global_catalog::register_catalog(Arc::new(catalog));
Ok(())
Ok("default".to_string())
}

pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
parent.add_wrapped(wrap_pyfunction!(register_pyiceberg))?;
parent.add_wrapped(wrap_pyfunction!(register_python_catalog))?;
Ok(())
}
Loading

0 comments on commit e6d0350

Please sign in to comment.