Furrr #64

Merged
6 commits merged on Sep 11, 2024
5 changes: 0 additions & 5 deletions DESCRIPTION
@@ -24,22 +24,17 @@ Imports:
collapse,
data.table,
dodgr,
doSNOW,
dplyr,
digest,
foreach,
future,
furrr,
future.apply,
geodist,
httr,
iotools,
stringr,
sf,
parallel,
lubridate,
purrr (>= 1.0),
pbapply,
readr (>= 2.0),
RcppSimdJson,
tidyr,
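The five dropped Imports presumably cover the cluster-based tooling (doSNOW, foreach, pbapply and related helpers) that the code below stops calling once mapping goes through future/furrr. A toy comparison of the two styles (slow_square() is a made-up worker, not a UK2GTFS function) shows why the explicit cluster bookkeeping can go:

# Toy illustration only; nothing here is UK2GTFS code
slow_square <- function(x) { Sys.sleep(0.01); x^2 }
xs <- as.list(1:50)

# Old pattern: create, use, and tear down an explicit cluster
cl <- parallel::makeCluster(2)
res_old <- pbapply::pblapply(xs, slow_square, cl = cl)
parallel::stopCluster(cl)

# New pattern: declare a plan, map with furrr, reset the plan
future::plan(future::multisession, workers = 2)
res_new <- furrr::future_map(xs, slow_square, .progress = TRUE)
future::plan(future::sequential)

identical(res_old, res_new)  # both return plain lists

Because both styles return plain lists, downstream calls such as dplyr::bind_rows() or data.table::rbindlist() need no change.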
28 changes: 18 additions & 10 deletions R/atoc_export.R
@@ -433,7 +433,7 @@ makeCalendar <- function(schedule, ncores = 1) {
CHECKROWS_NAME_VECTOR <- c(WEEKDAY_NAME_VECTOR, "duration", "start_date", "end_date")


res.calendar.days <- res.calendar[,..CHECKROWS_NAME_VECTOR]
res.calendar.days <- res.calendar[,CHECKROWS_NAME_VECTOR]
res.calendar.days <- data.table::transpose(res.calendar.days)
#transpose on data of this size runs in around 3s, but coerces a named dataframe with mixed datatypes to an unnamed integer vector.

@@ -444,7 +444,7 @@ makeCalendar <- function(schedule, ncores = 1) {
.f = checkrows,
.progress = TRUE)
future::plan(future::sequential)
keep <- unlist(keep)


# cl <- parallel::makeCluster(ncores)
# parallel::clusterEvalQ(cl, {
@@ -458,6 +458,7 @@ makeCalendar <- function(schedule, ncores = 1) {
} else {
keep <- purrr::map(res.calendar.days, checkrows, .progress = TRUE)
}
keep <- unlist(keep)

res.calendar <- res.calendar[keep, ]

@@ -564,15 +565,22 @@ duplicate.stop_times_alt <- function(calendar, stop_times, ncores = 1) {
}

if (ncores == 1) {
stop_times.dup <- pbapply::pblapply(stop_times_split, duplicate.stop_times.int)
#stop_times.dup <- pbapply::pblapply(stop_times_split, duplicate.stop_times.int)
stop_times.dup <- purrr::map(stop_times_split, duplicate.stop_times.int, .progress = TRUE)
} else {
cl <- parallel::makeCluster(ncores)
stop_times.dup <- pbapply::pblapply(stop_times_split,
duplicate.stop_times.int,
cl = cl
)
parallel::stopCluster(cl)
rm(cl)
# cl <- parallel::makeCluster(ncores)
# stop_times.dup <- pbapply::pblapply(stop_times_split,
# duplicate.stop_times.int,
# cl = cl
# )
# parallel::stopCluster(cl)
# rm(cl)

future::plan(future::multisession, workers = ncores)
stop_times.dup <- furrr::future_map(.x = stop_times_split,
.f = duplicate.stop_times.int,
.progress = TRUE)
future::plan(future::sequential)
}

stop_times.dup <- dplyr::bind_rows(stop_times.dup)
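Both new hunks in this file set future::plan(future::multisession, ...) and later flip back to future::plan(future::sequential) by hand, so an error inside future_map() would leave the multisession plan active. A hedged alternative sketch (run_with_workers() is hypothetical, not part of this PR) captures the caller's previous plan and restores it via on.exit():

run_with_workers <- function(xs, fun, ncores) {
  # future::plan() invisibly returns the plan that was active before the change
  oplan <- future::plan(future::multisession, workers = ncores)
  on.exit(future::plan(oplan), add = TRUE)   # restore it even if fun() errors
  furrr::future_map(xs, fun, .progress = TRUE)
}

# Usage with a throwaway worker function
run_with_workers(as.list(1:20), function(x) x * 10, ncores = 2)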
2 changes: 1 addition & 1 deletion R/atoc_import.R
@@ -273,7 +273,7 @@ strip_whitespace <- function(df) {
#'
#' @param file Path to .mca file
#' @param silent logical, should messages be displayed
#' @param ncores number of cores to use when paralell processing
#' @param ncores number of cores to use when parallel processing
#' @param full_import import all data, default FALSE
#' @param working_timetable use rail industry scheduling times instead of public times
#' @export
2 changes: 1 addition & 1 deletion R/atoc_nr.R
@@ -3,7 +3,7 @@
#' Convert ATOC CIF files from Network Rail to GTFS
#'
#' @param path_in Character, path to Network Rail ATOC file e.g."C:/input/toc-full.CIF.gz"
#' @param silent Logical, should progress messages be surpressed (default TRUE)
#' @param silent Logical, should progress messages be suppressed (default TRUE)
#' @param ncores Numeric, When parallel processing how many cores to use
#' (default 1)
#' @param locations where to get tiploc locations (see details)
9 changes: 6 additions & 3 deletions R/atoc_shapes.R
@@ -93,7 +93,8 @@ ATOC_shapes <- function(gtfs) {
}
}

dp.list <- pbapply::pblapply(dp.list, path_to_sf, verts = verts)
#dp.list <- pbapply::pblapply(dp.list, path_to_sf, verts = verts)
dp.list <- purrr::map(dp.list, path_to_sf, verts = verts, .progress = TRUE)
dp.list <- unname(dp.list)
pairs$geometry <- sf::st_sfc(dp.list, crs = 4326)
rm(dp.list, verts)
@@ -110,7 +111,8 @@ ATOC_shapes <- function(gtfs) {
}

message(paste0(Sys.time()," Invert routes"))
pairs_opp$geometry <- pbapply::pblapply(pairs_opp$geometry, invert_linestring)
#pairs_opp$geometry <- pbapply::pblapply(pairs_opp$geometry, invert_linestring)
pairs_opp$geometry <- purrr::map(pairs_opp$geometry, invert_linestring, .progress = TRUE)
pairs_opp$geometry <- sf::st_as_sfc(pairs_opp$geometry, crs = 4326)
pairs_opp <- sf::st_as_sf(pairs_opp)
pairs_opp <- pairs_opp[, names(pairs)]
@@ -135,7 +137,8 @@ ATOC_shapes <- function(gtfs) {

message(paste0(Sys.time()," final formatting"))
rm(graph, pairs)
shape_res <- pbapply::pblapply(st_split, match_lines)
#shape_res <- pbapply::pblapply(st_split, match_lines)
shape_res <- purrr::map(st_split, match_lines, .progress = TRUE)

str5 <- lapply(shape_res, `[[`, 2)
shapes <- lapply(shape_res, `[[`, 1)
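The pblapply() calls replaced in this file forward extra arguments such as verts = verts to the mapped function; purrr::map() does the same through .... A quick check with stand-in objects (paths and scale_len() are invented here, not path_to_sf()) confirms the results and names match, so the later unname() behaves as before:

paths <- list(a = 1:3, b = 4:7)                    # stand-in for dp.list
scale_len <- function(p, verts) length(p) * verts  # stand-in for path_to_sf(..., verts = verts)

out_old <- pbapply::pblapply(paths, scale_len, verts = 2)
out_new <- purrr::map(paths, scale_len, verts = 2, .progress = TRUE)
identical(out_old, out_new)   # TRUE: both are named lists with the same elements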
2 changes: 1 addition & 1 deletion R/extdata.R
@@ -130,7 +130,7 @@ check_data <- function(default_tag = "v0.1.2"){
#'
#' The ATOC data has inaccurate locations for many tiplocs, this is an improved dataset
#'
#' "naptan_missing" Bus Stop Locations missing from NapTAN
#' "naptan_missing" Bus Stop Locations missing from NaPTAN
#'
#' A database of bus stops that are missing from the NAPTAN but are known to
#' have been used. For some reason the official NAPTAN file is missing a small
2 changes: 1 addition & 1 deletion R/get_cal.R
@@ -7,7 +7,7 @@
#' @return data frame
#' @details TransXchange records bank holidays by name (e.g. Christmas Day),
#' some UK bank holidays move around, so this function downloads the official
#' bank holiday calendar. The offical feed only covers a short period of time
#' bank holiday calendar. The official feed only covers a short period of time
#' so this may not be suitable for converting files from the past / future.
#' @export
#'
2 changes: 1 addition & 1 deletion R/get_naptan.R
@@ -6,7 +6,7 @@
#' @param naptan_extra data frame of missing stops default uses `naptan_missing`
#' @return data frame of stop locations
#' @details TransXchange does not store the location of bus stops, so this
#' functions downloads them from the offical DfT source. NaPTAN has some
#' functions downloads them from the official DfT source. NaPTAN has some
#' missing bus stops which are added by UK2GTFS. See `naptan_missing`
#'
#'
9 changes: 8 additions & 1 deletion R/globals.R
@@ -13,6 +13,13 @@ utils::globalVariables(c(
'agency_lang','agency_id', 'Freq',
'UID','hash','vehicle_type','running_board','service_number',
'operator_code','route_id',
'speed_after','distance','school_terms','distance_after','historic_bank_holidays'
'speed_after','distance','school_terms','distance_after','historic_bank_holidays',
'runs_monday','runs_tuesday','runs_wednesday','runs_thursday','runs_friday',
'runs_saturday','runs_sunday', 'total_sunday',
'runs_Mon','runs_Tue','runs_Wed','runs_Thu','runs_Fri',
'runs_Sat','runs_Sun',
'tot_Mon','tot_Tue','tot_Wed','tot_Thu','tot_Fri',
'tot_Sat','tot_Sun',
'service_id','zone_id','time_bands','stop_name'
))

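The added names are columns referenced through data.table/dplyr non-standard evaluation in the new calendar code; without declaring them, R CMD check reports "no visible binding for global variable". A minimal illustration (count_mondays() is invented for this note, not a UK2GTFS function):

library(data.table)

count_mondays <- function(dt) {
  dt[runs_Mon == TRUE, .N]   # 'runs_Mon' is a column, but the check sees an undeclared symbol
}
# The utils::globalVariables(c("runs_Mon", ...)) entry above silences that NOTE
count_mondays(data.table(runs_Mon = c(TRUE, FALSE, TRUE)))   # 2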
2 changes: 1 addition & 1 deletion R/gtfs_cleaning.R
@@ -100,7 +100,7 @@ gtfs_fast_trips <- function(gtfs, maxspeed = 83, routes = TRUE) {
}

#' Find fast stops
#' @description A varient of gtfs_fast_trips that can detect stops that may be in the wrong location
#' @description A variant of gtfs_fast_trips that can detect stops that may be in the wrong location
#' @param gtfs list of gtfs tables
#' @param maxspeed the maximum allowed speed in metres per second default 83 m/s
#' (about 185 mph the max speed of trains on HS1 line)
26 changes: 17 additions & 9 deletions R/gtfs_interpolate_times.R
@@ -36,16 +36,24 @@ gtfs_interpolate_times <- function(gtfs, ncores = 1){
stop_times <- dplyr::group_split(stop_times)

if(ncores == 1){
stop_times <- pbapply::pblapply(stop_times, stops_interpolate)
#stop_times <- pbapply::pblapply(stop_times, stops_interpolate)
stop_times <- purrr::map(stop_times, stops_interpolate, .progress = TRUE)
} else {
cl <- parallel::makeCluster(ncores)
parallel::clusterEvalQ(cl, {loadNamespace("UK2GTFS")})
stop_times <- pbapply::pblapply(stop_times,
stops_interpolate,
cl = cl
)
parallel::stopCluster(cl)
rm(cl)
# cl <- parallel::makeCluster(ncores)
# parallel::clusterEvalQ(cl, {loadNamespace("UK2GTFS")})
# stop_times <- pbapply::pblapply(stop_times,
# stops_interpolate,
# cl = cl
# )
# parallel::stopCluster(cl)
# rm(cl)

future::plan(future::multisession, workers = ncores)
stop_times <- furrr::future_map(.x = stop_times,
.f = stops_interpolate,
.progress = TRUE)
future::plan(future::sequential)

}

stop_times <- data.table::rbindlist(stop_times)
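One behavioural difference from the commented-out block: parallel::clusterEvalQ(cl, loadNamespace("UK2GTFS")) loaded the package on every worker, whereas furrr relies on automatic globals detection. If stops_interpolate() turns out to need the namespace on the workers (an assumption, not something this diff demonstrates), furrr can reproduce the old behaviour with furrr_options():

future::plan(future::multisession, workers = 2)
out <- furrr::future_map(
  as.list(1:10),               # placeholder input standing in for stop_times
  function(x) x + 1,           # placeholder worker standing in for stops_interpolate
  .progress = TRUE,
  .options = furrr::furrr_options(packages = "UK2GTFS")  # load UK2GTFS on each worker
)
future::plan(future::sequential)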