Skip to content

Commit

Permalink
fix: Some clean-ups after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Aug 10, 2023
1 parent b9a93fe commit 0f50e31
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 187 deletions.
60 changes: 29 additions & 31 deletions crates/derive/src/combine_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{new_validator, DebugJson, DocCounter, JsonError, StatsAccumulator};
use anyhow::Context;
use bytes::Buf;
use doc::{inference::Shape, schema::SchemaBuilder};
use json::Location;
use prost::Message;
use proto_flow::flow::combine_api::{self, Code};
Expand Down Expand Up @@ -56,13 +55,12 @@ impl StatsAccumulator for CombineStats {
struct State {
// Combiner which is doing the heavy lifting.
combiner: doc::Combiner,
// Inferred shape of drained documents
shape: doc::inference::Shape,
// We need to keep track of the potentially-modified shape
// while we drain documents in order to figure out whether
// it has changed or not. We only want to emit shape updates
// if there is a change.
scratch_shape: doc::inference::Shape,
// Inferred shape of drained documents. Some if schema inference
// is configured to be enabled, otherwise None.
shape: Option<doc::Shape>,
// Keeps track of whether the running inferred shape was widened
// since last being logged.
shape_changed: bool,
// Fields which are extracted and returned from combined documents.
fields_ex: Vec<doc::Extractor>,
// Document key components over which we're grouping while combining.
Expand Down Expand Up @@ -139,14 +137,17 @@ impl cgo::Service for API {

self.state = Some(State {
combiner,
shape: self
.state
.as_ref()
.map_or(Shape::invalid(), |state| state.shape.clone()),
scratch_shape: self
.state
.as_ref()
.map_or(Shape::invalid(), |state| state.scratch_shape.clone()),
// Always Some if enable_schema_inference, else always None
shape: enable_schema_inference.then(|| {
// We want schema inference
self.state
.as_ref()
// Re-use the existing shape if it exists
.and_then(|state| state.shape.clone())
// Otherwise start fresh
.unwrap_or(doc::Shape::nothing())
}),
shape_changed: false,
fields_ex,
key_ex,
uuid_placeholder_ptr,
Expand Down Expand Up @@ -202,20 +203,16 @@ impl cgo::Service for API {
arena,
out,
&mut state.stats.out,
&mut state
.enable_schema_inference
.then(|| &mut state.scratch_shape),
&mut state.shape,
&mut state.shape_changed,
)?;

if state.enable_schema_inference && !more {
if state.shape.ne(&state.scratch_shape) {
// Update the "true" shape with the newly widened shape
state.shape = state.scratch_shape.clone();
if let Some(ref shape) = state.shape {
if !more && state.shape_changed {
state.shape_changed = false;

let serialized = serde_json::to_value(
&SchemaBuilder::new(state.scratch_shape.clone()).root_schema(),
)
.expect("shape serialization should never fail");
let serialized = serde_json::to_value(&doc::to_schema(shape.clone()))
.expect("shape serialization should never fail");

tracing::info!(
schema = ?DebugJson(serialized),
Expand Down Expand Up @@ -269,16 +266,17 @@ pub fn drain_chunk(
arena: &mut Vec<u8>,
out: &mut Vec<cgo::Out>,
stats: &mut DocCounter,
shape: &mut Option<&mut doc::inference::Shape>,
shape: &mut Option<doc::Shape>,
did_change: &mut bool,
) -> Result<bool, doc::combine::Error> {
// Convert target from a delta to an absolute target length of the arena.
let target_length = target_length + arena.len();

drainer.drain_while(|doc, fully_reduced| {
if let Some(shape) = shape.as_mut() {
if let Some(ref mut shape) = shape {
match &doc {
doc::LazyNode::Node(n) => shape.widen(*n, Location::Root),
doc::LazyNode::Heap(h) => shape.widen(h, Location::Root),
doc::LazyNode::Node(n) => *did_change |= shape.widen(*n),
doc::LazyNode::Heap(h) => *did_change |= shape.widen(h),
};
}
// Send serialized document.
Expand Down
1 change: 1 addition & 0 deletions crates/doc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub mod tuple_pack;
// It's similar to (and built from) a JSON Schema, but includes only
// those inferences which can be statically proven for all documents.
pub mod shape;
pub use shape::schema::to_schema;
pub use shape::Shape;

// Fancy diff support for documents.
Expand Down
156 changes: 0 additions & 156 deletions crates/doc/src/schema.rs

This file was deleted.

0 comments on commit 0f50e31

Please sign in to comment.