From d9b7826d23b1dbf53c052b741fa19be5850f56c2 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 4 Aug 2023 16:27:05 -0400 Subject: [PATCH] Move `SchemaBuilder` from `schema-inference` to `doc` (#1135) In order to support the introduction of a reduction annotation for merging JSON schemas, the `doc` crate needs to learn how to turn a `Shape` into the correct blob of JSON representing that `Shape` as a JSON schema. This logic used to live in `schema-inference`, but not only do we want to get rid of that crate eventually, it also already depends on `doc` indirectly, so introducing the `doc`->`schema-inference` dependency caused a circular dependency issue. In addition to moving `SchemaBuilder` into `doc`, I also realized that it was missing quite a few translations between `Shape` and `Schema`. I added the ones I could find, but it's entirely possible that there are more missing. --- Cargo.lock | 1 + crates/doc/Cargo.toml | 1 + crates/doc/src/lib.rs | 2 + .../{schema-inference => doc}/src/schema.rs | 67 ++++++++++++++----- crates/flowctl/src/preview/mod.rs | 4 +- crates/flowctl/src/raw/suggest_schema.rs | 6 +- crates/schema-inference/src/analyze.rs | 2 +- crates/schema-inference/src/lib.rs | 1 - crates/schema-inference/src/server.rs | 2 +- ..._analyze__test__simple_schema_merging.snap | 4 +- 10 files changed, 63 insertions(+), 27 deletions(-) rename crates/{schema-inference => doc}/src/schema.rs (61%) diff --git a/Cargo.lock b/Cargo.lock index 9b53b7288e..3af3d9b134 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1278,6 +1278,7 @@ dependencies = [ "rand 0.8.5", "rand_distr", "rkyv", + "schemars", "serde", "serde-transcode", "serde_json", diff --git a/crates/doc/Cargo.toml b/crates/doc/Cargo.toml index 843c10269a..f173fec293 100644 --- a/crates/doc/Cargo.toml +++ b/crates/doc/Cargo.toml @@ -21,6 +21,7 @@ fxhash = { workspace = true } itertools = { workspace = true } lz4 = { workspace = true, optional = true } rkyv = { workspace = true } +schemars = { workspace = 
true } serde = { workspace = true } serde_json = { workspace = true } tempfile = { workspace = true } diff --git a/crates/doc/src/lib.rs b/crates/doc/src/lib.rs index 05cfd32c5a..c955418e94 100644 --- a/crates/doc/src/lib.rs +++ b/crates/doc/src/lib.rs @@ -99,6 +99,8 @@ pub use validation::{ // Doc implementations may be reduced. pub mod reduce; +pub mod schema; + // Documents may be combined. #[cfg(feature = "combine")] pub mod combine; diff --git a/crates/schema-inference/src/schema.rs b/crates/doc/src/schema.rs similarity index 61% rename from crates/schema-inference/src/schema.rs rename to crates/doc/src/schema.rs index 14141e75be..2d33afb848 100644 --- a/crates/schema-inference/src/schema.rs +++ b/crates/doc/src/schema.rs @@ -1,12 +1,15 @@ +use crate::inference::Shape; +use json::schema::{ + keywords, + types::{self, Set}, +}; +use schemars::{ + gen::SchemaGenerator, + schema::{InstanceType, RootSchema, Schema, SchemaObject, SingleOrVec}, +}; +use serde_json::json; use std::collections::{BTreeMap, BTreeSet}; -use doc::inference::Shape; -use json::schema::types::{self, Set}; -use schemars::gen::SchemaGenerator; -use schemars::schema::{InstanceType, RootSchema, Schema, SchemaObject, SingleOrVec}; - -use crate::shape; - #[derive(Debug, Default)] pub struct SchemaBuilder { shape: Shape, @@ -24,21 +27,19 @@ impl SchemaBuilder { ..Default::default() } } - - pub fn merge(self, other: Self) -> Self { - Self { - shape: shape::merge(self.shape, other.shape), - } - } } pub fn to_schema(shape: &Shape) -> Schema { let mut schema_object = SchemaObject { instance_type: Some(shape_type_to_schema_type(shape.type_)), - format: shape.string.format.map(|f| f.to_string()), ..Default::default() }; + schema_object.metadata().title = shape.title.clone(); + schema_object.metadata().description = shape.description.clone(); + schema_object.metadata().default = shape.default.clone().map(|(d, _)| d); + schema_object.enum_values = shape.enum_.clone(); + if 
shape.type_.overlaps(types::OBJECT) { let mut prop_schemas = BTreeMap::new(); let mut required = BTreeSet::new(); @@ -51,6 +52,10 @@ pub fn to_schema(shape: &Shape) -> Schema { let object = &mut schema_object.object(); object.properties = prop_schemas; object.required = required; + + if let Some(addl) = &shape.object.additional { + object.additional_properties = Some(Box::new(to_schema(addl))); + } } if shape.type_.overlaps(types::ARRAY) { @@ -58,7 +63,39 @@ pub fn to_schema(shape: &Shape) -> Schema { for item in shape.array.tuple.iter() { array_items.push(to_schema(item)); } - schema_object.array().items = Some(flatten(array_items)); + if array_items.len() > 0 { + schema_object.array().items = Some(flatten(array_items)); + } + + if let Some(addl_items) = &shape.array.additional { + schema_object.array().additional_items = Some(Box::new(to_schema(addl_items))); + } + + schema_object.array().max_items = shape.array.max.and_then(|max| u32::try_from(max).ok()); + schema_object.array().min_items = shape.array.min.and_then(|max| u32::try_from(max).ok()); + } + + if shape.type_.overlaps(types::STRING) { + schema_object.format = shape.string.format.map(|f| f.to_string()); + schema_object.string().max_length = shape + .string + .max_length + .and_then(|max| u32::try_from(max).ok()); + + if shape.string.min_length > 0 { + schema_object.string().min_length = shape.string.min_length.try_into().ok(); + } + if let Some(encoding) = &shape.string.content_encoding { + schema_object + .extensions + .insert(keywords::CONTENT_ENCODING.to_string(), json!(encoding)); + } + if let Some(content_type) = &shape.string.content_type { + schema_object.extensions.insert( + keywords::CONTENT_MEDIA_TYPE.to_string(), + json!(content_type), + ); + } } Schema::Object(schema_object) diff --git a/crates/flowctl/src/preview/mod.rs b/crates/flowctl/src/preview/mod.rs index b4b16e9fe1..3967b4e49e 100644 --- a/crates/flowctl/src/preview/mod.rs +++ b/crates/flowctl/src/preview/mod.rs @@ -1,5 +1,6 @@ use 
crate::{dataplane, local_specs}; use anyhow::Context; +use doc::schema::to_schema; use futures::channel::mpsc; use futures::{SinkExt, StreamExt, TryStreamExt}; use prost::Message; @@ -384,8 +385,7 @@ where } } - Ok(inferred_shape - .map(|shape| serde_json::to_value(schema_inference::schema::to_schema(&shape)).unwrap())) + Ok(inferred_shape.map(|shape| serde_json::to_value(to_schema(&shape)).unwrap())) } fn status_to_anyhow(status: tonic::Status) -> anyhow::Error { diff --git a/crates/flowctl/src/raw/suggest_schema.rs b/crates/flowctl/src/raw/suggest_schema.rs index a11418ad37..9ecf1b547c 100644 --- a/crates/flowctl/src/raw/suggest_schema.rs +++ b/crates/flowctl/src/raw/suggest_schema.rs @@ -5,14 +5,12 @@ use crate::{ }; use anyhow::anyhow; use bytelines::AsyncByteLines; -use doc::{inference::Shape, FailedValidation, SchemaIndexBuilder}; +use doc::{inference::Shape, schema::SchemaBuilder, FailedValidation, SchemaIndexBuilder}; use futures::{Stream, StreamExt, TryStreamExt}; use json::schema::build::build_schema; use models::Schema; use proto_flow::ops::Log; -use schema_inference::{ - inference::infer_shape, json_decoder::JsonCodec, schema::SchemaBuilder, shape, -}; +use schema_inference::{inference::infer_shape, json_decoder::JsonCodec, shape}; use std::{io::ErrorKind, pin::Pin}; use tokio::io::BufReader; use tokio_util::{codec::FramedRead, compat::FuturesAsyncReadCompatExt}; diff --git a/crates/schema-inference/src/analyze.rs b/crates/schema-inference/src/analyze.rs index dbd0d469ea..e3586ce1fb 100644 --- a/crates/schema-inference/src/analyze.rs +++ b/crates/schema-inference/src/analyze.rs @@ -1,9 +1,9 @@ +use doc::schema::SchemaBuilder; use schemars::schema::RootSchema; use serde_json::Value as JsonValue; use std::io::BufRead; use crate::inference::infer_shape; -use crate::schema::SchemaBuilder; use crate::shape; type StreamResult = serde_json::Result; diff --git a/crates/schema-inference/src/lib.rs b/crates/schema-inference/src/lib.rs index 1b49023831..57ba9b332c 
100644 --- a/crates/schema-inference/src/lib.rs +++ b/crates/schema-inference/src/lib.rs @@ -1,6 +1,5 @@ pub mod analyze; pub mod inference; pub mod json_decoder; -pub mod schema; pub mod server; pub mod shape; diff --git a/crates/schema-inference/src/server.rs b/crates/schema-inference/src/server.rs index a5d2893100..903bf0751a 100644 --- a/crates/schema-inference/src/server.rs +++ b/crates/schema-inference/src/server.rs @@ -3,9 +3,9 @@ use std::time::Duration; use crate::inference::infer_shape; use crate::json_decoder::{JsonCodec, JsonCodecError}; -use crate::schema::SchemaBuilder; use crate::shape; use bytesize::ByteSize; +use doc::schema::SchemaBuilder; use serde_json::json; use anyhow::Context; diff --git a/crates/schema-inference/src/snapshots/schema_inference__analyze__test__simple_schema_merging.snap b/crates/schema-inference/src/snapshots/schema_inference__analyze__test__simple_schema_merging.snap index a4df380ab3..f935c2bf4f 100644 --- a/crates/schema-inference/src/snapshots/schema_inference__analyze__test__simple_schema_merging.snap +++ b/crates/schema-inference/src/snapshots/schema_inference__analyze__test__simple_schema_merging.snap @@ -1,6 +1,5 @@ --- source: crates/schema-inference/src/analyze.rs -assertion_line: 123 expression: schema --- { @@ -29,8 +28,7 @@ expression: schema ] }, "optional": { - "type": "array", - "items": [] + "type": "array" } } },