Skip to content

Add filter.epi_archive #651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Type: Package
Package: epiprocess
Title: Tools for basic signal processing in epidemiology
Version: 0.11.5
Version: 0.11.6
Authors@R: c(
person("Jacob", "Bien", role = "ctb"),
person("Logan", "Brooks", , "[email protected]", role = c("aut", "cre")),
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ S3method(epix_slide,epi_archive)
S3method(epix_slide,grouped_epi_archive)
S3method(epix_truncate_versions_after,epi_archive)
S3method(epix_truncate_versions_after,grouped_epi_archive)
S3method(filter,epi_archive)
S3method(group_by,epi_archive)
S3method(group_by,epi_df)
S3method(group_by,grouped_epi_archive)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Pre-1.0.0 numbering scheme: 0.x will indicate releases, while 0.x.y will indicat

- `is_epi_archive` function has been reintroduced.
- `epix_as_of_current()` introduced as an alias for `epix_as_of(.$versions_end)`.
- Added `dplyr::filter` implementation for `epi_archive`s.

# epiprocess 0.11

Expand Down
41 changes: 17 additions & 24 deletions R/archive.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,6 @@ validate_version_bound <- function(version_bound, x, na_ok = FALSE,
class = "epiprocess__version_bound_mismatched_class"
)
}
if (!identical(typeof(version_bound), typeof(x[["version"]]))) {
cli_abort(
"{version_bound_arg} must have the same `typeof` as x$version,
which has a `typeof` of {typeof(x$version)}",
class = "epiprocess__version_bound_mismatched_typeof"
)
}
}

