Skip to content

Commit

Permalink
Move SchemaBuilder from schema-inference to doc (#1135)
Browse files Browse the repository at this point in the history
In order to support the introduction of a reduction annotation for merging json schemas, the `doc` crate needs to learn how to turn a `Shape` into the correct blob of JSON representing that `Shape` as a JSON schema. This logic used to live in `schema-inference`, but not only do we want to get rid of that crate eventually, it also already depends on `doc` indirectly, so introducing the `doc`->`schema-inference` dependency caused a circular dependency issue.

In addition to moving `SchemaBuilder` into `doc`, I also realized that it was missing quite a few translations between `Shape` and `Schema`. I added the ones I could find, but it's entirely possible that there are more missing.
  • Loading branch information
jshearer authored Aug 4, 2023
1 parent ccf622e commit d9b7826
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/doc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ fxhash = { workspace = true }
itertools = { workspace = true }
lz4 = { workspace = true, optional = true }
rkyv = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/doc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ pub use validation::{
// Doc implementations may be reduced.
pub mod reduce;

pub mod schema;

// Documents may be combined.
#[cfg(feature = "combine")]
pub mod combine;
Expand Down
67 changes: 52 additions & 15 deletions crates/schema-inference/src/schema.rs → crates/doc/src/schema.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::inference::Shape;
use json::schema::{
keywords,
types::{self, Set},
};
use schemars::{
gen::SchemaGenerator,
schema::{InstanceType, RootSchema, Schema, SchemaObject, SingleOrVec},
};
use serde_json::json;
use std::collections::{BTreeMap, BTreeSet};

use doc::inference::Shape;
use json::schema::types::{self, Set};
use schemars::gen::SchemaGenerator;
use schemars::schema::{InstanceType, RootSchema, Schema, SchemaObject, SingleOrVec};

use crate::shape;

#[derive(Debug, Default)]
pub struct SchemaBuilder {
shape: Shape,
Expand All @@ -24,21 +27,19 @@ impl SchemaBuilder {
..Default::default()
}
}

pub fn merge(self, other: Self) -> Self {
Self {
shape: shape::merge(self.shape, other.shape),
}
}
}

pub fn to_schema(shape: &Shape) -> Schema {
let mut schema_object = SchemaObject {
instance_type: Some(shape_type_to_schema_type(shape.type_)),
format: shape.string.format.map(|f| f.to_string()),
..Default::default()
};

schema_object.metadata().title = shape.title.clone();
schema_object.metadata().description = shape.description.clone();
schema_object.metadata().default = shape.default.clone().map(|(d, _)| d);
schema_object.enum_values = shape.enum_.clone();

if shape.type_.overlaps(types::OBJECT) {
let mut prop_schemas = BTreeMap::new();
let mut required = BTreeSet::new();
Expand All @@ -51,14 +52,50 @@ pub fn to_schema(shape: &Shape) -> Schema {
let object = &mut schema_object.object();
object.properties = prop_schemas;
object.required = required;

if let Some(addl) = &shape.object.additional {
object.additional_properties = Some(Box::new(to_schema(addl)));
}
}

if shape.type_.overlaps(types::ARRAY) {
let mut array_items = Vec::new();
for item in shape.array.tuple.iter() {
array_items.push(to_schema(item));
}
schema_object.array().items = Some(flatten(array_items));
if array_items.len() > 0 {
schema_object.array().items = Some(flatten(array_items));
}

if let Some(addl_items) = &shape.array.additional {
schema_object.array().additional_items = Some(Box::new(to_schema(addl_items)));
}

schema_object.array().max_items = shape.array.max.and_then(|max| u32::try_from(max).ok());
schema_object.array().min_items = shape.array.min.and_then(|max| u32::try_from(max).ok());
}

if shape.type_.overlaps(types::STRING) {
schema_object.format = shape.string.format.map(|f| f.to_string());
schema_object.string().max_length = shape
.string
.max_length
.and_then(|max| u32::try_from(max).ok());

if shape.string.min_length > 0 {
schema_object.string().min_length = shape.string.min_length.try_into().ok();
}
if let Some(encoding) = &shape.string.content_encoding {
schema_object
.extensions
.insert(keywords::CONTENT_ENCODING.to_string(), json!(encoding));
}
if let Some(content_type) = &shape.string.content_type {
schema_object.extensions.insert(
keywords::CONTENT_MEDIA_TYPE.to_string(),
json!(content_type),
);
}
}

Schema::Object(schema_object)
Expand Down
4 changes: 2 additions & 2 deletions crates/flowctl/src/preview/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{dataplane, local_specs};
use anyhow::Context;
use doc::schema::to_schema;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt, TryStreamExt};
use prost::Message;
Expand Down Expand Up @@ -384,8 +385,7 @@ where
}
}

Ok(inferred_shape
.map(|shape| serde_json::to_value(schema_inference::schema::to_schema(&shape)).unwrap()))
Ok(inferred_shape.map(|shape| serde_json::to_value(to_schema(&shape)).unwrap()))
}

fn status_to_anyhow(status: tonic::Status) -> anyhow::Error {
Expand Down
6 changes: 2 additions & 4 deletions crates/flowctl/src/raw/suggest_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ use crate::{
};
use anyhow::anyhow;
use bytelines::AsyncByteLines;
use doc::{inference::Shape, FailedValidation, SchemaIndexBuilder};
use doc::{inference::Shape, schema::SchemaBuilder, FailedValidation, SchemaIndexBuilder};
use futures::{Stream, StreamExt, TryStreamExt};
use json::schema::build::build_schema;
use models::Schema;
use proto_flow::ops::Log;
use schema_inference::{
inference::infer_shape, json_decoder::JsonCodec, schema::SchemaBuilder, shape,
};
use schema_inference::{inference::infer_shape, json_decoder::JsonCodec, shape};
use std::{io::ErrorKind, pin::Pin};
use tokio::io::BufReader;
use tokio_util::{codec::FramedRead, compat::FuturesAsyncReadCompatExt};
Expand Down
2 changes: 1 addition & 1 deletion crates/schema-inference/src/analyze.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use doc::schema::SchemaBuilder;
use schemars::schema::RootSchema;
use serde_json::Value as JsonValue;
use std::io::BufRead;

use crate::inference::infer_shape;
use crate::schema::SchemaBuilder;
use crate::shape;

type StreamResult = serde_json::Result<JsonValue>;
Expand Down
1 change: 0 additions & 1 deletion crates/schema-inference/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub mod analyze;
pub mod inference;
pub mod json_decoder;
pub mod schema;
pub mod server;
pub mod shape;
2 changes: 1 addition & 1 deletion crates/schema-inference/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::time::Duration;

use crate::inference::infer_shape;
use crate::json_decoder::{JsonCodec, JsonCodecError};
use crate::schema::SchemaBuilder;
use crate::shape;
use bytesize::ByteSize;
use doc::schema::SchemaBuilder;
use serde_json::json;

use anyhow::Context;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
---
source: crates/schema-inference/src/analyze.rs
assertion_line: 123
expression: schema
---
{
Expand Down Expand Up @@ -29,8 +28,7 @@ expression: schema
]
},
"optional": {
"type": "array",
"items": []
"type": "array"
}
}
},
Expand Down

0 comments on commit d9b7826

Please sign in to comment.