Skip to content

Commit

Permalink
Add collection_name and enable_schema_inference to the combiner's `Config` message, and plumb through the appropriate values.
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Aug 10, 2023
1 parent ba9e899 commit b9a93fe
Show file tree
Hide file tree
Showing 11 changed files with 413 additions and 252 deletions.
35 changes: 27 additions & 8 deletions crates/derive/src/combine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ struct State {
// JSON-Pointer into which a UUID placeholder should be set,
// or None if a placeholder shouldn't be set.
uuid_placeholder_ptr: Option<doc::Pointer>,
collection_name: String,
enable_schema_inference: bool,
}

impl cgo::Service for API {
Expand Down Expand Up @@ -102,12 +104,16 @@ impl cgo::Service for API {
fields,
uuid_placeholder_ptr,
projections,
collection_name,
enable_schema_inference,
} = combine_api::Config::decode(data)?;
tracing::debug!(
%schema_json,
?key_ptrs,
?fields,
?uuid_placeholder_ptr,
?collection_name,
?enable_schema_inference,
"configure",
);

Expand Down Expand Up @@ -145,6 +151,8 @@ impl cgo::Service for API {
key_ex,
uuid_placeholder_ptr,
stats: CombineStats::default(),
collection_name,
enable_schema_inference,
});
Ok(())
}
Expand Down Expand Up @@ -194,10 +202,12 @@ impl cgo::Service for API {
arena,
out,
&mut state.stats.out,
&mut state.scratch_shape,
&mut state
.enable_schema_inference
.then(|| &mut state.scratch_shape),
)?;

