sirene: improve the SIRENE data import
- Production data is no longer erased before explicit validation,
- the import is done on a working table without indexes (much faster),
- the indexes are recreated at the end of the procedure.

Analyzing the database is optional and can be triggered with the
`--analyze` argument.

The working table can be activated manually at the end of the import with
the `--activate` argument.
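
For reference, a minimal usage sketch (assumptions: the command name comes from the file path below, the flags from `add_arguments`, and `call_command` is Django's standard programmatic entry point):

    from django.core.management import call_command

    # Load the data into the index-free working table first...
    call_command("import_sirene")
    # ...then, once the result is validated, swap it into production
    # and optionally refresh the planner statistics.
    call_command("import_sirene", activate=True)
    call_command("import_sirene", analyze=True)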
ikarius committed Sep 9, 2024
1 parent 6680551 commit e1890ae
Showing 2 changed files with 179 additions and 12 deletions.
95 changes: 95 additions & 0 deletions dora/sirene/backup.py
@@ -0,0 +1,95 @@
from django.db import connection, transaction

from .models import Establishment

"""
Backup lors de l'import SIRENE:
Outils permettant de créer une table SIRENE temporaire.
L'objectif et de permettre de créer une base SIRENE à jour,
sans altérer la version de production, et si possible, à partir d'un conteneur tiers à des fins d'automatisation.
La démarche :
- créer une copie vide de la table SIRENE *sans indexes*,
- importer les données,
- recréer les indexes,
- renommer l'ancienne table en `sirene_establiment_bak`,
- nommer la table nouvellement créée `sirene_establishment`.
Les ordres sont executés via du SQL "brut", donc toute modification ou optimisation du modèle `sirene`
entrainera une modification de ces commandes.
Les indexes sont désactivés en premier lieu pour accélerer *grandemenr* les ordres d'insertion.
Il sont recréés une fois la table des établissements remplie.
On pourrait imaginer recréer les DDL via réflexion / introspection, mais c'est de l'over-engineering
pour une table qui ne bouge ... jamais.
"""


def create_table(table_name: str):
    create_table_ddl = f"""
        DROP TABLE IF EXISTS public.{table_name};
        CREATE TABLE public.{table_name} (
            siret varchar(14) NOT NULL,
            siren varchar(9) NOT NULL,
            ape varchar(6) NOT NULL,
            city_code varchar(5) NOT NULL,
            postal_code varchar(5) NOT NULL,
            is_siege bool NOT NULL,
            longitude float8 NULL,
            latitude float8 NULL,
            full_search_text text NOT NULL,
            address1 varchar(255) NOT NULL,
            address2 varchar(255) NOT NULL,
            city varchar(255) NOT NULL,
            name varchar(255) NOT NULL,
            parent_name varchar(255) NOT NULL,
            CONSTRAINT {table_name}_pkey PRIMARY KEY (siret)
        );
    """
    with connection.cursor() as c:
        c.execute(create_table_ddl)


def create_indexes(table_name: str):
    create_indexes_ddl = f"""
        CREATE INDEX {table_name}_full_text_trgm_idx ON public.{table_name} USING gin (full_search_text gin_trgm_ops);
        CREATE INDEX {table_name}_code_commune_100bb2ad ON public.{table_name} USING btree (city_code);
        CREATE INDEX {table_name}_code_commune_100bb2ad_like ON public.{table_name} USING btree (city_code varchar_pattern_ops);
        CREATE INDEX {table_name}_is_siege_9c0272c3 ON public.{table_name} USING btree (is_siege);
        CREATE INDEX {table_name}_name_d8569d90 ON public.{table_name} USING btree (name);
        CREATE INDEX {table_name}_name_d8569d90_like ON public.{table_name} USING btree (name varchar_pattern_ops);
        CREATE INDEX {table_name}_parent_name_1990928d ON public.{table_name} USING btree (parent_name);
        CREATE INDEX {table_name}_parent_name_1990928d_like ON public.{table_name} USING btree (parent_name varchar_pattern_ops);
        CREATE INDEX {table_name}_siren_b19f551a ON public.{table_name} USING btree (siren);
        CREATE INDEX {table_name}_siren_b19f551a_like ON public.{table_name} USING btree (siren varchar_pattern_ops);
        CREATE INDEX {table_name}_siret_3eb91925_like ON public.{table_name} USING btree (siret varchar_pattern_ops);
    """
    with connection.cursor() as c:
        c.execute(create_indexes_ddl)


def rename_table(orig_table_name: str, dest_table_name: str):
    # Identifiers cannot be bound as query parameters; interpolate the
    # (module-internal, trusted) table names directly.
    with connection.cursor() as c:
        c.execute(f"ALTER TABLE {orig_table_name} RENAME TO {dest_table_name};")


def vacuum_analyze():
    with connection.cursor() as c:
        c.execute("VACUUM ANALYZE;")


def create_insert_statement(table_name: str) -> tuple[str, list[str]]:
    fields = [f.name for f in Establishment._meta.fields]
    columns = ",".join(fields)
    placeholders = ",".join(["%s"] * len(fields))
    stmt = f"INSERT INTO public.{table_name}({columns}) VALUES({placeholders})"
    return stmt, fields
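# For example, for the working table this builds a statement of the form
# (a sketch, column list abbreviated; the real order follows
# Establishment._meta.fields):
#   INSERT INTO public._sirene_establishment_tmp(siret,siren,ape,...) VALUES(%s,%s,%s,...)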


def add_establishment(stmt: str, e: Establishment, fields: list[str]):
    # not transactional
    values = [getattr(e, f) for f in fields]
    with connection.cursor() as c:
        c.execute(stmt, values)


@transaction.atomic
def bulk_add_establishments(table_name: str, ee: list[Establishment]):
    stmt, fields = create_insert_statement(table_name)
    for e in ee:
        add_establishment(stmt, e, fields)
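
Taken together, a minimal sketch of the rebuild cycle these helpers enable (assumption: `establishments` stands for a list of unsaved Establishment instances; the import_sirene command below wires the same steps into a management command with batching and progress output):

    from dora.sirene.backup import (
        bulk_add_establishments,
        create_indexes,
        create_table,
        rename_table,
    )

    create_table("_sirene_establishment_tmp")  # empty, index-free copy
    bulk_add_establishments("_sirene_establishment_tmp", establishments)
    create_indexes("_sirene_establishment_tmp")  # reindex once, after the load
    rename_table("sirene_establishment", "_sirene_establishment_bak")
    rename_table("_sirene_establishment_tmp", "sirene_establishment")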
96 changes: 84 additions & 12 deletions dora/sirene/management/commands/import_sirene.py
@@ -9,24 +9,45 @@
from django.db import transaction
from django.db.utils import DataError

from dora.sirene.backup import (
    bulk_add_establishments,
    create_indexes,
    create_table,
    rename_table,
)
from dora.sirene.models import Establishment

# SIRENE variables documentation: https://www.sirene.fr/static-resources/htm/v_sommaire.htm
USE_TEMP_DIR = not settings.DEBUG
SIRENE_TABLE = "sirene_establishment"
TMP_TABLE = "_sirene_establishment_tmp"
BACKUP_TABLE = "_sirene_establishment_bak"


def clean_spaces(string):
    return string.replace(" ", " ").strip()


def commit(rows):
    bulk_add_establishments(TMP_TABLE, rows)


class Command(BaseCommand):
    help = "Import the latest geolocated SIRENE database"

    def add_arguments(self, parser):
        parser.add_argument(
            "--activate",
            action="store_true",
            help="Swap the working table generated by the import with the production table.",
        )

        parser.add_argument(
            "--analyze",
            action="store_true",
            help="Run a VACUUM ANALYZE on the database.",
        )

        # Registered so the rollback branch in handle() below is reachable.
        parser.add_argument(
            "--rollback",
            action="store_true",
            help="Restore the backed-up production table.",
        )

    def download_data(self, tmp_dir_name):
        if USE_TEMP_DIR:
@@ -135,12 +156,46 @@ def create_establishment(self, siren, parent_name, row):
        )

    def handle(self, *args, **options):
        if options.get("activate"):
            # activate the temporary table (if it exists) as the
            # production table (`sirene_establishment`)
            self.stdout.write(self.style.WARNING("Activating the working table"))

            # back up the production table
            self.stdout.write(self.style.NOTICE(" > backing up the current table"))
            rename_table(SIRENE_TABLE, BACKUP_TABLE)

            # rename the working table
            self.stdout.write(self.style.NOTICE(" > renaming the working table"))
            rename_table(TMP_TABLE, SIRENE_TABLE)

            self.stdout.write(self.style.NOTICE("Activation complete"))
            return

        if options.get("rollback"):
            # activate the backed-up table
            self.stdout.write(self.style.WARNING("Activating the backed-up table"))
            rename_table(SIRENE_TABLE, TMP_TABLE)
            rename_table(BACKUP_TABLE, SIRENE_TABLE)
            rename_table(TMP_TABLE, BACKUP_TABLE)
            return

        if options.get("analyze"):
            # run a statistical analysis on the Postgres database
            self.stdout.write(self.style.WARNING("Analyzing the DB..."))
            vacuum_analyze()
            self.stdout.write(self.style.NOTICE("Analysis complete"))
            return

        self.stdout.write(self.style.NOTICE(" > creating the working table"))
        # drops any previous working table
        create_table(TMP_TABLE)

        with tempfile.TemporaryDirectory() as tmp_dir_name:
            stock_file, estab_file = self.download_data(tmp_dir_name)

            num_stock_items = 0
            with open(stock_file) as f:
                num_stock_items = sum(1 for _ in f)

            legal_units = {}
            with open(stock_file) as units_file:
@@ -157,20 +212,30 @@ def handle(self, *args, **options):
                    # Closed legal units are ignored
                    legal_units[row["siren"]] = self.get_ul_name(row)

self.stdout.write(self.style.NOTICE("Counting establishments"))
self.stdout.write(
self.style.NOTICE(" > décompte des établissements...")
)

num_establishments = 0
with open(estab_file) as f:
num_establishments = sum(1 for line in f)
num_establishments = sum(1 for _ in f)
last_prog = 0

self.stdout.write(
self.style.NOTICE(f" > {num_establishments} établissements")
)

            with open(estab_file) as establishment_file:
                self.stdout.write(self.style.NOTICE(" > importing establishments..."))
                reader = csv.DictReader(establishment_file, delimiter=",")

                with transaction.atomic(durable=True):
                    self.stdout.write(
                        self.style.WARNING(
                            " > inserting data into the temporary table..."
                        )
                    )
                    # Establishment.objects.all().delete()
                    batch_size = 1_000
                    rows = []
                    for i, row in enumerate(reader):
@@ -199,4 +264,11 @@

                    commit(rows)

        # recreate the indexes on the working table
        self.stdout.write(self.style.NOTICE(" > recreating the indexes"))
        create_indexes(TMP_TABLE)

        # backing up the production table and analyzing the DB are not
        # automatic; see the `--activate` and `--analyze` arguments
        self.stdout.write(self.style.SUCCESS("Import complete"))
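
If an activated import turns out to be bad, the rollback branch in handle() swaps the backup table back in; as a programmatic sketch (same `call_command` assumptions as above):

    call_command("import_sirene", rollback=True)  # restore _sirene_establishment_bak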
