Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: grist as a source #95

Closed
wants to merge 10 commits into from
3 changes: 3 additions & 0 deletions .template.env
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ EMPLOIS_API_TOKEN=
EMPLOIS_API_URL=https://emplois.inclusion.beta.gouv.fr/api/v1/structures/
ETAB_PUB_FILE_URL=https://www.data.gouv.fr/fr/datasets/r/73302880-e4df-4d4c-8676-1a61bb997f3d
FINESS_FILE_URL=https://www.data.gouv.fr/fr/datasets/r/3dc9b1d5-0157-440d-a7b5-c894fcfdfd45
GRIST_API_URL=https://grist.incubateur.net/api/
GRIST_API_TOKEN=
GRIST_WORKSPACE_ID=27
IMMERSION_FACILITEE_S3_KEY_PREFIX=sources/immersion-facilitee/2023-03-06/after-siretisation-auto/
INSEE_FIRSTNAME_FILE_URL=https://www.insee.fr/fr/statistiques/fichier/2540004/nat2021_csv.zip
INSEE_COG_DATASET_URL=https://www.insee.fr/fr/statistiques/fichier/6800675
Expand Down
3 changes: 2 additions & 1 deletion analyse/.template.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ FINESS_FILE_URL=https://www.data.gouv.fr/fr/datasets/r/3dc9b1d5-0157-440d-a7b5-c
CD72_FILE_URL=
CD93_FILE_URL=
CD35_FILE_URL=https://data.ille-et-vilaine.fr/dataset/8d5ec0f0-ebe1-442d-9d99-655b37d5ad07/resource/665776ae-fa25-46ab-9bfd-c4241866f03f/download/annuaire_sociale_fixe.csv
CD62_FILE_URL=
CD62_FILE_URL=
GRIST_API_TOKEN=
191 changes: 191 additions & 0 deletions analyse/notebooks/grist/template.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from collections import defaultdict\n",
    "from datetime import datetime\n",
    "import os\n",
    "\n",
    "import dotenv\n",
    "import pandas as pd\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# !pip install -e ../../../pipeline\n",
    "# !pip install -e ../../../../data-inclusion-schema\n",
    "\n",
    "from data_inclusion.scripts.tasks import grist\n",
    "from data_inclusion.schema.models import Service, Structure\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dotenv.load_dotenv(dotenv.find_dotenv())\n",
    "\n",
    "GRIST_API_TOKEN = os.environ[\"GRIST_API_TOKEN\"]\n",
    "# NOTE(review): .template.env declares GRIST_API_URL=https://grist.incubateur.net/api/\n",
    "# which disagrees with the host hardcoded here — confirm which instance is current.\n",
    "GRIST_API_URL = \"https://grist.incubateur.anct.gouv.fr/api\"\n",
    "WORKSPACE_ID = \"27\"\n",
    "DOCUMENT_NAME = \"CHANGEME\"\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "grist_client = grist.GristClient(base_url=GRIST_API_URL, token=GRIST_API_TOKEN)\n",
    "\n",
    "document_id = grist_client.create_document(\n",
    "    workspace_id=WORKSPACE_ID, document_name=DOCUMENT_NAME\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Reference table for structure typologies; \"Structures\" rows point at it\n",
    "# through a Ref: column (see the Structures cell below).\n",
    "grist_client.create_table(\n",
    "    document_id=document_id,\n",
    "    table_name=\"Typologie_de_structures\",\n",
    "    columns=[\n",
    "        {\"id\": \"value\", \"fields\": {\"label\": \"valeur\", \"type\": \"Text\"}},\n",
    "        {\"id\": \"label\", \"fields\": {\"label\": \"label\", \"type\": \"Text\"}},\n",
    "    ],\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "typologies_df = pd.read_csv(\n",
    "    \"../../../pipeline/dbt/seeds/schema/typologies_de_structures.csv\",\n",
    "    dtype=str,\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# warning: not idempotent — re-running this cell appends duplicate records\n",
    "\n",
    "grist_client.add_records(\n",
    "    document_id=document_id,\n",
    "    table_id=\"Typologie_de_structures\",\n",
    "    records=[\n",
    "        {\"fields\": typologie_dict}\n",
    "        for typologie_dict in typologies_df[[\"value\", \"label\"]].to_dict(\n",
    "            orient=\"records\"\n",
    "        )\n",
    "    ],\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The per-field type overrides below (typologie, antenne, ...) are Structure\n",
    "# fields, so iterate Structure.__fields__ — the original iterated Service's\n",
    "# fields, which silently mapped every Structures column to Text.\n",
    "grist_client.create_table(\n",
    "    document_id=document_id,\n",
    "    table_name=\"Structures\",\n",
    "    columns=[\n",
    "        {\n",
    "            \"id\": value.name,\n",
    "            \"fields\": {\n",
    "                \"label\": value.name,\n",
    "                \"type\": defaultdict(\n",
    "                    lambda: \"Text\",\n",
    "                    **{\n",
    "                        \"longitude\": \"Numeric\",\n",
    "                        \"latitude\": \"Numeric\",\n",
    "                        \"date_maj\": \"DateTime:Europe/Paris\",\n",
    "                        \"typologie\": \"Ref:Typologie_de_structures\",\n",
    "                        \"antenne\": \"Bool\",\n",
    "                    },\n",
    "                )[value.type_.__name__],\n",
    "            },\n",
    "        }\n",
    "        for _, value in Structure.__fields__.items()\n",
    "    ],\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "grist_client.create_table(\n",
    "    document_id=document_id,\n",
    "    table_name=\"Services\",\n",
    "    columns=[\n",
    "        {\n",
    "            \"id\": value.name,\n",
    "            \"fields\": {\n",
    "                \"label\": value.name,\n",
    "                \"type\": defaultdict(\n",
    "                    lambda: \"Text\",\n",
    "                    **{\n",
    "                        \"cumulable\": \"Bool\",\n",
    "                        \"contact_public\": \"Bool\",\n",
    "                        \"longitude\": \"Numeric\",\n",
    "                        \"latitude\": \"Numeric\",\n",
    "                        \"date_creation\": \"DateTime:Europe/Paris\",\n",
    "                        \"date_suspension\": \"DateTime:Europe/Paris\",\n",
    "                        \"date_maj\": \"DateTime:Europe/Paris\",\n",
    "                    },\n",
    "                )[value.type_.__name__],\n",
    "            },\n",
    "        }\n",
    "        for _, value in Service.__fields__.items()\n",
    "    ],\n",
    ")\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ x-airflow-common:
AIRFLOW_VAR_EMPLOIS_API_URL: ${EMPLOIS_API_URL}
AIRFLOW_VAR_ETAB_PUB_FILE_URL: ${ETAB_PUB_FILE_URL}
AIRFLOW_VAR_FINESS_FILE_URL: ${FINESS_FILE_URL}
AIRFLOW_VAR_GRIST_API_URL: ${GRIST_API_URL}
AIRFLOW_VAR_GRIST_API_TOKEN: ${GRIST_API_TOKEN}
AIRFLOW_VAR_GRIST_WORKSPACE_ID: ${GRIST_WORKSPACE_ID}
AIRFLOW_VAR_IMMERSION_FACILITEE_S3_KEY_PREFIX: ${IMMERSION_FACILITEE_S3_KEY_PREFIX}
AIRFLOW_VAR_INSEE_FIRSTNAME_FILE_URL: ${INSEE_FIRSTNAME_FILE_URL}
AIRFLOW_VAR_INSEE_COG_DATASET_URL: ${INSEE_COG_DATASET_URL}
Expand Down
18 changes: 18 additions & 0 deletions pipeline/dags/dags/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,24 @@
"ultra-numerique",
]
],
*[
{
"id": source_id,
"type": "grist",
"schedule_interval": "@daily",
"snapshot": False,
"streams": [
{
"id": "structures",
"filename": "structures.csv",
"url": Variable.get("GRIST_API_URL", None),
"workspace_id": Variable.get("GRIST_WORKSPACE_ID", None),
"token": Variable.get("GRIST_API_TOKEN", None),
},
],
}
for source_id in []
],
{
"id": "soliguide",
"schedule_interval": "@daily",
Expand Down
14 changes: 12 additions & 2 deletions pipeline/dags/import_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def _extract(
from data_inclusion.scripts.tasks import (
dora,
emplois_de_linclusion,
grist,
mediation_numerique,
mes_aides,
soliguide,
Expand All @@ -84,6 +85,8 @@ def _extract(

if source_config["id"].startswith("mediation-numerique-"):
extract_fn = mediation_numerique.extract
elif source_config.get("type", None) == "grist":
extract_fn = grist.extract
else:
extract_fn = EXTRACT_FN_BY_SOURCE_ID[source_config["id"]]

Expand All @@ -100,7 +103,7 @@ def _extract(
]
)

with io.BytesIO(extract_fn(**stream_config)) as buf:
with io.BytesIO(extract_fn(**stream_config, source_dict=source_config)) as buf:
s3_hook.load_file_obj(
file_obj=buf,
key=stream_s3_key,
Expand Down Expand Up @@ -147,6 +150,8 @@ def _load(

if source_config["id"].startswith("mediation-numerique-"):
read_fn = utils.read_json
elif source_config.get("type", None) == "grist":
read_fn = lambda path: utils.read_csv(path, sep=",") # noqa: E731
else:
read_fn = READ_FN_BY_SOURCE_ID[source_config["id"]]

Expand Down Expand Up @@ -204,13 +209,18 @@ def _load(
# generate a dedicated DAG for every configured sources
for source_config in SOURCES_CONFIGS:
dag_id = f"import_{source_config['id']}".replace("-", "_")

tags = ["source"]
if source_config.get("type", None) == "grist":
tags += ["source_type:grist"]

dag = airflow.DAG(
dag_id=dag_id,
start_date=pendulum.datetime(2022, 1, 1, tz="Europe/Paris"),
default_args=default_args,
schedule_interval=source_config["schedule_interval"],
catchup=False,
tags=["source"],
tags=tags,
)

with dag:
Expand Down
Loading