add implicit flow_published_at projection which extracts UUID clock
Create a `flow_published_at` projection for /_meta/uuid, which is
overridable through a user-provided projection of /_meta/uuid/date-time.

Use inference of format: "date-time", contentEncoding: "uuid" to
communicate that the timestamp contained within the UUID is to be
extracted and formatted as an RFC3339 date-time.

Update doc::Extractor to handle this case.

Issue #934
jgraettinger committed Jun 29, 2023
1 parent 20fd150 commit 94fc0d4
Showing 21 changed files with 895 additions and 355 deletions.
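The mechanism this commit wires into doc::Extractor, shown as a standalone sketch: parse the document UUID, read the clock embedded in a v1 UUID, and render it as an RFC3339 date-time. The helper name below is illustrative and not part of the commit; it assumes only the `uuid` and `time` crates, which this change adds to crates/doc/Cargo.toml.

fn uuid_v1_to_rfc3339(raw: &str) -> Option<String> {
    // Parse the string form of the UUID; non-UUID values fall through as None.
    let parsed = uuid::Uuid::parse_str(raw).ok()?;
    // get_timestamp() yields Some only for UUID versions that embed a clock (v1 here).
    let (seconds, nanos) = parsed.get_timestamp()?.to_unix();
    let dt = time::OffsetDateTime::from_unix_timestamp_nanos(
        seconds as i128 * 1_000_000_000 + nanos as i128,
    )
    .ok()?;
    dt.format(&time::format_description::well_known::Rfc3339).ok()
}

fn main() {
    // A v1 UUID taken from the test fixture in crates/doc/src/extractor.rs below.
    println!(
        "{:?}",
        uuid_v1_to_rfc3339("85bad119-15f2-11ee-8401-43f05f562888")
    );
    // Expected, per the snapshot below: Some("2023-06-28T20:29:46.4945945Z")
}

As the extractor test below shows, a value that is not a parseable v1 UUID (the "whoops" fixture) is passed through unchanged rather than treated as an error.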
2 changes: 2 additions & 0 deletions Cargo.lock


33 changes: 20 additions & 13 deletions crates/assemble/src/lib.rs
@@ -1,5 +1,5 @@
use doc::inference::{Exists, Shape};
use json::schema::types;
use json::schema::{formats, types};
use proto_flow::flow;
use proto_gazette::{broker, consumer};
use serde_json::Value;
@@ -16,13 +16,6 @@ pub fn inference(shape: &Shape, exists: Exists) -> flow::Inference {
.map(|(v, _)| v.to_string())
.unwrap_or_default();

let is_base64 = shape
.string
.content_encoding
.as_ref()
.map(|v| v.to_ascii_lowercase() == "base64")
.unwrap_or_default();

let exists = match exists {
Exists::Must => flow::inference::Exists::Must,
Exists::May => flow::inference::Exists::May,
@@ -46,7 +39,6 @@ pub fn inference(shape: &Shape, exists: Exists) -> flow::Inference {
.map(|f| f.to_string())
.unwrap_or_default(),
content_encoding: shape.string.content_encoding.clone().unwrap_or_default(),
is_base64,
max_length: shape.string.max_length.unwrap_or_default() as u32,
})
} else {
@@ -55,6 +47,23 @@ pub fn inference(shape: &Shape, exists: Exists) -> flow::Inference {
}
}

// inference_uuid_v1_date_time is a special-case flow::Inference
// for the timestamp embedded within the Flow document UUID.
pub fn inference_uuid_v1_date_time() -> flow::Inference {
flow::Inference {
types: vec!["string".to_string()],
string: Some(flow::inference::String {
format: formats::Format::DateTime.to_string(),
content_encoding: "uuid".to_string(),
..Default::default()
}),
title: "Flow Publication Time".to_string(),
description: "Flow publication date-time of this document".to_string(),
exists: flow::inference::Exists::Must as i32,
..Default::default()
}
}

// partition_template returns a template JournalSpec for creating
// or updating data partitions of the collection.
pub fn partition_template(
@@ -389,6 +398,7 @@ pub fn collection_spec(
collection: &tables::Collection,
projections: Vec<flow::Projection>,
stores: &[models::Store],
uuid_ptr: &str,
) -> flow::CollectionSpec {
let tables::Collection {
scope: _,
@@ -422,9 +432,6 @@
})
.collect();

// For the forseeable future, we don't allow customizing this.
let uuid_ptr = "/_meta/uuid".to_string();

let (write_schema_json, read_schema_json) = match (schema, write_schema, read_schema) {
(Some(schema), None, None) => (schema.to_string(), String::new()),
(None, Some(write_schema), Some(read_schema)) => {
@@ -440,7 +447,7 @@
key: key.iter().map(|p| p.to_string()).collect(),
projections,
partition_fields,
uuid_ptr,
uuid_ptr: uuid_ptr.to_string(),
ack_template_json: serde_json::json!({
"_meta": {"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef",
"ack": true,
@@ -13,7 +13,6 @@ expression: "&[out1, out2]"
content_type: "a/type",
format: "date-time",
content_encoding: "BaSE64",
is_base64: true,
max_length: 123,
},
),
2 changes: 2 additions & 0 deletions crates/doc/Cargo.toml
@@ -25,8 +25,10 @@ serde = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }

[dev-dependencies]
allocator = { path = "../allocator" }
66 changes: 62 additions & 4 deletions crates/doc/src/extractor.rs
@@ -1,4 +1,4 @@
use crate::{compare::compare, ArchivedNode, AsNode, LazyNode, Pointer};
use crate::{compare::compare, ArchivedNode, AsNode, LazyNode, Node, Pointer};
use bytes::BufMut;
use std::borrow::Cow;
use tuple::TuplePack;
@@ -9,6 +9,7 @@ use tuple::TuplePack;
pub struct Extractor {
ptr: Pointer,
default: serde_json::Value,
is_uuid_v1_date_time: bool,
}

impl Extractor {
@@ -18,6 +19,7 @@ impl Extractor {
Self {
ptr: Pointer::from(ptr),
default: serde_json::Value::Null,
is_uuid_v1_date_time: false,
}
}

@@ -27,6 +29,16 @@ impl Extractor {
Self {
ptr: Pointer::from(ptr),
default,
is_uuid_v1_date_time: false,
}
}

/// Build an extractor for the JSON pointer, which is a v1 UUID.
pub fn for_uuid_v1_date_time(ptr: &str) -> Self {
Self {
ptr: Pointer::from(ptr),
default: serde_json::Value::Null,
is_uuid_v1_date_time: true,
}
}

@@ -43,10 +55,33 @@ impl Extractor {
&'s self,
doc: &'n N,
) -> Result<&'n N, Cow<'s, serde_json::Value>> {
match self.ptr.query(doc) {
Some(n) => Ok(n),
None => Err(Cow::Borrowed(&self.default)),
let Some(node) = self.ptr.query(doc) else {
return Err(Cow::Borrowed(&self.default));
};

if self.is_uuid_v1_date_time {
if let Some(date_time) = match node.as_node() {
Node::String(s) => Some(s),
_ => None,
}
.and_then(|s| uuid::Uuid::parse_str(s).ok())
.and_then(|u| u.get_timestamp())
.and_then(|t| {
let (seconds, nanos) = t.to_unix();
time::OffsetDateTime::from_unix_timestamp_nanos(
seconds as i128 * 1_000_000_000 + nanos as i128,
)
.ok()
}) {
return Err(Cow::Owned(serde_json::Value::String(
date_time
.format(&time::format_description::well_known::Rfc3339)
.expect("rfc3339 format always succeeds"),
)));
}
}

Ok(node)
}

/// Extract a packed tuple representation from an instance of doc::AsNode.
@@ -131,6 +166,11 @@ mod test {
"doub": 1.3,
"unsi": 2,
"sign": -30,
"uuid-ts": [
"85bad119-15f2-11ee-8401-43f05f562888",
"1878923d-162a-11ee-8401-43f05f562888",
"6d304974-1631-11ee-8401-whoops"
]
});

let extractors = vec![
@@ -144,6 +184,11 @@ mod test {
Extractor::new("/sign"),
Extractor::new("/obj"),
Extractor::new("/arr"),
Extractor::for_uuid_v1_date_time("/uuid-ts/0"),
Extractor::for_uuid_v1_date_time("/uuid-ts/1"),
Extractor::for_uuid_v1_date_time("/uuid-ts/2"),
Extractor::for_uuid_v1_date_time("/missing"),
Extractor::for_uuid_v1_date_time("/fals"),
];

let mut buffer = bytes::BytesMut::new();
@@ -178,6 +223,19 @@ mod test {
Bytes(
b"\x5b\x22foo\x22\x5d",
),
String(
"2023-06-28T20:29:46.4945945Z",
),
String(
"2023-06-29T03:07:35.0056509Z",
),
String(
"6d304974-1631-11ee-8401-whoops",
),
Nil,
Bool(
false,
),
]
"###);
}
37 changes: 36 additions & 1 deletion crates/extractors/src/lib.rs
@@ -58,6 +58,22 @@ pub fn for_projection(projection: &flow::Projection) -> Result<doc::Extractor> {
return Err(Error::InferenceNotFound);
};

// Special-case for date-time extracted from the clock component of a UUID.
// Compare to assemble::inference_uuid_v1_date_time().
if matches!(inf,
flow::Inference {
string:
Some(flow::inference::String {
format,
content_encoding,
..
}),
..
} if format == "date-time" && content_encoding == "uuid")
{
return Ok(doc::Extractor::for_uuid_v1_date_time(&projection.ptr));
}

let default = if inf.default_json != "" {
serde_json::from_str(&inf.default_json).map_err(Error::ParseDefault)?
} else {
@@ -79,6 +95,7 @@ mod test {
{"field": "user_key", "ptr": "/the/key", "explicit": true, "inference": {"default": "user_key"}},
{"field": "foo", "ptr": "/foo", "inference": {"default": 32}},
{"field": "user_bar", "ptr": "/bar/baz", "explicit": true, "inference": {}},
{"field": "flow_published_at", "ptr": "/_meta/uuid", "inference": {"string": {"format": "date-time", "contentEncoding": "uuid"}}},
]))
.unwrap();
projections.sort_by(|l, r| l.field.cmp(&r.field));
@@ -97,6 +114,7 @@
],
),
default: String("user_key"),
is_uuid_v1_date_time: false,
},
Extractor {
ptr: Pointer(
@@ -110,11 +128,12 @@
],
),
default: Null,
is_uuid_v1_date_time: false,
},
]
"###);

insta::assert_debug_snapshot!(for_fields(&["user_bar", "foo"], &projections).unwrap(), @r###"
insta::assert_debug_snapshot!(for_fields(&["user_bar", "foo", "flow_published_at"], &projections).unwrap(), @r###"
[
Extractor {
ptr: Pointer(
@@ -128,6 +147,7 @@
],
),
default: Null,
is_uuid_v1_date_time: false,
},
Extractor {
ptr: Pointer(
@@ -138,6 +158,21 @@
],
),
default: Number(32),
is_uuid_v1_date_time: false,
},
Extractor {
ptr: Pointer(
[
Property(
"_meta",
),
Property(
"uuid",
),
],
),
default: Null,
is_uuid_v1_date_time: true,
},
]
"###);
3 changes: 0 additions & 3 deletions crates/proto-flow/src/flow.rs
@@ -99,9 +99,6 @@ pub mod inference {
/// Annotated Content-Encoding when the projection is of "string" type.
#[prost(string, tag = "7")]
pub content_encoding: ::prost::alloc::string::String,
/// Is the Content-Encoding "base64" (case-invariant)?
#[prost(bool, tag = "5")]
pub is_base64: bool,
/// Maximum length when the projection is of "string" type. Zero for no
/// limit.
#[prost(uint32, tag = "6")]
18 changes: 0 additions & 18 deletions crates/proto-flow/src/flow.serde.rs
@@ -3408,9 +3408,6 @@ impl serde::Serialize for inference::String {
if !self.content_encoding.is_empty() {
len += 1;
}
if self.is_base64 {
len += 1;
}
if self.max_length != 0 {
len += 1;
}
@@ -3424,9 +3421,6 @@ impl serde::Serialize for inference::String {
if !self.content_encoding.is_empty() {
struct_ser.serialize_field("contentEncoding", &self.content_encoding)?;
}
if self.is_base64 {
struct_ser.serialize_field("isBase64", &self.is_base64)?;
}
if self.max_length != 0 {
struct_ser.serialize_field("maxLength", &self.max_length)?;
}
@@ -3445,8 +3439,6 @@ impl<'de> serde::Deserialize<'de> for inference::String {
"format",
"content_encoding",
"contentEncoding",
"is_base64",
"isBase64",
"max_length",
"maxLength",
];
@@ -3456,7 +3448,6 @@ impl<'de> serde::Deserialize<'de> for inference::String {
ContentType,
Format,
ContentEncoding,
IsBase64,
MaxLength,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -3482,7 +3473,6 @@ impl<'de> serde::Deserialize<'de> for inference::String {
"contentType" | "content_type" => Ok(GeneratedField::ContentType),
"format" => Ok(GeneratedField::Format),
"contentEncoding" | "content_encoding" => Ok(GeneratedField::ContentEncoding),
"isBase64" | "is_base64" => Ok(GeneratedField::IsBase64),
"maxLength" | "max_length" => Ok(GeneratedField::MaxLength),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
@@ -3506,7 +3496,6 @@ impl<'de> serde::Deserialize<'de> for inference::String {
let mut content_type__ = None;
let mut format__ = None;
let mut content_encoding__ = None;
let mut is_base64__ = None;
let mut max_length__ = None;
while let Some(k) = map.next_key()? {
match k {
@@ -3528,12 +3517,6 @@ impl<'de> serde::Deserialize<'de> for inference::String {
}
content_encoding__ = Some(map.next_value()?);
}
GeneratedField::IsBase64 => {
if is_base64__.is_some() {
return Err(serde::de::Error::duplicate_field("isBase64"));
}
is_base64__ = Some(map.next_value()?);
}
GeneratedField::MaxLength => {
if max_length__.is_some() {
return Err(serde::de::Error::duplicate_field("maxLength"));
Expand All @@ -3548,7 +3531,6 @@ impl<'de> serde::Deserialize<'de> for inference::String {
content_type: content_type__.unwrap_or_default(),
format: format__.unwrap_or_default(),
content_encoding: content_encoding__.unwrap_or_default(),
is_base64: is_base64__.unwrap_or_default(),
max_length: max_length__.unwrap_or_default(),
})
}