1 change: 1 addition & 0 deletions .renvignore
@@ -1,2 +1,3 @@
tmp.R
node_modules/*
local_logs/
3 changes: 2 additions & 1 deletion Makefile
@@ -63,9 +63,10 @@ submit: submit-covid submit-flu
get_nwss:
mkdir -p aux_data/nwss_covid_data; \
mkdir -p aux_data/nwss_flu_data; \
. .venv/bin/activate; \
cd scripts/nwss_export_tool/; \
python nwss_covid_export.py; \
python nwss_covid_export.py
python nwss_influenza_export.py

run-nohup:
nohup Rscript scripts/run.R &
111 changes: 83 additions & 28 deletions R/aux_data_utils.R
@@ -188,38 +188,76 @@ daily_to_weekly <- function(epi_df, agg_method = c("sum", "mean"), day_of_week =
select(-epiweek, -year)
}

#' Aggregate a daily archive to a weekly archive.
#'
#' @param epi_arch the archive to aggregate.
#' @param agg_columns the columns to aggregate.
#' @param agg_method the method to use to aggregate the data, one of "sum" or "mean".
#' @param day_of_week the day of the week to use as the reference day.
#' @param day_of_week_end the day of the week to use as the end of the week.
daily_to_weekly_archive <- function(epi_arch,
agg_columns,
agg_method = c("sum", "mean"),
day_of_week = 4L,
day_of_week_end = 7L) {
# How to aggregate the windowed data.
agg_method <- arg_match(agg_method)
# The columns we will later group by when aggregating.
keys <- key_colnames(epi_arch, exclude = c("time_value", "version"))
# The versions we will slide over.
ref_time_values <- epi_arch$DT$version %>%
unique() %>%
sort()
# Choose a fast function to use to slide and aggregate.
if (agg_method == "sum") {
slide_fun <- epi_slide_sum
} else if (agg_method == "mean") {
slide_fun <- epi_slide_mean
}
too_many_tibbles <- epix_slide(
# Slide over the versions and aggregate.
epix_slide(
epi_arch,
.before = 99999999L,
.versions = ref_time_values,
function(x, group, ref_time) {
ref_time_last_week_end <-
floor_date(ref_time, "week", day_of_week_end - 1) # this is over by 1
function(x, group_keys, ref_time) {
# The last day of the week we will slide over.
ref_time_last_week_end <- floor_date(ref_time, "week", day_of_week_end - 1)

# To find the days we will slide over, we need the first and last complete
# weeks of data, which we locate from the min and max time values.
min_time <- min(x$time_value)
max_time <- max(x$time_value)
valid_slide_days <- seq.Date(
from = ceiling_date(min(x$time_value), "week", week_start = day_of_week_end - 1),
to = floor_date(max(x$time_value), "week", week_start = day_of_week_end - 1),
by = 7L
)

# Let's determine if the min and max times are in the same week.
ceil_min_time <- ceiling_date(min_time, "week", week_start = day_of_week_end - 1)
ceil_max_time <- ceiling_date(max_time, "week", week_start = day_of_week_end - 1)

# If they're not in the same week, this means we have at least one
# complete week of data to slide over.
if (ceil_min_time < ceil_max_time) {
valid_slide_days <- seq.Date(
from = ceiling_date(min_time, "week", week_start = day_of_week_end - 1),
to = floor_date(max_time, "week", week_start = day_of_week_end - 1),
by = 7L
)
} else {
# This is the degenerate case, where we have roughly a week or less of
# data. We opt to return nothing, for two reasons:
# 1. in most cases, the single week of data is incomplete, and
# 2. even if it is complete, one week of data is not enough to
# reasonably perform any kind of aggregation.
return(tibble())
}

# If the last day of the data is not the end of a week, add it to the
# list of valid slide days (this will produce one incomplete slide, but
# that's fine for us, since historically it should only be off by a day).
if (wday(max_time) != day_of_week_end) {
valid_slide_days <- c(valid_slide_days, max_time)
}
slid_result <- x %>%

# Slide over the days and aggregate.
x %>%
group_by(across(all_of(keys))) %>%
slide_fun(
agg_columns,
@@ -229,18 +267,13 @@ daily_to_weekly_archive <- function(epi_arch,
) %>%
select(-all_of(agg_columns)) %>%
rename_with(~ gsub("slide_value_", "", .x)) %>%
# only keep 1/week
# group_by week, keep the largest in each week
# alternatively
# switch time_value to the designated day of the week
rename_with(~ gsub("_7dsum", "", .x)) %>%
# Round all dates to the reference day of the week. These will get
# de-duplicated by compactify in as_epi_archive below.
mutate(time_value = round_date(time_value, "week", day_of_week - 1)) %>%
as_tibble()
}
)
too_many_tibbles %>%
pull(time_value) %>%
max()
too_many_tibbles %>%
) %>%
as_epi_archive(compactify = TRUE)
}
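
A minimal usage sketch of the new function; the archive name nhsn_daily_archive and the column hhs are placeholders, not objects defined in this PR:

# Sketch: aggregate a hypothetical daily archive of hhs admissions into a
# weekly archive, summing each week's values and stamping the result on the
# reference day of the week.
weekly_archive <- daily_to_weekly_archive(
  nhsn_daily_archive,
  agg_columns = "hhs",
  agg_method = "sum",
  day_of_week = 4L,
  day_of_week_end = 7L
)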

@@ -313,9 +346,8 @@ get_health_data <- function(as_of, disease = c("covid", "flu")) {

most_recent_row <- meta_data %>%
# update_date is actually a time, so we need to filter for the day after.
filter(update_date <= as_of + 1) %>%
arrange(desc(update_date)) %>%
slice(1)
filter(update_date <= as.Date(as_of) + 1) %>%
slice_max(update_date)

if (nrow(most_recent_row) == 0) {
cli::cli_abort("No data available for the given date.")
@@ -331,9 +363,7 @@
if (disease == "covid") {
data %<>% mutate(
hhs = previous_day_admission_adult_covid_confirmed +
previous_day_admission_adult_covid_suspected +
previous_day_admission_pediatric_covid_confirmed +
previous_day_admission_pediatric_covid_suspected
previous_day_admission_pediatric_covid_confirmed
)
} else if (disease == "flu") {
data %<>% mutate(hhs = previous_day_admission_influenza_confirmed)
@@ -709,10 +739,12 @@ create_nhsn_data_archive <- function(disease_name) {
as_epi_archive(compactify = TRUE)
}


up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
disease <- arg_match(disease)
nssp_state <- pub_covidcast(
nssp_state <- retry_fn(
max_attempts = 10,
wait_seconds = 1,
fn = pub_covidcast,
source = "nssp",
signal = glue::glue("pct_ed_visits_{disease}"),
time_type = "week",
@@ -728,3 +760,26 @@ up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
mutate(time_value = time_value + 3) %>%
as_epi_archive(compactify = TRUE)
}
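
The call above assumes a retry_fn helper that is not part of this diff; a minimal sketch consistent with how it is called here (retry a flaky API call up to max_attempts times, sleeping wait_seconds between tries) could be:

# Hypothetical sketch of retry_fn, not the repo's actual implementation.
retry_fn <- function(fn, ..., max_attempts = 10, wait_seconds = 1) {
  for (attempt in seq_len(max_attempts)) {
    result <- tryCatch(fn(...), error = function(e) e)
    # Return on the first successful call.
    if (!inherits(result, "error")) {
      return(result)
    }
    # Out of attempts: re-raise the last error.
    if (attempt == max_attempts) {
      stop(result)
    }
    Sys.sleep(wait_seconds)
  }
}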

# Get the last time the signal was updated.
get_covidcast_signal_last_update <- function(source, signal) {
pub_covidcast_meta() %>%
filter(source == !!source, signal == !!signal) %>%
pull(last_update) %>%
as.POSIXct()
}

# Get the last time the Socrata dataset was updated.
get_socrata_updated_at <- function(dataset_url) {
httr::GET(dataset_url) %>%
httr::content() %>%
pluck("rowsUpdatedAt") %>%
as.POSIXct()
}

get_s3_object_last_modified <- function(bucket, key) {
# Format looks like "Fri, 31 Jan 2025 22:01:16 GMT"
attr(aws.s3::head_object(key, bucket = bucket), "last-modified") %>%
str_replace_all(" GMT", "") %>%
as.POSIXct(format = "%a, %d %b %Y %H:%M:%S")
}
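
All three helpers return POSIXct timestamps, so they can be compared directly; a sketch of how they might gate a cache refresh (the URL, bucket, and key below are placeholders, not values from this PR):

# Sketch: only re-download when the upstream Socrata dataset is newer than
# a cached copy in S3. All names here are hypothetical.
upstream_updated <- get_socrata_updated_at("https://data.cdc.gov/api/views/xxxx-xxxx")
cache_updated <- get_s3_object_last_modified("example-bucket", "raw_data/nhsn.parquet")
if (upstream_updated > cache_updated) {
  # ...re-download and update the cache...
}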
31 changes: 12 additions & 19 deletions R/forecasters/data_transforms.R
@@ -40,15 +40,10 @@ get_nonkey_names <- function(epi_data) {
#' @export
rolling_mean <- function(epi_data, width = 7L, cols_to_mean = NULL) {
cols_to_mean <- get_trainable_names(epi_data, cols_to_mean)
epi_data %<>% group_by(across(key_colnames(epi_data, exclude = "time_value")))
for (col in cols_to_mean) {
mean_name <- paste0("slide_", col, "_m", width)
epi_data %<>%
epi_slide_mean(all_of(col), .window_size = width) %>%
rename(!!mean_name := paste0("slide_value_", col))
}
epi_data %<>% ungroup()
return(epi_data)
epi_data %>%
group_by(across(key_colnames(epi_data, exclude = "time_value"))) %>%
epi_slide_mean(all_of(cols_to_mean), .window_size = width, .new_col_names = paste0("slide_", cols_to_mean, "_m", width)) %>%
ungroup()
}
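
A usage sketch for the vectorized rewrite (the cases column is hypothetical):

# Sketch: adds a slide_cases_m7 column holding a 7-day rolling mean of
# cases, computed within each combination of key columns.
epi_df %>% rolling_mean(width = 7L, cols_to_mean = "cases")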

#' Get a rolling standard deviation for the named columns
@@ -75,20 +70,18 @@ rolling_sd <- function(epi_data, sd_width = 29L, mean_width = NULL, cols_to_sd =
cols_to_sd <- get_trainable_names(epi_data, cols_to_sd)
result <- epi_data
result %<>% group_by(across(key_colnames(epi_data, exclude = "time_value")))
for (col in cols_to_sd) {
mean_name <- glue::glue("slide_{col}_m{mean_width}")
sd_name <- glue::glue("slide_{col}_sd{sd_width}")
for (col_name in cols_to_sd) {
mean_name <- glue::glue("slide_{col_name}_m{mean_width}")
sd_name <- glue::glue("slide_{col_name}_sd{sd_width}")

result %<>%
epi_slide_mean(all_of(col), .window_size = mean_width) %>%
rename(!!mean_name := paste0("slide_value_", col))
epi_slide_mean(all_of(col_name), .window_size = mean_width, .new_col_names = mean_name)

result %<>%
mutate(.temp = (.data[[mean_name]] - .data[[col]])^2) %>%
epi_slide_mean(all_of(".temp"), .window_size = sd_width) %>%
select(-.temp) %>%
rename(!!sd_name := "slide_value_.temp") %>%
mutate(!!sd_name := sqrt(.data[[sd_name]]))
mutate(.temp = (.data[[mean_name]] - .data[[col_name]])^2) %>%
epi_slide_mean(all_of(".temp"), .window_size = sd_width, .new_col_names = sd_name) %>%
mutate(!!sd_name := sqrt(.data[[sd_name]])) %>%
select(-.temp)

if (!keep_mean) {
result %<>% select(-{{ mean_name }})
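
And a matching sketch for rolling_sd, assuming the keep_mean argument seen in the truncated hunk above (the cases column is again hypothetical):

# Sketch: adds a slide_cases_sd29 column, the rolling dispersion of cases
# around its rolling mean, plus the rolling-mean column when keep_mean = TRUE.
epi_df %>% rolling_sd(sd_width = 29L, cols_to_sd = "cases", keep_mean = TRUE)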
4 changes: 2 additions & 2 deletions R/forecasters/ensemble_average.R
@@ -32,12 +32,12 @@ ensemble_average <- function(epi_data,
# their names are separated for obscure target-related reasons
names(ensemble_args) <- ensemble_args_names %||% names(ensemble_args)
average_type <- ensemble_args$average_type %||% median
join_columns <- ensemble_args$join_columns %||% setdiff(names(forecasts[[1]]), "prediction")
join_columns <- ensemble_args$join_columns %||% setdiff(names(forecasts[[1]]), "value")

# begin actual analysis
forecasts %>%
bind_rows(.id = "id") %>%
group_by(across(all_of(join_columns))) %>%
summarize(prediction = average_type(prediction, na.rm = TRUE), .groups = "drop") %>%
summarize(value = average_type(value, na.rm = TRUE), .groups = "drop") %>%
ungroup()
}
9 changes: 6 additions & 3 deletions R/forecasters/epipredict_utilities.R
@@ -114,6 +114,11 @@ run_workflow_and_format <- function(preproc,
if (is.null(as_of)) {
as_of <- max(train_data$time_value)
}

# Look at the train data (uncomment for debugging).
# df <- preproc %>% prep(train_data) %>% bake(train_data)
# browser()

workflow <- epi_workflow(preproc, trainer) %>%
fit(train_data) %>%
add_frosting(postproc)
@@ -125,9 +130,7 @@
# keeping only the last time_value for any given location/key
pred %<>%
group_by(across(all_of(key_colnames(train_data, exclude = "time_value")))) %>%
# TODO: slice_max(time_value)?
arrange(time_value) %>%
filter(row_number() == n()) %>%
slice_max(time_value) %>%
ungroup()
return(format_storage(pred, as_of))
}
1 change: 1 addition & 0 deletions R/forecasters/forecaster_baseline_linear.R
@@ -1,5 +1,6 @@
#' epi_data is expected to have: geo_value, time_value, and value columns.
forecaster_baseline_linear <- function(epi_data, ahead, log = FALSE, sort = FALSE, residual_tail = 0.85, residual_center = 0.085, no_intercept = FALSE) {
epi_data <- validate_epi_data(epi_data)
forecast_date <- attributes(epi_data)$metadata$as_of
population_data <- get_population_data() %>%
rename(geo_value = state_id) %>%
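
validate_epi_data itself is not shown in this diff; assuming it enforces the column contract stated in the docstring above, a plausible sketch is:

# Hypothetical sketch of validate_epi_data, not the PR's actual code: check
# that the input is an epi_df with the columns the forecasters rely on.
validate_epi_data <- function(epi_data) {
  stopifnot(inherits(epi_data, "epi_df"))
  required <- c("geo_value", "time_value", "value")
  missing_cols <- setdiff(required, names(epi_data))
  if (length(missing_cols) > 0) {
    cli::cli_abort("epi_data is missing required columns: {missing_cols}")
  }
  epi_data
}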
3 changes: 3 additions & 0 deletions R/forecasters/forecaster_climatological.R
@@ -16,6 +16,9 @@ climate_linear_ensembled <- function(epi_data,
scale_method <- arg_match(scale_method)
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)

epi_data <- validate_epi_data(epi_data)

args_list <- list(...)
ahead <- as.integer(ahead / 7)
epi_data %<>% filter_extraneous(filter_source, filter_agg_level)
1 change: 1 addition & 0 deletions R/forecasters/forecaster_flatline.R
@@ -17,6 +17,7 @@ flatline_fc <- function(epi_data,
filter_source = "",
filter_agg_level = "",
...) {
epi_data <- validate_epi_data(epi_data)
# perform any preprocessing not supported by epipredict
epi_data %<>% filter_extraneous(filter_source, filter_agg_level)
# this is a temp fix until a real fix gets put into epipredict
3 changes: 3 additions & 0 deletions R/forecasters/forecaster_flusion.R
@@ -22,6 +22,9 @@ flusion <- function(epi_data,
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)
derivative_estimator <- arg_match(derivative_estimator)

epi_data <- validate_epi_data(epi_data)

# perform any preprocessing not supported by epipredict
args_input <- list(...)
# this next part is basically unavoidable boilerplate you'll want to copy
3 changes: 3 additions & 0 deletions R/forecasters/forecaster_no_recent_outcome.R
@@ -22,6 +22,9 @@ no_recent_outcome <- function(epi_data,
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)
week_method <- arg_match(week_method)

epi_data <- validate_epi_data(epi_data)

# this is for the case where there are multiple sources in the same column
epi_data %<>% filter_extraneous(filter_source, filter_agg_level)
args_input <- list(...)
3 changes: 3 additions & 0 deletions R/forecasters/forecaster_scaled_pop.R
@@ -62,6 +62,9 @@ scaled_pop <- function(epi_data,
scale_method <- arg_match(scale_method)
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)

epi_data <- validate_epi_data(epi_data)

# perform any preprocessing not supported by epipredict
#
# this is for the case where there are multiple sources in the same column
3 changes: 3 additions & 0 deletions R/forecasters/forecaster_scaled_pop_seasonal.R
@@ -56,6 +56,9 @@ scaled_pop_seasonal <- function(epi_data,
scale_method <- arg_match(scale_method)
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)

epi_data <- validate_epi_data(epi_data)

if (typeof(seasonal_method) == "list") {
seasonal_method <- seasonal_method[[1]]
}
4 changes: 4 additions & 0 deletions R/forecasters/forecaster_smoothed_scaled.R
@@ -71,6 +71,9 @@ smoothed_scaled <- function(epi_data,
scale_method <- arg_match(scale_method)
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)

epi_data <- validate_epi_data(epi_data)

# perform any preprocessing not supported by epipredict
#
# this is for the case where there are multiple sources in the same column
@@ -170,6 +173,7 @@ smoothed_scaled <- function(epi_data,
keep_mean = keep_mean
)
}

# need to make a version with the non-seasonal and problematic flu seasons removed
if (drop_non_seasons) {
season_data <- epi_data %>% drop_non_seasons()
1 change: 1 addition & 0 deletions R/imports.R
@@ -29,6 +29,7 @@ library(parsnip)
library(paws.storage)
library(plotly)
library(purrr)
library(qs2)
library(quantreg)
library(readr)
library(recipes)