From 0e2bf7bf1f0bd2325ab5112580008ce0ec159eac Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Tue, 10 Sep 2024 21:01:38 +0100 Subject: [PATCH] feat: create table Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/Cargo.toml | 2 + crates/catalog/sql/src/catalog.rs | 402 +++++++++++++++++++++++++++++- crates/catalog/sql/src/error.rs | 10 + 3 files changed, 403 insertions(+), 11 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 4a88e75b4..16465610b 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -33,6 +33,8 @@ async-trait = { workspace = true } iceberg = { workspace = true } sqlx = { version = "0.8.1", features = ["any"], default-features = false } typed-builder = { workspace = true } +uuid = { workspace = true, features = ["v4"] } +serde_json = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 483fe3a6d..15a48ac6a 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -20,6 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use iceberg::io::FileIO; +use iceberg::spec::TableMetadataBuilder; use iceberg::table::Table; use iceberg::{ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, @@ -27,8 +28,11 @@ use iceberg::{ use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow}; use sqlx::{Any, AnyPool, Row, Transaction}; use typed_builder::TypedBuilder; +use uuid::Uuid; -use crate::error::{from_sqlx_error, no_such_namespace_err, no_such_table_err}; +use crate::error::{ + from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err, +}; static CATALOG_TABLE_NAME: &str = "iceberg_tables"; static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; @@ -43,6 +47,8 @@ static NAMESPACE_FIELD_NAME: &str = "namespace"; static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key"; static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value"; +static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location"; + static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if not provided static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning @@ -71,8 +77,8 @@ pub struct SqlCatalogConfig { pub struct SqlCatalog { name: String, connection: AnyPool, - _warehouse_location: String, - _fileio: FileIO, + warehouse_location: String, + fileio: FileIO, sql_bind_style: SqlBindStyle, } @@ -142,8 +148,8 @@ impl SqlCatalog { Ok(SqlCatalog { name: config.name.to_owned(), connection: pool, - _warehouse_location: config.warehouse_location, - _fileio: config.file_io, + warehouse_location: config.warehouse_location, + fileio: config.file_io, sql_bind_style: config.sql_bind_style, }) } @@ -548,7 +554,7 @@ impl Catalog for SqlCatalog { AND {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ?" ), - vec![Some(&namespace), Some(&self.name), Some(&table_name)], + vec![Some(&namespace), Some(&self.name), Some(table_name)], ) .await?; @@ -569,10 +575,70 @@ impl Catalog for SqlCatalog { async fn create_table( &self, - _namespace: &NamespaceIdent, - _creation: TableCreation, + namespace: &NamespaceIdent, + creation: TableCreation, ) -> Result { - todo!() + if !self.namespace_exists(namespace).await? { + return no_such_namespace_err(namespace); + } + + let tbl_name = creation.name.clone(); + let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone()); + + if self.table_exists(&tbl_ident).await? { + return table_already_exists_err(&tbl_ident); + } + + let (tbl_creation, location) = match creation.location.clone() { + Some(location) => (creation, location), + None => { + // fall back to namespace-specific location + // and then to warehouse location + let nsp_properties = self.get_namespace(namespace).await?.properties().clone(); + let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) { + Some(location) => location.clone(), + None => { + format!( + "{}/{}", + self.warehouse_location.clone(), + namespace.join("/") + ) + } + }; + + let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name()); + + ( + TableCreation { + location: Some(tbl_location.clone()), + ..creation + }, + tbl_location, + ) + } + }; + + let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?.build()?; + let tbl_metadata_location = format!( + "{}/metadata/0-{}.metadata.json", + location.clone(), + Uuid::new_v4() + ); + + let file = self.fileio.new_output(&tbl_metadata_location)?; + file.write(serde_json::to_vec(&tbl_metadata)?.into()) + .await?; + + self.execute(&format!( + "INSERT INTO {CATALOG_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}) + VALUES (?, ?, ?, ?)"), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location)], None).await?; + + Ok(Table::builder() + .file_io(self.fileio.clone()) + .metadata_location(tbl_metadata_location) + .identifier(tbl_ident) + .metadata(tbl_metadata) + .build()?) } async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { @@ -590,11 +656,15 @@ mod tests { use std::hash::Hash; use iceberg::io::FileIOBuilder; - use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; - use iceberg::{Catalog, Namespace, NamespaceIdent, TableIdent}; + use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::table::Table; + use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; + use itertools::Itertools; + use regex::Regex; use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; + use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY; use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; fn temp_path() -> String { @@ -650,6 +720,67 @@ mod tests { .unwrap() } + async fn create_table(catalog: &C, table_ident: &TableIdent) { + let _ = catalog + .create_table( + &table_ident.namespace, + TableCreation::builder() + .name(table_ident.name().into()) + .schema(simple_table_schema()) + .location(temp_path()) + .build(), + ) + .await + .unwrap(); + } + + const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; + + fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) { + assert_eq!(table.identifier(), expected_table_ident); + + let metadata = table.metadata(); + + assert_eq!(metadata.current_schema().as_ref(), expected_schema); + + let expected_partition_spec = PartitionSpec::builder(expected_schema) + .with_spec_id(0) + .build() + .unwrap(); + + assert_eq!( + metadata + .partition_specs_iter() + .map(|p| p.as_ref()) + .collect_vec(), + vec![&expected_partition_spec] + ); + + let expected_sorted_order = SortOrder::builder() + .with_order_id(0) + .with_fields(vec![]) + .build(expected_schema.clone()) + .unwrap(); + + assert_eq!( + metadata + .sort_orders_iter() + .map(|s| s.as_ref()) + .collect_vec(), + vec![&expected_sorted_order] + ); + + assert_eq!(metadata.properties(), &HashMap::new()); + + assert!(!table.readonly()); + } + + fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) { + let actual = table.metadata_location().unwrap().to_string(); + let regex = Regex::new(regex_str).unwrap(); + assert!(regex.is_match(&actual)) + } + #[tokio::test] async fn test_initialized() { let warehouse_loc = temp_path(); @@ -1046,4 +1177,253 @@ mod tests { ), ); } + + #[tokio::test] + async fn test_create_table_with_location() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_name = "abc"; + let location = warehouse_loc.clone(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + + assert_table_eq( + &catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(), + &expected_table_ident, + &simple_table_schema(), + ); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + + assert!(table + .metadata_location() + .unwrap() + .to_string() + .starts_with(&location)) + } + + #[tokio::test] + async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + let mut namespace_properties = HashMap::new(); + let namespace_location = temp_path(); + namespace_properties.insert( + NAMESPACE_LOCATION_PROPERTY_KEY.to_string(), + namespace_location.to_string(), + ); + catalog + .create_namespace(&namespace_ident, namespace_properties) + .await + .unwrap(); + + let table_name = "tbl1"; + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/tbl1/metadata/0-{}.metadata.json$", + namespace_location, UUID_REGEX_STR, + ); + + let table = catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing( + ) { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + let mut namespace_properties = HashMap::new(); + let namespace_location = temp_path(); + namespace_properties.insert( + NAMESPACE_LOCATION_PROPERTY_KEY.to_string(), + namespace_location.to_string(), + ); + catalog + .create_namespace(&namespace_ident, namespace_properties) + .await + .unwrap(); + + let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let mut nested_namespace_properties = HashMap::new(); + let nested_namespace_location = temp_path(); + nested_namespace_properties.insert( + NAMESPACE_LOCATION_PROPERTY_KEY.to_string(), + nested_namespace_location.to_string(), + ); + catalog + .create_namespace(&nested_namespace_ident, nested_namespace_properties) + .await + .unwrap(); + + let table_name = "tbl1"; + let expected_table_ident = + TableIdent::new(nested_namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/tbl1/metadata/0-{}.metadata.json$", + nested_namespace_location, UUID_REGEX_STR, + ); + + let table = catalog + .create_table( + &nested_namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing( + ) { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + // note: no location specified in namespace_properties + let namespace_properties = HashMap::new(); + catalog + .create_namespace(&namespace_ident, namespace_properties) + .await + .unwrap(); + + let table_name = "tbl1"; + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/a/tbl1/metadata/0-{}.metadata.json$", + warehouse_loc, UUID_REGEX_STR + ); + + let table = catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing( + ) { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespace(&catalog, &nested_namespace_ident).await; + + let table_name = "tbl1"; + let expected_table_ident = + TableIdent::new(nested_namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/a/b/tbl1/metadata/0-{}.metadata.json$", + warehouse_loc, UUID_REGEX_STR + ); + + let table = catalog + .create_table( + &nested_namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_throws_error_if_table_with_same_name_already_exists() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_table(&catalog, &table_ident).await; + + let tmp_dir = TempDir::new().unwrap(); + let location = tmp_dir.path().to_str().unwrap().to_string(); + + assert_eq!( + catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + .location(location) + .build() + ) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Cannot create table {:?}. Table already exists.", + &table_ident + ) + ); + } } diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs index 1487cc812..0bb8ba0c8 100644 --- a/crates/catalog/sql/src/error.rs +++ b/crates/catalog/sql/src/error.rs @@ -39,3 +39,13 @@ pub fn no_such_table_err(table_ident: &TableIdent) -> Result { format!("No such table: {:?}", table_ident), )) } + +pub fn table_already_exists_err(table_ident: &TableIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create table {:?}. Table already exists.", + table_ident + ), + )) +}