Skip to content

Commit

Permalink
shift window back til earliest datapoint is in or in front of window
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoGorelli committed Mar 28, 2024
1 parent 40e5ff5 commit c25d2d4
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 197 deletions.
328 changes: 172 additions & 156 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion crates/polars-time/src/windows/bounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,15 @@ impl Bounds {
/// Returns `true` when `t` lies beyond the upper bound (`self.stop`) of these bounds.
///
/// Whether `t == self.stop` itself counts as "future" depends on `closed`.
pub(crate) fn is_future(&self, t: i64, closed: ClosedWindow) -> bool {
match closed {
// `Left` / `None`: a `t` equal to `self.stop` already counts as future.
ClosedWindow::Left | ClosedWindow::None => self.stop <= t,
// `Both` / `Right`: only a `t` strictly greater than `self.stop` counts as future.
ClosedWindow::Both | ClosedWindow::Right => t > self.stop,
ClosedWindow::Both | ClosedWindow::Right => self.stop < t,
}
}

/// Returns `true` when `t` lies before the lower bound (`self.start`) of these bounds.
///
/// For `Left` and `Both` only a `t` strictly smaller than `self.start` qualifies;
/// for `None` and `Right` a `t` equal to `self.start` qualifies as well.
#[inline]
pub(crate) fn is_past(&self, t: i64, closed: ClosedWindow) -> bool {
    let start_is_inclusive = matches!(closed, ClosedWindow::Left | ClosedWindow::Both);
    if start_is_inclusive {
        t < self.start
    } else {
        t <= self.start
    }
}
}
3 changes: 2 additions & 1 deletion crates/polars-time/src/windows/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ pub fn group_by_windows(
window
.get_overlapping_bounds_iter(
boundary,
closed_window,
tu,
tz.parse::<Tz>().ok().as_ref(),
start_by,
Expand All @@ -198,7 +199,7 @@ pub fn group_by_windows(
_ => {
update_groups_and_bounds(
window
.get_overlapping_bounds_iter(boundary, tu, None, start_by)
.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)
.unwrap(),
start_offset,
time,
Expand Down
16 changes: 12 additions & 4 deletions crates/polars-time/src/windows/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ fn test_offset() {
Duration::parse("-2m"),
);

let b = w.get_earliest_bounds_ns(t, None).unwrap();
let b = w
.get_earliest_bounds_ns(t, ClosedWindow::Left, None)
.unwrap();
let start = NaiveDate::from_ymd_opt(2020, 1, 1)
.unwrap()
.and_hms_opt(23, 58, 0)
Expand Down Expand Up @@ -209,7 +211,9 @@ fn test_boundaries() {
);

// earliest bound is first datapoint: 2021-12-16 00:00:00
let b = w.get_earliest_bounds_ns(ts[0], None).unwrap();
let b = w
.get_earliest_bounds_ns(ts[0], ClosedWindow::Both, None)
.unwrap();
assert_eq!(b.start, start.and_utc().timestamp_nanos_opt().unwrap());

// test closed: "both" (includes both ends of the interval)
Expand Down Expand Up @@ -391,7 +395,9 @@ fn test_boundaries_2() {
let w = Window::new(Duration::parse("2h"), Duration::parse("1h"), offset);

// earliest bound is first datapoint: 2021-12-16 00:00:00 + 30m offset: 2021-12-16 00:30:00
let b = w.get_earliest_bounds_ns(ts[0], None).unwrap();
let b = w
.get_earliest_bounds_ns(ts[0], ClosedWindow::Both, None)
.unwrap();

assert_eq!(
b.start,
Expand Down Expand Up @@ -520,7 +526,9 @@ fn test_boundaries_ms() {
);

// earliest bound is first datapoint: 2021-12-16 00:00:00
let b = w.get_earliest_bounds_ms(ts[0], None).unwrap();
let b = w
.get_earliest_bounds_ms(ts[0], ClosedWindow::Both, None)
.unwrap();
assert_eq!(b.start, start.and_utc().timestamp_millis());

// test closed: "both" (includes both ends of the interval)
Expand Down
112 changes: 93 additions & 19 deletions crates/polars-time/src/windows/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@ use polars_core::prelude::*;

use crate::prelude::*;

/// Steps a candidate window backwards until `t` is no longer before it.
///
/// Starting from `start`, the window `[start, start + period]` is built with
/// `offset_fn`. As long as `Bounds::is_past` reports that `t` precedes the window
/// (honoring `closed_window`), `start` is moved back by one `every` and the window
/// is rebuilt.
///
/// * `every` - step between consecutive windows; applied with its sign flipped.
/// * `t` - the timestamp that must end up inside or in front of the window.
/// * `offset_fn` - adds a `Duration` to a timestamp (callers in this file pass
///   `Duration::add_ns`, `Duration::add_us` or `Duration::add_ms`).
/// * `period` - length of the window.
/// * `start` - initial candidate for the window start.
/// * `closed_window` - which ends of the window are inclusive.
/// * `tz` - optional time zone forwarded to `offset_fn`.
///
/// Returns the resulting bounds, or the first error produced by `offset_fn`.
fn ensure_boundary_start_in_window(
    mut every: Duration,
    t: i64,
    offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
    period: Duration,
    mut start: i64,
    closed_window: ClosedWindow,
    tz: Option<&Tz>,
) -> PolarsResult<Bounds> {
    // Flip the sign so that adding `every` moves the window back in time.
    every.negative ^= true;
    loop {
        let stop = offset_fn(&period, start, tz)?;
        if !Bounds::new(start, stop).is_past(t, closed_window) {
            return Ok(Bounds::new_checked(start, stop));
        }
        start = offset_fn(&every, start, tz)?;
    }
}

/// Represents a window in time
#[derive(Copy, Clone)]
pub struct Window {
Expand Down Expand Up @@ -82,24 +100,58 @@ impl Window {
/// returns the bounds for the earliest window bounds
/// that contains the given time t. For underlapping windows that
/// do not contain time t, the window directly after time t will be returned.
///
/// The candidate start is `t` truncated with `truncate_ns`. In the variant taking
/// `closed_window`, that candidate is handed to `ensure_boundary_start_in_window`
/// together with `self.every`, `self.period` and `Duration::add_ns`, which shifts
/// the window back until `t` is no longer before it.
// NOTE(review): the "window directly after time t" sentence predates the shifting
// logic — confirm it still describes the returned bounds.
pub fn get_earliest_bounds_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<Bounds> {
pub fn get_earliest_bounds_ns(
&self,
t: i64,
closed_window: ClosedWindow,
tz: Option<&Tz>,
) -> PolarsResult<Bounds> {
let start = self.truncate_ns(t, tz)?;
let stop = self.period.add_ns(start, tz)?;

Ok(Bounds::new_checked(start, stop))
ensure_boundary_start_in_window(
self.every,
t,
Duration::add_ns,
self.period,
start,
closed_window,
tz,
)
}

/// Same as `get_earliest_bounds_ns`, but using `truncate_us` and `Duration::add_us`.
///
/// In the variant taking `closed_window`, the truncated start is handed to
/// `ensure_boundary_start_in_window`, which shifts the window back until `t` is
/// no longer before it.
pub fn get_earliest_bounds_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<Bounds> {
pub fn get_earliest_bounds_us(
&self,
t: i64,
closed_window: ClosedWindow,
tz: Option<&Tz>,
) -> PolarsResult<Bounds> {
let start = self.truncate_us(t, tz)?;
let stop = self.period.add_us(start, tz)?;
Ok(Bounds::new_checked(start, stop))
ensure_boundary_start_in_window(
self.every,
t,
Duration::add_us,
self.period,
start,
closed_window,
tz,
)
}

/// Same as `get_earliest_bounds_ns`, but using `truncate_ms` and `Duration::add_ms`.
///
/// In the variant taking `closed_window`, the truncated start is handed to
/// `ensure_boundary_start_in_window`, which shifts the window back until `t` is
/// no longer before it.
pub fn get_earliest_bounds_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<Bounds> {
pub fn get_earliest_bounds_ms(
&self,
t: i64,
closed_window: ClosedWindow,
tz: Option<&Tz>,
) -> PolarsResult<Bounds> {
let start = self.truncate_ms(t, tz)?;
let stop = self.period.add_ms(start, tz)?;

Ok(Bounds::new_checked(start, stop))
ensure_boundary_start_in_window(
self.every,
t,
Duration::add_ms,
self.period,
start,
closed_window,
tz,
)
}

pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize {
Expand All @@ -120,11 +172,12 @@ impl Window {
/// Builds a `BoundsIter` for this window over `boundary`.
///
/// This is a thin wrapper: a copy of `self` and all arguments are forwarded to
/// `BoundsIter::new`, and its result (including any error) is returned unchanged.
pub fn get_overlapping_bounds_iter<'a>(
&'a self,
boundary: Bounds,
closed_window: ClosedWindow,
tu: TimeUnit,
tz: Option<&'a Tz>,
start_by: StartBy,
) -> PolarsResult<BoundsIter> {
BoundsIter::new(*self, boundary, tu, tz, start_by)
// Note the argument order: `closed_window` comes before `boundary` here.
BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by)
}
}

Expand All @@ -140,6 +193,7 @@ pub struct BoundsIter<'a> {
impl<'a> BoundsIter<'a> {
fn new(
window: Window,
closed_window: ClosedWindow,
boundary: Bounds,
tu: TimeUnit,
tz: Option<&'a Tz>,
Expand All @@ -157,9 +211,15 @@ impl<'a> BoundsIter<'a> {
boundary
},
StartBy::WindowBound => match tu {
TimeUnit::Nanoseconds => window.get_earliest_bounds_ns(boundary.start, tz)?,
TimeUnit::Microseconds => window.get_earliest_bounds_us(boundary.start, tz)?,
TimeUnit::Milliseconds => window.get_earliest_bounds_ms(boundary.start, tz)?,
TimeUnit::Nanoseconds => {
window.get_earliest_bounds_ns(boundary.start, closed_window, tz)?
},
TimeUnit::Microseconds => {
window.get_earliest_bounds_us(boundary.start, closed_window, tz)?
},
TimeUnit::Milliseconds => {
window.get_earliest_bounds_ms(boundary.start, closed_window, tz)?
},
},
_ => {
{
Expand Down Expand Up @@ -202,9 +262,16 @@ impl<'a> BoundsIter<'a> {
Some(tz),
)?;
// apply the 'offset'
let start = offset(&window.offset, start, Some(tz))?;
let mut start = offset(&window.offset, start, Some(tz))?;
// make sure the first datapoint is included
// and compute the end of the window defined by the 'period'
let stop = offset(&window.period, start, Some(tz))?;
let mut every = window.every;
every.negative = !every.negative;
let mut stop = offset(&window.period, start, Some(tz))?;
while Bounds::new(start, stop).is_past(boundary.start, closed_window) {
start = offset(&every, start, Some(tz))?;
stop = offset(&window.period, start, Some(tz))?;
}
(start, stop)
},
_ => {
Expand All @@ -221,9 +288,16 @@ impl<'a> BoundsIter<'a> {
)
.unwrap();
// apply the 'offset'
let start = offset(&window.offset, start, None).unwrap();
let mut start = offset(&window.offset, start, None).unwrap();
// make sure the first datapoint is included
// and compute the end of the window defined by the 'period'
let stop = offset(&window.period, start, None).unwrap();
let mut every = window.every;
every.negative = !every.negative;
let mut stop = offset(&window.period, start, None)?;
while Bounds::new(start, stop).is_past(boundary.start, closed_window) {
start = offset(&every, start, None)?;
stop = offset(&window.period, start, None)?;
}
(start, stop)
},
};
Expand Down
18 changes: 11 additions & 7 deletions py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -5554,8 +5554,8 @@ def group_by_dynamic(
- [start + 2*every, start + 2*every + period)
- ...
where `start` is determined by `start_by`, `offset`, and `every` (see parameter
descriptions below).
where `start` is determined by `start_by`, `offset`, `every`, and the earliest
datapoint. See the `start_by` argument description for details.
.. warning::
The index column must be sorted in ascending order. If `by` is passed, then
Expand All @@ -5577,7 +5577,7 @@ def group_by_dynamic(
period
length of the window, if None it will equal 'every'
offset
offset of the window, only takes effect if `start_by` is `'window'`.
offset of the window, does not take effect if `start_by` is 'datapoint'.
Defaults to negative `every`.
truncate
truncate the time value to the window lower bound
Expand All @@ -5604,15 +5604,17 @@ def group_by_dynamic(
The strategy to determine the start of the first window by.
* 'window': Start by taking the earliest timestamp, truncating it with
`every`, and then adding `offset`.
Note that weekly windows start on Monday.
`every`, and adding `offset`.
* 'datapoint': Start from the first encountered data point.
* a day of the week (only takes effect if `every` contains `'w'`):
* 'monday': Start the window on the Monday before the first data point.
* 'tuesday': Start the window on the Tuesday before the first data point.
* ...
* 'sunday': Start the window on the Sunday before the first data point.
The resulting window is then shifted back until the earliest datapoint
is in or in front of it.
check_sorted
Check whether `index_column` is sorted (or, if `group_by` is given,
check whether it's sorted within each group).
Expand Down Expand Up @@ -10685,15 +10687,17 @@ def groupby_dynamic(
The strategy to determine the start of the first window by.
* 'window': Start by taking the earliest timestamp, truncating it with
`every`, and then adding `offset`.
Note that weekly windows start on Monday.
`every`, and adding `offset`.
* 'datapoint': Start from the first encountered data point.
* a day of the week (only takes effect if `every` contains `'w'`):
* 'monday': Start the window on the Monday before the first data point.
* 'tuesday': Start the window on the Tuesday before the first data point.
* ...
* 'sunday': Start the window on the Sunday before the first data point.
The resulting window is then shifted back until the earliest datapoint
is in or in front of it.
check_sorted
Check whether `index_column` is sorted (or, if `by` is given,
check whether it's sorted within each group).
Expand Down
20 changes: 12 additions & 8 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3403,8 +3403,8 @@ def group_by_dynamic(
- [start + 2*every, start + 2*every + period)
- ...
where `start` is determined by `start_by`, `offset`, and `every` (see parameter
descriptions below).
where `start` is determined by `start_by`, `offset`, `every`, and the earliest
datapoint. See the `start_by` argument description for details.
.. warning::
The index column must be sorted in ascending order. If `by` is passed, then
Expand All @@ -3426,7 +3426,7 @@ def group_by_dynamic(
period
length of the window, if None it will equal 'every'
offset
offset of the window, only takes effect if `start_by` is `'window'`.
offset of the window, does not take effect if `start_by` is 'datapoint'.
Defaults to negative `every`.
truncate
truncate the time value to the window lower bound
Expand All @@ -3453,15 +3453,17 @@ def group_by_dynamic(
The strategy to determine the start of the first window by.
* 'window': Start by taking the earliest timestamp, truncating it with
`every`, and then adding `offset`.
Note that weekly windows start on Monday.
`every`, and adding `offset`.
* 'datapoint': Start from the first encountered data point.
* a day of the week (only takes effect if `every` contains `'w'`):
* 'monday': Start the window on the Monday before the first data point.
* 'tuesday': Start the window on the Tuesday before the first data point.
* ...
* 'sunday': Start the window on the Sunday before the first data point.
The resulting window is then shifted back until the earliest datapoint
is in or in front of it.
check_sorted
Check whether `index_column` is sorted (or, if `group_by` is given,
check whether it's sorted within each group).
Expand Down Expand Up @@ -6447,7 +6449,7 @@ def groupby_dynamic(
period
length of the window, if None it will equal 'every'
offset
offset of the window, only takes effect if `start_by` is `'window'`.
offset of the window, does not take effect if `start_by` is 'datapoint'.
Defaults to negative `every`.
truncate
truncate the time value to the window lower bound
Expand All @@ -6463,15 +6465,17 @@ def groupby_dynamic(
The strategy to determine the start of the first window by.
* 'window': Start by taking the earliest timestamp, truncating it with
`every`, and then adding `offset`.
Note that weekly windows start on Monday.
`every`, and adding `offset`.
* 'datapoint': Start from the first encountered data point.
* a day of the week (only takes effect if `every` contains `'w'`):
* 'monday': Start the window on the Monday before the first data point.
* 'tuesday': Start the window on the Tuesday before the first data point.
* ...
* 'sunday': Start the window on the Sunday before the first data point.
The resulting window is then shifted back until the earliest datapoint
is in or in front of it.
check_sorted
Check whether `index_column` is sorted (or, if `by` is given,
check whether it's sorted within each group).
Expand Down
Loading

0 comments on commit c25d2d4

Please sign in to comment.