Skip to content

Commit

Permalink
use doc::Extractor everywhere for extract and combine operations
Browse files Browse the repository at this point in the history
Update all uses of key extraction and doc::Combiner to use the
doc::Extractor pattern. Generally this means threading projections
through, so that a doc::Extractor can be built from either a key
pointer or a field name looked up against those projections.

Add an `extractors` crate which encapsulates building extractors from an
array of key pointers or field names.
  • Loading branch information
jgraettinger committed Jun 29, 2023
1 parent 06dda50 commit 20fd150
Show file tree
Hide file tree
Showing 42 changed files with 902 additions and 616 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/derive-sqlite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license.workspace = true

[dependencies]
doc = { path = "../doc" }
extractors = { path = "../extractors" }
json = { path = "../json" }
proto-flow = { path = "../proto-flow" }

Expand Down
6 changes: 5 additions & 1 deletion crates/derive-sqlite/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,11 @@ fn parse_open(
} = transform;

let source = source.unwrap();
let params: Vec<_> = source.projections.iter().map(Param::new).collect();
let params = source
.projections
.iter()
.map(Param::new)
.collect::<Result<Vec<_>, _>>()?;

let block: String = serde_json::from_str(&lambda_config_json).with_context(|| {
format!("failed to parse SQLite lambda block: {lambda_config_json}")
Expand Down
36 changes: 24 additions & 12 deletions crates/derive-sqlite/src/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,24 +134,35 @@ impl<'db> Lambda<'db> {
}

fn bind_parameter<N: doc::AsNode>(
stmt: &mut rusqlite::Statement<'_>,
index: usize,
param: &Param,
document: &N,
) -> rusqlite::Result<()> {
match param.extractor.query(document) {
Ok(node) => bind_parameter_node(stmt, index, param, node),
Err(node) => bind_parameter_node(stmt, index, param, node.as_ref()),
}
}

fn bind_parameter_node<N: doc::AsNode>(
stmt: &mut rusqlite::Statement<'_>,
index: usize,
Param {
ptr,
is_content_encoding_base64,
is_format_integer,
is_format_number,
..
}: &Param,
document: &N,
node: &N,
) -> rusqlite::Result<()> {
use doc::Node;

match ptr.query(document).map(doc::AsNode::as_node) {
None | Some(Node::Null) => return stmt.raw_bind_parameter(index + 1, None::<bool>),
Some(Node::Bool(b)) => return stmt.raw_bind_parameter(index + 1, b),
match node.as_node() {
Node::Null => return stmt.raw_bind_parameter(index + 1, None::<bool>),
Node::Bool(b) => return stmt.raw_bind_parameter(index + 1, b),

Some(Node::String(s)) => {
Node::String(s) => {
if *is_format_integer {
if let Ok(i) = s.parse::<i64>() {
return stmt.raw_bind_parameter(index + 1, i);
Expand All @@ -169,14 +180,14 @@ fn bind_parameter<N: doc::AsNode>(
}
stmt.raw_bind_parameter(index + 1, s)
}
Some(Node::Bytes(b)) => stmt.raw_bind_parameter(index + 1, b),
Some(Node::Number(json::Number::Float(f))) => stmt.raw_bind_parameter(index + 1, f),
Some(Node::Number(json::Number::Signed(s))) => stmt.raw_bind_parameter(index + 1, s),
Some(Node::Number(json::Number::Unsigned(u))) => stmt.raw_bind_parameter(index + 1, u),
Some(n @ Node::Array(_)) => {
Node::Bytes(b) => stmt.raw_bind_parameter(index + 1, b),
Node::Number(json::Number::Float(f)) => stmt.raw_bind_parameter(index + 1, f),
Node::Number(json::Number::Signed(s)) => stmt.raw_bind_parameter(index + 1, s),
Node::Number(json::Number::Unsigned(u)) => stmt.raw_bind_parameter(index + 1, u),
n @ Node::Array(_) => {
stmt.raw_bind_parameter(index + 1, &serde_json::to_string(&n).unwrap())
}
Some(n @ Node::Object(_)) => {
n @ Node::Object(_) => {
stmt.raw_bind_parameter(index + 1, &serde_json::to_string(&n).unwrap())
}
}
Expand Down Expand Up @@ -392,6 +403,7 @@ mod test {
{"case": "obj", "in": "{\"four\": 4}"},
{"case": "invalid-array", "in": "[1 2 \"three\"]"},
{"case": "invalid-obj", "in": "{four 4}"},
{"case": "missing"},
]);

let mut output = fixtures
Expand Down
4 changes: 4 additions & 0 deletions crates/derive-sqlite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub enum Error {
#[source]
err: rusqlite::Error,
},
#[error(transparent)]
Extractor(#[from] extractors::Error),

// rusqlite does a pretty good job of showing context in its errors.
#[error(transparent)]
Expand Down Expand Up @@ -84,8 +86,10 @@ fn test_param(
content_encoding: if is_base64 { "base64" } else { "" }.to_string(),
..Default::default()
}),
default_json: "\"the default\"".to_string(),
..Default::default()
}),
..Default::default()
})
.unwrap()
}
12 changes: 6 additions & 6 deletions crates/derive-sqlite/src/param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ pub struct Param {
pub projection: flow::Projection,
// Canonical SQLite parameter encoding for this field.
pub canonical_encoding: String,
// JSON pointer location of this field within documents.
pub ptr: doc::Pointer,
// Extractor of this projection within documents.
pub extractor: doc::Extractor,
// Location uses string format: "integer" ?
pub is_format_integer: bool,
// Location uses string format: "number" ?
Expand All @@ -20,15 +20,15 @@ pub struct Param {
}

impl Param {
pub fn new(p: &flow::Projection) -> Self {
Self {
pub fn new(p: &flow::Projection) -> Result<Self, Error> {
Ok(Self {
projection: p.clone(),
canonical_encoding: canonical_param_encoding(&p.field),
ptr: doc::Pointer::from_str(&p.ptr),
extractor: extractors::for_projection(&p)?,
is_format_integer: matches!(&p.inference, Some(flow::Inference{string: Some(str), ..}) if str.format == "integer"),
is_format_number: matches!(&p.inference, Some(flow::Inference{string: Some(str), ..}) if str.format == "number"),
is_content_encoding_base64: matches!(&p.inference, Some(flow::Inference{string: Some(str), ..}) if str.content_encoding == "base64"),
}
})
}

pub fn resolve<'p>(encoding: &str, params: &'p [Self]) -> Result<&'p Self, Error> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ expression: snap(&lambda)
"projection": {
"field": "amount",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/amount"
Expand All @@ -19,6 +20,7 @@ expression: snap(&lambda)
"projection": {
"field": "sender",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/sender"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ expression: snap(&lambda)
"projection": {
"field": "recipient",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/recipient"
Expand All @@ -19,6 +20,7 @@ expression: snap(&lambda)
"projection": {
"field": "amount",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/amount"
Expand All @@ -29,6 +31,7 @@ expression: snap(&lambda)
"projection": {
"field": "id",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ expression: snap(&lambda)
"projection": {
"field": "sender",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/sender"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ expression: snap(&lambda)
"projection": {
"field": "id",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/id"
Expand All @@ -19,6 +20,7 @@ expression: snap(&lambda)
"projection": {
"field": "sender",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/sender"
Expand All @@ -29,6 +31,7 @@ expression: snap(&lambda)
"projection": {
"field": "recipient",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/recipient"
Expand All @@ -39,6 +42,7 @@ expression: snap(&lambda)
"projection": {
"field": "amount",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/amount"
Expand All @@ -49,6 +53,7 @@ expression: snap(&lambda)
"projection": {
"field": "nested/prop",
"inference": {
"default": "the default",
"string": {}
},
"ptr": "/nested/prop"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ expression: output
"output": "{four 4}"
}
],
[
{
"case": "missing",
"output": "the default"
}
],
[
{
"$str_int * 10": 120,
Expand Down
6 changes: 5 additions & 1 deletion crates/derive-sqlite/src/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ pub fn parse_validate(
} = transform;

let source = source.unwrap();
let params: Vec<_> = source.projections.iter().map(Param::new).collect();
let params = source
.projections
.iter()
.map(Param::new)
.collect::<Result<Vec<_>, _>>()?;

let block: String = serde_json::from_str(&lambda_config_json).with_context(|| {
format!("failed to parse SQLite lambda block: {lambda_config_json}")
Expand Down
1 change: 1 addition & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license.workspace = true
allocator = { path = "../allocator" }
cgo = { path = "../cgo" }
doc = { path = "../doc" }
extractors = { path = "../extractors" }
json = { path = "../json" }
models = { path = "../models" }
proto-flow = { path = "../proto-flow" }
Expand Down
Loading

0 comments on commit 20fd150

Please sign in to comment.