From a125a0607a7e47c9132d9d82eefaf1435968bf97 Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Mon, 26 Apr 2021 10:17:30 +0300 Subject: [PATCH 1/2] Modify start_csv_import_to_hyper function - Stop sync process from running if lock is not acquired - Add HYPERFILE_SYNC_LOCK_PREFIX common tag --- app/common_tags.py | 1 + app/utils/onadata_utils.py | 82 ++++++++++++++++++++++---------------- 2 files changed, 49 insertions(+), 34 deletions(-) diff --git a/app/common_tags.py b/app/common_tags.py index 37bb537..e92b823 100644 --- a/app/common_tags.py +++ b/app/common_tags.py @@ -2,6 +2,7 @@ HYPER_PROCESS_CACHE_KEY = "HYPER_PROCESS" EVENT_STATUS_SUFFIX = "-event-status" +HYPERFILE_SYNC_LOCK_PREFIX = "sync-hyperfile-" ONADATA_TOKEN_ENDPOINT = "/o/token/" ONADATA_FORMS_ENDPOINT = "/api/v1/forms" diff --git a/app/utils/onadata_utils.py b/app/utils/onadata_utils.py index b55aed1..24992a9 100644 --- a/app/utils/onadata_utils.py +++ b/app/utils/onadata_utils.py @@ -6,6 +6,8 @@ import httpx import sentry_sdk +from redis import Redis +from redis.exceptions import LockError from fastapi_cache import caches from sqlalchemy.orm.session import Session from tableauhyperapi import HyperProcess, Telemetry @@ -17,6 +19,7 @@ ONADATA_USER_ENDPOINT, JOB_ID_METADATA, HYPER_PROCESS_CACHE_KEY, + HYPERFILE_SYNC_LOCK_PREFIX, ) from app.database import SessionLocal from app.models import HyperFile, Server, User @@ -157,46 +160,57 @@ def start_csv_import_to_hyper( hyperfile_id: int, process: HyperProcess, schedule_cron: bool = True ): db = SessionLocal() + redis_client = Redis( + host=settings.redis_host, port=settings.redis_port, db=settings.redis_db + ) hyperfile: HyperFile = HyperFile.get(db, object_id=hyperfile_id) job_status: str = schemas.FileStatusEnum.file_available.value err: Exception = None if hyperfile: - user = User.get(db, hyperfile.user) - server = Server.get(db, user.server) - - hyperfile.file_status = schemas.FileStatusEnum.syncing.value - db.commit() - db.refresh(hyperfile) - try: - export = get_csv_export(hyperfile, user, server, db) - - if export: - handle_csv_import_to_hyperfile(hyperfile, export, process, db) - - if schedule_cron and not hyperfile.meta_data.get(JOB_ID_METADATA): - schedule_hyper_file_cron_job( - start_csv_import_to_hyper_job, hyperfile_id - ) - else: - job_status = schemas.FileStatusEnum.file_unavailable.value - except (CSVExportFailure, ConnectionRequestError, Exception) as exc: - err = exc - job_status = schemas.FileStatusEnum.latest_sync_failed.value - - successful_import = job_status == schemas.FileStatusEnum.file_available.value - handle_hyper_file_job_completion( - hyperfile.id, - db, - job_succeeded=successful_import, - object_updated=successful_import, - file_status=job_status, - ) - db.close() - if err: - sentry_sdk.capture_exception(err) - return successful_import + with redis_client.lock(f"{HYPERFILE_SYNC_LOCK_PREFIX}{hyperfile.id}"): + user = User.get(db, hyperfile.user) + server = Server.get(db, user.server) + + hyperfile.file_status = schemas.FileStatusEnum.syncing.value + db.commit() + db.refresh(hyperfile) + + try: + export = get_csv_export(hyperfile, user, server, db) + + if export: + handle_csv_import_to_hyperfile(hyperfile, export, process, db) + + if schedule_cron and not hyperfile.meta_data.get( + JOB_ID_METADATA + ): + schedule_hyper_file_cron_job( + start_csv_import_to_hyper_job, hyperfile_id + ) + else: + job_status = schemas.FileStatusEnum.file_unavailable.value + except (CSVExportFailure, ConnectionRequestError, Exception) as exc: + err = exc + job_status = schemas.FileStatusEnum.latest_sync_failed.value + + successful_import = ( + job_status == schemas.FileStatusEnum.file_available.value + ) + handle_hyper_file_job_completion( + hyperfile.id, + db, + job_succeeded=successful_import, + object_updated=successful_import, + file_status=job_status, + ) + db.close() + if err: + sentry_sdk.capture_exception(err) + return successful_import + except LockError: + pass def start_csv_import_to_hyper_job(hyperfile_id: int, schedule_cron: bool = False): From 55e8ffb5b8f27c34d23aa53f5bd4f36885b00482 Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Mon, 26 Apr 2021 10:20:31 +0300 Subject: [PATCH 2/2] Add custom UniqueJobScheduler - Add custom scheduler class --- app/jobs/scheduler.py | 44 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/app/jobs/scheduler.py b/app/jobs/scheduler.py index 9e30a59..87c0b62 100644 --- a/app/jobs/scheduler.py +++ b/app/jobs/scheduler.py @@ -12,7 +12,49 @@ REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/1") REDIS_CONN = Redis.from_url(REDIS_URL) QUEUE = Queue(QUEUE_NAME, connection=REDIS_CONN) -SCHEDULER = Scheduler(queue=QUEUE, connection=REDIS_CONN) + + +class UniqueJobScheduler(Scheduler): + """ + Custom Redis Queue scheduler that only allows unique cron jobs + to be scheduled + """ + + def cron( + self, + cron_string, + func, + args=None, + kwargs=None, + repeat=None, + queue_name=None, + id=None, + timeout=None, + description=None, + meta=None, + use_local_timezone=False, + depends_on=None, + ): + for job in self.get_jobs(): + if job.func == func and job.args == args: + return job + super(UniqueJobScheduler, self).cron( + cron_string, + func, + args=args, + kwargs=kwargs, + repeat=repeat, + queue_name=queue_name, + id=id, + timeout=timeout, + description=description, + meta=meta, + use_local_timezone=use_local_timezone, + depends_on=depends_on, + ) + + +SCHEDULER = UniqueJobScheduler(queue=QUEUE, connection=REDIS_CONN) def cancel_job(job_id, job_args: list = None, func_name: str = None):