Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

39 ingest bike data #90

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions app/scripts/extract_chi_bike_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import os
from typing import List
import pytz
import pandas as pd
from dotenv import load_dotenv
from route_rangers_api.models import (
BikeRidership,
BikeStation,
)
from django.contrib.gis.geos import Point
from django.db import IntegrityError
from app.scripts.utils import make_request, process_daily_ridership_data

#########################
# Load and define variables
#########################
load_dotenv()

DATA_PORTAL_APP_TOKEN = os.getenv("DATA_PORTAL_APP_TOKEN")
BIKE_DATA_DIR = os.getenv("BIKE_DATA_DIR")

REQUEST_DELAY = 0.2
RESULTS_PER_PAGE = 50000 # Max number of results for API
TIMEOUT = 30

CHI_TZ = pytz.timezone("America/Chicago")


def extract_bike_stations_api(app_token=DATA_PORTAL_APP_TOKEN) -> List:
"""
Extract bike station data from Chicago API
"""
params = {"$$APP_TOKEN": app_token}
resp = make_request(
url="https://data.cityofchicago.org/resource/bbyy-e7gq.json", params=params
)
results = resp.json()
return results


def ingest_bike_stations() -> None:
"""
Ingest bike station data into BikeStation table
"""
bike_stations = extract_bike_stations_api()
for station in bike_stations:
try:
print(f"ingesting CHI {station['id']}")
obs = BikeStation(
city="CHI",
station_id=station["id"],
station_name=station["station_name"],
short_name=station["short_name"],
n_docks=station["total_docks"],
location=Point(station["location"]["coordinates"]),
)
obs.save()
except KeyError:
try:
print(f"ingesting CHI {station['id']}, no short_name")
obs = BikeStation(
city="CHI",
station_id=station["id"],
station_name=station["station_name"],
n_docks=station["total_docks"],
location=Point(station["location"]["coordinates"]),
)
obs.save()
except IntegrityError:
print(f"Observation{station['station_name']} already ingested")
except Exception as e:
print(f"Observation {station['station_name']} not ingested: {e}")
except IntegrityError:
print(f"Observation{station['station_name']} already ingested")
except Exception as e:
print(f"Observation {station['station_name']} not ingested: {e}")


def create_daily_ridership_month(filepath: str) -> pd.DataFrame:
"""
Create a dataframe with daily information by station with the
number of trips started and ended for one monthly file
"""
monthly_df = pd.read_csv(filepath)
ridership_df = process_daily_ridership_data(monthly_df)

return ridership_df


def ingest_monthly_data(monthly_ridership_df: pd.DataFrame) -> None:
"""
Ingest ridership at the daily level into the BikeRidership table
"""
for row in monthly_ridership_df.itertuples():
try:
obs_station = (
BikeStation.objects.filter(city="CHI", short_name=row.station_id)
.first()
.id
)
print(f"Observation Station: {obs_station}")
obs = BikeRidership(
station_id=obs_station,
date=row.date,
n_started=row.n_rides_started,
n_ended=row.n_rides_ended,
)
obs.save()
except IntegrityError:
print(f"Observation Station: {obs_station} - {row.date} already ingested")
except Exception as e:
print(f"Observation Station {obs_station} - {row.date} not ingested: {e}")


def ingest_trip_data():
"""
Ingest the divvy data into the BikeRidership table
"""
for file in os.listdir(f"{BIKE_DATA_DIR}/2023-divvy-tripdata/"):
if file != ".DS_Store":
monthly_df = create_daily_ridership_month(
f"{BIKE_DATA_DIR}/2023-divvy-tripdata/{file}"
)
ingest_monthly_data(monthly_df)


def run(data: str = "both"):
ingest_bike_stations()
ingest_trip_data()

if data == "stations":
ingest_bike_stations()
elif data == "ridership":
ingest_trip_data()
elif data == "both":
ingest_bike_stations()
ingest_trip_data()
else:
print("Select one of the following options 'stations', 'ridership' or 'both'")
145 changes: 145 additions & 0 deletions app/scripts/extract_nyc_bike_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import os
from typing import List
import pandas as pd
from dotenv import load_dotenv
from app.scripts.utils import (
process_daily_ridership_data,
extract_stations,
)
from django.db import IntegrityError
from route_rangers_api.models import (
BikeRidership,
BikeStation,
)
from django.contrib.gis.geos import Point


#########################
# Load and define variables
#########################
load_dotenv()

DATA_PORTAL_APP_TOKEN = os.getenv("DATA_PORTAL_APP_TOKEN")
BIKE_DATA_DIR = os.getenv("BIKE_DATA_DIR")

REQUEST_DELAY = 0.2
RESULTS_PER_PAGE = 50000 # Max number of results for API
TIMEOUT = 30

BIKE_STATIONS_ENDPOINT = (
"https://gbfs.lyft.com/gbfs/2.3/bkn/en/station_information.json"
)


def extract_bike_stations_files() -> List:
"""
Extract bike station data from NY
"""

daily_dfs = []

for file in os.listdir(f"{BIKE_DATA_DIR}/2023-citibike-tripdata/7_July"):
file_df = pd.read_csv(f"{BIKE_DATA_DIR}/2023-citibike-tripdata/7_July/{file}")
daily_dfs.append(file_df)

daily_df = pd.concat(daily_dfs)
daily_df = daily_df.sort_values(by="started_at", ascending=True)

stations = daily_df.groupby("start_station_id").first().reset_index()

return stations


