From 5b7bb26af890462c69f62fee47eaa83b27f2cb98 Mon Sep 17 00:00:00 2001
From: Francesco Stablum
Date: Wed, 24 Nov 2021 08:47:58 +0100
Subject: [PATCH] fix: previously-stored data wasn't being removed ; set size cap ; encoding timing measures

---
 airflow/forward_airflow.sh |  2 +-
 common/relspecs.py         |  2 +-
 config/example.yaml        |  4 ++++
 preprocess/dag.py          | 44 ++++++++++++++++++++++++++++++++------------
 4 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/airflow/forward_airflow.sh b/airflow/forward_airflow.sh
index 3192d48..adb11c8 100644
--- a/airflow/forward_airflow.sh
+++ b/airflow/forward_airflow.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 # FIXME: screen quits have been added to the launch scripts
-ssh root@ml.nyuki.io "screen -X -S airflow_webserver quit ; screen -X -S airflow_scheduler quit; cd ~/learning_sets/airflow/ ; bash launch_airflow.sh; screen -ls; echo done"
+ssh root@ml.nyuki.io "screen -X -S airflow_webserver quit ; screen -X -S airflow_scheduler quit; cd ~/learning_sets/airflow/ ; sleep 1 ; bash launch_airflow.sh; screen -ls; echo done"
 
 
 ssh -L 8081:127.0.0.1:8080 root@ml.nyuki.io -N -f
diff --git a/common/relspecs.py b/common/relspecs.py
index c533061..2c113c0 100644
--- a/common/relspecs.py
+++ b/common/relspecs.py
@@ -274,7 +274,7 @@ def encode(self, entries, set_size, **kwargs):
                 logging.warning("code is None: this shouldn't happen")
                 continue
             elif code not in self.codelist:
-                logging.warning(f"code '{code}' not found in the codelist {self.codelist}")
+                pass # FIXME: this is way too common: logging.warning(f"code '{code}' not found in the codelist {self.codelist}")
             else:
                 index_one = self.codelist.index(code)
                 ret[index_code, index_one] = 1
diff --git a/config/example.yaml b/config/example.yaml
index 5798a26..449b0c1 100644
--- a/config/example.yaml
+++ b/config/example.yaml
@@ -16,6 +16,10 @@ airflow_user: someuser
 airflow_password: somepassword
 airflow_email: someuser@zimmerman.team
 
+download_page_size: 1000
+download_max_pages: 3
+download_max_set_size: 50
+
 data_loader_num_workers: 4
 models_dag_config_name: dspn_deepnarrow
 models_dag_days_interval: 2
diff --git a/preprocess/dag.py b/preprocess/dag.py
index d3381e9..8808409 100644
--- a/preprocess/dag.py
+++ b/preprocess/dag.py
@@ -12,24 +12,19 @@
 import os
 import sys
 import pymongo
+import time
 
 # since airflow's DAG modules are imported elsewhere (likely ~/airflow)
 # we have to explicitly add the path of the parent directory to this module to python's path
 path = os.path.abspath(os.path.dirname(os.path.abspath(__file__))+"/..")
 sys.path = [path]+sys.path
-from common import utils
-from common import relspecs
-from common import persistency
+from common import utils, relspecs, persistency, config
 from preprocess import large_mp
-
 rels = relspecs.rels.downloadable
 logging.basicConfig(level=logging.DEBUG)
 
 DATASTORE_ACTIVITY_URL = "https://datastore.iati.cloud/api/v2/activity"
 DATASTORE_CODELIST_URL = "https://datastore.iati.cloud/api/codelists/{}/"
-PAGE_SIZE = 1000
-MAX_PAGES = 3
-
 
 def extract_codelists(_rels):
     """
@@ -45,7 +40,7 @@ def download(start, ti):
     """
     Airflow task: retrieve activities from back-end
-    :param start: starting search result index of the page (`start` to `start+PAGE_SIZE`) being downloaded
+    :param start: starting search result index of the page (`start` to `start+config.download_page_size`) being downloaded
     :param ti (str): task id
     :return: None
     """
@@ -54,7 +49,7 @@ def download(start, ti):
         'q': "*:*",
         'fl': fl,
         'start': start,
-        'rows': PAGE_SIZE
+        'rows': config.download_page_size
     }
     logging.info(f"requesting {DATASTORE_ACTIVITY_URL} with {params}")
     response = requests.get(DATASTORE_ACTIVITY_URL, params=params)
@@ -83,6 +78,8 @@ def parse(page, ti):
             if m is not None:
                 rel_field = m.group(1)
                 if rel_field in rel.fields_names:
+                    # cap the amount of items to config.download_max_set_size
+                    v = v[:config.download_max_set_size]
                     # logging.info(f"considering field {rel_field}")
                     rels_vals[rel.name][activity_id][rel_field] = v
 
@@ -128,9 +125,17 @@ def persist(page, ti):
     db = persistency.mongo_db()
     data = large_mp.recv(ti, f'parse_{page}')
+    # FIXME: per-rel tasks
     for rel, sets in data.items():
+
+        # remove all data previously stored for this relation
+        db[rel].remove({})
+
         for activity_id, set_ in sets.items():
+
+            # remove previously stored activity, if present FIXME: deprecated?
             db[rel].delete_one({'activity_id': activity_id}) # remove pre-existing set for this activity
+
             db[rel].insert_one({
                 'activity_id': activity_id,
                 'set_': set_
@@ -184,15 +189,27 @@ def encode(rel, ti):
     db = persistency.mongo_db()
     coll_in = db[rel.name]
     coll_out = db[rel.name + "_encoded"]
+
+    # remove existing data in the collection
+    coll_out.remove({})
+
+    # how much time does each item require to be encoded
+
     for document in coll_in.find(no_cursor_timeout=True):
         document = dict(document) # copy
         set_ = document['set_']
         set_size = get_set_size(set_)
+
+        start = time.time()
+
         for field in rel.fields:
             encodable = set_.get(field.name, [])
             tmp = field.encode(encodable, set_size)
             set_[field.name] = tmp
+        end = time.time()
+        encoding_time = end-start
+        document['encoding_time'] = encoding_time
         del document['_id']
         lens = list(map(lambda fld: len(set_[fld.name]), rel.fields))
         if len(set(lens)) > 1:
@@ -205,8 +222,9 @@ def encode(rel, ti):
         try:
             coll_out.insert_one(document)
         except pymongo.errors.DocumentTooLarge as e:
+            logging.info(f"document[activity_id]: {document['activity_id']}")
             for field in rel.fields:
-                logging.info(f"{field.name} shape: {set_[field.name].shape}")
+                logging.info(f"{field.name} len: {len(set_[field.name])}")
             raise Exception(f"cannot insert document into relation {rel.name} because {str(e)}")
 
 def arrayfy(rel, ti):
@@ -296,6 +314,8 @@ def to_tsets(rel, ti):
     train_npa = np.vstack(train_npas)
     test_npa = np.vstack(test_npas)
+
+    coll_out.remove({'rel': rel.name})
     coll_out.insert_one({
         'rel': rel.name,
         'creation_time': utils.strnow_iso(),
@@ -322,7 +342,7 @@ def to_tsets(rel, ti):
     schedule_interval=None
 ) as dag:
 
-    pages = list(range(MAX_PAGES))
+    pages = list(range(config.download_max_pages))
 
     t_codelists = PythonOperator(
         task_id="codelists",
@@ -333,7 +353,7 @@ def to_tsets(rel, ti):
     t_persist = {}
     for page in pages:
-        start = page*PAGE_SIZE
+        start = page*config.download_page_size
         t_download = PythonOperator(
            task_id=f"download_{page}",
            python_callable=download,
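
Note on the MongoDB calls in this patch: Collection.remove, used above to clear previously-stored data in persist, encode and to_tsets, has been deprecated since PyMongo 3.0 and was removed in PyMongo 4.0; delete_many is the supported replacement. A minimal sketch of the same clear-then-insert pattern with delete_many follows; the connection setup and the collection/relation names are hypothetical illustrations, not taken from the repository:

    import pymongo

    client = pymongo.MongoClient("mongodb://localhost:27017")  # hypothetical local instance
    db = client["learning_sets"]  # hypothetical database name

    # drop everything previously stored for a relation before repopulating it
    coll_out = db["budget_encoded"]  # hypothetical per-relation collection
    coll_out.delete_many({})  # modern equivalent of coll_out.remove({})

    # clear only one relation's documents, as to_tsets does with a filter
    db["tsets"].delete_many({'rel': 'budget'})  # hypothetical collection and relation name

delete_many returns a DeleteResult, so the number of removed documents can be checked via its deleted_count attribute if logging is wanted at that point.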