Skip to content

Commit

Permalink
Remove grib_ls dependency, switch to beta version of emcwf because en…
Browse files Browse the repository at this point in the history
…d of life for previous API is coming this month, and improve AWS storage and retrevial of transformed grib files.
  • Loading branch information
n8layman committed Sep 20, 2024
1 parent 1d5bed9 commit e6dc3a9
Show file tree
Hide file tree
Showing 11 changed files with 509 additions and 242 deletions.
Binary file modified .env
Binary file not shown.
71 changes: 71 additions & 0 deletions R/get_ecwmf_tasks.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#' Get a tibble of tasks from ECMWF API
#'
#' This function retrieves tasks from the ECMWF (European Centre for Medium-Range Weather Forecasts) API.
#' It sends a GET request using the `httr` package and returns the content of the response as a data frame.
#'
#' @author Nathan Layman
#'
#' @param url A character string specifying the base URL for the ECMWF CDS tasks API.
#' Defaults to "https://cds.climate.copernicus.eu/api/v2/tasks/".
#'
#' @return A data frame with the tasks retrieved from ECMWF.
#'
#' @export
get_ecwmf_tasks <- function(url = "https://cds.climate.copernicus.eu/api/v2/tasks/") {
response <- httr::GET(
url,
httr::authenticate(Sys.getenv("ECMWF_USERID"), Sys.getenv("ECMWF_TOKEN"))
)

httr::content(response) |> bind_rows()
}


#' Clear ECMWF Queued Tasks
#'
#' This function retrieves the currently queued tasks from the ECMWF (European Centre for Medium-Range Weather Forecasts) CDS (Climate Data Store)
#' API and deletes them using their request IDs.
#'
#' @author Nathan Layman
#'
#' @param url A character string specifying the base URL for the ECMWF CDS tasks API.
#' Defaults to "https://cds.climate.copernicus.eu/api/v2/tasks/".
#'
#' @return A data frame with the tasks retrieved from ECMWF.
#'
#' @export
clear_ecwmf_tasks <- function(url = "https://cds.climate.copernicus.eu/api/v2/tasks/") {

tasks_to_clear <- get_ecwmf_tasks() |>
filter(state == "queued") |>
mutate(request_id = paste0(url, request_id)) |>
pull(request_id)

request <- walk(tasks_to_clear, ~httr::DELETE(.x, httr::authenticate(Sys.getenv("ECMWF_USERID"), Sys.getenv("ECMWF_TOKEN"))))

}

submit_ecwmf_request <- function(request,
url = "https://cds.climate.copernicus.eu/api/v2/tasks/") {

response <- httr::PUT(jsonlite::toJSON(request), httr::authenticate(Sys.getenv("ECMWF_USERID"), Sys.getenv("ECMWF_TOKEN")))

}
response <- POST(
url = paste0(ecmwf_url, "/datasets/data/era5"), # Update endpoint as needed
add_headers(
Authorization = paste("Bearer", api_key),
"Content-Type" = "application/json"
),
body = toJSON(query), # Convert the query to JSON format
encode = "json" # Encode the body as JSON
)

