Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Epix rbind #343

Draft
wants to merge 8 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export(epi_cor)
export(epi_slide)
export(epix_as_of)
export(epix_merge)
export(epix_rbind)
export(epix_slide)
export(epix_truncate_versions_after)
export(filter)
Expand Down Expand Up @@ -70,6 +71,7 @@ importFrom(data.table,set)
importFrom(data.table,setkeyv)
importFrom(dplyr,arrange)
importFrom(dplyr,bind_rows)
importFrom(dplyr,distinct)
importFrom(dplyr,dplyr_col_modify)
importFrom(dplyr,dplyr_reconstruct)
importFrom(dplyr,dplyr_row_slice)
Expand All @@ -79,16 +81,22 @@ importFrom(dplyr,group_by_drop_default)
importFrom(dplyr,group_modify)
importFrom(dplyr,group_vars)
importFrom(dplyr,groups)
importFrom(dplyr,intersect)
importFrom(dplyr,mutate)
importFrom(dplyr,relocate)
importFrom(dplyr,rename)
importFrom(dplyr,select)
importFrom(dplyr,setdiff)
importFrom(dplyr,slice)
importFrom(dplyr,ungroup)
importFrom(dplyr,union)
importFrom(lubridate,days)
importFrom(lubridate,weeks)
importFrom(magrittr,"%>%")
importFrom(purrr,map)
importFrom(purrr,map_lgl)
importFrom(purrr,map_vec)
importFrom(purrr,reduce)
importFrom(rlang,"!!!")
importFrom(rlang,"!!")
importFrom(rlang,.data)
Expand Down Expand Up @@ -117,6 +125,7 @@ importFrom(rlang,syms)
importFrom(stats,cor)
importFrom(stats,median)
importFrom(tibble,as_tibble)
importFrom(tidyr,fill)
importFrom(tidyr,unnest)
importFrom(tidyselect,eval_select)
importFrom(tidyselect,starts_with)
Expand Down
236 changes: 223 additions & 13 deletions R/methods-epi_archive.R
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,11 @@ epix_fill_through_version = function(x, fill_versions_end,
#' @param x,y Two `epi_archive` objects to join together.
#' @param sync Optional; `"forbid"`, `"na"`, `"locf"`, or `"truncate"`; in the
#' case that `x$versions_end` doesn't match `y$versions_end`, what do we do?:
#' `"forbid"`: emit an error; "na": use `max(x$versions_end, y$versions_end)`
#' as the result's `versions_end`, but ensure that, if we request a snapshot
#' as of a version after `min(x$versions_end, y$versions_end)`, the
#' observation columns from the less up-to-date archive will be all NAs (i.e.,
#' imagine there was an update immediately after its `versions_end` which
#' revised all observations to be `NA`); `"locf"`: use `max(x$versions_end,
#' y$versions_end)` as the result's `versions_end`, allowing the last version
#' of each observation to be carried forward to extrapolate unavailable
#' versions for the less up-to-date input archive (i.e., imagining that in the
#' less up-to-date archive's data set remained unchanged between its actual
#' `versions_end` and the other archive's `versions_end`); or `"truncate"`:
#' use `min(x$versions_end, y$versions_end)` as the result's `versions_end`,
#' and discard any rows containing update rows for later versions.
#'
#' - `"forbid"`: emit an error;
#' - "na": use `max(x$versions_end, y$versions_end)` as the result's `versions_end`, but ensure that, if we request a snapshot as of a version after `min(x$versions_end, y$versions_end)`, the observation columns from the less up-to-date archive will be all NAs (i.e., imagine there was an update immediately after its `versions_end` which revised all observations to be `NA`);
#' - `"locf"`: use `max(x$versions_end, y$versions_end)` as the result's `versions_end`, allowing the last version of each observation to be carried forward to extrapolate unavailable versions for the less up-to-date input archive (i.e., imagining that in the less up-to-date archive's data set remained unchanged between its actual `versions_end` and the other archive's `versions_end`); or
#' - `"truncate"`: use `min(x$versions_end, y$versions_end)` as the result's `versions_end`, and discard any rows containing update rows for later versions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be
#' compactified? See [`as_epi_archive`] for an explanation of what this means.
#' Default here is `TRUE`.
Expand Down Expand Up @@ -360,6 +352,224 @@ epix_merge = function(x, y,
))
}

