
Commit

Merge pull request #2611 from zimmerman-team/chore/revertAutomaticIncrementalParse

fix: temporarily revert automatic incremental parsing task
sylvanr authored Jun 4, 2021
2 parents 958f9c9 + a4fd5ca commit fa31036
Showing 8 changed files with 6 additions and 286 deletions.
8 changes: 2 additions & 6 deletions OIPA/iati_synchroniser/dataset_syncer.py
@@ -9,8 +9,7 @@
create_publisher_organisation
)
from iati_synchroniser.models import (
Dataset, DatasetDownloadsStarted, DatasetFailedPickup, DatasetUpdateDates,
Publisher
Dataset, DatasetFailedPickup, DatasetUpdateDates, Publisher
)
from task_queue.tasks import DatasetDownloadTask

@@ -137,10 +136,7 @@ def update_or_create_dataset(self, dataset):
if not len(dataset['resources']) or not dataset['organization']:
return

# Add a row to the DatasetDownloadsStarted table
DatasetDownloadsStarted.objects.create()

# Pass the data generated here to the DatasetDownloadTask
# Pass the data generated here to the
DatasetDownloadTask.delay(dataset_data=dataset)

def remove_deprecated(self):

This file was deleted.

17 changes: 0 additions & 17 deletions OIPA/iati_synchroniser/migrations/0025_auto_20210331_1603.py

This file was deleted.

12 changes: 0 additions & 12 deletions OIPA/iati_synchroniser/models.py
@@ -234,18 +234,6 @@ class DatasetUpdateDates(models.Model):
null=False, blank=True, auto_now=False)


# This model is added for the automation of the incremental parsing procedure
class DatasetDownloadsStarted(models.Model):
timestamp = models.DateTimeField(
null=False, blank=True, auto_now=True)


# This model is added for the automation of the incremental parsing procedure
class AsyncTasksFinished(models.Model):
timestamp = models.DateTimeField(
null=False, blank=True, auto_now=True)


class Codelist(models.Model):
name = models.CharField(primary_key=True, max_length=100)
description = models.TextField(max_length=1000, blank=True, null=True)
5 changes: 1 addition & 4 deletions OIPA/task_queue/download.py
@@ -10,8 +10,7 @@
from django.utils.encoding import smart_text

from iati_synchroniser.models import (
AsyncTasksFinished, Dataset, DatasetFailedPickup, Publisher,
filetype_choices
Dataset, DatasetFailedPickup, Publisher, filetype_choices
)

# Get an instance of a logger
@@ -301,8 +300,6 @@ def run(self, dataset_data, *args, **kwargs):
'internal_url': internal_url
}
)
# Save a row to the AsyncTasksFinished table.
AsyncTasksFinished.objects.create()

# URL string to save as a Dataset attribute:
return os.path.join(main_download_dir, filename)
158 changes: 2 additions & 156 deletions OIPA/task_queue/tasks.py
@@ -28,9 +28,7 @@
from iati.models import Activity, Budget, Document, DocumentLink, Result
from iati.transaction.models import Transaction
from iati_organisation.models import Organisation
from iati_synchroniser.models import (
AsyncTasksFinished, Dataset, DatasetNote, DatasetUpdateDates
)
from iati_synchroniser.models import Dataset, DatasetNote, DatasetUpdateDates
from OIPA.celery import app
from solr.activity.tasks import ActivityTaskIndexing
from solr.activity.tasks import solr as solr_activity
@@ -42,9 +40,7 @@
from solr.transaction.tasks import solr as solr_transaction
from solr.transaction_sector.tasks import solr as solr_transaction_sector
from task_queue.download import DatasetDownloadTask
from task_queue.utils import (
Tasks, await_async_subtasks, reset_automatic_incremental_parse_dbs
)
from task_queue.utils import Tasks
from task_queue.validation import DatasetValidationTask

# Get an instance of a logger
@@ -86,141 +82,6 @@ def add_activity_to_solr(activity_id):
# All Registered Celery tasks that are actively used
# TODO: 25-02-2020 rename tasks
#
# This task 'automatically' does a complete parse and index.
# Meaning the different steps that were previously manual are all included.
@shared_task
def automatic_incremental_parse():
"""
There are several steps that need to be taken to complete an incremental
parse/index.
1. Import datasets task (async, not able to directly tell when finished)
2. Drop old datasets task (when is this finished?)
3. Dataset validation task (when does it start? when does it finish?)
4. Parse all datasets task (when does it finish?)
Solutions:
1 - When running the 'import datasets task', separate 'download dataset
tasks' are fired. To track whether or not all of them finished - we
add a table which gets filled from the 'update_or_create_dataset'
function which in turn queues a DatasetDownloadTask. We add a table
which gets filled when the DatasetDownloadTask finishes, leading to
eventually having two tables with an equal number of rows. To make sure
it is not a false positive, check three times whether or not the
size of both tables remains the same and still matches (to catch cases
like '5 datasets found in the IATI registry and 5 datasets downloaded').
Also, we include a 'grace' filter, where we try to catch failed datasets.
If the number of finished tasks hasn't changed in 10 iterations, we assume
the missing ones have failed. As we expect 0 failed downloads, but have
historically seen a maximum of four DatasetDownloadTasks failing, we allow
a difference of TEN at maximum.
2 - Simple, just fire the task. Not async
3 - We check whether or not the dataset validation on the validation
endpoint has started by checking their API. If this API endpoint returns
empty, it means the validation endpoint is not running. The steps to
take to ensure the validation has completed are: Check if it has started
validating datasets. If it has not started yet, wait until it has started.
If it has started, wait until it no longer returns any data. Then confirm
that it has stopped returning data by checking several times in a row.
Then, we need to actually do the validation step. Start off that task,
then re-use the logic from step one: we know the number of validation tasks
as it is the same as the number of existing datasets.
Make the DatasetValidationTask update a table with a row, check if the
number of rows matches the number of existing datasets.
4 - We know that the parse all sources task queues a
parse_source_by_id_task for each of the existing datasets.
We can simply reuse the logic from the previous step to check which
of the datasets have been parsed, and when all async tasks finish, we have
completed the incremental parse/index. The process can then start anew.
"""
# Before starting, reset the databases we use for checks, in case they
# Still contain data.
reset_automatic_incremental_parse_dbs()

# Loop until stopped
while True:
# STEP ONE -- Import Datasets #
# Start the task
get_new_sources_from_iati_api_task()

too_many_failed = await_async_subtasks(started_not_set=False)
if too_many_failed:
return
# STEP ONE -- End #

# STEP TWO -- DROP OLD DATASETS #
drop_old_datasets()
# STEP TWO -- End #

# STEP THREE -- DATASET VALIDATION TASK #
# Prepare checks
check_validation_has_started = False
check_validation_is_active = False
check_empty_iteration_count = 0
check_empty_iteration_maximum = 3

while True:
url = "https://iativalidator.iatistandard.org/api/v1/queue/next"
response = requests.get(url, timeout=30)
response.raise_for_status()

# If the response is not 200, reset and check back later.
if response.status_code != 200:
check_validation_has_started = False
check_validation_is_active = False
time.sleep(60)
continue

check_content_is_empty = response.content.decode("utf-8") == ""

"""
Case 1: content empty - started = false - active = false
wait for the validator to start
Case 2: content has data - started = false - active = false
set started to true and active to true
Case 3: content has data - started = true - active = true
wait for the content to stop having data!
Case 4: content empty - started = true - active = true
with three iterations, confirm the content is actually empty!
set active to false.
"""
# if check_content_is_empty and not check_validation_has_started and not check_validation_is_active: # NOQA: E501
if not check_content_is_empty and not check_validation_has_started and not check_validation_is_active: # NOQA: E501
check_validation_has_started = True
check_validation_is_active = True
if not check_content_is_empty and check_validation_has_started and check_validation_is_active: # NOQA: E501
check_empty_iteration_count = 0
if check_content_is_empty and check_validation_has_started and check_validation_is_active: # NOQA: E501
if check_empty_iteration_count < check_empty_iteration_maximum:
check_empty_iteration_count += 1
else: # Validation has finished
break
time.sleep(60)

# Now that the "waiting for validator to finish" loop is over, we know
# The validator is finished. Run the task. To reduce complexity, reuse
# the AsyncTasksFinished table.
get_validation_results_task()

started = len(Dataset.objects.all())
await_async_subtasks(started)
# STEP THREE -- End #

# STEP FOUR -- PARSE ALL DATASETS #
# parse_all_existing_sources_task() does not actually run the parsing,
# Reusing the code here.
for dataset in Dataset.objects.all().filter(filetype=2):
parse_source_by_id_task.delay(dataset_id=dataset.id,
force=False,
check_validation=True)
for dataset in Dataset.objects.all().filter(filetype=1):
parse_source_by_id_task.delay(dataset_id=dataset.id,
force=False,
check_validation=True)
await_async_subtasks(started)
# STEP FOUR -- End #


# This task updates all of the currency exchange rates in the local database
@shared_task
def update_exchange_rates():
@@ -254,8 +115,6 @@ def get_validation_results_task():


# This task is used to parse a specific dataset by passing it an ID.
# For all of the different try and catches, store an AsyncTasksFinished row
# for the automatic incremental parse procedure.
# TODO: 25-02-2020 document this function.
@shared_task(bind=True)
def parse_source_by_id_task(self, dataset_id, force=False,
@@ -267,25 +126,14 @@ def parse_source_by_id_task(self, dataset_id, force=False,
validation_status__critical__lte=0) # NOQA: E501
dataset = dataset.first()
dataset.process(force_reparse=force)

# Save a row to the AsyncTasksFinished table.
AsyncTasksFinished.objects.create()
except AttributeError:
print('no dataset found')

# Save a row to the AsyncTasksFinished table.
AsyncTasksFinished.objects.create()
pass
else:
try:
dataset = Dataset.objects.get(pk=dataset_id)
dataset.process(force_reparse=force)

# Save a row to the AsyncTasksFinished table.
AsyncTasksFinished.objects.create()
except Dataset.DoesNotExist:
# Save a row to the AsyncTasksFinished table.
AsyncTasksFinished.objects.create()
pass
except Exception as exc:
raise self.retry(kwargs={'dataset_id': dataset_id, 'force': True},
@@ -978,8 +826,6 @@ def download_file(d):
# print str(e)
doc.document_content = document_content.decode("latin-1")
doc.save()


#
#
# @shared_task
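
For orientation, the following is a minimal, illustrative sketch of the validator-queue polling that the reverted automatic_incremental_parse docstring above describes: wait for the external validator to start returning work, then treat validation as finished once its queue has stayed empty for a few consecutive polls. It is not part of this commit; the helper name wait_for_validator and its parameters are assumptions.

import time

import requests

# Queue endpoint taken from the reverted task code above.
VALIDATOR_QUEUE_URL = "https://iativalidator.iatistandard.org/api/v1/queue/next"


def wait_for_validator(poll_seconds=60, empty_checks=3):
    # Hypothetical sketch: block until the validator queue has started
    # returning work and has then stayed empty for `empty_checks` polls.
    validation_started = False
    empty_count = 0
    while True:
        response = requests.get(VALIDATOR_QUEUE_URL, timeout=30)
        queue_empty = (response.status_code != 200
                       or response.content.decode("utf-8") == "")
        if not validation_started:
            if not queue_empty:
                validation_started = True  # case 2: validation has begun
        elif queue_empty:
            empty_count += 1  # case 4: confirm the queue stays empty
            if empty_count >= empty_checks:
                return  # validation finished
        else:
            empty_count = 0  # case 3: still processing
        time.sleep(poll_seconds)
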
60 changes: 0 additions & 60 deletions OIPA/task_queue/utils.py
@@ -1,11 +1,5 @@
import time

from celery.task.control import inspect

from iati_synchroniser.models import (
AsyncTasksFinished, DatasetDownloadsStarted
)


class Tasks:
"""
@@ -84,57 +78,3 @@ def extract(obj, arr, key):

results = extract(obj, arr, key)
return results


def reset_automatic_incremental_parse_dbs():
dds = DatasetDownloadsStarted.objects.all()
dds.delete()
ddf = AsyncTasksFinished.objects.all()
ddf.delete()


# Await asynchronous subtasks from other tasks. Started is the number of
# elements that are expected to end up in the AsyncTasksFinished table.
def await_async_subtasks(started=-1, started_not_set=True):
check_iteration_count = 0
check_iteration_maximum = 3
check_previous_finished_length = 0
check_grace_iteration_count = 0
check_grace_iteration_maximum = 10
check_grace_maximum_disparity = 10
while True:
# Get the size of the started datasets
if not started_not_set:
started = len(DatasetDownloadsStarted.objects.all())
finished = len(AsyncTasksFinished.objects.all())

# Check if the grace should take effect.
# Grace is when the number of failed tasks is very small but the
# number of finished tasks no longer changes. This makes sure that
# the automatic parsing does not get stuck waiting for an unfinished
# async task.
if finished == check_previous_finished_length:
check_grace_iteration_count += 1
if check_grace_iteration_count == check_grace_iteration_maximum:
if started - finished < check_grace_maximum_disparity:
break
else: # More async tasks than expected failed,
# exit automatic parsing
return True
else:
check_grace_iteration_count = 0

# Check if the async tasks are done
if started == finished:
if finished == check_previous_finished_length:
check_iteration_count += 1
if check_iteration_count == check_iteration_maximum:
break
else:
check_iteration_count = 0

# Wait a minute and check again
time.sleep(60)
check_previous_finished_length = finished
# After this while loop finishes, we clear the DatasetDownloads tables
reset_automatic_incremental_parse_dbs()
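
For context, a condensed, illustrative sketch of how the reverted pieces were wired together: the syncer recorded a DatasetDownloadsStarted row for every queued DatasetDownloadTask, each finished task recorded an AsyncTasksFinished row, and the orchestrating task blocked on await_async_subtasks until the two counts converged. The helper run_tracked_batch and its queue_download argument are hypothetical, and the imports assume the reverted models and utilities are still present.

from iati_synchroniser.models import DatasetDownloadsStarted
from task_queue.utils import (
    await_async_subtasks, reset_automatic_incremental_parse_dbs
)


def run_tracked_batch(datasets, queue_download):
    # Hypothetical sketch of the reverted counter-based completion check.
    # Start from empty counter tables, in case they still contain data.
    reset_automatic_incremental_parse_dbs()
    for dataset in datasets:
        # One 'started' row per queued subtask; the subtask itself creates
        # an AsyncTasksFinished row when it completes (see download.py above).
        DatasetDownloadsStarted.objects.create()
        queue_download(dataset)  # e.g. DatasetDownloadTask.delay(dataset_data=dataset)
    # Block until the two counts match; returns True when too many
    # subtasks appear to have failed.
    return await_async_subtasks(started_not_set=False)
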
