From 32355cff550709c64e23290a833cddc6bebd3440 Mon Sep 17 00:00:00 2001 From: Phil Date: Wed, 19 Jul 2023 10:52:56 -0400 Subject: [PATCH] agent: ignore disabled bindings in reads_to and writes_from columns Fixes a bug in the agen publications handler, where it was setting `reads_from` and `writes_to` for derivation and materialization bindings that are disabled. This updates the logic to ignore disabled bindings when setting those values. Also removes the `MaterializationBinding::non_null_resource` function, which was vestigial and confusing, and cleans up some of the `validation` code to no longer rely on that function. --- crates/agent/src/evolution.rs | 12 +++++++----- crates/agent/src/publications/specs.rs | 8 ++++++-- crates/models/src/materializations.rs | 10 ---------- crates/sources/src/loader.rs | 4 ++-- crates/validation/src/capture.rs | 4 +++- crates/validation/src/materialization.rs | 2 +- 6 files changed, 19 insertions(+), 21 deletions(-) diff --git a/crates/agent/src/evolution.rs b/crates/agent/src/evolution.rs index 7fa894d187..d2c186115c 100644 --- a/crates/agent/src/evolution.rs +++ b/crates/agent/src/evolution.rs @@ -364,6 +364,8 @@ fn evolve_collection( .filter(|b| b.source.collection() == &old_collection) { // If we're re-creating the collection, then update the source in place. + // We do this even for disabled bindings, so that the spec is up to date + // with the latest changes to the rest of the catalog. if re_create_collection { binding .source @@ -376,13 +378,13 @@ fn evolve_collection( continue; }; - let Some(resource) = binding.non_null_resource() else { - // The binding is disabled, so no need to update anything else. + // Don't update resources for disabled bindings. + if binding.disable { continue; - }; + } // Parse the current resource spec into a `Value` that we can mutate - let mut resource_spec: Value = - serde_json::from_str(resource.get()).with_context(|| { + let mut resource_spec: Value = serde_json::from_str(binding.resource.get()) + .with_context(|| { format!( "parsing materialization resource spec of '{}' binding for '{}", mat_name, &new_name diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index deaf12a2ef..a3f55b701b 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -628,7 +628,9 @@ fn extract_spec_metadata<'a>( if let Some(derivation) = &collection.derive { for tdef in &derivation.transforms { - reads_from.push(tdef.source.collection().as_ref()); + if !tdef.disable { + reads_from.push(tdef.source.collection().as_ref()); + } } reads_from.reserve(1); } @@ -642,7 +644,9 @@ fn extract_spec_metadata<'a>( image_parts = Some(split_tag(&config.image)); } for binding in &materialization.bindings { - reads_from.push(binding.source.collection().as_ref()); + if !binding.disable { + reads_from.push(binding.source.collection().as_ref()); + } } reads_from.reserve(1); } diff --git a/crates/models/src/materializations.rs b/crates/models/src/materializations.rs index 26068146f8..ce31f7d5b5 100644 --- a/crates/models/src/materializations.rs +++ b/crates/models/src/materializations.rs @@ -48,16 +48,6 @@ pub struct MaterializationBinding { pub fields: MaterializationFields, } -impl MaterializationBinding { - pub fn non_null_resource(&self) -> Option<&RawValue> { - if self.disable { - None - } else { - Some(&self.resource) - } - } -} - /// MaterializationFields defines a selection of projections to materialize, /// as well as optional per-projection, driver-specific configuration. #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] diff --git a/crates/sources/src/loader.rs b/crates/sources/src/loader.rs index 6d202a8689..73ea43bb4c 100644 --- a/crates/sources/src/loader.rs +++ b/crates/sources/src/loader.rs @@ -658,7 +658,7 @@ impl Loader { }; for (index, binding) in spec.bindings.iter().enumerate() { - if let Some(resource) = binding.non_null_resource() { + if !binding.disable { tasks.push( async move { self.load_config( @@ -666,7 +666,7 @@ impl Loader { .push_prop("bindings") .push_item(index) .push_prop("resource"), - resource, + &binding.resource, ) .await } diff --git a/crates/validation/src/capture.rs b/crates/validation/src/capture.rs index 7ebeb76b52..9c3518b900 100644 --- a/crates/validation/src/capture.rs +++ b/crates/validation/src/capture.rs @@ -256,7 +256,9 @@ fn walk_capture_binding<'a>( errors: &mut tables::Errors, ) -> Option { let models::CaptureBinding { - resource, target, .. + resource, + target, + disable: _, } = binding; // We must resolve the target collection to continue. diff --git a/crates/validation/src/materialization.rs b/crates/validation/src/materialization.rs index ae06bffc38..fa3dd672ff 100644 --- a/crates/validation/src/materialization.rs +++ b/crates/validation/src/materialization.rs @@ -127,7 +127,7 @@ pub async fn walk_all_materializations( .. } = binding_models .iter() - .filter(|b| b.non_null_resource().is_some()) + .filter(|b| !b.disable) .nth(binding_index) .expect("models bindings are consistent with validation requests bindings");