chore(pipeline): clean up old geocoding task
vmttn committed Sep 18, 2024
1 parent 3180202 commit 34bcac0
Showing 14 changed files with 31 additions and 351 deletions.
3 changes: 0 additions & 3 deletions api/src/data_inclusion/api/inclusion_data/commands.py
@@ -123,9 +123,6 @@ def load_inclusion_data():
     structures_df = structures_df.replace({np.nan: None})
     services_df = services_df.replace({np.nan: None})
 
-    structures_df = structures_df.drop(columns=["_di_geocodage_score"])
-    services_df = services_df.drop(columns=["_di_geocodage_score"])
-
     structure_errors_df = validate_df(structures_df, model_schema=schema.Structure)
     service_errors_df = validate_df(services_df, model_schema=schema.Service)
 
6 changes: 2 additions & 4 deletions pipeline/dags/compute_hourly.py
@@ -5,8 +5,7 @@
 
 from dag_utils import date, marts, notifications
 from dag_utils.dbt import (
-    get_after_geocoding_tasks,
-    get_before_geocoding_tasks,
+    get_intermediate_tasks,
     get_staging_tasks,
 )
 
@@ -24,8 +23,7 @@
 (
     start
     >> get_staging_tasks(schedule="@hourly")
-    >> get_before_geocoding_tasks()
-    >> get_after_geocoding_tasks()
+    >> get_intermediate_tasks()
     >> marts.export_di_dataset_to_s3()
     >> end
 )
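
get_staging_tasks() returns a list of tasks (see dag_utils/dbt.py below), so this chain fans out from start to every staging task, then fans back in on the single intermediate build. A minimal, self-contained sketch of the pattern (the demo DAG id and task count are made up):

import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="fan_out_demo", start_date=pendulum.datetime(2024, 1, 1), schedule=None):
    start = EmptyOperator(task_id="start")
    staging = [EmptyOperator(task_id=f"staging_{i}") for i in range(3)]
    intermediate = EmptyOperator(task_id="intermediate")

    # a list in the middle of a >> chain fans out, and the next
    # operand depends on every task in the list
    start >> staging >> intermediate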
18 changes: 4 additions & 14 deletions pipeline/dags/dag_utils/dbt.py
@@ -98,9 +98,9 @@ def get_staging_tasks(schedule=None):
     return task_list
 
 
-def get_before_geocoding_tasks():
+def get_intermediate_tasks():
     return dbt_operator_factory(
-        task_id="dbt_build_before_geocoding",
+        task_id="dbt_build_intermediate",
         command="build",
         select=" ".join(
             [
@@ -111,22 +111,11 @@ def get_before_geocoding_tasks():
                 # into a single DAG. Another way to see it is that it depended on
                 # main since the beginning as it required intermediate data to be
                 # present ?
+                "path:models/intermediate/int__geocodages.sql",
                 "path:models/intermediate/int__union_contacts.sql",
                 "path:models/intermediate/int__union_adresses.sql",
                 "path:models/intermediate/int__union_services.sql",
                 "path:models/intermediate/int__union_structures.sql",
-            ]
-        ),
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
-
-def get_after_geocoding_tasks():
-    return dbt_operator_factory(
-        task_id="dbt_build_after_geocoding",
-        command="build",
-        select=" ".join(
-            [
                 "path:models/intermediate/extra",
                 "path:models/intermediate/int__plausible_personal_emails.sql",
                 "path:models/intermediate/int__union_adresses__enhanced.sql+",
@@ -140,4 +129,5 @@
                 "path:models/intermediate/quality/int_quality__stats.sql+",
             ]
         ),
+        trigger_rule=TriggerRule.ALL_DONE,
     )
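
Pieced together from the hunks above, the merged helper now reads roughly as follows (a reconstruction; lines hidden behind the collapsed hunk markers may differ):

def get_intermediate_tasks():
    return dbt_operator_factory(
        task_id="dbt_build_intermediate",
        command="build",
        select=" ".join(
            [
                "path:models/intermediate/int__geocodages.sql",
                "path:models/intermediate/int__union_contacts.sql",
                "path:models/intermediate/int__union_adresses.sql",
                "path:models/intermediate/int__union_services.sql",
                "path:models/intermediate/int__union_structures.sql",
                "path:models/intermediate/extra",
                "path:models/intermediate/int__plausible_personal_emails.sql",
                "path:models/intermediate/int__union_adresses__enhanced.sql+",
                # ... selectors hidden by the collapsed hunk ...
                "path:models/intermediate/quality/int_quality__stats.sql+",
            ]
        ),
        # run even when upstream tasks failed, so a single broken
        # source does not block the shared downstream models
        trigger_rule=TriggerRule.ALL_DONE,
    )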
125 changes: 0 additions & 125 deletions pipeline/dags/dag_utils/geocoding.py

This file was deleted.
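
This module provided the BaseAdresseNationaleBackend used by the _geocode task in main.py (deleted below); the same work now happens inside dbt via the int__geocodages model. For context, a minimal sketch of such a backend against the BAN bulk geocoding endpoint (POST {base_url}/search/csv/ of the public addok API) — the deleted implementation may have differed:

import io

import pandas as pd
import requests


class BaseAdresseNationaleBackend:
    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip("/")

    def geocode(self, df: pd.DataFrame) -> pd.DataFrame:
        # batch-geocode a dataframe holding adresse/code_postal/
        # code_insee/commune columns
        response = requests.post(
            f"{self.base_url}/search/csv/",
            files={"data": ("input.csv", df.to_csv(index=False))},
            data={
                # columns concatenated into the query string
                "columns": ["adresse", "code_postal", "commune"],
                # column holding the INSEE code, used to restrict matches
                "citycode": "code_insee",
            },
            timeout=600,
        )
        response.raise_for_status()
        # the response is the input CSV with latitude, longitude,
        # result_score, ... columns appended
        return pd.read_csv(io.StringIO(response.text), dtype=str)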

75 changes: 3 additions & 72 deletions pipeline/dags/main.py
@@ -1,76 +1,15 @@
 import pendulum
 
 import airflow
-from airflow.operators import empty, python
+from airflow.operators import empty
 
 from dag_utils import date, marts
 from dag_utils.dbt import (
     dbt_operator_factory,
-    get_after_geocoding_tasks,
-    get_before_geocoding_tasks,
+    get_intermediate_tasks,
     get_staging_tasks,
 )
 from dag_utils.notifications import notify_failure_args
-from dag_utils.virtualenvs import PYTHON_BIN_PATH
-
-
-def _geocode():
-    import logging
-
-    import sqlalchemy as sqla
-
-    from airflow.models import Variable
-    from airflow.providers.postgres.hooks.postgres import PostgresHook
-
-    from dag_utils import geocoding
-    from dag_utils.sources import utils
-
-    logger = logging.getLogger(__name__)
-
-    pg_hook = PostgresHook(postgres_conn_id="pg")
-
-    # 1. Retrieve input data
-    input_df = pg_hook.get_pandas_df(
-        sql="""
-            SELECT
-                _di_surrogate_id,
-                adresse,
-                code_postal,
-                code_insee,
-                commune
-            FROM public_intermediate.int__union_adresses;
-        """
-    )
-
-    utils.log_df_info(input_df, logger=logger)
-
-    geocoding_backend = geocoding.BaseAdresseNationaleBackend(
-        base_url=Variable.get("BAN_API_URL")
-    )
-
-    # 2. Geocode
-    output_df = geocoding_backend.geocode(input_df)
-
-    utils.log_df_info(output_df, logger=logger)
-
-    # 3. Write result back
-    engine = pg_hook.get_sqlalchemy_engine()
-
-    with engine.connect() as conn:
-        with conn.begin():
-            output_df.to_sql(
-                "extra__geocoded_results",
-                schema="public",
-                con=conn,
-                if_exists="replace",
-                index=False,
-                dtype={
-                    "latitude": sqla.Float,
-                    "longitude": sqla.Float,
-                    "result_score": sqla.Float,
-                },
-            )
-
-
 with airflow.DAG(
     dag_id="main",
@@ -93,20 +32,12 @@ def _geocode():
         command="run-operation create_udfs",
     )
 
-    python_geocode = python.ExternalPythonOperator(
-        task_id="python_geocode",
-        python=str(PYTHON_BIN_PATH),
-        python_callable=_geocode,
-    )
-
     (
         start
        >> dbt_seed
        >> dbt_create_udfs
        >> get_staging_tasks()
-       >> get_before_geocoding_tasks()
-       >> python_geocode
-       >> get_after_geocoding_tasks()
+       >> get_intermediate_tasks()
        >> marts.export_di_dataset_to_s3()
        >> end
    )
2 changes: 1 addition & 1 deletion pipeline/dbt/macros/create_udfs.sql
@@ -23,4 +23,4 @@ CREATE SCHEMA IF NOT EXISTS processings;
 
 {% do run_query(sql) %}
 
-{% endmacro %}s
+{% endmacro %}
3 changes: 0 additions & 3 deletions pipeline/dbt/models/intermediate/extra/_extra__models.yml
@@ -1,7 +1,4 @@
 version: 2
 
 models:
-  # TODO: cleanup these models, add staging models
-
   - name: int_extra__insee_prenoms_filtered
-  - name: int_extra__geocoded_results

This file was deleted.

@@ -6,7 +6,7 @@ geocodages AS (
     SELECT * FROM {{ ref('int__geocodages') }}
 ),
 
-final AS (
+overriden_adresses AS (
     SELECT
         adresses._di_surrogate_id AS "_di_surrogate_id",
         adresses.id AS "id",
@@ -28,6 +28,25 @@ final AS (
     ON
         adresses._di_surrogate_id = geocodages.adresse_id
         AND geocodages.score >= 0.8
+),
+
+final AS (
+    SELECT overriden_adresses.*
+    FROM overriden_adresses
+    LEFT JOIN
+        LATERAL
+            LIST_ADRESSE_ERRORS(
+                overriden_adresses.adresse,
+                overriden_adresses.code_insee,
+                overriden_adresses.code_postal,
+                overriden_adresses.commune,
+                overriden_adresses.complement_adresse,
+                overriden_adresses.id,
+                overriden_adresses.latitude,
+                overriden_adresses.longitude,
+                overriden_adresses.source
+            ) AS errors ON TRUE
+    WHERE errors.field IS NULL
 )
 
 SELECT * FROM final
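
The new final CTE applies the LIST_ADRESSE_ERRORS UDF (installed by the create_udfs macro above) to each row through LEFT JOIN LATERAL ... ON TRUE, then keeps only the rows for which the UDF returned no error record. In Python terms, the filter behaves roughly like this (a sketch; list_adresse_errors is a stand-in for the UDF and yields one (field, message) pair per problem found):

def keep_valid_adresses(rows, list_adresse_errors):
    # mirror of `WHERE errors.field IS NULL`: a row survives only
    # when the validator reports no offending field at all
    return [row for row in rows if not list_adresse_errors(row)]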