#' combine epi_archives by rows
#'
#' Take a sequence of archives and combine by rows. Complications arise if
#' there are `time_value`s shared between the lists. `sync` determines how
#' any later `NA`'s are treated, with the default `"forbid"` throwing an error,
#' `"na"` treating them as intentional data (no modification), and `"locf"`
#' filling forward across versions.
#' Shared keys are another problem; by default, `force_distinct=FALSE`, meaning
#' the entry in the earlier archive overwrites later archives. Otherwise there
#' is an error on shared keys
#' this function is still under active development, so there may be remaining
#' edge cases
#'
#' @param ... list of `epi_archive` objects to append in order.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: delete "list of"
question: what order?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the order ends up mattering only if there are duplicate keys, where earlier ones overwrite later ones. The wording here could be a lot clearer

#' @param sync Optional; `"forbid"`, `"na"`, or `"locf"`; in the case that later
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: the shared name with the epix_merge parameter seems confusing as it seems like it's doing something a bit different

suggestion: rename this parameter either

  • something like fill, like data.table::nafill (and also tidyr::complete, which is less similar), or maybe something slightly different so users don't assume they can pass a replacement value / list of replacement values like in those functions.
  • something more archive-specific, maybe involving the word "clobber" or "rewrite"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming of the choices below should also be revised. E.g., if it's fill then maybe "na" should be "keep".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, tracks. I'd rather not use fill directly since I have to call that function; maybe fill_na, since it only effects NAs? I think clobber fits better with force_distinct, and will probably be getting its own parameter based on other discussions. "keep" is definitely the right word for what's currently "na"

#' versions contain `NA`'s, what do we do?
#' - `"forbid"`: emit an error if there are any shared time values between
#' different archives;
#' - `"na"`: All `NA` values are treated as actual data, and thus are
#' maintained (up to archival compression).
#' - `"locf"`: for every shared time value, use earlier versions of
#' earlier archives to overwrite any `NA`'s found in later
#' versions of later archives.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: I don't think we should allow "locf" here. Any explicit NAs in "later" archives represent revisions to NA or row removals (we can't distinguish between the two in the current archive representation).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discussed the motivation somewhat in the main text, though it might make sense to move that conversation here. One way to mitigate the risk of overwriting meaningful NAs as well is to only locf on shared time values when the user specifically asks for it. Reflecting, there are possibly 3 levels to doing this writing:

  1. overwrite all future NA's, which is what sync="locf" currently does
  2. only overwrite NA's on shared time_value's
  3. only overwrite when the first entry in a given archive is NA; I think this is probably the best approach
    2 and 3 are both somewhat more expensive, and definitely more complicated (you have to split the data, then perform the write on the problem half, and then rejoin).

Copy link
Contributor

@brookslogan brookslogan Jul 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync="locf" is addressing a fairly subtle issue, where one indicator has versions released in a given month while the other doesn't.

I think I'm starting to understand, but I think this situation probably would emerge only when a mistake has been made already in setting up the component archives, in epix_merge settings, or an epix_merge bug. If signal A has version data where signal B does not, signal B should only have NA filled in if it's before its first measurement for a given epikey-time, or by filling forward explicit NAs for that epikey-time. Is this the situation you're thinking of, and do you know if epix_merge has a bug where it's not doing this correctly?

Copy link
Contributor

@brookslogan brookslogan Jul 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, just saw your great example. I agree we need something like this functionality, maybe by default.

