Skip to content

Commit

Permalink
Merge pull request #2643 from zimmerman-team/develop
Browse files Browse the repository at this point in the history
Release updated automatic parsing and updated transaction core
  • Loading branch information
sylvanr authored Jul 16, 2021
2 parents 1e14edd + 3c769f5 commit a3ad32e
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 68 deletions.
6 changes: 4 additions & 2 deletions OIPA/iati/PostmanJsonImport/importPostmanJson.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ def get_json(self):
result_for_test_datastore_iatistandard_org = json.loads(json_string.decode('utf-8-sig')) # NOQA: E501
result_for_iati_cloud = json.loads(json_string.decode('utf-8-sig'))

self.simplify(result_for_iati_cloud, 'iatidatastore.iatistandard.org')
self.simplify(result_for_test_datastore_iatistandard_org, 'test-datastore.iatistandard.org') # NOQA: E501
# TODO: 7-7-2021 restructure this to better fit the current
# implementation
# self.simplify(result_for_iati_cloud, 'iatidatastore.iatistandard.org') # NOQA: E501
# self.simplify(result_for_test_datastore_iatistandard_org, 'test-datastore.iatistandard.org') # NOQA: E501
try:
with open(self.file_path + '/postman/postman_json_iati_cloud.json', 'w') as outfile: # NOQA: E501
json.dump(result_for_iati_cloud, outfile)
Expand Down
6 changes: 5 additions & 1 deletion OIPA/solr/budget/conf/managed-schema
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,15 @@

<field name="reporting_org_ref" type="non_case_sensitive" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_type" type="string" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_type_name" type="string" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_secondary_reporter" type="string" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_narrative" type="text_general" indexed="true" required="false" stored="true"/>

<field name="participating_org_ref" type="non_case_sensitive" multiValued="true" indexed="true" required="false" stored="true"/>

<field name="participating_org_type" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="participating_org_narrative" type="iati_narrative" indexed="true" required="false" stored="true"/>
<field name="participating_org_narrative_lang" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="participating_org_narrative_text" type="iati_narrative" indexed="true" required="false" stored="true"/>

<field name="recipient_country_code" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="recipient_country_name" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
Expand Down
38 changes: 38 additions & 0 deletions OIPA/solr/budget/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,49 @@ def add_participating_org(serializer, activity):
participating_org_all = activity.participating_organisations.all()
if participating_org_all:
serializer.add_field('participating_org_ref', [])
serializer.add_field('participating_org_type', [])
serializer.add_field('participating_org_narrative', [])
serializer.add_field('participating_org_narrative_lang', [])
serializer.add_field('participating_org_narrative_text', [])
for participating_organisation in participating_org_all:
serializer.add_value_list(
'participating_org_ref',
participating_organisation.ref
)
serializer.add_value_list(
'participating_org_type',
participating_organisation.type_id
)
for narrative in participating_organisation.narratives.all():
if narrative.content:
serializer.add_value_list(
"participating_org_narrative",
narrative.content
)
serializer.add_value_list(
"participating_org_narrative_text",
narrative.content
)
else:
serializer.add_value_list(
"participating_org_narrative",
" "
)
serializer.add_value_list(
"participating_org_narrative_text",
" "
)

if narrative.language:
serializer.add_value_list(
"participating_org_narrative_lang",
narrative.language.code
)
else:
serializer.add_value_list(
"participating_org_narrative_lang",
" "
)


def add_activity_additional_filter_fields(serializer, activity):
Expand Down
1 change: 1 addition & 0 deletions OIPA/solr/result/conf/managed-schema
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@

<field name="reporting_org_ref" type="non_case_sensitive" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_type" type="string" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_type_name" type="string" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_secondary_reporter" type="string" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_narrative" type="iati_narrative" indexed="true" required="false" stored="true"/>

Expand Down
16 changes: 15 additions & 1 deletion OIPA/solr/transaction/conf/managed-schema
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,13 @@

<field name="activity_sector_vocabulary" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="activity_sector_code" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="activity_sector_percentage" type="pdoubles" multiValued="true" indexed="true" required="false" stored="true"/>

<field name="participating_org_ref" type="non_case_sensitive" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="participating_org_type" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="participating_org_narrative" type="iati_narrative" indexed="true" required="false" stored="true"/>
<field name="participating_org_narrative_lang" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="participating_org_narrative_text" type="iati_narrative" indexed="true" required="false" stored="true"/>

<!-- fields for activity additional filters -->

Expand Down Expand Up @@ -178,14 +183,22 @@

<!-- end fields for activity additional filters -->


