Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Add expression evaluation support for EXTRACT #1262

Open
wants to merge 2 commits into
base: Ic73ef858478e73b6c466695a84ddb0266d881e92
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions dataflow-expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ tokio = { workspace = true, features = ["full"] }
tokio-postgres = { workspace = true, features = ["with-chrono-0_4", "with-eui48-1", "with-uuid-0_8", "with-serde_json-1", "with-bit-vec-0_6"] }
postgres = { workspace = true, features = ["with-chrono-0_4", "with-eui48-1", "with-uuid-0_8", "with-serde_json-1", "with-bit-vec-0_6"] }
mysql_async = { workspace = true }
anyhow = "1.0"
regex = "1.8.4"
bytes = "1.0"
170 changes: 169 additions & 1 deletion dataflow-expression/src/eval/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use std::ops::{Add, Div, Mul, Sub};
use std::str::FromStr;

use chrono::{
Datelike, LocalResult, Month, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Timelike, Weekday,
DateTime, Datelike, FixedOffset, LocalResult, Month, NaiveDate, NaiveDateTime, NaiveTime,
TimeZone, Timelike, Weekday,
};
use chrono_tz::Tz;
use itertools::Either;
use mysql_time::MySqlTime;
use nom_sql::TimestampField;
use readyset_data::{DfType, DfValue};
use readyset_errors::{invalid_query_err, unsupported, ReadySetError, ReadySetResult};
use readyset_util::math::integer_rnd;
Expand All @@ -21,8 +23,14 @@ use vec1::Vec1;

use crate::{BuiltinFunction, Expr};

const MICROS_IN_SECOND: u32 = 1_000_000;
const MILLIS_IN_SECOND: u32 = 1_000;
const NANOS_IN_MICRO: u32 = 1_000;
const NANOS_IN_MILLI: u32 = 1_000_000;
const SECONDS_IN_HOUR: u64 = 60 * 60;
const SECONDS_IN_MINUTE: u64 = 60;
const SECONDS_IN_DAY: u64 = SECONDS_IN_HOUR * 24;
const MINUTES_IN_HOUR: u64 = 60;

