Skip to content

Commit

Permalink
models: disable bindings using a boolean
Browse files Browse the repository at this point in the history
This changes the representation of disabled capture and materialization
bindings in Flow specs.  We previously used `target: null` and `resource: null`
to represent disabled capture and materialization bindings respectively.  That
change was introduced quite recently and has not seen any use yet.  This commit
changes both capture and materialization bindings to use a boolean flag instead:
`disable: true` on capture bindings and `disabled: true` on materialization
bindings.

Ultimately, the reason for this is to make it easier to programmatically
manipulate Flow specs.  Say a user runs a discover of an existing capture via
the UI. Afterward, the draft could contain a bunch of disabled bindings with
`target: null` along with a corresponding collection for each disabled binding.
The UI needs those collections so that it can make sure they're included in the
draft if the user re-enables any of the disabled bindings. But the UI would
have no idea how to match a disabled binding with its corresponding collection
spec because the `target` is null. We'd have a similar problem in Flowctl. The
output of a discover needs to somehow communicate the relationship between each
binding and the collection that goes along with it. Rather than introduce a
separate data structure containing these relationships, it seems preferable to
just keep them within the specs themselves.

This ends up being simpler: it works the same for both captures and
materializations, and seems a bit more obvious and clear. It also means that
disabled capture bindings would be required to have a non-null `target`.  The
validation logic is skipped for bindings that are disabled, so disabled
bindings can have _any_ collection name. It's a little awkward, but it seems
preferable to the alternative of having a separate data structure containing
the mappings from resources to collections.
  • Loading branch information
