From 364e94d74558056fbdbf5194419f7259ac2f5a2e Mon Sep 17 00:00:00 2001 From: "amaury.zarzelli" Date: Tue, 13 Feb 2024 18:04:28 +0100 Subject: [PATCH 1/4] fix(pivot2osm): OOM when selecting edges of pivot DB --- changelog.md | 11 ++++++--- r2gg/_pivot_to_osm.py | 53 +++++++++++++++++++++++++++---------------- 2 files changed, 41 insertions(+), 23 deletions(-) diff --git a/changelog.md b/changelog.md index 7ad3c08..0b11572 100644 --- a/changelog.md +++ b/changelog.md @@ -1,15 +1,20 @@ # CHANGELOG +## 2.2.3 + +CHANGED: +- Pivot to osm: Using batches for fetching edges in pivot DB + ## 2.2.2 ADD: - VACUUM ANALYSE is done only on created tables -- Templates for issues and PR +- Templates for issues and PR - A code of conduct was adapted from the contributor covenant - A contributing was added -- The DCO was added +- The DCO was added - Restrict access to pedestrian ways according to BDTOPO -- Better handling of urbain column inside the BDTOPO +- Better handling of urbain column inside the BDTOPO FIX: - Durée de parcours incohérente sur OSRM entre car-fastest et car-shortest diff --git a/r2gg/_pivot_to_osm.py b/r2gg/_pivot_to_osm.py index 6124b59..24dc7ab 100644 --- a/r2gg/_pivot_to_osm.py +++ b/r2gg/_pivot_to_osm.py @@ -90,31 +90,44 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = et_execute = time.time() logger.info("Writing nodes ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - # Ecriture des ways - sql_query2 = getQueryByTableAndBoundingBox(f'{input_schema}.edges', source['bbox'], ['*', f'{input_schema}.inter_nodes(geom) as internodes']) - logger.info("SQL: {}".format(sql_query2)) + # Récupération du nombre de ways + sql_query = f"SELECT COUNT(*) as cnt FROM {input_schema}.edges" + logger.info("SQL: {}".format(sql_query)) st_execute = time.time() - cursor.execute(sql_query2) + cursor.execute(sql_query) et_execute = time.time() logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) row = cursor.fetchone() - logger.info("Writing ways") + edgesize = row["cnt"] + + # Ecriture des ways + batchsize = 500000 + offset = 0 + logger.info(f"Writing ways: {edgesize} ways to write") st_execute = time.time() - i = 1 - while row: - wayEl = writeWay(row, extraction_date) - for node in row['internodes']: - vertexSequence = vertexSequence + 1 - node['id'] = vertexSequence - nodeEl = writeNode(node, extraction_date) - xf.write(nodeEl, pretty_print=True) - wayEl = writeWayNds(wayEl, row, row['internodes']) - wayEl = writeWayTags(wayEl, row) - xf.write(wayEl, pretty_print=True) - row = cursor.fetchone() - if (i % ceil(cursor.rowcount/10) == 0): - logger.info("%s / %s ways ajoutés" %(i, cursor.rowcount)) - i += 1 + while offset < edgesize: + sql_query2 = getQueryByTableAndBoundingBox(f'{input_schema}.edges', source['bbox'], ['*', f'{input_schema}.inter_nodes(geom) as internodes']) + sql_query2 += " LIMIT {} OFFSET {}".format(batchsize, offset) + logger.info("SQL: {}".format(sql_query2)) + cursor.execute(sql_query2) + et_execute = time.time() + offset += batchsize + logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) + row = cursor.fetchone() + st_execute = time.time() + i = 1 + while row: + wayEl = writeWay(row, extraction_date) + for node in row['internodes']: + vertexSequence = vertexSequence + 1 + node['id'] = vertexSequence + nodeEl = writeNode(node, extraction_date) + xf.write(nodeEl, pretty_print=True) + wayEl = writeWayNds(wayEl, row, row['internodes']) + wayEl = writeWayTags(wayEl, row) + xf.write(wayEl, pretty_print=True) + row = cursor.fetchone() + logger.info("%s / %s ways ajoutés" %(offset, edgesize)) et_execute = time.time() logger.info("Writing ways ended. Elapsed time : %s seconds." %(et_execute - st_execute)) From 5b4e6904bc2a02474de96a7264a7d6255bd1d779 Mon Sep 17 00:00:00 2001 From: "amaury.zarzelli" Date: Wed, 14 Feb 2024 10:37:09 +0100 Subject: [PATCH 2/4] doc: updates for 2.2.3 --- changelog.md | 2 ++ r2gg/__about__.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index 0b11572..2d20644 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,7 @@ # CHANGELOG +## x.y.z + ## 2.2.3 CHANGED: diff --git a/r2gg/__about__.py b/r2gg/__about__.py index f2a2887..425afd0 100644 --- a/r2gg/__about__.py +++ b/r2gg/__about__.py @@ -34,7 +34,7 @@ __uri_tracker__ = f"{__uri_repository__}issues/" __uri__ = __uri_repository__ -__version__ = "2.2.2" +__version__ = "2.2.3" __version_info__ = tuple( [ int(num) if num.isdigit() else num From f59daed6e296dea0c0d3130e48c6d79d0cc6e47a Mon Sep 17 00:00:00 2001 From: "amaury.zarzelli" Date: Wed, 14 Feb 2024 11:20:19 +0100 Subject: [PATCH 3/4] fix(pivot2osm): reduce batch size, use batch for nodes as well --- r2gg/_pivot_to_osm.py | 52 ++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/r2gg/_pivot_to_osm.py b/r2gg/_pivot_to_osm.py index 24dc7ab..0d6a2f7 100644 --- a/r2gg/_pivot_to_osm.py +++ b/r2gg/_pivot_to_osm.py @@ -69,26 +69,41 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = attribs = {"version": "0.6", "generator": "r2gg"} with xf.element("osm", attribs): - # Ecriture des nodes - sql_query = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox']) + # Récupération du nombre de nodes + sql_query = f"SELECT COUNT(*) as cnt FROM {input_schema}.nodes" logger.info("SQL: {}".format(sql_query)) st_execute = time.time() cursor.execute(sql_query) et_execute = time.time() logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) row = cursor.fetchone() - logger.info("Writing nodes") - st_execute = time.time() - i = 1 - while row: - nodeEl = writeNode(row, extraction_date) - xf.write(nodeEl, pretty_print=True) - row = cursor.fetchone() - if (i % ceil(cursor.rowcount/10) == 0): - logger.info("%s / %s nodes ajoutés" %(i, cursor.rowcount)) - i += 1 - et_execute = time.time() - logger.info("Writing nodes ended. Elapsed time : %s seconds." %(et_execute - st_execute)) + nodesize = row["cnt"] + + # Ecriture des nodes + batchsize = 500000 + offset = 0 + logger.info(f"Writing nodes: {nodesize} ways to write") + st_nodes = time.time() + while offset < nodesize: + sql_query = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox']) + sql_query2 += " LIMIT {} OFFSET {}".format(batchsize, offset) + logger.info("SQL: {}".format(sql_query)) + st_execute = time.time() + cursor.execute(sql_query) + et_execute = time.time() + offset += batchsize + logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) + row = cursor.fetchone() + logger.info("Writing nodes") + st_execute = time.time() + i = 1 + while row: + nodeEl = writeNode(row, extraction_date) + xf.write(nodeEl, pretty_print=True) + row = cursor.fetchone() + logger.info("%s / %s nodes ajoutés" %(offset, nodesize)) + et_nodes = time.time() + logger.info("Writing nodes ended. Elapsed time : %s seconds." %(et_nodes - st_nodes)) # Récupération du nombre de ways sql_query = f"SELECT COUNT(*) as cnt FROM {input_schema}.edges" @@ -101,14 +116,15 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = edgesize = row["cnt"] # Ecriture des ways - batchsize = 500000 + batchsize = 300000 offset = 0 logger.info(f"Writing ways: {edgesize} ways to write") - st_execute = time.time() + st_edges = time.time() while offset < edgesize: sql_query2 = getQueryByTableAndBoundingBox(f'{input_schema}.edges', source['bbox'], ['*', f'{input_schema}.inter_nodes(geom) as internodes']) sql_query2 += " LIMIT {} OFFSET {}".format(batchsize, offset) logger.info("SQL: {}".format(sql_query2)) + st_execute = time.time() cursor.execute(sql_query2) et_execute = time.time() offset += batchsize @@ -128,8 +144,8 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = xf.write(wayEl, pretty_print=True) row = cursor.fetchone() logger.info("%s / %s ways ajoutés" %(offset, edgesize)) - et_execute = time.time() - logger.info("Writing ways ended. Elapsed time : %s seconds." %(et_execute - st_execute)) + et_edges = time.time() + logger.info("Writing ways ended. Elapsed time : %s seconds." %(et_edges - st_edges)) # Ecriture des restrictions sql_query3 = f"select * from {input_schema}.non_comm" From 20d02724387ffa8738dac19591d0e3940319fd59 Mon Sep 17 00:00:00 2001 From: "amaury.zarzelli" Date: Wed, 14 Feb 2024 11:24:35 +0100 Subject: [PATCH 4/4] fix(pivot2osm): variable name typo --- r2gg/_pivot_to_osm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r2gg/_pivot_to_osm.py b/r2gg/_pivot_to_osm.py index 0d6a2f7..9e1b6c4 100644 --- a/r2gg/_pivot_to_osm.py +++ b/r2gg/_pivot_to_osm.py @@ -86,7 +86,7 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = st_nodes = time.time() while offset < nodesize: sql_query = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox']) - sql_query2 += " LIMIT {} OFFSET {}".format(batchsize, offset) + sql_query += " LIMIT {} OFFSET {}".format(batchsize, offset) logger.info("SQL: {}".format(sql_query)) st_execute = time.time() cursor.execute(sql_query)