Skip to content

Commit

Permalink
Use namespace location or warehouse location if table location is missing (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
fqaiser94 authored Aug 3, 2024
1 parent bd9eea1 commit 0f7fc20
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ parquet = "52"
pilota = "0.11.2"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
regex = "1.10.5"
reqwest = { version = "^0.12", default-features = false, features = ["json"] }
rust_decimal = "1.31.0"
serde = { version = "^1.0", features = ["rc"] }
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/memory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ serde_json = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

[dev-dependencies]
regex = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
265 changes: 249 additions & 16 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,24 @@ use uuid::Uuid;

use crate::namespace_state::NamespaceState;

/// Key of the namespace `location` property. When a table is created without an
/// explicit location, the value stored under this key (if any) is used as the
/// prefix of the table's default location.
const LOCATION: &str = "location";

/// Memory catalog implementation.
#[derive(Debug)]
pub struct MemoryCatalog {
/// Namespace state of the catalog, guarded by a `Mutex`.
root_namespace_state: Mutex<NamespaceState>,
/// `FileIO` handle supplied by the caller of `new`.
file_io: FileIO,
/// Optional warehouse root. Used to derive a table's default location when
/// neither the table nor its namespace specifies one.
warehouse_location: Option<String>,
}

impl MemoryCatalog {
/// Creates a memory catalog.
///
/// The catalog starts from `NamespaceState::default()`. `warehouse_location`,
/// when given, is stored and used as the fallback root for tables that are
/// created without an explicit location.
pub fn new(file_io: FileIO) -> Self {
pub fn new(file_io: FileIO, warehouse_location: Option<String>) -> Self {
Self {
root_namespace_state: Mutex::new(NamespaceState::default()),
file_io,
warehouse_location,
}
}
}
Expand Down Expand Up @@ -165,11 +170,20 @@ impl Catalog for MemoryCatalog {
let (table_creation, location) = match table_creation.location.clone() {
Some(location) => (table_creation, location),
None => {
let location = format!(
"{}/{}",
table_ident.namespace().join("/"),
table_ident.name()
);
let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
let location_prefix = match namespace_properties.get(LOCATION) {
Some(namespace_location) => Ok(namespace_location.clone()),
None => match self.warehouse_location.clone() {
Some(warehouse_location) => Ok(format!("{}/{}", warehouse_location, namespace_ident.join("/"))),
None => Err(Error::new(ErrorKind::Unexpected,
format!(
"Cannot create table {:?}. No default path is set, please specify a location when creating a table.",
&table_ident
)))
},
}?;

let location = format!("{}/{}", location_prefix, table_ident.name());

let new_table_creation = TableCreation {
location: Some(location.clone()),
Expand Down Expand Up @@ -273,13 +287,20 @@ mod tests {

use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use regex::Regex;
use tempfile::TempDir;

use super::*;

/// Returns the path of a freshly created temporary directory as an owned string.
///
/// The `TempDir` guard goes out of scope when this function returns; per the
/// `tempfile` documentation that removes the directory, so callers should treat
/// the result as a unique path rather than as an existing directory.
fn temp_path() -> String {
    let dir = TempDir::new().unwrap();
    let path = dir.path().to_str().unwrap();
    String::from(path)
}

/// Builds a `MemoryCatalog` backed by filesystem `FileIO`, with a temporary path
/// as its warehouse location.
fn new_memory_catalog() -> impl Catalog {
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
MemoryCatalog::new(file_io)
let warehouse_location = temp_path();
MemoryCatalog::new(file_io, Some(warehouse_location))
}

async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
Expand Down Expand Up @@ -312,16 +333,12 @@ mod tests {
}

async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
let tmp_dir = TempDir::new().unwrap();
let location = tmp_dir.path().to_str().unwrap().to_string();

let _ = catalog
.create_table(
&table_ident.namespace,
TableCreation::builder()
.name(table_ident.name().into())
.schema(simple_table_schema())
.location(location)
.build(),
)
.await
Expand Down Expand Up @@ -374,6 +391,14 @@ mod tests {
assert!(!table.readonly());
}

/// Regex fragment matching a lowercase, hyphenated UUID (8-4-4-4-12 hex digits).
const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";

/// Asserts that the metadata location of `table` matches the regular expression
/// `regex_str`.
///
/// Panics if the table has no metadata location, if `regex_str` is not a valid
/// regular expression, or if the location does not match. The failure message
/// includes both the actual location and the pattern.
fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
    let actual = table.metadata_location().unwrap().to_string();
    let regex = Regex::new(regex_str).unwrap();
    assert!(
        regex.is_match(&actual),
        "metadata location {:?} does not match regex {:?}",
        actual,
        regex_str
    );
}

#[tokio::test]
async fn test_list_namespaces_returns_empty_vector() {
let catalog = new_memory_catalog();
Expand Down Expand Up @@ -990,12 +1015,220 @@ mod tests {
.metadata_location()
.unwrap()
.to_string()
.starts_with(&location));
.starts_with(&location))
}

assert_table_eq(
&catalog.load_table(&expected_table_ident).await.unwrap(),
&expected_table_ident,
&simple_table_schema(),
#[tokio::test]
async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let warehouse_location = temp_path();
let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone()));

let namespace_ident = NamespaceIdent::new("a".into());
let mut namespace_properties = HashMap::new();
let namespace_location = temp_path();
namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
catalog
.create_namespace(&namespace_ident, namespace_properties)
.await
.unwrap();

let table_name = "tbl1";
let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
let expected_table_metadata_location_regex = format!(
"^{}/tbl1/metadata/0-{}.metadata.json$",
namespace_location, UUID_REGEX_STR,
);

let table = catalog
.create_table(
&namespace_ident,
TableCreation::builder()
.name(table_name.into())
.schema(simple_table_schema())
// no location specified for table
.build(),
)
.await
.unwrap();
assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);

let table = catalog.load_table(&expected_table_ident).await.unwrap();
assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
}

#[tokio::test]
async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing(
) {
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let warehouse_location = temp_path();
let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone()));

let namespace_ident = NamespaceIdent::new("a".into());
let mut namespace_properties = HashMap::new();
let namespace_location = temp_path();
namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
catalog
.create_namespace(&namespace_ident, namespace_properties)
.await
.unwrap();

let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
let mut nested_namespace_properties = HashMap::new();
let nested_namespace_location = temp_path();
nested_namespace_properties
.insert(LOCATION.to_string(), nested_namespace_location.to_string());
catalog
.create_namespace(&nested_namespace_ident, nested_namespace_properties)
.await
.unwrap();

let table_name = "tbl1";
let expected_table_ident =
TableIdent::new(nested_namespace_ident.clone(), table_name.into());
let expected_table_metadata_location_regex = format!(
"^{}/tbl1/metadata/0-{}.metadata.json$",
nested_namespace_location, UUID_REGEX_STR,
);

let table = catalog
.create_table(
&nested_namespace_ident,
TableCreation::builder()
.name(table_name.into())
.schema(simple_table_schema())
// no location specified for table
.build(),
)
.await
.unwrap();
assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);