macro_rules! try_cast_or_none {
($df_value:expr, $to_ty:expr, $from_ty:expr) => {{
Expand Down Expand Up @@ -186,6 +194,7 @@ impl TryFrom<&DfValue> for DateTruncPrecision {
}
}

// TODO ethan replace math in here with new extract function
fn date_trunc(precision: DateTruncPrecision, dt: NaiveDateTime) -> ReadySetResult<NaiveDateTime> {
// note: cannot use the `DurationRound::duration_trunc()` fn as it calls
// `NaiveDateTime::timestamp_nanos_opt()`, and that can only represent dates between 1677 AD
Expand Down Expand Up @@ -272,6 +281,138 @@ fn date_trunc(precision: DateTruncPrecision, dt: NaiveDateTime) -> ReadySetResul
}
}

fn extract_from_timestamptz(
field: TimestampField,
dt: DateTime<FixedOffset>,
) -> ReadySetResult<DfValue> {
Ok(match field {
TimestampField::Century => {
let year = dt.year();

if year > 0 {
((year + 99) / 100).into()
} else {
(-((99 - (year - 1)) / 100)).into()
}
}
// TODO are these types all right?
TimestampField::Day => dt.day().into(),
TimestampField::Decade => {
let year = dt.year();

let decade = if year >= 0 {
year / 10
} else {
-((8 - (year - 1)) / 10)
};

decade.into()
}
TimestampField::Dow => (dt.weekday().number_from_monday() % 7).into(),
TimestampField::Doy => dt.ordinal().into(),
// TODO do we need to handle precision? out of scope?
TimestampField::Epoch => {
let num: Decimal = dt.timestamp_micros().into();
let denom: Decimal = MICROS_IN_SECOND.into();
(num / denom).round_dp(6).into()
}
TimestampField::Hour => dt.hour().into(),
TimestampField::Isodow => dt.weekday().number_from_monday().into(),
TimestampField::Isoyear => {
let mut isoyear = dt.iso_week().year();

if isoyear <= 0 {
isoyear -= 1;
}

isoyear.into()
}
TimestampField::Julian => {
// https://github.com/postgres/postgres/blob/c6cf6d353c2865d82356ac86358622a101fde8ca/src/interfaces/ecpg/pgtypeslib/dt_common.c#L581-L582
let mut year = dt.year();
let mut month = dt.month();
let day = dt.day();

if month > 2 {
month += 1;
year += 4800;
} else {
month += 13;
year += 4799;
}
let century = year / 100;
let mut julian_date = year * 365 - 32167;
julian_date += year / 4 - century + century / 4;
julian_date += 7834 * month as i32 / 256 + day as i32;
let julian_date: Decimal = julian_date.into();

let fsec = dt.nanosecond() as u64 / 1000;
let numerator: Decimal = (((((dt.hour() as u64 * MINUTES_IN_HOUR)
+ dt.minute() as u64)
* SECONDS_IN_MINUTE)
+ dt.second() as u64)
* 1000000
+ fsec)
.into();
let denom: Decimal = (SECONDS_IN_DAY * 1000000).into();
// TODO ethan I think we might need to implement postgres's division algorithm to get
// the exact same results
let fraction = numerator / denom;

(julian_date + fraction).into()
}
// TODO test for rounding errors
TimestampField::Microseconds => {
let naive = dt.naive_local();
let seconds = naive.second() * MICROS_IN_SECOND;
let frac = naive.nanosecond().saturating_div(NANOS_IN_MICRO);
(seconds + frac).into()
}
TimestampField::Millennium => {
let year = dt.year();

if year > 0 {
((year + 999) / 1000).into()
} else {
(-((999 - (year - 1)) / 1000)).into()
}
}
TimestampField::Milliseconds => {
let seconds = dt.second() * MILLIS_IN_SECOND;
let frac = dt.nanosecond().saturating_div(NANOS_IN_MILLI);
(seconds + frac).into()
}
TimestampField::Minute => dt.minute().into(),
// TODO for INTERVALs, the month is mod 12 (starting from 0) -- do we support INTERVALs?
TimestampField::Month => dt.month().into(),
TimestampField::Quarter => (dt.month0() / 3 + 1).into(),
// TODO ethan how to handle time zones? postgres seems to change answer based on time zone
// set on pg server...we should do the same?
// TODO ethan how would we even handle this? what if time zone is changed after data is
// already in cache? would we convert the timezone when the results are being returned?
// would need to...
TimestampField::Second => dbg!(dt.naive_utc().second().into()),
TimestampField::Timezone => dt.offset().local_minus_utc().into(),
TimestampField::TimezoneHour => {
(dt.offset().local_minus_utc() / SECONDS_IN_HOUR as i32).into()
}
TimestampField::TimezoneMinute => {
((dt.offset().local_minus_utc() % SECONDS_IN_HOUR as i32) / SECONDS_IN_MINUTE as i32)
.into()
}
TimestampField::Week => dt.iso_week().week().into(),
TimestampField::Year => {
let mut year = dt.year();

if year < 0 {
year -= 1;
}

year.into()
}
})
}

/// Format the given time value according to the given `format_string`, using the [MySQL date
/// formatting rules][mysql-docs]. Since these rules don't match up well with anything available in
/// the Rust crate ecosystem, this is done manually.
Expand Down Expand Up @@ -1033,6 +1174,33 @@ impl BuiltinFunction {
date_trunc(precision, datetime.naive_utc()).unwrap(),
))
}
BuiltinFunction::Extract(field, expr) => {
println!("here1");
let ts = non_null!(expr.eval(record)?);
println!("evalled");
// let coerced = ts.coerce_to(
// &DfType::TimestampTz {
// subsecond_digits: 6,
// },
// &DfType::Unknown,
// )?;

match ts {
// TODO should this ever be ::Date or ::Time?
DfValue::TimestampTz(t) => {
if t.has_date_only() && !field.is_date_field() {
Err(invalid_query_err!("Cannot extract field {field} from date"))
} else {
// TODO ethan why is it executing twice?
extract_from_timestamptz(*field, t.to_chrono())
.and_then(|value| value.coerce_to(ty, &DfType::Unknown))
}
}
_ => Err(invalid_query_err!(
"Cannot invoke EXTRACT() on value of type {ty}"
)),
}
}
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions dataflow-expression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::fmt::{self, Display, Formatter};

pub use eval::builtins::DateTruncPrecision;
use itertools::Itertools;
use nom_sql::TimestampField;
pub use readyset_data::Dialect;
use readyset_data::{DfType, DfValue};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -112,6 +113,9 @@ pub enum BuiltinFunction {

/// [`date_trunc`](https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC)
DateTrunc(Expr, Expr),

/// [`date_trunc`](https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC)
Extract(TimestampField, Expr),
}

