Skip to content

Commit

Permalink
agent: ignore disabled bindings in reads_to and writes_from columns
Browse files Browse the repository at this point in the history
Fixes a bug in the agen publications handler, where it was setting `reads_from`
and `writes_to` for derivation and materialization bindings that are disabled.
This updates the logic to ignore disabled bindings when setting those values.

Also removes the `MaterializationBinding::non_null_resource` function, which
was vestigial and confusing, and cleans up some of the `validation` code to no
longer rely on that function.
  • Loading branch information
psFried committed Jul 19, 2023
1 parent 6d4726b commit 32355cf
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 21 deletions.
12 changes: 7 additions & 5 deletions crates/agent/src/evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ fn evolve_collection(
.filter(|b| b.source.collection() == &old_collection)
{
// If we're re-creating the collection, then update the source in place.
// We do this even for disabled bindings, so that the spec is up to date
// with the latest changes to the rest of the catalog.
if re_create_collection {
binding
.source
Expand All @@ -376,13 +378,13 @@ fn evolve_collection(
continue;
};

let Some(resource) = binding.non_null_resource() else {
// The binding is disabled, so no need to update anything else.
// Don't update resources for disabled bindings.
if binding.disable {
continue;
};
}
// Parse the current resource spec into a `Value` that we can mutate
let mut resource_spec: Value =
serde_json::from_str(resource.get()).with_context(|| {
let mut resource_spec: Value = serde_json::from_str(binding.resource.get())
.with_context(|| {
format!(
"parsing materialization resource spec of '{}' binding for '{}",
mat_name, &new_name
Expand Down
8 changes: 6 additions & 2 deletions crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,9 @@ fn extract_spec_metadata<'a>(

if let Some(derivation) = &collection.derive {
for tdef in &derivation.transforms {
reads_from.push(tdef.source.collection().as_ref());
if !tdef.disable {
reads_from.push(tdef.source.collection().as_ref());
}
}
reads_from.reserve(1);
}
Expand All @@ -642,7 +644,9 @@ fn extract_spec_metadata<'a>(
image_parts = Some(split_tag(&config.image));
}
for binding in &materialization.bindings {
reads_from.push(binding.source.collection().as_ref());
if !binding.disable {
reads_from.push(binding.source.collection().as_ref());
}
}
reads_from.reserve(1);
}
Expand Down
10 changes: 0 additions & 10 deletions crates/models/src/materializations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,6 @@ pub struct MaterializationBinding {
pub fields: MaterializationFields,
}

impl MaterializationBinding {
pub fn non_null_resource(&self) -> Option<&RawValue> {
if self.disable {
None
} else {
Some(&self.resource)
}
}
}

/// MaterializationFields defines a selection of projections to materialize,
/// as well as optional per-projection, driver-specific configuration.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
Expand Down
4 changes: 2 additions & 2 deletions crates/sources/src/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,15 +658,15 @@ impl<F: Fetcher> Loader<F> {
};

for (index, binding) in spec.bindings.iter().enumerate() {
if let Some(resource) = binding.non_null_resource() {
if !binding.disable {
tasks.push(
async move {
self.load_config(
scope
.push_prop("bindings")
.push_item(index)
.push_prop("resource"),
resource,
&binding.resource,
)
.await
}
Expand Down
4 changes: 3 additions & 1 deletion crates/validation/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ fn walk_capture_binding<'a>(
errors: &mut tables::Errors,
) -> Option<capture::request::validate::Binding> {
let models::CaptureBinding {
resource, target, ..
resource,
target,
disable: _,
} = binding;

// We must resolve the target collection to continue.
Expand Down
2 changes: 1 addition & 1 deletion crates/validation/src/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub async fn walk_all_materializations<C: Connectors>(
..
} = binding_models
.iter()
.filter(|b| b.non_null_resource().is_some())
.filter(|b| !b.disable)
.nth(binding_index)
.expect("models bindings are consistent with validation requests bindings");

Expand Down

0 comments on commit 32355cf

Please sign in to comment.