psFried committed Jul 7, 2023
1 parent 4c00155 commit 6066006
Show file tree
Hide file tree
Showing 15 changed files with 69 additions and 134 deletions.
7 changes: 2 additions & 5 deletions crates/agent/src/discovers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,14 @@ impl DiscoverHandler {
let targets = capture
.bindings
.iter()
.map(|models::CaptureBinding { target, .. }| target.as_ref().cloned())
.map(|models::CaptureBinding { target, .. }| target.clone())
.collect::<Vec<_>>();

catalog.captures.insert(capture_name, capture); // Replace merged capture.

// Now resolve all targeted collections, if they exist.
let resolved = agent_sql::discovers::resolve_merge_target_specs(
&targets
.iter()
.flat_map(|t| t.iter().map(models::Collection::as_str))
.collect::<Vec<_>>(),
&targets.iter().map(|t| t.as_str()).collect::<Vec<_>>(),
CatalogType::Collection,
draft_id,
user_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ expression: json!(out)
"target": "acmeCo/renamed"
},
{
"disable": true,
"resource": {
"modified": "yup",
"stream": "disabled"
},
"target": null
"target": "test/collection/disabled"
}
],
"endpoint": {
Expand Down
16 changes: 5 additions & 11 deletions crates/agent/src/discovers/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,8 @@ pub fn merge_capture(
} else if !update_only {
// Create a new CaptureBinding.
capture_bindings.push(models::CaptureBinding {
target: Some(models::Collection::new(format!(
"{capture_prefix}/{recommended_name}"
)))
.into(),
target: models::Collection::new(format!("{capture_prefix}/{recommended_name}")),
disable: false,
resource: models::RawValue::from_value(&resource),
});
filtered_bindings.push(discovered_binding);
Expand All @@ -110,7 +108,7 @@ pub fn merge_capture(
pub fn merge_collections(
discovered_bindings: Vec<Binding>,
mut fetched_collections: BTreeMap<models::Collection, models::CollectionDef>,
targets: Vec<Option<models::Collection>>,
targets: Vec<models::Collection>,
) -> BTreeMap<models::Collection, models::CollectionDef> {
assert_eq!(targets.len(), discovered_bindings.len());

Expand All @@ -125,10 +123,6 @@ pub fn merge_collections(
},
) in targets.into_iter().zip(discovered_bindings.into_iter())
{
// Skip over any disabled bindings, as indicated by them having no target.
let Some(target) = target else {
continue;
};
let document_schema: models::Schema = serde_json::from_str(&document_schema_json).unwrap();
// Unwrap a fetched collection, or initialize a blank one.
let mut collection =
Expand Down Expand Up @@ -235,7 +229,7 @@ mod tests {
let (discovered_bindings, fetched_collections, targets): (
Vec<Binding>,
BTreeMap<models::Collection, models::CollectionDef>,
Vec<Option<models::Collection>>,
Vec<models::Collection>,
) = serde_json::from_value(json!([
[
// case/1: if there is no fetched collection, one is assembled.
Expand Down Expand Up @@ -321,7 +315,7 @@ mod tests {
"bindings": [
{ "resource": { "stream": "foo", "modified": 1 }, "target": "acmeCo/renamed" },
{ "resource": { "stream": "removed" }, "target": "acmeCo/discarded" },
{ "resource": { "stream": "disabled", "modified": "yup" }, "target": null },
{ "resource": { "stream": "disabled", "modified": "yup" }, "disable": true, "target": "test/collection/disabled" },
],
"endpoint": { "connector": { "config": { "fetched": 1 }, "image": "old/image" } },
// Extra fields which are passed-through.
Expand Down
8 changes: 3 additions & 5 deletions crates/agent/src/evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ fn evolve_collection(
.or_insert_with(|| cap_spec.clone());

for binding in new_spec.bindings.iter_mut() {
if binding.target.as_ref() == Some(&old_collection) {
binding.target = Some(new_name.clone()).into();
if &binding.target == &old_collection {
binding.target = new_name.clone();
}
}
}
Expand All @@ -435,9 +435,7 @@ fn evolve_collection(
}

fn has_cap_binding(spec: &models::CaptureDef, collection: &models::Collection) -> bool {
spec.bindings
.iter()
.any(|b| b.target.as_ref() == Some(collection))
spec.bindings.iter().any(|b| &b.target == collection)
}

fn has_mat_binding(spec: &models::MaterializationDef, collection: &models::Collection) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,8 @@ fn extract_spec_metadata<'a>(
image_parts = Some(split_tag(&config.image));

for binding in &capture.bindings {
if let Some(target) = binding.target.as_ref() {
writes_to.push(target.as_ref());
if !binding.disable {
writes_to.push(binding.target.as_ref());
}
}
writes_to.reserve(1);
Expand Down
3 changes: 2 additions & 1 deletion crates/flowctl/src/raw/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ pub async fn do_discover(
let collection = Collection::new(collection_name);

capture_bindings.push(CaptureBinding {
target: Some(collection.clone()).into(),
target: collection.clone(),
disable: false,
resource: RawValue::from_string(binding.resource_config_json.clone())?.into(),
});

Expand Down
80 changes: 10 additions & 70 deletions crates/models/src/captures.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Collection, ConnectorConfig, RawValue, ShardTemplate};
use super::{is_false, Collection, ConnectorConfig, RawValue, ShardTemplate};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
Expand Down Expand Up @@ -68,11 +68,17 @@ pub enum CaptureEndpoint {
pub struct CaptureBinding {
/// # Endpoint resource to capture from.
pub resource: RawValue,
/// # Whether to disable the binding
/// Disabled bindings are inactive, and not validated.
/// They can be used to represent discovered resources that are
/// intentionally not being captured.
#[serde(default, skip_serializing_if = "is_false")]
pub disable: bool,
/// # Name of the collection to capture into.
// Note(johnny): If we need to add details about how data is written to a
// target, we should turn this into a Target enum as has already been done
// with Source (used by Materialization & Derive).
pub target: Target,
pub target: Collection,
}

impl CaptureDef {
Expand Down Expand Up @@ -101,74 +107,8 @@ impl CaptureBinding {
pub fn example() -> Self {
Self {
resource: serde_json::from_value(json!({"stream": "a_stream"})).unwrap(),
target: Some(Collection::new("target/collection")).into(),
}
}
}

/// Target represents the destination side of a capture binding. It can be
/// either the name of a Flow collection (e.g. "acmeCo/foo/bar") or `null` to
/// disable the binding.
#[derive(Debug, Clone)]
pub struct Target(Option<Collection>);

impl schemars::JsonSchema for Target {
fn schema_name() -> String {
"Target".to_string()
}

fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
let collection_schema = gen.subschema_for::<Collection>();
serde_json::from_value(json!({
"oneOf": [
collection_schema,
{
"title": "Disabled binding",
"description": "a null target indicates that the binding is disabled and will be ignored at runtime",
"type": "null",
}
]
})).unwrap()
}

fn is_referenceable() -> bool {
false
}
}

impl std::ops::Deref for Target {
type Target = Option<Collection>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl From<Option<Collection>> for Target {
fn from(value: Option<Collection>) -> Self {
Target(value)
}
}

impl<'de> serde::Deserialize<'de> for Target {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let value: Option<Collection> = Deserialize::deserialize(deserializer)?;
Ok(value.into())
}
}

impl serde::Serialize for Target {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self.0.as_ref() {
Some(collection) => collection.serialize(serializer),
// 'unit' is represented as an explicit null in serde_json.
None => serializer.serialize_unit(),
disable: false,
target: Collection::new("target/collection"),
}
}
}
7 changes: 5 additions & 2 deletions crates/models/src/materializations.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{ConnectorConfig, Field, RawValue, RelativeUrl, ShardTemplate, Source};
use super::{is_false, ConnectorConfig, Field, RawValue, RelativeUrl, ShardTemplate, Source};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
Expand Down Expand Up @@ -39,14 +39,16 @@ pub struct MaterializationBinding {
pub resource: RawValue,
/// # The collection to be materialized.
pub source: Source,
#[serde(default, skip_serializing_if = "is_false")]
pub disabled: bool,
/// # Selected projections for this materialization.
#[serde(default)]
pub fields: MaterializationFields,
}

impl MaterializationBinding {
pub fn non_null_resource(&self) -> Option<&RawValue> {
if self.resource.is_null() {
if self.disabled {
None
} else {
Some(&self.resource)
Expand Down Expand Up @@ -90,6 +92,7 @@ impl MaterializationBinding {
Self {
resource: serde_json::from_value(json!({"table": "a_table"})).unwrap(),
source: Source::example(),
disabled: false,
fields: MaterializationFields::default(),
}
}
Expand Down
16 changes: 7 additions & 9 deletions crates/sources/src/indirect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,15 +331,13 @@ fn indirect_materialization(

for (index, models::MaterializationBinding { resource, .. }) in bindings.iter_mut().enumerate()
{
if !resource.is_null() {
indirect_dom(
scope,
resource,
format!("{base}.resource.{index}.config"),
resources,
threshold,
)
}
indirect_dom(
scope,
resource,
format!("{base}.resource.{index}.config"),
resources,
threshold,
)
}
}

Expand Down
5 changes: 1 addition & 4 deletions crates/sources/src/inline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,7 @@ fn inline_materialization(
}

for models::MaterializationBinding { resource, .. } in bindings {
if !resource.is_null() {
// a null resource would be ignored by inline config
inline_config(resource, scope, resources)
}
inline_config(resource, scope, resources)
}
}

Expand Down
13 changes: 8 additions & 5 deletions crates/validation/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ fn walk_capture_request<'a>(
let bindings = bindings
.iter()
.enumerate()
.filter(|(_, b)| b.target.is_some())
// Disabled bindings get skipped. We don't send them in the Validate request, and we also
// don't do any validation of the target name. By implication, the target of a disabled binding
// can be any catalog name, even if it doesn't exist or is otherwise "wrong".
.filter(|(_, b)| !b.disable)
.map(|(binding_index, binding)| {
walk_capture_binding(
scope.push_prop("bindings").push_item(binding_index),
Expand Down Expand Up @@ -252,16 +255,16 @@ fn walk_capture_binding<'a>(
built_collections: &'a [tables::BuiltCollection],
errors: &mut tables::Errors,
) -> Option<capture::request::validate::Binding> {
let models::CaptureBinding { resource, target } = binding;
let models::CaptureBinding {
resource, target, ..
} = binding;

// We must resolve the target collection to continue.
let built_collection = reference::walk_reference(
scope,
"this capture binding",
"collection",
target
.as_ref()
.expect("only enabled bindings are validated"),
target,
built_collections,
|c| (&c.collection, Scope::new(&c.scope)),
errors,
Expand Down
3 changes: 2 additions & 1 deletion crates/validation/src/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ fn walk_materialization_request<'a>(
.iter()
.enumerate()
// Filter the bindings that we send to the connector to only those that are enabled.
.filter(|(_, b)| b.non_null_resource().is_some())
.filter(|(_, b)| !b.disabled)
.map(|(binding_index, binding)| {
walk_materialization_binding(
scope.push_prop("bindings").push_item(binding_index),
Expand Down Expand Up @@ -327,6 +327,7 @@ fn walk_materialization_binding<'a>(
exclude: fields_exclude,
recommended: _,
},
..
} = binding;

let (collection, source_partitions) = match source {
Expand Down
4 changes: 1 addition & 3 deletions crates/validation/src/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ pub fn gather_referenced_collections<'a>(

for capture in captures {
for binding in &capture.spec.bindings {
if let Some(collection) = binding.target.as_ref() {
out.insert(collection);
}
out.insert(&binding.target);
}
}
for collection in collections {
Expand Down
15 changes: 10 additions & 5 deletions crates/validation/tests/scenario_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,31 +74,36 @@ test://example/catalog.yaml:
testing/partially-disabled-capture:
endpoint: { connector: { image: s3, config: {} }}
bindings:
- target: ~
- target: disabled/test/one
disable: true
resource: { stream: disabled-stream }
- target: testing/collection
resource: { stream: enabled-stream }
testing/fully-disabled-capture:
endpoint: { connector: { image: s3, config: {} }}
bindings:
- target: ~
- target: disabled/test/two
disable: true
resource: { stream: disabled-stream }
- target: ~
- target: disabled/test/three
disable: true
resource: { stream: another-disabled-stream }
materializations:
testing/partially-disabled-materialization:
endpoint: { connector: { image: s3, config: {} }}
bindings:
- source: testing/collection
resource: null
disabled: true
resource: { stream: disabled-stream }
- source: testing/collection
resource: { stream: enabled-stream }
testing/fully-disabled-materialization:
endpoint: { connector: { image: s3, config: {} }}
bindings:
- source: testing/collection
resource: null
disabled: true
resource: { stream: disabled-stream }
storageMappings:
testing/:
Expand Down
Loading

0 comments on commit 6066006

Please sign in to comment.