Skip to content

Commit

Permalink
feat: add the rest of table ops
Browse files Browse the repository at this point in the history
Signed-off-by: callum-ryan <[email protected]>
  • Loading branch information
callum-ryan committed Sep 11, 2024
1 parent 32b54fe commit 3de1771
Showing 1 changed file with 249 additions and 9 deletions.
258 changes: 249 additions & 9 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,11 @@ impl Catalog for SqlCatalog {
{CATALOG_FIELD_TABLE_NAMESPACE}
FROM {CATALOG_TABLE_NAME}
WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND {CATALOG_FIELD_CATALOG_NAME} = ?"
AND {CATALOG_FIELD_CATALOG_NAME} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)",
),
vec![Some(&namespace.join(".")), Some(&self.name)],
)
Expand Down Expand Up @@ -553,7 +557,11 @@ impl Catalog for SqlCatalog {
FROM {CATALOG_TABLE_NAME}
WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?"
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)"
),
vec![Some(&namespace), Some(&self.name), Some(table_name)],
)
Expand All @@ -566,8 +574,32 @@ impl Catalog for SqlCatalog {
}
}

async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> {
todo!()
async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
if !self.table_exists(identifier).await? {
return no_such_table_err(identifier);
}

self.execute(
&format!(
"DELETE FROM {CATALOG_TABLE_NAME}
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)"
),
vec![
Some(&self.name),
Some(identifier.name()),
Some(&identifier.namespace().join(".")),
],
None,
)
.await?;

Ok(())
}

async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
Expand Down Expand Up @@ -674,8 +706,10 @@ impl Catalog for SqlCatalog {
.await?;

self.execute(&format!(
"INSERT INTO {CATALOG_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP})
VALUES (?, ?, ?, ?)"), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location)], None).await?;
"INSERT INTO {CATALOG_TABLE_NAME}
({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
VALUES (?, ?, ?, ?, ?)
"), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;

Ok(Table::builder()
.file_io(self.fileio.clone())
Expand All @@ -685,8 +719,47 @@ impl Catalog for SqlCatalog {
.build()?)
}

async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
todo!()
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
if src == dest {
return Ok(());
}

if !self.table_exists(src).await? {
return no_such_table_err(src);
}

if !self.namespace_exists(dest.namespace()).await? {
return no_such_namespace_err(dest.namespace());
}

if self.table_exists(dest).await? {
return table_already_exists_err(dest);
}

self.execute(
&format!(
"UPDATE {CATALOG_TABLE_NAME}
SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)"
),
vec![
Some(dest.name()),
Some(&dest.namespace().join(".")),
Some(&self.name),
Some(src.name()),
Some(&src.namespace().join(".")),
],
None,
)
.await?;

Ok(())
}

async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
Expand All @@ -711,6 +784,8 @@ mod tests {
use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY;
use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};

const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";

fn temp_path() -> String {
let temp_dir = TempDir::new().unwrap();
temp_dir.path().to_str().unwrap().to_string()
Expand Down Expand Up @@ -778,7 +853,11 @@ mod tests {
.unwrap();
}

const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
for table_ident in table_idents {
create_table(catalog, table_ident).await;
}
}

fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
assert_eq!(table.identifier(), expected_table_ident);
Expand Down Expand Up @@ -1470,4 +1549,165 @@ mod tests {
)
);
}

#[tokio::test]
async fn test_rename_table_in_same_namespace() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
let namespace_ident = NamespaceIdent::new("n1".into());
create_namespace(&catalog, &namespace_ident).await;
let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
create_table(&catalog, &src_table_ident).await;

catalog
.rename_table(&src_table_ident, &dst_table_ident)
.await
.unwrap();

assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
dst_table_ident
],);
}

#[tokio::test]
async fn test_rename_table_across_namespaces() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
let src_namespace_ident = NamespaceIdent::new("a".into());
let dst_namespace_ident = NamespaceIdent::new("b".into());
create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
create_table(&catalog, &src_table_ident).await;

catalog
.rename_table(&src_table_ident, &dst_table_ident)
.await
.unwrap();

assert_eq!(
catalog.list_tables(&src_namespace_ident).await.unwrap(),
vec![],
);

assert_eq!(
catalog.list_tables(&dst_namespace_ident).await.unwrap(),
vec![dst_table_ident],
);
}

#[tokio::test]
async fn test_rename_table_src_table_is_same_as_dst_table() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
let namespace_ident = NamespaceIdent::new("n1".into());
create_namespace(&catalog, &namespace_ident).await;
let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
create_table(&catalog, &table_ident).await;

catalog
.rename_table(&table_ident, &table_ident)
.await
.unwrap();

assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
table_ident
],);
}

#[tokio::test]
async fn test_rename_table_across_nested_namespaces() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
let namespace_ident_a = NamespaceIdent::new("a".into());
let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
create_namespaces(&catalog, &vec![
&namespace_ident_a,
&namespace_ident_a_b,
&namespace_ident_a_b_c,
])
.await;

let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
create_tables(&catalog, vec![&src_table_ident]).await;

let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
catalog
.rename_table(&src_table_ident, &dst_table_ident)
.await
.unwrap();

assert!(!catalog.table_exists(&src_table_ident).await.unwrap());

assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
}

#[tokio::test]
async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
let src_namespace_ident = NamespaceIdent::new("n1".into());
let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
create_namespace(&catalog, &src_namespace_ident).await;
create_table(&catalog, &src_table_ident).await;

let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
let dst_table_ident =
TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
assert_eq!(
catalog
.rename_table(&src_table_ident, &dst_table_ident)
.await
.unwrap_err()
.to_string(),
format!(
"Unexpected => No such namespace: {:?}",
non_existent_dst_namespace_ident
),
);
}

#[tokio::test]
async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
let namespace_ident = NamespaceIdent::new("n1".into());
create_namespace(&catalog, &namespace_ident).await;
let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());

assert_eq!(
catalog
.rename_table(&src_table_ident, &dst_table_ident)
.await
.unwrap_err()
.to_string(),
format!("Unexpected => No such table: {:?}", src_table_ident),
);
}

#[tokio::test]
async fn test_rename_table_throws_error_if_dst_table_already_exists() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
let namespace_ident = NamespaceIdent::new("n1".into());
create_namespace(&catalog, &namespace_ident).await;
let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;

assert_eq!(
catalog
.rename_table(&src_table_ident, &dst_table_ident)
.await
.unwrap_err()
.to_string(),
format!(
"Unexpected => Cannot create table {:? }. Table already exists.",
&dst_table_ident
),
);
}
}

0 comments on commit 3de1771

Please sign in to comment.