Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use threading to run jobmanager loop #614

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 61 additions & 19 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
import asyncio
import contextlib
import datetime
import json
Expand Down Expand Up @@ -140,6 +141,7 @@ def __init__(
.. versionchanged:: 0.32.0
Added `cancel_running_job_after` parameter.
"""
self._stop = True
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
self._connections: Dict[str, _Backend] = {}
Expand All @@ -150,6 +152,7 @@ def __init__(
self._cancel_running_job_after = (
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)
self._loop_task = None

def add_backend(
self,
Expand Down Expand Up @@ -252,6 +255,43 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:

return df

def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface):
    """
    Start tracking/launching jobs in the background and return immediately.

    Resumes from the state already persisted in ``job_db`` and keeps polling,
    launching and persisting jobs until all of them reach a terminal status
    or :py:meth:`stop_job_thread` is called.

    :param start_job: callable that creates a new :py:class:`BatchJob`
        (same contract as the ``start_job`` argument of :py:meth:`run_jobs`).
    :param job_db: job database to resume from and persist progress to.
    """
    # Resume from existing db so previously tracked jobs are picked up.
    _log.info(f"Resuming `run_jobs` from existing {job_db}")
    df = job_db.read()

    self._stop = False

    # Statuses after which a job needs no further tracking.
    # TODO: risk of infinite loop if a backend reports a (non-standard)
    #       terminal status that is not covered here.
    terminal_statuses = ["finished", "skipped", "start_failed", "error", "canceled"]

    async def run_loop():
        # Keep polling while a stop was not requested and at least one job
        # is still in a non-terminal state.
        while not self._stop and df[~df.status.isin(terminal_statuses)].size > 0:
            await self._job_update_loop(df, job_db, start_job)
            await asyncio.sleep(self.poll_sleep)

    # Bug fix: store the task under `_loop_task` (the attribute initialized in
    # `__init__` and checked by `stop_job_thread`), not `loop_task` — otherwise
    # `stop_job_thread` never finds the task and the loop cannot be cancelled.
    # NOTE(review): `asyncio.create_task` requires an already *running* event
    # loop, so this method only works when called from within a coroutine —
    # TODO confirm intended usage (the method name suggests an actual thread).
    self._loop_task = asyncio.create_task(run_loop())

def stop_job_thread(self, force_timeout_seconds=30):
    """
    Request the background job loop started by :py:meth:`start_job_thread` to stop.

    Sets the stop flag so the loop can exit gracefully at its next iteration,
    waits up to ``force_timeout_seconds`` and then cancels the task outright.
    Does nothing if no loop task was started.

    :param force_timeout_seconds: grace period (in seconds) before force-cancelling.
    """
    if self._loop_task is not None:
        self._stop = True
        # Bug fix: the original called `asyncio.sleep(...)` without awaiting it
        # in this synchronous method, which just creates and discards a
        # coroutine (no delay at all, plus a "never awaited" RuntimeWarning).
        # Use a blocking sleep to actually grant the grace period.
        time.sleep(force_timeout_seconds)
        self._loop_task.cancel()

def run_jobs(
self,
df: pd.DataFrame,
Expand Down Expand Up @@ -355,28 +395,30 @@ def run_jobs(
> 0
):

with ignore_connection_errors(context="get statuses"):
self._track_statuses(df)
status_histogram = df.groupby("status").size().to_dict()
_log.info(f"Status histogram: {status_histogram}")
job_db.persist(df)

if len(df[df.status == "not_started"]) > 0:
# Check number of jobs running at each backend
running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = self.backends[backend_name].parallel_jobs - backend_load
to_launch = df[df.status == "not_started"].iloc[0:to_add]
for i in to_launch.index:
self._launch_job(start_job, df, i, backend_name)
job_db.persist(df)
asyncio.run(self._job_update_loop(df, job_db, start_job))
jdries marked this conversation as resolved.
Show resolved Hide resolved

time.sleep(self.poll_sleep)

async def _job_update_loop(self, df, job_db, start_job):
    """
    Perform one tracking/launching iteration.

    Refreshes the status of all tracked jobs, persists the dataframe, and
    starts new jobs on every backend that still has free parallel-job slots.

    :param df: job dataframe (mutated in place).
    :param job_db: job database to persist progress to.
    :param start_job: callable that creates a new batch job to launch.
    """
    # Refresh statuses, tolerating transient connection problems.
    with ignore_connection_errors(context="get statuses"):
        self._track_statuses(df)
    status_histogram = df.groupby("status").size().to_dict()
    _log.info(f"Status histogram: {status_histogram}")
    job_db.persist(df)

    if len(df[df.status == "not_started"]) > 0:
        # Check number of jobs running at each backend
        per_backend = (
            df[df.status.isin(["created", "queued", "running"])]
            .groupby("backend_name")
            .size()
            .to_dict()
        )
        _log.info(f"Running per backend: {per_backend}")
        for backend_name, backend in self.backends.items():
            free_slots = backend.parallel_jobs - per_backend.get(backend_name, 0)
            if free_slots > 0:
                to_launch = df[df.status == "not_started"].iloc[:free_slots]
                for i in to_launch.index:
                    self._launch_job(start_job, df, i, backend_name)
                    # Persist after each launch so job ids are not lost on crash.
                    job_db.persist(df)

def _launch_job(self, start_job, df, i, backend_name):
"""Helper method for launching jobs
Expand Down