diff --git a/.github/workflows/s3_upload_ec2.yml b/.github/workflows/s3_upload_ec2.yml
index 06fef8c..ee404b2 100644
--- a/.github/workflows/s3_upload_ec2.yml
+++ b/.github/workflows/s3_upload_ec2.yml
@@ -45,6 +45,8 @@ jobs:
           password: ${{ secrets.CMU_DELPHI_DEPLOY_MACHINE_PAT }}
       - name: Deploy score files to S3 bucket
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
         run: |
           make deploy
diff --git a/Makefile b/Makefile
index 64d570a..9abb01a 100644
--- a/Makefile
+++ b/Makefile
@@ -10,7 +10,7 @@ r_build:
 %.rds: dist
 	test -f dist/$@ || curl -o dist/$@ $(S3_URL)/$@
 
-pull_data: score_cards_state_deaths.rds score_cards_state_cases.rds score_cards_nation_cases.rds score_cards_nation_deaths.rds score_cards_state_hospitalizations.rds score_cards_nation_hospitalizations.rds datetime_created_utc.rds
+pull_data: score_cards_state_deaths.rds score_cards_state_cases.rds score_cards_nation_cases.rds score_cards_nation_deaths.rds score_cards_state_hospitalizations.rds score_cards_nation_hospitalizations.rds datetime_created_utc.rds predictions_cards.rds
 
 dist:
 	mkdir $@
@@ -23,6 +23,7 @@ score_forecast: r_build dist pull_data
 		-v ${PWD}/Report:/var/forecast-eval \
 		-v ${PWD}/dist:/var/dist \
 		-w /var/forecast-eval \
+		-e GITHUB_TOKEN=${GITHUB_TOKEN} \
 		forecast-eval-build \
 		Rscript create_reports.R --dir /var/dist
diff --git a/Report/create_reports.R b/Report/create_reports.R
index f17a868..f5fbe3b 100644
--- a/Report/create_reports.R
+++ b/Report/create_reports.R
@@ -3,6 +3,9 @@ library("optparse")
 library("dplyr")
 library("evalcast")
 library("lubridate")
+library("jsonlite")
+library("httr")
+
 
 # TODO: Contains fixed versions of WIS component metrics, to be ported over to evalcast
 # Redefines overprediction, underprediction and sharpness
@@ -21,6 +24,7 @@ option_list <- list(
 )
 opt_parser <- OptionParser(option_list = option_list)
 opt <- parse_args(opt_parser)
+
 output_dir <- opt$dir
 prediction_cards_filename <- "predictions_cards.rds"
 prediction_cards_filepath <- case_when(
@@ -28,6 +32,13 @@ prediction_cards_filepath <- case_when(
   TRUE ~ prediction_cards_filename
 )
 
+github_token <- Sys.getenv("GITHUB_TOKEN")
+if (github_token == "") {
+  stop("GITHUB_TOKEN is not set or could not be fetched from the environment")
+}
+
+data_pull_timestamp <- now(tzone = "UTC")
+
 options(warn = 1)
 
 # Requested forecasters that do not get included in final scores:
@@ -46,6 +57,108 @@ forecasters <- unique(c(
   get_covidhub_forecaster_names(designations = c("primary", "secondary")),
   "COVIDhub-baseline", "COVIDhub-trained_ensemble", "COVIDhub-4_week_ensemble"
 ))
 
+## Get list of new and modified files to download
+# The `path` query parameter restricts results to commits that modify the listed dir
+BASE_URL <- "https://api.github.com/repos/reichlab/covid19-forecast-hub/commits?sha=%s&per_page=%s&path=data-processed&since=%s&page=%s"
+ITEMS_PER_PAGE <- 100
+BRANCH <- "master"
+
+# We want to fetch all commits made since the previous run. Add 1 day as a buffer.
+#
+# The timestamp must be in ISO 8601 format. See
+# https://docs.github.com/en/rest/reference/commits#list-commits--parameters for
+# details.
+previous_run_ts <- readRDS(file.path(output_dir, "datetime_created_utc.rds")) %>%
+  pull(datetime)
+since_date <- strftime(previous_run_ts - days(1), "%Y-%m-%dT%H:%M:%SZ", tz = "UTC")
+
+page <- 0
+commit_sha_dates <- list()
+
+# Fetch the list of commits from the API, one page at a time. Each page contains
+# up to 100 commits. If a page contains exactly 100 commits, assume there are
+# more results and fetch the next page.
+while (!exists("temp_commits") || nrow(temp_commits) == 100) {
+  page <- page + 1
+  # Construct the URL
+  commits_url <- sprintf(BASE_URL, BRANCH, ITEMS_PER_PAGE, since_date, page)
+
+  request <- GET(commits_url, add_headers(Authorization = paste("Bearer", github_token)))
+  # Convert any HTTP errors to R errors automatically.
+  stop_for_status(request)
+
+  # Convert results from nested JSON/list to dataframe. If no results are returned,
+  # `temp_commits` will be an empty list.
+  temp_commits <- content(request, as = "text") %>%
+    fromJSON(simplifyDataFrame = TRUE, flatten = TRUE)
+
+  if (identical(temp_commits, list())) {
+    break
+  }
+
+  commit_sha_dates[[page]] <- select(temp_commits, sha, url)
+}
+
+# Combine all requested pages of commits into one dataframe
+commit_sha_dates <- bind_rows(commit_sha_dates)
+
+added_modified_files <- lapply(commit_sha_dates$url, function(commit_url) {
+  # For each commit in `commit_sha_dates`, get the list of any modified files.
+  print(commit_url)
+
+  # Make an API call for each commit SHA
+  request <- GET(commit_url, add_headers(Authorization = paste("Bearer", github_token)))
+  stop_for_status(request)
+  commit <- content(request, as = "text") %>%
+    fromJSON(simplifyDataFrame = TRUE, flatten = TRUE)
+
+  commit_files <- commit$files
+
+  # Return an empty df if no files are listed as modified (can happen with merge commits, for example)
+  if (identical(commit_files, list())) {
+    return(data.frame())
+  }
+
+  # Otherwise return the list of changed files for this commit
+  commit_files %>% mutate(commit_date = commit$commit$author$date)
+}) %>%
+  bind_rows() %>%
+  select(filename, status, commit_date) %>%
+  # File must be somewhere in the data-processed dir and be a csv with the expected
+  # name format. We're only interested in added or modified files (not deleted,
+  # renamed, copied, etc).
+  filter(
+    grepl("data-processed/.*/[0-9]{4}-[0-9]{2}-[0-9]{2}-.*[.]csv", filename),
+    status %in% c("added", "modified")
+  ) %>%
+  select(-status) %>%
+  group_by(filename) %>%
+  # Keep the most recent reference to a given file; this implicitly deduplicates by filename.
+  filter(
+    commit_date == max(commit_date)
+  ) %>%
+  ungroup()
+
+# Get forecaster name and date from filename
+filename_parts <- strsplit(added_modified_files$filename, "/")
+added_modified_files <- added_modified_files %>%
+  mutate(
+    forecaster = lapply(
+      filename_parts, function(parts) {
+        parts[[2]]
+      }
+    ) %>%
+      unlist(),
+    forecast.date = lapply(
+      filename_parts, function(parts) {
+        substr(parts[[3]], start = 1, stop = 10)
+      }
+    ) %>%
+      unlist()
+  ) %>%
+  filter(forecaster %in% forecasters)
+
 locations <- covidHubUtils::hub_locations
 # also includes "us", which is national level data
@@ -58,15 +171,31 @@ signals <- c(
   "confirmed_admissions_covid_1d"
 )
 
-data_pull_timestamp <- now(tzone = "UTC")
-predictions_cards <- get_covidhub_predictions(forecasters,
-  signal = signals,
-  ahead = 1:28,
-  geo_values = state_geos,
-  verbose = TRUE,
-  use_disk = TRUE
+# Since forecast dates are shared across all forecasters, if a new forecaster
+# is added that backfills forecast dates, we will end up requesting all those
+# dates for forecasters we've already seen before. To prevent that, make a new
+# call to `get_covidhub_predictions` for each forecaster with its own dates.
+predictions_cards <- lapply(
+  unique(added_modified_files$forecaster),
+  function(forecaster_name) {
+    fetch_dates <- added_modified_files %>%
+      filter(forecaster == forecaster_name) %>%
+      distinct(forecast.date) %>%
+      pull()
+
+    get_covidhub_predictions(
+      forecaster_name,
+      signal = signals,
+      ahead = 1:28,
+      geo_values = state_geos,
+      forecast_dates = fetch_dates,
+      verbose = TRUE,
+      use_disk = TRUE
+    ) %>%
+      filter(!(incidence_period == "epiweek" & ahead > 4))
+  }
 ) %>%
-  filter(!(incidence_period == "epiweek" & ahead > 4))
+  bind_rows()
 
 options(warn = 0)
 
@@ -74,8 +203,8 @@ options(warn = 0)
 predictions_cards <- predictions_cards %>%
   filter(!is.na(target_end_date))
 
-# For hospitalizations, drop all US territories except Puerto Rico and the
-# Virgin Islands; HHS does not report data for any territories except PR and VI.
+# For hospitalizations, drop all US territories except Puerto Rico (pr) and the
+# Virgin Islands (vi); HHS does not report data for any territories except these two.
 territories <- c("as", "gu", "mp", "fm", "mh", "pw", "um")
 predictions_cards <- predictions_cards %>%
   filter(!(geo_value %in% territories & data_source == "hhs"))
@@ -90,10 +219,34 @@ predictions_cards <- predictions_cards %>%
     (incidence_period == "day" & target_end_date > forecast_date)
   )
 
-# And only a forecaster's last forecast if multiple were made
-predictions_cards <- predictions_cards %>%
+# Load old predictions cards.
+if (file.exists(prediction_cards_filepath)) {
+  old_predictions_cards <- readRDS(prediction_cards_filepath) %>%
+    mutate(updated_cards_flag = 0)
+} else {
+  warning("Could not find prediction cards at ", prediction_cards_filepath)
+  old_predictions_cards <- data.frame()
+}
+
+
+# Merge old and new predictions cards.
+predictions_cards <- bind_rows(
+  mutate(predictions_cards, updated_cards_flag = 1),
+  old_predictions_cards
+) %>%
+  # If a given forecast appears in both old and new prediction cards, keep the new one.
+  group_by(forecaster, geo_value, target_end_date, quantile, ahead, signal) %>%
+  filter(
+    updated_cards_flag == max(updated_cards_flag)
+  ) %>%
+  ungroup() %>%
+  select(-updated_cards_flag) %>%
+  # If multiple forecasts were made for a given set of characteristics, keep the
+  # newest version of the forecast.
   group_by(forecaster, geo_value, target_end_date, quantile, ahead, signal) %>%
-  filter(forecast_date == max(forecast_date)) %>%
+  filter(
+    forecast_date == max(forecast_date)
+  ) %>%
   ungroup()
 
 class(predictions_cards) <- c("predictions_cards", class(predictions_cards))
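The snippet below is a minimal, standalone sketch (not part of the patch) of the "prefer the updated row" dedup pattern that the new prediction-card merge in create_reports.R relies on. The toy data frame, its values, and the reduced set of grouping keys are illustrative assumptions; the real code groups by forecaster, geo_value, target_end_date, quantile, ahead, and signal.

library(dplyr)

# Hypothetical old (previously saved) and new (just fetched) versions of the
# same forecast row; only `value` differs.
old_cards <- data.frame(
  forecaster = "COVIDhub-baseline",
  target_end_date = as.Date("2021-08-07"),
  quantile = 0.5,
  value = 10,
  updated_cards_flag = 0
)
new_cards <- mutate(old_cards, value = 12, updated_cards_flag = 1)

# Within each group, rows carrying the "updated" flag win; the flag is then dropped.
bind_rows(new_cards, old_cards) %>%
  group_by(forecaster, target_end_date, quantile) %>%
  filter(updated_cards_flag == max(updated_cards_flag)) %>%
  ungroup() %>%
  select(-updated_cards_flag)
# Result: a single row with value 12 (the updated forecast).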