Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Teach the combiner to keep track of a running inferred schema and log whenever it changes. #1128

Merged
merged 17 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion crates/derive/src/combine_api.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{new_validator, DocCounter, JsonError, StatsAccumulator};
use crate::{new_validator, DebugJson, DocCounter, JsonError, StatsAccumulator};
use anyhow::Context;
use bytes::Buf;
use doc::shape::{limits::enforce_field_count_limits, schema::to_schema};
use prost::Message;
use proto_flow::flow::combine_api::{self, Code};

Expand Down Expand Up @@ -58,11 +59,18 @@ struct State {
fields_ex: Vec<doc::Extractor>,
// Document key components over which we're grouping while combining.
key_ex: Box<[doc::Extractor]>,
// Inferred shape of drained documents. Some if schema inference
// is confitured to be enabled, otherwise None
jshearer marked this conversation as resolved.
Show resolved Hide resolved
shape: Option<doc::Shape>,
// Keeps track of whether the running inferred shape was widened
// since last being logged.
shape_changed: bool,
// Statistics of a current combine operation.
stats: CombineStats,
// JSON-Pointer into which a UUID placeholder should be set,
// or None if a placeholder shouldn't be set.
uuid_placeholder_ptr: Option<doc::Pointer>,
collection_name: String,
}

impl cgo::Service for API {
Expand Down Expand Up @@ -93,12 +101,16 @@ impl cgo::Service for API {
fields,
uuid_placeholder_ptr,
projections,
collection_name,
infer_schema_json,
} = combine_api::Config::decode(data)?;
tracing::debug!(
%schema_json,
?key_ptrs,
?fields,
?uuid_placeholder_ptr,
?collection_name,
schema_inference = infer_schema_json.len()>0,
"configure",
);

Expand All @@ -122,12 +134,36 @@ impl cgo::Service for API {
validator,
)?;

// If `infer_schema_json` is non-empty then enable schema inference
// by setting Shape to Some, and use the schema defined by `infer_schema_json`
// as the default schema to start from. Otherwise, disable schema inference.
// TODO (jshearer): We're special-casing "false" here because at the moment,
// the Go side of things doesn't actually know how to load a useful inferred schema
// and instead always passes the maximally-restrictive "false" value. Once we teach
// it how to fetch useful inferred schemas, we should remove this special case.
let shape = if infer_schema_json.eq("false") {
self.state
.as_ref()
.and_then(|state| state.shape.clone())
.or(Some(doc::Shape::nothing()))
} else if infer_schema_json.len() > 0 {
let validator = new_validator(infer_schema_json.as_str())?;
Some(doc::Shape::infer(
&validator.schemas()[0],
validator.schema_index(),
))
} else {
None
};
self.state = Some(State {
combiner,
shape,
shape_changed: false,
fields_ex,
key_ex,
uuid_placeholder_ptr,
stats: CombineStats::default(),
collection_name,
});
Ok(())
}
Expand Down Expand Up @@ -177,9 +213,26 @@ impl cgo::Service for API {
arena,
out,
&mut state.stats.out,
&mut state.shape,
&mut state.shape_changed,
)?;

if !more {
if let Some(ref shape) = state.shape {
if state.shape_changed {
state.shape_changed = false;

let serialized = serde_json::to_value(&to_schema(shape.clone()))
.expect("shape serialization should never fail");

tracing::info!(
schema = ?DebugJson(serialized),
collection_name = state.collection_name,
"inferred schema updated"
);
}
}

// Send a final message with accumulated stats.
cgo::send_message(Code::DrainedStats as u32, &state.stats.drain(), arena, out);
state.combiner = doc::Combiner::Accumulator(drainer.into_new_accumulator()?);
Expand Down Expand Up @@ -225,11 +278,23 @@ pub fn drain_chunk(
arena: &mut Vec<u8>,
out: &mut Vec<cgo::Out>,
stats: &mut DocCounter,
shape: &mut Option<doc::Shape>,
did_change: &mut bool,
) -> Result<bool, doc::combine::Error> {
// Convert target from a delta to an absolute target length of the arena.
let target_length = target_length + arena.len();

drainer.drain_while(|doc, fully_reduced| {
if let Some(ref mut shape) = shape {
let changed = match &doc {
doc::LazyNode::Node(n) => shape.widen(*n),
doc::LazyNode::Heap(h) => shape.widen(h),
};
if changed {
enforce_field_count_limits(shape, json::Location::Root);
*did_change = true;
}
}
// Send serialized document.
let begin = arena.len();
let w: &mut Vec<u8> = &mut *arena;
Expand Down Expand Up @@ -313,6 +378,8 @@ pub mod test {
..Default::default()
},
],
collection_name: "test".to_string(),
infer_schema_json: "".to_string(),
},
jshearer marked this conversation as resolved.
Show resolved Hide resolved
&mut arena,
&mut out,
Expand Down Expand Up @@ -405,6 +472,8 @@ pub mod test {
fields: vec![],
uuid_placeholder_ptr: String::new(),
projections: vec![],
collection_name: "test".to_string(),
infer_schema_json: "".to_string(),
},
&mut arena,
&mut out,
Expand Down Expand Up @@ -675,6 +744,8 @@ pub mod test {
fields: field_ptrs,
uuid_placeholder_ptr: String::new(),
projections,
collection_name: "test".to_string(),
infer_schema_json: "".to_string(),
},
&mut arena,
&mut out,
Expand Down
156 changes: 0 additions & 156 deletions crates/doc/src/schema.rs

This file was deleted.

10 changes: 9 additions & 1 deletion crates/ops/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,15 @@ impl<'a> tracing::field::Visit for FieldVisitor<'a> {
}

fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.record_raw(field, format!("{value:?}"))
let stringified = format!("{value:?}");
match serde_json::from_str::<serde::de::IgnoredAny>(&stringified) {
Ok(_) => {
self.0
.fields_json_map
.insert(field.name().to_string(), stringified);
}
Err(_) => self.record_raw(field, stringified),
};
}
}

Expand Down
7 changes: 7 additions & 0 deletions crates/proto-flow/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,13 @@ pub mod combine_api {
/// TODO(johnny): This is a kludge as we seek to remove this API.
#[prost(message, repeated, tag = "5")]
pub projections: ::prost::alloc::vec::Vec<super::Projection>,
/// The name of the collection that's being written to.
#[prost(string, tag = "6")]
pub collection_name: ::prost::alloc::string::String,
/// JSON-encoded string representing the JSON schema to start inference
/// from. If empty, do not emit inferred schemas.
#[prost(string, tag = "7")]
pub infer_schema_json: ::prost::alloc::string::String,
}
/// Stats holds statistics relating to one or more combiner transactions.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
Loading
Loading