Skip to content

Commit

Permalink
Finish updating modis_ndvi. Allow pipeline to continue while bundle re…
Browse files Browse the repository at this point in the history
…quest is processing using previous bundle request.
  • Loading branch information
n8layman committed Sep 11, 2024
1 parent b3c29d1 commit 09cee7b
Show file tree
Hide file tree
Showing 4 changed files with 925 additions and 135 deletions.
73 changes: 29 additions & 44 deletions R/submit_modis_ndvi_bundle_request.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,64 +8,49 @@
#' @return
#' @author Emma Mendelsohn
#' @export
submit_modis_ndvi_bundle_request <- function(modis_ndvi_token, modis_ndvi_task_id_continent, timeout = 1000) {
submit_modis_ndvi_bundle_request <- function(modis_ndvi_token,
modis_ndvi_task_id_continent,
modis_ndvi_bundle_request_file) {

# Extract current task id
task_id <- modis_ndvi_task_id_continent$task_id

# Get sys time for the loop timeout
sys_start_time <- Sys.time()

# token <- paste("Bearer", fromJSON(token_response)$token)
# response <- GET("https://appeears.earthdatacloud.nasa.gov/api/task", add_headers(Authorization = modis_ndvi_token))
# task_response <- prettify(toJSON(content(response), auto_unbox = TRUE))
# task_response

# Function to check task status
check_task_status <- function() {
# Check the previous bundle if it exists
previous_bundle <- tryCatch(read_rds(modis_ndvi_bundle_request_file), error = function(e) NULL, warning = function(e) NULL)

if(!is.null(previous_bundle) && previous_bundle$task_id == task_id) {
bundle_response <- previous_bundle
} else {
task_response <- httr::GET("https://appeears.earthdatacloud.nasa.gov/api/task", httr::add_headers(Authorization = modis_ndvi_token))
task_response <- jsonlite::fromJSON(jsonlite::toJSON(httr::content(task_response))) |> filter(task_id == !!task_id)
task_response <- jsonlite::fromJSON(jsonlite::toJSON(httr::content(task_response))) |> filter(task_id == !!task_id)
task_status <- task_response |> pull(status) |> unlist()
assertthat::assert_that(task_status %in% c("queued", "pending", "processing", "done"))
return(task_status)
}

# Check task status in a loop
task_status <- ""
while (task_status != "done") {
task_status <- check_task_status()

# Print current task status
message(paste("task status:", task_status))

# Check timeout
elapsed_time <- difftime(Sys.time(), sys_start_time, units = "secs")
if (task_status != "done" & elapsed_time >= timeout) {
task_status <- NULL
break
}

# Sleep for a few seconds before checking again
if(task_status != "done"){
message("pausing 60 seconds before checking again")
Sys.sleep(60)

if(task_status == "done") {
# Fetch bundle response
bundle_response <- httr::GET(paste("https://appeears.earthdatacloud.nasa.gov/api/bundle/", task_id, sep = ""), httr::add_headers(Authorization = modis_ndvi_token))
bundle_response <- jsonlite::fromJSON(jsonlite::toJSON(httr::content(bundle_response)))
} else {
if(is.null(previous_bundle)) {
stop(glue::glue("modis_ndvi bundle is {task_status} and no previous bundle has been recorded. \nRe-run pipeline later."))
} else {
message(glue::glue("modis_ndvi bundle is {task_status}. Proceeding with {previous_bundle$task_id} bundle."))
bundle_response <- previous_bundle
}
}
}

# If timeout bail
if(is.null(task_status)) {
message("timeout reached")
return(tibble())
}

bundle_response <- GET(paste("https://appeears.earthdatacloud.nasa.gov/api/bundle/", task_id, sep = ""), add_headers(Authorization = modis_ndvi_token))
bundle_response <- fromJSON(toJSON(content(bundle_response)))

# Record the bundle response to save time next time
write_rds(bundle_response, modis_ndvi_bundle_request_file)

# Extract files from bundle response
bundle_response_files <- bundle_response$files |>
mutate(file_type = unlist(file_type)) |>
mutate(file_name = unlist(file_name)) |>
mutate(file_id = unlist(file_id)) |>
filter(file_type == "tif") |>
mutate(task_id = task_id)


# Return bundle response files
return(bundle_response_files)
}
16 changes: 12 additions & 4 deletions _targets.R
Original file line number Diff line number Diff line change
Expand Up @@ -263,18 +263,26 @@ dynamic_targets <- tar_plan(
modis_ndvi_end_year,
modis_ndvi_token,
bbox_coords = continent_bounding_box)),
# check if the request is posted, then get bundle
# this uses a while loop to check every 30 seconds if the request is complete - it takes about 10 minutes

tar_target(modis_ndvi_bundle_request_file, file.path(modis_ndvi_transformed_directory, "modis_ndvi_bundle_request.RDS")),

# Check if the request is posted, then get bundle
# this uses a while loop to check every 30 seconds if the request is complete - it can take a long time
# this function could be refactored to check time of modis_ndvi_task_request and pause for some time before submitting bundle request
# Instead of a loop just check the bundle request status once and move on.
# check on status of bundle request. If processing do nothing and move on
# with the pipeline using the previous bundle request or error if one doesn't exist
# If it's done, read in bundle request file if it exists and write the
# bundle request to a file if it doesn't or the task_id doesn't match
tar_target(modis_ndvi_bundle_request, submit_modis_ndvi_bundle_request(modis_ndvi_token,
modis_ndvi_task_id_continent,
timeout = 1500) |>
modis_ndvi_bundle_request_file) |>
rowwise() |>
tar_group(),
cue = tar_cue("always"),
iteration = "group"
),


# Plan for large data:
# Step 1. Download any transformed files from AWS (separate target). No hash or error checks
# Step 2. Branch over bundle request
Expand Down
Loading

0 comments on commit 09cee7b

Please sign in to comment.