Skip to content

Commit

Permalink
merge: #3482
Browse files Browse the repository at this point in the history
3482: feat(rebaser): Rebaser uses layerdb activites r=adamhjk a=adamhjk

This refactors the rebaser to use a layerdb activities stream. Under the hood it uses a NATS Jetstream work queue.

* Makes subscriptions to activities multiplexed (unless they are a work queue)
* Adds integration test activities
* Adds rebase and rebase_and_wait, which waits for a finish event

Co-authored-by: Adam Jacob <[email protected]>
  • Loading branch information
si-bors-ng[bot] and adamhjk authored Mar 28, 2024
2 parents 95855b6 + ed2bc7c commit 1d0ec39
Show file tree
Hide file tree
Showing 36 changed files with 1,047 additions and 787 deletions.
21 changes: 1 addition & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ members = [
"lib/naxum",
"lib/object-tree",
"lib/pinga-server",
"lib/rebaser-client",
"lib/rebaser-core",
"lib/rebaser-server",
"lib/sdf-server",
Expand Down
1 change: 0 additions & 1 deletion bin/sdf/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ rust_binary(
name = "sdf",
deps = [
"//lib/nats-multiplexer:nats-multiplexer",
"//lib/rebaser-client:rebaser-client",
"//lib/sdf-server:sdf-server",
"//lib/si-std:si-std",
"//lib/telemetry-application-rs:telemetry-application",
Expand Down
1 change: 0 additions & 1 deletion bin/sdf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ path = "src/main.rs"

[dependencies]
nats-multiplexer = { path = "../../lib/nats-multiplexer" }
rebaser-client = { path = "../../lib/rebaser-client" }
sdf-server = { path = "../../lib/sdf-server" }
si-std = { path = "../../lib/si-std" }
telemetry-application = { path = "../../lib/telemetry-application-rs" }
Expand Down
7 changes: 0 additions & 7 deletions bin/sdf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use sdf_server::{
use telemetry_application::prelude::*;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

use rebaser_client::Config as RebaserClientConfig;

mod args;

type JobProcessor = sdf_server::NatsProcessor;
Expand Down Expand Up @@ -111,10 +109,6 @@ async fn async_main() -> Result<()> {

let module_index_url = config.module_index_url().to_string();

// TODO: accept command line arguments and or environment variables to configure the rebaser
// client
let rebaser_config = RebaserClientConfig::default();

let (ws_multiplexer, ws_multiplexer_client) =
Multiplexer::new(&nats_conn, WS_MULTIPLEXER_SUBJECT).await?;
let (crdt_multiplexer, crdt_multiplexer_client) =
Expand All @@ -138,7 +132,6 @@ async fn async_main() -> Result<()> {
Some(pkgs_path),
Some(module_index_url),
symmetric_crypto_service,
rebaser_config,
layer_db,
);

Expand Down
1 change: 0 additions & 1 deletion lib/dal-test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ rust_library(
"//lib/dal:dal",
"//lib/module-index-client:module-index-client",
"//lib/pinga-server:pinga-server",
"//lib/rebaser-client:rebaser-client",
"//lib/rebaser-server:rebaser-server",
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
Expand Down
1 change: 0 additions & 1 deletion lib/dal-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ names = { workspace = true }
opentelemetry-otlp = { workspace = true }
opentelemetry_sdk = { workspace = true }
pinga-server = { path = "../../lib/pinga-server" }
rebaser-client = { path = "../../lib/rebaser-client"}
rebaser-server = { path = "../../lib/rebaser-server" }
remain = { workspace = true }
serde = { workspace = true }
Expand Down
23 changes: 0 additions & 23 deletions lib/dal-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use dal::{
use derive_builder::Builder;
use jwt_simple::prelude::RS256KeyPair;
use lazy_static::lazy_static;
use rebaser_client::Config as RebaserClientConfig;
use si_crypto::{
SymmetricCryptoService, SymmetricCryptoServiceConfig, SymmetricCryptoServiceConfigFile,
};
Expand Down Expand Up @@ -109,8 +108,6 @@ pub struct Config {
symmetric_crypto_service_config: SymmetricCryptoServiceConfig,
// TODO(nick): determine why this is unused.
#[allow(dead_code)]
#[builder(default)]
rebaser_config: RebaserClientConfig,
#[builder(default = "si_layer_cache::default_pg_pool_config()")]
layer_cache_pg_pool: PgPoolConfig,
}
Expand Down Expand Up @@ -219,8 +216,6 @@ pub struct TestContext {
layer_db_pg_pool: PgPool,
/// The sled path for the layer db
layer_db_sled_path: String,
/// The configuration for the rebaser client used in tests
rebaser_config: RebaserClientConfig,
}

impl TestContext {
Expand Down Expand Up @@ -313,7 +308,6 @@ impl TestContext {
self.config.pkgs_path.to_owned(),
None,
self.symmetric_crypto_service.clone(),
self.rebaser_config.clone(),
layer_db,
)
}
Expand Down Expand Up @@ -396,17 +390,13 @@ impl TestContextBuilder {
SymmetricCryptoService::from_config(&self.config.symmetric_crypto_service_config)
.await?;

let mut rebaser_config = RebaserClientConfig::default();
rebaser_config.set_subject_prefix(universal_prefix);

Ok(TestContext {
config,
pg_pool,
nats_conn,
job_processor,
encryption_key: self.encryption_key.clone(),
symmetric_crypto_service,
rebaser_config,
layer_db_pg_pool,
layer_db_sled_path: si_layer_cache::disk_cache::default_sled_path()?.to_string(),
})
Expand Down Expand Up @@ -540,23 +530,13 @@ pub fn pinga_server(services_context: &ServicesContext) -> Result<pinga_server::
/// Configures and builds a [`rebaser_server::Server`] suitable for running alongside DAL
/// object-related tests.
pub fn rebaser_server(services_context: &ServicesContext) -> Result<rebaser_server::Server> {
let _config: rebaser_server::Config = {
let mut config_file = rebaser_server::ConfigFile::default();
rebaser_server::detect_and_configure_development(&mut config_file)
.wrap_err("failed to detect and configure Rebaser ConfigFile")?;
config_file
.try_into()
.wrap_err("failed to build Rebaser server config")?
};

let server = rebaser_server::Server::from_services(
services_context.encryption_key(),
services_context.nats_conn().clone(),
services_context.pg_pool().clone(),
services_context.veritech().clone(),
services_context.job_processor(),
services_context.symmetric_crypto_service().clone(),
services_context.rebaser_config().clone(),
services_context.layer_db().clone(),
)
.wrap_err("failed to create Rebaser server")?;
Expand Down Expand Up @@ -702,7 +682,6 @@ async fn global_setup(test_context_builer: TestContextBuilder) -> Result<()> {
.expect("no pkgs path configured"),
test_context.config.module_index_url.clone(),
services_ctx.symmetric_crypto_service(),
services_ctx.rebaser_config().clone(),
services_ctx.layer_db().clone(),
)
.await
Expand Down Expand Up @@ -745,7 +724,6 @@ async fn migrate_local_builtins(
pkgs_path: PathBuf,
module_index_url: String,
symmetric_crypto_service: &SymmetricCryptoService,
rebaser_config: RebaserClientConfig,
layer_db: DalLayerDb,
) -> ModelResult<()> {
let services_context = ServicesContext::new(
Expand All @@ -757,7 +735,6 @@ async fn migrate_local_builtins(
Some(pkgs_path),
Some(module_index_url),
symmetric_crypto_service.clone(),
rebaser_config,
layer_db.clone(),
);
let dal_context = services_context.into_builder(true);
Expand Down
2 changes: 0 additions & 2 deletions lib/dal/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ rust_library(
"//lib/council-server:council-server",
"//lib/nats-subscriber:nats-subscriber",
"//lib/object-tree:object-tree",
"//lib/rebaser-client:rebaser-client",
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
Expand Down Expand Up @@ -83,7 +82,6 @@ rust_test(
name = "test-integration",
deps = [
"//lib/dal-test:dal-test",
"//lib/rebaser-client:rebaser-client",
"//lib/rebaser-core:rebaser-core",
"//lib/rebaser-server:rebaser-server",
"//lib/si-pkg:si-pkg",
Expand Down
1 change: 0 additions & 1 deletion lib/dal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ petgraph = { workspace = true }
postcard = { version = "1.0.8", features = ["alloc"] }
postgres-types = { workspace = true }
rand = { workspace = true }
rebaser-client = { path = "../../lib/rebaser-client" }
refinery = { workspace = true }
regex = { workspace = true }
remain = { workspace = true }
Expand Down
Loading

0 comments on commit 1d0ec39

Please sign in to comment.