
Commit

Resolve incompatible types when merging parquet files. Involved simplifying soil_preprocessed as well. Figured out how to do a multiple join without collecting, so the data stays in Arrow until the hive write. Static layers are working; still need to fix dynamic layers.
n8layman committed Oct 22, 2024
1 parent 2f8f2cc commit b475f94
Showing 11 changed files with 1,977 additions and 1,064 deletions.
Binary file modified .env
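
The headline fix concerns Parquet schema drift: when two files store the same column with different types, Arrow cannot merge or union them directly. One simple way to reconcile such a conflict, sketched in R with hypothetical file and column names (not the commit's actual fix):

library(arrow)
library(dplyr)

# Hypothetical: soil_class is int32 in one file and string in another,
# so the two files cannot be combined as-is.
a <- read_parquet("soil_a.parquet") |> mutate(soil_class = as.character(soil_class))
b <- read_parquet("soil_b.parquet") |> mutate(soil_class = as.character(soil_class))

# With the column coerced to a single type, the rows combine cleanly.
write_parquet(bind_rows(a, b), "soil_unified.parquet")
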
67 changes: 15 additions & 52 deletions R/augment_data.R
@@ -1,68 +1,31 @@
#' Augment Weather Data
#' Augment and store disparate datasets
#'
#' This function collects data from three different sources, checks for missing values,
#' combines them into a single dataset, and saves the augmented data as a partitioned dataset
#' in parquet format to a specified directory.
#' This function ingests multiple datasets, augments them, and stores them in Parquet format in a specified directory.
#'
#' @author Emma Mendelsohn
#' @author Nathan C. Layman
#'
#' @param weather_anomalies File path to the weather anomalies dataset.
#' @param forecasts_anomalies File path to the forecasts anomalies dataset.
#' @param ndvi_anomalies File path to the NDVI anomalies dataset.
#' @param augmented_data_directory Directory where the augmented data will be saved in parquet format.
#' @param augmented_data_sources The list of data sources to be augmented. These sources are ingested as Arrow datasets.
#' @param augmented_data_directory The directory where augmented datasets are to be stored.
#' @param ... Additional arguments not used by this function but included for potential function extensions.
#'
#' @return A string containing the file path to the directory where the augmented data is saved.
#' @return A vector of strings containing the filepaths to the newly created Parquet files.
#'
#' @note This function performs a left join of the three datasets on the date, x, and y variables.
#' Any NA values in the 'date', 'x', and 'y' columns of the dataset will be dropped. The function
#' saves the resulting dataset in the specified directory using hive partitioning by date.
#' @note This function uses Apache Arrow for data ingestion and to write Parquet files. The output files are partitioned by date and compressed using gzip.
#'
#' @examples
#' augment_data(weather_anomalies = 'path/to/weather_data',
#' forecasts_anomalies = 'path/to/forecast_data',
#' ndvi_anomalies = 'path/to/ndvi_data',
#' augmented_data_directory = 'path/to/save/augmented_data')
#' augment_data(augmented_data_sources = list("dataset1.csv", "dataset2.feather"),
#' augmented_data_directory = "./data")
#'
#' @export
augment_data <- function(weather_anomalies,
forecasts_anomalies,
ndvi_anomalies,
augment_data <- function(augmented_data_sources,
augmented_data_directory,
overwrite = FALSE,
...) {

# Figure out how to do all this OUT of memory.
message("Loading datasets into memory")
weather <- arrow::open_dataset(weather_anomalies) |> dplyr::collect()
forecasts <- arrow::open_dataset(forecasts_anomalies) |> dplyr::collect()
ndvi <- arrow::open_dataset(ndvi_anomalies) |> dplyr::collect()

message("NA checks")
## Weather and forecasts
### NAs are in scaled precip data, due to days with 0 precip
weather_check <- purrr::map_lgl(weather, ~any(is.na(.)))
assertthat::assert_that(all(str_detect(names(weather_check[weather_check]), "scaled")))

forecasts_check <- purrr::map_lgl(forecasts, ~any(is.na(.)))
assertthat::assert_that(all(str_detect(names(forecasts_check[forecasts_check]), "scaled")))

## NDVI
### Prior to 2018: NAs are due to region missing from Eastern Africa in modis data
### After 2018: NAs are due to smaller pockets of missing data on a per-cycle basis
### okay to remove when developing RSA model (issue #72)
ndvi_check <- purrr::map_lgl(ndvi, ~any(is.na(.)))
assertthat::assert_that(!any(ndvi_check[c("date", "x", "y")]))
ndvi <- drop_na(ndvi)

message("Join into a single object")
augmented_data <- left_join(weather, forecasts, by = join_by(date, x, y)) |>
left_join(ndvi, by = join_by(date, x, y))
# DON'T collect if at all possible. Keep everything in arrow to keep it out of memory until the last possible moment before hive writing
ds <- reduce(map(unlist(augmented_data_sources$static_layers), arrow::open_dataset), dplyr::left_join, by = c("x", "y"))

message("Save as parquets using hive partitioning by date")
augmented_data |>
group_by(date) |>
write_dataset(augmented_data_directory)

return(augmented_data_directory)
ds |> mutate(hive_date = date) |> group_by(hive_date) |> arrow::write_dataset(augmented_data_directory, compression = "gzip", compression_level = 5)

return(list.files(augmented_data_directory, pattern = ".parquet", recursive = TRUE, full.names = TRUE))
}
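
The core pattern of the rewritten function, joining lazily in Arrow and materializing only at write time, is worth isolating. A minimal standalone sketch, assuming hypothetical Parquet sources that share date, x, and y columns:

library(arrow)
library(dplyr)
library(purrr)

sources <- c("weather.parquet", "forecasts.parquet", "ndvi.parquet")  # hypothetical paths

# Open each source as a lazy Arrow dataset and fold them together with
# left joins; no rows are pulled into R memory at this point.
ds <- sources |>
  map(open_dataset) |>
  reduce(left_join, by = c("date", "x", "y"))

# Partition on a copy of the date column so `date` itself survives in the
# written files, then write gzip-compressed Parquet partitions.
ds |>
  mutate(hive_date = date) |>
  group_by(hive_date) |>
  write_dataset("augmented_data", compression = "gzip", compression_level = 5)
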
68 changes: 68 additions & 0 deletions R/augment_data_old.R
@@ -0,0 +1,68 @@
#' Augment Weather Data
#'
#' This function collects data from three different sources, checks for missing values,
#' combines them into a single dataset, and saves the augmented data as a partitioned dataset
#' in parquet format to a specified directory.
#'
#' @author Emma Mendelsohn
#'
#' @param weather_anomalies File path to the weather anomalies dataset.
#' @param forecasts_anomalies File path to the forecasts anomalies dataset.
#' @param ndvi_anomalies File path to the NDVI anomalies dataset.
#' @param augmented_data_directory Directory where the augmented data will be saved in parquet format.
#'
#' @return A string containing the file path to the directory where the augmented data is saved.
#'
#' @note This function performs a left join of the three datasets on the date, x, and y variables.
#' Any NA values in the 'date', 'x', and 'y' columns of the dataset will be dropped. The function
#' saves the resulting dataset in the specified directory using hive partitioning by date.
#'
#' @examples
#' augment_data(weather_anomalies = 'path/to/weather_data',
#' forecasts_anomalies = 'path/to/forecast_data',
#' ndvi_anomalies = 'path/to/ndvi_data',
#' augmented_data_directory = 'path/to/save/augmented_data')
#'
#' @export
augment_data_old <- function(weather_anomalies,
forecasts_anomalies,
ndvi_anomalies,
augmented_data_directory,
overwrite = FALSE,
...) {

# Figure out how to do all this OUT of memory.
message("Loading datasets into memory")
weather <- arrow::open_dataset(weather_anomalies) |> dplyr::collect()
forecasts <- arrow::open_dataset(forecasts_anomalies) |> dplyr::collect()
ndvi <- arrow::open_dataset(ndvi_anomalies) |> dplyr::collect()

message("NA checks")
## Weather and forecasts
### NAs are in scaled precip data, due to days with 0 precip
weather_check <- purrr::map_lgl(weather, ~any(is.na(.)))
assertthat::assert_that(all(str_detect(names(weather_check[weather_check]), "scaled")))

forecasts_check <- purrr::map_lgl(forecasts, ~any(is.na(.)))
assertthat::assert_that(all(str_detect(names(forecasts_check[forecasts_check]), "scaled")))

## NDVI
### Prior to 2018: NAs are due to region missing from Eastern Africa in modis data
### After 2018: NAs are due to smaller pockets of missing data on a per-cycle basis
### okay to remove when developing RSA model (issue #72)
ndvi_check <- purrr::map_lgl(ndvi, ~any(is.na(.)))
assertthat::assert_that(!any(ndvi_check[c("date", "x", "y")]))
ndvi <- drop_na(ndvi)

message("Join into a single object")
augmented_data <- left_join(weather, forecasts, by = join_by(date, x, y)) |>
left_join(ndvi, by = join_by(date, x, y))

message("Save as parquets using hive partitioning by date")
augmented_data |>
group_by(date) |>
write_dataset(augmented_data_directory)

return(augmented_data_directory)

}
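
The NA audit in this retired version is a reusable idiom: purrr::map_lgl() flags every column that contains an NA, and the assertion then requires that only "scaled" columns were flagged. A toy illustration with made-up data:

library(purrr)
library(stringr)
library(assertthat)

df <- data.frame(
  date = as.Date("2024-01-01") + 0:2,
  precip_scaled = c(0.4, NA, 0.1),  # NAs are expected in scaled columns
  temp = c(21.0, 22.5, 20.3)
)

# TRUE for each column that contains at least one NA
na_check <- map_lgl(df, ~ any(is.na(.x)))

# Fails if any column other than a "scaled" one carries NAs
assert_that(all(str_detect(names(na_check[na_check]), "scaled")))
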
42 changes: 0 additions & 42 deletions R/combine_anomolies.R

This file was deleted.

53 changes: 26 additions & 27 deletions R/get_remote_rasters.R
@@ -1,34 +1,31 @@
#' @title get_remote_rasters
#' Downloads and Preprocesses Global Elevation Raster Data
#'
#' @description Retrieves and processes remote raster data based on provided URLs.
#' This function retrieves global elevation raster data from specified URLs, processes it according to specified resampling and aggregation methods, and saves the resulting processed rasters to a local directory.
#'
#' @author Nathan C. Layman
#'
#' @param urls A list of URLs where the raster data can be retrieved.
#' @param output_dir The directory where the retrieved and processed raster data is saved.
#' @param output_filename The filename of the output raster data file.
#' @param continent_raster_template A template raster to which retrieved raster data is matched.
#' @param aggregate_method The method to aggregate the raster data. Default is NULL.
#' @param resample_method The method to resample the raster data to match to the template raster. Default is NULL.
#' @param overwrite A flag to indicate if existing files should be overwritten. Default is FALSE.
#' @param ... Additional parameters not captured by the function parameters.
#' @param urls A vector of URLs from which the raster data will be downloaded.
#' @param output_dir The directory where the processed rasters will be saved.
#' @param output_filename The filename to be assigned to the output rasters.
#' @param continent_raster_template A raster template of the target continent.
#' @param aggregate_method The method to be used for raster aggregation (Optional).
#' @param resample_method The method to be used for raster resampling (Optional).
#' @param overwrite A boolean flag indicating whether existing processed files should be overwritten. Default is FALSE.
#' @param ... Additional arguments not used by this function but included for generic function compatibility.
#'
#' @return The full path to the saved raster data file.
#' @return A string containing the filepath to the saved processed rasters.
#'
#' @note This function fails if the output filename extension is not .tif or .parquet.
#' @note This function requires the terra, arrow, and here packages among others.
#'
#' @examples
#' # Define the URLs
#' urls <- list("http://example.com/raster1.tif", "http://example.com/raster2.tif")
#'
#' # Define the output directory
#' output_dir <- "./data"
#'
#' # Define the output filename
#' output_filename <- "combined_raster.tif"
#'
#' # Run the function
#' get_remote_rasters(urls, output_dir, output_filename)
#' @examples
#' test_urls <- c("http://example.com/test1.tif", "http://example.com/test2.tif")
#' get_remote_rasters(urls = test_urls,
#' output_dir = '/path/to/output_dir',
#' output_filename = 'test.tif',
#' continent_raster_template = continent_template,
#' aggregate_method = "mean",
#' resample_method = "bilinear",
#' overwrite = TRUE)
#'
#' @export
get_remote_rasters <- function(urls,
@@ -122,21 +119,23 @@ get_remote_rasters <- function(urls,

# Pre-process raster prior to normalization
# For example aggregate_method="which.max" identifies the layer with the highest value for each pixel
if(!is.null(aggregate_method)) combined_raster <- terra::app(combined_raster, fun = aggregate_method, na.rm = TRUE)
if(!is.null(aggregate_method)) combined_raster <- terra::app(combined_raster, fun = aggregate_method, na.rm = TRUE) |> setNames(tools::file_path_sans_ext(output_filename))

# Re-sample raster to match template
# Can change behavior with 'method' argument.
# 'Mode' is most common value within cell.
# The default is bilinear interpolation for continuous data
if(is.null(resample_method)) {
combined_raster <- terra::resample(combined_raster, continent_raster_template)
combined_raster <- combined_raster |> terra::project(continent_raster_template)
} else {
combined_raster <- terra::resample(combined_raster, continent_raster_template, method = resample_method)
combined_raster <- combined_raster |> terra::project(continent_raster_template, method = resample_method)
}

# Save as parquet if appropriate
if(grepl("(parquet|pq|arrow)", tools::file_ext(output_filename))) {

factor_raster <- classify(raster, data.frame(from = 1:5, to = factor(1:5)))

# Convert to dataframe
dat <- as.data.frame(combined_raster, xy = TRUE) |> as_tibble()
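
Two changes in this hunk deserve a note: terra::project() replaces terra::resample(), which transforms the raster into the template's CRS in addition to aligning its grid, and the result can be flattened to x/y rows for Parquet output. A sketch of the pipeline under an assumed template and input file (not this repo's exact objects):

library(terra)
library(arrow)

# Hypothetical template defining the target CRS, extent, and resolution
template <- rast(extent = ext(-20, 55, -35, 40), resolution = 0.1, crs = "EPSG:4326")
r <- rast("downloaded_raster.tif")  # hypothetical multi-layer input

# app() collapses layers per pixel; "which.max" keeps the index of the
# layer with the highest value, producing a categorical result.
r <- app(r, fun = "which.max", na.rm = TRUE)

# project() reprojects AND resamples onto the template grid; "near" is
# used here because the which.max result is categorical, not continuous.
r <- project(r, template, method = "near")

# Flatten to x/y/value rows and store as Parquet
write_parquet(as.data.frame(r, xy = TRUE), "raster.parquet")
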
