Skip to content

Commit

Permalink
feature: Teach the combiner to keep track of a running inferred schem…
Browse files Browse the repository at this point in the history
…a and log whenever it changes. (#1128)

* feature: Teach the combiner to keep track of a running inferred schema and log whenever it changes.

* fix: Initialize schema as fully constrained instead of fully open

* Emit the schema as real JSON, instead of an escaped string.

* fix: Only initialize `shape` and `scratch-shape` if this is a brand new Combiner, otherwise keep the schema around to reduce spurious logging

* fix: log on error

* refactor: remove reference to `Shape::to_serde()`

* fix: import ordering

* fix: serialization should never fail

* Add `collection_name` and `enable_schema_inference` to the combiner's `Config` message, and plumb through the appropriate values.

* fix: Some clean-ups after rebase

* refactor: Pass a schema in `infer_schema_json` instead of a boolean in `enable_schema_inference`

* fix: Actually enforce inferred schema limits

* fix: remove unnecessary `shape_from_value`

* fix logic error

* exercise widening logic in tests

* Also exercise real `infer_schema_json` parsing codepath

* ops: update tracing snapshot for new debug behavior

---------

Co-authored-by: Johnny Graettinger <[email protected]>
  • Loading branch information
jshearer and jgraettinger authored Aug 10, 2023
1 parent d9af413 commit 886e308
Show file tree
Hide file tree
Showing 13 changed files with 509 additions and 398 deletions.
98 changes: 97 additions & 1 deletion crates/derive/src/combine_api.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{new_validator, DocCounter, JsonError, StatsAccumulator};
use crate::{new_validator, DebugJson, DocCounter, JsonError, StatsAccumulator};
use anyhow::Context;
use bytes::Buf;
use doc::shape::{limits::enforce_field_count_limits, schema::to_schema};
use prost::Message;
use proto_flow::flow::combine_api::{self, Code};

Expand Down Expand Up @@ -58,11 +59,18 @@ struct State {
fields_ex: Vec<doc::Extractor>,
// Document key components over which we're grouping while combining.
key_ex: Box<[doc::Extractor]>,
// Inferred shape of drained documents. Some if schema inference
// is configured to be enabled, otherwise None.
shape: Option<doc::Shape>,
// Keeps track of whether the running inferred shape was widened
// since last being logged.
shape_changed: bool,
// Statistics of a current combine operation.
stats: CombineStats,
// JSON-Pointer into which a UUID placeholder should be set,
// or None if a placeholder shouldn't be set.
uuid_placeholder_ptr: Option<doc::Pointer>,
collection_name: String,
}

impl cgo::Service for API {
Expand Down Expand Up @@ -93,12 +101,16 @@ impl cgo::Service for API {
fields,
uuid_placeholder_ptr,
projections,
collection_name,
infer_schema_json,
} = combine_api::Config::decode(data)?;
tracing::debug!(
%schema_json,
?key_ptrs,
?fields,
?uuid_placeholder_ptr,
?collection_name,
schema_inference = infer_schema_json.len()>0,
"configure",
);

Expand All @@ -122,12 +134,36 @@ impl cgo::Service for API {
validator,
)?;

// If `infer_schema_json` is non-empty then enable schema inference
// by setting Shape to Some, and use the schema defined by `infer_schema_json`
// as the default schema to start from. Otherwise, disable schema inference.
// TODO (jshearer): We're special-casing "false" here because at the moment,
// the Go side of things doesn't actually know how to load a useful inferred schema
// and instead always passes the maximally-restrictive "false" value. Once we teach
// it how to fetch useful inferred schemas, we should remove this special case.
let shape = if infer_schema_json.eq("false") {
self.state
.as_ref()
.and_then(|state| state.shape.clone())
.or(Some(doc::Shape::nothing()))
} else if infer_schema_json.len() > 0 {
let validator = new_validator(infer_schema_json.as_str())?;
Some(doc::Shape::infer(
&validator.schemas()[0],
validator.schema_index(),
))
} else {
None
};
self.state = Some(State {
combiner,
shape,
shape_changed: false,
fields_ex,
key_ex,
uuid_placeholder_ptr,
stats: CombineStats::default(),
collection_name,
});
Ok(())
}
Expand Down Expand Up @@ -177,9 +213,26 @@ impl cgo::Service for API {
arena,
out,
&mut state.stats.out,
&mut state.shape,
&mut state.shape_changed,
)?;

if !more {
if let Some(ref shape) = state.shape {
if state.shape_changed {
state.shape_changed = false;

let serialized = serde_json::to_value(&to_schema(shape.clone()))
.expect("shape serialization should never fail");

tracing::info!(
schema = ?DebugJson(serialized),
collection_name = state.collection_name,
"inferred schema updated"
);
}
}

// Send a final message with accumulated stats.
cgo::send_message(Code::DrainedStats as u32, &state.stats.drain(), arena, out);
state.combiner = doc::Combiner::Accumulator(drainer.into_new_accumulator()?);
Expand Down Expand Up @@ -225,11 +278,23 @@ pub fn drain_chunk(
arena: &mut Vec<u8>,
out: &mut Vec<cgo::Out>,
stats: &mut DocCounter,
shape: &mut Option<doc::Shape>,
did_change: &mut bool,
) -> Result<bool, doc::combine::Error> {
// Convert target from a delta to an absolute target length of the arena.
let target_length = target_length + arena.len();

drainer.drain_while(|doc, fully_reduced| {
if let Some(ref mut shape) = shape {
let changed = match &doc {
doc::LazyNode::Node(n) => shape.widen(*n),
doc::LazyNode::Heap(h) => shape.widen(h),
};
if changed {
enforce_field_count_limits(shape, json::Location::Root);
*did_change = true;
}
}
// Send serialized document.
let begin = arena.len();
let w: &mut Vec<u8> = &mut *arena;
Expand Down Expand Up @@ -313,6 +378,8 @@ pub mod test {
..Default::default()
},
],
collection_name: "test".to_string(),
infer_schema_json: "false".to_string(),
},
&mut arena,
&mut out,
Expand Down Expand Up @@ -405,6 +472,33 @@ pub mod test {
fields: vec![],
uuid_placeholder_ptr: String::new(),
projections: vec![],
collection_name: "test".to_string(),
infer_schema_json: "".to_string(),
},
&mut arena,
&mut out,
),
Err(Error::EmptyKey)
));
}