Some contributors to the problem [maybe we could address later and then remove some choices in this function]:

  • Archives were originally intended to hold complete version histories.
  • The archive format not distinguishing explicit NAs vs. missing rows. [nevermind, no, this is not connected. The issue is not being able to signal something like "out of range of the updates recorded in this partial archive for this signal/{epikey,signal}/{epikey,time_value,signal}......." / "its latest value prior to the earliest version covered by this archive"]
  • Solving the point above is messier with our current epix_merge() implementation. At some point I thought I had an issue to use a lazy epix_merge() which would just store the parameters and re-implement some base archive operations; that might be useful if we want to make this cleaner later. [might or might not be true, but I'm not sure we want to mandate a lazy merge to get a working format.]

#' @param force_distinct Optional; `TRUE`, `FALSE`, or `NULL`; should the keys
#' be forced to be distinct?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: I thought this meant something similar but not quite the same as "forbid", but I can't tell what the implementation is doing. Maybe it's related to the two options I described above? I think part of this is because I've mixed up "shared time_values" and "shared keys" when reading the docs. We probably want to think about design, naming, and documentation some more here.

#' @param compactify Optional; `TRUE`, `FALSE`, or `NULL`; should the result be
#' compactified? See [`as_epi_archive`] for an explanation of what this means.
#' Default here is `TRUE`.
#' @return the resulting `epi_archive`
#'
#' @details In all cases, `additional_metadata` will be an empty list, and
#' `clobberable_versions_start` will be set to the latest version that could
#' be clobbered in either input archive.
#'
#' @examples
#' # create two example epi_archive datasets where using rbind alone would
#' # work incorrectly
#' x1 <- archive_cases_dv_subset$DT %>%
#' dplyr::select(geo_value,time_value,version,case_rate_7d_av) %>%
#' filter(time_value < "2021-06-01") %>%
#' as_epi_archive(compactify = TRUE)
#' x2 <- archive_cases_dv_subset$DT %>%
#' dplyr::select(geo_value,time_value,version,case_rate_7d_av) %>%
#' filter(time_value >= "2021-06-01") %>%
#' as_epi_archive(compactify = TRUE)
#' y1 <- archive_cases_dv_subset$DT %>%
#' dplyr::select(geo_value, time_value, version, percent_cli) %>%
#' filter(time_value < "2021-06-01") %>%
#' as_epi_archive(compactify = TRUE)
#' y2 <- archive_cases_dv_subset$DT %>%
#' dplyr::select(geo_value, time_value, version, percent_cli) %>%
#' filter(time_value >= "2021-06-01") %>%
#' as_epi_archive(compactify = TRUE)
#' # the problematic examples
#' first_merge <- epix_merge(x1, y1)
#' second_merge <- epix_merge(x2, y2)
#' # rebind the results together
#' epix_rbind(first_merge, second_merge)
#'
#' @importFrom data.table key set setkeyv
#' @importFrom purrr map map_vec reduce
#' @importFrom dplyr setdiff union intersect group_by ungroup distinct arrange
#' @importFrom tidyr fill
#' @export
epix_rbind <- function(..., sync = c("forbid", "na", "locf"), force_distinct = FALSE, compactify = TRUE) {
# things not currently supported that may be warranted:
# 1. extra keys beyond the default ones
# 2. treating different columns differently (leaving some na's, locfing others)
# 3. there are various parameters for rbind that we haven't defined here; some of them may actually be applicable
archives <- list(...)
if (any(map_vec(archives, function(x) {
!inherits(x, "epi_archive")
}))) {
Abort("all must be of class `epi_archive`.")
}

sync <- rlang::arg_match(sync)

geo_type <- archives[[1]]$geo_type
if (any(map_vec(archives, function(x) {
!identical(x$geo_type, geo_type)
}))) {
Abort("all must have the same `$geo_type`")
}

time_type <- archives[[1]]$time_type
if (any(map_vec(archives, function(x) {
!identical(x$time_type, time_type)
}))) {
Abort("all must have the same `$time_type`")
}

for (x in archives) {
if (length(x$additional_metadata) != 0L) {
Warn("x$additional_metadata won't appear in merge result",
class = "epiprocess__epix_rbind_ignores_additional_metadata"
)
}
}
result_additional_metadata <- list()

clobberable_versions_start <- map_vec(archives, function(x) {
(x$clobberable_versions_start)
})

versions_end <- max((map_vec(archives, "versions_end")))

result_clobberable_versions_start <- if (all(is.na(clobberable_versions_start))) {
NA # (any type of NA is fine here)
} else {
max(clobberable_versions_start) # unlike the case of merging, we want the later date
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

musing: this seems like it could be a little off in edge cases. Hard to handle both aggregation across keys and across versions in the same function; should they be separate? Consider epix_rbind(archive1, archive2) where: archive1 has data for CA, clobberable starting with version 500, and archive2 has data for AS, clobberable starting with version 800. That's pretty edge-case-y, probably an error. But we might also have archive1b with CA data starting with version 500, clobberable starting with version 805. Then we might want something similar to the merge sync that says the clobberable versions start with version 800.

}
# plans:
# 1. isolate the shared from the non-shared
# 2. throw everything together, do a group_by and ffill

DTs <- map(archives, "DT")

# check the keys are correct as done in epix_merge
keys <- map(DTs, key)
new_key <- keys[[1]]
for (ii in seq_along(DTs)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: ii -> i; I think ii is conventionally used for vectors of indices (of lengths potentially != 1)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a general habit of using ii because it's impossible to search for i alone to find all uses of a variable, whereas ii is basically uniquely identifiable as a variable. In general, avoiding single letter variable names is preferred. I suppose I could do i_dt or something like that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat trick! In emacs there's also isearch-forward-symbol, isearch-forward-symbol-at-point, and evil normal mode * for when pre-existing code uses i etc.; probably also some regex thing with word boundary matching but I don't remember what it would be.

Any way's fine; from a little searching around, I don't think the ii convention's that strong apparently.

if (!identical(keys[[ii]], key(archives[[ii]]$DT))) {
Warn("
`epiprocess` internal warning (please report): pre-processing for
epix_merge unexpectedly resulted in an intermediate data table (or
tables) with a different key than the corresponding input archive.
Manually setting intermediate data table keys to the expected values.
", internal = TRUE)
setkeyv(DTs[[ii]], key(archives[[ii]]$DT))
keys[[ii]] <- key(archives[[ii]]$DT)
}
dsweber2 marked this conversation as resolved.
Show resolved Hide resolved
if (!identical(keys[[ii]], new_key)) {
# Without some sort of annotations of what various columns represent, we can't
# do something that makes sense when rbinding archives with mismatched keys.
# E.g., even if we assume extra keys represent demographic breakdowns, a
# sensible default treatment of count-type and rate-type value columns would
# differ.
if (!identical(sort(key(DTs[[ii]])), sort(key(DTs[[1]])))) {
dsweber2 marked this conversation as resolved.
Show resolved Hide resolved
Abort("
The archives must have the same set of key column names; if the
key columns represent the same things, just with different
names, please retry after manually renaming to match; if they
represent different things (e.g., x has an age breakdown
but y does not), please retry after processing them to share
the same key (e.g., by summarizing x to remove the age breakdown,
or by applying a static age breakdown to y).
", class = "epiprocess__epix_rbind_input_must_have_same_key_set")
}
}
}

non_by_colnames <- reduce(map(DTs, function(DT) setdiff(names(DT), new_key)), union)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: it's probably not sensible to rbind archives with different column sets. We need the same type of validation as for the keys.

# find the shared (geo_values, time_values) which requires:
# first define a function to get the unique pairs in a given archive
unique_geo_times <- function(x) {
x %>%
select(geo_value, time_value) %>%
distinct()
dsweber2 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: take advantage of DT operations that know the keys & indices for performance. maybe unique(x, by=c("geo_value, "time_value"))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(resolved the duplicate, but I'll make sure to include something along these lines)

}

other_keys <- dplyr::setdiff(new_key, c("geo_value", "time_value", "version"))
if (length(other_keys) != 0) {
Abort("epix_rbind does not currently support additional keys",
class = "epiprocess__epxi_rbind_unsupported"
dsweber2 marked this conversation as resolved.
Show resolved Hide resolved
)
}

shared_geo_time_values <- reduce(map(DTs, unique_geo_times), intersect)
# there are no difference between the methods if there's no overlap
if (nrow(shared_geo_time_values) == 0) {
DT <- reduce(DTs, rbind)
if (force_distinct) {
DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE)
dsweber2 marked this conversation as resolved.
Show resolved Hide resolved
}
} else if (sync == "forbid") {
Abort(paste(
"There are shared time values with different versions;",
"either deal with those separately, or specify how to",
"handle `NA` values (either `NA` or `locf`)."
), "epiprocess__epix_rbind_unresolved_sync")
} else if (sync == "na") {
# doesn't really care if there are repeated time_values, simply:
# binds the results together
DT <- reduce(DTs, rbind)
# remove any redundant keys
if (force_distinct) {
DT <- distinct(DT, geo_value, time_value, version, .keep_all = TRUE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: we need to prioritize later versions of versions here. could also use data.table ops

maybe something with
unique(DT, by=c("geo_value", "time_value", "version"), fromLast = TRUE)

}
# and return an archive (which sorts)
} else if (sync == "locf") {
# filter, creating shared and non shared or
# just do forward fill on all times
DT <- reduce(DTs, rbind)

if (force_distinct) {
DT <- distinct(possibly_redundant, geo_value, time_value, version, .keep_all = TRUE)
}
DT <- DT %>%
group_by(geo_value, time_value) %>%
arrange(geo_value, time_value, version) %>%
fill(!!!non_by_colnames, .direction = "downup") %>% # everything not in the keys
ungroup()
}

return(as_epi_archive(DT[],
geo_type = geo_type,
time_type = time_type,
other_keys = other_keys,
additional_metadata = list(),
compactify = compactify,
clobberable_versions_start = result_clobberable_versions_start,
versions_end = versions_end
))
}



# Helpers for `group_by`:

#' Make non-testing mock to get [`dplyr::dplyr_col_modify`] input
Expand Down
18 changes: 6 additions & 12 deletions man/epix_merge.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading