From 469dbe70eca87b1acc94340c86d5bf71bb79cdf2 Mon Sep 17 00:00:00 2001 From: Zachary Hamm Date: Fri, 15 Mar 2024 16:00:19 -0500 Subject: [PATCH] chore(*): remove content-store All "content" accesses go through the layerdb now. Content-store crate has been removed. Co-Authored-By: Scott Prutton --- .ci/docker-compose.test-integration.yml | 2 +- Cargo.lock | 28 -- Cargo.toml | 1 - bin/sdf/src/main.rs | 3 - component/postgres/BUCK | 2 +- dev/docker-compose.platform.yml | 4 +- lib/content-store/BUCK | 31 -- lib/content-store/Cargo.toml | 25 -- lib/content-store/build.rs | 13 - lib/content-store/src/lib.rs | 35 -- lib/content-store/src/pair.rs | 108 ------ lib/content-store/src/store.rs | 62 ---- lib/content-store/src/store/local.rs | 70 ---- lib/content-store/src/store/pg.rs | 139 -------- .../pg/migrations/U0001__content_pairs.sql | 6 - lib/content-store/src/store/pg/tools.rs | 39 --- lib/content-store/src/value.rs | 94 ----- lib/dal-test/BUCK | 1 - lib/dal-test/Cargo.toml | 1 - lib/dal-test/src/lib.rs | 87 +---- lib/dal/BUCK | 2 - lib/dal/Cargo.toml | 1 - lib/dal/src/attribute/prototype/argument.rs | 2 - lib/dal/src/context.rs | 36 +- lib/dal/src/lib.rs | 11 +- lib/dal/src/prop.rs | 2 - lib/dal/src/workspace_snapshot.rs | 7 +- .../src/workspace_snapshot/content_address.rs | 2 +- lib/dal/src/workspace_snapshot/graph.rs | 324 +++++++++--------- lib/dal/src/workspace_snapshot/graph/tests.rs | 2 +- .../workspace_snapshot/graph/tests/rebase.rs | 2 +- lib/dal/src/workspace_snapshot/node_weight.rs | 2 +- ...ttribute_prototype_argument_node_weight.rs | 2 +- .../attribute_value_node_weight.rs | 2 +- .../node_weight/category_node_weight.rs | 2 +- .../node_weight/component_node_weight.rs | 2 +- .../node_weight/content_node_weight.rs | 2 +- .../node_weight/func_argument_node_weight.rs | 2 +- .../node_weight/func_node_weight.rs | 2 +- .../node_weight/ordering_node_weight.rs | 2 +- .../node_weight/prop_node_weight.rs | 2 +- lib/dal/tests/integration.rs | 3 +- .../tests/integration_test/action/batch.rs | 6 + lib/pinga-server/BUCK | 1 - lib/pinga-server/Cargo.toml | 1 - lib/pinga-server/src/config.rs | 16 - lib/pinga-server/src/server.rs | 2 - lib/rebaser-server/BUCK | 1 - lib/rebaser-server/Cargo.toml | 1 - lib/rebaser-server/src/config.rs | 16 - lib/rebaser-server/src/server.rs | 7 - lib/rebaser-server/src/server/core_loop.rs | 2 - lib/sdf-server/BUCK | 1 - lib/sdf-server/Cargo.toml | 1 - lib/sdf-server/src/server/config.rs | 16 - lib/sdf-server/src/server/server.rs | 3 + lib/sdf-server/tests/api.rs | 3 +- lib/si-layer-cache/src/db.rs | 9 + lib/si-layer-cache/src/db/cache_updates.rs | 1 + lib/si-layer-cache/src/layer_cache.rs | 3 +- .../tests/integration_test/db/cas.rs | 5 + .../tests/integration_test/layer_cache.rs | 7 +- lib/si-test-macros/src/expand.rs | 1 - 63 files changed, 230 insertions(+), 1038 deletions(-) delete mode 100644 lib/content-store/BUCK delete mode 100644 lib/content-store/Cargo.toml delete mode 100644 lib/content-store/build.rs delete mode 100644 lib/content-store/src/lib.rs delete mode 100644 lib/content-store/src/pair.rs delete mode 100644 lib/content-store/src/store.rs delete mode 100644 lib/content-store/src/store/local.rs delete mode 100644 lib/content-store/src/store/pg.rs delete mode 100644 lib/content-store/src/store/pg/migrations/U0001__content_pairs.sql delete mode 100644 lib/content-store/src/store/pg/tools.rs delete mode 100644 lib/content-store/src/value.rs diff --git a/.ci/docker-compose.test-integration.yml b/.ci/docker-compose.test-integration.yml index 
4530971e44..b2f5518877 100644 --- a/.ci/docker-compose.test-integration.yml +++ b/.ci/docker-compose.test-integration.yml @@ -29,7 +29,7 @@ services: - "PGPASSWORD=bugbear" - "POSTGRES_USER=si_test" - "POSTGRES_DB=si_test" - - "POSTGRES_MULTIPLE_DBS=si_test_content_store,si_test_dal,si_test_sdf_server,si_test_key_value_pairs" + - "POSTGRES_MULTIPLE_DBS=si_test_dal,si_test_sdf_server,si_test_layer_db" command: - "-c" - "fsync=off" diff --git a/Cargo.lock b/Cargo.lock index 798f462d5a..8905d2ba8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1079,29 +1079,6 @@ dependencies = [ "url", ] -[[package]] -name = "content-store" -version = "0.1.0" -dependencies = [ - "async-trait", - "blake3", - "bytes 1.5.0", - "chrono", - "color-eyre", - "postcard", - "postgres-types", - "refinery", - "remain", - "serde", - "serde_json", - "si-cbor", - "si-data-pg", - "si-events", - "telemetry", - "thiserror", - "uuid", -] - [[package]] name = "convert_case" version = "0.4.0" @@ -1506,7 +1483,6 @@ dependencies = [ "buck2-resources", "chrono", "ciborium", - "content-store", "convert_case 0.6.0", "council-server", "dal-test", @@ -1567,7 +1543,6 @@ version = "0.1.0" dependencies = [ "buck2-resources", "color-eyre", - "content-store", "council-server", "dal", "derive_builder", @@ -3975,7 +3950,6 @@ name = "pinga-server" version = "0.1.0" dependencies = [ "buck2-resources", - "content-store", "dal", "derive_builder", "futures", @@ -4490,7 +4464,6 @@ name = "rebaser-server" version = "0.1.0" dependencies = [ "buck2-resources", - "content-store", "dal", "derive_builder", "futures", @@ -5012,7 +4985,6 @@ dependencies = [ "base64 0.21.7", "buck2-resources", "chrono", - "content-store", "convert_case 0.6.0", "dal", "dal-test", diff --git a/Cargo.toml b/Cargo.toml index 798a4691e0..7591eb2099 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ members = [ "lib/buck2-resources", "lib/bytes-lines-codec", "lib/config-file", - "lib/content-store", "lib/council-server", "lib/cyclone-client", "lib/cyclone-core", diff --git a/bin/sdf/src/main.rs b/bin/sdf/src/main.rs index 8c77c9fbe1..0a868a6a6f 100644 --- a/bin/sdf/src/main.rs +++ b/bin/sdf/src/main.rs @@ -101,8 +101,6 @@ async fn async_main() -> Result<()> { let pg_pool = Server::create_pg_pool(config.pg_pool()).await?; - let content_store_pg_pool = Server::create_pg_pool(config.content_store_pg_pool()).await?; - let veritech = Server::create_veritech_client(nats_conn.clone()); let symmetric_crypto_service = @@ -138,7 +136,6 @@ async fn async_main() -> Result<()> { Some(module_index_url), symmetric_crypto_service, rebaser_config, - content_store_pg_pool, layer_db, ); diff --git a/component/postgres/BUCK b/component/postgres/BUCK index 84b6447800..0fd6770101 100644 --- a/component/postgres/BUCK +++ b/component/postgres/BUCK @@ -30,7 +30,7 @@ docker_image( "--env", "POSTGRES_DB=si", "--env", - "POSTGRES_MULTIPLE_DBS=si_content_store,si_auth,si_layer_db,si_test,si_test_content_store,si_test_dal,si_test_sdf_server", + "POSTGRES_MULTIPLE_DBS=si_auth,si_layer_db,si_test,si_test_dal,si_test_sdf_server", "--publish", "5432:5432", ], diff --git a/dev/docker-compose.platform.yml b/dev/docker-compose.platform.yml index 05bbfc44d1..685474cb79 100644 --- a/dev/docker-compose.platform.yml +++ b/dev/docker-compose.platform.yml @@ -9,7 +9,7 @@ services: - "PGPASSWORD=bugbear" - "POSTGRES_USER=si" - "POSTGRES_DB=si" - - "POSTGRES_MULTIPLE_DBS=si_content_store,si_layer_db,si_auth,si_module_index,si_key_value_pairs" + - "POSTGRES_MULTIPLE_DBS=si_layer_db,si_auth,si_module_index" ports: - 
"5432:5432" healthcheck: @@ -25,7 +25,7 @@ services: - "PGPASSWORD=bugbear" - "POSTGRES_USER=si_test" - "POSTGRES_DB=si_test" - - "POSTGRES_MULTIPLE_DBS=si_test_content_store,si_test_dal,si_test_sdf_server,si_test_key_value_pairs" + - "POSTGRES_MULTIPLE_DBS=si_test_dal,si_test_sdf_server,si_test_layer_db" command: - "-c" - "fsync=off" diff --git a/lib/content-store/BUCK b/lib/content-store/BUCK deleted file mode 100644 index 315a68acca..0000000000 --- a/lib/content-store/BUCK +++ /dev/null @@ -1,31 +0,0 @@ -load("@prelude-si//:macros.bzl", "rust_library") - -rust_library( - name = "content-store", - deps = [ - "//lib/si-cbor:si-cbor", - "//lib/si-data-pg:si-data-pg", - "//lib/si-events-rs:si-events", - "//lib/telemetry-rs:telemetry", - "//third-party/rust:async-trait", - "//third-party/rust:blake3", - "//third-party/rust:bytes", - "//third-party/rust:chrono", - "//third-party/rust:color-eyre", - "//third-party/rust:postcard", - "//third-party/rust:postgres-types", - "//third-party/rust:refinery", - "//third-party/rust:remain", - "//third-party/rust:serde", - "//third-party/rust:serde_json", - "//third-party/rust:thiserror", - "//third-party/rust:uuid", - ], - srcs = glob([ - "src/**/*.rs", - "src/store/pg/migrations/**/*.sql", - ]), - env = { - "CARGO_MANIFEST_DIR": ".", - }, -) diff --git a/lib/content-store/Cargo.toml b/lib/content-store/Cargo.toml deleted file mode 100644 index f3ba107f17..0000000000 --- a/lib/content-store/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "content-store" -version = "0.1.0" -edition = "2021" -publish = false - -[dependencies] -si-cbor = { path = "../../lib/si-cbor" } -si-data-pg = { path = "../../lib/si-data-pg" } -si-events = { path = "../../lib/si-events-rs" } -telemetry = { path = "../../lib/telemetry-rs" } - -async-trait = { workspace = true } -blake3 = { workspace = true } -bytes = { workspace = true } -chrono = { workspace = true } -color-eyre = { workspace = true } -postcard = { workspace = true } -postgres-types = { workspace = true } -refinery = { workspace = true } -remain = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -thiserror = { workspace = true } -uuid = { workspace = true } diff --git a/lib/content-store/build.rs b/lib/content-store/build.rs deleted file mode 100644 index d2e79ef4d0..0000000000 --- a/lib/content-store/build.rs +++ /dev/null @@ -1,13 +0,0 @@ -use std::fs; - -fn main() -> Result<(), Box> { - println!("cargo:rerun-if-changed=src/store/pg/migrations"); - for entry in fs::read_dir("./src/store/pg/migrations")? { - let entry = entry?; - let path = entry.path(); - if path.is_file() { - println!("cargo:rerun-if-changed={}", path.display()); - } - } - Ok(()) -} diff --git a/lib/content-store/src/lib.rs b/lib/content-store/src/lib.rs deleted file mode 100644 index 7e0a7be16e..0000000000 --- a/lib/content-store/src/lib.rs +++ /dev/null @@ -1,35 +0,0 @@ -//! This crate provides the ability to interface with content stores of varying kinds as well as -//! the ability to generate hashes for hashable content blobs. 
-
-#![warn(
-    missing_debug_implementations,
-    missing_docs,
-    unreachable_pub,
-    bad_style,
-    dead_code,
-    improper_ctypes,
-    non_shorthand_field_patterns,
-    no_mangle_generic_items,
-    overflowing_literals,
-    path_statements,
-    patterns_in_fns_without_body,
-    unconditional_recursion,
-    unused,
-    unused_allocation,
-    unused_comparisons,
-    unused_parens,
-    while_true,
-    clippy::missing_panics_doc
-)]
-
-mod pair;
-mod store;
-mod value;
-
-pub use si_events::{content_hash::ContentHashParseError, ContentHash};
-pub use store::local::LocalStore;
-pub use store::pg::tools::PgStoreTools;
-pub use store::pg::PgStore;
-pub use store::Store;
-pub use store::{StoreError, StoreResult};
-pub use value::Value;
diff --git a/lib/content-store/src/pair.rs b/lib/content-store/src/pair.rs
deleted file mode 100644
index 8db782e1a9..0000000000
--- a/lib/content-store/src/pair.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-use chrono::{DateTime, Utc};
-use serde::{Deserialize, Serialize};
-use si_data_pg::{PgError, PgPool, PgPoolError, PgRow};
-use std::str::FromStr;
-use telemetry::prelude::*;
-use thiserror::Error;
-
-use crate::{ContentHash, ContentHashParseError};
-
-#[remain::sorted]
-#[derive(Error, Debug)]
-pub enum ContentPairError {
-    #[error("content hash parse error: {0}")]
-    ContentHashParse(#[from] ContentHashParseError),
-    #[error("pg error: {0}")]
-    Pg(#[from] PgError),
-    #[error("pg pool error: {0}")]
-    PgPool(#[from] PgPoolError),
-}
-
-pub(crate) type ContentPairResult<T> = Result<T, ContentPairError>;
-
-#[derive(Debug, Serialize, Deserialize)]
-pub(crate) struct ContentPair {
-    key: String,
-    created_at: DateTime<Utc>,
-    value: Vec<u8>,
-}
-
-impl TryFrom<PgRow> for ContentPair {
-    type Error = ContentPairError;
-
-    fn try_from(row: PgRow) -> Result<Self, Self::Error> {
-        Ok(Self {
-            key: row.try_get("key")?,
-            created_at: row.try_get("created_at")?,
-            value: row.try_get("value")?,
-        })
-    }
-}
-
-impl ContentPair {
-    #[instrument(name = "content_store.content_pair.new", level = "debug", skip_all)]
-    pub(crate) async fn new(
-        pg_pool: &PgPool,
-        key: ContentHash,
-        value: &[u8],
-    ) -> ContentPairResult<()> {
-        let client = pg_pool.get().await?;
-        client
-            .query(
-                "INSERT INTO content_pairs (key, value) VALUES ($1, $2) ON CONFLICT DO NOTHING",
-                &[&key.to_string(), &value],
-            )
-            .await?;
-        Ok(())
-    }
-
-    pub(crate) fn value(&self) -> &[u8] {
-        &self.value
-    }
-
-    pub(crate) fn key(&self) -> ContentPairResult<ContentHash> {
-        Ok(ContentHash::from_str(self.key.as_str())?)
-    }
-
-    pub(crate) async fn find(
-        pg_pool: &PgPool,
-        key: &ContentHash,
-    ) -> ContentPairResult<Option<Self>> {
-        let client = pg_pool.get().await?;
-        let maybe_row = client
-            .query_opt(
-                "SELECT * FROM content_pairs WHERE key = $1 LIMIT 1",
-                &[&key.to_string()],
-            )
-            .await?;
-        match maybe_row {
-            Some(row) => Ok(Some(Self::try_from(row)?)),
-            None => Ok(None),
-        }
-    }
-
-    pub(crate) async fn find_many(
-        pg_pool: &PgPool,
-        keys: &[ContentHash],
-    ) -> ContentPairResult<Vec<Self>> {
-        let mut result = vec![];
-        let client = pg_pool.get().await?;
-
-        let key_strings: Vec<String> = keys.iter().map(|k| k.to_string()).collect();
-        let key_string_refs: Vec<&String> = key_strings.iter().collect();
-
-        let rows = client
-            .query(
-                "SELECT * FROM content_pairs WHERE key = any($1)",
-                &[&key_string_refs],
-            )
-            .await?;
-
-        for row in rows {
-            let pair = Self::try_from(row)?;
-            result.push(pair);
-        }
-
-        Ok(result)
-    }
-}
diff --git a/lib/content-store/src/store.rs b/lib/content-store/src/store.rs
deleted file mode 100644
index caef5ae510..0000000000
--- a/lib/content-store/src/store.rs
+++ /dev/null
@@ -1,62 +0,0 @@
-use serde::de::DeserializeOwned;
-use serde::Serialize;
-use si_data_pg::{PgError, PgPoolError};
-use std::collections::HashMap;
-use thiserror::Error;
-
-use crate::pair::ContentPairError;
-use crate::ContentHash;
-
-pub(crate) mod local;
-pub(crate) mod pg;
-
-#[allow(missing_docs)]
-#[remain::sorted]
-#[derive(Error, Debug)]
-pub enum StoreError {
-    #[error("content pair error: {0}")]
-    ContentPair(#[from] ContentPairError),
-    #[error("pg error: {0}")]
-    Pg(#[from] PgError),
-    #[error("pg pool error: {0}")]
-    PgPool(#[from] PgPoolError),
-    #[error("postcard error: {0}")]
-    Postcard(#[from] postcard::Error),
-    #[error("serde json error: {0}")]
-    SerdeJson(#[from] serde_json::Error),
-}
-
-/// The [`Result`] type used by the [`Store`] trait methods
-pub type StoreResult<T> = Result<T, StoreError>;
-
-/// This trait provides the minimum methods needed to create a content store.
-#[async_trait::async_trait]
-pub trait Store {
-    /// Indicates whether or not the store is empty.
-    fn is_empty(&self) -> bool;
-
-    /// Indicates the number of keys in the store.
-    fn len(&self) -> usize;
-
-    /// Adds an item to the store.
-    fn add<T>(&mut self, object: &T) -> StoreResult<ContentHash>
-    where
-        T: Serialize + ?Sized;
-
-    /// Gets an item from the store.
-    ///
-    /// Implementers of this trait may want to consider a "pull-through cache" implementation for
-    /// this method.
-    async fn get<T>(&mut self, key: &ContentHash) -> StoreResult<Option<T>>
-    where
-        T: DeserializeOwned;
-
-    /// Gets multiple items from the store
-    ///
-    async fn get_bulk<T>(&mut self, keys: &[ContentHash]) -> StoreResult<HashMap<ContentHash, T>>
-    where
-        T: DeserializeOwned + std::marker::Send;
-
-    /// Writes out content in the store to a persistent storage layer, if applicable.
-    async fn write(&mut self) -> StoreResult<()>;
-}
diff --git a/lib/content-store/src/store/local.rs b/lib/content-store/src/store/local.rs
deleted file mode 100644
index ee6cb7f8ff..0000000000
--- a/lib/content-store/src/store/local.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-use crate::store::{Store, StoreResult};
-use crate::ContentHash;
-use serde::de::DeserializeOwned;
-use serde::Serialize;
-use std::collections::HashMap;
-use telemetry::prelude::*;
-
-/// A kind of content store that operates entirely in memory.
-#[derive(Default, Debug)]
-pub struct LocalStore(HashMap<ContentHash, Vec<u8>>);
-
-#[async_trait::async_trait]
-impl Store for LocalStore {
-    fn is_empty(&self) -> bool {
-        self.0.is_empty()
-    }
-
-    fn len(&self) -> usize {
-        self.0.len()
-    }
-
-    fn add<T>(&mut self, object: &T) -> StoreResult<ContentHash>
-    where
-        T: Serialize + ?Sized,
-    {
-        let value = serde_json::to_vec(object)?;
-        let key = ContentHash::new(&value);
-        self.0.insert(key, value);
-        Ok(key)
-    }
-
-    async fn get<T>(&mut self, key: &ContentHash) -> StoreResult<Option<T>>
-    where
-        T: DeserializeOwned,
-    {
-        let maybe_object = match self.0.get(key) {
-            Some(value) => Some(serde_json::from_slice(value)?),
-            None => None,
-        };
-        Ok(maybe_object)
-    }
-
-    async fn get_bulk<T>(&mut self, keys: &[ContentHash]) -> StoreResult<HashMap<ContentHash, T>>
-    where
-        T: DeserializeOwned + std::marker::Send,
-    {
-        Ok(keys
-            .iter()
-            .filter_map(|key| match self.0.get(key) {
-                None => None,
-                Some(item) => match serde_json::from_slice(item) {
-                    Ok(deserialized) => Some((key.to_owned(), deserialized)),
-                    Err(err) => {
-                        error!(
-                            "Could not deserialize item {} in content store: {}",
-                            key,
-                            err.to_string()
-                        );
-                        None
-                    }
-                },
-            })
-            .collect())
-    }
-
-    /// This a "no-op" for the [`LocalStore`] since everything is handled in memory.
-    async fn write(&mut self) -> StoreResult<()> {
-        Ok(())
-    }
-}
diff --git a/lib/content-store/src/store/pg.rs b/lib/content-store/src/store/pg.rs
deleted file mode 100644
index be50f9ca90..0000000000
--- a/lib/content-store/src/store/pg.rs
+++ /dev/null
@@ -1,139 +0,0 @@
-use serde::de::DeserializeOwned;
-use serde::Serialize;
-use si_data_pg::PgPool;
-use std::collections::HashMap;
-
-use crate::pair::ContentPair;
-use crate::store::{Store, StoreResult};
-use crate::ContentHash;
-use crate::PgStoreTools;
-
-pub(crate) mod tools;
-
-/// A content store backed by Postgres.
-#[derive(Debug, Clone)]
-pub struct PgStore {
-    inner: HashMap<ContentHash, PgStoreItem>,
-    pg_pool: PgPool,
-}
-
-#[derive(Default, Debug, Clone, Eq, PartialEq)]
-struct PgStoreItem {
-    value: Vec<u8>,
-    written: bool,
-}
-
-impl PgStoreItem {
-    fn new(value: Vec<u8>) -> Self {
-        Self {
-            value,
-            ..Default::default()
-        }
-    }
-}
-
-impl PgStore {
-    /// Create a new [`PgStore`] from a given [`PgPool`].
-    pub async fn new(pg_pool: PgPool) -> StoreResult<Self> {
-        Ok(Self {
-            inner: Default::default(),
-            pg_pool,
-        })
-    }
-
-    /// Create a new [`PgStore`] from a given [`PgPool`].
-    pub async fn new_production() -> StoreResult<Self> {
-        let pg_pool = PgStoreTools::new_production_pg_pool().await?;
-        Ok(Self {
-            inner: Default::default(),
-            pg_pool,
-        })
-    }
-
-    /// Migrate the content store database
-    pub async fn migrate(pg_pool: &PgPool) -> StoreResult<()> {
-        PgStoreTools::migrate(pg_pool).await?;
-
-        Ok(())
-    }
-
-    /// Access the internal pg_pool
-    pub fn pg_pool(&self) -> &PgPool {
-        &self.pg_pool
-    }
-}
-
-#[async_trait::async_trait]
-impl Store for PgStore {
-    fn is_empty(&self) -> bool {
-        self.inner.is_empty()
-    }
-
-    fn len(&self) -> usize {
-        self.inner.len()
-    }
-
-    fn add<T>(&mut self, object: &T) -> StoreResult<ContentHash>
-    where
-        T: Serialize + ?Sized,
-    {
-        let value = postcard::to_stdvec(object)?;
-        let key = ContentHash::new(value.as_slice());
-        self.inner.insert(key, PgStoreItem::new(value));
-        Ok(key)
-    }
-
-    async fn get<T>(&mut self, key: &ContentHash) -> StoreResult<Option<T>>
-    where
-        T: DeserializeOwned,
-    {
-        let object = match self.inner.get(key) {
-            Some(item) => postcard::from_bytes(&item.value)?,
-            None => match ContentPair::find(&self.pg_pool, key).await? {
-                Some(content_pair) => {
-                    let encoded = content_pair.value();
-                    let decoded = postcard::from_bytes(encoded)?;
-                    self.add(encoded)?;
-
-                    decoded
-                }
-                None => return Ok(None),
-            },
-        };
-        Ok(Some(object))
-    }
-
-    async fn get_bulk<T>(&mut self, keys: &[ContentHash]) -> StoreResult<HashMap<ContentHash, T>>
-    where
-        T: DeserializeOwned + std::marker::Send,
-    {
-        let mut result = HashMap::new();
-        let mut keys_to_fetch = vec![];
-
-        for key in keys {
-            match self.inner.get(key) {
-                Some(item) => {
-                    result.insert(*key, postcard::from_bytes(&item.value)?);
-                }
-                None => keys_to_fetch.push(*key),
-            }
-        }
-
-        for pair in ContentPair::find_many(&self.pg_pool, keys_to_fetch.as_slice()).await? {
-            let encoded = pair.value();
-            result.insert(pair.key()?, postcard::from_bytes(encoded)?);
-            self.add(encoded)?;
-        }
-        Ok(result)
-    }
-
-    async fn write(&mut self) -> StoreResult<()> {
-        for (key, item) in self.inner.iter_mut() {
-            if !item.written {
-                ContentPair::new(&self.pg_pool, key.to_owned(), &item.value).await?;
-                item.written = true;
-            }
-        }
-        Ok(())
-    }
-}
diff --git a/lib/content-store/src/store/pg/migrations/U0001__content_pairs.sql b/lib/content-store/src/store/pg/migrations/U0001__content_pairs.sql
deleted file mode 100644
index 309657efbd..0000000000
--- a/lib/content-store/src/store/pg/migrations/U0001__content_pairs.sql
+++ /dev/null
@@ -1,6 +0,0 @@
-CREATE TABLE content_pairs
-(
-    key text primary key NOT NULL,
-    created_at timestamp with time zone NOT NULL DEFAULT CLOCK_TIMESTAMP(),
-    value bytea NOT NULL
-);
diff --git a/lib/content-store/src/store/pg/tools.rs b/lib/content-store/src/store/pg/tools.rs
deleted file mode 100644
index 0a548a92db..0000000000
--- a/lib/content-store/src/store/pg/tools.rs
+++ /dev/null
@@ -1,39 +0,0 @@
-use si_data_pg::{PgPool, PgPoolConfig, PgPoolError};
-use telemetry::prelude::*;
-
-mod embedded {
-    use refinery::embed_migrations;
-
-    embed_migrations!("./src/store/pg/migrations");
-}
-
-const DBNAME: &str = "si_content_store";
-const APPLICATION_NAME: &str = "si-content-store";
-
-/// A unit struct that provides helpers for performing [`PgStore`] migrations.
-#[allow(missing_debug_implementations)]
-pub struct PgStoreTools;
-
-impl PgStoreTools {
-    /// Create a new [`PgPool`] for a production [`PgStore`].
-    pub async fn new_production_pg_pool() -> Result<PgPool, PgPoolError> {
-        let pg_pool_config = Self::default_pool_config();
-        let pg_pool = PgPool::new(&pg_pool_config).await?;
-        Ok(pg_pool)
-    }
-
-    /// The default pool configuration for the PgStore
-    pub fn default_pool_config() -> PgPoolConfig {
-        PgPoolConfig {
-            dbname: DBNAME.to_string(),
-            application_name: APPLICATION_NAME.to_string(),
-            ..Default::default()
-        }
-    }
-
-    /// Perform migrations for the database.
-    #[instrument(skip_all)]
-    pub async fn migrate(pg_pool: &PgPool) -> Result<(), PgPoolError> {
-        pg_pool.migrate(embedded::migrations::runner()).await
-    }
-}
diff --git a/lib/content-store/src/value.rs b/lib/content-store/src/value.rs
deleted file mode 100644
index 54003b1fb9..0000000000
--- a/lib/content-store/src/value.rs
+++ /dev/null
@@ -1,94 +0,0 @@
-use std::collections::BTreeMap;
-
-#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
-pub enum ValueNumber {
-    U64(u64),
-    I64(i64),
-    F64(f64),
-}
-
-/// A type that can be converted to and from serde_json::Value types infallibly,
-/// *so long as* arbitrary precision arithmetic is not enabled for serde_json.
-/// This is necessary because postcard will *not* deserialize serde_json's `Number`
-/// type, but we still want to store arbitrary payloads in our content store.
-/// The alternative is to serialize the value to a string and then serialize
-/// that string with postcard.
-#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
-#[remain::sorted]
-pub enum Value {
-    /// An array of values
-    Array(Vec<Value>),
-    /// A boolean scalar
-    Bool(bool),
-    /// A null value
-    Null,
-    /// A Number value. JSON numbers are either double precision IEEE floating point values, or
-    /// they in some implementations can be BigInt values. However, we're currently only going to
-    /// support double precision floats and 64 bit integers. If arbitrary precision integers are
-    /// enabled for serde_json, this *will* cause a panic.
-    Number(ValueNumber),
-    /// An object. BTreeMap is the internal representation used by serde_json for objects,
-    /// *unless* order preservation is enabled. If order preservation is enabled, we will
-    /// lose that ordering information in the conversion to/from `serde_json::Value``.
-    Object(BTreeMap<String, Value>),
-    /// A string scalar value
-    String(String),
-}
-
-// todo: make this non-recursive for maps and arrays
-impl From<serde_json::Value> for Value {
-    fn from(value: serde_json::Value) -> Self {
-        match value {
-            serde_json::Value::Null => Self::Null,
-            serde_json::Value::Bool(b) => Self::Bool(b),
-            serde_json::Value::Number(n) => Value::Number(if n.is_u64() {
-                ValueNumber::U64(
-                    n.as_u64()
-                        .expect("serde_json said it was a u64 but refused to give me one"),
-                )
-            } else if n.is_i64() {
-                ValueNumber::I64(
-                    n.as_i64()
-                        .expect("serde_json said it was an i64 but refused to give me one"),
-                )
-            } else if n.is_f64() {
-                ValueNumber::F64(
-                    n.as_f64()
-                        .expect("serde_json said it was an f64 but refused to give me one"),
-                )
-            } else {
-                panic!("the arbitrary_precision feature of serde_json is not supported");
-            }),
-            serde_json::Value::Array(mut a) => Self::Array(a.drain(..).map(|e| e.into()).collect()),
-            serde_json::Value::String(s) => Self::String(s),
-            // Can we avoid these clones?
- serde_json::Value::Object(map) => Self::Object( - map.iter() - .map(|(k, v)| (k.to_owned(), v.to_owned().into())) - .collect(), - ), - } - } -} - -impl From for serde_json::Value { - fn from(value: Value) -> Self { - match value { - Value::Null => serde_json::Value::Null, - Value::Bool(b) => serde_json::Value::Bool(b), - Value::Array(mut a) => serde_json::Value::Array(a.drain(..).map(Into::into).collect()), - Value::Number(n) => serde_json::Value::Number(match n { - ValueNumber::U64(n) => n.into(), - ValueNumber::I64(n) => n.into(), - ValueNumber::F64(n) => serde_json::value::Number::from_f64(n) - .expect("cannot deserialize an infinite or NAN f64 value"), - }), - Value::String(s) => serde_json::Value::String(s), - Value::Object(map) => serde_json::Value::Object( - map.iter() - .map(|(k, v)| (k.to_owned(), v.to_owned().into())) - .collect(), - ), - } - } -} diff --git a/lib/dal-test/BUCK b/lib/dal-test/BUCK index c04ff2ed8d..1096bdd8d1 100644 --- a/lib/dal-test/BUCK +++ b/lib/dal-test/BUCK @@ -4,7 +4,6 @@ rust_library( name = "dal-test", deps = [ "//lib/buck2-resources:buck2-resources", - "//lib/content-store:content-store", "//lib/council-server:council-server", "//lib/dal:dal", "//lib/module-index-client:module-index-client", diff --git a/lib/dal-test/Cargo.toml b/lib/dal-test/Cargo.toml index ca199c8769..9cf15f8298 100644 --- a/lib/dal-test/Cargo.toml +++ b/lib/dal-test/Cargo.toml @@ -7,7 +7,6 @@ publish = false [dependencies] buck2-resources = { path = "../../lib/buck2-resources" } -content-store = { path = "../../lib/content-store" } council-server = { path = "../../lib/council-server" } dal = { path = "../../lib/dal" } module-index-client = { path = "../../lib/module-index-client" } diff --git a/lib/dal-test/src/lib.rs b/lib/dal-test/src/lib.rs index 46e811b592..e4ec8cbb0a 100644 --- a/lib/dal-test/src/lib.rs +++ b/lib/dal-test/src/lib.rs @@ -8,7 +8,6 @@ use std::{ }; use buck2_resources::Buck2Resources; -use content_store::PgStoreTools; use dal::{ job::processor::{JobQueueProcessor, NatsProcessor}, DalContext, DalLayerDb, JwtPublicSigningKey, ModelResult, ServicesContext, Workspace, @@ -51,7 +50,6 @@ const ENV_VAR_NATS_URL: &str = "SI_TEST_NATS_URL"; const ENV_VAR_MODULE_INDEX_URL: &str = "SI_TEST_MODULE_INDEX_URL"; const ENV_VAR_PG_HOSTNAME: &str = "SI_TEST_PG_HOSTNAME"; const ENV_VAR_PG_DBNAME: &str = "SI_TEST_PG_DBNAME"; -const ENV_VAR_CONTENT_STORE_PG_DBNAME: &str = "SI_TEST_CONTENT_STORE_PG_DBNAME"; const ENV_VAR_LAYER_CACHE_PG_DBNAME: &str = "SI_TEST_LAYER_CACHE_PG_DBNAME"; const ENV_VAR_PG_USER: &str = "SI_TEST_PG_USER"; const ENV_VAR_PG_PORT: &str = "SI_TEST_PG_PORT"; @@ -111,8 +109,6 @@ pub struct Config { #[allow(dead_code)] #[builder(default)] rebaser_config: RebaserClientConfig, - #[builder(default = "PgStoreTools::default_pool_config()")] - content_store_pg_pool: PgPoolConfig, #[builder(default = "si_layer_cache::default_pg_pool_config()")] layer_cache_pg_pool: PgPoolConfig, } @@ -122,7 +118,6 @@ impl Config { // all are prefixed with `SI_TEST_` fn create_default( pg_dbname: &'static str, - content_store_pg_dbname: &'static str, layer_cache_pg_dbname: &'static str, ) -> Result { let mut config = { @@ -147,20 +142,6 @@ impl Config { config.pg.pool_max_size *= 32; config.pg.certificate_path = Some(config.postgres_key_path.clone().try_into()?); - if let Ok(value) = env::var(ENV_VAR_PG_HOSTNAME) { - config.content_store_pg_pool.hostname = value; - } - config.content_store_pg_pool.dbname = env::var(ENV_VAR_CONTENT_STORE_PG_DBNAME) - .unwrap_or_else(|_| 
content_store_pg_dbname.to_string()); - config.content_store_pg_pool.user = - env::var(ENV_VAR_PG_USER).unwrap_or_else(|_| DEFAULT_TEST_PG_USER.to_string()); - config.content_store_pg_pool.port = env::var(ENV_VAR_PG_PORT) - .unwrap_or_else(|_| DEFAULT_TEST_PG_PORT_STR.to_string()) - .parse()?; - config.content_store_pg_pool.pool_max_size *= 32; - config.content_store_pg_pool.certificate_path = - Some(config.postgres_key_path.clone().try_into()?); - if let Ok(value) = env::var(ENV_VAR_PG_HOSTNAME) { config.layer_cache_pg_pool.hostname = value; } @@ -232,9 +213,6 @@ pub struct TestContext { encryption_key: Arc, /// A service that can encrypt values based on the loaded donkeys symmetric_crypto_service: SymmetricCryptoService, - /// The pg_pool used by the content-addressable [`store`](content_store::Store) used by the - /// "dal". - content_store_pg_pool: PgPool, /// The pg_pool for the layer db layer_db_pg_pool: PgPool, /// The sled path for the layer db @@ -253,21 +231,16 @@ impl TestContext { /// database creation, migrations, and other preparations. pub async fn global( pg_dbname: &'static str, - content_store_pg_dbname: &'static str, layer_cache_pg_dbname: &'static str, ) -> Result { let mut mutex_guard = TEST_CONTEXT_BUILDER.lock().await; match &*mutex_guard { ContextBuilderState::Uninitialized => { - let config = Config::create_default( - pg_dbname, - content_store_pg_dbname, - layer_cache_pg_dbname, - ) - .si_inspect_err(|err| { - *mutex_guard = ContextBuilderState::errored(err.to_string()) - })?; + let config = Config::create_default(pg_dbname, layer_cache_pg_dbname) + .si_inspect_err(|err| { + *mutex_guard = ContextBuilderState::errored(err.to_string()) + })?; let test_context_builder = TestContextBuilder::create(config) .await .si_inspect_err(|err| { @@ -333,7 +306,6 @@ impl TestContext { None, self.symmetric_crypto_service.clone(), self.rebaser_config.clone(), - self.content_store_pg_pool.clone(), layer_db, ) } @@ -378,13 +350,9 @@ impl TestContextBuilder { let pg_pool = PgPool::new(&self.config.pg) .await .wrap_err("failed to create global setup PgPool")?; - let content_store_pool = PgPool::new(&self.config.content_store_pg_pool) - .await - .wrap_err("failed to create global setup content store PgPool")?; let layer_cache_pg_pool = PgPool::new(&self.config.layer_cache_pg_pool).await?; - self.build_inner(pg_pool, content_store_pool, layer_cache_pg_pool) - .await + self.build_inner(pg_pool, layer_cache_pg_pool).await } /// Builds and returns a new [`TestContext`] with its own connection pooling for each test. 
@@ -392,24 +360,15 @@ impl TestContextBuilder { let pg_pool = self .create_test_specific_db_with_pg_pool(&self.config.pg) .await?; - let content_store_pg_pool = self - .create_test_specific_db_with_pg_pool(&self.config.content_store_pg_pool) - .await?; let layer_cache_pg_pool = self .create_test_specific_db_with_pg_pool(&self.config.layer_cache_pg_pool) .await?; - self.build_inner(pg_pool, content_store_pg_pool, layer_cache_pg_pool) - .await + self.build_inner(pg_pool, layer_cache_pg_pool).await } - async fn build_inner( - &self, - pg_pool: PgPool, - content_store_pg_pool: PgPool, - layer_db_pg_pool: PgPool, - ) -> Result { + async fn build_inner(&self, pg_pool: PgPool, layer_db_pg_pool: PgPool) -> Result { let universal_prefix = random_identifier_string(); // Need to make a new NatsConfig so that we can add the test-specific subject prefix @@ -440,7 +399,6 @@ impl TestContextBuilder { encryption_key: self.encryption_key.clone(), symmetric_crypto_service, rebaser_config, - content_store_pg_pool, layer_db_pg_pool, layer_db_sled_path: si_layer_cache::disk_cache::default_sled_path()?.to_string(), }) @@ -591,7 +549,6 @@ pub fn rebaser_server(services_context: &ServicesContext) -> Result Result<()> { .await .wrap_err("failed to connect to database, is it running and available?")?; - info!("testing global content store database connection"); - services_ctx - .content_store_pg_pool() - .test_connection() - .await - .wrap_err("failed to connect to content store database, is it running and available?")?; - #[allow(clippy::disallowed_methods)] // Environment variables are used exclusively in test and // all are prefixed with `SI_TEST_` if !env::var(ENV_VAR_KEEP_OLD_DBS).is_ok_and(|v| !v.is_empty()) { @@ -687,10 +637,6 @@ async fn global_setup(test_context_builer: TestContextBuilder) -> Result<()> { .await .wrap_err("failed to drop old databases")?; - info!("dropping old test-specific content store databases"); - drop_old_test_databases(services_ctx.content_store_pg_pool()) - .await - .wrap_err("failed to drop old test-specific content store databases")?; info!("dropping old test-specific layerdb databases"); drop_old_test_databases(services_ctx.layer_db().pg_pool()) .await @@ -705,24 +651,24 @@ async fn global_setup(test_context_builer: TestContextBuilder) -> Result<()> { .await .wrap_err("failed to drop and create the database")?; - services_ctx - .content_store_pg_pool() - .drop_and_create_public_schema() - .await - .wrap_err("failed to drop and create content store database")?; - services_ctx .layer_db() .pg_pool() .drop_and_create_public_schema() .await - .wrap_err("failed to drop and create content store database")?; + .wrap_err("failed to drop and create layer db database")?; info!("running database migrations"); - dal::migrate(services_ctx.pg_pool(), services_ctx.content_store_pg_pool()) + dal::migrate(services_ctx.pg_pool()) .await .wrap_err("failed to migrate database")?; + services_ctx + .layer_db() + .pg_migrate() + .await + .wrap_err("failed to migrate layerdb")?; + // Check if the user would like to skip migrating schemas. This is helpful for boosting // performance when running integration tests that do not rely on builtin schemas. 
// let selected_test_builtin_schemas = determine_selected_test_builtin_schemas(); @@ -742,7 +688,6 @@ async fn global_setup(test_context_builer: TestContextBuilder) -> Result<()> { test_context.config.module_index_url.clone(), services_ctx.symmetric_crypto_service(), services_ctx.rebaser_config().clone(), - services_ctx.content_store_pg_pool(), services_ctx.layer_db().clone(), ) .await @@ -785,7 +730,6 @@ async fn migrate_local_builtins( module_index_url: String, symmetric_crypto_service: &SymmetricCryptoService, rebaser_config: RebaserClientConfig, - content_store_pg_pool: &PgPool, layer_db: DalLayerDb, ) -> ModelResult<()> { let services_context = ServicesContext::new( @@ -798,7 +742,6 @@ async fn migrate_local_builtins( Some(module_index_url), symmetric_crypto_service.clone(), rebaser_config, - content_store_pg_pool.clone(), layer_db.clone(), ); let dal_context = services_context.into_builder(true); diff --git a/lib/dal/BUCK b/lib/dal/BUCK index 695143167b..6e89094b36 100644 --- a/lib/dal/BUCK +++ b/lib/dal/BUCK @@ -8,7 +8,6 @@ rust_library( name = "dal", deps = [ "//lib/si-cbor:si-cbor", - "//lib/content-store:content-store", "//lib/council-server:council-server", "//lib/nats-subscriber:nats-subscriber", "//lib/object-tree:object-tree", @@ -83,7 +82,6 @@ rust_library( rust_test( name = "test-integration", deps = [ - "//lib/content-store:content-store", "//lib/dal-test:dal-test", "//lib/rebaser-client:rebaser-client", "//lib/rebaser-core:rebaser-core", diff --git a/lib/dal/Cargo.toml b/lib/dal/Cargo.toml index 69944e858b..c48be82ce3 100644 --- a/lib/dal/Cargo.toml +++ b/lib/dal/Cargo.toml @@ -13,7 +13,6 @@ base64 = { workspace = true } blake3 = { workspace = true } chrono = { workspace = true } ciborium = { workspace = true } -content-store = { path = "../../lib/content-store" } convert_case = { workspace = true } council-server = { path = "../../lib/council-server" } derive_more = { workspace = true } diff --git a/lib/dal/src/attribute/prototype/argument.rs b/lib/dal/src/attribute/prototype/argument.rs index 6c97f45b52..61cdb54f6a 100644 --- a/lib/dal/src/attribute/prototype/argument.rs +++ b/lib/dal/src/attribute/prototype/argument.rs @@ -63,8 +63,6 @@ pub enum AttributePrototypeArgumentError { NodeWeight(#[from] NodeWeightError), #[error("serde json error: {0}")] Serde(#[from] serde_json::Error), - #[error("store error: {0}")] - Store(#[from] content_store::StoreError), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("could not acquire lock: {0}")] diff --git a/lib/dal/src/context.rs b/lib/dal/src/context.rs index 8a9f9160e5..2a68e88a6c 100644 --- a/lib/dal/src/context.rs +++ b/lib/dal/src/context.rs @@ -1,6 +1,5 @@ use std::{collections::HashMap, collections::HashSet, mem, path::PathBuf, sync::Arc}; -use content_store::{PgStore, StoreError}; use futures::Future; use rebaser_client::ClientError as RebaserClientError; use rebaser_client::Config as RebaserClientConfig; @@ -61,8 +60,6 @@ pub struct ServicesContext { symmetric_crypto_service: SymmetricCryptoService, /// Config for the the rebaser service rebaser_config: RebaserClientConfig, - /// Content store - content_store_pg_pool: PgPool, /// The layer db (moka-rs, sled and postgres) layer_db: DalLayerDb, } @@ -80,7 +77,6 @@ impl ServicesContext { module_index_url: Option, symmetric_crypto_service: SymmetricCryptoService, rebaser_config: RebaserClientConfig, - content_store_pg_pool: PgPool, layer_db: DalLayerDb, ) -> Self { Self { @@ -93,7 +89,6 @@ impl ServicesContext { module_index_url, 
symmetric_crypto_service, rebaser_config, - content_store_pg_pool, layer_db, } } @@ -146,20 +141,11 @@ impl ServicesContext { &self.rebaser_config } - /// Gets a reference to the content store pg pool - pub fn content_store_pg_pool(&self) -> &PgPool { - &self.content_store_pg_pool - } - + /// Gets a reference to the Layer Db pub fn layer_db(&self) -> &DalLayerDb { &self.layer_db } - /// Builds and returns a new [`content_store::PgStore`] - pub async fn content_store(&self) -> content_store::StoreResult { - PgStore::new(self.content_store_pg_pool().clone()).await - } - /// Builds and returns a new [`Connections`]. pub async fn connections(&self) -> PgPoolResult { let pg_conn = self.pg_pool.get().await?; @@ -291,11 +277,6 @@ pub struct DalContext { /// Determines if we should not enqueue dependent value update jobs for attribute updates in /// this context. Useful for builtin migrations, since we don't care about attribute values propagation then. no_dependent_values: bool, - /// The content-addressable [`store`](content_store::Store) used by the "dal". - /// - /// This should be configurable in the future, but for now, the only kind of store used is the - /// [`PgStore`](content_store::PgStore). - content_store: Arc>, /// The workspace snapshot for this context workspace_snapshot: Option>, /// The change set pointer for this context @@ -789,11 +770,6 @@ impl DalContext { self.services_context.module_index_url.as_deref() } - /// Gets a reference to the content store. - pub fn content_store(&self) -> &Arc> { - &self.content_store - } - /// Determines if a standard model object matches the tenancy of the current context and /// is in the same visibility. pub async fn check_tenancy( @@ -909,8 +885,6 @@ impl DalContextBuilder { /// Constructs and returns a new [`DalContext`] using a default [`RequestContext`]. pub async fn build_default(&self) -> Result { let conns = self.services_context.connections().await?; - // should we move this into Connections? 
- let content_store = self.services_context.content_store().await?; Ok(DalContext { services_context: self.services_context.clone(), @@ -919,7 +893,6 @@ impl DalContextBuilder { tenancy: Tenancy::new_empty(), visibility: Visibility::new_head(), history_actor: HistoryActor::SystemInit, - content_store: Arc::new(Mutex::new(content_store)), no_dependent_values: self.no_dependent_values, workspace_snapshot: None, change_set_pointer: None, @@ -932,7 +905,6 @@ impl DalContextBuilder { access_builder: AccessBuilder, ) -> Result { let conns = self.services_context.connections().await?; - let content_store = self.services_context.content_store().await?; let mut ctx = DalContext { services_context: self.services_context.clone(), @@ -942,7 +914,6 @@ impl DalContextBuilder { history_actor: access_builder.history_actor, visibility: Visibility::new_head(), no_dependent_values: self.no_dependent_values, - content_store: Arc::new(Mutex::new(content_store)), workspace_snapshot: None, change_set_pointer: None, }; @@ -964,7 +935,7 @@ impl DalContextBuilder { request_context: RequestContext, ) -> Result { let conns = self.services_context.connections().await?; - let content_store = self.services_context.content_store().await?; + let mut ctx = DalContext { services_context: self.services_context.clone(), blocking: self.blocking, @@ -973,7 +944,6 @@ impl DalContextBuilder { visibility: request_context.visibility, history_actor: request_context.history_actor, no_dependent_values: self.no_dependent_values, - content_store: Arc::new(Mutex::new(content_store)), workspace_snapshot: None, change_set_pointer: None, }; @@ -1036,8 +1006,6 @@ pub enum TransactionsError { RebaserClient(#[from] RebaserClientError), #[error(transparent)] SerdeJson(#[from] serde_json::Error), - #[error("store error: {0}")] - Store(#[from] StoreError), #[error(transparent)] Tenancy(#[from] TenancyError), #[error("Unable to acquire lock: {0}")] diff --git a/lib/dal/src/lib.rs b/lib/dal/src/lib.rs index 9e5e8b13e1..726059fd89 100644 --- a/lib/dal/src/lib.rs +++ b/lib/dal/src/lib.rs @@ -149,8 +149,6 @@ pub enum ModelError { #[error("builtins error: {0}")] Builtins(#[from] BuiltinsError), #[error(transparent)] - ContentStorePg(#[from] content_store::StoreError), - #[error(transparent)] Migration(#[from] PgPoolError), #[error(transparent)] Nats(#[from] NatsError), @@ -166,11 +164,7 @@ pub type ModelResult = Result; #[instrument(level = "info", skip_all)] pub async fn migrate_all(services_context: &ServicesContext) -> ModelResult<()> { - migrate( - services_context.pg_pool(), - services_context.content_store_pg_pool(), - ) - .await?; + migrate(services_context.pg_pool()).await?; Ok(()) } @@ -200,8 +194,7 @@ pub async fn migrate_all_with_progress(services_context: &ServicesContext) -> Mo } #[instrument(level = "info", skip_all)] -pub async fn migrate(pg: &PgPool, content_store_pg_pool: &PgPool) -> ModelResult<()> { - content_store::PgStore::migrate(content_store_pg_pool).await?; +pub async fn migrate(pg: &PgPool) -> ModelResult<()> { pg.migrate(embedded::migrations::runner()).await?; Ok(()) } diff --git a/lib/dal/src/prop.rs b/lib/dal/src/prop.rs index 79b268a49b..0d3eba8e1a 100644 --- a/lib/dal/src/prop.rs +++ b/lib/dal/src/prop.rs @@ -72,8 +72,6 @@ pub enum PropError { SingleChildPropHasUnexpectedSiblings(PropId, PropId, Vec), #[error("no single child prop found for parent: {0}")] SingleChildPropNotFound(PropId), - #[error("store error: {0}")] - Store(#[from] content_store::StoreError), #[error("transactions error: {0}")] Transactions(#[from] 
TransactionsError), #[error("could not acquire lock: {0}")] diff --git a/lib/dal/src/workspace_snapshot.rs b/lib/dal/src/workspace_snapshot.rs index 33e6b3dbb1..610451f6c2 100644 --- a/lib/dal/src/workspace_snapshot.rs +++ b/lib/dal/src/workspace_snapshot.rs @@ -34,9 +34,9 @@ use std::sync::Arc; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use chrono::{DateTime, Utc}; -use content_store::{ContentHash, Store, StoreError}; use petgraph::prelude::*; use si_data_pg::{PgError, PgRow}; +use si_events::ContentHash; use telemetry::prelude::*; use thiserror::Error; use tokio::time::Instant; @@ -81,8 +81,6 @@ pub enum WorkspaceSnapshotError { Postcard(#[from] postcard::Error), #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), - #[error("store error: {0}")] - Store(#[from] StoreError), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("could not acquire lock: {0}")] @@ -215,9 +213,6 @@ impl WorkspaceSnapshot { working_copy.mark_graph_seen(vector_clock_id)?; } - // Write out to the content store. - ctx.content_store().lock().await.write().await?; - // Stamp the new workspace snapshot. let serialized_snapshot = postcard::to_stdvec(&*self.working_copy().await)?; let row = ctx diff --git a/lib/dal/src/workspace_snapshot/content_address.rs b/lib/dal/src/workspace_snapshot/content_address.rs index 699718248a..3b34977016 100644 --- a/lib/dal/src/workspace_snapshot/content_address.rs +++ b/lib/dal/src/workspace_snapshot/content_address.rs @@ -1,5 +1,5 @@ -use content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use strum::EnumDiscriminants; #[remain::sorted] diff --git a/lib/dal/src/workspace_snapshot/graph.rs b/lib/dal/src/workspace_snapshot/graph.rs index 9cd15b374c..106ea6f63d 100644 --- a/lib/dal/src/workspace_snapshot/graph.rs +++ b/lib/dal/src/workspace_snapshot/graph.rs @@ -1,8 +1,8 @@ use chrono::Utc; -use content_store::{ContentHash, Store, StoreError}; use petgraph::stable_graph::Edges; use petgraph::{algo, prelude::*, visit::DfsEvent}; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use std::collections::{HashMap, HashSet, VecDeque}; use std::fs::File; use std::io::Write; @@ -45,8 +45,6 @@ pub enum WorkspaceSnapshotGraphError { ChangeSet(#[from] ChangeSetPointerError), #[error("Unable to retrieve content for ContentHash")] ContentMissingForContentHash, - #[error("Content store error: {0}")] - ContentStore(#[from] StoreError), #[error("Action would create a graph cycle")] CreateGraphCycle, #[error("could not find the newly imported subgraph when performing updates")] @@ -368,166 +366,166 @@ impl WorkspaceSnapshotGraph { Ok(source) } - pub async fn attribute_value_view( - &self, - content_store: &mut impl Store, - root_index: NodeIndex, - ) -> WorkspaceSnapshotGraphResult { - let mut view = serde_json::json![{}]; - let mut nodes_to_add = VecDeque::from([(root_index, "".to_string())]); - - while let Some((current_node_index, write_location)) = nodes_to_add.pop_front() { - let current_node_weight = self.get_node_weight(current_node_index)?; - let current_node_content: serde_json::Value = content_store - .get(¤t_node_weight.content_hash()) - .await? - .ok_or(WorkspaceSnapshotGraphError::ContentMissingForContentHash)?; - // We don't need to care what kind the prop is, since assigning a value via - // `pointer_mut` completely overwrites the existing value, regardless of any - // pre-existing data types. 
- let view_pointer = match view.pointer_mut(&write_location) { - Some(pointer) => { - *pointer = current_node_content.clone(); - pointer - } - None => { - // This is an error, and really shouldn't ever happen. - dbg!(view, write_location, current_node_content); - todo!(); - } - }; - - if current_node_content.is_null() { - // If the value we just inserted is "null", then there shouldn't be any child - // values, so don't bother looking for them in the graph to be able to add - // them to the work queue. - continue; - } - - // Find the ordering if there is one, so we can add the children in the proper order. - if let Some(child_ordering) = self.ordered_children_for_node(current_node_index)? { - for (child_position_index, &child_node_index) in child_ordering.iter().enumerate() { - // `.enumerate()` gives us 1-indexed, but we need 0-indexed. - - // We insert a JSON `Null` as a "place holder" for the write location. We need - // it to exist to be able to get a `pointer_mut` to it on the next time around, - // but we don't really care what it is, since we're going to completely - // overwrite it anyway. - for edge in self - .graph - .edges_connecting(current_node_index, child_node_index) - { - let child_position = match edge.weight().kind() { - EdgeWeightKind::Contain(Some(key)) => { - view_pointer - .as_object_mut() - .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? - .insert(key.clone(), serde_json::json![null]); - key.clone() - } - EdgeWeightKind::Contain(None) => { - if current_node_content.is_array() { - view_pointer - .as_array_mut() - .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? - .push(serde_json::json![null]); - child_position_index.to_string() - } else { - // Get prop name - if let NodeWeight::Prop(prop_weight) = self.get_node_weight( - self.prop_node_index_for_node_index(child_node_index)? - .ok_or(WorkspaceSnapshotGraphError::NoPropFound( - child_node_index, - ))?, - )? { - view_pointer - .as_object_mut() - .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? - .insert( - prop_weight.name().to_string(), - serde_json::json![null], - ); - prop_weight.name().to_string() - } else { - return Err(WorkspaceSnapshotGraphError::InvalidValueGraph); - } - } - } - _ => continue, - }; - let child_write_location = format!("{}/{}", write_location, child_position); - nodes_to_add.push_back((child_node_index, child_write_location)); - } - } - } else { - // The child nodes aren't explicitly ordered, so we'll need to come up with one of - // our own. We'll sort the nodes by their `NodeIndex`, which means that when a - // write last happened to them (or anywhere further towards the leaves) will - // determine their sorting in oldest to most recent order. - let mut child_index_to_position = HashMap::new(); - let mut child_indexes = Vec::new(); - let outgoing_edges = self.graph.edges_directed(current_node_index, Outgoing); - for edge_ref in outgoing_edges { - match edge_ref.weight().kind() { - EdgeWeightKind::Contain(Some(key)) => { - view_pointer - .as_object_mut() - .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? - .insert(key.clone(), serde_json::json![null]); - child_index_to_position.insert(edge_ref.target(), key.clone()); - child_indexes.push(edge_ref.target()); - } - EdgeWeightKind::Contain(None) => { - child_indexes.push(edge_ref.target()); - if current_node_content.is_array() { - view_pointer - .as_array_mut() - .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? 
- .push(serde_json::json![null]); - } else { - // Get prop name - if let NodeWeight::Prop(prop_weight) = self.get_node_weight( - self.prop_node_index_for_node_index(edge_ref.target())? - .ok_or(WorkspaceSnapshotGraphError::NoPropFound( - edge_ref.target(), - ))?, - )? { - view_pointer - .as_object_mut() - .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? - .insert( - prop_weight.name().to_string(), - serde_json::json![null], - ); - child_index_to_position - .insert(edge_ref.target(), prop_weight.name().to_string()); - child_indexes.push(edge_ref.target()); - } else { - return Err(WorkspaceSnapshotGraphError::InvalidValueGraph); - } - } - } - _ => continue, - } - } - child_indexes.sort(); - - for (child_position_index, child_node_index) in child_indexes.iter().enumerate() { - if let Some(key) = child_index_to_position.get(child_node_index) { - nodes_to_add - .push_back((*child_node_index, format!("{}/{}", write_location, key))); - } else { - nodes_to_add.push_back(( - *child_node_index, - format!("{}/{}", write_location, child_position_index), - )); - } - } - } - } - - Ok(view) - } + // pub async fn attribute_value_view( + // &self, + // content_store: &mut impl Store, + // root_index: NodeIndex, + // ) -> WorkspaceSnapshotGraphResult { + // let mut view = serde_json::json![{}]; + // let mut nodes_to_add = VecDeque::from([(root_index, "".to_string())]); + + // while let Some((current_node_index, write_location)) = nodes_to_add.pop_front() { + // let current_node_weight = self.get_node_weight(current_node_index)?; + // let current_node_content: serde_json::Value = content_store + // .get(¤t_node_weight.content_hash()) + // .await? + // .ok_or(WorkspaceSnapshotGraphError::ContentMissingForContentHash)?; + // // We don't need to care what kind the prop is, since assigning a value via + // // `pointer_mut` completely overwrites the existing value, regardless of any + // // pre-existing data types. + // let view_pointer = match view.pointer_mut(&write_location) { + // Some(pointer) => { + // *pointer = current_node_content.clone(); + // pointer + // } + // None => { + // // This is an error, and really shouldn't ever happen. + // dbg!(view, write_location, current_node_content); + // todo!(); + // } + // }; + + // if current_node_content.is_null() { + // // If the value we just inserted is "null", then there shouldn't be any child + // // values, so don't bother looking for them in the graph to be able to add + // // them to the work queue. + // continue; + // } + + // // Find the ordering if there is one, so we can add the children in the proper order. + // if let Some(child_ordering) = self.ordered_children_for_node(current_node_index)? { + // for (child_position_index, &child_node_index) in child_ordering.iter().enumerate() { + // // `.enumerate()` gives us 1-indexed, but we need 0-indexed. + + // // We insert a JSON `Null` as a "place holder" for the write location. We need + // // it to exist to be able to get a `pointer_mut` to it on the next time around, + // // but we don't really care what it is, since we're going to completely + // // overwrite it anyway. + // for edge in self + // .graph + // .edges_connecting(current_node_index, child_node_index) + // { + // let child_position = match edge.weight().kind() { + // EdgeWeightKind::Contain(Some(key)) => { + // view_pointer + // .as_object_mut() + // .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? 
+ // .insert(key.clone(), serde_json::json![null]); + // key.clone() + // } + // EdgeWeightKind::Contain(None) => { + // if current_node_content.is_array() { + // view_pointer + // .as_array_mut() + // .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? + // .push(serde_json::json![null]); + // child_position_index.to_string() + // } else { + // // Get prop name + // if let NodeWeight::Prop(prop_weight) = self.get_node_weight( + // self.prop_node_index_for_node_index(child_node_index)? + // .ok_or(WorkspaceSnapshotGraphError::NoPropFound( + // child_node_index, + // ))?, + // )? { + // view_pointer + // .as_object_mut() + // .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? + // .insert( + // prop_weight.name().to_string(), + // serde_json::json![null], + // ); + // prop_weight.name().to_string() + // } else { + // return Err(WorkspaceSnapshotGraphError::InvalidValueGraph); + // } + // } + // } + // _ => continue, + // }; + // let child_write_location = format!("{}/{}", write_location, child_position); + // nodes_to_add.push_back((child_node_index, child_write_location)); + // } + // } + // } else { + // // The child nodes aren't explicitly ordered, so we'll need to come up with one of + // // our own. We'll sort the nodes by their `NodeIndex`, which means that when a + // // write last happened to them (or anywhere further towards the leaves) will + // // determine their sorting in oldest to most recent order. + // let mut child_index_to_position = HashMap::new(); + // let mut child_indexes = Vec::new(); + // let outgoing_edges = self.graph.edges_directed(current_node_index, Outgoing); + // for edge_ref in outgoing_edges { + // match edge_ref.weight().kind() { + // EdgeWeightKind::Contain(Some(key)) => { + // view_pointer + // .as_object_mut() + // .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? + // .insert(key.clone(), serde_json::json![null]); + // child_index_to_position.insert(edge_ref.target(), key.clone()); + // child_indexes.push(edge_ref.target()); + // } + // EdgeWeightKind::Contain(None) => { + // child_indexes.push(edge_ref.target()); + // if current_node_content.is_array() { + // view_pointer + // .as_array_mut() + // .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? + // .push(serde_json::json![null]); + // } else { + // // Get prop name + // if let NodeWeight::Prop(prop_weight) = self.get_node_weight( + // self.prop_node_index_for_node_index(edge_ref.target())? + // .ok_or(WorkspaceSnapshotGraphError::NoPropFound( + // edge_ref.target(), + // ))?, + // )? { + // view_pointer + // .as_object_mut() + // .ok_or(WorkspaceSnapshotGraphError::InvalidValueGraph)? 
+ // .insert( + // prop_weight.name().to_string(), + // serde_json::json![null], + // ); + // child_index_to_position + // .insert(edge_ref.target(), prop_weight.name().to_string()); + // child_indexes.push(edge_ref.target()); + // } else { + // return Err(WorkspaceSnapshotGraphError::InvalidValueGraph); + // } + // } + // } + // _ => continue, + // } + // } + // child_indexes.sort(); + + // for (child_position_index, child_node_index) in child_indexes.iter().enumerate() { + // if let Some(key) = child_index_to_position.get(child_node_index) { + // nodes_to_add + // .push_back((*child_node_index, format!("{}/{}", write_location, key))); + // } else { + // nodes_to_add.push_back(( + // *child_node_index, + // format!("{}/{}", write_location, child_position_index), + // )); + // } + // } + // } + // } + + // Ok(view) + // } pub fn cleanup(&mut self) { let start = tokio::time::Instant::now(); diff --git a/lib/dal/src/workspace_snapshot/graph/tests.rs b/lib/dal/src/workspace_snapshot/graph/tests.rs index 00c61dad20..c23b6acb94 100644 --- a/lib/dal/src/workspace_snapshot/graph/tests.rs +++ b/lib/dal/src/workspace_snapshot/graph/tests.rs @@ -2,11 +2,11 @@ mod rebase; #[cfg(test)] mod test { - use content_store::ContentHash; use petgraph::graph::NodeIndex; use petgraph::visit::EdgeRef; use petgraph::Outgoing; use pretty_assertions_sorted::assert_eq; + use si_events::ContentHash; use std::collections::HashMap; use std::collections::HashSet; diff --git a/lib/dal/src/workspace_snapshot/graph/tests/rebase.rs b/lib/dal/src/workspace_snapshot/graph/tests/rebase.rs index 54227cec05..a71436af62 100644 --- a/lib/dal/src/workspace_snapshot/graph/tests/rebase.rs +++ b/lib/dal/src/workspace_snapshot/graph/tests/rebase.rs @@ -1,7 +1,7 @@ #[cfg(test)] mod test { - use content_store::ContentHash; use pretty_assertions_sorted::assert_eq; + use si_events::ContentHash; use crate::change_set_pointer::ChangeSetPointer; use crate::workspace_snapshot::content_address::ContentAddress; diff --git a/lib/dal/src/workspace_snapshot/node_weight.rs b/lib/dal/src/workspace_snapshot/node_weight.rs index 81f80e2f34..dbe7286d04 100644 --- a/lib/dal/src/workspace_snapshot/node_weight.rs +++ b/lib/dal/src/workspace_snapshot/node_weight.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; -use content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use strum::EnumDiscriminants; use thiserror::Error; use ulid::Ulid; diff --git a/lib/dal/src/workspace_snapshot/node_weight/attribute_prototype_argument_node_weight.rs b/lib/dal/src/workspace_snapshot/node_weight/attribute_prototype_argument_node_weight.rs index 0c31edfadc..b0be58c334 100644 --- a/lib/dal/src/workspace_snapshot/node_weight/attribute_prototype_argument_node_weight.rs +++ b/lib/dal/src/workspace_snapshot/node_weight/attribute_prototype_argument_node_weight.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; -use content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use ulid::Ulid; use crate::{ diff --git a/lib/dal/src/workspace_snapshot/node_weight/attribute_value_node_weight.rs b/lib/dal/src/workspace_snapshot/node_weight/attribute_value_node_weight.rs index 29a4ebebd7..dad5438b73 100644 --- a/lib/dal/src/workspace_snapshot/node_weight/attribute_value_node_weight.rs +++ b/lib/dal/src/workspace_snapshot/node_weight/attribute_value_node_weight.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; -use content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use ulid::Ulid; 
use crate::{ diff --git a/lib/dal/src/workspace_snapshot/node_weight/category_node_weight.rs b/lib/dal/src/workspace_snapshot/node_weight/category_node_weight.rs index 1ff80cbd89..89904519a9 100644 --- a/lib/dal/src/workspace_snapshot/node_weight/category_node_weight.rs +++ b/lib/dal/src/workspace_snapshot/node_weight/category_node_weight.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; -use content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use strum::Display; use ulid::Ulid; diff --git a/lib/dal/src/workspace_snapshot/node_weight/component_node_weight.rs b/lib/dal/src/workspace_snapshot/node_weight/component_node_weight.rs index d3ec567d0d..718722b6a7 100644 --- a/lib/dal/src/workspace_snapshot/node_weight/component_node_weight.rs +++ b/lib/dal/src/workspace_snapshot/node_weight/component_node_weight.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; -use content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use ulid::Ulid; use crate::{ diff --git a/lib/dal/src/workspace_snapshot/node_weight/content_node_weight.rs b/lib/dal/src/workspace_snapshot/node_weight/content_node_weight.rs index 678c5ef83c..386b4fc363 100644 --- a/lib/dal/src/workspace_snapshot/node_weight/content_node_weight.rs +++ b/lib/dal/src/workspace_snapshot/node_weight/content_node_weight.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; -use content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use ulid::Ulid; use crate::workspace_snapshot::vector_clock::VectorClockId; diff --git a/lib/dal/src/workspace_snapshot/node_weight/func_argument_node_weight.rs b/lib/dal/src/workspace_snapshot/node_weight/func_argument_node_weight.rs index fa1ea9ac2d..cdc372268e 100644 --- a/lib/dal/src/workspace_snapshot/node_weight/func_argument_node_weight.rs +++ b/lib/dal/src/workspace_snapshot/node_weight/func_argument_node_weight.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; -use content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use ulid::Ulid; diff --git a/lib/dal/src/workspace_snapshot/node_weight/func_node_weight.rs b/lib/dal/src/workspace_snapshot/node_weight/func_node_weight.rs index ef234774ad..6d831706bf 100644 --- a/lib/dal/src/workspace_snapshot/node_weight/func_node_weight.rs +++ b/lib/dal/src/workspace_snapshot/node_weight/func_node_weight.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; -use content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use ulid::Ulid; use crate::workspace_snapshot::content_address::ContentAddressDiscriminants; diff --git a/lib/dal/src/workspace_snapshot/node_weight/ordering_node_weight.rs b/lib/dal/src/workspace_snapshot/node_weight/ordering_node_weight.rs index 203c1176ce..f1160893d9 100644 --- a/lib/dal/src/workspace_snapshot/node_weight/ordering_node_weight.rs +++ b/lib/dal/src/workspace_snapshot/node_weight/ordering_node_weight.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; -use content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use ulid::Ulid; use crate::change_set_pointer::ChangeSetPointer; diff --git a/lib/dal/src/workspace_snapshot/node_weight/prop_node_weight.rs b/lib/dal/src/workspace_snapshot/node_weight/prop_node_weight.rs index 0ccee7f470..42cffaedc2 100644 --- a/lib/dal/src/workspace_snapshot/node_weight/prop_node_weight.rs +++ b/lib/dal/src/workspace_snapshot/node_weight/prop_node_weight.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; -use 
content_store::ContentHash; use serde::{Deserialize, Serialize}; +use si_events::ContentHash; use ulid::Ulid; use crate::workspace_snapshot::content_address::ContentAddressDiscriminants; diff --git a/lib/dal/tests/integration.rs b/lib/dal/tests/integration.rs index 133b5237e6..13ca2b86ed 100644 --- a/lib/dal/tests/integration.rs +++ b/lib/dal/tests/integration.rs @@ -1,5 +1,4 @@ const TEST_PG_DBNAME: &str = "si_test_dal"; -const TEST_CONTENT_STORE_PG_DBNAME: &str = "si_test_content_store"; -const SI_TEST_LAYER_CACHE_PG_DBNAME: &str = "si_test_key_value_pairs"; +const SI_TEST_LAYER_CACHE_PG_DBNAME: &str = "si_test_layer_db"; mod integration_test; diff --git a/lib/dal/tests/integration_test/action/batch.rs b/lib/dal/tests/integration_test/action/batch.rs index db953c415f..8655835ff5 100644 --- a/lib/dal/tests/integration_test/action/batch.rs +++ b/lib/dal/tests/integration_test/action/batch.rs @@ -150,23 +150,29 @@ async fn set_started_at(ctx: &mut DalContext) { #[test] async fn set_finished_at(ctx: &mut DalContext) { + dbg!("yo"); let mut batch = ActionBatch::new(ctx, "batch", "paulo was here") .await .expect("unable to create action batch"); + + dbg!("made batch"); assert_eq!(batch.finished_at, None); let conflicts = ctx.blocking_commit().await.expect("unable to commit"); + dbg!("blocking commit"); assert!(conflicts.is_none()); ctx.update_snapshot_to_visibility() .await .expect("unable to update snapshot to visiblity"); + dbg!("update snap"); batch .set_finished_at(ctx) .await .expect("unable to set completion status"); assert!(batch.finished_at.is_some()); + dbg!("set finished at"); let conflicts = ctx.blocking_commit().await.expect("unable to commit"); assert!(conflicts.is_none()); diff --git a/lib/pinga-server/BUCK b/lib/pinga-server/BUCK index a6d2bb21ea..df87b49d1a 100644 --- a/lib/pinga-server/BUCK +++ b/lib/pinga-server/BUCK @@ -4,7 +4,6 @@ rust_library( name = "pinga-server", deps = [ "//lib/buck2-resources:buck2-resources", - "//lib/content-store:content-store", "//lib/dal:dal", "//lib/nats-subscriber:nats-subscriber", "//lib/rebaser-client:rebaser-client", diff --git a/lib/pinga-server/Cargo.toml b/lib/pinga-server/Cargo.toml index 330ba9e3e5..de2d833ff5 100644 --- a/lib/pinga-server/Cargo.toml +++ b/lib/pinga-server/Cargo.toml @@ -7,7 +7,6 @@ publish = false [dependencies] buck2-resources = { path = "../../lib/buck2-resources" } -content-store = { path = "../../lib/content-store" } dal = { path = "../../lib/dal" } derive_builder = { workspace = true } futures = { workspace = true } diff --git a/lib/pinga-server/src/config.rs b/lib/pinga-server/src/config.rs index ac894fe7b2..2a0471e424 100644 --- a/lib/pinga-server/src/config.rs +++ b/lib/pinga-server/src/config.rs @@ -1,7 +1,6 @@ use std::{env, path::Path}; use buck2_resources::Buck2Resources; -use content_store::PgStoreTools; use derive_builder::Builder; use serde::{Deserialize, Serialize}; use si_crypto::{CryptoConfig, SymmetricCryptoServiceConfig, SymmetricCryptoServiceConfigFile}; @@ -60,9 +59,6 @@ pub struct Config { #[builder(default = "SymmetricCryptoServiceConfig::default()")] symmetric_crypto_service: SymmetricCryptoServiceConfig, - #[builder(default = "PgStoreTools::default_pool_config()")] - content_store_pg_pool: PgPoolConfig, - #[builder(default = "si_layer_cache::default_pg_pool_config()")] layer_cache_pg_pool: PgPoolConfig, @@ -111,12 +107,6 @@ impl Config { self.instance_id.as_ref() } - /// Gets a reference to the config's content store pg pool. 
- #[must_use] - pub fn content_store_pg_pool(&self) -> &PgPoolConfig { - &self.content_store_pg_pool - } - #[must_use] pub fn layer_cache_pg_pool(&self) -> &PgPoolConfig { &self.layer_cache_pg_pool @@ -132,8 +122,6 @@ impl Config { pub struct ConfigFile { #[serde(default)] pg: PgPoolConfig, - #[serde(default = "PgStoreTools::default_pool_config")] - content_store_pg: PgPoolConfig, #[serde(default = "si_layer_cache::default_pg_pool_config")] layer_cache_pg_pool: PgPoolConfig, #[serde(default)] @@ -152,7 +140,6 @@ impl Default for ConfigFile { fn default() -> Self { Self { pg: Default::default(), - content_store_pg: PgStoreTools::default_pool_config(), layer_cache_pg_pool: si_layer_cache::default_pg_pool_config(), nats: Default::default(), concurrency_limit: default_concurrency_limit(), @@ -175,7 +162,6 @@ impl TryFrom for Config { let mut config = Config::builder(); config.pg_pool(value.pg); - config.content_store_pg_pool(value.content_store_pg); config.layer_cache_pg_pool(value.layer_cache_pg_pool); config.nats(value.nats); config.crypto(value.crypto); @@ -247,7 +233,6 @@ fn buck2_development(config: &mut ConfigFile) -> Result<()> { extra_keys: vec![], }; config.pg.certificate_path = Some(postgres_key.clone().try_into()?); - config.content_store_pg.certificate_path = Some(postgres_key.clone().try_into()?); config.layer_cache_pg_pool.certificate_path = Some(postgres_key.try_into()?); Ok(()) @@ -281,7 +266,6 @@ fn cargo_development(dir: String, config: &mut ConfigFile) -> Result<()> { extra_keys: vec![], }; config.pg.certificate_path = Some(postgres_key.clone().try_into()?); - config.content_store_pg.certificate_path = Some(postgres_key.clone().try_into()?); config.layer_cache_pg_pool.certificate_path = Some(postgres_key.try_into()?); Ok(()) diff --git a/lib/pinga-server/src/server.rs b/lib/pinga-server/src/server.rs index d771f7d800..3ccb677317 100644 --- a/lib/pinga-server/src/server.rs +++ b/lib/pinga-server/src/server.rs @@ -110,7 +110,6 @@ impl Server { let encryption_key = Self::load_encryption_key(config.crypto().clone()).await?; let nats = Self::connect_to_nats(config.nats()).await?; let pg_pool = Self::create_pg_pool(config.pg_pool()).await?; - let content_store_pg_pool = Self::create_pg_pool(config.content_store_pg_pool()).await?; let veritech = Self::create_veritech_client(nats.clone()); let job_processor = Self::create_job_processor(nats.clone()); let symmetric_crypto_service = @@ -134,7 +133,6 @@ impl Server { None, symmetric_crypto_service, rebaser_config, - content_store_pg_pool, layer_db, ); diff --git a/lib/rebaser-server/BUCK b/lib/rebaser-server/BUCK index 2db3f02148..c2566df29b 100644 --- a/lib/rebaser-server/BUCK +++ b/lib/rebaser-server/BUCK @@ -4,7 +4,6 @@ rust_library( name = "rebaser-server", deps = [ "//lib/buck2-resources:buck2-resources", - "//lib/content-store:content-store", "//lib/dal:dal", "//lib/nats-subscriber:nats-subscriber", "//lib/rebaser-core:rebaser-core", diff --git a/lib/rebaser-server/Cargo.toml b/lib/rebaser-server/Cargo.toml index 2f0d79a837..2115bff265 100644 --- a/lib/rebaser-server/Cargo.toml +++ b/lib/rebaser-server/Cargo.toml @@ -6,7 +6,6 @@ publish = false [dependencies] buck2-resources = { path = "../../lib/buck2-resources" } -content-store = { path = "../../lib/content-store" } dal = { path = "../../lib/dal" } nats-subscriber = { path = "../../lib/nats-subscriber" } rebaser-core = { path = "../../lib/rebaser-core" } diff --git a/lib/rebaser-server/src/config.rs b/lib/rebaser-server/src/config.rs index 08370fae43..7207832455 100644 --- 
a/lib/rebaser-server/src/config.rs +++ b/lib/rebaser-server/src/config.rs @@ -1,7 +1,6 @@ use std::{env, path::Path}; use buck2_resources::Buck2Resources; -use content_store::PgStoreTools; use derive_builder::Builder; use rebaser_core::RebaserMessagingConfig; use serde::{Deserialize, Serialize}; @@ -56,9 +55,6 @@ pub struct Config { #[builder(default)] messaging_config: RebaserMessagingConfig, - #[builder(default = "PgStoreTools::default_pool_config()")] - content_store_pg_pool: PgPoolConfig, - #[builder(default = "si_layer_cache::default_pg_pool_config()")] layer_cache_pg_pool: PgPoolConfig, @@ -103,12 +99,6 @@ impl Config { &self.messaging_config } - /// Gets a reference to the config's content store pg pool. - #[must_use] - pub fn content_store_pg_pool(&self) -> &PgPoolConfig { - &self.content_store_pg_pool - } - /// Gets a reference to the layer cache's pg pool config. #[must_use] pub fn layer_cache_pg_pool(&self) -> &PgPoolConfig { @@ -127,8 +117,6 @@ impl Config { pub struct ConfigFile { #[serde(default)] pg: PgPoolConfig, - #[serde(default = "PgStoreTools::default_pool_config")] - content_store_pg: PgPoolConfig, #[serde(default = "si_layer_cache::default_pg_pool_config")] layer_cache_pg_pool: PgPoolConfig, #[serde(default)] @@ -145,7 +133,6 @@ impl Default for ConfigFile { fn default() -> Self { Self { pg: Default::default(), - content_store_pg: PgStoreTools::default_pool_config(), layer_cache_pg_pool: si_layer_cache::default_pg_pool_config(), nats: Default::default(), cyclone_encryption_key_path: default_cyclone_encryption_key_path(), @@ -167,7 +154,6 @@ impl TryFrom for Config { let mut config = Config::builder(); config.pg_pool(value.pg); - config.content_store_pg_pool(value.content_store_pg); config.layer_cache_pg_pool(value.layer_cache_pg_pool); config.nats(value.nats); config.cyclone_encryption_key_path(value.cyclone_encryption_key_path.try_into()?); @@ -235,7 +221,6 @@ fn buck2_development(config: &mut ConfigFile) -> Result<()> { extra_keys: vec![], }; config.pg.certificate_path = Some(postgres_cert.clone().try_into()?); - config.content_store_pg.certificate_path = Some(postgres_cert.clone().try_into()?); config.layer_cache_pg_pool.certificate_path = Some(postgres_cert.try_into()?); Ok(()) @@ -269,7 +254,6 @@ fn cargo_development(dir: String, config: &mut ConfigFile) -> Result<()> { extra_keys: vec![], }; config.pg.certificate_path = Some(postgres_cert.clone().try_into()?); - config.content_store_pg.certificate_path = Some(postgres_cert.clone().try_into()?); config.layer_cache_pg_pool.certificate_path = Some(postgres_cert.try_into()?); Ok(()) diff --git a/lib/rebaser-server/src/server.rs b/lib/rebaser-server/src/server.rs index af718a2ccb..a65e95568b 100644 --- a/lib/rebaser-server/src/server.rs +++ b/lib/rebaser-server/src/server.rs @@ -73,8 +73,6 @@ pub struct Server { graceful_shutdown_rx: oneshot::Receiver<()>, /// The messaging configuration messaging_config: RebaserMessagingConfig, - /// The pg pool for the content store - content_store_pg_pool: PgPool, /// The layer db layer_db: DalLayerDb, } @@ -89,7 +87,6 @@ impl Server { Self::load_encryption_key(config.cyclone_encryption_key_path()).await?; let nats = Self::connect_to_nats(config.nats()).await?; let pg_pool = Self::create_pg_pool(config.pg_pool()).await?; - let content_store_pg_pool = Self::create_pg_pool(config.content_store_pg_pool()).await?; let veritech = Self::create_veritech_client(nats.clone()); let job_processor = Self::create_job_processor(nats.clone()); let symmetric_crypto_service = @@ -111,7 +108,6 @@ 
impl Server { job_processor, symmetric_crypto_service, messaging_config.to_owned(), - content_store_pg_pool, layer_db, ) } @@ -127,7 +123,6 @@ impl Server { job_processor: Box, symmetric_crypto_service: SymmetricCryptoService, messaging_config: RebaserMessagingConfig, - content_store_pg_pool: PgPool, layer_db: DalLayerDb, ) -> ServerResult { // An mpsc channel which can be used to externally shut down the server. @@ -153,7 +148,6 @@ impl Server { external_shutdown_tx, graceful_shutdown_rx, messaging_config, - content_store_pg_pool, layer_db, }) } @@ -170,7 +164,6 @@ impl Server { self.encryption_key, self.shutdown_watch_rx, self.messaging_config, - self.content_store_pg_pool, self.layer_db, ) .await?; diff --git a/lib/rebaser-server/src/server/core_loop.rs b/lib/rebaser-server/src/server/core_loop.rs index eb69c63cb5..ab2644b102 100644 --- a/lib/rebaser-server/src/server/core_loop.rs +++ b/lib/rebaser-server/src/server/core_loop.rs @@ -50,7 +50,6 @@ pub(crate) async fn setup_and_run_core_loop( encryption_key: Arc, shutdown_watch_rx: watch::Receiver<()>, messaging_config: RebaserMessagingConfig, - content_store_pg_pool: PgPool, layer_db: DalLayerDb, ) -> CoreLoopSetupResult<()> { let services_context = ServicesContext::new( @@ -63,7 +62,6 @@ pub(crate) async fn setup_and_run_core_loop( None, symmetric_crypto_service, messaging_config.clone(), - content_store_pg_pool, layer_db, ); diff --git a/lib/sdf-server/BUCK b/lib/sdf-server/BUCK index c1c382aedc..d94ee2ebab 100644 --- a/lib/sdf-server/BUCK +++ b/lib/sdf-server/BUCK @@ -4,7 +4,6 @@ rust_library( name = "sdf-server", deps = [ "//lib/buck2-resources:buck2-resources", - "//lib/content-store:content-store", "//lib/dal:dal", "//lib/module-index-client:module-index-client", "//lib/nats-multiplexer:nats-multiplexer", diff --git a/lib/sdf-server/Cargo.toml b/lib/sdf-server/Cargo.toml index 06a0337bd7..dbbd7183b3 100644 --- a/lib/sdf-server/Cargo.toml +++ b/lib/sdf-server/Cargo.toml @@ -28,7 +28,6 @@ async-trait = { workspace = true } axum = { workspace = true } base64 = { workspace = true } chrono = { workspace = true } -content-store = { path = "../../lib/content-store" } convert_case = { workspace = true } derive_builder = { workspace = true } futures = { workspace = true } diff --git a/lib/sdf-server/src/server/config.rs b/lib/sdf-server/src/server/config.rs index 88330a2d82..403f5c5226 100644 --- a/lib/sdf-server/src/server/config.rs +++ b/lib/sdf-server/src/server/config.rs @@ -8,7 +8,6 @@ use std::{ }; use buck2_resources::Buck2Resources; -use content_store::PgStoreTools; use derive_builder::Builder; use serde::{Deserialize, Serialize}; use si_crypto::{SymmetricCryptoServiceConfig, SymmetricCryptoServiceConfigFile}; @@ -82,9 +81,6 @@ pub struct Config { #[builder(default = "JwtConfig::default()")] jwt_signing_public_key: JwtConfig, - #[builder(default = "PgStoreTools::default_pool_config()")] - content_store_pg_pool: PgPoolConfig, - #[builder(default = "si_layer_cache::default_pg_pool_config()")] layer_cache_pg_pool: PgPoolConfig, @@ -163,12 +159,6 @@ impl Config { &self.module_index_url } - /// Gets a reference to the config's content store pg pool. 
-    #[must_use]
-    pub fn content_store_pg_pool(&self) -> &PgPoolConfig {
-        &self.content_store_pg_pool
-    }
-
     #[must_use]
     pub fn layer_cache_pg_pool(&self) -> &PgPoolConfig {
         &self.layer_cache_pg_pool
@@ -194,8 +184,6 @@ impl ConfigBuilder {
 pub struct ConfigFile {
     #[serde(default)]
     pub pg: PgPoolConfig,
-    #[serde(default = "PgStoreTools::default_pool_config")]
-    pub content_store_pg: PgPoolConfig,
     #[serde(default = "si_layer_cache::default_pg_pool_config")]
     layer_cache_pg_pool: PgPoolConfig,
     #[serde(default)]
@@ -222,7 +210,6 @@ impl Default for ConfigFile {
     fn default() -> Self {
         Self {
             pg: Default::default(),
-            content_store_pg: PgStoreTools::default_pool_config(),
             layer_cache_pg_pool: si_layer_cache::default_pg_pool_config(),
             nats: Default::default(),
             migration_mode: Default::default(),
@@ -249,7 +236,6 @@ impl TryFrom for Config {
         let mut config = Config::builder();
         config.pg_pool(value.pg);
-        config.content_store_pg_pool(value.content_store_pg);
         config.layer_cache_pg_pool(value.layer_cache_pg_pool);
         config.nats(value.nats);
         config.migration_mode(value.migration_mode);
@@ -386,7 +372,6 @@ fn buck2_development(config: &mut ConfigFile) -> Result<()> {
         extra_keys: vec![],
     };
     config.pg.certificate_path = Some(postgres_cert.clone().try_into()?);
-    config.content_store_pg.certificate_path = Some(postgres_cert.clone().try_into()?);
     config.layer_cache_pg_pool.certificate_path = Some(postgres_cert.try_into()?);
 
     config.pkgs_path = pkgs_path;
@@ -446,7 +431,6 @@ fn cargo_development(dir: String, config: &mut ConfigFile) -> Result<()> {
         extra_keys: vec![],
     };
     config.pg.certificate_path = Some(postgres_cert.clone().try_into()?);
-    config.content_store_pg.certificate_path = Some(postgres_cert.clone().try_into()?);
     config.layer_cache_pg_pool.certificate_path = Some(postgres_cert.try_into()?);
 
     config.pkgs_path = pkgs_path;
diff --git a/lib/sdf-server/src/server/server.rs b/lib/sdf-server/src/server/server.rs
index 0370fbcc1b..da2acf08f6 100644
--- a/lib/sdf-server/src/server/server.rs
+++ b/lib/sdf-server/src/server/server.rs
@@ -62,6 +62,8 @@ pub enum ServerError {
     Join(#[from] JoinError),
     #[error("jwt secret key error")]
     JwtSecretKey(#[from] dal::jwt_key::JwtKeyError),
+    #[error("layer db error: {0}")]
+    LayerDb(#[from] si_layer_cache::LayerDbError),
     #[error(transparent)]
     Model(#[from] dal::ModelError),
     #[error("Module index: {0}")]
@@ -262,6 +264,7 @@ impl Server<(), ()> {
     #[instrument(name = "sdf.init.migrate_database", level = "info", skip_all)]
     pub async fn migrate_database(services_context: &ServicesContext) -> Result<()> {
+        services_context.layer_db().pg_migrate().await?;
         dal::migrate_all_with_progress(services_context).await?;
         migrate_builtins_from_module_index(services_context).await?;
         Ok(())
     }
diff --git a/lib/sdf-server/tests/api.rs b/lib/sdf-server/tests/api.rs
index 00d4190ba1..b4ea247b11 100644
--- a/lib/sdf-server/tests/api.rs
+++ b/lib/sdf-server/tests/api.rs
@@ -1,7 +1,6 @@
 #![recursion_limit = "256"]
 
 const TEST_PG_DBNAME: &str = "si_test_sdf_server";
-const TEST_CONTENT_STORE_PG_DBNAME: &str = "si_test_content_store";
-const SI_TEST_LAYER_CACHE_PG_DBNAME: &str = "si_test_key_value_pairs";
+const SI_TEST_LAYER_CACHE_PG_DBNAME: &str = "si_test_layer_db";
 
 mod service_tests;
diff --git a/lib/si-layer-cache/src/db.rs b/lib/si-layer-cache/src/db.rs
index a23c9bf550..2f1c76aa12 100644
--- a/lib/si-layer-cache/src/db.rs
+++ b/lib/si-layer-cache/src/db.rs
@@ -129,4 +129,13 @@ where
     pub fn instance_id(&self) -> Ulid {
         self.instance_id
     }
+
+    /// Run all migrations
+    pub async fn pg_migrate(&self) -> LayerDbResult<()> {
+        // This will do all migrations, not just "cas" migrations. We might want
+        // to think about restructuring this
+        self.cas.cache.pg().migrate().await?;
+
+        Ok(())
+    }
 }
diff --git a/lib/si-layer-cache/src/db/cache_updates.rs b/lib/si-layer-cache/src/db/cache_updates.rs
index e71bd59c4b..f06338b4f7 100644
--- a/lib/si-layer-cache/src/db/cache_updates.rs
+++ b/lib/si-layer-cache/src/db/cache_updates.rs
@@ -17,6 +17,7 @@ use crate::{
 };
 
 #[derive(Copy, Clone, Debug, EnumString, AsRefStr)]
+#[strum(serialize_all = "snake_case")]
 enum CacheName {
     Cas,
 }
diff --git a/lib/si-layer-cache/src/layer_cache.rs b/lib/si-layer-cache/src/layer_cache.rs
index 1d6f7276b8..dfbf33dfea 100644
--- a/lib/si-layer-cache/src/layer_cache.rs
+++ b/lib/si-layer-cache/src/layer_cache.rs
@@ -28,8 +28,7 @@ where
     pub async fn new(name: &str, fast_disk: sled::Db, pg_pool: PgPool) -> LayerDbResult {
         let disk_cache = Arc::new(DiskCache::new(fast_disk, name.as_bytes())?);
-        let pg = PgLayer::new(pg_pool, "cas");
-        pg.migrate().await?;
+        let pg = PgLayer::new(pg_pool.clone(), "cas");
 
         Ok(LayerCache {
             memory_cache: MemoryCache::new(),
diff --git a/lib/si-layer-cache/tests/integration_test/db/cas.rs b/lib/si-layer-cache/tests/integration_test/db/cas.rs
index 5cbf255ec5..a674a4aba3 100644
--- a/lib/si-layer-cache/tests/integration_test/db/cas.rs
+++ b/lib/si-layer-cache/tests/integration_test/db/cas.rs
@@ -16,6 +16,7 @@ async fn write_to_db() {
     )
     .await
     .expect("cannot create layerdb");
+    ldb.pg_migrate().await.expect("migrate layer db");
 
     let cas_value: Arc = Arc::new(serde_json::json!("stone sour").into());
     let (cas_pk, status) = ldb
@@ -76,6 +77,7 @@ async fn write_and_read_many() {
     )
     .await
     .expect("cannot create layerdb");
+    ldb.pg_migrate().await.expect("migrate ldb");
 
     let cas_values: Vec> = vec![
         Arc::new(serde_json::json!("stone sour").into()),
@@ -124,6 +126,7 @@ async fn cold_read_from_db() {
     )
     .await
     .expect("cannot create layerdb");
+    ldb.pg_migrate().await.expect("migrate layerdb");
 
     let cas_value: Arc = Arc::new(serde_json::json!("stone sour").into());
     let (cas_pk, status) = ldb
@@ -215,6 +218,7 @@ async fn writes_are_gossiped() {
     )
     .await
     .expect("cannot create layerdb");
+    ldb_slash.pg_migrate().await.expect("migrate layerdb");
 
     // Then, we need a layerdb for axl
     let ldb_axl = LayerDb::new(
     )
     .await
     .expect("cannot create layerdb");
+    ldb_axl.pg_migrate().await.expect("migrate layerdb");
 
     let cas_value: Arc = Arc::new(serde_json::json!("stone sour").into());
     let (cas_pk, status) = ldb_slash
diff --git a/lib/si-layer-cache/tests/integration_test/layer_cache.rs b/lib/si-layer-cache/tests/integration_test/layer_cache.rs
index cbfeece6c0..4dca0fe02c 100644
--- a/lib/si-layer-cache/tests/integration_test/layer_cache.rs
+++ b/lib/si-layer-cache/tests/integration_test/layer_cache.rs
@@ -8,9 +8,12 @@ async fn make_layer_cache(db_name: &str) -> LayerCache {
     let tempdir = tempfile::TempDir::new_in("/tmp").expect("cannot create tempdir");
     let db = sled::open(tempdir).expect("unable to open sled database");
 
-    LayerCache::new("test1", db, super::setup_pg_db(db_name).await)
+    let layer_cache = LayerCache::new("test1", db, super::setup_pg_db(db_name).await)
         .await
-        .expect("cannot create layer cache")
+        .expect("cannot create layer cache");
+    layer_cache.pg().migrate().await.expect("migrate");
+
+    layer_cache
 }
 
 #[tokio::test]
diff --git a/lib/si-test-macros/src/expand.rs b/lib/si-test-macros/src/expand.rs
index 517b9b2283..fd75d4aad3 100644
--- a/lib/si-test-macros/src/expand.rs
+++ b/lib/si-test-macros/src/expand.rs
@@ -240,7 +240,6 @@ pub(crate) trait FnSetupExpander {
         self.code_extend(quote! {
             let test_context = ::dal_test::TestContext::global(
                 crate::TEST_PG_DBNAME,
-                crate::TEST_CONTENT_STORE_PG_DBNAME,
                 crate::SI_TEST_LAYER_CACHE_PG_DBNAME
             ).await?;
         });
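
Usage sketch (illustrative, not part of the patch): with the content-store pool removed, the layer db's Postgres schema is no longer migrated implicitly inside LayerCache::new. Callers migrate explicitly: sdf-server calls services_context.layer_db().pg_migrate() inside migrate_database, and the si-layer-cache integration tests call pg_migrate() right after constructing their LayerDb. A minimal Rust sketch of that pattern is below; build_layer_db() is a hypothetical stand-in for the LayerDb::new(...) construction shown in the tests, while pg_migrate(), LayerDbResult, and DalLayerDb are the names used in this patch.

    // Sketch only: run layer-db Postgres migrations before first use.
    // `build_layer_db()` is hypothetical; `pg_migrate()` and its
    // LayerDbResult<()> return type are taken from lib/si-layer-cache/src/db.rs.
    async fn prepare_layer_db() -> LayerDbResult<DalLayerDb> {
        let ldb = build_layer_db().await?;
        // As the comment in db.rs notes, this currently runs all migrations
        // through the "cas" cache's PgLayer.
        ldb.pg_migrate().await?;
        Ok(ldb)
    }

Separating construction from migration appears to be the point of the change: LayerCache::new stays cheap and free of database side effects, while server startup and tests decide when the schema is created.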