#[test]
fn test_combine_with_real_schema() {
let mut svc = API::create();
let mut arena = Vec::new();
let mut out = Vec::new();

assert!(matches!(
svc.invoke_message(
Code::Configure as u32,
combine_api::Config {
schema_json: build_min_max_sum_schema(),
key_ptrs: vec![],
fields: vec![],
uuid_placeholder_ptr: String::new(),
projections: vec![],
collection_name: "test".to_string(),
infer_schema_json: r#"{"type": "object"}"#.to_string(),
},
&mut arena,
&mut out,
Expand Down Expand Up @@ -675,6 +769,8 @@ pub mod test {
fields: field_ptrs,
uuid_placeholder_ptr: String::new(),
projections,
collection_name: "test".to_string(),
infer_schema_json: "".to_string(),
},
&mut arena,
&mut out,
Expand Down
156 changes: 0 additions & 156 deletions crates/doc/src/schema.rs

This file was deleted.

12 changes: 10 additions & 2 deletions crates/ops/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,15 @@ impl<'a> tracing::field::Visit for FieldVisitor<'a> {
}

/// Records a field whose value is only available through its `Debug` rendering.
///
/// The value is formatted with `{:?}`. If the resulting text parses as JSON,
/// it is inserted as-is into `fields_json_map` under the field's name;
/// otherwise the text is passed on to `record_raw`.
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// first body line below appears to be the removed pre-change implementation
// shown next to its replacement — confirm against the actual file.
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.record_raw(field, format!("{value:?}"))
let stringified = format!("{value:?}");
// Deserializing into `IgnoredAny` only checks that the text is well-formed
// JSON; the parsed content itself is discarded.
match serde_json::from_str::<serde::de::IgnoredAny>(&stringified) {
Ok(_) => {
// Already JSON: store the text directly rather than going through `record_raw`.
self.0
.fields_json_map
.insert(field.name().to_string(), stringified);
}
// Not JSON: fall back to `record_raw` with the Debug text.
Err(_) => self.record_raw(field, stringified),
};
}
}

Expand Down Expand Up @@ -349,7 +357,7 @@ mod test {
"level": "debug",
"fields": {
"module": "ops::tracing::test",
"return": "\"ok\""
"return": "ok"
},
"spans": [
{
Expand Down
7 changes: 7 additions & 0 deletions crates/proto-flow/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,13 @@ pub mod combine_api {
/// TODO(johnny): This is a kludge as we seek to remove this API.
#[prost(message, repeated, tag = "5")]
pub projections: ::prost::alloc::vec::Vec<super::Projection>,
/// The name of the collection that's being written to.
#[prost(string, tag = "6")]
pub collection_name: ::prost::alloc::string::String,
/// JSON-encoded string representing the JSON schema to start inference
/// from. If empty, do not emit inferred schemas.
#[prost(string, tag = "7")]
pub infer_schema_json: ::prost::alloc::string::String,
}
/// Stats holds statistics relating to one or more combiner transactions.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
Loading

0 comments on commit 886e308

Please sign in to comment.