From 32b54fea62a00752eafe54cfab078b5df75e03fd Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Tue, 10 Sep 2024 21:33:13 +0100 Subject: [PATCH] feat: add load table Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 50 +++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 15a48ac6a..78b27d700 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -20,7 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use iceberg::io::FileIO; -use iceberg::spec::TableMetadataBuilder; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, @@ -41,6 +41,7 @@ static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace"; static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location"; static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type"; +static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE"; static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties"; static NAMESPACE_FIELD_NAME: &str = "namespace"; @@ -569,8 +570,51 @@ impl Catalog for SqlCatalog { todo!() } - async fn load_table(&self, _identifier: &TableIdent) -> Result { - todo!() + async fn load_table(&self, identifier: &TableIdent) -> Result
{ + if !self.table_exists(identifier).await? { + return no_such_table_err(identifier); + } + + let rows = self + .fetch_rows( + &format!( + "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + vec![ + Some(&self.name), + Some(identifier.name()), + Some(&identifier.namespace().join(".")), + ], + ) + .await?; + + if rows.is_empty() { + return no_such_table_err(identifier); + } + + let row = &rows[0]; + let tbl_metadata_location = row + .try_get::(CATALOG_FIELD_METADATA_LOCATION_PROP) + .map_err(from_sqlx_error)?; + + let file = self.fileio.new_input(&tbl_metadata_location)?; + let metadata_content = file.read().await?; + let metadata = serde_json::from_slice::(&metadata_content)?; + + Ok(Table::builder() + .file_io(self.fileio.clone()) + .identifier(identifier.clone()) + .metadata_location(tbl_metadata_location) + .metadata(metadata) + .build()?) } async fn create_table(