def ingest_bike_stations_data() -> None:
stations = extract_stations(BIKE_STATIONS_ENDPOINT)
for station in stations:
try:
obs = BikeStation(
city="NYC",
station_id=station["station_id"],
station_name=station["name"],
short_name=station["short_name"],
n_docks=station["capacity"],
location=Point(station["lon"], station["lat"]),
)
obs.save()
except IntegrityError:
print(f"station {station['name']} has already been ingested")
except Exception as e:
print(f"station {station['name']} not ingested: {e}")


def create_daily_ridership_month(filepath: str) -> pd.DataFrame:
"""
Create a dataframe with daily information by station with the
number of trips started and ended for one monthly file
"""
monthly_dfs = []
print(f"Directory from where to read csv {filepath}")
# for file in os.listdir(f"{BIKE_DATA_DIR}/2023-citibike-tripdata/{filepath}"):
for file in os.listdir(filepath):
print(f"File to be read to concatenate df: {file}")
file_df = pd.read_csv(f"{filepath}/{file}")
monthly_dfs.append(file_df)

monthly_df = pd.concat(monthly_dfs)

ridership_df = process_daily_ridership_data(monthly_df)

return ridership_df


def ingest_monthly_data(monthly_ridership_df: pd.DataFrame) -> None:
"""
Ingest ridership at the daily level into the BikeRidership table
"""
for row in monthly_ridership_df.itertuples():
try:
obs_station = (
BikeStation.objects.filter(city="NYC", short_name=row.station_id)
.first()
.id
)
print(f"Observation Station: {obs_station}")
obs = BikeRidership(
station_id=obs_station,
date=row.date,
n_started=row.n_rides_started,
n_ended=row.n_rides_ended,
)
obs.save()
print(
f"Observation Station: {obs_station} - {row.date} succesfully ingested"
)
except IntegrityError:
print(f"Observation Station: {obs_station} - {row.date} already ingested")
except Exception as e:
print(f"Observation Station {obs_station} - {row.date} not ingested: {e}")


def ingest_citibike_ridership_data():
"""
Ingest the citibike ridership data into the BikeRidership table
"""
for file in os.listdir(f"{BIKE_DATA_DIR}/2023-citibike-tripdata/"):
print(f"File to be created as df: {file}")
if file != ".DS_Store":
print(
f"File to be passed into create_daily_ridership_month: {BIKE_DATA_DIR}/2023-citibike-tripdata/{file}"
)
monthly_df = create_daily_ridership_month(
f"{BIKE_DATA_DIR}/2023-citibike-tripdata/{file}"
)
ingest_monthly_data(monthly_df)


def run(data: str = "both"):
if data == "stations":
ingest_bike_stations_data()
elif data == "ridership":
ingest_citibike_ridership_data()
elif data == "both":
ingest_bike_stations_data()
ingest_citibike_ridership_data()
else:
print("Select one of the following options 'stations', 'ridership' or 'both'")
42 changes: 42 additions & 0 deletions app/scripts/extract_pdx_bike_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
from typing import List, Dict, Tuple
from requests.models import Response
import datetime
import pytz
import pandas as pd
from dotenv import load_dotenv
from app.scripts.utils import extract_stations

from route_rangers_api.models import (
BikeRidership,
BikeStation,
)
from django.contrib.gis.geos import Point

BIKE_STATIONS_URL = "https://gbfs.lyft.com/gbfs/2.3/pdx/en/station_information.json"


def ingest_stations():
stations = extract_stations(BIKE_STATIONS_URL)
for station in stations:
try:
print(f"Ingesting PDX {station['station_id']}")
obs = BikeStation(
city="PDX",
station_id=station["station_id"],
station_name=station["name"],
n_docks=station["capacity"],
location=Point(station["lon"], station["lat"]),
)
obs.save()
except:
print(f"station {station['name']} has already been ingested")


def run():
ingest_stations()


# if __name__ == "__main__":
# results = extract_bike_stations()
# print(results[0])
50 changes: 48 additions & 2 deletions app/scripts/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from typing import Dict, Tuple
from requests.models import Response
from typing import Dict, Tuple, List
from collections.abc import Callable
import pandas as pd
import requests
from requests.models import Response
import time
import requests
import datetime


REQUEST_DELAY = 0.5
TIMEOUT = 40

Expand All @@ -23,6 +26,7 @@ def make_request(
if session:
resp = session.get(url, params=params)
else:

resp = requests.get(url, params, timeout=timeout)
return resp

Expand All @@ -39,3 +43,45 @@ def build_start_end_date_str(date: datetime.datetime, timezone) -> Tuple[str, st
end_date = end_date.strftime("%Y-%m-%d")

return start_date, end_date


#######################
## Bike Ridership Utils
#######################


def extract_stations(url: str) -> List:
"""
Extract bike station data from GTFS
"""
resp = make_request(url=url, params={})
results = resp.json()
stations = results["data"]["stations"]

return stations


def process_daily_ridership_data(monthly_df) -> pd.DataFrame:
monthly_df["date"] = pd.to_datetime(monthly_df["started_at"]).dt.date
monthly_df["start_station_id"] = monthly_df["start_station_id"].str.strip()
monthly_df["end_station_id"] = monthly_df["end_station_id"].str.strip()

started_at = (
monthly_df.groupby(["start_station_id", "date"])
.size()
.reset_index(name="n_rides_started")
)
started_at = started_at.rename(columns={"start_station_id": "station_id"})

ended_at = (
monthly_df.groupby(["end_station_id", "date"])
.size()
.reset_index(name="n_rides_ended")
)
ended_at = ended_at.rename(columns={"end_station_id": "station_id"})

ridership_df = started_at.merge(ended_at, how="left", on=["station_id", "date"])
ridership_df["n_rides_ended"] = ridership_df["n_rides_ended"].fillna(0).astype(int)
ridership_df["date"] = pd.to_datetime(ridership_df["date"])

return ridership_df
Loading