diff --git a/run_local.sh b/run_local.sh index 5fe2f2e56..74bee6daf 100755 --- a/run_local.sh +++ b/run_local.sh @@ -17,6 +17,7 @@ export DIRACX_CONFIG_BACKEND_URL="git+file://${tmp_dir}/cs_store/initialRepo" export DIRACX_DB_URL_AUTHDB="sqlite+aiosqlite:///:memory:" export DIRACX_DB_URL_JOBDB="sqlite+aiosqlite:///:memory:" export DIRACX_DB_URL_JOBLOGGINGDB="sqlite+aiosqlite:///:memory:" +export DIRACX_DB_URL_TASKQUEUEDB="sqlite+aiosqlite:///:memory:" export DIRACX_SERVICE_AUTH_TOKEN_KEY="file://${tmp_dir}/signing-key/rs256.key" export DIRACX_SERVICE_AUTH_ALLOWED_REDIRECTS='["http://'$(hostname| tr -s '[:upper:]' '[:lower:]')':8000/docs/oauth2-redirect"]' diff --git a/setup.cfg b/setup.cfg index f5d5c3532..f1565d9fa 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,6 +40,7 @@ diracx.dbs = AuthDB = diracx.db:AuthDB JobDB = diracx.db:JobDB JobLoggingDB = diracx.db:JobLoggingDB + TaskQueueDB = diracx.db:TaskQueueDB #DummyDB = diracx.db:DummyDB diracx.services = jobs = diracx.routers.job_manager:router diff --git a/src/diracx/cli/internal.py b/src/diracx/cli/internal.py index 75b9ab0c8..855a3b7dd 100644 --- a/src/diracx/cli/internal.py +++ b/src/diracx/cli/internal.py @@ -48,9 +48,7 @@ def generate_cs( DefaultGroup=user_group, Users={}, Groups={ - user_group: GroupConfig( - JobShare=None, Properties=["NormalUser"], Quota=None, Users=[] - ) + user_group: GroupConfig(Properties=["NormalUser"], Quota=None, Users=[]) }, ) config = Config( diff --git a/src/diracx/core/config/schema.py b/src/diracx/core/config/schema.py index 8f3470e05..eee65af90 100644 --- a/src/diracx/core/config/schema.py +++ b/src/diracx/core/config/schema.py @@ -5,7 +5,7 @@ from typing import Any, Optional from pydantic import BaseModel as _BaseModel -from pydantic import EmailStr, PrivateAttr, root_validator +from pydantic import EmailStr, PrivateAttr, root_validator, validator from ..properties import SecurityProperty @@ -49,7 +49,7 @@ class GroupConfig(BaseModel): AutoAddVOMS: bool = False 
AutoUploadPilotProxy: bool = False AutoUploadProxy: bool = False - JobShare: Optional[int] + JobShare: int = 1000 Properties: list[SecurityProperty] Quota: Optional[int] Users: list[str] @@ -86,9 +86,35 @@ class JobMonitoringConfig(BaseModel): useESForJobParametersFlag: bool = False +class JobSchedulingConfig(BaseModel): + EnableSharesCorrection: bool = False + taskQueueCPUTimeIntervals: list[int] = [ + 6 * 60, + 30 * 60, + 1 * 3600, + 6 * 3600, + 12 * 3600, + 1 * 86400, + 2 * 86400, + 3 * 86400, + 4 * 86400, + 6 * 86400, + 8 * 86400, + 10 * 86400, + int(12.5 * 86400), + ] + + @validator("taskQueueCPUTimeIntervals") + def check_taskQueueCPUTimeIntervals(cls, v): + assert len(v) >= 1 + assert all(x > 0 for x in v) + return sorted(v) + + class ServicesConfig(BaseModel): Catalogs: dict[str, Any] | None JobMonitoring: JobMonitoringConfig = JobMonitoringConfig() + JobScheduling: JobSchedulingConfig = JobSchedulingConfig() class OperationsConfig(BaseModel): diff --git a/src/diracx/core/models.py b/src/diracx/core/models.py index fa6048011..7402ba29a 100644 --- a/src/diracx/core/models.py +++ b/src/diracx/core/models.py @@ -3,9 +3,11 @@ from datetime import datetime from enum import Enum from typing import Literal, TypedDict +from uuid import UUID from pydantic import BaseModel, Field +from diracx.core.properties import SecurityProperty from diracx.core.utils import JobStatus @@ -82,4 +84,36 @@ class SetJobStatusReturn(BaseModel): last_update_time: datetime | None = Field(alias="LastUpdateTime") +class tqDefDict(BaseModel): + cpu_time: int = Field(alias="CPUTime") + owner: str = Field(alias="Owner") + owner_group: str = Field(alias="OwnerGroup") + sites: list[str] = Field(alias="Sites") + grid_ces: list[str] = Field(alias="GridCEs") + banned_sites: list[str] = Field(alias="BannedSites") + platforms: list[str] = Field(alias="Platforms") + job_types: list[str] = Field(alias="JobTypes") + tags: list[str] = Field(alias="Tags") + + +class AuthInfo(BaseModel): + # raw token for 
propagation + bearer_token: str + + # token ID in the DB for Component + # unique jwt identifier for user + token_id: UUID + + # list of DIRAC properties + properties: list[SecurityProperty] + + +class UserInfo(AuthInfo): + # dirac generated vo:sub + sub: str + preferred_username: str + dirac_group: str + vo: str + + SearchSpec = ScalarSearchSpec | VectorSearchSpec diff --git a/src/diracx/db/__init__.py b/src/diracx/db/__init__.py index 7e8a3c85f..06dada71e 100644 --- a/src/diracx/db/__init__.py +++ b/src/diracx/db/__init__.py @@ -1,6 +1,6 @@ -__all__ = ("AuthDB", "JobDB", "JobLoggingDB") +__all__ = ("AuthDB", "JobDB", "JobLoggingDB", "TaskQueueDB") from .auth.db import AuthDB -from .jobs.db import JobDB, JobLoggingDB +from .jobs.db import JobDB, JobLoggingDB, TaskQueueDB # from .dummy.db import DummyDB diff --git a/src/diracx/db/jobs/db.py b/src/diracx/db/jobs/db.py index c84236f6d..e558f737d 100644 --- a/src/diracx/db/jobs/db.py +++ b/src/diracx/db/jobs/db.py @@ -7,18 +7,30 @@ from sqlalchemy import delete, func, insert, select, update from sqlalchemy.exc import NoResultFound +from diracx.core import properties +from diracx.core.config.schema import Config from diracx.core.exceptions import InvalidQueryError from diracx.core.models import JobStatusReturn, LimitedJobStatusReturn +from diracx.core.properties import JOB_SHARING from diracx.core.utils import JobStatus from ..utils import BaseDB, apply_search_filters from .schema import ( + BannedSitesQueue, + GridCEsQueue, InputData, JobDBBase, JobJDLs, JobLoggingDBBase, Jobs, + JobsQueue, + JobTypesQueue, LoggingInfo, + PlatformsQueue, + SitesQueue, + TagsQueue, + TaskQueueDBBase, + TaskQueues, ) @@ -417,3 +429,549 @@ async def get_wms_time_stamps(self, job_id): result[event] = str(etime + MAGIC_EPOC_NUMBER) return result + + +class TaskQueueDB(BaseDB): + metadata = TaskQueueDBBase.metadata + + # async def enable_all_queues(self): + # """Enable all task queues""" + # stmt = update(TaskQueues).values(Enabled=True) + 
# await self.conn.execute(stmt) + + # async def get_groups_in_task_queues(self): + # stmt = select(TaskQueues.OwnerGroup).distinct() + # return [x[0] async for x in (await self.conn.stream(stmt))] + + # async def fit_cpu_time_to_segments( + # self, cpu_time, config: Config, user_info: UserInfo + # ): + # """Fit the CPU time to the valid segment""" + # # TODO: this part has been quite heavily modified, so a solid review is needed here (or a test) + # cpu_segments = config.Operations[ + # user_info.vo + # ].Services.JobScheduling.taskQueueCPUTimeIntervals + + # # Map to a segment + # for cpu_segment in cpu_segments: + # if cpu_time <= cpu_segment: + # return cpu_segment + + # # We return the last segment if the CPU time is bigger than the last segment + # return cpu_segments[-1] + + # async def _check_task_queue_definition(self, tq_def_dict): + # """Check the task queue definition""" + # # TODO: Maybe a pydantic model could be used here ? (and in this whole class in general) + # # This could be done later on though, maybe to ease the transition from the DIRAC code + # # by not making to many changes at once + # from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import singleValueDefFields + + # for field in singleValueDefFields: + # if field not in tq_def_dict: + # raise InvalidQueryError( + # f"Missing field {field} in task queue definition" + # ) + + # async def __create_task_queue( + # self, tqDefDict, config: Config, user_info: UserInfo, priority=1 + # ): + # # TODO: this could be a pydantic validator + # tqDefDict["CPUTime"] = self.fit_cpu_time_to_segments( + # tqDefDict["CPUTime"], config, user_info + # ) + + # # for field in singleValueDefFields: + # # sqlSingleFields.append(field) + # # sqlValues.append(tqDefDict[field]) + # # # Insert the TQ Disabled + # # sqlSingleFields.append("Enabled") + # # sqlValues.append("0") + # # cmd = "INSERT INTO tq_TaskQueues ( {} ) VALUES ( {} )".format( + # # ", ".join(sqlSingleFields), + # # ", ".join([str(v) for v in 
sqlValues]), + # # ) + # # result = self._update(cmd, conn=connObj) + # # if not result["OK"]: + # # self.log.error("Can't insert TQ in DB", result["Message"]) + # # return result + # # if "lastRowId" in result: + # # tqId = result["lastRowId"] + + # stmt = insert(TaskQueues).values( + # Owner=tqDefDict["Owner"], + # OwnerGroup=tqDefDict["OwnerGroup"], + # CPUTime=tqDefDict["CPUTime"], + # Priority=priority, + # Enabled=False, + # ) + # rows = await self.conn.execute(stmt) + # tq_id = rows.lastrowid + + # for table, field in { + # (SitesQueue, "Sites"), + # (GridCEsQueue, "GridCEs"), + # (BannedSitesQueue, "BannedSites"), + # (PlatformsQueue, "Platforms"), + # (JobTypesQueue, "JobTypes"), + # (TagsQueue, "Tags"), + # }: + # if field in tqDefDict and tqDefDict[field]: + # stmt = insert(table).values( + # [ + # { + # "TQId": tq_id, + # "Value": value, + # } + # for value in tqDefDict[field] + # ] + # ) + # await self.conn.execute(stmt) + + # async def clear_orphaned_task_queues(self): + # """Delete all empty task queues""" + + # orphans_stmt = ( + # select(TaskQueues.TQId) + # .where( + # TaskQueues.Enabled + # >= 1, # TODO: experiment with is true here for easier comprehension + # ~TaskQueues.TQId.in_(select(JobsQueue.TQId)), + # ) + # .scalar_subquery() + # ) + + # stmt = delete(TaskQueues).where(TaskQueues.TQId.in_(orphans_stmt)) + # await self.conn.execute(stmt) + + # async def __set_task_queue_enabled(self, tqId: int, enabled=True): + # """Enable or disable a task queue""" + + # stmt = update(TaskQueues).where(TaskQueues.TQId == tqId).values(Enabled=enabled) + # await self.conn.execute(stmt) + # return enabled + + # async def __hack_job_priority(self, job_priority: int): + # # This could also be a pydantic validator + # if job_priority < 1: + # job_priority = 1 + # elif job_priority > 10: + # job_priority = 10 + + # return job_priority + + # async def insert_job( + # self, + # job_id: int, + # tqDefDict: dict, + # job_priority: int, + # config: Config, + # 
user_info: UserInfo, + # skipTQDefCheck=False, + # ) -> int: + # """Insert a job in a task queue (creating one if it doesn't exit) + + # :param jobId: job ID + # :param dict tqDefDict: dict for TQ definition + # :param int jobPriority: integer that defines the job priority + + # :returns: tq_id + # """ + + # # TODO: make this a pydantic model, in the end it's going to be easier to transition this way + # # if not skipTQDefCheck: + # # tqDefDict = dict(tqDefDict) + # # retVal = self._checkTaskQueueDefinition(tqDefDict) + # # if not retVal["OK"]: + # # self.log.error("TQ definition check failed", retVal["Message"]) + # # return retVal + # # tqDefDict = retVal["Value"] + # # tqDefDict["CPUTime"] = self.fitCPUTimeToSegments(tqDefDict["CPUTime"]) + + # tq_info = await self.__find_and_disable_task_queue(tqDefDict) + # newTQ = False + # if not tq_info["found"]: + # tq_id = await self.__create_task_queue(tqDefDict, config, user_info) + # newTQ = True + # else: + # tq_id = tq_info["tqId"] + # try: + # await self.__insert_job_in_task_queue(job_id, tq_id, job_priority) + # if newTQ: + # await self.recalculate_tq_shares_for_entity( + # user_info.sub, user_info.dirac_group, user_info.vo, config + # ) + # finally: + # await self.__set_task_queue_enabled(tq_id, True) + + # return tq_id + + # async def __insert_job_in_task_queue( + # self, job_id: int, tq_id: int, job_priority: int + # ): + # """Insert a job in a task queue + + # :param int jobId: job ID + # :param int tqId: task queue ID + # :param int jobPriority: integer that defines the job priority + # :returns: S_OK() / S_ERROR + # """ + # # TODO: this part has also been modified because we assert that job_priority is an int + # hackedPriority = await self.__hack_job_priority(job_priority) + # insert_stmt = insert(JobsQueue).values( + # JobId=job_id, + # TQId=tq_id, + # Priority=job_priority, + # RealPriority=hackedPriority, + # ) + + # try: + # await self.conn.execute(insert_stmt) + # except IntegrityError: + # update_stmt 
= ( + # update(JobsQueue) + # .where(JobsQueue.JobId == job_id) + # .values( + # TQId=tq_id, + # Priority=job_priority, + # RealPriority=hackedPriority, + # ) + # ) + # await self.conn.execute(update_stmt) + + # async def __find_and_disable_task_queue(self, tqDefDict): + # """Disable and find TQ + + # :param dict tqDefDict: dict for TQ definition + # """ + # data = await self.__find_smallest_task_queue(tqDefDict) + # if data["found"] and data["enabled"]: + # await self.__set_task_queue_enabled(data["tqId"], False) + # return data + + # async def __find_smallest_task_queue(self, tqDefDict): + # """Find a task queue that has at least the given requirements + + # :param dict tqDefDict: dict for TQ definition + # """ + # stmt = ( + # select(func.count(JobsQueue.JobId), TaskQueues.TQId, TaskQueues.Enabled) + # .select_from(TaskQueues.join(JobsQueue, TaskQueues.TQId == JobsQueue.TQId)) + # .where(TaskQueues.CPUTime == tqDefDict["CPUTime"]) + # .where(TaskQueues.Owner == tqDefDict["Owner"]) + # .where(TaskQueues.OwnerGroup == tqDefDict["OwnerGroup"]) + # .group_by(JobsQueue.TQId) + # .order_by(func.count(JobsQueue.JobId).asc()) + # ) + # for table, field in { + # (SitesQueue, "Sites"), + # (GridCEsQueue, "GridCEs"), + # (BannedSitesQueue, "BannedSites"), + # (PlatformsQueue, "Platforms"), + # (JobTypesQueue, "JobTypes"), + # (TagsQueue, "Tags"), + # }: + # if field in tqDefDict and tqDefDict[field]: + # # TODO: move the following line in a pydantic validator + # # value_list = [value.strip() for value in tqDefDict["Sites"]] if value.strip()] + # # TODO: I slightly modified the code here, so if it doesn't work, check the original code + # stmt = ( + # stmt.select_from( + # TaskQueues.join(table, TaskQueues.TQId == table.TQId) + # ) + # .where( + # select(func.count(table.Value)) + # .where(table.TQId == TaskQueues.TQId) + # .where(table.Value.in_(tqDefDict["Sites"])) + # == len(tqDefDict[field]) + # ) + # .group_by(table.TQId) + # ) + # else: + # stmt = stmt.where( + # 
~table.TQId.in_(select(table.TQId).distinct().scalar_subquery()) + # ) + + # rows = await self.conn.execute(stmt) + # # TODO: maybe consider using one() here to raise NoResultFound instead of returning {"found": False} + # # (rows[0] >= 5000 could also raise an exception here, maybe the same one ?) + # if ( + # not rows or rows[0][0] >= 5000 + # ): # TODO: magic value here, maybe this should be in the config + # return {"found": False} + # return { + # "found": True, + # "tqId": rows[0][1], + # "enabled": rows[0][2], + # "jobs": rows[0][0], + # } + + # async def match_and_get_job( + # self, numJobsPerTry=50, numQueuesPerTry=10, negativeCond=None + # ): + # """ + # Match a job to a task queue + # """ + # res = await self.match_and_get_task_queue( + # tq_match_dict, numQueuesToGet=1, negativeCond=negativeCond + # ) + # if not res: + # return None + # return await self.get_job_from_task_queue(res[0][0]) + + async def delete_job(self, job_id: int): + """ + Delete a job from the task queues + """ + stmt = ( + delete(JobsQueue).where(JobsQueue.JobId == job_id).returning(JobsQueue.TQId) + ) + return await self.conn.execute(stmt) + + async def delete_task_queue_if_empty(self, tq_id: int, config: Config): + """ + Try to delete a task queue if it is empty + The config parameter is needed to get the VO, but this is just a hack + and should be refactored + """ + stmt = ( + select(TaskQueues.Owner, TaskQueues.OwnerGroup) + .where( + TaskQueues.Enabled >= 1 + ) # TODO: refactor to "is true" for easier comprehension + .where(TaskQueues.TQId == tq_id) + .where(~TaskQueues.TQId.in_(select(JobsQueue.TQId))) + ) + try: + tq_owner, tq_owner_group = (await self.conn.execute(stmt)).one() + except NoResultFound: + return + + # Deleting the task queue (the other tables will be deleted in cascade) + stmt = delete(TaskQueues).where(TaskQueues.TQId == tq_id) + await self.conn.execute(stmt) + + # TODO: get the VO here, this might be very tricky without changing the db schema + for vo in 
config.Registry: + if tq_owner_group in [group for group in config.Registry[vo].Groups]: + tq_vo = vo + + await self.recalculate_tq_shares_for_entity( + tq_owner, tq_owner_group, tq_vo, config + ) + + async def recalculate_tq_shares_for_entity(self, owner, group, vo, config: Config): + """ + Recalculate the shares for a user/userGroup combo + """ + share = float(config.Registry[vo].Groups[group].JobShare) + if JOB_SHARING in config.Registry[vo].Groups[group].Properties: + # If group has JobSharing just set prio for that entry, user is irrelevant + return await self.__set_priorities_for_entity( + config, owner, group, vo, share + ) + + stmt = ( + select(TaskQueues.Owner, func.count(TaskQueues.Owner)) + .where(TaskQueues.OwnerGroup == group) + .group_by(TaskQueues.Owner) + ) + rows = await self.conn.execute(stmt) + # make the rows a list of tuples + # Get owners in this group and the amount of times they appear + # TODO: I guess the rows are already a list of tupes + # maybe refactor + data = [(r[0], r[1]) for r in rows if r] + numOwners = len(data) + # If there are no owners do now + if numOwners == 0: + return + # Split the share amongst the number of owners + share /= numOwners + entities_shares = {row[0]: share for row in data} + + # TODO: implement the following + # If corrector is enabled let it work it's magic + # if config.Operations[vo].Services.JobScheduling.EnableSharesCorrection: + # entities_shares = await self.__shares_corrector.correct_shares( + # entitiesShares, group=group + # ) + + # Keep updating + owners = dict(data) + # IF the user is already known and has more than 1 tq, the rest of the users don't need to be modified + # (The number of owners didn't change) + if owner in owners and owners[owner] > 1: + await self.__set_priorities_for_entity( + config, + owner, + group, + vo, + entities_shares[owner], + ) + return + # Oops the number of owners may have changed so we recalculate the prio for all owners in the group + for owner in owners: + await 
self.__set_priorities_for_entity( + config, owner, group, vo, entities_shares[owner] + ) + pass + + async def __set_priorities_for_entity( + self, config: Config, owner: str, group: str, vo: str, share + ): + """ + Set the priority for a user/userGroup combo given a splitted share + """ + from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import ( + TQ_MIN_SHARE, + priorityIgnoredFields, + ) + + stmt = ( + select( + TaskQueues.TQId, + func.sum(JobsQueue.RealPriority) / func.count(JobsQueue.RealPriority), + ) + # TODO: uncomment me and understand why mypy is unhappy with join here and not elsewhere + # .select_from(TaskQueues.join(JobsQueue, TaskQueues.TQId == JobsQueue.TQId)) + .where(TaskQueues.OwnerGroup == group).group_by(TaskQueues.TQId) + ) + if properties.JOB_SHARING not in config.Registry[vo].Groups[group].Properties: + stmt = stmt.where(TaskQueues.Owner == owner) + rows = await self.conn.execute(stmt) + # Make a dict of TQId:priority + tqDict: dict[int, float] = {row[0]: row[1] for row in rows} + + if not tqDict: + return + + allowBgTQs = config.Registry[vo].Groups[group].AllowBackgroundTQs + + # TODO: one of the only place the logic could actually be encapsulated + # so refactor + + # Calculate Sum of priorities + totalPrio = 0.0 + for k in tqDict: + if tqDict[k] > 0.1 or not allowBgTQs: + totalPrio += tqDict[k] + # Update prio for each TQ + for tqId in tqDict: + if tqDict[tqId] > 0.1 or not allowBgTQs: + prio = (share / totalPrio) * tqDict[tqId] + else: + prio = TQ_MIN_SHARE + prio = max(prio, TQ_MIN_SHARE) + tqDict[tqId] = prio + + # Generate groups of TQs that will have the same prio=sum(prios) maomenos + rows = await self.retrieve_task_queues(list(tqDict)) + # TODO: check the following asumption is correct + allTQsData = rows + tqGroups: dict[str, list] = {} + for tqid in allTQsData: + tqData = allTQsData[tqid] + for field in ("Jobs", "Priority") + priorityIgnoredFields: + if field in tqData: + tqData.pop(field) + tqHash = [] + for f in sorted(tqData): 
+ tqHash.append(f"{f}:{tqData[f]}") + tqHash = "|".join(tqHash) + if tqHash not in tqGroups: + tqGroups[tqHash] = [] + tqGroups[tqHash].append(tqid) + tqGroups = [tqGroups[td] for td in tqGroups] + + # Do the grouping + for tqGroup in tqGroups: + totalPrio = 0 + if len(tqGroup) < 2: + continue + for tqid in tqGroup: + totalPrio += tqDict[tqid] + for tqid in tqGroup: + tqDict[tqid] = totalPrio + + # Group by priorities + prioDict: dict[int, list] = {} + for tqId in tqDict: + prio = tqDict[tqId] + if prio not in prioDict: + prioDict[prio] = [] + prioDict[prio].append(tqId) + + # Execute updates + for prio, tqs in prioDict.items(): + update_stmt = ( + update(TaskQueues).where(TaskQueues.TQId.in_(tqs)).values(Priority=prio) + ) + await self.conn.execute(update_stmt) + + # async def get_matching_task_queues(self, tq_match_dict, negativeCond=False): + # """Get the info of the task queues that match a resource""" + # res = await self.match_and_get_task_queue( + # tq_match_dict, numQueuesToGet=0, negativeCond=negativeCond + # ) + # return await self.retrieve_task_queues([tqTuple[0] for tqTuple in res]) + + async def retrieve_task_queues(self, tqIdList=None): + """ + Get all the task queues + """ + if tqIdList is not None and not tqIdList: + # Empty list => Fast-track no matches + return {} + + stmt = ( + select( + TaskQueues.TQId, + TaskQueues.Priority, + func.count(JobsQueue.TQId).label("Jobs"), + TaskQueues.Owner, + TaskQueues.OwnerGroup, + TaskQueues.CPUTime, + ) + .select_from(TaskQueues.join(JobsQueue, TaskQueues.TQId == JobsQueue.TQId)) + .select_from( + TaskQueues.join(SitesQueue, TaskQueues.TQId == SitesQueue.TQId) + ) + .select_from( + TaskQueues.join(GridCEsQueue, TaskQueues.TQId == GridCEsQueue.TQId) + ) + .group_by( + TaskQueues.TQId, + TaskQueues.Priority, + TaskQueues.Owner, + TaskQueues.OwnerGroup, + TaskQueues.CPUTime, + ) + ) + if tqIdList is not None: + stmt = stmt.where(TaskQueues.TQId.in_(tqIdList)) + + tqData = dict(row._mapping for row in await 
self.conn.execute(stmt)) + # TODO: the line above should be equivalent to the following commented code, check this is the case + # for record in rows: + # tqId = record[0] + # tqData[tqId] = { + # "Priority": record[1], "Jobs": record[2], "Owner": record[3], "OwnerGroup": record[4], "CPUTime": record[5] + # } + + for tqId in tqData: + # TODO: maybe factorize this handy tuple list + for table, field in { + (SitesQueue, "Sites"), + (GridCEsQueue, "GridCEs"), + (BannedSitesQueue, "BannedSites"), + (PlatformsQueue, "Platforms"), + (JobTypesQueue, "JobTypes"), + (TagsQueue, "Tags"), + }: + stmt = select(table.Value).where(table.TQId == tqId) + tqData[tqId][field] = list( + row[0] for row in await self.conn.execute(stmt) + ) + + return tqData diff --git a/src/diracx/db/jobs/schema.py b/src/diracx/db/jobs/schema.py index 49f6a0a4b..71959e434 100644 --- a/src/diracx/db/jobs/schema.py +++ b/src/diracx/db/jobs/schema.py @@ -1,7 +1,9 @@ import sqlalchemy.types as types from sqlalchemy import ( + Boolean, DateTime, Enum, + Float, ForeignKey, ForeignKeyConstraint, Index, @@ -17,6 +19,7 @@ JobDBBase = declarative_base() JobLoggingDBBase = declarative_base() +TaskQueueDBBase = declarative_base() class EnumBackedBool(types.TypeDecorator): @@ -199,3 +202,98 @@ class LoggingInfo(JobLoggingDBBase): StatusTimeOrder = Column(Numeric(precision=12, scale=3), default=0) StatusSource = Column(String(32), default="Unknown") __table_args__ = (PrimaryKeyConstraint("JobID", "SeqNum"),) + + +class TaskQueues(TaskQueueDBBase): + __tablename__ = "tq_TaskQueues" + TQId = Column(Integer, primary_key=True) + Owner = Column(String(255), nullable=False) + OwnerDN = Column(String(255)) + OwnerGroup = Column(String(32), nullable=False) + CPUTime = Column(Integer, nullable=False) + Priority = Column(Float, nullable=False) + Enabled = Column(Boolean, nullable=False, default=0) + __table_args__ = (Index("TQOwner", "Owner", "OwnerGroup", "CPUTime"),) + + +class JobsQueue(TaskQueueDBBase): + __tablename__ = 
"tq_Jobs" + TQId = Column( + Integer, ForeignKey("tq_TaskQueues.TQId", ondelete="CASCADE"), primary_key=True + ) + JobId = Column(Integer, primary_key=True) + Priority = Column(Integer, nullable=False) + RealPriority = Column(Float, nullable=False) + __table_args__ = (Index("TaskIndex", "TQId"),) + + +class SitesQueue(TaskQueueDBBase): + __tablename__ = "tq_TQToSites" + TQId = Column( + Integer, ForeignKey("tq_TaskQueues.TQId", ondelete="CASCADE"), primary_key=True + ) + Value = Column(String(64), primary_key=True) + __table_args__ = ( + Index("SitesTaskIndex", "TQId"), + Index("SitesIndex", "Value"), + ) + + +class GridCEsQueue(TaskQueueDBBase): + __tablename__ = "tq_TQToGridCEs" + TQId = Column( + Integer, ForeignKey("tq_TaskQueues.TQId", ondelete="CASCADE"), primary_key=True + ) + Value = Column(String(64), primary_key=True) + __table_args__ = ( + Index("GridCEsTaskIndex", "TQId"), + Index("GridCEsValueIndex", "Value"), + ) + + +class BannedSitesQueue(TaskQueueDBBase): + __tablename__ = "tq_TQToBannedSites" + TQId = Column( + Integer, ForeignKey("tq_TaskQueues.TQId", ondelete="CASCADE"), primary_key=True + ) + Value = Column(String(64), primary_key=True) + __table_args__ = ( + Index("BannedSitesTaskIndex", "TQId"), + Index("BannedSitesValueIndex", "Value"), + ) + + +class PlatformsQueue(TaskQueueDBBase): + __tablename__ = "tq_TQToPlatforms" + TQId = Column( + Integer, ForeignKey("tq_TaskQueues.TQId", ondelete="CASCADE"), primary_key=True + ) + Value = Column(String(64), primary_key=True) + __table_args__ = ( + Index("PlatformsTaskIndex", "TQId"), + Index("PlatformsValueIndex", "Value"), + ) + + +class JobTypesQueue(TaskQueueDBBase): + __tablename__ = "tq_TQToJobTypes" + TQId = Column( + Integer, ForeignKey("tq_TaskQueues.TQId", ondelete="CASCADE"), primary_key=True + ) + Value = Column(String(64), primary_key=True) + __table_args__ = ( + Index("JobTypesTaskIndex", "TQId"), + Index("JobTypesValueIndex", "Value"), + ) + + +class TagsQueue(TaskQueueDBBase): + 
__tablename__ = "tq_TQToTags" + TQId = Column( + Integer, ForeignKey("tq_TaskQueues.TQId", ondelete="CASCADE"), primary_key=True + ) + Value = Column(String(64), primary_key=True) + __table_args__ = ( + Index("TagsTaskIndex", "TQId"), + Index("TagsValueIndex", "Value"), + ) diff --git a/src/diracx/routers/auth.py b/src/diracx/routers/auth.py index ac88c4ad4..03445a989 100644 --- a/src/diracx/routers/auth.py +++ b/src/diracx/routers/auth.py @@ -7,7 +7,7 @@ import secrets from datetime import datetime, timedelta from typing import Annotated, Literal, TypedDict -from uuid import UUID, uuid4 +from uuid import uuid4 import httpx from authlib.integrations.starlette_client import OAuthError @@ -32,6 +32,7 @@ ExpiredFlowError, PendingAuthorizationError, ) +from diracx.core.models import UserInfo from diracx.core.properties import SecurityProperty, UnevaluatedProperty from diracx.core.settings import ServiceSettingsBase, TokenSigningKey from diracx.db.auth.schema import FlowStatus @@ -146,26 +147,6 @@ async def parse_id_token(config, vo, raw_id_token: str, audience: str): return token -class AuthInfo(BaseModel): - # raw token for propagation - bearer_token: str - - # token ID in the DB for Component - # unique jwt identifier for user - token_id: UUID - - # list of DIRAC properties - properties: list[SecurityProperty] - - -class UserInfo(AuthInfo): - # dirac generated vo:sub - sub: str - preferred_username: str - dirac_group: str - vo: str - - async def verify_dirac_token( authorization: Annotated[str, Depends(oidc_scheme)], settings: AuthSettings, diff --git a/src/diracx/routers/dependencies.py b/src/diracx/routers/dependencies.py index 17d4cf830..8f4211f2a 100644 --- a/src/diracx/routers/dependencies.py +++ b/src/diracx/routers/dependencies.py @@ -5,6 +5,7 @@ "AuthDB", "JobDB", "JobLoggingDB", + "TaskQueueDB", "add_settings_annotation", "AvailableSecurityProperties", ) @@ -19,6 +20,7 @@ from diracx.db import AuthDB as _AuthDB from diracx.db import JobDB as _JobDB from 
diracx.db import JobLoggingDB as _JobLoggingDB +from diracx.db import TaskQueueDB as _TaskQueueDB T = TypeVar("T") @@ -32,6 +34,7 @@ def add_settings_annotation(cls: T) -> T: AuthDB = Annotated[_AuthDB, Depends(_AuthDB.transaction)] JobDB = Annotated[_JobDB, Depends(_JobDB.transaction)] JobLoggingDB = Annotated[_JobLoggingDB, Depends(_JobLoggingDB.transaction)] +TaskQueueDB = Annotated[_TaskQueueDB, Depends(_TaskQueueDB.transaction)] # Miscellaneous Config = Annotated[_Config, Depends(ConfigSource.create)] diff --git a/src/diracx/routers/job_manager/__init__.py b/src/diracx/routers/job_manager/__init__.py index 563affa9c..8af2fc937 100644 --- a/src/diracx/routers/job_manager/__init__.py +++ b/src/diracx/routers/job_manager/__init__.py @@ -27,7 +27,7 @@ from diracx.core.utils import JobStatus from ..auth import UserInfo, has_properties, verify_dirac_token -from ..dependencies import JobDB, JobLoggingDB +from ..dependencies import JobDB, JobLoggingDB, TaskQueueDB from ..fastapi_classes import DiracxRouter MAX_PARAMETRIC_JOBS = 20 @@ -228,6 +228,10 @@ async def delete_bulk_jobs(job_ids: Annotated[list[int], Query()]): @router.post("/kill") async def kill_bulk_jobs( job_ids: Annotated[list[int], Query()], + job_db: JobDB, + job_logging_db: JobLoggingDB, + task_queue_db: TaskQueueDB, + user_info: Annotated[UserInfo, Depends(verify_dirac_token)], ): return job_ids diff --git a/tests/conftest.py b/tests/conftest.py index 9b6e30e45..f146159cb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -74,6 +74,7 @@ def with_app(test_auth_settings, with_config_repo): database_urls={ "JobDB": "sqlite+aiosqlite:///:memory:", "JobLoggingDB": "sqlite+aiosqlite:///:memory:", + "TaskQueueDB": "sqlite+aiosqlite:///:memory:", "AuthDB": "sqlite+aiosqlite:///:memory:", }, config_source=ConfigSource.create_from_url( @@ -117,6 +118,7 @@ def with_config_repo(tmp_path): "Operations": {"Defaults": {}}, } ) + # TODO: check why the JobShare value is required even though a default value is 
set in the group config cs_file.write_text(example_cs.json()) repo.index.add([cs_file]) # add it to the index repo.index.commit("Added a new file") diff --git a/tests/db/jobs/test_taskQueueDB.py b/tests/db/jobs/test_taskQueueDB.py new file mode 100644 index 000000000..62eae885a --- /dev/null +++ b/tests/db/jobs/test_taskQueueDB.py @@ -0,0 +1,2049 @@ +# import asyncio + +# import pytest +# from sqlalchemy.exc import NoResultFound + +# from diracx.core.config.schema import Config +# from diracx.core.models import UserInfo, tqDefDict + +# # from diracx.core.config.schema import Config +# from diracx.db.jobs.db import TaskQueueDB + + +# @pytest.fixture +# async def task_queue_db(): +# job_db = TaskQueueDB("sqlite+aiosqlite:///:memory:") +# async with job_db.engine_context(): +# yield job_db + + +# async def test_insert_job(task_queue_db: TaskQueueDB, tmp_path): +# # Arrange +# config = Config.parse_file(tmp_path) +# assert config +# assert tmp_path == "azerty" +# tq_def_dict = tqDefDict(owner="userName", owner_group="myGroup", cpu_time=50000) +# tq_def_dict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} +# config = Config() +# user_info = UserInfo() + +# # Act +# tq = await task_queue_db.insert_job(123, tq_def_dict, 10, config, user_info) + +# # Assert +# assert tq == 1 + + +# async def test_retrieve_task_queues(task_queue_db: TaskQueueDB): +# tq_def_dict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} +# await task_queue_db.insert_job(1, tq_def_dict, 10) +# await task_queue_db.get_task_queue_for_job(1) +# res = await task_queue_db.retrieve_task_queues() +# assert list(res.values())[0] == { +# "Owner": "userName", +# "Jobs": 1, +# "OwnerGroup": "myGroup", +# "CPUTime": 86400, +# "Priority": 1.0, +# } + + +# async def test_chain_with_parameter(task_queue_db: TaskQueueDB): +# tq_def_dict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} + +# # first job +# await task_queue_db.insert_job(123, tq_def_dict, 10) +# tq = 
await task_queue_db.get_task_queue_for_job(123) +# await task_queue_db.clean_orphaned_task_queues() +# res = await task_queue_db.delete_task_queue_if_empty( +# tq +# ) # this won't delete anything +# assert res is False + +# # second job +# await task_queue_db.insert_job(125, tq_def_dict, 10) +# tq = await task_queue_db.get_task_queue_for_job(125) +# res = await task_queue_db.delete_task_queue_if_empty( +# tq +# ) # this won't delete anything, as both 123 and 125 are in +# assert res is False # but still it won't fail +# res = await task_queue_db.retrieve_task_queues() +# assert list(res.values())[0] == { +# "Owner": "userName", +# "Jobs": 2, +# "OwnerGroup": "myGroup", +# "CPUTime": 86400, +# "Priority": 1.0, +# } + +# # now we will try to delete +# await task_queue_db.delete_job(123) +# await task_queue_db.delete_job(125) +# res = await task_queue_db.delete_task_queue_if_empty( +# tq +# ) # this should now delete tq +# assert res is True +# res = await task_queue_db.retrieve_task_queues() +# assert res == {} + + +# async def test_chain_with_states(task_queue_db: TaskQueueDB): +# """put - remove with parameters including sites""" +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Sites": ["LCG.CERN.ch"], +# # } +# # result = tqDB.insertJob(201, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(201) +# # tq_job1 = result["Value"] + +# # result = tqDB.insertJob(2011, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(2011) +# # tq_job11 = result["Value"] + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Sites": ["CLOUD.IN2P3.fr"], +# # } +# # result = tqDB.insertJob(203, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(203) +# # tq_job2 = result["Value"] + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Sites": 
["LCG.CERN.ch", "CLOUD.IN2P3.fr"], +# # } +# # result = tqDB.insertJob(203, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(203) +# # tq_job3 = result["Value"] + +# # # matching +# # # this should match everything +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000}, numQueuesToGet=5) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job11} + +# # # this should match those for CERN +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Site": "LCG.CERN.ch"}, numQueuesToGet=4) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job3, tq_job11} + +# # # this should match those for IN2P3 +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Site": "CLOUD.IN2P3.fr"}, numQueuesToGet=4) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job2, tq_job3} + +# # # this should match those for IN2P3 +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Site": "CLOUD.IN2P3.fr"}, numQueuesToGet=4) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job2, tq_job3} + +# # # now we will try to delete +# # for jobID in [201, 2011, 202, 203]: +# # result = tqDB.deleteJob(jobID) +# # assert result["OK"] +# # for tqID in [tq_job1, tq_job2, tq_job3]: +# # result = tqDB.deleteTaskQueueIfEmpty(tqID) +# # assert result["OK"] + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Sites": ["LCG.CERN.ch"], +# } +# await task_queue_db.insert_job(201, tq_def_dict, 10) +# tq_job1 = await task_queue_db.get_task_queue_for_job(201) + +# await task_queue_db.insert_job(2011, tq_def_dict, 10) +# tq_job11 = await task_queue_db.get_task_queue_for_job(2011) + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Sites": ["CLOUD.IN2P3.fr"], +# } +# await 
task_queue_db.insert_job(203, tq_def_dict, 10) +# tq_job2 = await task_queue_db.get_task_queue_for_job(203) + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Sites": ["LCG.CERN.ch", "CLOUD.IN2P3.fr"], +# } +# await task_queue_db.insert_job( +# 203, tq_def_dict, 10 +# ) # TODO/ figure out why this job is also id 203 +# # this doesn't really make sense to me rn, maybe try changing this in DIRAC to see if tests still pass +# tq_job3 = await task_queue_db.get_task_queue_for_job(203) + +# # matching +# # this should match everything +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000}, num_queues_to_get=5 +# ) +# assert {int(x[0]) for x in res} == { +# tq_job1, +# tq_job2, +# tq_job3, +# tq_job11, +# } # TODO: check if the int cast is necessary + +# # this should match those for CERN +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Site": "LCG.CERN.ch"}, num_queues_to_get=4 +# ) +# assert {int(x[0]) for x in res} == {tq_job1, tq_job3, tq_job11} + +# # this should match those for IN2P3 +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Site": "CLOUD.IN2P3.fr"}, num_queues_to_get=4 +# ) +# assert {int(x[0]) for x in res} == {tq_job2, tq_job3} + +# # deleting +# # for job_id in [201, 2011, 202, 203]: +# # await task_queue_db.delete_job(job_id) +# # for tq_id in [tq_job1, tq_job2, tq_job3]: +# # await task_queue_db.delete_task_queue_if_empty(tq_id) +# # make this async gather +# # TODO: check this is correct regarding the order +# await asyncio.gather( +# *(task_queue_db.delete_job(job_id) for job_id in [201, 2011, 202, 203]), +# ) +# await asyncio.gather( +# *( +# task_queue_db.delete_task_queue_if_empty(tq_id) +# for tq_id in [tq_job1, tq_job2, tq_job3] +# ) +# ) + + +# async def test_chain_with_banned_sites(task_queue_db: TaskQueueDB): +# # """put - remove with parameters including Banned sites""" +# # tqDefDict = { +# # "Owner": "userName", +# # 
"OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "BannedSites": ["LCG.CERN.ch", "CLOUD.IN2P3.fr"], +# # } +# # result = tqDB.insertJob(127, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(127) +# # tq_job1 = result["Value"] + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "BannedSites": ["CLOUD.IN2P3.fr", "DIRAC.Test.org"], +# # } +# # result = tqDB.insertJob(128, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(128) +# # tq_job2 = result["Value"] + +# # # matching +# # # this should match everything +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000}, numQueuesToGet=4) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2} + +# # # this should match also everything +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "centos7"}, numQueuesToGet=4) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2} + +# # # this should match the first +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Site": "DIRAC.Test.org"}, numQueuesToGet=4) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1} + +# # # this should match the second +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Site": "LCG.CERN.ch"}, numQueuesToGet=4) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job2} + +# # # this should not match anything because of the banned site CLOUD.IN2P3.fr +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Site": "CLOUD.IN2P3.fr"}, numQueuesToGet=4) +# # assert result["OK"] +# # assert result["Value"] == [] + +# # result = tqDB.deleteTaskQueueIfEmpty(tq_job1) # this won't delete anything, as 127 is in +# # assert result["OK"] # but still it won't fail +# # assert result["Value"] is False +# # result = 
tqDB.deleteJob(127) +# # assert result["OK"] +# # result = tqDB.deleteTaskQueueIfEmpty(tq_job1) # this should now delete tq +# # assert result["OK"] + +# # result = tqDB.deleteJob(128) +# # assert result["OK"] + +# # for tqId in [tq_job1, tq_job2]: +# # result = tqDB.deleteTaskQueueIfEmpty(tqId) +# # assert result["OK"] + +# # result = tqDB.retrieveTaskQueues() +# # assert result["OK"] +# # assert result["Value"] == {} + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "BannedSites": ["LCG.CERN.ch", "CLOUD.IN2P3.fr"], +# } +# await task_queue_db.insert_job(127, tq_def_dict, 10) +# tq_job1 = await task_queue_db.get_task_queue_for_job(127) + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "BannedSites": ["CLOUD.IN2P3.fr", "DIRAC.Test.org"], +# } +# await task_queue_db.insert_job(128, tq_def_dict, 10) +# tq_job2 = await task_queue_db.get_task_queue_for_job(128) + +# # matching +# # this should match everything +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000}, num_queues_to_get=4 +# ) +# assert {int(x[0]) for x in res} == {tq_job1, tq_job2} + +# # this should match also everything +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "centos7"}, num_queues_to_get=4 +# ) +# assert {int(x[0]) for x in res} == {tq_job1, tq_job2} + +# # this should match the first +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Site": "DIRAC.Test.org"}, num_queues_to_get=4 +# ) +# assert {int(x[0]) for x in res} == {tq_job1} + +# # this should match the second +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Site": "LCG.CERN.ch"}, num_queues_to_get=4 +# ) +# assert {int(x[0]) for x in res} == {tq_job2} + +# # this should not match anything because of the banned site CLOUD.IN2P3.fr +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Site": 
"CLOUD.IN2P3.fr"}, num_queues_to_get=4 +# ) +# assert res == [] + +# # deleting +# res = await task_queue_db.delete_task_queue_if_empty( +# tq_job1 +# ) # this won't delete anything, as 127 is in +# assert res is False # but still it won't fail +# await task_queue_db.delete_job(127) +# res = await task_queue_db.delete_task_queue_if_empty( +# tq_job1 +# ) # this should now delete tq +# assert res is True + +# await task_queue_db.delete_job(128) + +# await asyncio.gather( +# *( +# task_queue_db.delete_task_queue_if_empty(tq_id) +# for tq_id in [tq_job1, tq_job2] +# ) +# ) + +# res = await task_queue_db.retrieve_task_queues() +# assert res == {} + + +# async def test_chain_with_platforms(task_queue_db: TaskQueueDB): +# """put - remove with parameters including a platform""" + +# # We'll try the following case +# # +# # possible platforms: slc5, slc6, centos7, debian, ubuntu +# # where: +# # - centos7 > slc6 > slc5 +# # - ubuntu > debian +# # and of course what runs on rhel family does not run on debian family + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Platforms": ["centos7"], +# } +# await task_queue_db.insert_job(1, tq_def_dict, 10) +# tq_job1 = await task_queue_db.get_task_queue_for_job(1) +# assert tq_job1 > 0 + +# await task_queue_db.insert_job(2, tq_def_dict, 10) +# tq_job2 = await task_queue_db.get_task_queue_for_job(2) +# assert tq_job1 == tq_job2 + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Platforms": ["ubuntu"], +# } +# await task_queue_db.insert_job(3, tq_def_dict, 10) +# tq_job3 = await task_queue_db.get_task_queue_for_job(3) +# assert tq_job3 == tq_job1 + 1 + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Platforms": ["centos7", "slc6"], +# } +# await task_queue_db.insert_job(4, tq_def_dict, 10) +# tq_job4 = await task_queue_db.get_task_queue_for_job(4) +# assert tq_job4 == tq_job3 + 1 + +# tq_def_dict = 
{ +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Platforms": ["debian", "ubuntu"], +# } +# await task_queue_db.insert_job(5, tq_def_dict, 10) +# tq_job5 = await task_queue_db.get_task_queue_for_job(5) +# assert tq_job5 == tq_job4 + 1 + +# # We should be in this situation (TQIds are obviously invented): +# # +# # select TQId, JobId FROM `tq_Jobs` +# # +--------+---------+ +# # | TQId | JobId | +# # +--------+---------| +# # | 101 | 1 | +# # | 101 | 2 | +# # | 102 | 3 | +# # | 103 | 4 | +# # | 104 | 5 | +# # +--------+---------+ +# # +# # select * FROM `tq_TQToPlatforms` +# # +--------+---------+ +# # | TQId | Value | +# # |--------+---------| +# # | 101 | centos7 | +# # | 102 | ubuntu | +# # | 103 | centos7 | +# # | 103 | slc6 | +# # | 104 | debian | +# # | 104 | ubuntu | +# # +--------+---------+ + +# # strict matching + +# # # centos7 +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "centos7"}, numQueuesToGet=4) +# # assert result["OK"] +# # # this should match one in [tq_job1, tq_job2, tq_job4] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job4} + +# # centos7 +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "centos7"}, num_queues_to_get=4 +# ) +# assert {int(x[0]) for x in res} == {tq_job1, tq_job2, tq_job4} + +# # # ubuntu +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "ubuntu"}, numQueuesToGet=4) +# # assert result["OK"] +# # # this should match one in [tq_job3, tq_job5] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job3, tq_job5} + +# # ubuntu +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "ubuntu"}, num_queues_to_get=4 +# ) +# assert {int(x[0]) for x in res} == {tq_job3, tq_job5} + +# # # slc6 +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "slc6"}, numQueuesToGet=4) +# # assert result["OK"] +# # # this should 
match only tq_job4, as this is the only one that can run on slc6 +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job4} + +# # slc6 +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "slc6"}, num_queues_to_get=4 +# ) +# # this should match only tq_job4, as this is the only one that can run on slc6 +# assert {int(x[0]) for x in res} == {tq_job4} + +# # # slc5 +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "slc5"}, numQueuesToGet=4) +# # assert result["OK"] +# # # this should not match anything +# # assert result["Value"] == [] + +# # slc5 +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "slc5"}, num_queues_to_get=4 +# ) +# assert res == [] + +# # compatibility matching + +# # # ANY platform +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "ANY"}, numQueuesToGet=5) +# # assert result["OK"] +# # # this should match whatever +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job4, tq_job5} + +# # ANY platform +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "ANY"}, num_queues_to_get=5 +# ) +# # this should match whatever +# assert {int(x[0]) for x in res} == {tq_job1, tq_job2, tq_job3, tq_job4, tq_job5} + +# # Now we insert a TQ without platform + +# # tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000} +# # result = tqDB.insertJob(6, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(6) +# # tq_job6 = result["Value"] +# # assert tq_job6 == tq_job5 + 1 + +# tq_def_dict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000} +# await task_queue_db.insert_job(6, tq_def_dict, 10) +# tq_job6 = await task_queue_db.get_task_queue_for_job(6) +# assert tq_job6 == tq_job5 + 1 + +# # matching for this one + +# # # ANY platform +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, 
"Platform": "ANY"}, numQueuesToGet=6) +# # assert result["OK"] +# # # this should match whatever +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6} + +# # ANY platform (this should match whatever) +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "ANY"}, num_queues_to_get=6 +# ) +# assert {int(x[0]) for x in res} == { +# tq_job1, +# tq_job2, +# tq_job3, +# tq_job4, +# tq_job5, +# tq_job6, +# } + +# # # ANY platform within a list +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": ["ANY"]}, numQueuesToGet=6) +# # assert result["OK"] +# # # this should match whatever +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6} + +# # ANY platform within a list (this should match whatever) +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": ["ANY"]}, num_queues_to_get=6 +# ) +# assert {int(x[0]) for x in res} == { +# tq_job1, +# tq_job2, +# tq_job3, +# tq_job4, +# tq_job5, +# tq_job6, +# } + +# # # no platform at all +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000}, numQueuesToGet=6) +# # assert result["OK"] +# # # this should match whatever +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6} + +# # no platform at all (this should match whatever) +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000}, num_queues_to_get=6 +# ) +# assert {int(x[0]) for x in res} == { +# tq_job1, +# tq_job2, +# tq_job3, +# tq_job4, +# tq_job5, +# tq_job6, +# } + +# # # slc5 -- this time it should match 1 (the one without specified platform) +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "slc5"}, numQueuesToGet=6) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job6} + +# # slc5 -- this time it 
should match 1 (the one without specified platform) +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "slc5"}, num_queues_to_get=6 +# ) +# assert {int(x[0]) for x in res} == {tq_job6} + +# # # slc6 +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "slc6"}, numQueuesToGet=6) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job4, tq_job6} + +# # slc6 +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "slc6"}, num_queues_to_get=6 +# ) +# assert {int(x[0]) for x in res} == {tq_job4, tq_job6} + +# # # slc5, slc6 +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": ["slc5", "slc6"]}, numQueuesToGet=6) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job4, tq_job6} + +# # slc5, slc6 +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": ["slc5", "slc6"]}, num_queues_to_get=6 +# ) +# assert {int(x[0]) for x in res} == {tq_job4, tq_job6} + +# # # slc5, slc6, ubuntu +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": ["slc5", "slc6", "ubuntu"]}, numQueuesToGet=6) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job3, tq_job4, tq_job5, tq_job6} + +# # slc5, slc6, ubuntu +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": ["slc5", "slc6", "ubuntu"]}, num_queues_to_get=6 +# ) +# assert {int(x[0]) for x in res} == {tq_job3, tq_job4, tq_job5, tq_job6} + +# # Now we insert a TQ with platform "ANY" (same as no platform) + +# # tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platform": "ANY"} +# # result = tqDB.insertJob(7, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(7) +# # tq_job7 = result["Value"] +# # assert tq_job7 == tq_job6 # would be inserted in the same TQ + +# 
tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Platform": "ANY", +# } +# await task_queue_db.insert_job(7, tq_def_dict, 10) +# tq_job7 = await task_queue_db.get_task_queue_for_job(7) +# assert tq_job7 == tq_job6 # would be inserted in the same TQ + +# # matching for this one + +# # # ANY platform +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "ANY"}, numQueuesToGet=7) +# # assert result["OK"] +# # # this should match whatever +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6, tq_job7} + +# # ANY platform (this should match whatever) +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "ANY"}, num_queues_to_get=7 +# ) +# assert {int(x[0]) for x in res} == { +# tq_job1, +# tq_job2, +# tq_job3, +# tq_job4, +# tq_job5, +# tq_job6, +# tq_job7, +# } + +# # # NO platform +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000}, numQueuesToGet=7) +# # assert result["OK"] +# # # this should match whatever +# # assert int(result["Value"][0][0]) in [tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6, tq_job7] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6, tq_job7} + +# # NO platform (this should match whatever) +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000}, num_queues_to_get=7 +# ) +# assert {int(x[0]) for x in res} == { +# tq_job1, +# tq_job2, +# tq_job3, +# tq_job4, +# tq_job5, +# tq_job6, +# tq_job7, +# } + +# # # slc5 -- this time it should match 2 +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "slc5"}, numQueuesToGet=7) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job6, tq_job7} + +# # slc5 -- this time it should match 2 +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "slc5"}, 
num_queues_to_get=7 +# ) +# assert {int(x[0]) for x in res} == {tq_job6, tq_job7} + +# # # slc6 +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "slc6"}, numQueuesToGet=7) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job4, tq_job6, tq_job7} + +# # slc6 +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "slc6"}, num_queues_to_get=7 +# ) +# assert {int(x[0]) for x in res} == {tq_job4, tq_job6, tq_job7} + +# # # new platform appears +# # # centos8 (> centos7) +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "centos8"}, numQueuesToGet=7) +# # assert result["OK"] +# # # here, I would like to see 3 TQs matched: those for slc6 + centos7 + ANY +# # assert len(result["Value"]) == 1 +# # # but here it returns only 1 (those for ANY), by construction +# # # so, this should be in theory improved + +# # new platform appears +# # centos8 (> centos7) +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "centos8"}, num_queues_to_get=7 +# ) +# # here, I would like to see 3 TQs matched: those for slc6 + centos7 + ANY +# assert len(res) == 1 +# # but here it returns only 1 (those for ANY), by construction +# # so, this should be in theory improved + +# # deleting + +# # for jobId in range(1, 8): +# # result = tqDB.deleteJob(jobId) +# # assert result["OK"] + +# # for tqId in [tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6, tq_job7]: +# # result = tqDB.deleteTaskQueueIfEmpty(tqId) +# # assert result["OK"] + +# # make this async gather +# await asyncio.gather( +# *(task_queue_db.delete_job(job_id) for job_id in range(1, 8)), +# ) +# await asyncio.gather( +# *( +# task_queue_db.delete_task_queue_if_empty(tq_id) +# for tq_id in [tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6, tq_job7] +# ) +# ) + + +# async def test_chain_with_tags(task_queue_db: TaskQueueDB): +# """put - remove with parameters including one or 
more Tag(s) and/or RequiredTag(s)""" + +# # We'll try the following case +# # +# # Tags: MultiProcessor, SingleProcessor, GPU +# # +# # We'll insert 5 jobs: +# # 1 : MultiProcessor +# # 2 : SingleProcessor +# # 3 : SingleProcessor, MultiProcessor +# # 4 : MultiProcessor, GPU +# # 5 : -- no tags +# # 6 : MultiProcessor, 17Processors + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Tags": ["MultiProcessor"], +# # } +# # result = tqDB.insertJob(1, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(1) +# # tq_job1 = result["Value"] +# # assert tq_job1 > 0 + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Tags": ["MultiProcessor"], +# } +# await task_queue_db.insert_job(1, tq_def_dict, 10) +# tq_job1 = await task_queue_db.get_task_queue_for_job(1) +# assert tq_job1 > 0 + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Tags": ["SingleProcessor"], +# # } +# # result = tqDB.insertJob(2, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(2) +# # tq_job2 = result["Value"] +# # assert tq_job2 > tq_job1 + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Tags": ["SingleProcessor"], +# } +# await task_queue_db.insert_job(2, tq_def_dict, 10) +# tq_job2 = await task_queue_db.get_task_queue_for_job(2) +# assert tq_job2 > tq_job1 + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Tags": ["SingleProcessor", "MultiProcessor"], +# # } +# # result = tqDB.insertJob(3, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(3) +# # tq_job3 = result["Value"] +# # assert tq_job3 > tq_job2 + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Tags": ["SingleProcessor", "MultiProcessor"], +# } +# await 
task_queue_db.insert_job(3, tq_def_dict, 10) +# tq_job3 = await task_queue_db.get_task_queue_for_job(3) +# assert tq_job3 > tq_job2 + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Tags": ["MultiProcessor", "GPU"], +# # } +# # result = tqDB.insertJob(4, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(4) +# # tq_job4 = result["Value"] +# # assert tq_job4 > tq_job3 + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Tags": ["MultiProcessor", "GPU"], +# } +# await task_queue_db.insert_job(4, tq_def_dict, 10) +# tq_job4 = await task_queue_db.get_task_queue_for_job(4) +# assert tq_job4 > tq_job3 + +# # tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000} +# # result = tqDB.insertJob(5, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(5) +# # tq_job5 = result["Value"] +# # assert tq_job5 > tq_job4 + +# tq_def_dict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000} +# await task_queue_db.insert_job(5, tq_def_dict, 10) +# tq_job5 = await task_queue_db.get_task_queue_for_job(5) +# assert tq_job5 > tq_job4 + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Tags": ["MultiProcessor", "17Processors"], +# # } +# # result = tqDB.insertJob(6, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(6) +# # tq_job6 = result["Value"] +# # assert tq_job6 > tq_job5 + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Tags": ["MultiProcessor", "17Processors"], +# } +# await task_queue_db.insert_job(6, tq_def_dict, 10) +# tq_job6 = await task_queue_db.get_task_queue_for_job(6) +# assert tq_job6 > tq_job5 + +# # We should be in this situation +# # (TQIds are obviously invented): +# # +# # mysql Dirac@localhost:TaskQueueDB> select `TQId`,`JobId` FROM `tq_Jobs` +# # 
+--------+---------+ +# # | TQId | JobId | +# # |--------+---------| +# # | 101 | 1 | +# # | 102 | 2 | +# # | 103 | 3 | +# # | 104 | 4 | +# # | 105 | 5 | +# # | 106 | 6 | +# # +--------+---------+ +# # +# # mysql Dirac@localhost:TaskQueueDB> select * FROM `tq_TQToTags` +# # +--------+-----------------+ +# # | TQId | Value | +# # |--------+-----------------| +# # | 101 | MultiProcessor | +# # | 102 | SingleProcessor | +# # | 103 | MultiProcessor | +# # | 103 | SingleProcessor | +# # | 104 | GPU | +# # | 104 | MultiProcessor | +# # | 106 | MultiProcessor | +# # | 106 | 17Processors | +# # +--------+-----------------+ + +# # Matching + +# # # Matching Everything with Tag = "ANY" +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": "ANY"}, numQueuesToGet=6) +# # assert result["OK"] +# # # this should match whatever +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6} + +# # Matching Everything with Tag = "ANY" (this should match whatever) +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Tag": "ANY"}, num_queues_to_get=6 +# ) +# assert {int(x[0]) for x in res} == { +# tq_job1, +# tq_job2, +# tq_job3, +# tq_job4, +# tq_job5, +# tq_job6, +# } + +# # # Matching Everything with Tag = "aNy" +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": "aNy"}, numQueuesToGet=6) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6} + +# # Matching Everything with Tag = "aNy" (this should match whatever) +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Tag": "aNy"}, num_queues_to_get=6 +# ) +# assert {int(x[0]) for x in res} == { +# tq_job1, +# tq_job2, +# tq_job3, +# tq_job4, +# tq_job5, +# tq_job6, +# } + +# # # Matching Everything with Tag contains "aNy" +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": ["MultiProcessor", 
"aNy"]}, numQueuesToGet=6) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6} + +# # Matching Everything with Tag contains "aNy" (this should match whatever) +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Tag": ["MultiProcessor", "aNy"]}, num_queues_to_get=6 +# ) +# assert {int(x[0]) for x in res} == { +# tq_job1, +# tq_job2, +# tq_job3, +# tq_job4, +# tq_job5, +# tq_job6, +# } + +# # # Matching only tq_job5 when no tag is specified +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000}, numQueuesToGet=5) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job5} + +# # Matching only tq_job5 when no tag is specified +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000}, num_queues_to_get=5 +# ) +# assert {int(x[0]) for x in res} == {tq_job5} + +# # # Matching only tq_job5 when Tag = "" +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": ""}, numQueuesToGet=5) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job5} + +# # Matching only tq_job5 when Tag = "" +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Tag": ""}, num_queues_to_get=5 +# ) +# assert {int(x[0]) for x in res} == {tq_job5} + +# # # Matching only tq_job5 when Tag = [] +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": []}, numQueuesToGet=5) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job5} + +# # Matching only tq_job5 when Tag = [] +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Tag": []}, num_queues_to_get=5 +# ) +# assert {int(x[0]) for x in res} == {tq_job5} + +# # # Matching MultiProcessor + +# # # Tag: 'MultiProcessor' +# # # By doing this, we are basically saying that this CE is accepting ALSO MultiProcessor payloads +# 
# result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": "MultiProcessor"}, numQueuesToGet=4) +# # assert result["OK"] +# # # this matches the tq_job1, as it is the only one that requires ONLY MultiProcessor, +# # # AND the tq_job5, for which we have inserted no tags +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job5} + +# # Tag: 'MultiProcessor' +# # By doing this, we are basically saying that this CE is accepting ALSO MultiProcessor payloads +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Tag": "MultiProcessor"}, num_queues_to_get=4 +# ) +# # this matches the tq_job1, as it is the only one that requires ONLY MultiProcessor, +# # AND the tq_job5, for which we have inserted no tags +# assert {int(x[0]) for x in res} == {tq_job1, tq_job5} + +# # # Tags: ['MultiProcessor', 'GPU'] +# # By doing this, we are saying that this CE is accepting ALSO payloads that require MultiProcessor or GPU +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": ["MultiProcessor", "GPU"]}, numQueuesToGet=4) +# # assert result["OK"] +# # # this matches the tq_job1, as it requires ONLY MultiProcessor +# # # the tq_job4, as it is the only one that requires BOTH MultiProcessor and GPU, +# # # AND the tq_job5, for which we have inserted no tags +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job4, tq_job5, tq_job5} + +# # Tags: ['MultiProcessor', 'GPU'] +# # By doing this, we are saying that this CE is accepting ALSO payloads that require MultiProcessor or GPU +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Tag": ["MultiProcessor", "GPU"]}, num_queues_to_get=4 +# ) +# # this matches the tq_job1, as it requires ONLY MultiProcessor +# # the tq_job4, as it is the only one that requires BOTH MultiProcessor and GPU, +# # AND the tq_job5, for which we have inserted no tags +# assert {int(x[0]) for x in res} == {tq_job1, tq_job4, tq_job5, tq_job5} + +# # # 
RequiredTag: 'MultiProcessor' (but no Tag) +# # # By doing this, we would be saying that this CE is accepting ONLY MultiProcessor payloads, +# # # BUT since there are no Tags, we can't know what's POSSIBLE to run, so nothing should be matched +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RequiredTag": "MultiProcessor"}, numQueuesToGet=4) +# # assert result["OK"] is False + +# # RequiredTag: 'MultiProcessor' (but no Tag) +# # By doing this, we would be saying that this CE is accepting ONLY MultiProcessor payloads, +# # BUT since there are no Tags, we can't know what's POSSIBLE to run, so nothing should be matched +# with pytest.raises(Exception): # TODO: reconsider what should be done here # noqa +# await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "RequiredTag": "MultiProcessor"}, num_queues_to_get=4 +# ) +# # assert res is None + +# # # Tag: 'MultiProcessor' + RequiredTag: 'MultiProcessor' +# # # By doing this, we are basically saying that this CE is accepting ONLY MultiProcessor payloads +# # # which have ONLY the 'MultiProcessor' tag +# # result = tqDB.matchAndGetTaskQueue( +# # {"CPUTime": 50000, "Tag": "MultiProcessor", "RequiredTag": "MultiProcessor"}, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # # this matches the tq_job1 as it is the only one that exposes the MultiProcessor tag ONLY +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1}assert result["OK"] is False + +# # Tag: 'MultiProcessor' + RequiredTag: 'MultiProcessor' +# # By doing this, we are basically saying that this CE is accepting ONLY MultiProcessor payloads +# # which have ONLY the 'MultiProcessor' tag +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Tag": "MultiProcessor", "RequiredTag": "MultiProcessor"}, +# num_queues_to_get=4, +# ) +# # this matches the tq_job1 as it is the only one that exposes the MultiProcessor tag ONLY +# assert {int(x[0]) for x in res} == {tq_job1} + +# # # Tag: 
['MultiProcessor', 'GPU'] + RequiredTag: 'MultiProcessor' +# # # By doing this, we are basically saying that this CE is accepting MultiProcessor and GPU payloads +# # # but requires to have the MultiProcessor tag +# # result = tqDB.matchAndGetTaskQueue( +# # {"CPUTime": 50000, "Tag": ["MultiProcessor", "GPU"], "RequiredTag": "MultiProcessor"}, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # # this matches the tq_job1 as it is the only one that exposes the MultiProcessor tag ONLY +# # # and tq_job4 because it has GPU and MultiProcessor tags +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job4} + +# # Tag: ['MultiProcessor', 'GPU'] + RequiredTag: 'MultiProcessor' +# # By doing this, we are basically saying that this CE is accepting MultiProcessor and GPU payloads +# # but requires to have the MultiProcessor tag +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 50000, +# "Tag": ["MultiProcessor", "GPU"], +# "RequiredTag": "MultiProcessor", +# }, +# num_queues_to_get=4, +# ) +# # this matches the tq_job1 as it is the only one that exposes the MultiProcessor tag ONLY +# # and tq_job4 because it has GPU and MultiProcessor tags +# assert {int(x[0]) for x in res} == {tq_job1, tq_job4} + +# # # CINECA type +# # # We only want to have MultiProcessor payloads +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 50000, +# # "Tag": ["MultiProcessor", "17Processors", "20Processors", "4Processors"], +# # "RequiredTag": "MultiProcessor", +# # }, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # # this matches the tq_job1 as it is the only one that exposes the MultiProcessor tag ONLY +# # # and tq_job6 because it has 17Processors and MultiProcessor tags +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job6} + +# # CINECA type +# # We only want to have MultiProcessor payloads +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 50000, +# "Tag": 
["MultiProcessor", "17Processors", "20Processors", "4Processors"], +# "RequiredTag": "MultiProcessor", +# }, +# num_queues_to_get=4, +# ) +# # this matches the tq_job1 as it is the only one that exposes the MultiProcessor tag ONLY +# # and tq_job6 because it has 17Processors and MultiProcessor tags +# assert {int(x[0]) for x in res} == {tq_job1, tq_job6} + +# # # NumberOfProcessors and MaxRAM +# # # This is translated to "#Processors" by the SiteDirector +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": "4Processors"}, numQueuesToGet=4) +# # assert result["OK"] +# # # FIXME: this is not interpreted in any different way --- is it correct? +# # # I believe it should be instead interpreted in a way similar to CPUTime +# # # FIXME: the MaxRam parameter has a similar fate, and becomes "#GB", +# # # and then there's no specific matching about it. + +# # NumberOfProcessors and MaxRAM +# # This is translated to "#Processors" by the SiteDirector +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Tag": "4Processors"}, num_queues_to_get=4 +# ) +# # FIXME: this is not interpreted in any different way --- is it correct? +# # I believe it should be instead interpreted in a way similar to CPUTime +# # FIXME: the MaxRam parameter has a similar fate, and becomes "#GB", +# # and then there's no specific matching about it. 
+ +# # for jobId in range(1, 8): +# # result = tqDB.deleteJob(jobId) +# # assert result["OK"] + +# # for tqId in [tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6]: +# # result = tqDB.deleteTaskQueueIfEmpty(tqId) +# # assert result["OK"] + +# # deleting +# await asyncio.gather( +# *(task_queue_db.delete_job(job_id) for job_id in range(1, 8)), +# ) +# await asyncio.gather( +# *( +# task_queue_db.delete_task_queue_if_empty(tq_id) +# for tq_id in [tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6] +# ) +# ) + + +# async def test_chain_with_tags_and_platforms(task_queue_db: TaskQueueDB): +# """put - remove with parameters including one or more Tag(s) and platforms""" + +# # # platform only +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Platforms": ["centos7"], +# # } +# # result = tqDB.insertJob(1, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(1) +# # tq_job1 = result["Value"] +# # assert tq_job1 > 0 + +# # platform only +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Platforms": ["centos7"], +# } +# await task_queue_db.insert_job(1, tq_def_dict, 10) +# tq_job1 = await task_queue_db.get_task_queue_for_job(1) +# assert tq_job1 > 0 + +# # # Tag only +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Tags": ["MultiProcessor"], +# # } +# # result = tqDB.insertJob(2, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(2) +# # tq_job2 = result["Value"] +# # assert tq_job2 > tq_job1 + +# # Tag only +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Tags": ["MultiProcessor"], +# } +# await task_queue_db.insert_job(2, tq_def_dict, 10) +# tq_job2 = await task_queue_db.get_task_queue_for_job(2) +# assert tq_job2 > tq_job1 + +# # # Platforms and Tag +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": 
"myGroup", +# # "CPUTime": 5000, +# # "Platforms": ["centos7"], +# # "Tags": ["MultiProcessor"], +# # } +# # result = tqDB.insertJob(3, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(3) +# # tq_job3 = result["Value"] +# # assert tq_job3 > tq_job2 + +# # Platforms and Tag +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Platforms": ["centos7"], +# "Tags": ["MultiProcessor"], +# } +# await task_queue_db.insert_job(3, tq_def_dict, 10) +# tq_job3 = await task_queue_db.get_task_queue_for_job(3) +# assert tq_job3 > tq_job2 + +# # # Tag and another platform +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "myGroup", +# # "CPUTime": 5000, +# # "Platforms": ["slc6"], +# # "Tags": ["MultiProcessor"], +# # } +# # result = tqDB.insertJob(4, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(4) +# # tq_job4 = result["Value"] +# # assert tq_job4 > tq_job3 + +# # Tag and another platform +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "myGroup", +# "CPUTime": 5000, +# "Platforms": ["slc6"], +# "Tags": ["MultiProcessor"], +# } +# await task_queue_db.insert_job(4, tq_def_dict, 10) +# tq_job4 = await task_queue_db.get_task_queue_for_job(4) +# assert tq_job4 > tq_job3 + +# # We should be in this situation (TQIds are obviously invented): +# # +# # mysql Dirac@localhost:TaskQueueDB> select `TQId`,`JobId` FROM `tq_Jobs` +# # +--------+---------+ +# # | TQId | JobId | +# # |--------+---------| +# # | 101 | 1 | +# # | 102 | 2 | +# # | 103 | 3 | +# # | 104 | 4 | +# # +--------+---------+ +# # +# # +# # select * FROM `tq_TQToPlatforms` +# # +--------+---------+ +# # | TQId | Value | +# # |--------+---------| +# # | 101 | centos7 | +# # | 103 | centos7 | +# # | 104 | debian | +# # | 104 | slc6 | +# # +--------+---------+ +# # +# # mysql Dirac@localhost:TaskQueueDB> select * FROM `tq_TQToTags` +# # +--------+-----------------+ +# # | TQId | Value | +# # 
|--------+-----------------| +# # | 102 | MultiProcessor | +# # | 103 | MultiProcessor | +# # | 104 | MultiProcessor | +# # +--------+-----------------+ + +# # Matching + +# # Matching Everything + +# # # No Tag, Platform = "ANY" +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "ANY"}, numQueuesToGet=4) +# # assert result["OK"] +# # # this should match whatever that does not have tags required, so only tq_job1 +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1} + +# # No Tag, Platform = "ANY" +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "ANY"}, num_queues_to_get=4 +# ) +# # this should match whatever that does not have tags required, so only tq_job1 +# assert {int(x[0]) for x in res} == {tq_job1} + +# # # Tag = "ANY", Platform = "ANY" +# # result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Platform": "ANY", "Tag": "ANY"}, numQueuesToGet=4) +# # assert result["OK"] +# # # this should match whatever +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job4} + +# # Tag = "ANY", Platform = "ANY" +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "ANY", "Tag": "ANY"}, num_queues_to_get=4 +# ) +# # this should match whatever +# assert {int(x[0]) for x in res} == {tq_job1, tq_job2, tq_job3, tq_job4} + +# # # Tag = "ANY", Platform = "centos7" +# # result = tqDB.matchAndGetTaskQueue( +# # {"CPUTime": 50000, "Platform": "centos7", "Tag": "MultiProcessor"}, numQueuesToGet=4 +# # ) +# # assert result["OK"] +# # # this should match whatever has platform == centos7, or no platform +# # # and either no tags or the MultiProcessor tag +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3} + +# # Tag = "ANY", Platform = "centos7" +# res = await task_queue_db.match_and_get_task_queue( +# {"CPUTime": 50000, "Platform": "centos7", "Tag": "MultiProcessor"}, +# 
num_queues_to_get=4, +# ) + +# # deleting +# await asyncio.gather( +# *(task_queue_db.delete_job(job_id) for job_id in range(1, 5)), +# ) + +# await asyncio.gather( +# *( +# task_queue_db.delete_task_queue_if_empty(tq_id) +# for tq_id in [tq_job1, tq_job2, tq_job3, tq_job4] +# ) +# ) + + +# async def test_complex_matching(task_queue_db: TaskQueueDB): +# """test of a complex (realistic) matching. Something like: + +# {'NumberOfProcessors': 1, +# 'MaxRAM': 128000, +# 'Site': ['Site_1', 'Site_2'], +# 'Community': 'vo', +# 'OwnerGroup': ['admin', 'prod', 'user'], +# 'Platform': ['slc6', 'centos7'], +# 'Tag': [], +# 'CPUTime': 9999999} +# """ + +# # Let's first insert few jobs (no tags, for now, and always a platform) + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "admin", +# # "CPUTime": 5000, +# # "Sites": ["Site_1", "Site_2"], +# # "Platforms": ["centos7"], +# # } +# # result = tqDB.insertJob(1, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(1) +# # tq_job1 = result["Value"] + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "admin", +# "CPUTime": 5000, +# "Sites": ["Site_1", "Site_2"], +# "Platforms": ["centos7"], +# } +# await task_queue_db.insert_job(1, tq_def_dict, 10) +# tq_job1 = await task_queue_db.get_task_queue_for_job(1) + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "prod", +# # "CPUTime": 5000, +# # "Sites": ["Site_1"], +# # "Platforms": ["slc6", "centos7"], +# # } +# # result = tqDB.insertJob(2, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(2) +# # tq_job2 = result["Value"] + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "prod", +# "CPUTime": 5000, +# "Sites": ["Site_1"], +# "Platforms": ["slc6", "centos7"], +# } +# await task_queue_db.insert_job(2, tq_def_dict, 10) +# tq_job2 = await task_queue_db.get_task_queue_for_job(2) + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "user", +# # "CPUTime": 5000, +# # 
"Sites": ["Site_2"], +# # "Platforms": ["slc6", "centos7"], +# # } +# # result = tqDB.insertJob(3, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(3) +# # tq_job3 = result["Value"] + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "user", +# "CPUTime": 5000, +# "Sites": ["Site_2"], +# "Platforms": ["slc6", "centos7"], +# } +# await task_queue_db.insert_job(3, tq_def_dict, 10) +# tq_job3 = await task_queue_db.get_task_queue_for_job(3) + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "user", +# # "CPUTime": 5000, +# # "Sites": ["Site_1", "Site_2"], +# # "Platforms": ["ubuntu"], +# # } +# # result = tqDB.insertJob(4, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(4) +# # tq_job4 = result["Value"] + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "user", +# "CPUTime": 5000, +# "Sites": ["Site_1", "Site_2"], +# "Platforms": ["ubuntu"], +# } +# await task_queue_db.insert_job(4, tq_def_dict, 10) +# tq_job4 = await task_queue_db.get_task_queue_for_job(4) + +# # now let's try some matching + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["slc6", "centos7"], +# # "OwnerGroup": ["admin", "prod", "user"], +# # "Site": "ANY", +# # }, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["slc6", "centos7"], +# "OwnerGroup": ["admin", "prod", "user"], +# "Site": "ANY", +# }, +# num_queues_to_get=4, +# ) +# assert {int(x[0]) for x in res} == {tq_job1, tq_job2, tq_job3} + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["ubuntu"], +# # "Tag": [], +# # "OwnerGroup": ["admin", "prod", "user"], +# # "Site": "ANY", +# # }, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in 
result["Value"]} +# # assert res == {tq_job4} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["ubuntu"], +# "Tag": [], +# "OwnerGroup": ["admin", "prod", "user"], +# "Site": "ANY", +# }, +# num_queues_to_get=4, +# ) +# assert {int(x[0]) for x in res} == {tq_job4} + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["slc6", "centos7", "ubuntu"], +# # "Tag": [], +# # "OwnerGroup": ["prod", "user"], +# # "Site": "ANY", +# # }, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job2, tq_job3, tq_job4} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["slc6", "centos7", "ubuntu"], +# "Tag": [], +# "OwnerGroup": ["prod", "user"], +# "Site": "ANY", +# }, +# num_queues_to_get=4, +# ) +# assert {int(x[0]) for x in res} == {tq_job2, tq_job3, tq_job4} + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["slc6", "centos7"], +# # "Tag": [], +# # "OwnerGroup": ["prod", "user"], +# # "Site": "ANY", +# # }, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job2, tq_job3} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["slc6", "centos7"], +# "Tag": [], +# "OwnerGroup": ["prod", "user"], +# "Site": "ANY", +# }, +# num_queues_to_get=4, +# ) +# assert {int(x[0]) for x in res} == {tq_job2, tq_job3} + +# # result = tqDB.matchAndGetTaskQueue( +# # {"CPUTime": 9999999, "Platform": ["slc6", "centos7"], "OwnerGroup": ["prod", "user"]}, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job2, tq_job3} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["slc6", "centos7"], +# "OwnerGroup": 
["prod", "user"], +# }, +# num_queues_to_get=4, +# ) +# assert {int(x[0]) for x in res} == {tq_job2, tq_job3} + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["slc6", "centos7"], +# # "OwnerGroup": ["prod", "user"], +# # "Site": ["Site_1", "Site_2"], +# # }, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job2, tq_job3} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["slc6", "centos7"], +# "OwnerGroup": ["prod", "user"], +# "Site": ["Site_1", "Site_2"], +# }, +# num_queues_to_get=4, +# ) +# assert {int(x[0]) for x in res} == {tq_job2, tq_job3} + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["slc6", "centos7"], +# # "OwnerGroup": ["prod", "user"], +# # "Site": ["Site_1"], +# # }, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job2} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["slc6", "centos7"], +# "OwnerGroup": ["prod", "user"], +# "Site": ["Site_1"], +# }, +# num_queues_to_get=4, +# ) +# assert {int(x[0]) for x in res} == {tq_job2} + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 10, +# # "Platform": ["slc6", "centos7"], +# # "OwnerGroup": ["prod", "user"], +# # "Site": ["Site_1", "Site_2"], +# # }, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # assert len(result["Value"]) == 0 + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 10, +# "Platform": ["slc6", "centos7"], +# "OwnerGroup": ["prod", "user"], +# "Site": ["Site_1", "Site_2"], +# }, +# num_queues_to_get=4, +# ) +# assert len(res) == 0 + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": "ANY", +# # "OwnerGroup": ["admin", "prod", "user"], +# # "Site": ["ANY"], +# # 
}, +# # numQueuesToGet=4, +# # ) +# # assert result["OK"] +# # assert len(result["Value"]) == 4 + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": "ANY", +# "OwnerGroup": ["admin", "prod", "user"], +# "Site": ["ANY"], +# }, +# num_queues_to_get=4, +# ) +# assert len(res) == 4 + +# # now inserting one without platform, and try again + +# # tqDefDict = { +# # "Owner": "userName", +# # "OwnerGroup": "user", +# # "CPUTime": 5000, +# # "Sites": ["Site_1", "Site_2"], +# # } +# # result = tqDB.insertJob(5, tqDefDict, 10) +# # assert result["OK"] +# # result = tqDB.getTaskQueueForJob(5) +# # tq_job5 = result["Value"] + +# tq_def_dict = { +# "Owner": "userName", +# "OwnerGroup": "user", +# "CPUTime": 5000, +# "Sites": ["Site_1", "Site_2"], +# } +# await task_queue_db.insert_job(5, tq_def_dict, 10) +# tq_job5 = await task_queue_db.get_task_queue_for_job(5) + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["slc6", "centos7"], +# # "OwnerGroup": ["admin", "prod", "user"], +# # "Site": "ANY", +# # }, +# # numQueuesToGet=5, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job1, tq_job2, tq_job3, tq_job5} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["slc6", "centos7"], +# "OwnerGroup": ["admin", "prod", "user"], +# "Site": "ANY", +# }, +# num_queues_to_get=5, +# ) +# assert {int(x[0]) for x in res} == {tq_job1, tq_job2, tq_job3, tq_job5} + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["ubuntu"], +# # "OwnerGroup": ["admin", "prod", "user"], +# # "Site": "Any", +# # }, +# # numQueuesToGet=5, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job4, tq_job5} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["ubuntu"], +# "OwnerGroup": 
["admin", "prod", "user"], +# "Site": "Any", +# }, +# num_queues_to_get=5, +# ) +# assert {int(x[0]) for x in res} == {tq_job4, tq_job5} + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["ubuntu"], +# # "OwnerGroup": ["admin", "prod", "user"], +# # "Site": "Any", +# # "Tag": [], +# # }, +# # numQueuesToGet=5, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job4, tq_job5} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["ubuntu"], +# "OwnerGroup": ["admin", "prod", "user"], +# "Site": "Any", +# "Tag": [], +# }, +# num_queues_to_get=5, +# ) +# assert {int(x[0]) for x in res} == {tq_job4, tq_job5} + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["ubuntu"], +# # "OwnerGroup": ["admin", "prod", "user"], +# # "Site": ["Any", "Site_1"], +# # "Tag": [], +# # }, +# # numQueuesToGet=5, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job4, tq_job5} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["ubuntu"], +# "OwnerGroup": ["admin", "prod", "user"], +# "Site": ["Any", "Site_1"], +# "Tag": [], +# }, +# num_queues_to_get=5, +# ) +# assert {int(x[0]) for x in res} == {tq_job4, tq_job5} + +# # result = tqDB.matchAndGetTaskQueue( +# # { +# # "CPUTime": 9999999, +# # "Platform": ["ubuntu"], +# # "OwnerGroup": ["admin", "prod", "user"], +# # "Site": ["Any", "Site_1"], +# # "Tag": ["SomeTAG"], +# # }, +# # numQueuesToGet=5, +# # ) +# # assert result["OK"] +# # res = {int(x[0]) for x in result["Value"]} +# # assert res == {tq_job4, tq_job5} + +# res = await task_queue_db.match_and_get_task_queue( +# { +# "CPUTime": 9999999, +# "Platform": ["ubuntu"], +# "OwnerGroup": ["admin", "prod", "user"], +# "Site": ["Any", "Site_1"], +# "Tag": ["SomeTAG"], +# }, +# num_queues_to_get=5, +# ) +# 
assert {int(x[0]) for x in res} == {tq_job4, tq_job5} + +# # for jobId in range(1, 8): +# # result = tqDB.deleteJob(jobId) +# # assert result["OK"] + +# # for tqId in [tq_job1, tq_job2, tq_job3, tq_job4, tq_job5]: +# # result = tqDB.deleteTaskQueueIfEmpty(tqId) +# # assert result["OK"] + +# # deleting +# await asyncio.gather( +# *(task_queue_db.delete_job(job_id) for job_id in range(1, 8)), +# ) +# await asyncio.gather( +# *( +# task_queue_db.delete_task_queue_if_empty(tq_id) +# for tq_id in [tq_job1, tq_job2, tq_job3, tq_job4, tq_job5] +# ) +# ) + + +# # def test_TQ(): +# # """test of various functions""" +# # tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} +# # tqDB.insertJob(123, tqDefDict, 10) + +# # result = tqDB.retrieveTaskQueues() +# # assert result["OK"] +# # assert list(result["Value"].values())[0] == { +# # "Owner": "userName", +# # "Jobs": 1, +# # "OwnerGroup": "myGroup", +# # "CPUTime": 86400, +# # "Priority": 1.0, +# # } +# # result = tqDB.findOrphanJobs() +# # assert result["OK"] +# # result = tqDB.recalculateTQSharesForAll() +# # assert result["OK"] + +# # # this will also remove the job +# # result = tqDB.matchAndGetJob({"CPUTime": 300000}) +# # assert result["OK"] +# # assert result["Value"]["matchFound"] is True +# # assert result["Value"]["jobId"] in [123, 125] +# # tq = result["Value"]["taskQueueId"] + +# # result = tqDB.deleteTaskQueueIfEmpty(tq) +# # assert result["OK"] + + +# async def test_TQ(task_queue_db: TaskQueueDB): +# """test of various function""" +# tq_def_dict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} +# await task_queue_db.insert_job(123, tq_def_dict, 10) + +# result = await task_queue_db.retrieve_task_queues() +# assert list(result.values())[0] == { +# "Owner": "userName", +# "Jobs": 1, +# "OwnerGroup": "myGroup", +# "CPUTime": 86400, +# "Priority": 1.0, +# } +# await task_queue_db.recalculate_tq_shares_for_all() + +# # this will also remove the job +# result = await 
task_queue_db.match_and_get_job({"CPUTime": 300000}) +# assert result["matchFound"] is True +# assert result["jobId"] in [123, 125] +# result["taskQueueId"] + +# # TODO: figure out a way to get a config object here +# # config = Config() +# # await task_queue_db.delete_task_queue_if_empty(tq, config) + + +# # Test added by me + + +# async def test_retrieve_task_queues_no_queue(task_queue_db: TaskQueueDB): +# """Try to call the retrieve_task_queues methods when there is no queue""" + +# # Act +# result = await task_queue_db.retrieve_task_queues() + +# # Assert +# assert result == {} + + +# async def test_match_and_get_job_no_queue(task_queue_db: TaskQueueDB): +# """Try to call the match_and_get_job methods when there is no queue""" + +# # Act +# result = await task_queue_db.match_and_get_job({"CPUTime": 300000}) + +# # Assert +# assert result == {"matchFound": False} + + +# async def test_delete_task_queue_if_empty_no_queue(task_queue_db: TaskQueueDB): +# """ +# Try to call the delete_task_queue_if_empty methods when there is no queue +# This should not raise an exception +# """ + +# # Act +# await task_queue_db.delete_task_queue_if_empty(1) + + +# async def test_delete_task_queue_if_empty_no_empty_task_queue( +# task_queue_db: TaskQueueDB, +# ): +# # Arrange +# tq_id = await task_queue_db.insert_job(123, tq_def_dict, 10, config, user_info) + +# # Act +# await task_queue_db.delete_task_queue_if_empty(tq_id) + +# # Assert +# assert await task_queue_db.retrieve_task_queues() == {} + + +# async def test_delete_task_queue_if_empty_empty_task_queue(task_queue_db: TaskQueueDB): +# # Arrange +# tq_id = await task_queue_db.insert_job(123, tq_def_dict, 10, config, user_info) +# await task_queue_db.delete_job(123) + +# # Act +# await task_queue_db.delete_task_queue_if_empty(tq_id) + +# # Assert +# assert await task_queue_db.retrieve_task_queues() == {} + + +# async def test_get_task_queue_for_job_valid_job(task_queue_db: TaskQueueDB): +# async with task_queue_db as 
task_queue_db: +# # Arrange +# tq_id = await task_queue_db.insert_job(123, tq_def_dict, 10, config, user_info) + +# # Act +# tq = await task_queue_db.get_task_queue_for_job(123) + +# # Assert +# assert tq == tq_id + + +# async def test_get_task_queue_for_job_invalid_job(task_queue_db: TaskQueueDB): +# async with task_queue_db as task_queue_db: +# with pytest.raises(NoResultFound): +# await task_queue_db.get_task_queue_for_job(1) + + +# async def test_delete_invalid_job(task_queue_db: TaskQueueDB): +# async with task_queue_db as task_queue_db: +# await task_queue_db.delete_job(1) + + +# async def test_delete_valid_job(task_queue_db: TaskQueueDB): +# async with task_queue_db as task_queue_db: +# # Arrange +# await task_queue_db.insert_job(123, tq_def_dict, 10, config, user_info) + +# # Act +# await task_queue_db.delete_job(123)