return(invisible(NULL))
Expand Down Expand Up @@ -207,23 +200,23 @@ next_after.Date <- function(x) x + 1L
#' undergo tiny nonmeaningful revisions and the archive object with the
#' default setting is too large.
#' @param clobberable_versions_start Optional; `length`-1; either a value of the
#' same `class` and `typeof` as `x$version`, or an `NA` of any `class` and
#' `typeof`: specifically, either (a) the earliest version that could be
#' subject to "clobbering" (being overwritten with different update data, but
#' using the *same* version tag as the old update data), or (b) `NA`, to
#' indicate that no versions are clobberable. There are a variety of reasons
#' why versions could be clobberable under routine circumstances, such as (a)
#' today's version of one/all of the columns being published after initially
#' being filled with `NA` or LOCF, (b) a buggy version of today's data being
#' published but then fixed and republished later in the day, or (c) data
#' pipeline delays (e.g., publisher uploading, periodic scraping, database
#' syncing, periodic fetching, etc.) that make events (a) or (b) reflected
#' later in the day (or even on a different day) than expected; potential
#' causes vary between different data pipelines. The default value is `NA`,
#' which doesn't consider any versions to be clobberable. Another setting that
#' may be appropriate for some pipelines is `max_version_with_row_in(x)`.
#' @param versions_end Optional; length-1, same `class` and `typeof` as
#' `x$version`: what is the last version we have observed? The default is
#' same `class` as `x$version`, or an `NA` of any `class`: specifically,
#' either (a) the earliest version that could be subject to "clobbering"
#' (being overwritten with different update data, but using the *same* version
#' tag as the old update data), or (b) `NA`, to indicate that no versions are
#' clobberable. There are a variety of reasons why versions could be
#' clobberable under routine circumstances, such as (a) today's version of
#' one/all of the columns being published after initially being filled with
#' `NA` or LOCF, (b) a buggy version of today's data being published but then
#' fixed and republished later in the day, or (c) data pipeline delays (e.g.,
#' publisher uploading, periodic scraping, database syncing, periodic
#' fetching, etc.) that make events (a) or (b) reflected later in the day (or
#' even on a different day) than expected; potential causes vary between
#' different data pipelines. The default value is `NA`, which doesn't consider
#' any versions to be clobberable. Another setting that may be appropriate for
#' some pipelines is `max_version_with_row_in(x)`.
#' @param versions_end Optional; length-1, same `class` as `x$version`: what is
#' the last version we have observed? The default is
#' `max_version_with_row_in(x)`, but values greater than this could also be
#' valid, and would indicate that we observed additional versions of the data
#' beyond `max(x$version)`, but they all contained empty updates. (The default
Expand Down
171 changes: 161 additions & 10 deletions R/methods-epi_archive.R
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,13 @@ epix_as_of <- function(x, version, min_time_value = -Inf, all_versions = FALSE,
"`version` must have the same `class` vector as `epi_archive$DT$version`."
)
}
if (!identical(typeof(version), typeof(x$DT$version))) {
cli_abort(
"`version` must have the same `typeof` as `epi_archive$DT$version`."
)
}
assert_scalar(version, na.ok = FALSE)
if (version > x$versions_end) {
cli_abort("`version` must be at most `epi_archive$versions_end`.")
}
assert_scalar(min_time_value, na.ok = FALSE)
min_time_value_inf <- is.infinite(min_time_value) && min_time_value < 0
min_time_value_same_type <- typeof(min_time_value) == typeof(x$DT$time_value) &
class(min_time_value) == class(x$DT$time_value)
min_time_value_same_type <- identical(class(min_time_value), class(x$DT$time_value))
if (!min_time_value_inf && !min_time_value_same_type) {
cli_abort("`min_time_value` must be either -Inf or a time_value of the same type and
class as `epi_archive$time_value`.")
Expand Down Expand Up @@ -941,9 +935,6 @@ epix_truncate_versions_after.epi_archive <- function(x, max_version) {
if (!identical(class(max_version), class(x$DT$version))) {
cli_abort("`max_version` must have the same `class` as `epi_archive$DT$version`.")
}
if (!identical(typeof(max_version), typeof(x$DT$version))) {
cli_abort("`max_version` must have the same `typeof` as `epi_archive$DT$version`.")
}
assert_scalar(max_version, na.ok = FALSE)
if (max_version > x$versions_end) {
cli_abort("`max_version` must be at most `epi_archive$versions_end`.")
Expand Down Expand Up @@ -1020,3 +1011,163 @@ dplyr_col_modify.col_modify_recorder_df <- function(data, cols) {
attr(data, "epiprocess::col_modify_recorder_df::cols") <- cols
data
}



#' [`dplyr::filter`] for `epi_archive`s
#'
#' @param .data an `epi_archive`
#' @param ... as in [`dplyr::filter`]; using the `version` column is not allowed
#' unless you use `.format_aware = TRUE`; see details.
#' @param .by as in [`dplyr::filter`]
#' @param .format_aware optional, `TRUE` or `FALSE`; default `FALSE`. See
#' details.
#'
#' @details
#'
#' By default, using the `version` column or measurement columns is disabled as
#' it's easy to get unexpected results. See if either [`epix_as_of`] or
#' [`epix_slide`] works for any version selection you have in mind: for version
#' selection, see the `version` or `.versions` args, respectively; for
#' measurement column-based filtering, try `filter`ing after `epix_as_of` or
#' inside the `.f` in `epix_slide()`. If they don't cover your use case, then
#' you can set `.format_aware = TRUE` to enable usage of these columns, but be
#' careful to:
#' * Factor in that `.data$DT` may have been converted into a compact format
#' based on diffing consecutive versions, and the last version of each
#' observation in `.data$DT` will always be carried forward to future
#' `version`s`; see details of [`as_epi_archive`].
#' * Set `clobberable_versions_start` and `versions_end` of the result
#' appropriately after the `filter` call. They will be initialized with the
#' same values as in `.data`.
#'
#' `dplyr::filter` also has an optional argument `.preserve`, which should not
#' have an impact on (ungrouped) `epi_archive`s, and `grouped_epi_archive`s do
#' not currently support `dplyr::filter`.
#'
#' @examples
#'
#' # Filter to one location and a particular time range:
#' archive_cases_dv_subset %>%
#' filter(geo_value == "fl", time_value >= as.Date("2020-10-01"))
#'
#' # Convert to weekly by taking the Saturday data for each week, so that
#' # `case_rate_7d_av` represents a Sun--Sat average:
#' archive_cases_dv_subset %>%
#' filter(as.POSIXlt(time_value)$wday == 6L)
#'
#' # Filtering involving the `version` column or measurement columns requires
#' # extra care. See epix_as_of and epix_slide instead for some common
#' # operations. One semi-common operation that ends up being fairly simple is
#' # treating observations as finalized after some amount of time, and ignoring
#' # any revisions that were made after that point:
#' archive_cases_dv_subset %>%
#' filter(
#' version <= time_value + as.difftime(60, units = "days"),
#' .format_aware = TRUE
#' )
#'
#' @export
filter.epi_archive <- function(.data, ..., .by = NULL, .format_aware = FALSE) {
in_tbl <- tibble::as_tibble(as.list(.data$DT), .name_repair = "minimal")
if (.format_aware) {
out_tbl <- in_tbl %>%
filter(..., .by = {{ .by }})
} else {
measurement_colnames <- setdiff(names(.data$DT), key_colnames(.data))
forbidden_colnames <- c("version", measurement_colnames)
out_tbl <- in_tbl %>%
filter(
# Add our own fake filter arg to the user's ..., to update the data mask
# to prevent `version` column usage.
{
# We should be evaluating inside the data mask. To disable both
# `version` and `.data$version` etc., we need to go to the ancestor
# environment containing the data mask's column bindings. This is
# likely just the parent env, but search to make sure, in a way akin
# to `<<-`:
e <- environment()
while (!identical(e, globalenv()) && !identical(e, emptyenv())) { # nolint:vector_logic_linter
if ("version" %in% names(e)) {
# This is where the column bindings are. Replace the forbidden ones.
# They are expected to be active bindings, so directly
# assigning has issues; `rm` first.
rm(list = forbidden_colnames, envir = e)
eval_env <- new.env(parent = asNamespace("epiprocess")) # see (2) below
delayedAssign(
"version",
cli_abort(c(
"Using `version` in `filter.epi_archive` may produce unexpected results.",
">" = "See if `epix_as_of` or `epix_slide` would work instead.",
">" = "If not, see `?filter.epi_archive` details for how to proceed."
), class = "epiprocess__filter_archive__used_version"),
eval.env = eval_env,
assign.env = e
)
for (measurement_colname in measurement_colnames) {
# Record current `measurement_colname` and set up execution for
# the promise for the error in its own dedicated environment, so
# that (1) `for` loop updating its value and `rm` cleanup don't
# mess things up. We can also (2) prevent changes to data mask
# ancestry (to involve user's quosure env rather than our
# quosure env) or contents (from edge case of user binding
# functions inside the mask) from potentially interfering by
# setting the promise's execution environment to skip over the
# data mask.
eval_env <- new.env(parent = asNamespace("epiprocess"))
eval_env[["local_measurement_colname"]] <- measurement_colname
delayedAssign(
measurement_colname,
cli_abort(c(
"Using `{format_varname(local_measurement_colname)}`
in `filter.epi_archive` may produce unexpected results.",
">" = "See `?filter.epi_archive` details for how to proceed."
), class = "epiprocess__filter_archive__used_measurement"),
eval.env = eval_env,
assign.env = e
)
}
break
}
e <- parent.env(e)
}
# Don't mask similarly-named user objects in ancestor envs:
rm(list = c("e", "measurement_colname", "eval_env"))
TRUE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it will error if you use it inside a function that has "version" or "time_value" defined in its environment? I'm reading this as traversing up the environment chain and stopping short of the globalenv(), which would be most likely to have variables like that defined, but intermediate scopes might still have false positives.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, I tested it locally and it seems fine. I guess this works because we hit the data mask environment first and break out before we hit the user's function environment? Seems reasonable.

Copy link
Contributor Author

@brookslogan brookslogan Apr 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, the data mask environment chain in current dplyr&rlang looks something like
rlang wrapper env -> data bindings env (/ env chain input to as_data_mask env) -> quosure env (chain)
where:

  • rlang wrapper env holds the real data pronoun objects, a ~ override I don't quite understand, and some other internals
  • data bindings env is typically just a single env holding (group's) column bindings; in other contexts, it could be an env chain we fed into as_data_mask (with its "top" ancestor reassigned to point at:)
  • the quosure env

We should stop at the data bindings env and reassign things there.

But I did find an issue along those lines

epidatasets::case_death_rate_archive %>% {.$DT <- copy(.$DT)[, e := 1]; .} %>% filter(e < 2)
#> Error in `filter()`:
#> ℹ In argument: `e < 2`.
#> Caused by error in `e < 2`:
#> ! comparison (<) is possible only for atomic and list types
#> Run `rlang::last_trace()` to see where the error occurred.

because I'm leaving around an e in the rlang wrapper env.

Also, I fell for a classic lazy eval + env issue

epidatasets::case_death_rate_archive %>% filter(case_rate_7d_av < 2)
#> Error in `filter()`:
#> ℹ In argument: `case_rate_7d_av < 2`.
#> Caused by error:
#> ! Using `death_rate_7d_av` in `filter.epi_archive` may produce unexpected results.
#> → See `?filter.epi_archive` details for how to proceed.
#> Run `rlang::last_trace()` to see where the error occurred.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... and found some others. Should be fixed now.

},
...,
.by = {{ .by }}
)
}
# We could try to re-infer the geo_type, e.g., when filtering from
# national+state to just state. However, we risk inference failures such as
# "hrr" -> "hhs" from filtering to hrr 10, or "custom" -> USA-related when
# working with non-USA data:
out_geo_type <- .data$geo_type
if (.data$time_type == "day") {
# We might be going from daily to weekly; re-infer:
out_time_type <- guess_time_type(out_tbl$time_value)
} else {
# We might be filtering weekly to a single time_value; avoid re-inferring to
# stay "week". Or in other cases, just skip inferring, as re-inferring is
# expected to match the input time_type:
out_time_type <- .data$time_type
}
# Even if they narrow down to just a single value of an other_keys column,
# it's probably still better (& simpler) to treat it as an other_keys column
# since it still exists in the result:
out_other_keys <- .data$other_keys
# `filter` makes no guarantees about not aliasing columns in its result when
# the filter condition is all TRUE, so don't setDT.
out_dtbl <- as.data.table(out_tbl, key = out_other_keys)
result <- new_epi_archive(
out_dtbl,
out_geo_type, out_time_type, out_other_keys,
# Assume version-related metadata unchanged; part of why we want to push
# back on filter expressions like `.data$version <= .env$as_of`:
.data$clobberable_versions_start, .data$versions_end
)
# Filtering down rows while keeping all (ukey) columns should preserve ukey
# uniqueness.
result
}
2 changes: 1 addition & 1 deletion R/methods-epi_df.R
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ sum_groups_epi_df <- function(.x, sum_cols, group_cols = "time_value") {
if (!"geo_value" %in% group_cols) {
out <- out %>%
mutate(geo_value = "total") %>%
relocate(.data$geo_value, .before = 1)
relocate("geo_value", .before = 1)
}

# The `geo_type` will be correctly inherited here by the following logic:
Expand Down
3 changes: 2 additions & 1 deletion _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ reference:
- epix_as_of
- epix_as_of_current
- epix_slide
- epix_merge
- revision_summary
- epix_merge
- filter.epi_archive
- epix_fill_through_version
- epix_truncate_versions_after
- set_versions_end
Expand Down
34 changes: 17 additions & 17 deletions man/epi_archive.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading