From f59daed6e296dea0c0d3130e48c6d79d0cc6e47a Mon Sep 17 00:00:00 2001
From: "amaury.zarzelli"
Date: Wed, 14 Feb 2024 11:20:19 +0100
Subject: [PATCH] fix(pivot2osm): reduce batch size, use batch for nodes as well

---
 r2gg/_pivot_to_osm.py | 52 ++++++++++++++++++++++++++++++++++------------------
 1 file changed, 34 insertions(+), 18 deletions(-)

diff --git a/r2gg/_pivot_to_osm.py b/r2gg/_pivot_to_osm.py
index 24dc7ab..0d6a2f7 100644
--- a/r2gg/_pivot_to_osm.py
+++ b/r2gg/_pivot_to_osm.py
@@ -69,26 +69,41 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf =
     attribs = {"version": "0.6", "generator": "r2gg"}
     with xf.element("osm", attribs):
 
-        # Ecriture des nodes
-        sql_query = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox'])
+        # Récupération du nombre de nodes
+        sql_query = f"SELECT COUNT(*) as cnt FROM {input_schema}.nodes"
         logger.info("SQL: {}".format(sql_query))
         st_execute = time.time()
         cursor.execute(sql_query)
         et_execute = time.time()
         logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute))
         row = cursor.fetchone()
-        logger.info("Writing nodes")
-        st_execute = time.time()
-        i = 1
-        while row:
-            nodeEl = writeNode(row, extraction_date)
-            xf.write(nodeEl, pretty_print=True)
-            row = cursor.fetchone()
-            if (i % ceil(cursor.rowcount/10) == 0):
-                logger.info("%s / %s nodes ajoutés" %(i, cursor.rowcount))
-            i += 1
-        et_execute = time.time()
-        logger.info("Writing nodes ended. Elapsed time : %s seconds." %(et_execute - st_execute))
+        nodesize = row["cnt"]
+
+        # Ecriture des nodes
+        batchsize = 500000
+        offset = 0
+        logger.info(f"Writing nodes: {nodesize} nodes to write")
+        st_nodes = time.time()
+        while offset < nodesize:
+            sql_query = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox'])
+            sql_query += " LIMIT {} OFFSET {}".format(batchsize, offset)
+            logger.info("SQL: {}".format(sql_query))
+            st_execute = time.time()
+            cursor.execute(sql_query)
+            et_execute = time.time()
+            offset += batchsize
+            logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute))
+            row = cursor.fetchone()
+            logger.info("Writing nodes")
+            st_execute = time.time()
+            i = 1
+            while row:
+                nodeEl = writeNode(row, extraction_date)
+                xf.write(nodeEl, pretty_print=True)
+                row = cursor.fetchone()
+            logger.info("%s / %s nodes ajoutés" %(offset, nodesize))
+        et_nodes = time.time()
+        logger.info("Writing nodes ended. Elapsed time : %s seconds." %(et_nodes - st_nodes))
 
         # Récupération du nombre de ways
         sql_query = f"SELECT COUNT(*) as cnt FROM {input_schema}.edges"
@@ -101,14 +116,15 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf =
         edgesize = row["cnt"]
 
         # Ecriture des ways
-        batchsize = 500000
+        batchsize = 300000
         offset = 0
         logger.info(f"Writing ways: {edgesize} ways to write")
-        st_execute = time.time()
+        st_edges = time.time()
         while offset < edgesize:
             sql_query2 = getQueryByTableAndBoundingBox(f'{input_schema}.edges', source['bbox'], ['*', f'{input_schema}.inter_nodes(geom) as internodes'])
             sql_query2 += " LIMIT {} OFFSET {}".format(batchsize, offset)
             logger.info("SQL: {}".format(sql_query2))
+            st_execute = time.time()
             cursor.execute(sql_query2)
             et_execute = time.time()
             offset += batchsize
@@ -128,8 +144,8 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf =
                 xf.write(wayEl, pretty_print=True)
                 row = cursor.fetchone()
             logger.info("%s / %s ways ajoutés" %(offset, edgesize))
-        et_execute = time.time()
-        logger.info("Writing ways ended. Elapsed time : %s seconds." %(et_execute - st_execute))
+        et_edges = time.time()
+        logger.info("Writing ways ended. Elapsed time : %s seconds." %(et_edges - st_edges))
 
         # Ecriture des restrictions
         sql_query3 = f"select * from {input_schema}.non_comm"
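
Note on the batching pattern introduced above: the export now paginates each table with LIMIT/OFFSET so that only one batch of rows is fetched and streamed into the incremental XML writer at a time. The following is a minimal sketch of that pattern, assuming a psycopg2-style cursor and an lxml xmlfile writer; the names write_batched, base_query and write_row are illustrative and not part of r2gg:

    def write_batched(cursor, xf, base_query, total, batchsize, write_row):
        # Stream `total` rows of `base_query` to the xmlfile writer `xf`,
        # `batchsize` rows at a time, using LIMIT/OFFSET pagination.
        offset = 0
        while offset < total:
            # Fetch one batch, as the patch does for nodes and ways.
            cursor.execute(base_query + " LIMIT {} OFFSET {}".format(batchsize, offset))
            row = cursor.fetchone()
            while row:
                # write_row builds an lxml element (e.g. a node or way) from one row.
                xf.write(write_row(row), pretty_print=True)
                row = cursor.fetchone()
            offset += batchsize

One caveat on this design: LIMIT/OFFSET pagination is only deterministic when the query has a stable ORDER BY; without one, PostgreSQL does not guarantee that consecutive batches see the rows in the same order.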