diff --git a/Cargo.lock b/Cargo.lock index 204775d54a..abc430b820 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -42,6 +42,7 @@ dependencies = [ "futures", "insta", "itertools 0.10.5", + "json", "lazy_static", "models", "proto-flow", diff --git a/Tiltfile b/Tiltfile index d8494d7bf4..45e4a8b6f1 100644 --- a/Tiltfile +++ b/Tiltfile @@ -52,15 +52,16 @@ local_resource('gazette', serve_cmd='%s/flow/.build/package/bin/gazette serve \ local_resource('reactor', serve_cmd='%s/flow/.build/package/bin/flowctl-go serve consumer \ --broker.address http://localhost:8080 \ --broker.cache.size 128 \ + --consumer.host localhost \ --consumer.limit 1024 \ --consumer.max-hot-standbys 0 \ --consumer.port 9000 \ - --consumer.host localhost \ --etcd.address http://localhost:2379 \ --flow.builds-root file://%s/ \ + --flow.enable-schema-inference \ + --flow.network supabase_network_flow \ --log.format text \ - --log.level info \ - --flow.network supabase_network_flow' % (REPO_BASE, FLOW_BUILDS_DIR), + --log.level info' % (REPO_BASE, FLOW_BUILDS_DIR), links='http://localhost:9000/debug/pprof', resource_deps=['etcd'], readiness_probe=probe( diff --git a/crates/agent-sql/src/publications.rs b/crates/agent-sql/src/publications.rs index c50e0d81ef..8f85b73d4a 100644 --- a/crates/agent-sql/src/publications.rs +++ b/crates/agent-sql/src/publications.rs @@ -618,3 +618,48 @@ pub async fn resolve_storage_mappings( .fetch_all(&mut *txn) .await } + +pub struct ResolvedCollectionRow { + pub built_spec: Option>, +} + +pub async fn resolve_collections( + collections: Vec, + pool: sqlx::PgPool, +) -> sqlx::Result> { + sqlx::query_as!( + ResolvedCollectionRow, + r#"select + built_spec as "built_spec: Json" + from live_specs + where catalog_name = ANY($1::text[]) + and spec_type = 'collection' + "#, + collections as Vec, + ) + .fetch_all(&pool) + .await +} + +pub struct InferredSchemaRow { + pub collection_name: String, + pub schema: Json>, +} + +pub async fn get_inferred_schemas( + collections: Vec, + pool: 
sqlx::PgPool, +) -> sqlx::Result> { + sqlx::query_as!( + InferredSchemaRow, + r#"select + collection_name, + schema as "schema!: Json>" + from inferred_schemas + where collection_name = ANY($1::text[]) + "#, + collections as Vec, + ) + .fetch_all(&pool) + .await +} diff --git a/crates/agent/Cargo.toml b/crates/agent/Cargo.toml index 631260d0f4..b78fa8b38a 100644 --- a/crates/agent/Cargo.toml +++ b/crates/agent/Cargo.toml @@ -15,6 +15,7 @@ agent-sql = { path = "../agent-sql" } async-process = { path = "../async-process" } build = { path = "../build" } doc = { path = "../doc" } +json = { path = "../json" } models = { path = "../models" } proto-flow = { path = "../proto-flow" } runtime = { path = "../runtime" } diff --git a/crates/agent/src/discovers/snapshots/agent__discovers__specs__tests__merge_collection.snap b/crates/agent/src/discovers/snapshots/agent__discovers__specs__tests__merge_collection.snap index fa38dcc187..546c68b2c6 100644 --- a/crates/agent/src/discovers/snapshots/agent__discovers__specs__tests__merge_collection.snap +++ b/crates/agent/src/discovers/snapshots/agent__discovers__specs__tests__merge_collection.snap @@ -39,11 +39,25 @@ expression: "serde_json::to_string_pretty(&out).unwrap()" ] }, "case/4": { - "writeSchema": {"const":"write!"}, + "writeSchema": {"const":"write!","x-infer-schema":true}, "readSchema": {"const":"read!"}, "key": [ "/foo", "/bar" ] + }, + "case/5": { + "writeSchema": {"const":"write!","x-infer-schema":true}, + "readSchema": {"allOf":[{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}]}, + "key": [ + "/key" + ] + }, + "case/6": { + "writeSchema": {"const":"write!","x-infer-schema":true}, + "readSchema": {"allOf":[{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}]}, + "key": [ + "/key" + ] } } diff --git a/crates/agent/src/discovers/specs.rs b/crates/agent/src/discovers/specs.rs index a39111d792..aaeabc3267 100644 --- a/crates/agent/src/discovers/specs.rs +++ b/crates/agent/src/discovers/specs.rs 
@@ -123,7 +123,9 @@ pub fn merge_collections( }, ) in targets.into_iter().zip(discovered_bindings.into_iter()) { - let document_schema: models::Schema = serde_json::from_str(&document_schema_json).unwrap(); + let document_schema = + models::Schema::new(models::RawValue::from_string(document_schema_json).unwrap()); + // Unwrap a fetched collection, or initialize a blank one. let mut collection = fetched_collections @@ -141,6 +143,20 @@ pub fn merge_collections( if collection.read_schema.is_some() { collection.write_schema = Some(document_schema); + } else if matches!( + // Does the connector use schema inference? + document_schema.to_value().get("x-infer-schema"), + Some(serde_json::Value::Bool(true)) + ) { + collection.schema = None; + collection.write_schema = Some(document_schema); + + // Synthesize a minimal read schema. + collection.read_schema = Some(models::Schema::new(models::RawValue::from_value( + &serde_json::json!({ + "allOf": [{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}], + }), + ))); } else { collection.schema = Some(document_schema) } @@ -240,7 +256,11 @@ mod tests { // case/3: If discovered key is empty, it doesn't replace the collection key. {"documentSchema": {"const": 42}, "key": [], "recommendedName": "", "resourceConfig": {}}, // case/4: If fetched collection has read & write schemas, only the write schema is updated. - {"documentSchema": {"const": "write!"}, "key": ["/foo", "/bar"], "recommendedName": "", "resourceConfig": {}}, + {"documentSchema": {"x-infer-schema": true, "const": "write!"}, "key": ["/foo", "/bar"], "recommendedName": "", "resourceConfig": {}}, + // case/5: If there is no fetched collection but schema inference is used, an initial read schema is created. + {"documentSchema": {"x-infer-schema": true, "const": "write!"}, "key": ["/key"], "recommendedName": "", "resourceConfig": {}}, + // case/6: The fetched collection did not use schema inference, but now does. 
+ {"documentSchema": {"x-infer-schema": true, "const": "write!"}, "key": ["/key"], "recommendedName": "", "resourceConfig": {}}, ], { "case/2": { @@ -262,12 +282,18 @@ mod tests { "readSchema": {"const": "read!"}, "key": ["/old"], }, + "case/6": { + "schema": false, + "key": ["/old"], + }, }, [ "case/1", "case/2", "case/3", "case/4", + "case/5", + "case/6", ] ])) .unwrap(); @@ -387,7 +413,7 @@ mod tests { ("Foo", "Foo"), ("foo/bar", "foo/bar"), ("/foo/bar//baz/", "foo/bar_baz"), // Invalid leading, middle, & trailing slash. - ("#੫൬ , bar-_!", "੫൬_bar-_"), // Invalid leading, middle, & trailing chars. + ("#੫൬ , bar-_!", "੫൬_bar-_"), // Invalid leading, middle, & trailing chars. ("One! two/_three", "One_two/_three"), ] { assert_eq!( diff --git a/crates/agent/src/main.rs b/crates/agent/src/main.rs index 621e84a15d..4f03b145c7 100644 --- a/crates/agent/src/main.rs +++ b/crates/agent/src/main.rs @@ -102,6 +102,7 @@ async fn main() -> Result<(), anyhow::Error> { &args.connector_network, &args.consumer_address, &logs_tx, + Some(&pg_pool), )), Box::new(agent::TagHandler::new( &args.connector_network, diff --git a/crates/agent/src/publications.rs b/crates/agent/src/publications.rs index c6e9e2345a..91e8a73a1d 100644 --- a/crates/agent/src/publications.rs +++ b/crates/agent/src/publications.rs @@ -1,4 +1,5 @@ use self::builds::IncompatibleCollection; +use self::validation::ControlPlane; use super::{ draft::{self, Error}, logs, Handler, HandlerStatus, Id, @@ -12,6 +13,7 @@ pub mod builds; mod linked_materializations; pub mod specs; mod storage; +mod validation; /// JobStatus is the possible outcomes of a handled draft submission. 
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] @@ -56,6 +58,7 @@ pub struct PublishHandler { builds_root: url::Url, connector_network: String, consumer_address: url::Url, + control_plane: ControlPlane, logs_tx: logs::Tx, } @@ -68,6 +71,7 @@ impl PublishHandler { connector_network: &str, consumer_address: &url::Url, logs_tx: &logs::Tx, + pool: Option<&sqlx::PgPool>, ) -> Self { Self { agent_user_email: agent_user_email.into(), @@ -76,6 +80,7 @@ impl PublishHandler { builds_root: builds_root.clone(), connector_network: connector_network.to_string(), consumer_address: consumer_address.clone(), + control_plane: ControlPlane::new(pool), logs_tx: logs_tx.clone(), } } @@ -301,6 +306,7 @@ impl PublishHandler { &self.builds_root, &draft_catalog, &self.connector_network, + self.control_plane.clone(), row.logs_token, &self.logs_tx, row.pub_id, diff --git a/crates/agent/src/publications/builds.rs b/crates/agent/src/publications/builds.rs index 18b23d3664..c1b0513155 100644 --- a/crates/agent/src/publications/builds.rs +++ b/crates/agent/src/publications/builds.rs @@ -62,7 +62,7 @@ impl BuildOutput { .iter() .map(|e| Error { scope: Some(e.scope.to_string()), - detail: e.error.to_string(), + detail: format!("{:#}", e.error), ..Default::default() }) .collect() @@ -118,6 +118,7 @@ pub async fn build_catalog( builds_root: &url::Url, catalog: &models::Catalog, connector_network: &str, + control_plane: super::ControlPlane, logs_token: Uuid, logs_tx: &logs::Tx, pub_id: Id, @@ -137,7 +138,6 @@ pub async fn build_catalog( .context("writing catalog file")?; let build_id = format!("{pub_id}"); - let control_plane = validation::NoOpControlPlane {}; let db_path = builds_dir.join(&build_id); let log_handler = logs::ops_handler(logs_tx.clone(), "build".to_string(), logs_token); let project_root = url::Url::parse("file:///").unwrap(); diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index e9aa09fbd3..a650bbec7f 100644 --- 
a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -752,6 +752,7 @@ mod test { "", &bs_url, &logs_tx, + None, ); let mut results: Vec = vec![]; diff --git a/crates/agent/src/publications/validation.rs b/crates/agent/src/publications/validation.rs new file mode 100644 index 0000000000..732b73e9c2 --- /dev/null +++ b/crates/agent/src/publications/validation.rs @@ -0,0 +1,62 @@ +use futures::{FutureExt, TryFutureExt}; + +#[derive(Clone)] +pub struct ControlPlane { + pool: Option, +} + +impl ControlPlane { + pub fn new(pool: Option<&sqlx::PgPool>) -> Self { + Self { + pool: pool.cloned(), + } + } +} + +impl validation::ControlPlane for ControlPlane { + fn resolve_collections<'a>( + &'a self, + collections: Vec, + ) -> futures::future::BoxFuture<'a, anyhow::Result>> { + let Some(pool) = self.pool.clone() else { + return validation::NoOpControlPlane.resolve_collections(collections) + }; + let collections = collections.into_iter().map(Into::into).collect(); + + agent_sql::publications::resolve_collections(collections, pool) + .map_err(Into::into) + .map_ok(|rows| { + rows.into_iter() + .filter_map(|row| row.built_spec.map(|s| s.0)) + .collect() + }) + .boxed() + } + + fn get_inferred_schemas<'a>( + &'a self, + collections: Vec, + ) -> futures::future::BoxFuture< + 'a, + anyhow::Result>, + > { + let Some(pool) = self.pool.clone() else { + return validation::NoOpControlPlane.get_inferred_schemas(collections) + }; + let collections = collections.into_iter().map(Into::into).collect(); + + agent_sql::publications::get_inferred_schemas(collections, pool) + .map_err(Into::into) + .map_ok(|rows| { + rows.into_iter() + .map(|row| { + ( + models::Collection::new(row.collection_name), + models::Schema::new(row.schema.0.into()), + ) + }) + .collect() + }) + .boxed() + } +} diff --git a/crates/assemble/src/lib.rs b/crates/assemble/src/lib.rs index 49ab812c4a..d2f5e1633e 100644 --- a/crates/assemble/src/lib.rs +++ b/crates/assemble/src/lib.rs @@ 
-411,23 +411,24 @@ pub fn collection_spec( build_id: &str, collection: &tables::Collection, projections: Vec, + read_bundle: Option, stores: &[models::Store], uuid_ptr: &str, + write_bundle: models::RawValue, ) -> flow::CollectionSpec { let tables::Collection { scope: _, collection: name, spec: models::CollectionDef { - schema, - read_schema, - write_schema, + schema: _, + read_schema: _, + write_schema: _, key, projections: _, journals, derivation: _, derive: _, - .. }, } = collection; @@ -446,18 +447,16 @@ pub fn collection_spec( }) .collect(); - let (write_schema_json, read_schema_json) = match (schema, write_schema, read_schema) { - (Some(schema), None, None) => (schema.to_string(), String::new()), - (None, Some(write_schema), Some(read_schema)) => { - (write_schema.to_string(), read_schema.to_string()) - } - _ => (String::new(), String::new()), + let bundle_to_string = |b: models::RawValue| -> String { + let b: Box = b.into(); + let b: Box = b.into(); + b.into() }; flow::CollectionSpec { name: name.to_string(), - write_schema_json, - read_schema_json, + write_schema_json: bundle_to_string(write_bundle), + read_schema_json: read_bundle.map(bundle_to_string).unwrap_or_default(), key: key.iter().map(|p| p.to_string()).collect(), projections, partition_fields, diff --git a/crates/doc/src/validation.rs b/crates/doc/src/validation.rs index 7fb3d87a25..e36c3f06b5 100644 --- a/crates/doc/src/validation.rs +++ b/crates/doc/src/validation.rs @@ -20,8 +20,14 @@ pub fn build_bundle(bundle: &str) -> Result &serde_json::from_str(bundle).unwrap(), )?; - // Tweak scope to remove a synthetic resource pointer previously - // embedded in the $id during schema bundling. + // Tweak scope to remove a synthetic resource pointer that was previously + // embedded in the $id during schema bundling, so that any errors + // generated by this schema have a relate-able error scope for the user. 
+ // We tweak only the outer / top-level Schema instance and not its children, + // which is not fully correct because those children will still have `ptr=...` + // in their `curi` fields. + // TODO(johnny): Per JSON-Schema spec $id is not allowed to have a fragment, + // but perhaps we should relax this restriction for our implementation? for (key, value) in schema.curi.query_pairs() { if key != "ptr" { continue; diff --git a/crates/flowctl/src/local_specs.rs b/crates/flowctl/src/local_specs.rs index f28392ea50..800deb767e 100644 --- a/crates/flowctl/src/local_specs.rs +++ b/crates/flowctl/src/local_specs.rs @@ -1,5 +1,5 @@ -use anyhow::Context; -use futures::{future::BoxFuture, FutureExt}; +use futures::{future::BoxFuture, FutureExt, TryStreamExt}; +use itertools::Itertools; /// Load and validate sources and derivation connectors (only). /// Capture and materialization connectors are not validated. @@ -197,58 +197,107 @@ pub(crate) struct Resolver { } impl validation::ControlPlane for Resolver { - fn resolve_collections<'a, 'b: 'a>( + fn resolve_collections<'a>( &'a self, collections: Vec, ) -> BoxFuture<'a, anyhow::Result>> { #[derive(serde::Deserialize, Clone)] struct Row { pub catalog_name: String, - pub built_spec: Option, + pub built_spec: Option, } + let type_selector = crate::catalog::SpecTypeSelector { + captures: Some(false), + collections: Some(true), + materializations: Some(false), + tests: Some(false), + }; + + let rows = collections + .into_iter() + .chunks(API_FETCH_CHUNK_SIZE) + .into_iter() + .map(|names| { + let builder = self + .client + .from("live_specs_ext") + .select("catalog_name,built_spec") + .in_("catalog_name", names); + let builder = type_selector.add_live_specs_filters(builder, false); + + async move { crate::api_exec::>(builder).await } + }) + .collect::>() + .try_collect::>>(); + async move { - // NameSelector will return *all* collections, rather than *no* - // collections, if its selector is empty. 
- if collections.is_empty() { - return Ok(vec![]); - } + let rows = rows.await?; + + rows + .into_iter() + .map(|chunk| chunk.into_iter().map( + |Row{ catalog_name, built_spec}| { + let Some(built_spec) = built_spec else { + anyhow::bail!("collection {catalog_name} is an old specification which must be upgraded to continue. Please contact support for assistance"); + }; + Ok(built_spec) + } + )) + .flatten() + .try_collect() + } + .boxed() + } + + fn get_inferred_schemas<'a>( + &'a self, + collections: Vec, + ) -> BoxFuture<'a, anyhow::Result>> + { + #[derive(serde::Deserialize, Clone)] + struct Row { + pub collection_name: models::Collection, + pub schema: models::Schema, + } - let list = crate::catalog::List { - flows: false, - name_selector: crate::catalog::NameSelector { - name: collections.into_iter().map(|c| c.to_string()).collect(), - prefix: Vec::new(), - }, - type_selector: crate::catalog::SpecTypeSelector { - captures: Some(false), - collections: Some(true), - materializations: Some(false), - tests: Some(false), - }, - deleted: false, - }; - - let columns = vec![ - "catalog_name", - "built_spec", - ]; - let rows = crate::catalog::fetch_live_specs::(self.client.clone(), &list, columns) - .await - .context("failed to fetch collection specs")?; - - tracing::debug!(name=?list.name_selector.name, rows=?rows.len(), "resolved remote collections"); - - rows.into_iter() - .map(|Row{ catalog_name, built_spec}| { - let Some(built_spec) = built_spec else { - anyhow::bail!("collection {catalog_name} is an old specification which must be upgraded to continue. Please contact support for assistance"); - }; - Ok(serde_json::from_str(built_spec.get()) - .with_context(|| format!("failed to parse previously-built specification of {catalog_name}"))?) 
+ let rows = collections + .into_iter() + .chunks(API_FETCH_CHUNK_SIZE) + .into_iter() + .map(|names| { + let builder = self + .client + .from("inferred_schemas") + .select("collection_name,schema") + .in_("collection_name", names); + + async move { crate::api_exec::>(builder).await } + }) + .collect::>() + .try_collect::>>(); + + async move { + let rows = rows.await?; + + Ok(rows + .into_iter() + .map(|chunk| { + chunk.into_iter().map( + |Row { + collection_name, + schema, + }| (collection_name, schema), + ) }) - .collect::>() + .flatten() + .collect()) } .boxed() } } + +// API_FETCH_CHUNK_SIZE is used to chunk a set of API entities fetched in a single request. +// PostgREST passes query predicates as URL parameters, so if we don't chunk requests +// then we run into URL length limits. +const API_FETCH_CHUNK_SIZE: usize = 25; diff --git a/crates/sources/src/loader.rs b/crates/sources/src/loader.rs index b5bac24a30..55f1c223dc 100644 --- a/crates/sources/src/loader.rs +++ b/crates/sources/src/loader.rs @@ -240,7 +240,9 @@ impl Loader { let mut uri = uri.clone(); uri.set_fragment(None); - if index.fetch(&uri).is_none() { + // The "flow" scheme is used to inject contextual schemas + // and is not attempted to be fetched.
+ if index.fetch(&uri).is_none() && uri.scheme() != "flow" { Some(uri) } else { None diff --git a/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections-2.snap b/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections-2.snap index d3bc7f5a90..8916d0c5ff 100644 --- a/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections-2.snap +++ b/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections-2.snap @@ -35,6 +35,18 @@ Sources { ] }, }, + Collection { + scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-inferred-read, + collection: test/collection-with-write-and-inferred-read, + spec: { + "writeSchema": {"$anchor":"foobar","$id":"test://example/schema.json","properties":{"a":{"properties":{"a":{"type":"string"}},"type":"object"},"b":{"properties":{"b":{"type":"string"}},"type":"object"},"key":{"items":{"type":"string"},"minItems":2,"type":"array"}},"type":"object"}, + "readSchema": {"$id":"test://example/catalog.yaml?ptr=/collections/test~1collection-with-write-and-inferred-read/readSchema","allOf":[{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}]}, + "key": [ + "/key/1", + "/key/0" + ] + }, + }, Collection { scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-read, collection: test/collection-with-write-and-read, diff --git a/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections-3.snap b/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections-3.snap index 8181765ed0..03c0fb9847 100644 --- a/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections-3.snap +++ b/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections-3.snap @@ -35,6 +35,18 @@ Sources { ] }, }, + Collection { + scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-inferred-read, + collection: test/collection-with-write-and-inferred-read, + 
spec: { + "writeSchema": "collection-with-write-and-inferred-read.write.schema.yaml", + "readSchema": "collection-with-write-and-inferred-read.read.schema.yaml", + "key": [ + "/key/1", + "/key/0" + ] + }, + }, Collection { scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-read, collection: test/collection-with-write-and-read, @@ -69,6 +81,14 @@ Sources { errors: [], fetches: [], imports: [ + Import { + scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-inferred-read/readSchema, + to_resource: test://example/collection-with-write-and-inferred-read.read.schema.yaml, + }, + Import { + scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-inferred-read/writeSchema, + to_resource: test://example/collection-with-write-and-inferred-read.write.schema.yaml, + }, Import { scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-read/readSchema, to_resource: test://example/collection-with-write-and-read.read.schema.yaml, @@ -92,7 +112,19 @@ Sources { resource: test://example/catalog.yaml, content_type: "CATALOG", content: ".. 
binary ..", - content_dom: {"collections":{"test/collection":{"schema":"collection.schema.yaml","key":["/key/1","/key/0"],"projections":{"field_a":{"location":"/a/a","partition":true},"field_b":{"location":"/b/b","partition":false}}},"test/collection-no-schema":{"key":["/key"]},"test/collection-with-write-and-read":{"writeSchema":"collection-with-write-and-read.write.schema.yaml","readSchema":"collection-with-write-and-read.read.schema.yaml","key":["/key/1","/key/0"]},"test/collection/with-journals":{"schema":"with-journals.schema.yaml","key":["/a/key"],"journals":{"fragments":{"length":12345,"compressionCodec":"GZIP_OFFLOAD_DECOMPRESSION","retention":"13days 8h","flushInterval":"15m"}}}}}, + content_dom: {"collections":{"test/collection":{"schema":"collection.schema.yaml","key":["/key/1","/key/0"],"projections":{"field_a":{"location":"/a/a","partition":true},"field_b":{"location":"/b/b","partition":false}}},"test/collection-no-schema":{"key":["/key"]},"test/collection-with-write-and-inferred-read":{"writeSchema":"collection-with-write-and-inferred-read.write.schema.yaml","readSchema":"collection-with-write-and-inferred-read.read.schema.yaml","key":["/key/1","/key/0"]},"test/collection-with-write-and-read":{"writeSchema":"collection-with-write-and-read.write.schema.yaml","readSchema":"collection-with-write-and-read.read.schema.yaml","key":["/key/1","/key/0"]},"test/collection/with-journals":{"schema":"with-journals.schema.yaml","key":["/a/key"],"journals":{"fragments":{"length":12345,"compressionCodec":"GZIP_OFFLOAD_DECOMPRESSION","retention":"13days 8h","flushInterval":"15m"}}}}}, + }, + Resource { + resource: test://example/collection-with-write-and-inferred-read.read.schema.yaml, + content_type: "JSON_SCHEMA", + content: ".. 
binary ..", + content_dom: {"$id":"test://example/catalog.yaml?ptr=/collections/test~1collection-with-write-and-inferred-read/readSchema","allOf":[{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}]}, + }, + Resource { + resource: test://example/collection-with-write-and-inferred-read.write.schema.yaml, + content_type: "JSON_SCHEMA", + content: ".. binary ..", + content_dom: {"$anchor":"foobar","$id":"test://example/schema.json","properties":{"a":{"properties":{"a":{"type":"string"}},"type":"object"},"b":{"properties":{"b":{"type":"string"}},"type":"object"},"key":{"items":{"type":"string"},"minItems":2,"type":"array"}},"type":"object"}, }, Resource { resource: test://example/collection-with-write-and-read.read.schema.yaml, diff --git a/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections.snap b/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections.snap index fe60eff068..ad923fcb57 100644 --- a/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections.snap +++ b/crates/sources/src/scenarios/snapshots/sources__scenarios__test__collections.snap @@ -35,6 +35,18 @@ Sources { ] }, }, + Collection { + scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-inferred-read, + collection: test/collection-with-write-and-inferred-read, + spec: { + "writeSchema": "schema.json", + "readSchema": {"allOf":[{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}]}, + "key": [ + "/key/1", + "/key/0" + ] + }, + }, Collection { scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-read, collection: test/collection-with-write-and-read, @@ -78,6 +90,10 @@ Sources { }, ], imports: [ + Import { + scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-inferred-read/writeSchema, + to_resource: test://example/schema.json, + }, Import { scope: test://example/catalog.yaml#/collections/test~1collection-with-write-and-read/readSchema, 
to_resource: test://example/schema.json, @@ -101,7 +117,7 @@ Sources { resource: test://example/catalog.yaml, content_type: "CATALOG", content: ".. binary ..", - content_dom: {"collections":{"test/collection":{"key":["/key/1","/key/0"],"projections":{"field_a":{"location":"/a/a","partition":true},"field_b":{"location":"/b/b","partition":false}},"schema":"schema.json"},"test/collection-no-schema":{"key":["/key"]},"test/collection-with-write-and-read":{"key":["/key/1","/key/0"],"readSchema":"schema.json#/properties/a","writeSchema":"schema.json"},"test/collection/with-journals":{"journals":{"fragments":{"compressionCodec":"GZIP_OFFLOAD_DECOMPRESSION","flushInterval":"15m","length":12345,"retention":"320h"}},"key":["/a/key"],"schema":"schema.json#foobar"}}}, + content_dom: {"collections":{"test/collection":{"key":["/key/1","/key/0"],"projections":{"field_a":{"location":"/a/a","partition":true},"field_b":{"location":"/b/b","partition":false}},"schema":"schema.json"},"test/collection-no-schema":{"key":["/key"]},"test/collection-with-write-and-inferred-read":{"key":["/key/1","/key/0"],"readSchema":{"allOf":[{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}]},"writeSchema":"schema.json"},"test/collection-with-write-and-read":{"key":["/key/1","/key/0"],"readSchema":"schema.json#/properties/a","writeSchema":"schema.json"},"test/collection/with-journals":{"journals":{"fragments":{"compressionCodec":"GZIP_OFFLOAD_DECOMPRESSION","flushInterval":"15m","length":12345,"retention":"320h"}},"key":["/a/key"],"schema":"schema.json#foobar"}}}, }, Resource { resource: test://example/schema.json, diff --git a/crates/sources/src/scenarios/test_collections.yaml b/crates/sources/src/scenarios/test_collections.yaml index 207510a1c3..36bc49f37a 100644 --- a/crates/sources/src/scenarios/test_collections.yaml +++ b/crates/sources/src/scenarios/test_collections.yaml @@ -22,6 +22,14 @@ test://example/catalog.yaml: readSchema: schema.json#/properties/a key: [/key/1, /key/0] + 
test/collection-with-write-and-inferred-read: + writeSchema: schema.json + readSchema: + allOf: + - { $ref: flow://write-schema } + - { $ref: flow://inferred-schema } + key: [/key/1, /key/0] + test/collection-no-schema: key: [/key] diff --git a/crates/validation/src/collection.rs b/crates/validation/src/collection.rs index e656bfdacc..290ea56009 100644 --- a/crates/validation/src/collection.rs +++ b/crates/validation/src/collection.rs @@ -6,13 +6,20 @@ use std::collections::BTreeMap; pub fn walk_all_collections( build_id: &str, collections: &[tables::Collection], + inferred_schemas: &BTreeMap, storage_mappings: &[tables::StorageMapping], errors: &mut tables::Errors, ) -> tables::BuiltCollections { let mut built_collections = tables::BuiltCollections::new(); for collection in collections { - if let Some(spec) = walk_collection(build_id, collection, storage_mappings, errors) { + if let Some(spec) = walk_collection( + build_id, + collection, + inferred_schemas, + storage_mappings, + errors, + ) { built_collections.insert_row(&collection.scope, &collection.collection, None, spec); } } @@ -22,6 +29,7 @@ pub fn walk_all_collections( fn walk_collection( build_id: &str, collection: &tables::Collection, + inferred_schemas: &BTreeMap, storage_mappings: &[tables::StorageMapping], errors: &mut tables::Errors, ) -> Option { @@ -57,18 +65,34 @@ fn walk_collection( .push(scope.push_prop("key"), errors); } - let (write_schema, read_schema) = match (schema, write_schema, read_schema) { + let (write_schema, write_bundle, read_schema_bundle) = match (schema, write_schema, read_schema) + { // One schema used for both writes and reads. - (Some(schema), None, None) => ( - walk_collection_schema(scope.push_prop("schema"), schema, errors)?, + (Some(bundle), None, None) => ( + walk_collection_schema(scope.push_prop("schema"), bundle, errors)?, + (&bundle as &models::RawValue).clone(), None, ), // Separate schemas used for writes and reads. 
- (None, Some(write_schema), Some(read_schema)) => { - let write = - walk_collection_schema(scope.push_prop("writeSchema"), write_schema, errors); - let read = walk_collection_schema(scope.push_prop("readSchema"), read_schema, errors); - (write?, Some(read?)) + (None, Some(write_bundle), Some(read_bundle)) => { + let write_schema = + walk_collection_schema(scope.push_prop("writeSchema"), write_bundle, errors); + + // Potentially extend the user's read schema with definitions + // for the collection's current write and inferred schemas. + let read_bundle = extend_read_bundle( + read_bundle, + write_bundle, + inferred_schemas.get(&collection.collection), + ); + + let read_schema = + walk_collection_schema(scope.push_prop("readSchema"), &read_bundle, errors); + ( + write_schema?, + (&write_bundle as &models::RawValue).clone(), + Some((read_schema?, read_bundle)), + ) } _ => { Error::InvalidSchemaCombination { @@ -88,7 +112,7 @@ fn walk_collection( if let Err(err) = write_schema.walk_ptr(ptr, true) { Error::from(err).push(scope, errors); } - if let Some(read_schema) = &read_schema { + if let Some((read_schema, _read_bundle)) = &read_schema_bundle { if let Err(err) = read_schema.walk_ptr(ptr, true) { Error::from(err).push(scope, errors); } @@ -98,7 +122,7 @@ fn walk_collection( let projections = walk_collection_projections( scope.push_prop("projections"), &write_schema, - read_schema.as_ref(), + read_schema_bundle.as_ref(), key, projections, errors, @@ -116,8 +140,10 @@ fn walk_collection( build_id, collection, projections, + read_schema_bundle.map(|(_schema, bundle)| bundle), partition_stores, UUID_PTR, + write_bundle, )) } @@ -152,12 +178,12 @@ fn walk_collection_schema( fn walk_collection_projections( scope: Scope, write_schema: &schema::Schema, - read_schema: Option<&schema::Schema>, + read_schema_bundle: Option<&(schema::Schema, models::RawValue)>, key: &models::CompositeKey, projections: &BTreeMap, errors: &mut tables::Errors, ) -> Vec { - let 
effective_read_schema = if let Some(read_schema) = read_schema { + let effective_read_schema = if let Some((read_schema, _read_bundle)) = read_schema_bundle { read_schema } else { write_schema @@ -219,7 +245,7 @@ fn walk_collection_projections( if let Err(err) = effective_read_schema.walk_ptr(ptr, partition) { Error::from(err).push(scope, errors); } - if matches!(read_schema, Some(_) if partition) { + if matches!(read_schema_bundle, Some(_) if partition) { // Partitioned projections must also be key-able within the write schema. if let Err(err) = write_schema.walk_ptr(ptr, true) { Error::from(err).push(scope, errors); @@ -260,7 +286,7 @@ fn walk_collection_projections( if !saw_uuid_timestamp_projection { projections.push(flow::Projection { ptr: UUID_PTR.to_string(), - field: "flow_published_at".to_string(), + field: FLOW_PUBLISHED_AT.to_string(), inference: Some(assemble::inference_uuid_v1_date_time()), ..Default::default() }) @@ -400,7 +426,123 @@ pub fn walk_selector( } } +fn extend_read_bundle( + read_bundle: &models::Schema, + write_bundle: &models::Schema, + inferred_bundle: Option<&models::Schema>, +) -> models::RawValue { + use json::schema::keywords; + use serde_json::{value::to_raw_value, Value}; + type Skim = BTreeMap; + + let mut read_schema: Skim = serde_json::from_str(read_bundle.get()).unwrap(); + let mut read_defs: Skim = read_schema + .get(keywords::DEF) + .map(|d| serde_json::from_str(d.get()).unwrap()) + .unwrap_or_default(); + + // Add a definition for the write schema if it's referenced. + // We cannot add it in all cases because the existing `read_bundle` and + // `write_bundle` may have a common sub-schema defined, and naively adding + // it would result in an indexing error due to the duplicate definition. + // So, we treat $ref: flow://write-schema as a user assertion that there is + // no such conflicting definition (and we may produce an indexing error + // later if they're wrong). 
+ if read_bundle.get().contains(super::REF_WRITE_SCHEMA_PATTERN) { + let mut write_schema: Skim = serde_json::from_str(write_bundle.get()).unwrap(); + + // Set $id to "flow://write-schema". + _ = write_schema.insert( + keywords::ID.to_string(), + models::RawValue::from_value(&Value::String(REF_WRITE_SCHEMA_URL.to_string())), + ); + // Add as a definition within the read schema. + read_defs.insert( + REF_WRITE_SCHEMA_URL.to_string(), + to_raw_value(&write_schema).unwrap().into(), + ); + } + + // Add a definition for an inferred schema if it's provided. + // Note that we previously filtered the set of retrieved schemas to those + // having a read schema matching super::REF_INFERRED_SCHEMA_PATTERN. + if let Some(inferred_bundle) = inferred_bundle { + let mut inferred_schema: Skim = serde_json::from_str(inferred_bundle.get()).unwrap(); + + // Set $id to "flow://inferred-schema". + _ = inferred_schema.insert( + keywords::ID.to_string(), + models::RawValue::from_value(&Value::String(REF_INFERRED_SCHEMA_URL.to_string())), + ); + // Add as a definition within the read schema. + read_defs.insert( + REF_INFERRED_SCHEMA_URL.to_string(), + to_raw_value(&inferred_schema).unwrap().into(), + ); + } + + // Re-serialize the updated definitions of the read schema. + _ = read_schema.insert( + keywords::DEF.to_string(), + serde_json::value::to_raw_value(&read_defs).unwrap().into(), + ); + to_raw_value(&read_schema).unwrap().into() +} + /// The default field name for the root document projection. const FLOW_DOCUMENT: &str = "flow_document"; +/// The default field name for the document publication time. +const FLOW_PUBLISHED_AT: &str = "flow_published_at"; +/// The JSON Pointer of the Flow document UUID. const UUID_PTR: &str = "/_meta/uuid"; +/// The JSON Pointer of the synthetic document publication time. +/// This pointer typically pairs with the FLOW_PUBLISHED_AT field. 
const UUID_DATE_TIME_PTR: &str = "/_meta/uuid/date-time"; +// URL for referencing the inferred schema of a collection, which may be used within a read schema. +const REF_INFERRED_SCHEMA_URL: &str = "flow://inferred-schema"; +// URL for referencing the write schema of a collection, which may be used within a read schema. +const REF_WRITE_SCHEMA_URL: &str = "flow://write-schema"; + +#[cfg(test)] +mod test { + use serde_json::json; + + #[test] + fn test_extend_read_schema() { + let read_schema = models::Schema::new(models::RawValue::from_value(&json!({ + "$defs": { + "existing://def": {"type": "array"}, + }, + "maxProperties": 10, + "allOf": [ + {"$ref": "flow://inferred-schema"}, + {"$ref": "flow://write-schema"}, + ] + }))); + let write_schema = models::Schema::new(models::RawValue::from_value(&json!({ + "$id": "old://value", + "required": ["a_key"], + }))); + let inferred_schema = models::Schema::new(models::RawValue::from_value(&json!({ + "$id": "old://value", + "minProperties": 5, + }))); + + assert_eq!( + super::extend_read_bundle(&read_schema, &write_schema, Some(&inferred_schema)) + .to_value(), + json!({ + "$defs": { + "existing://def": {"type": "array"}, // Left alone. 
+ "flow://write-schema": { "$id": "flow://write-schema", "required": ["a_key"] }, + "flow://inferred-schema": { "$id": "flow://inferred-schema", "minProperties": 5 }, + }, + "maxProperties": 10, + "allOf": [ + {"$ref": "flow://inferred-schema"}, + {"$ref": "flow://write-schema"}, + ] + }) + ); + } +} diff --git a/crates/validation/src/errors.rs b/crates/validation/src/errors.rs index 7ba19b76a8..33b334923b 100644 --- a/crates/validation/src/errors.rs +++ b/crates/validation/src/errors.rs @@ -153,8 +153,8 @@ pub enum Error { }, #[error("connector returned wrong number of bindings (expected {expect}, got {got})")] WrongConnectorBindings { expect: usize, got: usize }, - #[error("error while resolving referenced collections from the control plane")] - ResolveCollections { + #[error("error while communicating with the Flow control-plane API")] + ControlPlane { #[source] detail: anyhow::Error, }, diff --git a/crates/validation/src/lib.rs b/crates/validation/src/lib.rs index 8ee20834fd..050400ec49 100644 --- a/crates/validation/src/lib.rs +++ b/crates/validation/src/lib.rs @@ -1,6 +1,7 @@ use futures::future::BoxFuture; use itertools::{EitherOrBoth, Itertools}; use sources::Scope; +use std::collections::BTreeMap; mod capture; mod collection; @@ -43,10 +44,17 @@ pub trait ControlPlane: Send + Sync { // *close* to a provided name, it will be returned so that a suitable spelling // hint can be surfaced to the user. This implies we must account for possible // overlap with locally-built collections even if none were asked for. - fn resolve_collections<'a, 'b: 'a>( + fn resolve_collections<'a>( &'a self, collections: Vec, ) -> BoxFuture<'a, anyhow::Result>>; + + /// Retrieve the inferred schema of each of the given `collections`. + /// Collections for which a schema is not found should be omitted from the response. 
+ fn get_inferred_schemas<'a>( + &'a self, + collections: Vec, + ) -> BoxFuture<'a, anyhow::Result>>; } pub async fn validate( @@ -74,68 +82,76 @@ pub async fn validate( } storage_mapping::walk_all_storage_mappings(storage_mappings, &mut errors); - // Build all local collections. - let built_collections = - collection::walk_all_collections(build_id, collections, storage_mappings, &mut errors); - - // If we failed to build one or more collections then further validation - // will generate lots of misleading "not found" errors. - if built_collections.len() != collections.len() { - return tables::Validations { - built_captures: tables::BuiltCaptures::new(), - built_collections, - built_materializations: tables::BuiltMaterializations::new(), - built_tests: tables::BuiltTests::new(), - errors, - }; - } + // Names of collection which use inferred schemas. + let inferred_collections = reference::gather_inferred_collections(collections); + // Names of collections which are referenced, but are not being validated themselves. + let remote_collections = + reference::gather_referenced_collections(captures, collections, materializations, tests); - // Next resolve all referenced collections which are not in local `collections`. - let remote_collections = match control_plane - .resolve_collections(reference::gather_referenced_collections( - captures, - collections, - materializations, - tests, - )) - .await - { + // Concurrently fetch referenced collections and inferred schemas from the control-plane. + let (inferred_schemas, remote_collections) = match futures::try_join!( + control_plane.get_inferred_schemas(inferred_collections), + control_plane.resolve_collections(remote_collections), + // TODO(johnny): Also fetch storage mappings here. + ) { + Ok(ok) => ok, Err(err) => { - // If we failed to complete the resolve operation then further validation - // will generate lots of misleading "not found" errors. 
This is distinct - // from collections not being found, which is communicated by their absence - // and/or presence of nearly-matched names in the resolved set. - Error::ResolveCollections { detail: err }.push(root_scope, &mut errors); + // If we failed to fetch from the control-plane then further validation + // will generate lots of misleading errors, so fail now. + Error::ControlPlane { detail: err }.push(root_scope, &mut errors); return tables::Validations { built_captures: tables::BuiltCaptures::new(), - built_collections, + built_collections: tables::BuiltCollections::new(), built_materializations: tables::BuiltMaterializations::new(), built_tests: tables::BuiltTests::new(), errors, }; } - Ok(c) => c - .into_iter() - .map(|mut spec| { - tracing::debug!(collection=%spec.name, "resolved referenced remote collection"); - - // Clear a derivation (if there is one), as we do not need it - // when embedding a referenced collection. - spec.derivation = None; - - tables::BuiltCollection { - collection: models::Collection::new(&spec.name), - scope: url::Url::parse("flow://control-plane").unwrap(), - spec, - validated: None, - } - }) - .collect::(), }; + + let remote_collections = remote_collections + .into_iter() + .map(|mut spec| { + tracing::debug!(collection=%spec.name, "resolved referenced remote collection"); + + // Clear a derivation (if there is one), as we do not need it + // when embedding a referenced collection. + spec.derivation = None; + + tables::BuiltCollection { + collection: models::Collection::new(&spec.name), + scope: url::Url::parse("flow://control-plane").unwrap(), + spec, + validated: None, + } + }) + .collect::(); + if remote_collections.is_empty() { tracing::debug!("there were no remote collections to resolve"); } + // Build all local collections. 
+ let built_collections = collection::walk_all_collections( + build_id, + collections, + &inferred_schemas, + storage_mappings, + &mut errors, + ); + + // If we failed to build one or more collections then further validation + // will generate lots of misleading "not found" errors. + if built_collections.len() != collections.len() { + return tables::Validations { + built_captures: tables::BuiltCaptures::new(), + built_collections, + built_materializations: tables::BuiltMaterializations::new(), + built_tests: tables::BuiltTests::new(), + errors, + }; + } + // Merge local and remote BuiltCollections. On conflict, keep the local one. let mut built_collections = built_collections .into_iter() @@ -230,3 +246,15 @@ pub async fn validate( errors, } } + +// This pattern lets us cheaply detect if a read schema references the inferred +// schema of its collection. Assuming an otherwise well-formed JSON schema, +// it can neither false-positive nor false-negative: +// * It must detect an actual property because the same pattern within a JSON +// string would be quote-escaped. +// * It must be a schema keyword ($ref cannot be, say, a property) because +// "flow://inferred-schema" is not a valid JSON schema and would error at build time. +const REF_INFERRED_SCHEMA_PATTERN: &str = "\"$ref\":\"flow://inferred-schema\""; +// This pattern lets us cheaply detect if a read schema references the write +// schema of its collection.
+const REF_WRITE_SCHEMA_PATTERN: &str = "\"$ref\":\"flow://write-schema\""; diff --git a/crates/validation/src/noop.rs b/crates/validation/src/noop.rs index d201b8e8d2..7c38177576 100644 --- a/crates/validation/src/noop.rs +++ b/crates/validation/src/noop.rs @@ -103,10 +103,17 @@ impl Connectors for NoOpConnectors { pub struct NoOpControlPlane; impl ControlPlane for NoOpControlPlane { - fn resolve_collections<'a, 'b: 'a>( + fn resolve_collections<'a>( &'a self, _collections: Vec, ) -> BoxFuture<'a, anyhow::Result>> { - async move { Ok(vec![]) }.boxed() + async move { Ok(Vec::new()) }.boxed() + } + + fn get_inferred_schemas<'a>( + &'a self, + _collections: Vec, + ) -> BoxFuture<'a, anyhow::Result>> { + async move { Ok(BTreeMap::new()) }.boxed() } } diff --git a/crates/validation/src/reference.rs b/crates/validation/src/reference.rs index 9f9eaf394e..05d5452e80 100644 --- a/crates/validation/src/reference.rs +++ b/crates/validation/src/reference.rs @@ -47,6 +47,25 @@ pub fn gather_referenced_collections<'a>( out.into_iter().cloned().collect() } +pub fn gather_inferred_collections(collections: &[tables::Collection]) -> Vec { + collections + .iter() + .filter_map(|row| { + if row + .spec + .read_schema + .as_ref() + .map(|schema| schema.get().contains(super::REF_INFERRED_SCHEMA_PATTERN)) + .unwrap_or_default() + { + Some(row.collection.clone()) + } else { + None + } + }) + .collect() +} + pub fn walk_reference<'a, T, F>( this_scope: Scope<'a>, this_entity: &str, diff --git a/crates/validation/tests/model.yaml b/crates/validation/tests/model.yaml index 8e2646d420..8b7de121c4 100644 --- a/crates/validation/tests/model.yaml +++ b/crates/validation/tests/model.yaml @@ -67,6 +67,14 @@ test://example/int-string: location: /bit partition: true + testing/int-string-inferred: + writeSchema: test://example/int-string.schema + readSchema: + allOf: + - { $ref: flow://write-schema } + - { $ref: flow://inferred-schema } + key: [/int] + test://example/int-string-captures: import: - 
test://example/int-string @@ -391,6 +399,16 @@ driver: - resourcePath: [schema, table] networkPorts: *networkPortsFixture + inferredSchemas: + testing/int-string-inferred: + type: object + properties: + # Novel property which is only inferred. + inferred-field: { const: 42 } + # Property is merged with projection from the write-schema. + str: + maxLength: 8675309 + test://example/array-key.schema: # This schema models array additionalItems which provably exist, # due to minItems, but are not ordinarily generated as inferences. diff --git a/crates/validation/tests/scenario_tests.rs b/crates/validation/tests/scenario_tests.rs index 19e8747182..7c8c439c39 100644 --- a/crates/validation/tests/scenario_tests.rs +++ b/crates/validation/tests/scenario_tests.rs @@ -128,11 +128,7 @@ test://example/catalog.yaml: stores: [{provider: S3, bucket: a-bucket}] recovery/: stores: [{provider: S3, bucket: a-bucket}] -driver: - imageInspections: {} - captures: {} - derivations: {} - materializations: {} +driver: {} "##, ) .unwrap(); @@ -255,10 +251,6 @@ test://example/catalog.yaml: stores: [{provider: S3, bucket: a-bucket}] driver: - imageInspections: - s3: - output: '[{"Config": {}}]' - derivations: testing/partly-disabled-derivation: connectorType: SQLITE @@ -507,10 +499,7 @@ test://example/captures: # Illegal duplicates under naming collation. testing/some-source: *spec testing/SoMe-source: *spec -driver: - imageInspections: { "an/image": {output: '[{"Config":{}}]' }} - materializations: {} - captures: {} +driver: {} "#, ); insta::assert_debug_snapshot!(errors); @@ -550,10 +539,7 @@ test://example/materializations: # Illegal duplicates under naming collation. 
testing/some-target: *spec testing/SoMe-target: *spec -driver: - imageInspections: { "an/image": {output: '[{"Config":{}}]' }} - materializations: {} - captures: {} +driver: {} "#, ); insta::assert_debug_snapshot!(errors); @@ -636,10 +622,7 @@ test://example/catalog.yaml: documents: [] testing/b/4/suffix: *test_spec -driver: - imageInspections: { "an/image": {output: '[{"Config":{}}]' }} - materializations: {} - captures: {} +driver: {} "#, ); insta::assert_debug_snapshot!(errors); @@ -1481,10 +1464,15 @@ driver: } #[derive(serde::Deserialize)] -#[serde(rename_all = "camelCase")] +#[serde(rename_all = "camelCase", deny_unknown_fields)] struct MockDriverCalls { + #[serde(default)] captures: BTreeMap, + #[serde(default)] derivations: BTreeMap, + #[serde(default)] + inferred_schemas: BTreeMap, + #[serde(default)] materializations: BTreeMap, } @@ -1757,6 +1745,31 @@ impl validation::Connectors for MockDriverCalls { } } +impl validation::ControlPlane for MockDriverCalls { + fn resolve_collections<'a>( + &'a self, + _collections: Vec, + ) -> BoxFuture<'a, anyhow::Result>> { + async move { Ok(Vec::new()) }.boxed() + } + + fn get_inferred_schemas<'a>( + &'a self, + collections: Vec, + ) -> BoxFuture<'a, anyhow::Result>> { + let out = collections + .iter() + .filter_map(|collection| { + self.inferred_schemas + .get(collection) + .map(|schema| (collection.clone(), schema.clone())) + }) + .collect(); + + async move { Ok(out) }.boxed() + } +} + fn run_test( mut fixture: Value, build_id: &str, @@ -1787,7 +1800,7 @@ fn run_test( build_id, &url::Url::parse("file:///project/root").unwrap(), &mock_calls, - &validation::NoOpControlPlane, + &mock_calls, captures, collections, fetches, diff --git a/crates/validation/tests/snapshots/scenario_tests__golden_all_visits.snap b/crates/validation/tests/snapshots/scenario_tests__golden_all_visits.snap index cc1459d1e0..8dea69da19 100644 --- a/crates/validation/tests/snapshots/scenario_tests__golden_all_visits.snap +++ 
b/crates/validation/tests/snapshots/scenario_tests__golden_all_visits.snap @@ -3668,6 +3668,207 @@ All { derivation: None, }, }, + BuiltCollection { + scope: test://example/int-string#/collections/testing~1int-string-inferred, + collection: testing/int-string-inferred, + validated: NULL, + spec: CollectionSpec { + name: "testing/int-string-inferred", + write_schema_json: "{\"$defs\":{\"anAnchor\":{\"$anchor\":\"AnAnchor\",\"properties\":{\"one\":{\"type\":\"string\"},\"two\":{\"type\":\"integer\"}},\"required\":[\"one\"],\"type\":\"object\"}},\"$id\":\"test://example/int-string.schema\",\"properties\":{\"bit\":{\"type\":\"boolean\"},\"int\":{\"type\":\"integer\"},\"str\":{\"type\":\"string\"}},\"required\":[\"int\",\"str\",\"bit\"],\"type\":\"object\"}", + read_schema_json: "{\"$defs\":{\"flow://inferred-schema\":{\"$id\":\"flow://inferred-schema\",\"properties\":{\"inferred-field\":{\"const\":42},\"str\":{\"maxLength\":8675309}},\"type\":\"object\"},\"flow://write-schema\":{\"$defs\":{\"anAnchor\":{\"$anchor\":\"AnAnchor\",\"properties\":{\"one\":{\"type\":\"string\"},\"two\":{\"type\":\"integer\"}},\"required\":[\"one\"],\"type\":\"object\"}},\"$id\":\"flow://write-schema\",\"properties\":{\"bit\":{\"type\":\"boolean\"},\"int\":{\"type\":\"integer\"},\"str\":{\"type\":\"string\"}},\"required\":[\"int\",\"str\",\"bit\"],\"type\":\"object\"}},\"$id\":\"test://example/int-string?ptr=/collections/testing~1int-string-inferred/readSchema\",\"allOf\":[{\"$ref\":\"flow://write-schema\"},{\"$ref\":\"flow://inferred-schema\"}]}", + key: [ + "/int", + ], + uuid_ptr: "/_meta/uuid", + partition_fields: [], + projections: [ + Projection { + ptr: "/bit", + field: "bit", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "boolean", + ], + string: None, + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + }, + ), + }, + Projection { + ptr: "", + field: "flow_document", + explicit: 
false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "object", + ], + string: None, + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + }, + ), + }, + Projection { + ptr: "/_meta/uuid", + field: "flow_published_at", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "string", + ], + string: Some( + String { + content_type: "", + format: "date-time", + content_encoding: "uuid", + max_length: 0, + }, + ), + title: "Flow Publication Time", + description: "Flow publication date-time of this document", + default_json: "", + secret: false, + exists: Must, + }, + ), + }, + Projection { + ptr: "/inferred-field", + field: "inferred-field", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "integer", + ], + string: None, + title: "", + description: "", + default_json: "", + secret: false, + exists: May, + }, + ), + }, + Projection { + ptr: "/int", + field: "int", + explicit: false, + is_partition_key: false, + is_primary_key: true, + inference: Some( + Inference { + types: [ + "integer", + ], + string: None, + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + }, + ), + }, + Projection { + ptr: "/str", + field: "str", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "string", + ], + string: Some( + String { + content_type: "", + format: "", + content_encoding: "", + max_length: 8675309, + }, + ), + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + }, + ), + }, + ], + ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", + partition_template: Some( + JournalSpec { + name: "testing/int-string-inferred", + replication: 3, + labels: Some( + LabelSet { + labels: [ + Label { + name: 
"app.gazette.dev/managed-by", + value: "estuary.dev/flow", + }, + Label { + name: "content-type", + value: "application/x-ndjson", + }, + Label { + name: "estuary.dev/build", + value: "a-build-id", + }, + Label { + name: "estuary.dev/collection", + value: "testing/int-string-inferred", + }, + ], + }, + ), + fragment: Some( + Fragment { + length: 536870912, + compression_codec: Gzip, + stores: [ + "s3://data-bucket/", + ], + refresh_interval: Some( + Duration { + seconds: 300, + nanos: 0, + }, + ), + retention: None, + flush_interval: None, + path_postfix_template: "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", + }, + ), + flags: 4, + max_append_rate: 4194304, + }, + ), + derivation: None, + }, + }, BuiltCollection { scope: test://example/int-string#/collections/testing~1int-string-rw, collection: testing/int-string-rw, @@ -5672,6 +5873,17 @@ All { } }, }, + Collection { + scope: test://example/int-string#/collections/testing~1int-string-inferred, + collection: testing/int-string-inferred, + spec: { + "writeSchema": {"$defs":{"anAnchor":{"$anchor":"AnAnchor","properties":{"one":{"type":"string"},"two":{"type":"integer"}},"required":["one"],"type":"object"}},"$id":"test://example/int-string.schema","properties":{"bit":{"type":"boolean"},"int":{"type":"integer"},"str":{"type":"string"}},"required":["int","str","bit"],"type":"object"}, + "readSchema": {"$id":"test://example/int-string?ptr=/collections/testing~1int-string-inferred/readSchema","allOf":[{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}]}, + "key": [ + "/int" + ] + }, + }, Collection { scope: test://example/int-string#/collections/testing~1int-string-rw, collection: testing/int-string-rw, @@ -5861,6 +6073,10 @@ All { scope: test://example/int-reverse#/import/0, to_resource: test://example/int-string, }, + Import { + scope: test://example/int-string#/collections/testing~1int-string-inferred/writeSchema, + to_resource: 
test://example/int-string.schema, + }, Import { scope: test://example/int-string#/collections/testing~1int-string-rw/readSchema, to_resource: test://example/int-string-len.schema, @@ -6056,7 +6272,7 @@ All { resource: test://example/int-string, content_type: "CATALOG", content: ".. binary ..", - content_dom: {"collections":{"testing/int-string":{"key":["/int"],"projections":{"Int":"/int","bit":{"location":"/bit","partition":true}},"schema":"test://example/int-string.schema"},"testing/int-string-rw":{"key":["/int"],"projections":{"Int":"/int","Len":"/len","Str":"/str","bit":{"location":"/bit","partition":true}},"readSchema":"test://example/int-string-len.schema","writeSchema":"test://example/int-string.schema"},"testing/int-string.v2":{"journals":{"fragments":{"compressionCodec":"ZSTANDARD"}},"key":["/int"],"schema":{"$id":"test://inlined/canonical/id","$ref":"test://example/int-string.schema"}}},"import":["test://example/int-halve"]}, + content_dom: {"collections":{"testing/int-string":{"key":["/int"],"projections":{"Int":"/int","bit":{"location":"/bit","partition":true}},"schema":"test://example/int-string.schema"},"testing/int-string-inferred":{"key":["/int"],"readSchema":{"allOf":[{"$ref":"flow://write-schema"},{"$ref":"flow://inferred-schema"}]},"writeSchema":"test://example/int-string.schema"},"testing/int-string-rw":{"key":["/int"],"projections":{"Int":"/int","Len":"/len","Str":"/str","bit":{"location":"/bit","partition":true}},"readSchema":"test://example/int-string-len.schema","writeSchema":"test://example/int-string.schema"},"testing/int-string.v2":{"journals":{"fragments":{"compressionCodec":"ZSTANDARD"}},"key":["/int"],"schema":{"$id":"test://inlined/canonical/id","$ref":"test://example/int-string.schema"}}},"import":["test://example/int-halve"]}, }, Resource { resource: test://example/int-string-captures, diff --git a/crates/validation/tests/snapshots/scenario_tests__keyed_location_wrong_type.snap 
b/crates/validation/tests/snapshots/scenario_tests__keyed_location_wrong_type.snap index 61248acf43..453d8688ef 100644 --- a/crates/validation/tests/snapshots/scenario_tests__keyed_location_wrong_type.snap +++ b/crates/validation/tests/snapshots/scenario_tests__keyed_location_wrong_type.snap @@ -15,6 +15,14 @@ expression: errors scope: test://example/int-string#/collections/testing~1int-string/key/0, error: location /int accepts "number", "object" in schema test://example/int-string.schema, but locations used as keys may only be null-able integers, strings, or booleans, }, + Error { + scope: test://example/int-string#/collections/testing~1int-string-inferred/key/0, + error: location /int accepts "number", "object" in schema test://example/int-string.schema, but locations used as keys may only be null-able integers, strings, or booleans, + }, + Error { + scope: test://example/int-string#/collections/testing~1int-string-inferred/key/0, + error: location /int accepts "number", "object" in schema test://example/int-string#/collections/testing~1int-string-inferred/readSchema, but locations used as keys may only be null-able integers, strings, or booleans, + }, Error { scope: test://example/int-string#/collections/testing~1int-string-rw/key/0, error: location /int accepts "number", "object" in schema test://example/int-string.schema, but locations used as keys may only be null-able integers, strings, or booleans, diff --git a/crates/validation/tests/snapshots/scenario_tests__storage_mappings_not_found.snap b/crates/validation/tests/snapshots/scenario_tests__storage_mappings_not_found.snap index 6d15dac1b5..57b03815cf 100644 --- a/crates/validation/tests/snapshots/scenario_tests__storage_mappings_not_found.snap +++ b/crates/validation/tests/snapshots/scenario_tests__storage_mappings_not_found.snap @@ -23,6 +23,10 @@ expression: errors scope: test://example/int-string#/collections/testing~1int-string, error: could not map collection testing/int-string into a storage 
mapping; did you mean TestinG/ defined at test://example/int-string#/storageMappings/TestinG~1?, }, + Error { + scope: test://example/int-string#/collections/testing~1int-string-inferred, + error: could not map collection testing/int-string-inferred into a storage mapping; did you mean TestinG/ defined at test://example/int-string#/storageMappings/TestinG~1?, + }, Error { scope: test://example/int-string#/collections/testing~1int-string-rw, error: could not map collection testing/int-string-rw into a storage mapping; did you mean TestinG/ defined at test://example/int-string#/storageMappings/TestinG~1?, diff --git a/crates/validation/tests/validation_skipped_when_disabled.yaml b/crates/validation/tests/validation_skipped_when_disabled.yaml index 0c7d59db2b..2b5bab991a 100644 --- a/crates/validation/tests/validation_skipped_when_disabled.yaml +++ b/crates/validation/tests/validation_skipped_when_disabled.yaml @@ -43,9 +43,5 @@ test://example/catalog.yaml: recovery/acmeCo/: stores: [{ provider: S3, bucket: data-bucket }] -driver: - # Expect that no actual driver calls are made. - captures: {} - derivations: {} - imageInspections: {} - materializations: {} +# Expect that no actual driver calls are made. +driver: {}