-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Epix rbind #343
base: dev
Are you sure you want to change the base?
Epix rbind #343
Changes from 6 commits
99e8860
ad9105b
18436b1
68b33e9
489d924
4abc409
fbbafe2
70c76c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -123,19 +123,11 @@ epix_fill_through_version = function(x, fill_versions_end, | |
#' @param x,y Two `epi_archive` objects to join together. | ||
#' @param sync Optional; `"forbid"`, `"na"`, `"locf"`, or `"truncate"`; in the | ||
#' case that `x$versions_end` doesn't match `y$versions_end`, what do we do?: | ||
#' `"forbid"`: emit an error; "na": use `max(x$versions_end, y$versions_end)` | ||
#' as the result's `versions_end`, but ensure that, if we request a snapshot | ||
#' as of a version after `min(x$versions_end, y$versions_end)`, the | ||
#' observation columns from the less up-to-date archive will be all NAs (i.e., | ||
#' imagine there was an update immediately after its `versions_end` which | ||
#' revised all observations to be `NA`); `"locf"`: use `max(x$versions_end, | ||
#' y$versions_end)` as the result's `versions_end`, allowing the last version | ||
#' of each observation to be carried forward to extrapolate unavailable | ||
#' versions for the less up-to-date input archive (i.e., imagining that in the | ||
#' less up-to-date archive's data set remained unchanged between its actual | ||
#' `versions_end` and the other archive's `versions_end`); or `"truncate"`: | ||
#' use `min(x$versions_end, y$versions_end)` as the result's `versions_end`, | ||
#' and discard any rows containing update rows for later versions. | ||
#' | ||
#' - `"forbid"`: emit an error; | ||
#' - "na": use `max(x$versions_end, y$versions_end)` as the result's `versions_end`, but ensure that, if we request a snapshot as of a version after `min(x$versions_end, y$versions_end)`, the observation columns from the less up-to-date archive will be all NAs (i.e., imagine there was an update immediately after its `versions_end` which revised all observations to be `NA`); | ||
#' - `"locf"`: use `max(x$versions_end, y$versions_end)` as the result's `versions_end`, allowing the last version of each observation to be carried forward to extrapolate unavailable versions for the less up-to-date input archive (i.e., imagining that in the less up-to-date archive's data set remained unchanged between its actual `versions_end` and the other archive's `versions_end`); or | ||
#' - `"truncate"`: use `min(x$versions_end, y$versions_end)` as the result's `versions_end`, and discard any rows containing update rows for later versions. | ||
#' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be | ||
#' compactified? See [`as_epi_archive`] for an explanation of what this means. | ||
#' Default here is `TRUE`. | ||
|
@@ -360,6 +352,224 @@ epix_merge = function(x, y, | |
)) | ||
} | ||
|
||
#' combine epi_archives by rows | ||
#' | ||
#' Take a sequence of archives and combine by rows. Complications arise if | ||
#' there are `time_value`s shared between the lists. `sync` determines how | ||
#' any later `NA`'s are treated, with the default `"forbid"` throwing an error, | ||
#' `"na"` treating them as intentional data (no modification), and `"locf"` | ||
#' filling forward across versions. | ||
#' Shared keys are another problem; by default, `force_distinct=FALSE`, meaning | ||
#' the entry in the earlier archive overwrites later archives. Otherwise there | ||
#' is an error on shared keys | ||
#' this function is still under active development, so there may be remaining | ||
#' edge cases | ||
#' | ||
#' @param ... list of `epi_archive` objects to append in order. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion: delete "list of" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the order ends up mattering only if there are duplicate keys, where earlier ones overwrite later ones. The wording here could be a lot clearer |
||
#' @param sync Optional; `"forbid"`, `"na"`, or `"locf"`; in the case that later | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue: the shared name with the suggestion: rename this parameter either
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The naming of the choices below should also be revised. E.g., if it's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, tracks. I'd rather not use |
||
#' versions contain `NA`'s, what do we do? | ||
#' - `"forbid"`: emit an error if there are any shared time values between | ||
#' different archives; | ||
#' - `"na"`: All `NA` values are treated as actual data, and thus are | ||
#' maintained (up to archival compression). | ||
#' - `"locf"`: for every shared time value, use earlier versions of | ||
#' earlier archives to overwrite any `NA`'s found in later | ||
#' versions of later archives. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue: I don't think we should allow There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I discussed the motivation somewhat in the main text, though it might make sense to move that conversation here. One way to mitigate the risk of overwriting meaningful
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think I'm starting to understand, but I think this situation probably would emerge only when a mistake has been made already in setting up the component archives, in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, just saw your great example. I agree we need something like this functionality, maybe by default. Some contributors to the problem [maybe we could address later and then remove some choices in this function]:
|
||
#' @param force_distinct Optional; `TRUE`, `FALSE`, or `NULL`; should the keys | ||
#' be forced to be distinct? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue: I thought this meant something similar but not quite the same as |
||
#' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be | ||
#' compactified? See [`as_epi_archive`] for an explanation of what this means. | ||
#' Default here is `TRUE`. | ||
#' @return the resulting `epi_archive` | ||
#' | ||
#' @details In all cases, `additional_metadata` will be an empty list, and | ||
#' `clobberable_versions_start` will be set to the latest version that could | ||
#' be clobbered in either input archive. | ||
#' | ||
#' @examples | ||
#' # create two example epi_archive datasets where using rbind alone would | ||
#' # work incorrectly | ||
#' x1 <- archive_cases_dv_subset$DT %>% | ||
#' dplyr::select(geo_value,time_value,version,case_rate_7d_av) %>% | ||
#' filter(time_value < "2021-06-01") %>% | ||
#' as_epi_archive(compactify = TRUE) | ||
#' x2 <- archive_cases_dv_subset$DT %>% | ||
#' dplyr::select(geo_value,time_value,version,case_rate_7d_av) %>% | ||
#' filter(time_value >= "2021-06-01") %>% | ||
#' as_epi_archive(compactify = TRUE) | ||
#' y1 <- archive_cases_dv_subset$DT %>% | ||
#' dplyr::select(geo_value, time_value, version, percent_cli) %>% | ||
#' filter(time_value < "2021-06-01") %>% | ||
#' as_epi_archive(compactify = TRUE) | ||
#' y2 <- archive_cases_dv_subset$DT %>% | ||
#' dplyr::select(geo_value, time_value, version, percent_cli) %>% | ||
#' filter(time_value >= "2021-06-01") %>% | ||
#' as_epi_archive(compactify = TRUE) | ||
#' # the problematic examples | ||
#' first_merge <- epix_merge(x1, y1) | ||
#' second_merge <- epix_merge(x2, y2) | ||
#' # rebind the results together | ||
#' epix_rbind(first_merge, second_merge) | ||
#' | ||
#' @importFrom data.table key set setkeyv | ||
#' @importFrom purrr map map_vec reduce | ||
#' @importFrom dplyr setdiff union intersect group_by ungroup distinct arrange | ||
#' @importFrom tidyr fill | ||
#' @export | ||
epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = FALSE, compactify = TRUE) { | ||
# things not currently supported that may be warranted: | ||
# 1. extra keys beyond the default ones | ||
# 2. treating different columns differently (leaving some na's, locfing others) | ||
# 3. there are various parameters for rbind that we haven't defined here; some of them may actually be applicable | ||
archives <- list(...) | ||
if (any(map_vec(archives, function(x) { | ||
!inherits(x, "epi_archive") | ||
}))) { | ||
Abort("all must be of class `epi_archive`.") | ||
} | ||
|
||
sync <- rlang::arg_match(sync) | ||
|
||
geo_type <- archives[[1]]$geo_type | ||
if (any(map_vec(archives, function(x) { | ||
!identical(x$geo_type, geo_type) | ||
}))) { | ||
Abort("all must have the same `$geo_type`") | ||
} | ||
|
||
time_type <- archives[[1]]$time_type | ||
if (any(map_vec(archives, function(x) { | ||
!identical(x$time_type, time_type) | ||
}))) { | ||
Abort("all must have the same `$time_type`") | ||
} | ||
|
||
for (x in archives) { | ||
if (length(x$additional_metadata) != 0L) { | ||
Warn("x$additional_metadata won't appear in merge result", | ||
class = "epiprocess__epix_rbind_ignores_additional_metadata" | ||
) | ||
} | ||
} | ||
result_additional_metadata <- list() | ||
|
||
clobberable_versions_start <- map_vec(archives, function(x) { | ||
(x$clobberable_versions_start) | ||
}) | ||
|
||
versions_end <- max((map_vec(archives, "versions_end"))) | ||
|
||
result_clobberable_versions_start <- if (all(is.na(clobberable_versions_start))) { | ||
NA # (any type of NA is fine here) | ||
} else { | ||
max(clobberable_versions_start) # unlike the case of merging, we want the later date | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. musing: this seems like it could be a little off in edge cases. Hard to handle both aggregation across keys and across versions in the same function; should they be separate? Consider |
||
} | ||
# plans: | ||
# 1. isolate the shared from the non-shared | ||
# 2. throw everything together, do a group_by and ffill | ||
|
||
DTs <- map(archives, "DT") | ||
|
||
# check the keys are correct as done in epix_merge | ||
keys <- map(DTs, key) | ||
new_key <- keys[[1]] | ||
for (ii in seq_along(DTs)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a general habit of using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Neat trick! In emacs there's also isearch-forward-symbol, isearch-forward-symbol-at-point, and evil normal mode Any way's fine; from a little searching around, I don't think the |
||
if (!identical(keys[[ii]], key(archives[[ii]]$DT))) { | ||
Warn(" | ||
`epiprocess` internal warning (please report): pre-processing for | ||
epix_merge unexpectedly resulted in an intermediate data table (or | ||
tables) with a different key than the corresponding input archive. | ||
Manually setting intermediate data table keys to the expected values. | ||
", internal = TRUE) | ||
setkeyv(DTs[[ii]], key(archives[[ii]]$DT)) | ||
keys[[ii]] <- key(archives[[ii]]$DT) | ||
} | ||
dsweber2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (!identical(keys[[ii]], new_key)) { | ||
# Without some sort of annotations of what various columns represent, we can't | ||
# do something that makes sense when rbinding archives with mismatched keys. | ||
# E.g., even if we assume extra keys represent demographic breakdowns, a | ||
# sensible default treatment of count-type and rate-type value columns would | ||
# differ. | ||
if (!identical(sort(key(DTs[[ii]])), sort(key(DTs[[1]])))) { | ||
dsweber2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Abort(" | ||
The archives must have the same set of key column names; if the | ||
key columns represent the same things, just with different | ||
names, please retry after manually renaming to match; if they | ||
represent different things (e.g., x has an age breakdown | ||
but y does not), please retry after processing them to share | ||
the same key (e.g., by summarizing x to remove the age breakdown, | ||
or by applying a static age breakdown to y). | ||
", class = "epiprocess__epix_rbind_input_must_have_same_key_set") | ||
} | ||
} | ||
} | ||
|
||
non_by_colnames <- reduce(map(DTs, function(DT) setdiff(names(DT), new_key)), union) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue: it's probably not sensible to rbind archives with different column sets. We need the same type of validation as for the keys. |
||
# find the shared (geo_values, time_values) which requires: | ||
# first define a function to get the unique pairs in a given archive | ||
unique_geo_times <- function(x) { | ||
x %>% | ||
select(geo_value, time_value) %>% | ||
distinct() | ||
dsweber2 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion: take advantage of DT operations that know the keys & indices for performance. maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (resolved the duplicate, but I'll make sure to include something along these lines) |
||
} | ||
|
||
other_keys <- dplyr::setdiff(new_key, c("geo_value", "time_value", "version")) | ||
if (length(other_keys) != 0) { | ||
Abort("epix_rbind does not currently support additional keys", | ||
class = "epiprocess__epxi_rbind_unsupported" | ||
dsweber2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
} | ||
|
||
shared_geo_time_values <- reduce(map(DTs, unique_geo_times), intersect) | ||
# there are no difference between the methods if there's no overlap | ||
if (nrow(shared_geo_time_values) == 0) { | ||
DT <- reduce(DTs, rbind) | ||
if (force_distinct) { | ||
DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE) | ||
dsweber2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} else if (sync == "forbid") { | ||
Abort(paste( | ||
"There are shared time values with different versions;", | ||
"either deal with those separately, or specify how to", | ||
"handle `NA` values (either `NA` or `locf`)." | ||
), "epiprocess__epix_rbind_unresolved_sync") | ||
} else if (sync == "na") { | ||
# doesn't really care if there are repeated time_values, simply: | ||
# binds the results together | ||
DT <- reduce(DTs, rbind) | ||
# remove any redundant keys | ||
if (force_distinct) { | ||
DT <- distinct(DT, geo_value, time_value, version, .keep_all = TRUE) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue: we need to prioritize later versions of versions here. could also use maybe something with |
||
} | ||
# and return an archive (which sorts) | ||
} else if (sync == "locf") { | ||
# filter, creating shared and non shared or | ||
# just do forward fill on all times | ||
DT <- reduce(DTs, rbind) | ||
|
||
if (force_distinct) { | ||
DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE) | ||
} | ||
DT <- DT %>% | ||
group_by(geo_value, time_value) %>% | ||
arrange(geo_value, time_value, version) %>% | ||
fill(!!!non_by_colnames, .direction = "downup") %>% # everything not in the keys | ||
ungroup() | ||
} | ||
|
||
return(as_epi_archive(DT[], | ||
geo_type = geo_type, | ||
time_type = time_type, | ||
other_keys = other_keys, | ||
additional_metadata = list(), | ||
compactify = compactify, | ||
clobberable_versions_start = result_clobberable_versions_start, | ||
versions_end = versions_end | ||
)) | ||
} | ||
|
||
|
||
|
||
# Helpers for `group_by`: | ||
|
||
#' Make non-testing mock to get [`dplyr::dplyr_col_modify`] input | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✨