# Check if the request was successful
if (status_code(response) == 200) {
# Save the response content to the desired file
writeBin(content(response, "raw"), "output.nc")
cat("File saved successfully.\n")
} else {
cat("Failed to fetch data. Status code:", status_code(response), "\n")
}
26 changes: 26 additions & 0 deletions R/get_grib_metadata.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#' Get and parse gdalinfo metadata from a grib file without relying on grib_ls
#'
#' @author Nathan Layman
#'
#' @param file
#'
#' @return
#' @export
#'
#' @examples
get_grib_metadata <- function(raw_file) {

gdalinfo_text <- terra::describe(raw_file, options = "json")

# Remove all text up to first BAND ^GEOGCRS
metadata_start_index <- grep("^Band|^BAND", gdalinfo_text)[1]
metadata_text <- gdalinfo_text[metadata_start_index:length(gdalinfo_text)]
metadata_text <- metadata_text[!str_detect(metadata_text, "^Band|BAND|Metadata|METADATA")]

metadata <- map_dfr(metadata_text, ~stringr::str_split(.x[1], "=")[[1]] |>
stringr::str_squish() |> setNames(c("name", "value"))) |>
mutate(band = cumsum(name == "Description")) |>
pivot_wider(names_from = "name", values_from = "value")

metadata
}
12 changes: 12 additions & 0 deletions R/tar_get_error.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#' Check error recorded in meta file for a target
#'
#' @param branch
#'
#' @return
#' @export
#'
#' @examples
tar_get_error <- function(branch) {
branch_name <- deparse(substitute(branch))
tar_meta() |> filter(name == branch_name) |> pull(error)
}
150 changes: 102 additions & 48 deletions R/transform_ecmwf_forecasts.R
Original file line number Diff line number Diff line change
@@ -1,61 +1,117 @@
#' .. content for \description{} (no empty lines) ..
#' Transform ECMWF Seasonal Forecast Data
#'
#' .. content for \details{} ..
#' This function downloads ECMWF seasonal forecast data, transforms it into parquet format, and performs basic checks
#' on the downloaded GRIB files. It leverages the ECMWF API to fetch forecast data for a specific system, year, and set of variables.
#'
#' @author Nathan Layman, Emma Mendelsohn
#'
#' @title
#' @param ecmwf_forecasts_downloaded
#' @param ecmwf_forecasts_transformed_directory
#' @param continent_raster_template
#' @param overwrite
#' @return
#' @author Emma Mendelsohn
#' @param ecmwf_forecasts_api_parameters A list containing the parameters for the ECMWF API request such as system, year, month, variables, etc.
#' @param local_folder Character. The path to the local folder where transformed files will be saved. Defaults to `ecmwf_forecasts_transformed_directory`.
#' @param continent_raster_template The path to the raster file used as a template for continent-level spatial alignment.
#' @param n_workers Integer. The number of workers to use for parallel processing, defaults to 2.
#'
#' @return Returns the path to the transformed parquet file if successful, or stops the function if there is an error.
#' @details The function checks if the transformed file already exists for the given year and system. If it exists and is valid, it returns the file path.
#' If not, it downloads the raw GRIB file using the ECMWF API, attempts to load and transform it, and saves the output as a parquet file. The function
#' checks file validity at multiple stages. Notes: Must accept licenses by manually downloading a dataset from here: https://cds-beta.climate.copernicus.eu/datasets/seasonal-postprocessed-single-levels?tab=overview
#'
#' @export
transform_ecmwf_forecasts <- function(ecmwf_forecasts_downloaded,
ecmwf_forecasts_transformed_directory,
continent_raster_template,
n_workers = 1,
overwrite = FALSE) {

# Get filename for saving from the raw data
filename <- tools::file_path_sans_ext(basename(ecmwf_forecasts_downloaded))
save_filename <- glue::glue("{filename}.gz.parquet")

# Check if file already exists
existing_files <- list.files(ecmwf_forecasts_transformed_directory)
message(paste0("Transforming ", save_filename))
if(save_filename %in% existing_files & !overwrite){
message("file already exists, skipping transform")
return(file.path(ecmwf_forecasts_transformed_directory, save_filename))
transform_ecmwf_forecasts <- function(ecmwf_forecasts_api_parameters,
local_folder = ecmwf_forecasts_transformed_directory,
continent_raster_template,
n_workers = 2,
time_out = 3600) {

# Check that ecmwf_forecasts_api_parameters is only one row
stopifnot(nrow(ecmwf_forecasts_api_parameters) == 1)

# Extract necessary details from the ecmwf paramters
system <- ecmwf_forecasts_api_parameters$system
year <- ecmwf_forecasts_api_parameters$year
month <- unlist(ecmwf_forecasts_api_parameters$month)

# Create an error safe way to test if the parquet file can be read, if it exists
error_safe_read_parquet <- possibly(arrow::read_parquet, NULL)

transformed_file <- gsub("\\.grib", "\\.gz\\.parquet", raw_file)

# Check if transformed file already exists and can be loaded.
# If so return file name and path unless it's the current year
if(!is.null(error_safe_read_parquet(transformed_file)) && year < year(Sys.time())) return(transformed_file)

# ensure local_folder ends in '/'

# If the transformed file doesn't exist download what we need from ECMWF
# The metadata returned in grib formats sucks and requires an externam dependancy
# so download each piece separtatly.
raw_files <- expand.grid(product_type = unlist(ecmwf_forecasts_api_parameters$product_types),
variable = unlist(ecmwf_forecasts_api_parameters$variables)) |>
rowwise() |>
mutate(raw_file = file.path(local_folder, glue::glue("ecmwf_seasonal_forecast_sys{system}_{year}_{product_type}_{variable}.grib")))

request_list <- purrr::pmap(raw_files, function(product_type, variable, raw_file) {

list(
originating_centre = "ecmwf",
system = system,
variable = variable, # This can't (easily) be extracted from terra::describe()
product_type = product_type, # This can't be extracted from terra::describe()
year = year,
month = month,
leadtime_month = unlist(ecmwf_forecasts_api_parameters$leadtime_months), # This can be extracted from terra::describe()
area = round(unlist(ecmwf_forecasts_api_parameters$spatial_bounds), 1), # This can be extracted from terra::describe()
format = "grib",
dataset_short_name = "seasonal-monthly-single-levels",
target = basename(raw_file)
)

})

ecmwfr::wf_set_key(user = Sys.getenv("ECMWF_USERID"), key = Sys.getenv("ECMWF_TOKEN"))

# https://cds-beta.climate.copernicus.eu/datasets/seasonal-postprocessed-single-levels?tab=overview
ecmwf_files <- ecmwfr::wf_request_batch(request = request_list,
user = Sys.getenv("ECMWF_USERID"),
workers = n_workers,
path = local_folder,
time_out = time_out,
total_timeout = length(request_list) * time_out/n_workers)

# Verify that terra can open all the saved grib files. If not return NULL to try again next time
error_safe_read_rast <- possibly(terra::rast, NULL)
raw_gribs = map(ecmwf_files, ~error_safe_read_rast(.x))

# If not remove the files and stop
if(any(is.null(raw_gribs))) {
file.remove(raw_files$raw_file)
stop(glue::glue("At least one of the grib files for {raw_file} could not be loaded after download."))
}

# Read in with terra
grib <- rast(ecmwf_forecasts_downloaded)

# Read in continent template raster
continent_raster_template <- rast(continent_raster_template)

# Get associated metadata and remove non-df rows
grib_meta <- system(paste("grib_ls", ecmwf_forecasts_downloaded), intern = TRUE)
remove <- c(1, (length(grib_meta)-2):length(grib_meta))
grib_meta <- system(paste("grib_ls", raw_file), intern = TRUE)
remove <- c(1, (length(grib_meta)-2):length(grib_meta))
grib_meta <- grib_meta[-remove]

# Processing metadata to join with actual data
meta <- read.table(text = grib_meta, header = TRUE) |>
as_tibble() |>
janitor::clean_names() |>
mutate(variable_id = as.character(glue::glue("{data_date}_step{step_range}_{data_type}_{short_name}"))) |>
mutate(data_date = ymd(data_date)) |>
select(-grid_type, -packing_type, -level, -type_of_level, -centre, -edition)

# Almost got rid of grib_ls dependency but can't get data_type from gdalinfo.
# Though it looks like we're only using the mean?
grib_meta <- pmap_dfr(raw_files, function(product_type, variable, raw_file) {
get_grib_metadata(raw_file) |>
mutate(step_range = as.numeric(GRIB_FORECAST_SECONDS) / 3600, # forecast step in hours from seconds
data_date = as.POSIXct(as.numeric(GRIB_REF_TIME), origin = "1970-01-01", tz = "UTC"), # Forecasting out from
short_name = stringr::str_to_sentence(GRIB_ELEMENT),
data_type = product_type,
variable = variable,
variable_id = as.character(glue::glue("{data_date}_step{step_range}_{data_type}_{short_name}"))) |>
dplyr::select(data_date, step_range, data_type, variable, short_name, variable_id)
})

raw_grib <- terra::rast(raw_gribs)
# Rename layers with the metadata names
names(grib) <- meta$variable_id

# Select only the means columns
grib_subset <- subset(grib, which(str_detect(names(grib), "fcmean")))
names(raw_grib) <- grib_meta$variable_id

# Units conversions
# 2d = "2m_dewpoint_temperature" = 2 metre dewpoint temperature K
# 2t = "2m_temperature" = 2 metre temperature K
# 2t = "2m_temperature" = 2 metre temperature K
# tprate = "total_precipitation" = Total precipitation m/s
# https://cds.climate.copernicus.eu/cdsapp#!/dataset/seasonal-monthly-single-levels?tab=overview

Expand Down Expand Up @@ -135,6 +191,4 @@ transform_ecmwf_forecasts <- function(ecmwf_forecasts_downloaded,
write_parquet(grib_means, here::here(ecmwf_forecasts_transformed_directory, save_filename), compression = "gzip", compression_level = 5)

return(file.path(ecmwf_forecasts_transformed_directory, save_filename))


}
3 changes: 1 addition & 2 deletions R/transform_modis_ndvi.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#' (parquet file), aligning it with a continental raster template. If the transformed file
#' already exists and can be read, it will return the existing file.
#'
#' @author Nathan Layman
#' @author Nathan Layman, Emma Mendelsohn
#'
#' @param modis_ndvi_token Character. The authentication token required for the AppEEARS API.
#' @param modis_ndvi_bundle_request List. Contains the `file_name`, `task_id`, and `file_id` from the AppEEARS bundle request for MODIS NDVI data.
Expand Down Expand Up @@ -46,7 +46,6 @@ transform_modis_ndvi <- function(modis_ndvi_token,
response <- httr::GET(paste("https://appeears.earthdatacloud.nasa.gov/api/bundle/", task_id, '/', file_id, sep = ""),
httr::write_disk(raw_file, overwrite = TRUE), httr::progress(), httr::add_headers(Authorization = modis_ndvi_token))


# Verify rast can open the saved raster file. If not return NULL
error_safe_read_rast <- possibly(terra::rast, NULL)
raw_raster = error_safe_read_rast(raw_file)
Expand Down
2 changes: 1 addition & 1 deletion R/transform_nasa_weather.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#' based on a continent raster template, and saves the resulting dataset as a parquet file. It checks if the
#' transformed file already exists and avoids redundant data downloads and processing.
#'
#' @author Nathan Layman
#' @author Nathan Layman, Emma Mendelsohn
#'
#' @param nasa_weather_coordinates Dataframe. A dataframe containing columns of coordinates for the bounding box (x_min, y_min, x_max, y_max) to download weather data.
#' @param nasa_weather_year Integer. The year for which to download and transform the weather data.
Expand Down
Loading

0 comments on commit e6dc3a9

Please sign in to comment.