From df70e98cda140831db928742270e30621d3d6099 Mon Sep 17 00:00:00 2001 From: Victor Perron Date: Wed, 11 Sep 2024 12:32:03 +0200 Subject: [PATCH] feat(pipeline) : Ensure the zone_diffusion_codes for DROM/COM There were cases where we did have the zone_diffusion_code set to "97" which can't be right, as it would mean that a given service coyuld be available across the oceans. Let's fix it and set the correct, 3-digit department number. This will also enable their search as we now (since the "new" communes) search for a match agains commune.departement, which can be 3 digits. There is also now a complete data validation that leaves the errors as a specific table in the public_dbt_test__audit schema. --- .../int__union_services__enhanced.sql | 43 +++++---- .../_decoupage_administratif__models.yml | 13 +++ .../stg_decoupage_administratif__epcis.sql | 13 +++ .../tests/test_zone_diffusion_consistency.sql | 94 +++++++++++++++++++ 4 files changed, 147 insertions(+), 16 deletions(-) create mode 100644 pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__epcis.sql create mode 100644 pipeline/dbt/tests/test_zone_diffusion_consistency.sql diff --git a/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql b/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql index 461c1a6c..57c5e552 100644 --- a/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql +++ b/pipeline/dbt/models/intermediate/int__union_services__enhanced.sql @@ -6,34 +6,44 @@ structures AS ( SELECT * FROM {{ ref('int__union_structures__enhanced') }} ), -adresses AS ( - SELECT * FROM {{ ref('int__union_adresses__enhanced') }} -), - departements AS ( SELECT * FROM {{ ref('stg_decoupage_administratif__departements') }} ), --- TODO: Refactoring needed to be able to do geocoding per source and then use the result in the mapping +adresses_with_code_departement AS ( + SELECT + adresses.*, + CASE + WHEN LEFT(adresses.code_insee, 2) = '97' THEN LEFT(adresses.code_insee, 3) + ELSE LEFT(adresses.code_insee, 2) + END AS "code_departement" + FROM {{ ref("int__union_adresses__enhanced") }} AS adresses +), + services_with_zone_diffusion AS ( SELECT {{ dbt_utils.star(from=ref('int__union_services'), relation_alias='services', except=["zone_diffusion_code", "zone_diffusion_nom"]) }}, CASE - WHEN services.source = ANY(ARRAY['monenfant', 'soliguide']) THEN adresses.code_insee - WHEN services.source = ANY(ARRAY['reseau-alpha', 'action-logement']) THEN LEFT(adresses.code_insee, 2) + -- FIXME(vperron) : ODSPEP services have such a catastrophic adress columns quality + -- that trying to reuse them for the zone diffusion makes the situation worse. + WHEN services.source = 'odspep' THEN services.zone_diffusion_code + WHEN services.zone_diffusion_type = 'commune' THEN COALESCE(adresses.code_insee, services.zone_diffusion_code) + WHEN services.zone_diffusion_type = 'departement' THEN COALESCE(adresses.code_departement, services.zone_diffusion_code) ELSE services.zone_diffusion_code - END AS "zone_diffusion_code", + END AS zone_diffusion_code, CASE - WHEN services.source = ANY(ARRAY['monenfant', 'soliguide']) THEN adresses.commune - WHEN services.source = ANY(ARRAY['reseau-alpha', 'action-logement']) THEN (SELECT departements."nom" FROM departements WHERE departements."code" = LEFT(adresses.code_insee, 2)) - WHEN services.source = 'mediation-numerique' THEN (SELECT departements."nom" FROM departements WHERE departements."code" = services.zone_diffusion_code) + WHEN services.source = 'odspep' THEN services.zone_diffusion_nom + WHEN services.zone_diffusion_type = 'commune' THEN COALESCE(adresses.commune, services.zone_diffusion_nom) + WHEN services.zone_diffusion_type = 'departement' THEN COALESCE(departements.nom, services.zone_diffusion_nom) ELSE services.zone_diffusion_nom - END AS "zone_diffusion_nom" - FROM - services - LEFT JOIN adresses ON services._di_adresse_surrogate_id = adresses._di_surrogate_id + END AS zone_diffusion_nom + FROM services + LEFT JOIN adresses_with_code_departement AS adresses + ON services._di_adresse_surrogate_id = adresses._di_surrogate_id + LEFT JOIN departements ON adresses.code_departement = departements.code ), + services_with_valid_structure AS ( SELECT services_with_zone_diffusion.* FROM services_with_zone_diffusion @@ -95,7 +105,8 @@ final AS ( adresses.code_insee AS "code_insee" FROM valid_services - LEFT JOIN adresses ON valid_services._di_adresse_surrogate_id = adresses._di_surrogate_id + LEFT JOIN adresses_with_code_departement AS adresses + ON valid_services._di_adresse_surrogate_id = adresses._di_surrogate_id ) SELECT * FROM final diff --git a/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml b/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml index 664acfcf..6d50ffd5 100644 --- a/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml +++ b/pipeline/dbt/models/staging/decoupage_administratif/_decoupage_administratif__models.yml @@ -32,6 +32,19 @@ models: - dbt_utils.not_constant - dbt_utils.not_empty_string + - name: stg_decoupage_administratif__epcis + columns: + - name: code + data_tests: + - not_null + - dbt_utils.not_constant + - dbt_utils.not_empty_string + - name: nom + data_tests: + - not_null + - dbt_utils.not_constant + - dbt_utils.not_empty_string + - name: stg_decoupage_administratif__communes columns: - name: code diff --git a/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__epcis.sql b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__epcis.sql new file mode 100644 index 00000000..ff919b31 --- /dev/null +++ b/pipeline/dbt/models/staging/decoupage_administratif/stg_decoupage_administratif__epcis.sql @@ -0,0 +1,13 @@ +WITH source AS ( + {{ stg_source_header('decoupage_administratif', 'epcis') }} +), + +final AS ( + SELECT + code AS "code", + nom AS "nom" + FROM source + ORDER BY code +) + +SELECT * FROM final diff --git a/pipeline/dbt/tests/test_zone_diffusion_consistency.sql b/pipeline/dbt/tests/test_zone_diffusion_consistency.sql new file mode 100644 index 00000000..2070c61f --- /dev/null +++ b/pipeline/dbt/tests/test_zone_diffusion_consistency.sql @@ -0,0 +1,94 @@ +/* Executed automatically when the dependent models are rebuilt. + + This test checks that the zone_diffusion_code field in the services table + is consistent with the reference tables for communes, departements, epcis and regions. + If the code is not found in the reference tables, the row is returned in the result set. + + Examples: + + -[ RECORD 6 ]-------+----------------------------------- + _di_surrogate_id | soliguide-6181a6e18ac6b179ffb9ffe8 + zone_diffusion_type | commune + zone_diffusion_code | + -[ RECORD 7 ]-------+----------------------------------- + _di_surrogate_id | odspep-49760_commune_60038 + zone_diffusion_type | commune + zone_diffusion_code | 60038 + -[ RECORD 11895 ]---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + _di_surrogate_id | dora-ed2692ba-e95d-484b-99e2-f994f9636f7f + zone_diffusion_type | epci + zone_diffusion_code | 200035459 + -[ RECORD 11896 ]---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + _di_surrogate_id | dora-edfa0021-a58d-435e-9138-00a595558254 + zone_diffusion_type | region + zone_diffusion_code | 59 + +*/ + +{{ config(severity = 'warn', store_failures = true) }} + +with invalid_communes as ( + select + services._di_surrogate_id, + services.zone_diffusion_type, + services.zone_diffusion_code + from + {{ ref('int__union_services__enhanced') }} as services + left join {{ ref('stg_decoupage_administratif__communes') }} as communes + on services.zone_diffusion_code = communes.code + where services.zone_diffusion_type='commune' and communes.code is null + order by services._di_surrogate_id +), + +invalid_departements as ( + select + services._di_surrogate_id, + services.zone_diffusion_type, + services.zone_diffusion_code + from + {{ ref('int__union_services__enhanced') }} as services + left join {{ ref('stg_decoupage_administratif__departements') }} as departements + on services.zone_diffusion_code = departements.code + where services.zone_diffusion_type='departement' and departements.code is null + order by services._di_surrogate_id +), + +invalid_epcis as ( + select + services._di_surrogate_id, + services.zone_diffusion_type, + services.zone_diffusion_code + from + {{ ref('int__union_services__enhanced') }} as services + left join {{ ref('stg_decoupage_administratif__epcis') }} as epcis + on services.zone_diffusion_code = epcis.code + where services.zone_diffusion_type='epci' and epcis.code is null + order by services._di_surrogate_id +), + +invalid_regions as ( + select + services._di_surrogate_id, + services.zone_diffusion_type, + services.zone_diffusion_code + from + {{ ref('int__union_services__enhanced') }} as services + left join {{ ref('stg_decoupage_administratif__regions') }} as regions + on services.zone_diffusion_code = regions.code + where services.zone_diffusion_type='region' and regions.code is null + order by services._di_surrogate_id +), + +validation_errors as ( + select * from invalid_communes + union all + select * from invalid_departements + union all + select * from invalid_epcis + union all + select * from invalid_regions +) + +select * +from validation_errors +