fix: previously-stored data wasn't being removed ; set size cap ; encoding timing measures
Francesco Stablum committed Nov 24, 2021
1 parent 3075aa5 commit 5b7bb26
Showing 4 changed files with 38 additions and 14 deletions.
2 changes: 1 addition & 1 deletion airflow/forward_airflow.sh
@@ -1,6 +1,6 @@
#!/bin/bash

# FIXME: screen quits have been added to the launch scripts
ssh [email protected] "screen -X -S airflow_webserver quit ; screen -X -S airflow_scheduler quit; cd ~/learning_sets/airflow/ ; bash launch_airflow.sh; screen -ls; echo done"
ssh [email protected] "screen -X -S airflow_webserver quit ; screen -X -S airflow_scheduler quit; cd ~/learning_sets/airflow/ ; sleep 1 ; bash launch_airflow.sh; screen -ls; echo done"

ssh -L 8081:127.0.0.1:8080 [email protected] -N -f
2 changes: 1 addition & 1 deletion common/relspecs.py
@@ -274,7 +274,7 @@ def encode(self, entries, set_size, **kwargs):
logging.warning("code is None: this shouldn't happen")
continue
elif code not in self.codelist:
logging.warning(f"code '{code}' not found in the codelist {self.codelist}")
pass # FIXME: this is way too common: logging.warning(f"code '{code}' not found in the codelist {self.codelist}")
else:
index_one = self.codelist.index(code)
ret[index_code, index_one] = 1
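The hunk above sits inside a one-hot encoder: each code found in the codelist sets one cell of a zero matrix, and the commit silences the per-miss warning because unknown codes turned out to be too common to log. A minimal standalone sketch of that pattern (names and data are illustrative, not the repository's):

```python
import numpy as np

codelist = ["AA", "BB", "CC"]   # known vocabulary of codes
codes = ["BB", "ZZ", "CC"]      # "ZZ" is not in the codelist

ret = np.zeros((len(codes), len(codelist)))
for index_code, code in enumerate(codes):
    if code is None or code not in codelist:
        continue  # skipped silently, as the commit now does
    ret[index_code, codelist.index(code)] = 1
# ret[0] == [0, 1, 0]; ret[1] stays all zeros because "ZZ" is unknown
```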
4 changes: 4 additions & 0 deletions config/example.yaml
@@ -16,6 +16,10 @@ airflow_user: someuser
airflow_password: somepassword
airflow_email: [email protected]

download_page_size: 1000
download_max_pages: 3
download_max_set_size: 50

data_loader_num_workers: 4
models_dag_config_name: dspn_deepnarrow
models_dag_days_interval: 2
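These three settings replace the hard-coded PAGE_SIZE and MAX_PAGES constants deleted from preprocess/dag.py below, and add a per-set item cap. The diff doesn't show how common/config exposes them; a plain PyYAML read would look roughly like this (a sketch, assuming the config module does something equivalent):

```python
import yaml  # assumes PyYAML is installed

with open("config/example.yaml") as f:
    cfg = yaml.safe_load(f)

download_page_size = cfg["download_page_size"]        # 1000
download_max_pages = cfg["download_max_pages"]        # 3
download_max_set_size = cfg["download_max_set_size"]  # 50
```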
44 changes: 32 additions & 12 deletions preprocess/dag.py
@@ -12,24 +12,19 @@
import os
import sys
import pymongo
import time

# since airflow's DAG modules are imported elsewhere (likely ~/airflow)
# we have to explicitly add the path of the parent directory to this module to python's path
path = os.path.abspath(os.path.dirname(os.path.abspath(__file__))+"/..")
sys.path = [path]+sys.path
from common import utils
from common import relspecs
from common import persistency
from common import utils, relspecs, persistency, config
from preprocess import large_mp

rels = relspecs.rels.downloadable
logging.basicConfig(level=logging.DEBUG)

DATASTORE_ACTIVITY_URL = "https://datastore.iati.cloud/api/v2/activity"
DATASTORE_CODELIST_URL = "https://datastore.iati.cloud/api/codelists/{}/"
PAGE_SIZE = 1000
MAX_PAGES = 3


def extract_codelists(_rels):
"""
@@ -45,7 +40,7 @@ def download(start, ti):
def download(start, ti):
"""
Airflow task: retrieve activities from back-end
:param start: starting search result index of the page (`start` to `start+PAGE_SIZE`) being downloaded
:param start: starting search result index of the page (`start` to `start+config.download_page_size`) being downloaded
:param ti (str): task id
:return: None
"""
@@ -54,7 +49,7 @@ def download(start, ti):
'q': "*:*",
'fl': fl,
'start': start,
'rows': PAGE_SIZE
'rows': config.download_page_size
}
logging.info(f"requesting {DATASTORE_ACTIVITY_URL} with {params}")
response = requests.get(DATASTORE_ACTIVITY_URL, params=params)
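download() requests one Solr-style page from the datastore, offset by `start` and sized by the new config value. A self-contained sketch of the same request, with the field list `fl` and the page size filled in with assumed values:

```python
import requests

DATASTORE_ACTIVITY_URL = "https://datastore.iati.cloud/api/v2/activity"
page_size = 1000  # download_page_size in the example config

def fetch_page(start, fl="*"):
    params = {
        "q": "*:*",         # match all activities
        "fl": fl,           # fields to return
        "start": start,     # offset of the first result
        "rows": page_size,  # page length
    }
    response = requests.get(DATASTORE_ACTIVITY_URL, params=params)
    response.raise_for_status()
    return response.json()

# pages 0..2, as download_max_pages: 3 would yield:
# for page in range(3): data = fetch_page(page * page_size)
```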
@@ -83,6 +78,8 @@ def parse(page, ti):
if m is not None:
rel_field = m.group(1)
if rel_field in rel.fields_names:
# cap the amount of items to config.download_max_set_size
v = v[:config.download_max_set_size]
# logging.info(f"considering field {rel_field}")
rels_vals[rel.name][activity_id][rel_field] = v

@@ -128,9 +125,17 @@ def persist(page, ti):
db = persistency.mongo_db()
data = large_mp.recv(ti, f'parse_{page}')

# FIXME: per-rel tasks
for rel, sets in data.items():

# remove all data previously stored for this relation
db[rel].remove({})

for activity_id, set_ in sets.items():

# remove previously stored activity, if present FIXME: deprecated?
db[rel].delete_one({'activity_id': activity_id}) # remove pre-existing set for this activity

db[rel].insert_one({
'activity_id': activity_id,
'set_': set_
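One caveat: `Collection.remove()` as used above was deprecated in PyMongo 3.0 and removed in 4.0; `delete_many()` is the current equivalent. A sketch of the same wipe-then-insert pattern with the non-deprecated calls (connection details and names assumed):

```python
import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")  # assumed URI
db = client["learning_sets"]                               # assumed db name

def persist_sets(rel_name, sets):
    coll = db[rel_name]
    coll.delete_many({})  # drop everything previously stored for this relation
    for activity_id, set_ in sets.items():
        # redundant after delete_many({}), mirroring the FIXME in the diff
        coll.delete_one({"activity_id": activity_id})
        coll.insert_one({"activity_id": activity_id, "set_": set_})
```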
@@ -184,15 +189,27 @@ def encode(rel, ti):
db = persistency.mongo_db()
coll_in = db[rel.name]
coll_out = db[rel.name + "_encoded"]

# remove existing data in the collection
coll_out.remove({})

# how much time does each item require to be encoded

for document in coll_in.find(no_cursor_timeout=True):
document = dict(document) # copy
set_ = document['set_']
set_size = get_set_size(set_)

start = time.time()

for field in rel.fields:
encodable = set_.get(field.name, [])
tmp = field.encode(encodable, set_size)
set_[field.name] = tmp

end = time.time()
encoding_time = end-start
document['encoding_time'] = encoding_time
del document['_id']
lens = list(map(lambda fld: len(set_[fld.name]), rel.fields))
if len(set(lens)) > 1:
@@ -205,8 +222,9 @@
try:
coll_out.insert_one(document)
except pymongo.errors.DocumentTooLarge as e:
logging.info(f"document[activity_id]: {document['activity_id']}")
for field in rel.fields:
logging.info(f"{field.name} shape: {set_[field.name].shape}")
logging.info(f"{field.name} len: {len(set_[field.name])}")
raise Exception(f"cannot insert document into relation {rel.name} because {str(e)}")

def arrayfy(rel, ti):
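encode() above now brackets the per-field encoding loop with time.time() calls and stores the elapsed seconds on the document. For short intervals, time.perf_counter() is the usual choice, being monotonic and higher-resolution; a sketch of the same measurement (encode_field stands in for the rel field objects' encode method):

```python
import time

def encode_with_timing(document, field_names, set_size, encode_field):
    # encode_field(name, values, set_size) stands in for field.encode()
    set_ = document["set_"]
    start = time.perf_counter()
    for name in field_names:
        set_[name] = encode_field(name, set_.get(name, []), set_size)
    document["encoding_time"] = time.perf_counter() - start
    return document
```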
@@ -296,6 +314,8 @@ def to_tsets(rel, ti):

train_npa = np.vstack(train_npas)
test_npa = np.vstack(test_npas)

coll_out.remove({'rel': rel.name})
coll_out.insert_one({
'rel': rel.name,
'creation_time': utils.strnow_iso(),
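The remove-then-insert above keeps exactly one train/test record per relation. The same effect is available in a single call via replace_one with upsert=True; the collection name and every payload field besides 'rel' and 'creation_time' below are illustrative:

```python
import datetime
import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")  # assumed URI
coll_out = client["learning_sets"]["tsets"]                # assumed names

def store_tsets(rel_name, train_npa, test_npa):
    doc = {
        "rel": rel_name,
        "creation_time": datetime.datetime.utcnow().isoformat(),
        "train_npa": train_npa.tolist(),  # illustrative serialization
        "test_npa": test_npa.tolist(),
    }
    coll_out.replace_one({"rel": rel_name}, doc, upsert=True)
```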
@@ -322,7 +342,7 @@ def to_tsets(rel, ti):
schedule_interval=None
) as dag:

pages = list(range(MAX_PAGES))
pages = list(range(config.download_max_pages))

t_codelists = PythonOperator(
task_id="codelists",
Expand All @@ -333,7 +353,7 @@ def to_tsets(rel, ti):

t_persist = {}
for page in pages:
start = page*PAGE_SIZE
start = page*config.download_page_size
t_download = PythonOperator(
task_id=f"download_{page}",
python_callable=download,
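The DAG fans out one download task per page, each offset by page * download_page_size. A trimmed, runnable sketch of that fan-out (the real DAG also wires codelists, parse, and persist tasks, which the diff truncates; assumes Airflow 2.x):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def download(start):
    print(f"would fetch rows {start}..{start + 1000}")

with DAG(dag_id="download_fanout_sketch",
         start_date=datetime(2021, 11, 24),
         schedule_interval=None) as dag:
    for page in range(3):                      # download_max_pages
        PythonOperator(
            task_id=f"download_{page}",
            python_callable=download,
            op_kwargs={"start": page * 1000},  # download_page_size
        )
```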
