Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

models: disable bindings using a boolean #1100

Merged
merged 3 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions crates/agent/src/discovers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,14 @@ impl DiscoverHandler {
let targets = capture
.bindings
.iter()
.map(|models::CaptureBinding { target, .. }| target.as_ref().cloned())
.map(|models::CaptureBinding { target, .. }| target.clone())
.collect::<Vec<_>>();

catalog.captures.insert(capture_name, capture); // Replace merged capture.

// Now resolve all targeted collections, if they exist.
let resolved = agent_sql::discovers::resolve_merge_target_specs(
&targets
.iter()
.flat_map(|t| t.iter().map(models::Collection::as_str))
.collect::<Vec<_>>(),
&targets.iter().map(|t| t.as_str()).collect::<Vec<_>>(),
CatalogType::Collection,
draft_id,
user_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ expression: json!(out)
"target": "acmeCo/renamed"
},
{
"disable": true,
"resource": {
"modified": "yup",
"stream": "disabled"
},
"target": null
"target": "test/collection/disabled"
}
],
"endpoint": {
Expand Down
16 changes: 5 additions & 11 deletions crates/agent/src/discovers/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,8 @@ pub fn merge_capture(
} else if !update_only {
// Create a new CaptureBinding.
capture_bindings.push(models::CaptureBinding {
target: Some(models::Collection::new(format!(
"{capture_prefix}/{recommended_name}"
)))
.into(),
target: models::Collection::new(format!("{capture_prefix}/{recommended_name}")),
disable: false,
resource: models::RawValue::from_value(&resource),
});
filtered_bindings.push(discovered_binding);
Expand All @@ -110,7 +108,7 @@ pub fn merge_capture(
pub fn merge_collections(
discovered_bindings: Vec<Binding>,
mut fetched_collections: BTreeMap<models::Collection, models::CollectionDef>,
targets: Vec<Option<models::Collection>>,
targets: Vec<models::Collection>,
) -> BTreeMap<models::Collection, models::CollectionDef> {
assert_eq!(targets.len(), discovered_bindings.len());

Expand All @@ -125,10 +123,6 @@ pub fn merge_collections(
},
) in targets.into_iter().zip(discovered_bindings.into_iter())
{
// Skip over any disabled bindings, as indicated by them having no target.
let Some(target) = target else {
continue;
};
let document_schema: models::Schema = serde_json::from_str(&document_schema_json).unwrap();
// Unwrap a fetched collection, or initialize a blank one.
let mut collection =
Expand Down Expand Up @@ -235,7 +229,7 @@ mod tests {
let (discovered_bindings, fetched_collections, targets): (
Vec<Binding>,
BTreeMap<models::Collection, models::CollectionDef>,
Vec<Option<models::Collection>>,
Vec<models::Collection>,
) = serde_json::from_value(json!([
[
// case/1: if there is no fetched collection, one is assembled.
Expand Down Expand Up @@ -321,7 +315,7 @@ mod tests {
"bindings": [
{ "resource": { "stream": "foo", "modified": 1 }, "target": "acmeCo/renamed" },
{ "resource": { "stream": "removed" }, "target": "acmeCo/discarded" },
{ "resource": { "stream": "disabled", "modified": "yup" }, "target": null },
{ "resource": { "stream": "disabled", "modified": "yup" }, "disable": true, "target": "test/collection/disabled" },
],
"endpoint": { "connector": { "config": { "fetched": 1 }, "image": "old/image" } },
// Extra fields which are passed-through.
Expand Down
20 changes: 10 additions & 10 deletions crates/agent/src/evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ fn evolve_collection(
.filter(|b| b.source.collection() == &old_collection)
{
// If we're re-creating the collection, then update the source in place.
// We do this even for disabled bindings, so that the spec is up to date
// with the latest changes to the rest of the catalog.
if re_create_collection {
binding
.source
Expand All @@ -376,13 +378,13 @@ fn evolve_collection(
continue;
};

let Some(resource) = binding.non_null_resource() else {
// The binding is disabled, so no need to update anything else.
// Don't update resources for disabled bindings.
if binding.disable {
continue;
};
}
// Parse the current resource spec into a `Value` that we can mutate
let mut resource_spec: Value =
serde_json::from_str(resource.get()).with_context(|| {
let mut resource_spec: Value = serde_json::from_str(binding.resource.get())
.with_context(|| {
format!(
"parsing materialization resource spec of '{}' binding for '{}",
mat_name, &new_name
Expand Down Expand Up @@ -417,8 +419,8 @@ fn evolve_collection(
.or_insert_with(|| cap_spec.clone());

for binding in new_spec.bindings.iter_mut() {
if binding.target.as_ref() == Some(&old_collection) {
binding.target = Some(new_name.clone()).into();
if &binding.target == &old_collection {
binding.target = new_name.clone();
}
}
}
Expand All @@ -435,9 +437,7 @@ fn evolve_collection(
}

fn has_cap_binding(spec: &models::CaptureDef, collection: &models::Collection) -> bool {
spec.bindings
.iter()
.any(|b| b.target.as_ref() == Some(collection))
spec.bindings.iter().any(|b| &b.target == collection)
}

fn has_mat_binding(spec: &models::MaterializationDef, collection: &models::Collection) -> bool {
Expand Down
12 changes: 8 additions & 4 deletions crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,8 @@ fn extract_spec_metadata<'a>(
image_parts = Some(split_tag(&config.image));

for binding in &capture.bindings {
if let Some(target) = binding.target.as_ref() {
writes_to.push(target.as_ref());
if !binding.disable {
psFried marked this conversation as resolved.
Show resolved Hide resolved
writes_to.push(binding.target.as_ref());
}
}
writes_to.reserve(1);
Expand All @@ -628,7 +628,9 @@ fn extract_spec_metadata<'a>(

if let Some(derivation) = &collection.derive {
for tdef in &derivation.transforms {
reads_from.push(tdef.source.collection().as_ref());
if !tdef.disable {
reads_from.push(tdef.source.collection().as_ref());
}
}
reads_from.reserve(1);
}
Expand All @@ -642,7 +644,9 @@ fn extract_spec_metadata<'a>(
image_parts = Some(split_tag(&config.image));
}
for binding in &materialization.bindings {
reads_from.push(binding.source.collection().as_ref());
if !binding.disable {
reads_from.push(binding.source.collection().as_ref());
}
}
reads_from.reserve(1);
}
Expand Down
3 changes: 2 additions & 1 deletion crates/flowctl/src/raw/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ pub async fn do_discover(
let collection = Collection::new(collection_name);

capture_bindings.push(CaptureBinding {
target: Some(collection.clone()).into(),
target: collection.clone(),
disable: false,
resource: RawValue::from_string(binding.resource_config_json.clone())?.into(),
});

Expand Down
80 changes: 10 additions & 70 deletions crates/models/src/captures.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Collection, ConnectorConfig, RawValue, ShardTemplate};
use super::{is_false, Collection, ConnectorConfig, RawValue, ShardTemplate};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
Expand Down Expand Up @@ -68,11 +68,17 @@ pub enum CaptureEndpoint {
pub struct CaptureBinding {
/// # Endpoint resource to capture from.
pub resource: RawValue,
/// # Whether to disable the binding
/// Disabled bindings are inactive, and not validated.
/// They can be used to represent discovered resources that are
/// intentionally not being captured.
#[serde(default, skip_serializing_if = "is_false")]
pub disable: bool,
/// # Name of the collection to capture into.
// Note(johnny): If we need to add details about how data is written to a
// target, we should turn this into a Target enum as has already been done
// with Source (used by Materialization & Derive).
pub target: Target,
pub target: Collection,
}

impl CaptureDef {
Expand Down Expand Up @@ -101,74 +107,8 @@ impl CaptureBinding {
pub fn example() -> Self {
Self {
resource: serde_json::from_value(json!({"stream": "a_stream"})).unwrap(),
target: Some(Collection::new("target/collection")).into(),
}
}
}

/// Target represents the destination side of a capture binding. It can be
/// either the name of a Flow collection (e.g. "acmeCo/foo/bar") or `null` to
/// disable the binding.
#[derive(Debug, Clone)]
pub struct Target(Option<Collection>);

impl schemars::JsonSchema for Target {
fn schema_name() -> String {
"Target".to_string()
}

fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
let collection_schema = gen.subschema_for::<Collection>();
serde_json::from_value(json!({
"oneOf": [
collection_schema,
{
"title": "Disabled binding",
"description": "a null target indicates that the binding is disabled and will be ignored at runtime",
"type": "null",
}
]
})).unwrap()
}

fn is_referenceable() -> bool {
false
}
}

impl std::ops::Deref for Target {
type Target = Option<Collection>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl From<Option<Collection>> for Target {
fn from(value: Option<Collection>) -> Self {
Target(value)
}
}

impl<'de> serde::Deserialize<'de> for Target {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let value: Option<Collection> = Deserialize::deserialize(deserializer)?;
Ok(value.into())
}
}

impl serde::Serialize for Target {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self.0.as_ref() {
Some(collection) => collection.serialize(serializer),
// 'unit' is represented as an explicit null in serde_json.
None => serializer.serialize_unit(),
disable: false,
target: Collection::new("target/collection"),
}
}
}
7 changes: 6 additions & 1 deletion crates/models/src/derivation.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{
CompositeKey, ConnectorConfig, DeriveUsingSqlite, DeriveUsingTypescript, RawValue,
is_false, CompositeKey, ConnectorConfig, DeriveUsingSqlite, DeriveUsingTypescript, RawValue,
ShardTemplate, Source, Transform,
};
use schemars::{schema::Schema, JsonSchema};
Expand Down Expand Up @@ -82,6 +82,11 @@ pub struct TransformDef {
/// or as a relative URL to a file containing the lambda.
#[serde(default, skip_serializing_if = "RawValue::is_null")]
pub lambda: RawValue,

/// # Whether to disable this transform.
/// Disabled transforms are completely ignored at runtime and are not validated.
#[serde(default, skip_serializing_if = "is_false")]
pub disable: bool,
}

/// A Shuffle specifies how a shuffling key is to be extracted from
Expand Down
17 changes: 6 additions & 11 deletions crates/models/src/materializations.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{ConnectorConfig, Field, RawValue, RelativeUrl, ShardTemplate, Source};
use super::{is_false, ConnectorConfig, Field, RawValue, RelativeUrl, ShardTemplate, Source};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
Expand Down Expand Up @@ -39,21 +39,15 @@ pub struct MaterializationBinding {
pub resource: RawValue,
/// # The collection to be materialized.
pub source: Source,
/// # Whether to disable the binding
/// Disabled bindings are inactive, and not validated.
#[serde(default, skip_serializing_if = "is_false")]
pub disable: bool,
/// # Selected projections for this materialization.
#[serde(default)]
pub fields: MaterializationFields,
}

impl MaterializationBinding {
pub fn non_null_resource(&self) -> Option<&RawValue> {
if self.resource.is_null() {
None
} else {
Some(&self.resource)
}
}
}

/// MaterializationFields defines a selection of projections to materialize,
/// as well as optional per-projection, driver-specific configuration.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
Expand Down Expand Up @@ -90,6 +84,7 @@ impl MaterializationBinding {
Self {
resource: serde_json::from_value(json!({"table": "a_table"})).unwrap(),
source: Source::example(),
disable: false,
fields: MaterializationFields::default(),
}
}
Expand Down
16 changes: 7 additions & 9 deletions crates/sources/src/indirect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,15 +331,13 @@ fn indirect_materialization(

for (index, models::MaterializationBinding { resource, .. }) in bindings.iter_mut().enumerate()
{
if !resource.is_null() {
indirect_dom(
scope,
resource,
format!("{base}.resource.{index}.config"),
resources,
threshold,
)
}
indirect_dom(
scope,
resource,
format!("{base}.resource.{index}.config"),
resources,
threshold,
)
}
}

Expand Down
5 changes: 1 addition & 4 deletions crates/sources/src/inline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,7 @@ fn inline_materialization(
}

for models::MaterializationBinding { resource, .. } in bindings {
if !resource.is_null() {
// a null resource would be ignored by inline config
inline_config(resource, scope, resources)
}
inline_config(resource, scope, resources)
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/sources/src/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,15 +658,15 @@ impl<F: Fetcher> Loader<F> {
};

for (index, binding) in spec.bindings.iter().enumerate() {
if let Some(resource) = binding.non_null_resource() {
if !binding.disable {
tasks.push(
async move {
self.load_config(
scope
.push_prop("bindings")
.push_item(index)
.push_prop("resource"),
resource,
&binding.resource,
)
.await
}
Expand Down
Loading
Loading