Merge pull request #66 from ecohealthalliance/feature/ecmwf-anomalies
Feature/ecmwf anomalies
emmamendelsohn authored Nov 6, 2023
2 parents 8b6650a + d39f8bd commit 0ce8d73
Showing 14 changed files with 7,417 additions and 6,714 deletions.
134 changes: 134 additions & 0 deletions R/calculate_forecasts_anomalies.R
@@ -0,0 +1,134 @@
#' Calculate forecast anomalies for a selected model date
#'
#' Computes day-weighted means of the transformed ECMWF forecasts over each
#' lead interval and differences them against the historical weather means.
#'
#' @param ecmwf_forecasts_transformed
#' @param ecmwf_forecasts_transformed_directory
#' @param weather_historical_means
#' @param forecasts_anomalies_directory
#' @param model_dates_selected
#' @param lead_intervals
#' @param overwrite
#' @return
#' @author Emma Mendelsohn
#' @export
calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
ecmwf_forecasts_transformed_directory,
weather_historical_means,
forecasts_anomalies_directory,
model_dates_selected,
lead_intervals,
overwrite = FALSE) {

# Set filename
date_selected <- model_dates_selected
save_filename <- glue::glue("forecast_anomaly_{date_selected}.gz.parquet")
message(paste0("Calculating forecast anomalies for ", date_selected))

# Check if file already exists
existing_files <- list.files(forecasts_anomalies_directory)
if(save_filename %in% existing_files && !overwrite) {
  message("file already exists, skipping calculation")
return(file.path(forecasts_anomalies_directory, save_filename))
}

# Open the dataset of transformed data
forecasts_transformed_dataset <- open_dataset(ecmwf_forecasts_transformed_directory)

# Get the forecast anomalies for the selected date, mapping over the lead intervals
lead_intervals_start <- c(0, lead_intervals[-length(lead_intervals)]) # 0 to include the current day in the forecast
lead_intervals_end <- lead_intervals - 1 # -1 so each interval spans 30 days, including its start day

anomalies <- map(seq_along(lead_intervals_start), function(i){

# subset to start and end day of interval
start <- lead_intervals_start[i]
end <- lead_intervals_end[i]

lead_start_date <- date_selected + start
lead_end_date <- date_selected + end

# lead months for subsetting
lead_months <- as.character(c(i, i+1))

# this is the date from which the forecasts are made
baseline_date <- floor_date(date_selected, unit = "month")

# calculate weights
weight_a <- as.integer(days_in_month(lead_start_date) - day(lead_start_date)) + 1 # include current date
weight_b <- day(lead_end_date) - 1
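# e.g. (hypothetical values): date_selected = 2021-01-15 with a 30-day interval gives
# lead_start_date = 2021-01-15 and lead_end_date = 2021-02-13, so
# weight_a = 31 - 15 + 1 = 17 and weight_b = 13 - 1 = 12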

# get weighted mean of forecast means
lead_means <- forecasts_transformed_dataset |>
  filter(data_date == baseline_date) |>
  filter(lead_month %in% lead_months) |>
  mutate(weight = case_when(lead_month == !!lead_months[1] ~ !!weight_a,
                            lead_month == !!lead_months[2] ~ !!weight_b)) |>
  group_by(x, y, short_name) |>
  summarize(lead_mean = sum(mean * weight) / sum(weight)) |>
  ungroup()

# get historical means for lead period, removing doy 366
lead_dates <- seq(lead_start_date, lead_end_date, by = "day")
lead_doys <- yday(lead_dates)
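# doy 366 exists only in leap years (Dec 31), while the historical means files
# are indexed on a 365-day year, so remap the leap day before matching filenames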
if(366 %in% lead_doys) {
  if(tail(lead_doys, 1) == 366) {
    lead_doys <- lead_doys[lead_doys != 366]
    lead_doys <- c(lead_doys, 1) # interval ends on the leap day: wrap the end to doy 1
  } else {
    lead_doys <- lead_doys[lead_doys != 366]
    lead_doys <- c(lead_doys, tail(lead_doys, 1) + 1) # extend the end by one day to keep the interval length
  }
}

doy_start <- head(lead_doys, 1)
doy_end <- tail(lead_doys, 1)
doy_start_frmt <- str_pad(doy_start, width = 3, side = "left", pad = "0")
doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")
doy_range <- glue::glue("{doy_start_frmt}_to_{doy_end_frmt}")

historical_means <- read_parquet(weather_historical_means[str_detect(weather_historical_means, doy_range)])
assertthat::assert_that(nrow(historical_means) > 0)

# calculate anomalies - somewhat inefficient because arrow does not support reshaping (ideally the reshape would happen in the transform function)
# NAs are expected because the forecasts cover the whole continent while the weather data covers only the areas of interest

temp_anomalies <- lead_means |>
filter(short_name == "2t") |>
left_join(historical_means |> select(x, y, contains("temperature")), by = c("x", "y")) |>
mutate(!!paste0("anomaly_temperature_forecast_", end) := lead_mean - historical_temperature_mean,
!!paste0("anomaly_temperature_scaled_forecast_", end) := (lead_mean - historical_temperature_mean)/historical_temperature_sd) |>
select(-short_name, -lead_mean, -starts_with("historical")) |>
filter(!is.na(!!sym(paste0("anomaly_temperature_forecast_", end))))

rh_anomalies <- lead_means |>
filter(short_name == "rh") |>
left_join(historical_means |> select(x, y, contains("humidity")), by = c("x", "y")) |>
mutate(!!paste0("anomaly_relative_humidity_forecast_", end) := lead_mean - historical_relative_humidity_mean,
!!paste0("anomaly_relative_humidity_scaled_forecast_", end) := (lead_mean - historical_relative_humidity_mean)/historical_relative_humidity_sd) |>
select(-short_name, -lead_mean, -starts_with("historical")) |>
filter(!is.na(!!sym(paste0("anomaly_relative_humidity_forecast_", end))))

precip_anomalies <- lead_means |>
filter(short_name == "tprate") |>
left_join(historical_means |> select(x, y, contains("precipitation")), by = c("x", "y")) |>
mutate(!!paste0("anomaly_precipitation_forecast_", end) := lead_mean - historical_precipitation_mean,
!!paste0("anomaly_precipitation_scaled_forecast_", end) := (lead_mean - historical_precipitation_mean)/historical_precipitation_sd) |>
select(-short_name, -lead_mean, -starts_with("historical")) |>
filter(!is.na(!!sym(paste0("anomaly_precipitation_forecast_", end))))

left_join(temp_anomalies, rh_anomalies, by = c("x", "y")) |>
left_join(precip_anomalies, by = c("x", "y"))

}) |>
reduce(left_join, by = c("x", "y")) |>
mutate(date = date_selected) |>
relocate(date)

# Save as parquet
write_parquet(anomalies, here::here(forecasts_anomalies_directory, save_filename), compression = "gzip", compression_level = 5)

return(file.path(forecasts_anomalies_directory, save_filename))

}
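The core step in the function above is the day-weighted blend of two monthly forecast means. A minimal sketch of just that weighting, using the same lubridate helpers and made-up forecast values (the dates, the values, and the 30-day interval are assumptions for illustration):

library(lubridate)

# Hypothetical 30-day lead window starting on the model date
date_selected <- ymd("2021-01-15")
lead_start_date <- date_selected      # day 0 of the interval
lead_end_date <- date_selected + 29   # interval covers 30 days inclusive

# Days of the window attributed to each forecast lead month, as in the function above
weight_a <- as.integer(days_in_month(lead_start_date) - day(lead_start_date)) + 1 # 17
weight_b <- day(lead_end_date) - 1                                                # 12

# Blend the two monthly forecast means in proportion to those day counts
monthly_means <- c(2.1, 3.4) # made-up values for lead months 1 and 2
sum(monthly_means * c(weight_a, weight_b)) / (weight_a + weight_b)
#> [1] 2.637931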
34 changes: 21 additions & 13 deletions R/calculate_ndvi_anomalies.R
@@ -14,7 +14,7 @@
#' @author Emma Mendelsohn
#' @export
calculate_ndvi_anomalies <- function(ndvi_date_lookup, ndvi_historical_means,
- ndvi_anomalies_directory, model_dates,
+ ndvi_anomalies_directory,
model_dates_selected, lag_intervals,
overwrite = FALSE) {

@@ -30,25 +30,33 @@ calculate_ndvi_anomalies <- function(ndvi_date_lookup, ndvi_historical_means,
return(file.path(ndvi_anomalies_directory, save_filename))
}

- # Get historical means for DOY
- doy <- model_dates |> filter(date == date_selected) |> pull(day_of_year)
- doy_frmt <- str_pad(doy,width = 3, side = "left", pad = "0")
- historical_means <- read_parquet(ndvi_historical_means[str_detect(ndvi_historical_means, doy_frmt)]) |>
-   select(-day_of_year)

# Get the lagged anomalies for selected dates, mapping over the lag intervals
- row_select <- which(model_dates$date == date_selected)

- lag_intervals_start <- c(1 , 1+lag_intervals[-length(lag_intervals)])
- lag_intervals_end <- lag_intervals
+ lag_intervals_start <- c(1, 1 + lag_intervals[-length(lag_intervals)]) # 1 to start with the previous day
+ lag_intervals_end <- lag_intervals # 30 days total, including the end day

anomalies <- map2(lag_intervals_start, lag_intervals_end, function(start, end){

- lag_dates <- model_dates |> slice((row_select - start):(row_select - end))
+ # get lag dates, removing doy 366
+ lag_dates <- seq(date_selected - end, date_selected - start, by = "day")
+ lag_doys <- yday(lag_dates)
+ if(366 %in% lag_doys){
+   lag_doys <- lag_doys[lag_doys != 366]
+   lag_doys <- c(head(lag_doys, 1) - 1, lag_doys)
+ }
+
+ # Get historical means for lag period
+ doy_start <- head(lag_doys, 1)
+ doy_end <- tail(lag_doys, 1)
+ doy_start_frmt <- str_pad(doy_start, width = 3, side = "left", pad = "0")
+ doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")
+ doy_range <- glue::glue("{doy_start_frmt}_to_{doy_end_frmt}")
+
+ historical_means <- read_parquet(ndvi_historical_means[str_detect(ndvi_historical_means, doy_range)])
+ assertthat::assert_that(nrow(historical_means) > 0)

# get files and weights for the calculations
weights <- ndvi_date_lookup |>
-   mutate(lag_date = map(lookup_dates, ~. %in% lag_dates$date)) |>
+   mutate(lag_date = map(lookup_dates, ~. %in% lag_dates)) |>
mutate(weight = unlist(map(lag_date, sum))) |>
filter(weight > 0) |>
select(start_date, filename, weight)
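Because each NDVI composite covers a multi-day window, the weights block above counts how many days of the lag window fall inside each composite. A self-contained sketch of that counting logic with invented lookup data (the ndvi_date_lookup columns are assumed from the code shown):

library(dplyr)
library(purrr)
library(lubridate)

# Invented stand-in for ndvi_date_lookup: each file covers a run of dates
ndvi_date_lookup <- tibble(
  start_date = ymd(c("2021-01-01", "2021-01-17")),
  filename = c("ndvi_a.parquet", "ndvi_b.parquet"),
  lookup_dates = list(seq(ymd("2021-01-01"), ymd("2021-01-16"), by = "day"),
                      seq(ymd("2021-01-17"), ymd("2021-02-01"), by = "day"))
)

# A 10-day lag window ending the day before a hypothetical model date
lag_dates <- seq(ymd("2021-01-20") - 10, ymd("2021-01-20") - 1, by = "day")

# Weight each composite by how many lag days it covers, as in the function above
weights <- ndvi_date_lookup |>
  mutate(lag_date = map(lookup_dates, ~. %in% lag_dates)) |>
  mutate(weight = unlist(map(lag_date, sum))) |>
  filter(weight > 0) |>
  select(start_date, filename, weight)

weights
#> ndvi_a.parquet gets weight 7 (Jan 10-16); ndvi_b.parquet gets weight 3 (Jan 17-19)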
53 changes: 38 additions & 15 deletions R/calculate_ndvi_historical_means.R
@@ -15,13 +15,23 @@
#' @export
calculate_ndvi_historical_means <- function(ndvi_historical_means_directory,
ndvi_date_lookup, days_of_year,
+ lag_intervals,
overwrite = FALSE) {

+ interval_length <- unique(diff(lag_intervals))
+
# Set filename
- doy <- days_of_year
- doy_frmt <- str_pad(doy,width = 3, side = "left", pad = "0")
- save_filename <- glue::glue("historical_ndvi_mean_doy_{doy_frmt}.gz.parquet")
- message(paste("calculating historical ndvi means and standard deviations for doy", doy_frmt))
+ # use dummy dates to keep date logic
+ doy_start <- days_of_year
+ dummy_date_start <- ymd("20210101") + doy_start - 1
+ dummy_date_end <- dummy_date_start + interval_length - 1
+ doy_end <- yday(dummy_date_end)
+
+ doy_start_frmt <- str_pad(doy_start, width = 3, side = "left", pad = "0")
+ doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")
+
+ save_filename <- glue::glue("historical_ndvi_mean_doy_{doy_start_frmt}_to_{doy_end_frmt}.gz.parquet")
+ message(paste("calculating historical ndvi means and standard deviations for doy", doy_start_frmt, "to", doy_end_frmt))

# Check if file already exists
existing_files <- list.files(ndvi_historical_means_directory)
@@ -30,21 +40,34 @@ calculate_ndvi_historical_means <- function(ndvi_historical_means_directory,
return(file.path(ndvi_historical_means_directory, save_filename))
}

- # Get relevant NDVI intervals
- doy_lookup <- ndvi_date_lookup |>
-   filter(map_lgl(lookup_day_of_year, ~any(. == doy)))
+ # Get the relevant days of the year
+ doy_select <- yday(seq(dummy_date_start, dummy_date_end, by = "day"))

- # Create dataset of relevant files
- ndvi_dataset <- open_dataset(doy_lookup$filename)
+ # Get relevant NDVI files and weights for the calculations
+ weights <- ndvi_date_lookup |>
+   mutate(lag_doy = map(lookup_day_of_year, ~. %in% doy_select)) |>
+   mutate(weight = unlist(map(lag_doy, sum))) |>
+   filter(weight > 0) |>
+   select(start_date, filename, weight)

- # Calculate historical means and standard deviations
+ ndvi_dataset <- open_dataset(weights$filename) |>
+   left_join(weights |> select(-filename))
+
+ # Calculate weighted means
historical_means <- ndvi_dataset |>
-   mutate(day_of_year = doy) |>
-   group_by(x, y, day_of_year) |>
-   summarize(historical_ndvi_mean = mean(ndvi),
-             historical_ndvi_sd = sd(ndvi)) |>
-   ungroup()
+   group_by(x, y) |>
+   summarize(historical_ndvi_mean = sum(ndvi * weight) / sum(weight)) |>
+   ungroup()

+ # Calculate weighted standard deviations, using weighted mean from previous step
+ historical_sds <- ndvi_dataset |>
+   left_join(historical_means) |>
+   group_by(x, y) |>
+   summarize(historical_ndvi_sd = sqrt(sum(weight * (ndvi - historical_ndvi_mean)^2) / (sum(weight) - 1))) |>
+   ungroup()
+
+ historical_means <- left_join(historical_means, historical_sds)

# Save as parquet
write_parquet(historical_means, here::here(ndvi_historical_means_directory, save_filename), compression = "gzip", compression_level = 5)

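The two-step calculation above implements a frequency-weighted standard deviation, sqrt(sum(w * (x - mean_w)^2) / (sum(w) - 1)). A quick standalone check with made-up values that this matches R's unweighted sd() when integer weights are treated as repeat counts:

# Made-up NDVI values and day-count weights
x <- c(0.2, 0.5, 0.9)
w <- c(3, 2, 1)

weighted_mean <- sum(x * w) / sum(w)
weighted_sd <- sqrt(sum(w * (x - weighted_mean)^2) / (sum(w) - 1))

# Identical to expanding each value by its weight and using sd()
all.equal(weighted_sd, sd(rep(x, times = w)))
#> [1] TRUE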
36 changes: 22 additions & 14 deletions R/calculate_weather_anomalies.R
@@ -12,7 +12,6 @@ calculate_weather_anomalies <- function(nasa_weather_transformed,
nasa_weather_transformed_directory,
weather_historical_means,
weather_anomalies_directory,
- model_dates,
model_dates_selected,
lag_intervals,
overwrite = FALSE) {
@@ -32,24 +31,33 @@
# Open dataset to transformed data
weather_transformed_dataset <- open_dataset(nasa_weather_transformed_directory)

- # Get historical means for DOY
- doy <- model_dates |> filter(date == date_selected) |> pull(day_of_year)
- doy_frmt <- str_pad(doy,width = 3, side = "left", pad = "0")
- historical_means <- read_parquet(weather_historical_means[str_detect(weather_historical_means, doy_frmt)]) |>
-   select(-day_of_year)

# Get the lagged anomalies for selected dates, mapping over the lag intervals
- row_select <- which(model_dates$date == date_selected)

- lag_intervals_start <- c(1 , 1+lag_intervals[-length(lag_intervals)])
- lag_intervals_end <- lag_intervals
+ lag_intervals_start <- c(1, 1 + lag_intervals[-length(lag_intervals)]) # 1 to start with the previous day
+ lag_intervals_end <- lag_intervals # 30 days total, including the end day

anomalies <- map2(lag_intervals_start, lag_intervals_end, function(start, end){
- lag_dates <- model_dates |> slice((row_select - start):(row_select - end))

- # Lag: calculate mean by pixel for the preceding x days
+ # get lag dates, removing doy 366
+ lag_dates <- seq(date_selected - end, date_selected - start, by = "day")
+ lag_doys <- yday(lag_dates)
+ if(366 %in% lag_doys){
+   lag_doys <- lag_doys[lag_doys != 366]
+   lag_doys <- c(head(lag_doys, 1) - 1, lag_doys)
+ }
+
+ # Get historical means for lag period
+ doy_start <- head(lag_doys, 1)
+ doy_end <- tail(lag_doys, 1)
+ doy_start_frmt <- str_pad(doy_start, width = 3, side = "left", pad = "0")
+ doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")
+ doy_range <- glue::glue("{doy_start_frmt}_to_{doy_end_frmt}")
+
+ historical_means <- read_parquet(weather_historical_means[str_detect(weather_historical_means, doy_range)])
+ assertthat::assert_that(nrow(historical_means) > 0)
+
+ # Lag: calculate mean by pixel for the lag days
lagged_means <- weather_transformed_dataset |>
-   filter(date %in% !!lag_dates$date) |>
+   filter(date %in% lag_dates) |>
group_by(x, y) |>
summarize(lag_relative_humidity_mean = mean(relative_humidity),
lag_temperature_mean = mean(temperature),
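The doy-366 branch above keeps lag windows aligned with the 365-day indexing of the historical-means files. A standalone illustration of the remapping for a window that crosses the end of leap year 2020 (the dates are chosen for the example):

library(lubridate)

# A 10-day lag window crossing Dec 31, 2020, which is doy 366 in a leap year
lag_dates <- seq(ymd("2020-12-27"), ymd("2021-01-05"), by = "day")
lag_doys <- yday(lag_dates)
lag_doys
#> 362 363 364 365 366   1   2   3   4   5

# Drop the leap day and extend the window one day earlier to keep 10 days
if(366 %in% lag_doys){
  lag_doys <- lag_doys[lag_doys != 366]
  lag_doys <- c(head(lag_doys, 1) - 1, lag_doys)
}
lag_doys
#> 361 362 363 364 365   1   2   3   4   5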
30 changes: 22 additions & 8 deletions R/calculate_weather_historical_means.R
@@ -9,17 +9,29 @@
#' @return
#' @author Emma Mendelsohn
#' @export
- calculate_weather_historical_means <- function(nasa_weather_transformed,
+ calculate_weather_historical_means <- function(nasa_weather_transformed, # enforce dependency
nasa_weather_transformed_directory,
weather_historical_means_directory,
days_of_year,
+ lag_intervals,
+ lead_intervals,
overwrite = FALSE) {

+ interval_length <- unique(c(diff(lag_intervals), diff(lead_intervals)))
+ assertthat::assert_that(length(interval_length) == 1)
+
# Set filename
- doy <- days_of_year
- doy_frmt <- str_pad(doy,width = 3, side = "left", pad = "0")
- save_filename <- glue::glue("historical_weather_mean_doy_{doy_frmt}.gz.parquet")
- message(paste("calculating historical weather means and standard deviations for doy", doy_frmt))
+ # use dummy dates to keep date logic
+ doy_start <- days_of_year
+ dummy_date_start <- ymd("20210101") + doy_start - 1
+ dummy_date_end <- dummy_date_start + interval_length - 1
+ doy_end <- yday(dummy_date_end)
+
+ doy_start_frmt <- str_pad(doy_start, width = 3, side = "left", pad = "0")
+ doy_end_frmt <- str_pad(doy_end, width = 3, side = "left", pad = "0")
+
+ save_filename <- glue::glue("historical_weather_mean_doy_{doy_start_frmt}_to_{doy_end_frmt}.gz.parquet")
+ message(paste("calculating historical weather means and standard deviations for doy", doy_start_frmt, "to", doy_end_frmt))

# Check if file already exists
existing_files <- list.files(weather_historical_means_directory)
@@ -30,10 +42,12 @@ calculate_weather_historical_means <- function(nasa_weather_transformed,
# Open dataset to transformed data
weather_transformed_dataset <- open_dataset(nasa_weather_transformed_directory)

- # Filter for day of year and calculate historical means and standard deviations
+ # Filter for relevant days of the year and calculate historical means and standard deviations
+ doy_select <- yday(seq(dummy_date_start, dummy_date_end, by = "day"))
+
historical_means <- weather_transformed_dataset |>
-   filter(day_of_year == doy) |>
-   group_by(x, y, day_of_year) |>
+   filter(day_of_year %in% doy_select) |>
+   group_by(x, y) |>
summarize(historical_relative_humidity_mean = mean(relative_humidity),
historical_temperature_mean = mean(temperature),
historical_precipitation_mean = mean(precipitation),
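The dummy-date trick above converts a start day-of-year plus an interval length into a doy range that wraps correctly past Dec 31, with no modular arithmetic; 2021 is used only because it is a non-leap year. A standalone sketch (the 30-day interval length is an assumption for illustration):

library(lubridate)
library(stringr)

interval_length <- 30 # assumed; in the pipeline this comes from diff() of the lag/lead intervals

# A start day late in the year, so the interval wraps past Dec 31
doy_start <- 350
dummy_date_start <- ymd("20210101") + doy_start - 1 # 2021-12-16
dummy_date_end <- dummy_date_start + interval_length - 1 # 2022-01-14
doy_end <- yday(dummy_date_end) # 14

# The resulting filename label covers doy 350 through doy 14
glue::glue("{str_pad(doy_start, 3, 'left', '0')}_to_{str_pad(doy_end, 3, 'left', '0')}")
#> 350_to_014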