Skip to content

Commit

Permalink
Add pyiceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 21, 2024
1 parent 9bd00db commit b0c2036
Show file tree
Hide file tree
Showing 13 changed files with 504 additions and 130 deletions.
431 changes: 348 additions & 83 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ common-system-info = {path = "src/common/system-info", default-features = false}
common-tracing = {path = "src/common/tracing", default-features = false}
common-version = {path = "src/common/version", default-features = false}
daft-catalog = {path = "src/daft-catalog", default-features = false}
daft-catalog-aws-glue = {path = "src/daft-catalog/aws-glue"}
daft-catalog-pyiceberg = {path = "src/daft-catalog/pyiceberg", optional = true}
daft-compression = {path = "src/daft-compression", default-features = false}
daft-core = {path = "src/daft-core", default-features = false}
daft-csv = {path = "src/daft-csv", default-features = false}
Expand Down Expand Up @@ -43,6 +45,8 @@ python = [
"daft-csv/python",
"daft-dsl/python",
"daft-catalog/python",
"daft-catalog-aws-glue/python",
"daft-catalog-pyiceberg",
"daft-local-execution/python",
"daft-io/python",
"daft-image/python",
Expand Down Expand Up @@ -115,6 +119,9 @@ tikv-jemallocator = {version = "0.5.4", features = [
members = [
"src/arrow2",
"src/parquet2",
"src/daft-catalog",
"src/daft-catalog/aws-glue",
"src/daft-catalog/pyiceberg",
"src/common/display",
"src/common/error",
"src/common/io-config",
Expand Down
1 change: 1 addition & 0 deletions src/daft-catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
common-error = {path = "../common/error"}
common-io-config = {path = "../common/io-config", default-features = false}
daft-plan = {path = "../daft-plan", default-features = false}
lazy_static = {workspace = true}
once_cell = {workspace = true}
pyo3 = {workspace = true, optional = true}
serde = {workspace = true}
Expand Down
14 changes: 14 additions & 0 deletions src/daft-catalog/aws-glue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[dependencies]
aws-config = {version = "0.55.3", features = ["native-tls", "rt-tokio", "client-hyper", "credentials-sso"], default-features = false}
aws-sdk-glue = {version = "1.66.0", features = ["rt-tokio"], default-features = false}
daft-catalog = {path = "..", default-features = false}
daft-catalog-pyiceberg = {path = "../pyiceberg", optional = true}
pyo3 = {workspace = true, optional = true}

[features]
python = ["dep:pyo3", "dep:daft-catalog-pyiceberg"]

[package]
name = "daft-catalog-aws-glue"
edition.workspace = true
version.workspace = true
27 changes: 27 additions & 0 deletions src/daft-catalog/aws-glue/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#[cfg(feature = "python")]
pub mod python;

struct AWSGlueCatalog {}

impl daft_catalog::DataCatalog for AWSGlueCatalog {
fn list_tables(&self, _prefix: &str) -> Vec<String> {
todo!();
}

fn get_table(&self, _name: &str) -> Option<Box<dyn daft_catalog::DataCatalogTable>> {
// Make a request to AWS Glue to find the table
// If the table metadata indicates that this is an iceberg table, then delegate to PyIceberg to read
// NOTE: we have to throw an import error if the pyiceberg_catalog isn't instantiated, indicating that PyIceberg
// is not installed.
todo!("Detect from the table metadata that this is an iceberg table, then delegate to pyiceberg");
}
}

impl AWSGlueCatalog {
pub fn new() -> Self {
// Naively instantiate this, which under the hood may naively be a no-op if PyIceberg isn't installed
// let pyiceberg_catalog = PyIcebergCatalog::new_glue(); // TODO

AWSGlueCatalog {}
}
}
20 changes: 20 additions & 0 deletions src/daft-catalog/aws-glue/src/python.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::sync::Arc;

use pyo3::prelude::*;

use crate::AWSGlueCatalog;

/// Registers an AWS Glue catalog instance with Daft
#[pyfunction]
#[pyo3(name = "register_aws_glue_catalog")]
pub fn register_aws_glue_catalog(name: Option<&str>) -> PyResult<()> {
let catalog = AWSGlueCatalog::new();
daft_catalog::global_catalog::register_catalog(Arc::new(catalog), name);
Ok(())
}

pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
parent.add_wrapped(wrap_pyfunction!(register_aws_glue_catalog))?;

Ok(())
}
10 changes: 10 additions & 0 deletions src/daft-catalog/pyiceberg/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[dependencies]
daft-catalog = {path = "..", default-features = false}
daft-plan = {path = "../../daft-plan", default-features = false}
pyo3 = {workspace = true}

[package]
description = "PyIceberg implementations of Daft DataCatalogTable (backed by a PyIceberg table)"
name = "daft-catalog-pyiceberg"
edition.workspace = true
version.workspace = true
24 changes: 24 additions & 0 deletions src/daft-catalog/pyiceberg/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use daft_catalog::{errors::Result, DataCatalogTable};
use daft_plan::LogicalPlanBuilder;

/// Wrapper around PyIceberg, or None if PyIceberg is not installed
pub struct PyIcebergTable {} // TODO: Add a PyObject in here

/// Wrapper around PyIceberg, or None if PyIceberg is not installed
pub struct PyIcebergCatalog {} // TODO: Add a PyObject in here

impl DataCatalogTable for PyIcebergTable {
fn to_logical_plan_builder(&self) -> Result<LogicalPlanBuilder> {
todo!();
}
}

impl PyIcebergCatalog {
pub fn new_glue() -> Self {
todo!("import the pyiceberg library and initialize a GlueCatalog");
}

pub fn load_table(&self) -> PyIcebergTable {
todo!("Load a PyIcebergTable from the inner catalog object")
}
}
8 changes: 7 additions & 1 deletion src/daft-catalog/src/data_catalog_table.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use daft_plan::LogicalPlanBuilder;

use crate::errors;

/// A Table in a Data Catalog
///
/// This is a trait because there are many different implementations of this, for example
/// Iceberg, DeltaLake, Hive and more.
pub trait DataCatalogTable {}
pub trait DataCatalogTable {
fn to_logical_plan_builder(&self) -> errors::Result<LogicalPlanBuilder>;
}
6 changes: 6 additions & 0 deletions src/daft-catalog/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ pub enum Error {

#[snafu(display("Catalog not found: {}", name))]
CatalogNotFound { name: String },

#[cfg(feature = "python")]
#[snafu(display("Python error: {}", source))]
PythonError { source: pyo3::PyErr },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -17,6 +21,8 @@ impl From<Error> for common_error::DaftError {
Error::TableNotFound { .. } | Error::CatalogNotFound { .. } => {
common_error::DaftError::CatalogError(err.to_string())
}
#[cfg(feature = "python")]
Error::PythonError { .. } => common_error::DaftError::CatalogError(err.to_string()),
}
}
}
Expand Down
38 changes: 28 additions & 10 deletions src/daft-catalog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,39 @@
mod data_catalog;
mod data_catalog_table;
pub mod errors;

// Export public-facing traits
use std::{collections::HashMap, default, sync::Arc};

use daft_plan::LogicalPlanBuilder;
use data_catalog::DataCatalog;

mod data_catalog;
mod data_catalog_table;
mod errors;
pub use data_catalog::DataCatalog;
pub use data_catalog_table::DataCatalogTable;

#[cfg(feature = "python")]
pub mod python;

use errors::Error;
use once_cell::sync::Lazy;
pub use errors::{Error, Result};

pub mod global_catalog {
use std::sync::{Arc, Mutex};

/// Global singleton DaftMetaCatalog, lazily initialed from the environment
pub(crate) static GLOBAL_DAFT_META_CATALOG: Lazy<DaftMetaCatalog> =
Lazy::new(DaftMetaCatalog::new_from_env);
use lazy_static::lazy_static;

use crate::{DaftMetaCatalog, DataCatalog};

lazy_static! {
pub(crate) static ref GLOBAL_DAFT_META_CATALOG: Mutex<DaftMetaCatalog> =
Mutex::new(DaftMetaCatalog::new_from_env());
}

/// Register a DataCatalog with the global DaftMetaCatalog
pub fn register_catalog(catalog: Arc<dyn DataCatalog>, name: Option<&str>) {
GLOBAL_DAFT_META_CATALOG
.lock()
.unwrap()
.register_catalog(catalog, name);
}
}

/// The [`DaftMetaCatalog`] is a catalog of [`DataCatalog`] implementations
///
Expand Down
46 changes: 10 additions & 36 deletions src/daft-catalog/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,7 @@ use std::sync::Arc;
use daft_plan::PyLogicalPlanBuilder;
use pyo3::prelude::*;

use crate::{data_catalog::DataCatalog, GLOBAL_DAFT_META_CATALOG};

/// Registers an AWS Glue catalog instance with Daft
#[pyfunction]
#[pyo3(name = "register_aws_glue")]
pub fn register_aws_glue() -> PyResult<()> {
todo!("Register an AWS Glue catalog");
}

/// Registers an Iceberg REST service with Daft
#[pyfunction]
#[pyo3(name = "register_iceberg_rest")]
pub fn register_iceberg_rest() -> PyResult<()> {
todo!("Register an Iceberg REST catalog");
}

/// Registers a Hive Metastore (HMS) service with Daft
#[pyfunction]
#[pyo3(name = "register_hive_metastore")]
pub fn register_hive_metastore() -> PyResult<()> {
todo!("Register a Hive Metastore catalog");
}

/// Registers a Unity Catalog instance with Daft
#[pyfunction]
#[pyo3(name = "register_unity_catalog")]
pub fn register_unity_catalog() -> PyResult<()> {
todo!("Register a Unity Catalog");
}
use crate::{data_catalog::DataCatalog, global_catalog};

/// Retrieves a catalog instance by name.
///
Expand Down Expand Up @@ -94,15 +66,17 @@ fn py_read_table(
table_identifier: &str,
catalog_name: Option<&str>,
) -> PyResult<PyLogicalPlanBuilder> {
let logical_plan_builder =
GLOBAL_DAFT_META_CATALOG.read_table(table_identifier, catalog_name)?;
let logical_plan_builder = global_catalog::GLOBAL_DAFT_META_CATALOG
.lock()
.unwrap()
.read_table(table_identifier, catalog_name)?;
Ok(PyLogicalPlanBuilder::new(logical_plan_builder))
}

#[pymodule]
fn daft_catalog(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_class::<PyDataCatalog>()?;
m.add_wrapped(wrap_pyfunction!(py_read_table))?;
m.add_wrapped(wrap_pyfunction!(get_catalog))?;
pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
parent.add_class::<PyDataCatalog>()?;
parent.add_wrapped(wrap_pyfunction!(py_read_table))?;
parent.add_wrapped(wrap_pyfunction!(get_catalog))?;

Ok(())
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ pub mod pylib {
common_system_info::register_modules(m)?;
common_resource_request::register_modules(m)?;
common_file_formats::python::register_modules(m)?;
daft_catalog::python::register_modules(m)?;
daft_catalog_aws_glue::python::register_modules(m)?;
daft_core::register_modules(m)?;
daft_core::python::register_modules(m)?;
daft_local_execution::register_modules(m)?;
Expand Down

0 comments on commit b0c2036

Please sign in to comment.