From 307fe1da8d85cc6777af55bb52c52c72badf69bb Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Wed, 13 Mar 2024 11:25:08 +0100 Subject: [PATCH] run test --- crates/catalog/sql/Cargo.toml | 1 + crates/catalog/sql/src/catalog.rs | 44 +++++++++++++++++++++---------- crates/catalog/sql/src/error.rs | 2 +- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 5440935b1..f727ad837 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -48,5 +48,6 @@ uuid = { workspace = true, features = ["v4"] } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +sqlx = { version = "0.7.2", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "postgres", "mysql"], default-features = false } tempfile = { workspace = true } tokio = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index fc4296be9..c47bc5514 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -22,7 +22,7 @@ use dashmap::DashMap; use futures::{AsyncReadExt, AsyncWriteExt}; use sqlx::{ any::{install_default_drivers, AnyRow}, - AnyPool, Row, + AnyPool, Connection, Row, }; use std::collections::HashMap; @@ -50,32 +50,48 @@ impl SqlCatalog { let pool = AnyPool::connect(url).await.map_err(from_sqlx_error)?; - sqlx::query( - "create table if not exists iceberg_tables ( + let mut connection = pool.acquire().await.map_err(from_sqlx_error)?; + + connection + .transaction(|txn| { + Box::pin(async move { + sqlx::query( + "create table if not exists iceberg_tables ( catalog_name text not null, table_namespace text not null, table_name text not null, - metadata_location text not null, + metadata_location text, previous_metadata_location text, primary key (catalog_name, table_namespace, table_name) );", - ) - .execute(&pool) - .await - .map_err(from_sqlx_error)?; + ) + .fetch_all(&mut **txn) + .await + }) + }) + .await + .map_err(from_sqlx_error)?; - sqlx::query( - "create table if not exists iceberg_namespace_properties ( + connection + .transaction(|txn| { + Box::pin(async move { + sqlx::query( + "create table if not exists iceberg_namespace_properties ( catalog_name text not null, namespace text not null, property_key text, property_value text, primary key (catalog_name, namespace, property_key) );", - ) - .execute(&pool) - .await - .map_err(from_sqlx_error)?; + ) + .fetch_all(&mut **txn) + .await + }) + }) + .await + .map_err(from_sqlx_error)?; + + connection.close().await.map_err(from_sqlx_error)?; Ok(SqlCatalog { name: name.to_owned(), diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs index c6b998beb..90bba1f05 100644 --- a/crates/catalog/sql/src/error.rs +++ b/crates/catalog/sql/src/error.rs @@ -21,7 +21,7 @@ use iceberg::{Error, ErrorKind}; pub fn from_sqlx_error(error: sqlx::Error) -> Error { Error::new( ErrorKind::Unexpected, - "operation failed for hitting io error".to_string(), + "operation failed for hitting sqlx error".to_string(), ) .with_source(error) }