Advanced cleaning and validating for GTFS #256

Open · wants to merge 10 commits into base: dev
72 changes: 71 additions & 1 deletion src/transport_performance/gtfs/cleaners.py
@@ -3,8 +3,18 @@
 from typing import Union

 import numpy as np
+from gtfs_kit.cleaners import clean_ids as clean_ids_gk
+from gtfs_kit.cleaners import (
+    clean_route_short_names as clean_route_short_names_gk,
+)
+from gtfs_kit.cleaners import clean_times as clean_times_gk
+from gtfs_kit.cleaners import drop_zombies as drop_zombies_gk

-from transport_performance.utils.defence import _check_iterable, _gtfs_defence
+from transport_performance.utils.defence import (
+    _check_iterable,
+    _gtfs_defence,
+    _type_defence,
+)


 def drop_trips(gtfs, trip_id: Union[str, list, np.ndarray]) -> None:
@@ -175,3 +185,63 @@ def clean_multiple_stop_fast_travel_warnings(
         ~gtfs.multiple_stops_invalid["trip_id"].isin(trip_ids)
     ]
     return None
+
+
+def core_cleaners(
+    gtfs,
+    clean_ids: bool = True,
+    clean_times: bool = True,
+    clean_route_short_names: bool = True,
+    drop_zombies: bool = True,
+) -> None:
+    """Clean the gtfs with the core cleaners of gtfs-kit.
+
+    The source code for the cleaners, along with detailed descriptions of
+    the cleaning they perform, can be found here:
+    https://github.com/mrcagney/gtfs_kit/blob/master/gtfs_kit/cleaners.py
+
+    All credit for these cleaners goes to the creators of the gtfs_kit
+    package.
+    HOMEPAGE: https://github.com/mrcagney/gtfs_kit
+
+    Parameters
+    ----------
+    gtfs : GtfsInstance
+        The gtfs to clean.
+    clean_ids : bool, optional
+        Whether or not to use clean_ids, by default True.
+    clean_times : bool, optional
+        Whether or not to use clean_times, by default True.
+    clean_route_short_names : bool, optional
+        Whether or not to use clean_route_short_names, by default True.
+    drop_zombies : bool, optional
+        Whether or not to use drop_zombies, by default True.
+
+    Returns
+    -------
+    None
+
+    """
+    # defences
+    _gtfs_defence(gtfs, "gtfs")
+    _type_defence(clean_ids, "clean_ids", bool)
+    _type_defence(clean_times, "clean_times", bool)
+    _type_defence(clean_route_short_names, "clean_route_short_names", bool)
+    _type_defence(drop_zombies, "drop_zombies", bool)
+    # cleaning
+    if clean_ids:
+        clean_ids_gk(gtfs.feed)
+    if clean_times:
+        clean_times_gk(gtfs.feed)
+    if clean_route_short_names:
+        clean_route_short_names_gk(gtfs.feed)
+    if drop_zombies:
+        try:
+            drop_zombies_gk(gtfs.feed)
+        except KeyError:
+            warnings.warn(
+                UserWarning(
+                    "The drop_zombies cleaner was unable to operate on "
+                    "the feed as the trips table has no shape_id column"
+                )
+            )
+    return None
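
A minimal usage sketch follows (editor's illustration, not part of the diff): it assumes a GtfsInstance constructed from a local feed path, and the path itself is hypothetical.

```python
# Sketch: run the core gtfs-kit cleaners on a feed, skipping drop_zombies.
from transport_performance.gtfs.cleaners import core_cleaners
from transport_performance.gtfs.validation import GtfsInstance

gtfs = GtfsInstance("data/example_gtfs.zip")  # hypothetical path
core_cleaners(gtfs, drop_zombies=False)
```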
33 changes: 33 additions & 0 deletions src/transport_performance/gtfs/gtfs_utils.py
@@ -475,3 +475,36 @@ def convert_pandas_to_plotly(
     if return_html:
         return fig.to_html(full_html=False)
     return fig
+
+
+def _function_pipeline(
+    gtfs, func_map: dict, operations: Union[dict, None]
+) -> None:
+    """Iterate through and act on a functional pipeline.
+
+    func_map maps operation names to functions that accept the gtfs as a
+    keyword argument. operations maps a subset of those names to kwarg
+    dicts (or None); if operations itself is None, every function in
+    func_map is applied with its default kwargs.
+    """
+    _gtfs_defence(gtfs, "gtfs")
+    _type_defence(func_map, "func_map", dict)
+    _type_defence(operations, "operations", (dict, type(None)))
+    if operations:
+        for key in operations.keys():
+            if key not in func_map.keys():
+                raise KeyError(
+                    f"'{key}' function passed to 'operations' is not a "
+                    "known operation. Known operations include: "
+                    f"{func_map.keys()}"
+                )
+        for operation in operations:
+            # check value is dict or none (for kwargs)
+            _type_defence(
+                operations[operation],
+                f"operations[{operation}]",
+                (dict, type(None)),
+            )
+            operations[operation] = (
+                {} if operations[operation] is None else operations[operation]
+            )
+            func_map[operation](gtfs=gtfs, **operations[operation])
+    # if no operations passed, carry out all operations
+    else:
+        for operation in func_map:
+            func_map[operation](gtfs=gtfs)
+    return None
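
To illustrate the contract this helper enforces, here is a hedged sketch with two hypothetical operations; `gtfs` is assumed to be an existing GtfsInstance:

```python
# Sketch: hypothetical functions wired through _function_pipeline. Each
# mapped function must accept the gtfs as a keyword argument.
from transport_performance.gtfs.gtfs_utils import _function_pipeline


def _drop_short_trips(gtfs, min_stops: int = 2) -> None:  # hypothetical
    print(f"dropping trips with fewer than {min_stops} stops")


def _strip_whitespace(gtfs) -> None:  # hypothetical
    print("stripping whitespace from string columns")


FUNC_MAP = {
    "drop_short_trips": _drop_short_trips,
    "strip_whitespace": _strip_whitespace,
}

# run one named operation, forwarding its kwargs
_function_pipeline(gtfs, FUNC_MAP, {"drop_short_trips": {"min_stops": 3}})
# operations=None runs every function in FUNC_MAP with default kwargs
_function_pipeline(gtfs, FUNC_MAP, None)
```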
32 changes: 14 additions & 18 deletions src/transport_performance/gtfs/multi_validation.py
@@ -160,12 +160,12 @@ def save_feeds(
             inst.save(path, overwrite=overwrite)
         return None

-    def clean_feeds(self, clean_kwargs: Union[dict, None] = None) -> None:
+    def clean_feeds(self, cleansers: Union[dict, None] = None) -> None:
         """Clean each of the feeds in the MultiGtfsInstance.

         Parameters
         ----------
-        clean_kwargs : Union[dict, None], optional
+        cleansers : Union[dict, None], optional
             The kwargs to pass to GtfsInstance.clean_feed() for each Gtfs in
             the MultiGtfsInstance, by default None
@@ -175,28 +175,26 @@ def clean_feeds(self, clean_kwargs: Union[dict, None] = None) -> None:

         """
         # defences
-        _type_defence(clean_kwargs, "clean_kwargs", (dict, type(None)))
-        if isinstance(clean_kwargs, type(None)):
-            clean_kwargs = {}
+        _type_defence(cleansers, "cleansers", (dict, type(None)))
+        if isinstance(cleansers, type(None)):
+            cleansers = {}
         # clean GTFS instances
         progress = tqdm(
             zip(self.paths, self.instances), total=len(self.instances)
         )
         for path, inst in progress:
             progress.set_description(f"Cleaning GTFS from path {path}")
-            inst.clean_feed(**clean_kwargs)
+            inst.clean_feed(cleansers=cleansers)
         return None

-    def is_valid(
-        self, validation_kwargs: Union[dict, None] = None
-    ) -> pd.DataFrame:
+    def is_valid(self, validators: Union[dict, None] = None) -> pd.DataFrame:
         """Validate each of the feeds in the MultiGtfsInstance.

         Parameters
         ----------
-        validation_kwargs : Union[dict, None], optional
-            The kwargs to pass to GtfsInstance.is_valid() for each Gtfs in
-            the MultiGtfsInstance, by default None
+        validators : Union[dict, None], optional
+            The kwargs to pass to GtfsInstance.is_valid(validators) for each
+            Gtfs in the MultiGtfsInstance, by default None

         Returns
         -------
@@ -206,18 +204,16 @@ def is_valid(self, validation_kwargs: Union[dict, None] = None) -> pd.DataFrame:

         """
         # defences
-        _type_defence(
-            validation_kwargs, "validation_kwargs", (dict, type(None))
-        )
-        if isinstance(validation_kwargs, type(None)):
-            validation_kwargs = {}
+        _type_defence(validators, "validators", (dict, type(None)))
+        if isinstance(validators, type(None)):
+            validators = {}
         # clean GTFS instances
         progress = tqdm(
             zip(self.paths, self.instances), total=len(self.instances)
         )
         for path, inst in progress:
             progress.set_description(f"Validating GTFS from path {path}")
-            inst.is_valid(**validation_kwargs)
+            inst.is_valid(validators=validators)

         # concat all validation tables into one
         tables = []
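
With the renamed parameters, operation dicts thread down to every feed in the collection. A sketch (feed paths hypothetical):

```python
# Sketch: validate, then clean, a collection of feeds.
from transport_performance.gtfs.multi_validation import MultiGtfsInstance

m_gtfs = MultiGtfsInstance(["feeds/a.zip", "feeds/b.zip"])  # hypothetical paths
report = m_gtfs.is_valid(validators=None)  # default: run all validators
m_gtfs.clean_feeds(cleansers={"core_cleaners": {"drop_zombies": False}})
```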
101 changes: 59 additions & 42 deletions src/transport_performance/gtfs/validation.py
@@ -18,12 +18,13 @@
 from plotly.graph_objects import Figure as PlotlyFigure
 from pretty_html_table import build_table

+import transport_performance.gtfs.cleaners as cleaners
+import transport_performance.gtfs.validators as gtfs_validators
 from transport_performance.gtfs.calendar import create_calendar_from_dates
-from transport_performance.gtfs.cleaners import (
-    clean_consecutive_stop_fast_travel_warnings,
-    clean_multiple_stop_fast_travel_warnings,
+from transport_performance.gtfs.gtfs_utils import (
+    _function_pipeline,
+    filter_gtfs,
 )
-from transport_performance.gtfs.gtfs_utils import filter_gtfs
 from transport_performance.gtfs.report.report_utils import (
     TemplateHTML,
     _set_up_report_dir,
@@ -32,10 +33,6 @@
     get_saved_route_type_lookup,
     scrape_route_type_lookup,
 )
-from transport_performance.gtfs.validators import (
-    validate_travel_between_consecutive_stops,
-    validate_travel_over_multiple_stops,
-)
 from transport_performance.utils.constants import PKG_PATH
 from transport_performance.utils.defence import (
     _check_attribute,
@@ -48,6 +45,29 @@
     _type_defence,
 )

+# THESE MAPPINGS CANNOT BE MOVED TO CONSTANTS AS THEY INTRODUCE DEPENDENCY
+# ISSUES.
+# TODO: Update these once further cleaners/validators are merged
+CLEAN_FEED_FUNCTION_MAP = {
+    "core_cleaners": cleaners.core_cleaners,
+    "clean_consecutive_stop_fast_travel_warnings": (
+        cleaners.clean_consecutive_stop_fast_travel_warnings
+    ),
+    "clean_multiple_stop_fast_travel_warnings": (
+        cleaners.clean_multiple_stop_fast_travel_warnings
+    ),
+}
+
+VALIDATE_FEED_FUNC_MAP = {
+    "core_validation": gtfs_validators.core_validation,
+    "validate_travel_between_consecutive_stops": (
+        gtfs_validators.validate_travel_between_consecutive_stops
+    ),
+    "validate_travel_over_multiple_stops": (
+        gtfs_validators.validate_travel_over_multiple_stops
+    ),
+}
+

 def _get_intermediate_dates(
     start: pd.Timestamp, end: pd.Timestamp
@@ -316,25 +336,28 @@ def get_gtfs_files(self) -> list:
         self.file_list = file_list
         return self.file_list

-    def is_valid(self, far_stops: bool = True) -> pd.DataFrame:
+    def is_valid(self, validators: Union[dict, None] = None) -> pd.DataFrame:
         """Check a feed is valid with `gtfs_kit`.

         Parameters
         ----------
-        far_stops : bool, optional
-            Whether or not to perform validation for far stops (both
-            between consecutive stops and over multiple stops)
+        validators : dict, optional
+            A dictionary of function name to kwargs mappings, by default
+            None.

         Returns
         -------
         pd.core.frame.DataFrame
             Table of errors, warnings & their descriptions.

         """
-        self.validity_df = self.feed.validate()
-        if far_stops:
-            validate_travel_between_consecutive_stops(self)
-            validate_travel_over_multiple_stops(self)
+        _type_defence(validators, "validators", (dict, type(None)))
+        # create an empty validity df for the validators to populate
+        self.validity_df = pd.DataFrame(
+            columns=["type", "message", "table", "rows"]
+        )
+        _function_pipeline(
+            gtfs=self, func_map=VALIDATE_FEED_FUNC_MAP, operations=validators
+        )
         return self.validity_df

     def print_alerts(self, alert_type: str = "error") -> None:
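
Callers now select validators by the keys of VALIDATE_FEED_FUNC_MAP. A sketch, assuming `gtfs` is an existing GtfsInstance:

```python
# run only gtfs-kit's core validation, skipping the fast-travel checks
errors = gtfs.is_valid(validators={"core_validation": None})
# passing no validators runs every entry in VALIDATE_FEED_FUNC_MAP
all_errors = gtfs.is_valid()
```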
@@ -385,34 +408,27 @@ def print_alerts(self, alert_type: str = "error") -> None:

         return None

-    def clean_feed(
-        self, validate: bool = False, fast_travel: bool = True
-    ) -> None:
-        """Attempt to clean feed using `gtfs_kit`.
+    def clean_feed(self, cleansers: Union[dict, None] = None) -> None:
+        """Clean the gtfs feed.

         Parameters
         ----------
-        validate: bool, optional
-            Whether or not to validate the dataframe before cleaning
-        fast_travel: bool, optional
-            Whether or not to clean warnings related to fast travel.
+        cleansers : dict, optional
+            A mapping of cleansing functions and kwargs, by default None.

         Returns
         -------
         None

         """
-        _type_defence(fast_travel, "fast_travel", bool)
-        _type_defence(validate, "valiidate", bool)
-        if validate:
-            self.is_valid(far_stops=fast_travel)
-        try:
-            # In cases where shape_id is missing, keyerror is raised.
-            # https://developers.google.com/transit/gtfs/reference#shapestxt
-            # shows that shapes.txt is optional file.
-            self.feed = self.feed.clean()
-            if fast_travel:
-                clean_consecutive_stop_fast_travel_warnings(self)
-                clean_multiple_stop_fast_travel_warnings(self)
-        except KeyError:
-            # TODO: Issue 74 - Improve this to clean feed when KeyError raised
-            print("KeyError. Feed was not cleaned.")
+        # DEV NOTE: Opting not to allow for validation in clean_feed().
+        # .is_valid() should be used beforehand.
+        # DEV NOTE 2: Use of param name 'cleansers' is to avoid conflicts
+        _type_defence(cleansers, "cleansers", (dict, type(None)))
+        _function_pipeline(
+            gtfs=self, func_map=CLEAN_FEED_FUNCTION_MAP, operations=cleansers
+        )
         return None

     def _produce_stops_map(
         self, what_geoms: str, is_filtered: bool, crs: Union[int, str]
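
The cleansers dict works the same way against CLEAN_FEED_FUNCTION_MAP. A sketch, assuming an existing `gtfs`:

```python
# run only the core cleaners, leaving stop times untouched
gtfs.clean_feed(cleansers={"core_cleaners": {"clean_times": False}})
```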
@@ -1431,10 +1447,11 @@ def html_report(
         date = datetime.datetime.strftime(datetime.datetime.now(), "%d-%m-%Y")

         # feed evaluation
+        self.is_valid()
         if clean_feed:
-            self.clean_feed(validate=True, fast_travel=True)
-        # re-validate to clean any newly raised errors/warnings
-        validation_dataframe = self.is_valid(far_stops=True)
+            self.clean_feed()
+            # re-validate to clean any newly raised errors/warnings
+        validation_dataframe = self.is_valid()

         # create extended reports if requested
         if extended_validation:
9 changes: 9 additions & 0 deletions src/transport_performance/gtfs/validators.py
@@ -264,3 +264,12 @@ def validate_travel_over_multiple_stops(gtfs: "GtfsInstance") -> None:
     )

     return far_stops_df
+
+
+def core_validation(gtfs: "GtfsInstance") -> None:
+    """Carry out the main validators of gtfs-kit."""
+    _gtfs_defence(gtfs, "gtfs")
+    validation_df = gtfs.feed.validate()
+    gtfs.validity_df = pd.concat(
+        [validation_df, gtfs.validity_df], axis=0
+    ).reset_index(drop=True)
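
Because core_validation concatenates onto gtfs.validity_df, it assumes that attribute already exists; is_valid() seeds it before running the pipeline. A sketch of that ordering, assuming an existing `gtfs`:

```python
# Sketch: seed an empty validity table, then append gtfs-kit's results.
import pandas as pd

from transport_performance.gtfs.validators import core_validation

gtfs.validity_df = pd.DataFrame(columns=["type", "message", "table", "rows"])
core_validation(gtfs)
print(gtfs.validity_df.head())
```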