impl BuiltinFunction {
Expand Down Expand Up @@ -146,6 +150,7 @@ impl BuiltinFunction {
Least { .. } => "least",
ArrayToString { .. } => "array_to_string",
DateTrunc { .. } => "date_trunc",
Extract { .. } => "extract",
}
}
}
Expand Down Expand Up @@ -244,6 +249,9 @@ impl Display for BuiltinFunction {
DateTrunc(field, source) => {
write!(f, "({}, {})", field, source)
}
Extract(field, expr) => {
write!(f, "({} FROM {})", field, expr)
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions dataflow-expression/src/lower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,13 @@ impl Expr {

Ok(Self::Call { func, ty })
}
AstExpr::Call(FunctionExpr::Extract { field, expr }) => {
let expr = Self::lower(*expr, dialect, context.clone())?;
let ty = DfType::Numeric { prec: 20, scale: 6 };
let func = Box::new(BuiltinFunction::Extract(field, expr));

Ok(Self::Call { func, ty })
}
AstExpr::Call(call) => internal!(
"Unexpected (aggregate?) call node in project expression: {:?}",
Sensitive(&call)
Expand Down
9 changes: 2 additions & 7 deletions dataflow-expression/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,8 @@ pub fn parse_lower_eval(
expr: &str,
parser_dialect: nom_sql::Dialect,
expr_dialect: dataflow_expression::Dialect,
) -> DfValue {
) -> Result<DfValue, anyhow::Error> {
let ast = parse_expr(parser_dialect, expr).unwrap();
let lowered = Expr::lower(ast, expr_dialect, TestLowerContext).unwrap();
match lowered.eval::<DfValue>(&[]) {
Ok(res) => res,
Err(e) => {
panic!("Error evaluating `{expr}`: {e}")
}
}
Ok(lowered.eval::<DfValue>(&[])?)
}
3 changes: 2 additions & 1 deletion dataflow-expression/tests/mysql_oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ async fn compare_eval(expr: &str, conn: &mut Conn) {
expr,
nom_sql::Dialect::MySQL,
dataflow_expression::Dialect::DEFAULT_MYSQL,
);
)
.unwrap_or_else(|e| panic!("Error evaluating `{expr}`: {e}"));
assert_eq!(
our_result, mysql_result,
"mismatched results for {expr} (left: us, right: mysql)"
Expand Down
18 changes: 18 additions & 0 deletions dataflow-expression/tests/postgres_oracle.proptest-regressions
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 2babad91d07b8955fc644887a196de3f10a2d5906841ecf2266022cab7ada9df # shrinks to field = Century, datetime = 1970-01-01T00:00:00
cc 74a8620219bfa0ce332d4888bd451e13ebae34411b31cb3c698100aa36b7ed36 # shrinks to field = Second, datetime = 1970-01-01T00:00:01
cc 7cb714461ce8352e2586db8c9d70dc6473741533cff95ab38f93945034ee9082 # shrinks to field = Isoyear, datetime = 1970-01-01T00:00:00
cc 0e4282717b1b027255dedda6a563a9e476c859217c68eb85c662c7fd9f222e0a # shrinks to field = Decade, datetime = 1970-01-01T00:00:00
cc 03b2ab6ce5e6c9a8625203de6bff831de34734dbccce73558e8cc34113459319 # shrinks to field = Epoch, datetime = 1970-01-01T00:00:01
cc c11828b40b19ecaf688bf5e0da47f3615913ada65a2c103f9d06f06a6c6415d2 # shrinks to field = Dow, datetime = 2035-09-09T00:00:00
cc d893989c83aaae04070d3d4f3a5268baaee176330458a0a73cb738a64940cc08 # shrinks to field = Julian, datetime = 2038-01-01T00:00:01
cc f0e758dc44d25cc02f50841e54d5a9ce2808d9d7c3d2bf3d499eba04b893b9f2 # shrinks to field = Julian, datetime = 1970-01-01T00:00:09
cc ad2e5c4942b266e05f9337bef7b482985b0dd83d986c2be7265a3cb45385931f # shrinks to field = Century, date = -0001-01-01
cc ae8a16e5a382edc488ecb5d21c31ac5f9c816e47d9b15f1e86bfe1951a218655 # shrinks to field = Isoyear, date = -0001-01-03
cc 0d521b15cabc8ee12b89ec2532e9b59923158340de74ca00a7580fb78f7c68ba # shrinks to field = Decade, date = -0001-01-02
cc b9588ba5419366556017de48721258ee0f8568893550e25503c14d637b1744fb # shrinks to field = Century, date = -4290-01-01
Loading