Skip to content

Commit

Permalink
fix(cat-gateway): Make cassandra namespace compliant.
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenj committed Sep 26, 2024
1 parent 89b0bfc commit cc466b2
Showing 1 changed file with 37 additions and 18 deletions.
55 changes: 37 additions & 18 deletions catalyst-gateway/bin/src/db/index/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::sync::Arc;

use anyhow::Context;
use anyhow::{bail, Context};
use handlebars::Handlebars;
use scylla::Session;
use serde_json::json;
Expand Down Expand Up @@ -138,9 +138,9 @@ fn generate_cql_schema_version() -> String {
pub(crate) fn namespace(cfg: &cassandra_db::EnvVars) -> String {
// Build and set the Keyspace to use.
format!(
"{}_V{}",
"{}_{}",
cfg.namespace.as_str(),
generate_cql_schema_version()
generate_cql_schema_version().replace('-', "_")
)
}

Expand All @@ -155,11 +155,19 @@ async fn create_namespace(
// disable default `html_escape` function
// which transforms `<`, `>` symbols to `&lt`, `&gt`
reg.register_escape_fn(|s| s.into());
let query = reg.render_template(CREATE_NAMESPACE_CQL, &json!({"keyspace": keyspace}))?;
let query = reg
.render_template(CREATE_NAMESPACE_CQL, &json!({"keyspace": keyspace}))
.context(format!("Keyspace: {keyspace}"))?;

// Create the Keyspace if it doesn't exist already.
let stmt = session.prepare(query).await?;
session.execute_unpaged(&stmt, ()).await?;
let stmt = session
.prepare(query)
.await
.context(format!("Keyspace: {keyspace}"))?;
session
.execute_unpaged(&stmt, ())
.await
.context(format!("Keyspace: {keyspace}"))?;

// Wait for the Schema to be ready.
session.await_schema_agreement().await?;
Expand All @@ -176,18 +184,29 @@ async fn create_namespace(
pub(crate) async fn create_schema(
session: &mut Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<()> {
create_namespace(session, cfg).await?;

for schema in SCHEMAS {
let stmt = session
.prepare(schema.0)
.await
.context(format!("{} : Prepared", schema.1))?;

session
.execute_unpaged(&stmt, ())
.await
.context(format!("{} : Executed", schema.1))?;
create_namespace(session, cfg)
.await
.context("Creating Namespace")?;

let mut failed = false;

for (schema, schema_name) in SCHEMAS {
match session.prepare(*schema).await {
Ok(stmt) => {
if let Err(err) = session.execute_unpaged(&stmt, ()).await {
failed = true;
error!(schema=schema_name, error=%err, "Failed to Execute Create Schema Query");
};
},
Err(err) => {
failed = true;
error!(schema=schema_name, error=%err, "Failed to Prepare Create Schema Query");
},
}
}

if failed {
bail!("Failed to Create Schema");
}

// Wait for the Schema to be ready.
Expand Down

0 comments on commit cc466b2

Please sign in to comment.