Merge pull request #7 from onaio/scheduling-updates
Add schedule lock mechanism and custom scheduler
DavisRayM authored Apr 26, 2021
2 parents b9ecb74 + 55e8ffb commit 13183da
Showing 3 changed files with 92 additions and 35 deletions.
1 change: 1 addition & 0 deletions app/common_tags.py
@@ -2,6 +2,7 @@
HYPER_PROCESS_CACHE_KEY = "HYPER_PROCESS"

EVENT_STATUS_SUFFIX = "-event-status"
HYPERFILE_SYNC_LOCK_PREFIX = "sync-hyperfile-"

ONADATA_TOKEN_ENDPOINT = "/o/token/"
ONADATA_FORMS_ENDPOINT = "/api/v1/forms"
44 changes: 43 additions & 1 deletion app/jobs/scheduler.py
@@ -12,7 +12,49 @@
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/1")
REDIS_CONN = Redis.from_url(REDIS_URL)
QUEUE = Queue(QUEUE_NAME, connection=REDIS_CONN)
SCHEDULER = Scheduler(queue=QUEUE, connection=REDIS_CONN)


class UniqueJobScheduler(Scheduler):
"""
Custom Redis Queue scheduler that only allows unique cron jobs
to be scheduled
"""

def cron(
self,
cron_string,
func,
args=None,
kwargs=None,
repeat=None,
queue_name=None,
id=None,
timeout=None,
description=None,
meta=None,
use_local_timezone=False,
depends_on=None,
):
for job in self.get_jobs():
if job.func == func and job.args == args:
return job
super(UniqueJobScheduler, self).cron(
cron_string,
func,
args=args,
kwargs=kwargs,
repeat=repeat,
queue_name=queue_name,
id=id,
timeout=timeout,
description=description,
meta=meta,
use_local_timezone=use_local_timezone,
depends_on=depends_on,
)


SCHEDULER = UniqueJobScheduler(queue=QUEUE, connection=REDIS_CONN)


def cancel_job(job_id, job_args: list = None, func_name: str = None):
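To illustrate the deduplication the new scheduler performs, here is a minimal usage sketch. It is not part of this commit: it assumes rq and rq-scheduler are installed, a Redis instance is running locally, and sync_file is a hypothetical stand-in for a task function.

from redis import Redis
from rq import Queue

from app.jobs.scheduler import UniqueJobScheduler  # module defined in this diff


def sync_file(hyperfile_id):
    # Hypothetical task; in real use it should live in an importable
    # module so that rq workers can resolve and run it.
    print(f"Syncing hyper file {hyperfile_id}")


redis_conn = Redis.from_url("redis://localhost:6379/1")
queue = Queue("default", connection=redis_conn)
scheduler = UniqueJobScheduler(queue=queue, connection=redis_conn)

# The first call schedules the cron job; the second finds an existing job
# with the same func and args and returns it instead of adding a duplicate.
scheduler.cron("*/15 * * * *", sync_file, args=[1])
scheduler.cron("*/15 * * * *", sync_file, args=[1])

assert len(list(scheduler.get_jobs())) == 1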
82 changes: 48 additions & 34 deletions app/utils/onadata_utils.py
@@ -6,6 +6,8 @@

import httpx
import sentry_sdk
from redis import Redis
from redis.exceptions import LockError
from fastapi_cache import caches
from sqlalchemy.orm.session import Session
from tableauhyperapi import HyperProcess, Telemetry
@@ -17,6 +19,7 @@
    ONADATA_USER_ENDPOINT,
    JOB_ID_METADATA,
    HYPER_PROCESS_CACHE_KEY,
    HYPERFILE_SYNC_LOCK_PREFIX,
)
from app.database import SessionLocal
from app.models import HyperFile, Server, User
@@ -157,46 +160,57 @@ def start_csv_import_to_hyper(
    hyperfile_id: int, process: HyperProcess, schedule_cron: bool = True
):
    db = SessionLocal()
    redis_client = Redis(
        host=settings.redis_host, port=settings.redis_port, db=settings.redis_db
    )
    hyperfile: HyperFile = HyperFile.get(db, object_id=hyperfile_id)
    job_status: str = schemas.FileStatusEnum.file_available.value
    err: Exception = None

    if hyperfile:
        user = User.get(db, hyperfile.user)
        server = Server.get(db, user.server)

        hyperfile.file_status = schemas.FileStatusEnum.syncing.value
        db.commit()
        db.refresh(hyperfile)

        try:
            export = get_csv_export(hyperfile, user, server, db)

            if export:
                handle_csv_import_to_hyperfile(hyperfile, export, process, db)

                if schedule_cron and not hyperfile.meta_data.get(JOB_ID_METADATA):
                    schedule_hyper_file_cron_job(
                        start_csv_import_to_hyper_job, hyperfile_id
                    )
            else:
                job_status = schemas.FileStatusEnum.file_unavailable.value
        except (CSVExportFailure, ConnectionRequestError, Exception) as exc:
            err = exc
            job_status = schemas.FileStatusEnum.latest_sync_failed.value

        successful_import = job_status == schemas.FileStatusEnum.file_available.value
        handle_hyper_file_job_completion(
            hyperfile.id,
            db,
            job_succeeded=successful_import,
            object_updated=successful_import,
            file_status=job_status,
        )
        db.close()
        if err:
            sentry_sdk.capture_exception(err)
        return successful_import
        try:
            with redis_client.lock(f"{HYPERFILE_SYNC_LOCK_PREFIX}{hyperfile.id}"):
                user = User.get(db, hyperfile.user)
                server = Server.get(db, user.server)

                hyperfile.file_status = schemas.FileStatusEnum.syncing.value
                db.commit()
                db.refresh(hyperfile)

                try:
                    export = get_csv_export(hyperfile, user, server, db)

                    if export:
                        handle_csv_import_to_hyperfile(hyperfile, export, process, db)

                        if schedule_cron and not hyperfile.meta_data.get(
                            JOB_ID_METADATA
                        ):
                            schedule_hyper_file_cron_job(
                                start_csv_import_to_hyper_job, hyperfile_id
                            )
                    else:
                        job_status = schemas.FileStatusEnum.file_unavailable.value
                except (CSVExportFailure, ConnectionRequestError, Exception) as exc:
                    err = exc
                    job_status = schemas.FileStatusEnum.latest_sync_failed.value

                successful_import = (
                    job_status == schemas.FileStatusEnum.file_available.value
                )
                handle_hyper_file_job_completion(
                    hyperfile.id,
                    db,
                    job_succeeded=successful_import,
                    object_updated=successful_import,
                    file_status=job_status,
                )
                db.close()
                if err:
                    sentry_sdk.capture_exception(err)
                return successful_import
        except LockError:
            pass


def start_csv_import_to_hyper_job(hyperfile_id: int, schedule_cron: bool = False):
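For reference, the locking pattern introduced above boils down to the sketch below. It is illustrative, not code from this commit: sync_hyper_file is a hypothetical wrapper, and blocking_timeout=0 (fail fast instead of waiting) is an assumption; the commit's lock call uses redis-py defaults, which block until the lock is released.

from redis import Redis
from redis.exceptions import LockError

HYPERFILE_SYNC_LOCK_PREFIX = "sync-hyperfile-"


def sync_hyper_file(redis_client: Redis, hyperfile_id: int) -> bool:
    """Run a sync only if no other worker holds this file's lock."""
    try:
        # blocking_timeout=0 makes acquisition fail immediately, raising
        # LockError if another worker is already syncing this file.
        with redis_client.lock(
            f"{HYPERFILE_SYNC_LOCK_PREFIX}{hyperfile_id}", blocking_timeout=0
        ):
            ...  # export the CSV and import it into the Hyper file here
            return True
    except LockError:
        # Another sync for the same file is in progress; skip this run.
        return False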