<field name="activity_date_start_planned" type="string" multiValued="false" indexed="false" required="false" stored="true"/>
<field name="activity_date_start_actual" type="string" multiValued="false" indexed="false" required="false" stored="true"/>
<field name="activity_date_end_planned" type="string" multiValued="false" indexed="false" required="false" stored="true"/>
<field name="activity_date_end_actual" type="string" multiValued="false" indexed="false" required="false" stored="true"/>
<field name="activity_date_start_planned_f" type="pdate" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="activity_date_start_actual_f" type="pdate" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="activity_date_end_planned_f" type="pdate" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="activity_date_end_actual_f" type="pdate" multiValued="false" indexed="true" required="false" stored="true"/>

<field name="activity_date_type" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="activity_date_iso_date" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="activity_date_iso_date_f" type="pdate" multiValued="true" indexed="true" required="false" stored="true"/>

<field name="reporting_org_ref" type="non_case_sensitive" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_type" type="string" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_type_name" type="string" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_secondary_reporter" type="string" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="reporting_org_narrative" type="iati_narrative" indexed="true" required="false" stored="true"/>

Expand Down Expand Up @@ -226,6 +239,7 @@
<field name="transaction_sector_vocabulary" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="transaction_sector_vocabulary_uri" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="transaction_sector_code" type="string" multiValued="true" indexed="true" required="false" stored="true"/>
<field name="transaction_sector_percentage" type="pdoubles" multiValued="true" indexed="true" required="false" stored="true"/>

<field name="transaction_recipient_country_code" type="string" multiValued="false" indexed="true" required="false" stored="true"/>
<field name="transaction_recipient_country_narrative" type="iati_narrative" multiValued="false" indexed="true" required="false" stored="true"/>
Expand Down
64 changes: 61 additions & 3 deletions OIPA/solr/transaction/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,49 @@
)


def add_activity_date_fields(serializer, activity):
    """Copy an activity's planned/actual start and end dates onto the
    serializer.

    Each matching activity date is written twice: once as a plain string
    field (``activity_date_*``) and once with the raw ``iso_date`` value
    for the corresponding ``*_f`` field (a Solr ``pdate`` field,
    presumably — confirm against the managed schema).

    Parameters:
        serializer: indexing serializer exposing ``add_field(name, value)``.
        activity: model instance with an ``activitydate_set`` related
            manager whose items carry ``type_id`` and ``iso_date``.

    Activity dates with a ``type_id`` outside '1'-'4' are ignored, matching
    the original branch-per-type behaviour.
    """
    # type_id -> target field name, as established by the original
    # if/elif chain: 1=start planned, 2=start actual, 3=end planned,
    # 4=end actual.
    field_by_type = {
        '1': 'activity_date_start_planned',
        '2': 'activity_date_start_actual',
        '3': 'activity_date_end_planned',
        '4': 'activity_date_end_actual',
    }
    for activity_date in activity.activitydate_set.all():
        field_name = field_by_type.get(activity_date.type_id)
        if field_name is None:
            # Unknown/unsupported date type: nothing to index.
            continue
        serializer.add_field(field_name, str(activity_date.iso_date))
        serializer.add_field(field_name + '_f', activity_date.iso_date)


class TransactionIndexing(BaseIndexing):

def transaction(self):
Expand Down Expand Up @@ -192,6 +235,8 @@ def transaction(self):
self.add_field('transaction_sector_vocabulary', [])
self.add_field('transaction_sector_vocabulary_uri', [])
self.add_field('transaction_sector_code', [])
self.add_field('transaction_sector_percentage', [])

for sector in transaction.transactionsector_set.all():
self.add_value_list(
'transaction_sector_vocabulary',
Expand All @@ -205,6 +250,10 @@ def transaction(self):
'transaction_sector_code',
sector.sector.code
)
self.add_value_list(
'transaction_sector_percentage',
sector.percentage
)

self.add_field(
'transaction_recipient_country_code',
Expand Down Expand Up @@ -251,19 +300,28 @@ def transaction(self):
transaction.tied_status_id
)

# Adding customized activity information to transaction
self.add_field('activity_sector_vocabulary', [])
self.add_field('activity_sector_code', [])
self.add_field('activity_sector_percentage', [])

for activity_sector in transaction.activity.activitysector_set.all():

self.add_value_list(
'activity_sector_vocabulary',
activity_sector.vocabulary_id
)
self.add_value_list('activity_sector_code',
activity_sector.sector.code)
self.add_value_list(
'activity_sector_code',
activity_sector.sector.code
)
self.add_value_list(
'activity_sector_percentage',
activity_sector.percentage
)