if !more {
if state.enable_schema_inference && !more {
if state.shape.ne(&state.scratch_shape) {
// Update the "true" shape with the newly widened shape
state.shape = state.scratch_shape.clone();
Expand All @@ -208,7 +218,8 @@ impl cgo::Service for API {
.expect("shape serialization should never fail");

tracing::info!(
inferred_schema = ?DebugJson(serialized),
schema = ?DebugJson(serialized),
collection_name = state.collection_name,
"inferred schema updated"
);
}
Expand Down Expand Up @@ -258,16 +269,18 @@ pub fn drain_chunk(
arena: &mut Vec<u8>,
out: &mut Vec<cgo::Out>,
stats: &mut DocCounter,
shape: &mut doc::inference::Shape,
shape: &mut Option<&mut doc::inference::Shape>,
) -> Result<bool, doc::combine::Error> {
// Convert target from a delta to an absolute target length of the arena.
let target_length = target_length + arena.len();

drainer.drain_while(|doc, fully_reduced| {
match &doc {
doc::LazyNode::Node(n) => shape.widen(*n, Location::Root),
doc::LazyNode::Heap(h) => shape.widen(h, Location::Root),
};
if let Some(shape) = shape.as_mut() {
match &doc {
doc::LazyNode::Node(n) => shape.widen(*n, Location::Root),
doc::LazyNode::Heap(h) => shape.widen(h, Location::Root),
};
}
// Send serialized document.
let begin = arena.len();
let w: &mut Vec<u8> = &mut *arena;
Expand Down Expand Up @@ -351,6 +364,8 @@ pub mod test {
..Default::default()
},
],
collection_name: "test".to_string(),
enable_schema_inference: false,
},
&mut arena,
&mut out,
Expand Down Expand Up @@ -443,6 +458,8 @@ pub mod test {
fields: vec![],
uuid_placeholder_ptr: String::new(),
projections: vec![],
collection_name: "test".to_string(),
enable_schema_inference: false,
},
&mut arena,
&mut out,
Expand Down Expand Up @@ -713,6 +730,8 @@ pub mod test {
fields: field_ptrs,
uuid_placeholder_ptr: String::new(),
projections,
collection_name: "test".to_string(),
enable_schema_inference: false,
},
&mut arena,
&mut out,
Expand Down
6 changes: 6 additions & 0 deletions crates/proto-flow/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,12 @@ pub mod combine_api {
/// TODO(johnny): This is a kludge as we seek to remove this API.
#[prost(message, repeated, tag = "5")]
pub projections: ::prost::alloc::vec::Vec<super::Projection>,
/// The name of the collection that's being written to.
#[prost(string, tag = "6")]
pub collection_name: ::prost::alloc::string::String,
/// Whether to emit maximally-constrained schemas as documents are combined.
#[prost(bool, tag = "7")]
pub enable_schema_inference: bool,
}
/// Stats holds statistics relating to one or more combiner transactions.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
36 changes: 36 additions & 0 deletions crates/proto-flow/src/flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2105,6 +2105,12 @@ impl serde::Serialize for combine_api::Config {
if !self.projections.is_empty() {
len += 1;
}
if !self.collection_name.is_empty() {
len += 1;
}
if self.enable_schema_inference {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.CombineAPI.Config", len)?;
if !self.schema_json.is_empty() {
struct_ser.serialize_field("schemaJson", crate::as_raw_json(&self.schema_json)?)?;
Expand All @@ -2121,6 +2127,12 @@ impl serde::Serialize for combine_api::Config {
if !self.projections.is_empty() {
struct_ser.serialize_field("projections", &self.projections)?;
}
if !self.collection_name.is_empty() {
struct_ser.serialize_field("collectionName", &self.collection_name)?;
}
if self.enable_schema_inference {
struct_ser.serialize_field("enableSchemaInference", &self.enable_schema_inference)?;
}
struct_ser.end()
}
}
Expand All @@ -2139,6 +2151,10 @@ impl<'de> serde::Deserialize<'de> for combine_api::Config {
"uuid_placeholder_ptr",
"uuidPlaceholderPtr",
"projections",
"collection_name",
"collectionName",
"enable_schema_inference",
"enableSchemaInference",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -2148,6 +2164,8 @@ impl<'de> serde::Deserialize<'de> for combine_api::Config {
Fields,
UuidPlaceholderPtr,
Projections,
CollectionName,
EnableSchemaInference,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand All @@ -2174,6 +2192,8 @@ impl<'de> serde::Deserialize<'de> for combine_api::Config {
"fields" => Ok(GeneratedField::Fields),
"uuidPlaceholderPtr" | "uuid_placeholder_ptr" => Ok(GeneratedField::UuidPlaceholderPtr),
"projections" => Ok(GeneratedField::Projections),
"collectionName" | "collection_name" => Ok(GeneratedField::CollectionName),
"enableSchemaInference" | "enable_schema_inference" => Ok(GeneratedField::EnableSchemaInference),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand All @@ -2198,6 +2218,8 @@ impl<'de> serde::Deserialize<'de> for combine_api::Config {
let mut fields__ = None;
let mut uuid_placeholder_ptr__ = None;
let mut projections__ = None;
let mut collection_name__ = None;
let mut enable_schema_inference__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::SchemaJson => {
Expand Down Expand Up @@ -2230,6 +2252,18 @@ impl<'de> serde::Deserialize<'de> for combine_api::Config {
}
projections__ = Some(map.next_value()?);
}
GeneratedField::CollectionName => {
if collection_name__.is_some() {
return Err(serde::de::Error::duplicate_field("collectionName"));
}
collection_name__ = Some(map.next_value()?);
}
GeneratedField::EnableSchemaInference => {
if enable_schema_inference__.is_some() {
return Err(serde::de::Error::duplicate_field("enableSchemaInference"));
}
enable_schema_inference__ = Some(map.next_value()?);
}
}
}
Ok(combine_api::Config {
Expand All @@ -2238,6 +2272,8 @@ impl<'de> serde::Deserialize<'de> for combine_api::Config {
fields: fields__.unwrap_or_default(),
uuid_placeholder_ptr: uuid_placeholder_ptr__.unwrap_or_default(),
projections: projections__.unwrap_or_default(),
collection_name: collection_name__.unwrap_or_default(),
enable_schema_inference: enable_schema_inference__.unwrap_or_default(),
})
}
}
Expand Down
13 changes: 8 additions & 5 deletions go/bindings/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,21 @@ func (c *Combine) Configure(
keyPtrs []string,
fields []string,
projections []pf.Projection,
enableSchemaInference bool,
) error {
combineConfigureCounter.WithLabelValues(fqn, collection.String()).Inc()
c.metrics = newCombineMetrics(fqn, collection)

c.svc.mustSendMessage(
uint32(pf.CombineAPI_CONFIGURE),
&pf.CombineAPI_Config{
SchemaJson: schemaJSON,
KeyPtrs: keyPtrs,
Fields: fields,
UuidPlaceholderPtr: uuidPtr,
Projections: projections,
SchemaJson: schemaJSON,
KeyPtrs: keyPtrs,
Fields: fields,
UuidPlaceholderPtr: uuidPtr,
Projections: projections,
CollectionName: collection.String(),
EnableSchemaInference: enableSchemaInference,
})

return pollExpectNoOutput(c.svc)
Expand Down
2 changes: 2 additions & 0 deletions go/bindings/combine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestValidationFailuresAreLogged(t *testing.T) {
collection.Key,
nil,
collection.Projections,
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -108,6 +109,7 @@ func TestCombineBindings(t *testing.T) {
collection.Key,
[]string{"part_a", "part_b"},
collection.Projections,
false,
)
require.NoError(t, err)
}
Expand Down
Loading

0 comments on commit b9a93fe

Please sign in to comment.