let table = catalog.load_table(&expected_table_ident).await.unwrap();
assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
}

#[tokio::test]
async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing(
) {
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let warehouse_location = temp_path();
let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone()));

let namespace_ident = NamespaceIdent::new("a".into());
// note: no location specified in namespace_properties
let namespace_properties = HashMap::new();
catalog
.create_namespace(&namespace_ident, namespace_properties)
.await
.unwrap();

let table_name = "tbl1";
let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
let expected_table_metadata_location_regex = format!(
"^{}/a/tbl1/metadata/0-{}.metadata.json$",
warehouse_location, UUID_REGEX_STR
);

let table = catalog
.create_table(
&namespace_ident,
TableCreation::builder()
.name(table_name.into())
.schema(simple_table_schema())
// no location specified for table
.build(),
)
.await
.unwrap();
assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);

let table = catalog.load_table(&expected_table_ident).await.unwrap();
assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
}

#[tokio::test]
async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing(
) {
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
let warehouse_location = temp_path();
let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone()));

let namespace_ident = NamespaceIdent::new("a".into());
catalog
// note: no location specified in namespace_properties
.create_namespace(&namespace_ident, HashMap::new())
.await
.unwrap();

let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
catalog
// note: no location specified in namespace_properties
.create_namespace(&nested_namespace_ident, HashMap::new())
.await
.unwrap();

let table_name = "tbl1";
let expected_table_ident =
TableIdent::new(nested_namespace_ident.clone(), table_name.into());
let expected_table_metadata_location_regex = format!(
"^{}/a/b/tbl1/metadata/0-{}.metadata.json$",
warehouse_location, UUID_REGEX_STR
);

let table = catalog
.create_table(
&nested_namespace_ident,
TableCreation::builder()
.name(table_name.into())
.schema(simple_table_schema())
// no location specified for table
.build(),
)
.await
.unwrap();
assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);

let table = catalog.load_table(&expected_table_ident).await.unwrap();
assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
}

/// Creating a table fails with a descriptive error when no location can be derived
/// from the table, its namespace, or the catalog's warehouse.
#[tokio::test]
async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing(
) {
    // Catalog without a warehouse location.
    let catalog = MemoryCatalog::new(FileIOBuilder::new_fs_io().build().unwrap(), None);

    let namespace_ident = NamespaceIdent::new("a".into());
    create_namespace(&catalog, &namespace_ident).await;

    let table_name = "tbl1";
    let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());

    // No location is set on the table creation request either.
    let creation = TableCreation::builder()
        .name(table_name.into())
        .schema(simple_table_schema())
        .build();

    let actual_message = catalog
        .create_table(&namespace_ident, creation)
        .await
        .unwrap_err()
        .to_string();
    let expected_message = format!(
        "Unexpected => Cannot create table {:?}. No default path is set, please specify a location when creating a table.",
        &expected_table_ident
    );

    assert_eq!(actual_message, expected_message)
}

Expand Down

0 comments on commit 0f7fc20

Please sign in to comment.