add_participating_org(self, transaction.activity)
add_activity_additional_filter_fields(self, transaction.activity)
add_activity_date_fields(self, transaction.activity)

def to_representation(self, transaction):
self.record = transaction
Expand Down
4 changes: 4 additions & 0 deletions OIPA/solr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def add_reporting_org(serializer, activity):
'reporting_org_type',
reporting_organisation.type_id
)
serializer.add_field(
'reporting_org_type_name',
get_child_attr(reporting_organisation, 'type.name')
)
serializer.add_field(
'reporting_org_secondary_reporter',
bool_string(reporting_organisation.secondary_reporter)
Expand Down
74 changes: 14 additions & 60 deletions OIPA/task_queue/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
from solr.transaction_sector.tasks import solr as solr_transaction_sector
from task_queue.download import DatasetDownloadTask
from task_queue.utils import (
Tasks, await_async_subtasks, reset_automatic_incremental_parse_dbs
Tasks, automatic_incremental_validation, await_async_subtasks,
reset_automatic_incremental_parse_dbs
)
from task_queue.validation import DatasetValidationTask

Expand Down Expand Up @@ -89,7 +90,9 @@ def add_activity_to_solr(activity_id):
# This task 'automatically' does a complete parse and index.
# Meaning the different steps that were previously manual are all included.
@shared_task
def automatic_incremental_parse(start_at=1):
def automatic_incremental_parse(start_at=1,
force=False,
check_validation=True):
"""
There are several steps that need to be taken to complete an incremental
parse/index.
Expand Down Expand Up @@ -152,77 +155,28 @@ def automatic_incremental_parse(start_at=1):
drop_old_datasets()
# STEP TWO -- End #

# STEP THREE -- DATASET VALIDATION TASK #
# Prepare checks
if start_at in (1, 2, 3):
check_validation_has_started = False
check_validation_is_active = False
check_empty_iteration_count = 0
check_empty_iteration_maximum = 3

while True:
url = "https://iativalidator.iatistandard.org/api/v1/queue/next"
response = requests.get(url, timeout=30)

# If the response is not 200, reset and check back later.
if response.status_code != 200:
check_validation_has_started = False
check_validation_is_active = False
time.sleep(60)
continue

check_content_is_empty = response.content.decode("utf-8") == ""

"""
Case 1: content empty - started = false - active = false
wait for the validator to start
Case 2: content has data - started = false - active = false
set started to true and active to true
Case 3: content has data - started = true - active = true
wait for the content to stop having data!
Case 4: content empty - started = true - active = true
with three iterations, confirm the content is actually empty!
set active to false.
"""
# if check_content_is_empty and not check_validation_has_started and not check_validation_is_active: # NOQA: E501
if not check_content_is_empty and not check_validation_has_started and not check_validation_is_active: # NOQA: E501
check_validation_has_started = True
check_validation_is_active = True
if not check_content_is_empty and check_validation_has_started and check_validation_is_active: # NOQA: E501
check_empty_iteration_count = 0
if check_content_is_empty and check_validation_has_started and check_validation_is_active: # NOQA: E501
if check_empty_iteration_count < check_empty_iteration_maximum:
check_empty_iteration_count += 1
else: # Validation has finished
break
time.sleep(60)

# Now that the "waiting for validator to finish" loop is over, we know
# The validator is finished. Run the task. To reduce complexity, reuse
# the AsyncTasksFinished table.
get_validation_results_task()

started = len(Dataset.objects.all())
await_async_subtasks(started)
# STEP THREE -- End #
# Step three
automatic_incremental_validation(start_at, check_validation)

# STEP FOUR -- PARSE ALL DATASETS #
if start_at in (1, 2, 3, 4):
# parse_all_existing_sources_task() does not actually run the parsing,
# Reusing the code here.
for dataset in Dataset.objects.all().filter(filetype=2):
parse_source_by_id_task.delay(dataset_id=dataset.id,
force=False,
check_validation=True)
force=force,
check_validation=check_validation)
for dataset in Dataset.objects.all().filter(filetype=1):
parse_source_by_id_task.delay(dataset_id=dataset.id,
force=False,
check_validation=True)
force=force,
check_validation=check_validation)
started = len(Dataset.objects.all())
await_async_subtasks(started)
# STEP FOUR -- End #

# Restart the automatic_incremental_parse asynchronously and end this task.
automatic_incremental_parse.apply_async()
automatic_incremental_parse.delay(force=force,
check_validation=check_validation)


# This task updates all of the currency exchange rates in the local database
Expand Down
Loading

0 comments on commit a3ad32e

Please sign in to comment.