Skip to content

Commit

Permalink
merge: #3483
Browse files Browse the repository at this point in the history
3483: feat(dal,si-events): Move node weights off the graph r=zacharyhamm a=zacharyhamm

Moves node weights off the internal petgraph, storing them in the layerdb. Local node weights now only store the address and the merkel tree hash (although we may move that off the local node weight as well). 

Co-authored-by: Zachary Hamm <[email protected]>
  • Loading branch information
si-bors-ng[bot] and zacharyhamm committed Apr 2, 2024
2 parents bbd1995 + 30be50d commit c064155
Show file tree
Hide file tree
Showing 60 changed files with 6,955 additions and 8,493 deletions.
134 changes: 66 additions & 68 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ directories = "5.0.1"
docker-api = "0.14.0"
dyn-clone = "1.0.17"
flate2 = "1.0.28"
futures = "0.3.30"
futures = {version = "0.3.30", features = ["executor"]}
futures-lite = "2.3.0"
hex = "0.4.3"
http = "0.2.12" # todo: upgrade this alongside hyper/axum/tokio-tungstenite/tower-http
Expand Down
6 changes: 3 additions & 3 deletions lib/dal-test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ rust_library(
deps = [
"//lib/buck2-resources:buck2-resources",
"//lib/council-server:council-server",
"//lib/dal:dal",
"//lib/dal:dal-integration-test",
"//lib/module-index-client:module-index-client",
"//lib/pinga-server:pinga-server",
"//lib/rebaser-server:rebaser-server",
"//lib/pinga-server:pinga-server-integration-test",
"//lib/rebaser-server:rebaser-server-integration-test",
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
Expand Down
5 changes: 3 additions & 2 deletions lib/dal-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ impl Config {
config.pg.port = env::var(ENV_VAR_PG_PORT)
.unwrap_or_else(|_| DEFAULT_TEST_PG_PORT_STR.to_string())
.parse()?;
config.pg.pool_max_size *= 32;

config.pg.pool_max_size = 4;
config.pg.certificate_path = Some(config.postgres_key_path.clone().try_into()?);

if let Ok(value) = env::var(ENV_VAR_PG_HOSTNAME) {
Expand All @@ -151,7 +152,7 @@ impl Config {
config.layer_cache_pg_pool.port = env::var(ENV_VAR_PG_PORT)
.unwrap_or_else(|_| DEFAULT_TEST_PG_PORT_STR.to_string())
.parse()?;
config.layer_cache_pg_pool.pool_max_size *= 32;
config.layer_cache_pg_pool.pool_max_size = 4;
config.layer_cache_pg_pool.certificate_path =
Some(config.postgres_key_path.clone().try_into()?);

Expand Down
83 changes: 81 additions & 2 deletions lib/dal/BUCK
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
load(
"@prelude-si//:macros.bzl",
"rust_library",
"rust_library_integration_test",
"rust_test",
)

Expand Down Expand Up @@ -75,6 +76,83 @@ rust_library(
test_unit_deps = [
"//third-party/rust:tempfile",
],
)

rust_library_integration_test(
name = "dal-integration-test",
crate = "dal",
deps = [
"//lib/si-cbor:si-cbor",
"//lib/council-server:council-server",
"//lib/nats-subscriber:nats-subscriber",
"//lib/object-tree:object-tree",
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
"//lib/si-events-rs:si-events",
"//lib/si-hash:si-hash",
"//lib/si-layer-cache:si-layer-cache",
"//lib/si-pkg:si-pkg",
"//lib/si-std:si-std",
"//lib/telemetry-rs:telemetry",
"//lib/telemetry-nats-rs:telemetry-nats",
"//lib/veritech-client:veritech-client",
"//third-party/rust:async-recursion",
"//third-party/rust:async-trait",
"//third-party/rust:base64",
"//third-party/rust:blake3",
"//third-party/rust:chrono",
"//third-party/rust:ciborium",
"//third-party/rust:convert_case",
"//third-party/rust:derive_more",
"//third-party/rust:diff",
"//third-party/rust:dyn-clone",
"//third-party/rust:futures",
"//third-party/rust:hex",
"//third-party/rust:iftree",
"//third-party/rust:itertools",
"//third-party/rust:jwt-simple",
"//third-party/rust:lazy_static",
"//third-party/rust:once_cell",
"//third-party/rust:paste",
"//third-party/rust:petgraph",
"//third-party/rust:postcard",
"//third-party/rust:postgres-types",
"//third-party/rust:pretty_assertions_sorted",
"//third-party/rust:rand",
"//third-party/rust:refinery",
"//third-party/rust:regex",
"//third-party/rust:remain",
"//third-party/rust:serde",
"//third-party/rust:serde-aux",
"//third-party/rust:serde_json",
"//third-party/rust:serde_with",
"//third-party/rust:sled",
"//third-party/rust:sodiumoxide",
"//third-party/rust:strum",
"//third-party/rust:thiserror",
"//third-party/rust:tokio",
"//third-party/rust:tokio-stream",
"//third-party/rust:ulid",
"//third-party/rust:url",
],
rustc_flags = [
"--cfg=integration_test",
],
srcs = glob([
"src/**/*.rs",
"src/builtins/func/**",
"src/builtins/schema/data/**/*.json",
"src/builtins/schema/definitions/**/*.json",
"src/migrations/**/*.sql",
"src/queries/**/*.sql",
]),
env = {
"CARGO_MANIFEST_DIR": ".",
},
test_unit_deps = [
"//third-party/rust:tempfile",
],
extra_test_targets = [":test-integration"],
)

Expand All @@ -83,7 +161,8 @@ rust_test(
deps = [
"//lib/dal-test:dal-test",
"//lib/rebaser-core:rebaser-core",
"//lib/rebaser-server:rebaser-server",
"//lib/rebaser-server:rebaser-server-integration-test",
"//lib/si-events-rs:si-events",
"//lib/si-pkg:si-pkg",
"//lib/veritech-client:veritech-client",
"//third-party/rust:base64",
Expand All @@ -98,7 +177,7 @@ rust_test(
"//third-party/rust:tokio",
"//third-party/rust:tokio-util",
"//third-party/rust:ulid",
":dal",
":dal-integration-test",
],
crate_root = "tests/integration.rs",
srcs = glob([
Expand Down
1 change: 1 addition & 0 deletions lib/dal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@ veritech-client = { path = "../../lib/veritech-client" }

itertools = { workspace = true }
pretty_assertions_sorted = { workspace = true }
si-events = { path = "../../lib/si-events-rs" }
tempfile = { workspace = true }
tokio-util = { workspace = true }
2 changes: 1 addition & 1 deletion lib/dal/src/action/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl ActionPrototype {
{
let node_weight = workspace_snapshot.get_node_weight(node_index).await?;
let id = node_weight.id();
if NodeWeightDiscriminants::Func == node_weight.into() {
if NodeWeightDiscriminants::Func == node_weight.as_ref().into() {
return Ok(id.into());
}
}
Expand Down
10 changes: 7 additions & 3 deletions lib/dal/src/attribute/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl AttributePrototype {
{
let node_weight = workspace_snapshot.get_node_weight(node_index).await?;
let node_weight_id = node_weight.id();
if NodeWeightDiscriminants::Func == node_weight.into() {
if NodeWeightDiscriminants::Func == node_weight.as_ref().into() {
return Ok(node_weight_id.into());
}
}
Expand Down Expand Up @@ -356,6 +356,7 @@ impl AttributePrototype {
let (target_id, edge_weight_discrim) = match workspace_snapshot
.get_node_weight(prototype_edge_source)
.await?
.as_ref()
{
NodeWeight::Prop(prop_inner) => {
(prop_inner.id(), EdgeWeightKindDiscriminants::Prop)
Expand Down Expand Up @@ -390,6 +391,7 @@ impl AttributePrototype {
if let NodeWeight::AttributeValue(av_node_weight) = workspace_snapshot
.get_node_weight(attribute_value_target)
.await?
.as_ref()
{
attribute_value_ids.push(av_node_weight.id().into())
}
Expand Down Expand Up @@ -426,8 +428,10 @@ impl AttributePrototype {

Ok(match maybe_value_idxs.first().copied() {
Some(value_idx) => {
if let NodeWeight::AttributeValue(av_node_weight) =
workspace_snapshot.get_node_weight(value_idx).await?
if let NodeWeight::AttributeValue(av_node_weight) = workspace_snapshot
.get_node_weight(value_idx)
.await?
.as_ref()
{
Some(av_node_weight.id().into())
} else {
Expand Down
6 changes: 3 additions & 3 deletions lib/dal/src/attribute/prototype/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl AttributePrototypeArgument {
)
.await?
{
match workspace_snapshot.get_node_weight(node_idx).await? {
match workspace_snapshot.get_node_weight(node_idx).await?.as_ref() {
NodeWeight::Content(inner) => {
let inner_addr_discrim: ContentAddressDiscriminants =
inner.content_address().into();
Expand Down Expand Up @@ -318,7 +318,7 @@ impl AttributePrototypeArgument {
.into_iter()
.next()
{
match workspace_snapshot.get_node_weight(target).await? {
match workspace_snapshot.get_node_weight(target).await?.as_ref() {
NodeWeight::Prop(inner) => {
return Ok(Some(ValueSource::Prop(inner.id().into())));
}
Expand Down Expand Up @@ -513,7 +513,7 @@ impl AttributePrototypeArgument {

for idx in apa_node_idxs {
let node_weight = workspace_snapshot.get_node_weight(idx).await?;
if let NodeWeight::AttributePrototypeArgument(apa_weight) = &node_weight {
if let NodeWeight::AttributePrototypeArgument(apa_weight) = node_weight.as_ref() {
if let Some(ArgumentTargets {
destination_component_id,
..
Expand Down
19 changes: 11 additions & 8 deletions lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,8 +946,10 @@ impl AttributeValue {
let workspace_snapshot = ctx.workspace_snapshot()?;

let prop_node_index = workspace_snapshot.get_node_index_by_id(prop_id).await?;
if let NodeWeight::Prop(prop_inner) =
workspace_snapshot.get_node_weight(prop_node_index).await?
if let NodeWeight::Prop(prop_inner) = workspace_snapshot
.get_node_weight(prop_node_index)
.await?
.as_ref()
{
prop_inner.kind()
} else {
Expand Down Expand Up @@ -1275,6 +1277,7 @@ impl AttributeValue {
.workspace_snapshot()?
.get_node_weight(node_index)
.await?
.as_ref()
{
prop_map.insert(
prop_inner.name().to_string(),
Expand Down Expand Up @@ -1372,6 +1375,7 @@ impl AttributeValue {
match workspace_snapshot
.get_node_weight(element_prop_index)
.await?
.as_ref()
{
NodeWeight::Prop(prop_inner) => (prop_inner.id(), prop_inner.kind()),
_ => {
Expand Down Expand Up @@ -1616,6 +1620,7 @@ impl AttributeValue {
match workspace_snapshot
.get_node_weight(element_prop_index)
.await?
.as_ref()
{
NodeWeight::Prop(prop_inner) => (prop_inner.id(), prop_inner.kind()),
_ => {
Expand Down Expand Up @@ -1817,7 +1822,7 @@ impl AttributeValue {
view: Option<serde_json::Value>,
) -> AttributeValueResult<()> {
let workspace_snapshot = ctx.workspace_snapshot()?;
let (av_idx, av_node_weight) = {
let (_, av_node_weight) = {
let av_idx = workspace_snapshot
.get_node_index_by_id(attribute_value_id)
.await?;
Expand Down Expand Up @@ -1862,7 +1867,6 @@ impl AttributeValue {
workspace_snapshot
.add_node(NodeWeight::AttributeValue(new_av_node_weight))
.await?;
workspace_snapshot.replace_references(av_idx).await?;

Ok(())
}
Expand All @@ -1877,7 +1881,7 @@ impl AttributeValue {
func_execution_pk: FuncExecutionPk,
) -> AttributeValueResult<()> {
let workspace_snapshot = ctx.workspace_snapshot()?;
let (av_idx, av_node_weight) = {
let (_av_idx, av_node_weight) = {
let av_idx = workspace_snapshot
.get_node_index_by_id(attribute_value_id)
.await?;
Expand Down Expand Up @@ -1938,7 +1942,6 @@ impl AttributeValue {
workspace_snapshot
.add_node(NodeWeight::AttributeValue(new_av_node_weight))
.await?;
workspace_snapshot.replace_references(av_idx).await?;

Ok(())
}
Expand Down Expand Up @@ -1975,7 +1978,7 @@ impl AttributeValue {
.await?
{
let target_node_weight = workspace_snapshot.get_node_weight(target).await?;
if let NodeWeight::Prop(prop_node_weight) = &target_node_weight {
if let NodeWeight::Prop(prop_node_weight) = target_node_weight.as_ref() {
maybe_prop_id = match maybe_prop_id {
Some(already_found_prop_id) => {
return Err(AttributeValueError::MultiplePropsFound(
Expand Down Expand Up @@ -2094,7 +2097,7 @@ impl AttributeValue {
.pop()
{
let node_weight = workspace_snapshot.get_node_weight(ordering).await?;
if let NodeWeight::Ordering(ordering_weight) = node_weight {
if let NodeWeight::Ordering(ordering_weight) = node_weight.as_ref() {
Ok(ordering_weight
.order()
.clone()
Expand Down
6 changes: 2 additions & 4 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ impl Component {
if let NodeWeight::Content(content) = workspace_snapshot
.get_node_weight(maybe_schema_variant_index)
.await?
.as_ref()
{
let content_hash_discriminants: ContentAddressDiscriminants =
content.content_address().into();
Expand Down Expand Up @@ -832,7 +833,7 @@ impl Component {
.await?
{
let target_node_weight = workspace_snapshot.get_node_weight(target).await?;
if let NodeWeight::AttributeValue(_) = target_node_weight {
if let NodeWeight::AttributeValue(_) = target_node_weight.as_ref() {
maybe_root_attribute_value_id = match maybe_root_attribute_value_id {
Some(already_found_root_attribute_value_id) => {
return Err(ComponentError::MultipleRootAttributeValuesFound(
Expand Down Expand Up @@ -1242,9 +1243,6 @@ impl Component {
ctx.workspace_snapshot()?
.add_node(NodeWeight::Component(new_component_node_weight))
.await?;
ctx.workspace_snapshot()?
.replace_references(component_idx)
.await?;
}

let updated = ComponentContentV1::from(component.clone());
Expand Down
3 changes: 2 additions & 1 deletion lib/dal/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tokio::time::Instant;
use veritech_client::{Client as VeritechClient, CycloneEncryptionKey};

use crate::layer_db_types::ContentTypes;
use crate::workspace_snapshot::node_weight::NodeWeight;
use crate::workspace_snapshot::{
conflict::Conflict, graph::WorkspaceSnapshotGraph, update::Update, vector_clock::VectorClockId,
};
Expand All @@ -36,7 +37,7 @@ use crate::{
};
use crate::{EncryptedSecret, Workspace};

pub type DalLayerDb = LayerDb<ContentTypes, EncryptedSecret, WorkspaceSnapshotGraph>;
pub type DalLayerDb = LayerDb<ContentTypes, EncryptedSecret, WorkspaceSnapshotGraph, NodeWeight>;

/// A context type which contains handles to common core service dependencies.
///
Expand Down
Loading

0 comments on commit c064155

Please sign in to comment.