
Improving the SIRENE data import #375

Merged · 3 commits · Oct 10, 2024
95 changes: 95 additions & 0 deletions dora/sirene/backup.py
@@ -0,0 +1,95 @@
from django.db import connection, transaction

from .models import Establishment

"""
Backup lors de l'import SIRENE:
Outils permettant de créer une table SIRENE temporaire.
L'objectif et de permettre de créer une base SIRENE à jour,
sans altérer la version de production, et si possible, à partir d'un conteneur tiers à des fins d'automatisation.
La démarche :
- créer une copie vide de la table SIRENE *sans indexes*,
- importer les données,
- recréer les indexes,
- renommer l'ancienne table en `sirene_establiment_bak`,
- nommer la table nouvellement créée `sirene_establishment`.
Les ordres sont executés via du SQL "brut", donc toute modification ou optimisation du modèle `sirene`
entrainera une modification de ces commandes.
Les indexes sont désactivés en premier lieu pour accélerer *grandemenr* les ordres d'insertion.
Il sont recréés une fois la table des établissements remplie.
On pourrait imaginer recréer les DDL via réflexion / introspection, mais c'est de l'over-engineering
pour une table qui ne bouge ... jamais.
"""


def create_table(table_name: str):
    create_table_ddl = f"""
        DROP TABLE IF EXISTS public.{table_name};
        CREATE TABLE public.{table_name} (
            siret varchar(14) NOT NULL,
            siren varchar(9) NOT NULL,
            ape varchar(6) NOT NULL,
            city_code varchar(5) NOT NULL,
            postal_code varchar(5) NOT NULL,
            is_siege bool NOT NULL,
            longitude float8 NULL,
            latitude float8 NULL,
            full_search_text text NOT NULL,
            address1 varchar(255) NOT NULL,
            address2 varchar(255) NOT NULL,
            city varchar(255) NOT NULL,
            name varchar(255) NOT NULL,
            parent_name varchar(255) NOT NULL,
            CONSTRAINT {table_name}_pkey PRIMARY KEY (siret)
        );
    """
    with connection.cursor() as c:
        c.execute(create_table_ddl)


def create_indexes(table_name: str):
    create_indexes_ddl = f"""
        CREATE INDEX {table_name}_full_text_trgm_idx ON public.{table_name} USING gin (full_search_text gin_trgm_ops);
        CREATE INDEX {table_name}_code_commune_100bb2ad ON public.{table_name} USING btree (city_code);
        CREATE INDEX {table_name}_code_commune_100bb2ad_like ON public.{table_name} USING btree (city_code varchar_pattern_ops);
        CREATE INDEX {table_name}_is_siege_9c0272c3 ON public.{table_name} USING btree (is_siege);
        CREATE INDEX {table_name}_name_d8569d90 ON public.{table_name} USING btree (name);
        CREATE INDEX {table_name}_name_d8569d90_like ON public.{table_name} USING btree (name varchar_pattern_ops);
        CREATE INDEX {table_name}_parent_name_1990928d ON public.{table_name} USING btree (parent_name);
        CREATE INDEX {table_name}_parent_name_1990928d_like ON public.{table_name} USING btree (parent_name varchar_pattern_ops);
        CREATE INDEX {table_name}_siren_b19f551a ON public.{table_name} USING btree (siren);
        CREATE INDEX {table_name}_siren_b19f551a_like ON public.{table_name} USING btree (siren varchar_pattern_ops);
        CREATE INDEX {table_name}_siret_3eb91925_like ON public.{table_name} USING btree (siret varchar_pattern_ops);
    """
    with connection.cursor() as c:
        c.execute(create_indexes_ddl)
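
# Note: the gin (full_search_text gin_trgm_ops) index above assumes the
# pg_trgm extension is already installed in the target database.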


def rename_table(orig_table_name: str, dest_table_name: str):
    # Identifiers cannot be passed as query parameters (they would be quoted
    # as string literals), so the table names are interpolated directly;
    # they only ever come from trusted constants.
    with connection.cursor() as c:
        c.execute(f"ALTER TABLE {orig_table_name} RENAME TO {dest_table_name};")


def vacuum_analyze():
    # VACUUM cannot run inside a transaction block; this relies on Django's
    # default autocommit behaviour.
    with connection.cursor() as c:
        c.execute("VACUUM ANALYZE;")


def create_insert_statement(table_name: str) -> tuple[str, list[str]]:
    fields = [f.name for f in Establishment._meta.fields]
    columns = ",".join(fields)
    placeholders = ",".join(["%s"] * len(fields))
    stmt = f"INSERT INTO public.{table_name}({columns}) VALUES({placeholders})"
    return stmt, fields
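
# For illustration, the generated statement has this shape (the column list
# follows the Establishment model's field order):
#   INSERT INTO public.<table_name>(siret,siren,...) VALUES(%s,%s,...)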


def add_establishment(stmt: str, e: Establishment, fields: list[str]):
    # not transactional on its own; see bulk_add_establishments
    values = [getattr(e, f) for f in fields]
    with connection.cursor() as c:
        c.execute(stmt, values)


@transaction.atomic
def bulk_add_establishments(table_name: str, ee: list[Establishment]):
    stmt, fields = create_insert_statement(table_name)
    for e in ee:
        add_establishment(stmt, e, fields)
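
# Performance note (hypothetical optimisation, not part of this PR): the loop
# above issues one INSERT per row inside a single transaction; if it ever
# becomes a bottleneck, cursor.executemany(stmt, values_list) or PostgreSQL's
# COPY would cut the per-row round-trips.
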
102 changes: 90 additions & 12 deletions dora/sirene/management/commands/import_sirene.py
@@ -9,24 +9,51 @@
from django.db import transaction
from django.db.utils import DataError

from dora.sirene.backup import (
bulk_add_establishments,
create_indexes,
create_table,
rename_table,
vacuum_analyze,
)
from dora.sirene.models import Establishment

# SIRENE field documentation: https://www.sirene.fr/static-resources/htm/v_sommaire.htm
USE_TEMP_DIR = not settings.DEBUG
SIRENE_TABLE = "sirene_establishment"
TMP_TABLE = "_sirene_establishment_tmp"
BACKUP_TABLE = "_sirene_establishment_bak"


def clean_spaces(string):
    return string.replace(" ", " ").strip()


def commit(rows):
    bulk_add_establishments(TMP_TABLE, rows)


class Command(BaseCommand):
    help = "Import de la dernière base SIRENE géolocalisée"

    def add_arguments(self, parser):
        parser.add_argument(
            "--activate",
            action="store_true",
            help="Active la table de travail temporaire générée par l'import.",
        )

        parser.add_argument(
            "--rollback",
            action="store_true",
            help="Active la table de travail sauvegardée en production.",
        )

        parser.add_argument(
            "--analyze",
            action="store_true",
            help="Effectue un VACUUM ANALYZE sur la base.",
        )
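
    # Typical sequence (illustrative invocations):
    #   manage.py import_sirene              # 1. build and index the working table
    #   manage.py import_sirene --activate   # 2. swap it into production
    #   manage.py import_sirene --rollback   #    (undo the swap if needed)
    #   manage.py import_sirene --analyze    # 3. refresh planner statistics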

    def download_data(self, tmp_dir_name):
        if USE_TEMP_DIR:
@@ -135,12 +162,46 @@ def create_establishment(self, siren, parent_name, row):
        )

    def handle(self, *args, **options):
        if options["activate"]:
            # activate the temporary table (if it exists)
            # as the production table (`sirene_establishment`)
            self.stdout.write(self.style.WARNING("Activation de la table de travail"))

            # back up the production table
            self.stdout.write(self.style.NOTICE(" > sauvegarde de la table actuelle"))
            rename_table(SIRENE_TABLE, BACKUP_TABLE)

            # rename the working table
            self.stdout.write(self.style.NOTICE(" > renommage de la table de travail"))
            rename_table(TMP_TABLE, SIRENE_TABLE)

            self.stdout.write(self.style.NOTICE("Activation terminée"))
            return

        if options["rollback"]:
            # reactivate the backed-up table through a three-way rename swap:
            # production -> tmp, backup -> production, tmp -> backup
            self.stdout.write(self.style.WARNING("Activation de la table sauvegardée"))
            rename_table(SIRENE_TABLE, TMP_TABLE)
            rename_table(BACKUP_TABLE, SIRENE_TABLE)
            rename_table(TMP_TABLE, BACKUP_TABLE)
            return

        if options["analyze"]:
            # run a statistics analysis on the Postgres database
            self.stdout.write(self.style.WARNING("Analyse de la DB en cours..."))
            vacuum_analyze()
            self.stdout.write(self.style.NOTICE("Analyse terminée"))
            return

        self.stdout.write(self.style.NOTICE(" > création de la base de travail"))
        # drops any previous working table first
        create_table(TMP_TABLE)

        with tempfile.TemporaryDirectory() as tmp_dir_name:
            stock_file, estab_file = self.download_data(tmp_dir_name)

            num_stock_items = 0
            with open(stock_file) as f:
                num_stock_items = sum(1 for _ in f)

            legal_units = {}
            with open(stock_file) as units_file:
@@ -157,20 +218,30 @@ def handle(self, *args, **options):
                    # closed legal units are ignored
                    legal_units[row["siren"]] = self.get_ul_name(row)

self.stdout.write(self.style.NOTICE("Counting establishments"))
self.stdout.write(
self.style.NOTICE(" > décompte des établissements...")
)

            num_establishments = 0
            with open(estab_file) as f:
                num_establishments = sum(1 for _ in f)
            last_prog = 0

            self.stdout.write(
                self.style.NOTICE(f" > {num_establishments} établissements")
            )

            with open(estab_file) as establishment_file:
                self.stdout.write(self.style.NOTICE(" > import des établissements..."))
                reader = csv.DictReader(establishment_file, delimiter=",")

                with transaction.atomic(durable=True):
                    self.stdout.write(
                        self.style.WARNING(
                            " > insertion des données dans la table temporaire..."
                        )
                    )
                    # Establishment.objects.all().delete()
                    batch_size = 1_000
                    rows = []
                    for i, row in enumerate(reader):
@@ -199,4 +270,11 @@ def handle(self, *args, **options):

                    commit(rows)

self.stdout.write(self.style.SUCCESS("Import successful"))
# recréation des indexes sur la table de travail
self.stdout.write(self.style.NOTICE(" > re-création des indexes"))
create_indexes(TMP_TABLE)

# la sauvegarde de la base de production et l'analyse de la DB
# ne sont pas automatique, voir arguments `--activate` et `--analyze`

self.stdout.write(self.style.SUCCESS("L'import est terminé. Ne pas oublier d'activer la table de travail (--activate)"))
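
For automation (for instance from a cron job or a one-off container, as the
backup module's docstring suggests), the full cycle can also be driven from
Python. A minimal sketch, assuming standard Django management-command plumbing:

    from django.core.management import call_command

    call_command("import_sirene")                 # build and index the working table
    call_command("import_sirene", activate=True)  # swap it into production
    call_command("import_sirene", analyze=True)   # refresh planner statistics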