Skip to content

Commit

Permalink
feat: add load table
Browse files Browse the repository at this point in the history
Signed-off-by: callum-ryan <[email protected]>
  • Loading branch information
callum-ryan committed Sep 10, 2024
1 parent 0e2bf7b commit 32b54fe
Showing 1 changed file with 47 additions and 3 deletions.
50 changes: 47 additions & 3 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::Duration;

use async_trait::async_trait;
use iceberg::io::FileIO;
use iceberg::spec::TableMetadataBuilder;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
Expand All @@ -41,6 +41,7 @@ static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";

static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
static NAMESPACE_FIELD_NAME: &str = "namespace";
Expand Down Expand Up @@ -569,8 +570,51 @@ impl Catalog for SqlCatalog {
todo!()
}

async fn load_table(&self, _identifier: &TableIdent) -> Result<Table> {
todo!()
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
if !self.table_exists(identifier).await? {
return no_such_table_err(identifier);
}

let rows = self
.fetch_rows(
&format!(
"SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
FROM {CATALOG_TABLE_NAME}
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)"
),
vec![
Some(&self.name),
Some(identifier.name()),
Some(&identifier.namespace().join(".")),
],
)
.await?;

if rows.is_empty() {
return no_such_table_err(identifier);
}

let row = &rows[0];
let tbl_metadata_location = row
.try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
.map_err(from_sqlx_error)?;

let file = self.fileio.new_input(&tbl_metadata_location)?;
let metadata_content = file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

Ok(Table::builder()
.file_io(self.fileio.clone())
.identifier(identifier.clone())
.metadata_location(tbl_metadata_location)
.metadata(metadata)
.build()?)
}

async fn create_table(
Expand Down

0 comments on commit 32b54fe

Please sign in to comment.