Skip to content

Commit

Permalink
perf: added indices to MongoDB collections; using `delete_many` instead of the deprecated `remove`
Browse files Browse the repository at this point in the history
  • Loading branch information
Francesco Stablum committed Nov 30, 2021
1 parent d04047d commit e95f343
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 15 deletions.
37 changes: 26 additions & 11 deletions preprocess/download_sets_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,19 @@ def parse_sets(page, ti):

def clear_activity_data(rel, ti):
    """
    Empty the ``activity_data`` collection and create its ``activity_id`` index.

    :param rel: not used by this function
    :param ti: not used by this function
    :return: None
    """
    db = persistency.mongo_db()
    coll = db['activity_data']

    # remove all data previously stored in the collection; one delete_many({})
    # is enough, the deprecated Collection.remove({}) call that used to precede
    # it emptied the very same collection and is gone from newer PyMongo
    coll.delete_many({})

    # descending index on the field the documents are looked up by
    coll.create_index([("activity_id", -1)])

def clear(rel, ti):
    """
    Empty the collection named ``rel.name`` and create its ``activity_id`` index.

    :param rel: object whose ``name`` attribute selects the collection to clear
    :param ti: not used by this function
    :return: None
    """
    db = persistency.mongo_db()
    coll = db[rel.name]

    # remove all data previously stored for this relation; one delete_many({})
    # is enough, the deprecated Collection.remove({}) call that used to precede
    # it emptied the very same collection and is gone from newer PyMongo
    coll.delete_many({})

    # descending index on the field the documents are looked up by
    coll.create_index([("activity_id", -1)])

def persist_sets(page, ti):
"""
Expand All @@ -141,14 +145,14 @@ def persist_sets(page, ti):
data = large_mp.recv(ti, f'parse_sets_{page}')

# FIXME: per-rel tasks
for rel, sets in data.items():
for rel_name, sets in data.items():

for activity_id, set_ in sets.items():

# remove previously stored activity, if present FIXME: deprecated?
db[rel].delete_one({'activity_id': activity_id}) # remove pre-existing set for this activity
# remove pre-existing set for this activity
db[rel_name].delete_one({'activity_id': activity_id})

db[rel].insert_one({
db[rel_name].insert_one({
'activity_id': activity_id,
'set_': set_
})
Expand All @@ -163,6 +167,8 @@ def codelists(ti):
:return: None
"""
db = persistency.mongo_db()
coll_out = db['codelists']
coll_out.create_index([("name", -1)])
for codelist_name in extract_codelists(rels):
url = DATASTORE_CODELIST_URL.format(codelist_name)
params = {'format': 'json'}
Expand All @@ -171,8 +177,8 @@ def codelists(ti):
lst = []
for curr in data:
lst.append(curr['code'])
db['codelists'].delete_many({'name': codelist_name})
db['codelists'].insert({
coll_out.delete_many({'name': codelist_name})
coll_out.insert({
'name': codelist_name,
'codelist': lst
})
Expand Down Expand Up @@ -203,7 +209,8 @@ def encode(rel, ti):
coll_out = db[rel.name + "_encoded"]

# remove existing data in the collection
coll_out.remove({})
coll_out.delete_many({})
coll_out.create_index([("activity_id", -1)])

for document in coll_in.find(no_cursor_timeout=True):
document = dict(document) # copy
Expand Down Expand Up @@ -250,6 +257,7 @@ def arrayfy(rel, ti):
coll_in = db[rel.name+"_encoded"]
coll_out = db[rel.name+"_arrayfied"]
coll_out.delete_many({}) # empty the collection
coll_out.create_index([("activity_id", -1)])
for set_index, document in enumerate(coll_in.find()):
set_npas = []
set_ = document['set_']
Expand Down Expand Up @@ -279,14 +287,17 @@ def to_npa(rel, ti):
db = persistency.mongo_db()
coll_in = db[rel.name+"_arrayfied"]
coll_out = db['npas']
coll_out.create_index([("rel", -1)])
coll_out.create_index([("creation_date", -1)])
coll_out.create_index([("npa_file_id", -1)])
rel_npas = []
for document in coll_in.find():
set_npa = utils.deserialize(document['npa'])
set_index = document['set_index']
set_index_col = np.ones((set_npa.shape[0], 1))*set_index
rel_npas.append(np.hstack([set_index_col, set_npa]))
rel_npa = np.vstack(rel_npas)
coll_out.remove({'rel': rel.name})
coll_out.delete_many({'rel': rel.name})

coll_out.insert_one({
'rel': rel.name,
Expand All @@ -311,6 +322,10 @@ def to_tsets(rel, ti):
set_indices = list(set(map(lambda document: document['set_index'], set_indices_results)))
train_indices, test_indices = sklearn.model_selection.train_test_split(set_indices, train_size=0.90)
coll_out = db['npas_tsets']
coll_out.create_index([("rel", -1)])
coll_out.create_index([("creation_date", -1)])
coll_out.create_index([("train_npa_file_id", -1)])
coll_out.create_index([("test_npa_file_id", -1)])
train_npas = []
test_npas = []
for document in coll_in.find():
Expand All @@ -326,7 +341,7 @@ def to_tsets(rel, ti):
train_npa = np.vstack(train_npas)
test_npa = np.vstack(test_npas)

coll_out.remove({'rel': rel.name})
coll_out.delete_many({'rel': rel.name})
coll_out.insert_one({
'rel': rel.name,
'creation_time': utils.strnow_iso(),
Expand Down
9 changes: 5 additions & 4 deletions preprocess/vectorize_activities_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ def clear(ti):
:return:
"""
db = persistency.mongo_db()
db['activity_data_encoded'].remove()
db['activity_vectors'].remove()
db['activity_data_encoded'].delete_many({})
db['activity_data_encoded'].create_index([("activity_id", -1)])
db['activity_vectors'].delete_many({})
db['activity_vectors'].create_index([("activity_id", -1)])

def collect(ti):
"""
Expand All @@ -49,7 +51,6 @@ def collect(ti):
coll_sets[rel.name] = db[rel.name + "_encoded"]

activity_docs = coll_activity.find({}, {'activity_id':1})
activity_sets = collections.OrderedDict()
for activity_doc in activity_docs:
encoded_sets = collections.OrderedDict()
activity_id = activity_doc['activity_id']
Expand Down Expand Up @@ -106,7 +107,7 @@ def vectorize(ti):
# Operator registered under the task id "vectorize" that calls `vectorize`.
t_vectorize = PythonOperator(
task_id="vectorize",
python_callable=vectorize,

start_date=days_ago(2)
)

# NOTE(review): `>>` presumably sets the Airflow task order
# (clear, then collect, then vectorize) — confirm against the DAG definition.
t_clear >> t_collect >> t_vectorize

0 comments on commit e95f343

Please sign in to comment.