Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/UUID timestamp extraction #1070

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ yaml-merge-keys = { version = "0.5", features = ["serde_yaml"] }
zip = "0.5"
zstd = "0.11.2"
derivative = "2.2.0"
const_format = "0.2.30"

# Used exclusively as dev-dependencies
assert_cmd = "2.0"
Expand Down
8 changes: 4 additions & 4 deletions crates/assemble/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use std::time::Duration;
mod ops;
pub use ops::generate_ops_collections;

// For the foreseeable future, we don't allow customizing this.
pub const UUID_PTR: &str = "/_meta/uuid";

pub fn inference(shape: &Shape, exists: Exists) -> flow::Inference {
let default_json = shape
.default
Expand Down Expand Up @@ -422,9 +425,6 @@ pub fn collection_spec(
})
.collect();

// For the foreseeable future, we don't allow customizing this.
let uuid_ptr = "/_meta/uuid".to_string();

let (write_schema_json, read_schema_json) = match (schema, write_schema, read_schema) {
(Some(schema), None, None) => (schema.to_string(), String::new()),
(None, Some(write_schema), Some(read_schema)) => {
Expand All @@ -440,7 +440,7 @@ pub fn collection_spec(
key: key.iter().map(|p| p.to_string()).collect(),
projections,
partition_fields,
uuid_ptr,
uuid_ptr: UUID_PTR.to_string(),
ack_template_json: serde_json::json!({
"_meta": {"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef",
"ack": true,
Expand Down
1 change: 1 addition & 0 deletions crates/derive-sqlite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license.workspace = true
[dependencies]
doc = { path = "../doc" }
json = { path = "../json" }
derive = { path = "../derive" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not thrilled with adding a new dependency on derive. That crate is slated to be taken out back to the woodshed...

proto-flow = { path = "../proto-flow" }

anyhow = { workspace = true }
Expand Down
9 changes: 8 additions & 1 deletion crates/derive-sqlite/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::iter;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: avoid extra space here. Have a single block of use statements, so that they're automatically sorted by the formatter.

use super::{dbutil, do_validate, parse_validate, Config, Lambda, Param, Transform};
use anyhow::Context;
use futures::channel::mpsc;
Expand Down Expand Up @@ -213,7 +215,12 @@ fn parse_open(
} = transform;

let source = source.unwrap();
let params: Vec<_> = source.projections.iter().map(Param::new).collect();
let params: Vec<_> = source
.projections
.iter()
.zip(iter::repeat(&source))
.map(|(p, c)| Param::new(p, c))
.collect();

let block: String = serde_json::from_str(&lambda_config_json).with_context(|| {
format!("failed to parse SQLite lambda block: {lambda_config_json}")
Expand Down
7 changes: 5 additions & 2 deletions crates/derive-sqlite/src/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,16 @@ fn bind_parameter<N: doc::AsNode>(
is_content_encoding_base64,
is_format_integer,
is_format_number,
collection,
..
}: &Param,
document: &N,
) -> rusqlite::Result<()> {
use derive::PointerExt;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love the introduced dependence on derive... that crate is slated to be taken out behind the woodshed.

use doc::Node;
let uuid_ptr_parsed = doc::Pointer::maybe_parse(collection.uuid_ptr.as_str());

match ptr.query(document).map(doc::AsNode::as_node) {
ptr.query_and_resolve_virtuals(uuid_ptr_parsed, document, |doc| match doc {
None | Some(Node::Null) => return stmt.raw_bind_parameter(index + 1, None::<bool>),
Some(Node::Bool(b)) => return stmt.raw_bind_parameter(index + 1, b),

Expand Down Expand Up @@ -179,7 +182,7 @@ fn bind_parameter<N: doc::AsNode>(
Some(n @ Node::Object(_)) => {
stmt.raw_bind_parameter(index + 1, &serde_json::to_string(&n).unwrap())
}
}
})
}

fn row_to_json(
Expand Down
37 changes: 20 additions & 17 deletions crates/derive-sqlite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,27 @@ fn test_param(
) -> Param {
use proto_flow::flow;

Param::new(&flow::Projection {
field: field.to_string(),
ptr: ptr.to_string(),
inference: Some(flow::Inference {
string: Some(flow::inference::String {
format: if is_format_integer {
"integer"
} else if is_format_number {
"number"
} else {
""
}
.to_string(),
content_encoding: if is_base64 { "base64" } else { "" }.to_string(),
Param::new(
&flow::Projection {
field: field.to_string(),
ptr: ptr.to_string(),
inference: Some(flow::Inference {
string: Some(flow::inference::String {
format: if is_format_integer {
"integer"
} else if is_format_number {
"number"
} else {
""
}
.to_string(),
content_encoding: if is_base64 { "base64" } else { "" }.to_string(),
..Default::default()
}),
..Default::default()
}),
..Default::default()
}),
..Default::default()
})
},
&flow::CollectionSpec::default(),
)
}
5 changes: 4 additions & 1 deletion crates/derive-sqlite/src/param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use proto_flow::flow;
pub struct Param {
// Projection which is being related as a SQLite parameter.
pub projection: flow::Projection,
// Collection which contains the projection that is being related as a SQLite parameter.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the reason for Param is to provide separation from the details of CollectionSpec representation and the way we actually use parameters for SQLite.

I'm thinking we need to introduce an `is_uuid_timestamp` boolean on `flow::Projection`.
Then, there would be projections like `field: _meta/uuid/timestamp` (or something custom), `ptr: /_meta/uuid`, `is_uuid_timestamp: true`.

That allows resolution of all this stuff from only the flow::Projection instance, and we don't need to put logic for UUID_PTR+'/timestamp' anywhere except the validation logic that makes a flow::Projection.

For this crate specifically, derive::PointerExt is backed out and instead, within the existing:

 Some(Node::String(s)) => {
           if *is_format_integer {

block, you also test for is_uuid_timestamp and extract it accordingly.

pub collection: flow::CollectionSpec,
// Canonical SQLite parameter encoding for this field.
pub canonical_encoding: String,
// JSON pointer location of this field within documents.
Expand All @@ -20,9 +22,10 @@ pub struct Param {
}

impl Param {
pub fn new(p: &flow::Projection) -> Self {
pub fn new(p: &flow::Projection, c: &flow::CollectionSpec) -> Self {
Self {
projection: p.clone(),
collection: c.clone(),
canonical_encoding: canonical_param_encoding(&p.field),
ptr: doc::Pointer::from_str(&p.ptr),
is_format_integer: matches!(&p.inference, Some(flow::Inference{string: Some(str), ..}) if str.format == "integer"),
Expand Down
9 changes: 8 additions & 1 deletion crates/derive-sqlite/src/validate.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::iter;

use super::{dbutil, is_url_to_generate, Config, Param, Transform};
use anyhow::Context;
use proto_flow::{
Expand Down Expand Up @@ -33,7 +35,12 @@ pub fn parse_validate(
} = transform;

let source = source.unwrap();
let params: Vec<_> = source.projections.iter().map(Param::new).collect();
let params: Vec<_> = source
.projections
.iter()
.zip(iter::repeat(&source))
.map(|(p, c)| Param::new(p, c))
.collect();

let block: String = serde_json::from_str(&lambda_config_json).with_context(|| {
format!("failed to parse SQLite lambda block: {lambda_config_json}")
Expand Down
1 change: 1 addition & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ thiserror = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
time = { workspace = true }

[dev-dependencies]
insta = { workspace = true }
Expand Down
Loading