
Download only recently added or changed forecaster files #233


Draft: wants to merge 10 commits into base: dev
2 changes: 2 additions & 0 deletions .github/workflows/s3_upload_ec2.yml
@@ -45,6 +45,8 @@ jobs:
password: ${{ secrets.CMU_DELPHI_DEPLOY_MACHINE_PAT }}

- name: Deploy score files to S3 bucket
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
make deploy

3 changes: 2 additions & 1 deletion Makefile
@@ -10,7 +10,7 @@ r_build:
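# Fetch an .rds score file from S3 into dist/ only if it is not already present.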
%.rds: dist
test -f dist/$@ || curl -o dist/$@ $(S3_URL)/$@

pull_data: score_cards_state_deaths.rds score_cards_state_cases.rds score_cards_nation_cases.rds score_cards_nation_deaths.rds score_cards_state_hospitalizations.rds score_cards_nation_hospitalizations.rds datetime_created_utc.rds
pull_data: score_cards_state_deaths.rds score_cards_state_cases.rds score_cards_nation_cases.rds score_cards_nation_deaths.rds score_cards_state_hospitalizations.rds score_cards_nation_hospitalizations.rds datetime_created_utc.rds predictions_cards.rds

dist:
mkdir $@
@@ -23,6 +23,7 @@ score_forecast: r_build dist pull_data
-v ${PWD}/Report:/var/forecast-eval \
-v ${PWD}/dist:/var/dist \
-w /var/forecast-eval \
-e GITHUB_TOKEN=${GITHUB_TOKEN} \
forecast-eval-build \
Rscript create_reports.R --dir /var/dist

179 changes: 166 additions & 13 deletions Report/create_reports.R
@@ -3,6 +3,9 @@ library("optparse")
library("dplyr")
library("evalcast")
library("lubridate")
library("jsonlite")
library("httr")


# TODO: Contains fixed versions of WIS component metrics, to be ported over to evalcast
# Redefines overprediction, underprediction and sharpness
@@ -21,13 +24,21 @@ option_list <- list(
)
opt_parser <- OptionParser(option_list = option_list)
opt <- parse_args(opt_parser)

output_dir <- opt$dir
prediction_cards_filename <- "predictions_cards.rds"
prediction_cards_filepath <- case_when(
!is.null(output_dir) ~ file.path(output_dir, prediction_cards_filename),
TRUE ~ prediction_cards_filename
)

github_token <- Sys.getenv("GITHUB_TOKEN")
if (github_token == "") {
stop("Token is not set or is not able to be fetched from the environment")
}
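# When running locally, set a personal access token first, e.g. with
# Sys.setenv(GITHUB_TOKEN = "<your token>") in R or by exporting GITHUB_TOKEN in the shell.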

data_pull_timestamp <- now(tzone = "UTC")

options(warn = 1)

# Requested forecasters that do not get included in final scores:
@@ -46,6 +57,108 @@ forecasters <- unique(c(
get_covidhub_forecaster_names(designations = c("primary", "secondary")),
"COVIDhub-baseline", "COVIDhub-trained_ensemble", "COVIDhub-4_week_ensemble"
))

## Get list of new and modified files to download
# The `path` field filters commits to only those that modify the listed dir
BASE_URL <- "https://api.github.com/repos/reichlab/covid19-forecast-hub/commits?sha=%s&per_page=%s&path=data-processed&since=%s&page=%s"
ITEMS_PER_PAGE <- 100
BRANCH <- "master"

# We want to fetch all commits made since the previous run, going back one extra day as a buffer.
#
# Timestamp should be in ISO 8601 format. See
# https://docs.github.com/en/rest/reference/commits#list-commits--parameters for
# details.
previous_run_ts <- readRDS(file.path(output_dir, "datetime_created_utc.rds")) %>%
pull(datetime)
since_date <- strftime(previous_run_ts - days(1), "%Y-%m-%dT%H:%M:%SZ", tz = "UTC")
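# e.g. a previous run at 2021-08-02 14:00:00 UTC yields since_date "2021-08-01T14:00:00Z"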

page <- 0
commit_sha_dates <- list()

# Fetch the list of commits from the API, one page at a time. Each page
# contains up to ITEMS_PER_PAGE commits; a full page suggests more results
# remain, so fetch the next page.
while (!exists("temp_commits") || nrow(temp_commits) == ITEMS_PER_PAGE) {
page <- page + 1
# Construct the URL
commits_url <- sprintf(BASE_URL, BRANCH, ITEMS_PER_PAGE, since_date, page)

request <- GET(commits_url, add_headers(Authorization = paste("Bearer", github_token)))
# Convert any HTTP errors to R errors automatically.
stop_for_status(request)

# Convert results from nested JSON/list to dataframe. If no results returned,
# `temp_commits` will be an empty list.
temp_commits <- content(request, as = "text") %>%
fromJSON(simplifyDataFrame = TRUE, flatten = TRUE)

if (identical(temp_commits, list())) {
break
}

commit_sha_dates[[page]] <- select(temp_commits, sha, url)
}

# Combine all requested pages of commits into one dataframe
commit_sha_dates <- bind_rows(commit_sha_dates)

added_modified_files <- lapply(commit_sha_dates$url, function(commit_url) {
# For each commit in `commit_sha_dates`, get the list of any modified files.
print(commit_url)

# Make API call for each commit sha
request <- GET(commit_url, add_headers(Authorization = paste("Bearer", github_token)))
stop_for_status(request)
commit <- content(request, as = "text") %>%
fromJSON(simplifyDataFrame = TRUE, flatten = TRUE)

commit_files <- commit$files

# Return an empty df if no files are listed as modified (can happen with merge commits, for example)
if (identical(commit_files, list())) {
return(data.frame())
}

# Otherwise return the list of changed files for this commit
commit_files %>% mutate(commit_date = commit$commit$author$date)
}) %>%
bind_rows() %>%
select(filename, status, commit_date) %>%
# File must be somewhere in the data-processed dir and be a csv with the
# expected name format. We're only interested in added or modified files
# (not deleted, renamed, copied, etc.).
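# e.g. "data-processed/TeamA-model/2021-08-02-TeamA-model.csv" (hypothetical
# forecaster name) passes this filter, while "data-processed/README.md" does not.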
filter(
grepl("data-processed/.*/[0-9]{4}-[0-9]{2}-[0-9]{2}-.*[.]csv", filename),
status %in% c("added", "modified")
) %>%
select(-status) %>%
group_by(filename) %>%
# Keep most recent reference to a given file. Implicitly deduplicates by filename.
filter(
commit_date == max(commit_date)
) %>%
ungroup()

# Get forecaster name and date from filename
filename_parts <- strsplit(added_modified_files$filename, "/")
added_modified_files <- added_modified_files %>%
mutate(
forecaster = lapply(
filename_parts, function(parts) {
parts[[2]]
}
) %>%
unlist(),
forecast.date = lapply(
filename_parts, function(parts) {
substr(parts[[3]], start = 1, stop = 10)
}
) %>%
unlist()
) %>%
filter(forecaster %in% forecasters)

locations <- covidHubUtils::hub_locations

# also includes "us", which is national-level data
@@ -58,24 +171,40 @@ signals <- c(
"confirmed_admissions_covid_1d"
)

data_pull_timestamp <- now(tzone = "UTC")
predictions_cards <- get_covidhub_predictions(forecasters,
signal = signals,
ahead = 1:28,
geo_values = state_geos,
verbose = TRUE,
use_disk = TRUE
# Since forecast dates are shared across all forecasters, if a new forecaster
# is added that backfills forecast dates, we will end up requesting all those
# dates for forecasters we've already seen before. To prevent that, make a new
# call to `get_covidhub_predictions` for each forecaster with its own dates.
predictions_cards <- lapply(
Collaborator Author: Could easily parallelize this. (See the sketch after this code block.)

unique(added_modified_files$forecaster),
function(forecaster_name) {
fetch_dates <- added_modified_files %>%
filter(forecaster == forecaster_name) %>%
distinct(forecast.date) %>%
pull()

get_covidhub_predictions(
forecaster_name,
signal = signals,
ahead = 1:28,
geo_values = state_geos,
forecast_dates = fetch_dates,
verbose = TRUE,
use_disk = TRUE
) %>%
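# Keep only horizons 1-4 for weekly (epiweek) targets.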
filter(!(incidence_period == "epiweek" & ahead > 4))
}
) %>%
filter(!(incidence_period == "epiweek" & ahead > 4))
bind_rows()
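A minimal sketch of the parallel variant the reviewer suggests, assuming a Unix-like runner (mclapply forks, so it silently falls back to serial on Windows) and that get_covidhub_predictions is safe to call from forked workers; neither assumption is verified here:

library(parallel)

predictions_cards <- mclapply(
  unique(added_modified_files$forecaster),
  function(forecaster_name) {
    fetch_dates <- added_modified_files %>%
      filter(forecaster == forecaster_name) %>%
      distinct(forecast.date) %>%
      pull()

    get_covidhub_predictions(
      forecaster_name,
      signal = signals,
      ahead = 1:28,
      geo_values = state_geos,
      forecast_dates = fetch_dates,
      verbose = TRUE,
      use_disk = TRUE
    ) %>%
      filter(!(incidence_period == "epiweek" & ahead > 4))
  },
  # Leave one core free for the parent process.
  mc.cores = max(1L, detectCores() - 1L)
) %>%
  bind_rows()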

options(warn = 0)

# Includes predictions for future dates, which will not be scored.
predictions_cards <- predictions_cards %>%
filter(!is.na(target_end_date))

# For hospitalizations, drop all US territories except Puerto Rico and the
# Virgin Islands; HHS does not report data for any territories except PR and VI.
# For hospitalizations, drop all US territories except Puerto Rico (pr) and the
# Virgin Islands (vi); HHS does not report data for any territories except these two.
territories <- c("as", "gu", "mp", "fm", "mh", "pw", "um")
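# (as = American Samoa, gu = Guam, mp = Northern Mariana Islands, fm = Federated
# States of Micronesia, mh = Marshall Islands, pw = Palau, um = U.S. Minor Outlying Islands)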
predictions_cards <- predictions_cards %>%
filter(!(geo_value %in% territories & data_source == "hhs"))
@@ -90,10 +219,34 @@ predictions_cards <- predictions_cards %>%
(incidence_period == "day" & target_end_date > forecast_date)
)

# And only a forecaster's last forecast if multiple were made
predictions_cards <- predictions_cards %>%
# Load old predictions cards.
if (file.exists(prediction_cards_filepath)) {
old_predictions_cards <- readRDS(prediction_cards_filepath) %>%
mutate(updated_cards_flag = 0)
} else {
warning("Could not find prediction cards at ", prediction_cards_filepath)
old_predictions_cards <- data.frame()
}


# Merge old and new predictions cards.
predictions_cards <- bind_rows(
mutate(predictions_cards, updated_cards_flag = 1),
old_predictions_cards
) %>%
# If a given forecast appears in both old and new prediction cards, keep the new one.
group_by(forecaster, geo_value, target_end_date, quantile, ahead, signal) %>%
filter(
updated_cards_flag == max(updated_cards_flag)
) %>%
ungroup() %>%
select(-updated_cards_flag) %>%
# If multiple forecasts were made for a given set of characteristics, keep the
# newest version of the forecast.
group_by(forecaster, geo_value, target_end_date, quantile, ahead, signal) %>%
filter(forecast_date == max(forecast_date)) %>%
filter(
forecast_date == max(forecast_date)
) %>%
ungroup()
class(predictions_cards) <- c("predictions_cards", class(predictions_cards))
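For intuition, a toy version of the keep-the-newest logic above, using hypothetical data and a shortened grouping key:

library(dplyr)

toy <- data.frame(
  forecaster = c("A", "A", "B"),
  ahead = c(1, 1, 2),
  updated_cards_flag = c(1, 0, 0)  # 1 = freshly downloaded, 0 = loaded from disk
)

toy %>%
  group_by(forecaster, ahead) %>%
  filter(updated_cards_flag == max(updated_cards_flag)) %>%
  ungroup()
# Keeps A's new card (flag 1) and B's old card (flag 0, the only copy).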
