Skip to content

Commit

Permalink
Merge pull request #16264 from jdavcs/dev_sa20_fix10
Browse files Browse the repository at this point in the history
Towards SQLAlchemy 2.0 (upgrades to SA Core usage)
  • Loading branch information
jdavcs authored Jul 21, 2023
2 parents cf44aee + 00a600e commit da965dc
Show file tree
Hide file tree
Showing 20 changed files with 418 additions and 462 deletions.
18 changes: 8 additions & 10 deletions lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def __init__(

def setup_query(self):
subq = (
select([self.grab_this.id])
select(self.grab_this.id)
.where(
and_(
self.grab_this.table.c.handler.in_(self.self_handler_tags),
Expand Down Expand Up @@ -817,7 +817,7 @@ def get_user_job_count(self, user_id):
rval = self.user_job_count.get(user_id, 0)
if not self.app.config.cache_user_job_count:
result = self.sa_session.execute(
select([func.count(model.Job.table.c.id)]).where(
select(func.count(model.Job.table.c.id)).where(
and_(
model.Job.table.c.state.in_(
(model.Job.states.QUEUED, model.Job.states.RUNNING, model.Job.states.RESUBMITTED)
Expand All @@ -836,7 +836,7 @@ def __cache_user_job_count(self):
if self.user_job_count is None and self.app.config.cache_user_job_count:
self.user_job_count = {}
query = self.sa_session.execute(
select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)])
select(model.Job.table.c.user_id, func.count(model.Job.table.c.user_id))
.where(
and_(
model.Job.table.c.state.in_(
Expand Down Expand Up @@ -866,7 +866,7 @@ def get_user_job_count_per_destination(self, user_id):
rval.update(cached)
result = self.sa_session.execute(
select(
[model.Job.table.c.destination_id, func.count(model.Job.table.c.destination_id).label("job_count")]
model.Job.table.c.destination_id, func.count(model.Job.table.c.destination_id).label("job_count")
)
.where(
and_(
Expand All @@ -887,11 +887,9 @@ def __cache_user_job_count_per_destination(self):
self.user_job_count_per_destination = {}
result = self.sa_session.execute(
select(
[
model.Job.table.c.user_id,
model.Job.table.c.destination_id,
func.count(model.Job.table.c.user_id).label("job_count"),
]
model.Job.table.c.user_id,
model.Job.table.c.destination_id,
func.count(model.Job.table.c.user_id).label("job_count"),
)
.where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING))))
.group_by(model.Job.table.c.user_id, model.Job.table.c.destination_id)
Expand Down Expand Up @@ -987,7 +985,7 @@ def __cache_total_job_count_per_destination(self):
self.total_job_count_per_destination = {}
result = self.sa_session.execute(
select(
[model.Job.table.c.destination_id, func.count(model.Job.table.c.destination_id).label("job_count")]
model.Job.table.c.destination_id, func.count(model.Job.table.c.destination_id).label("job_count")
)
.where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING))))
.group_by(model.Job.table.c.destination_id)
Expand Down
12 changes: 5 additions & 7 deletions lib/galaxy/managers/hdas.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def __init__(self, hda_manager: HDAManager, dataset_manager: datasets.DatasetMan

def get_discarded_summary(self, user: model.User) -> CleanableItemsSummary:
stmt = (
select([func.sum(model.Dataset.total_size), func.count(model.HistoryDatasetAssociation.id)])
select(func.sum(model.Dataset.total_size), func.count(model.HistoryDatasetAssociation.id))
.select_from(model.HistoryDatasetAssociation)
.join(model.Dataset, model.HistoryDatasetAssociation.table.c.dataset_id == model.Dataset.id)
.join(model.History, model.HistoryDatasetAssociation.table.c.history_id == model.History.id)
Expand All @@ -385,12 +385,10 @@ def get_discarded(
) -> List[StoredItem]:
stmt = (
select(
[
model.HistoryDatasetAssociation.id,
model.HistoryDatasetAssociation.name,
model.HistoryDatasetAssociation.update_time,
model.Dataset.total_size,
]
model.HistoryDatasetAssociation.id,
model.HistoryDatasetAssociation.name,
model.HistoryDatasetAssociation.update_time,
model.Dataset.total_size,
)
.select_from(model.HistoryDatasetAssociation)
.join(model.Dataset, model.HistoryDatasetAssociation.table.c.dataset_id == model.Dataset.id)
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/managers/histories.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def __init__(self, history_manager: HistoryManager):
}

def get_discarded_summary(self, user: model.User) -> CleanableItemsSummary:
stmt = select([func.sum(model.History.disk_size), func.count(model.History.id)]).where(
stmt = select(func.sum(model.History.disk_size), func.count(model.History.id)).where(
model.History.user_id == user.id,
model.History.deleted == true(),
model.History.purged == false(),
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/managers/history_contents.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def state_counts(self, history):
]
contents_subquery = self._union_of_contents_query(history, filters=filters).subquery()
statement = (
sql.select([sql.column("state"), func.count("*")])
sql.select(sql.column("state"), func.count("*"))
.select_from(contents_subquery)
.group_by(sql.column("state"))
)
Expand Down
48 changes: 16 additions & 32 deletions lib/galaxy/managers/job_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,8 @@ def _get_union_results(self, *selects):
def outputs_derived_from_input_hda(self, input_hda_id: int):
hda_select = (
select(
[
literal("HistoryDatasetAssociation").label("src"),
model.JobToOutputDatasetAssociation.dataset_id.label("id"),
]
literal("HistoryDatasetAssociation").label("src"),
model.JobToOutputDatasetAssociation.dataset_id.label("id"),
)
.join(
model.JobToInputDatasetAssociation,
Expand All @@ -80,10 +78,8 @@ def outputs_derived_from_input_hda(self, input_hda_id: int):
)
hdca_select = (
select(
[
literal("HistoryDatasetCollectionAssociation").label("src"),
model.JobToOutputDatasetCollectionAssociation.dataset_collection_id.label("id"),
]
literal("HistoryDatasetCollectionAssociation").label("src"),
model.JobToOutputDatasetCollectionAssociation.dataset_collection_id.label("id"),
)
.join(
model.JobToInputDatasetAssociation,
Expand All @@ -97,10 +93,8 @@ def outputs_derived_from_input_hda(self, input_hda_id: int):
def outputs_derived_from_input_hdca(self, input_hdca_id: int):
hda_select = (
select(
[
literal("HistoryDatasetAssociation").label("src"),
model.JobToOutputDatasetAssociation.dataset_id.label("id"),
]
literal("HistoryDatasetAssociation").label("src"),
model.JobToOutputDatasetAssociation.dataset_id.label("id"),
)
.join(
model.JobToInputDatasetCollectionAssociation,
Expand All @@ -111,10 +105,8 @@ def outputs_derived_from_input_hdca(self, input_hdca_id: int):
)
hdca_select = (
select(
[
literal("HistoryDatasetCollectionAssociation").label("src"),
model.JobToOutputDatasetCollectionAssociation.dataset_collection_id.label("id"),
]
literal("HistoryDatasetCollectionAssociation").label("src"),
model.JobToOutputDatasetCollectionAssociation.dataset_collection_id.label("id"),
)
.join(
model.JobToInputDatasetCollectionAssociation,
Expand All @@ -129,10 +121,8 @@ def outputs_derived_from_input_hdca(self, input_hdca_id: int):
def inputs_for_hda(self, input_hda_id: int):
input_hdas = (
select(
[
literal("HistoryDatasetAssociation").label("src"),
model.JobToInputDatasetAssociation.dataset_id.label("id"),
]
literal("HistoryDatasetAssociation").label("src"),
model.JobToInputDatasetAssociation.dataset_id.label("id"),
)
.join(
model.JobToOutputDatasetAssociation,
Expand All @@ -143,10 +133,8 @@ def inputs_for_hda(self, input_hda_id: int):
)
input_hdcas = (
select(
[
literal("HistoryDatasetCollectionAssociation").label("src"),
model.JobToInputDatasetCollectionAssociation.dataset_collection_id.label("id"),
]
literal("HistoryDatasetCollectionAssociation").label("src"),
model.JobToInputDatasetCollectionAssociation.dataset_collection_id.label("id"),
)
.join(
model.JobToOutputDatasetAssociation,
Expand All @@ -160,10 +148,8 @@ def inputs_for_hda(self, input_hda_id: int):
def inputs_for_hdca(self, input_hdca_id: int):
input_hdas = (
select(
[
literal("HistoryDatasetAssociation").label("src"),
model.JobToInputDatasetAssociation.dataset_id.label("id"),
]
literal("HistoryDatasetAssociation").label("src"),
model.JobToInputDatasetAssociation.dataset_id.label("id"),
)
.join(
model.JobToOutputDatasetCollectionAssociation,
Expand All @@ -174,10 +160,8 @@ def inputs_for_hdca(self, input_hdca_id: int):
)
input_hdcas = (
select(
[
literal("HistoryDatasetCollectionAssociation").label("src"),
model.JobToInputDatasetCollectionAssociation.dataset_collection_id.label("id"),
]
literal("HistoryDatasetCollectionAssociation").label("src"),
model.JobToInputDatasetCollectionAssociation.dataset_collection_id.label("id"),
)
.join(
model.JobToOutputDatasetCollectionAssociation,
Expand Down
12 changes: 5 additions & 7 deletions lib/galaxy/managers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def replace_dataset_ids(path, key, value):
c = aliased(model.HistoryDatasetAssociation)
d = aliased(model.JobParameter)
e = aliased(model.HistoryDatasetAssociationHistory)
stmt = select([model.HistoryDatasetAssociation.id]).where(
stmt = select(model.HistoryDatasetAssociation.id).where(
model.HistoryDatasetAssociation.id == e.history_dataset_association_id
)
name_condition = []
Expand Down Expand Up @@ -614,11 +614,9 @@ def invocation_job_source_iter(sa_session, invocation_id):
join = model.WorkflowInvocationStep.table.join(model.WorkflowInvocation)
statement = (
select(
[
model.WorkflowInvocationStep.job_id,
model.WorkflowInvocationStep.implicit_collection_jobs_id,
model.WorkflowInvocationStep.state,
]
model.WorkflowInvocationStep.job_id,
model.WorkflowInvocationStep.implicit_collection_jobs_id,
model.WorkflowInvocationStep.state,
)
.select_from(join)
.where(model.WorkflowInvocation.id == invocation_id)
Expand Down Expand Up @@ -789,7 +787,7 @@ def summarize_jobs_to_dict(sa_session, jobs_source):
model.ImplicitCollectionJobsJobAssociation.table.join(model.Job)
)
statement = (
select([model.Job.state, func.count("*")])
select(model.Job.state, func.count("*"))
.select_from(join)
.where(model.ImplicitCollectionJobs.id == jobs_source.id)
.group_by(model.Job.state)
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/managers/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def get_user_total_unread_notification_count(self, user: User) -> int:
Only published and not expired notifications are accounted.
"""
stmt = (
select([func.count(UserNotificationAssociation.id)])
select(func.count(UserNotificationAssociation.id))
.select_from(UserNotificationAssociation)
.join(
Notification,
Expand Down
14 changes: 6 additions & 8 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3296,12 +3296,10 @@ def disk_size(cls):
)
distinct_datasets = (
select(
[
# use labels here to better access from the query above
HistoryDatasetAssociation.table.c.history_id.label("history_id"),
Dataset.total_size.label("dataset_size"),
Dataset.id.label("dataset_id"),
]
# use labels here to better access from the query above
HistoryDatasetAssociation.table.c.history_id.label("history_id"),
Dataset.total_size.label("dataset_size"),
Dataset.id.label("dataset_id"),
)
.where(HistoryDatasetAssociation.table.c.purged != true())
.where(Dataset.table.c.purged != true())
Expand All @@ -3313,7 +3311,7 @@ def disk_size(cls):
distinct_datasets_alias = aliased(distinct_datasets.subquery(), name="datasets")
# then, bind as property of history using the cls.id
size_query = (
select([func.coalesce(func.sum(distinct_datasets_alias.c.dataset_size), 0)])
select(func.coalesce(func.sum(distinct_datasets_alias.c.dataset_size), 0))
.select_from(distinct_datasets_alias)
.where(distinct_datasets_alias.c.history_id == cls.id)
)
Expand Down Expand Up @@ -8595,7 +8593,7 @@ class WorkflowInvocationStep(Base, Dictifiable, Serializable):
viewonly=True,
)
order_index = column_property(
select([WorkflowStep.order_index]).where(WorkflowStep.id == workflow_step_id).scalar_subquery()
select(WorkflowStep.order_index).where(WorkflowStep.id == workflow_step_id).scalar_subquery()
)

subworkflow_invocation_id: column_property
Expand Down
20 changes: 8 additions & 12 deletions lib/galaxy/model/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,17 @@ def get_community_tags(self, item=None, limit=None):
if not item_tag_assoc_class:
return []
# Build select statement.
cols_to_select = [item_tag_assoc_class.table.c.tag_id, func.count("*")]
from_obj = item_tag_assoc_class.table.join(item_class.table).join(galaxy.model.Tag.table)
where_clause = self.get_id_col_in_item_tag_assoc_table(item_class) == item.id
order_by = [func.count("*").desc()]
group_by = item_tag_assoc_class.table.c.tag_id
# Do query and get result set.
query = select(
columns=cols_to_select,
from_obj=from_obj,
whereclause=where_clause,
group_by=group_by,
order_by=order_by,
limit=limit,
query = (
select(item_tag_assoc_class.table.c.tag_id, func.count())
.select_from(from_obj)
.where(where_clause)
.group_by(group_by)
.order_by(func.count().desc())
.limit(limit)
)
result_set = self.sa_session.execute(query)
# Return community tags.
Expand All @@ -118,9 +116,7 @@ def get_community_tags(self, item=None, limit=None):
return community_tags

def get_tool_tags(self):
query = select(
columns=[galaxy.model.ToolTagAssociation.table.c.tag_id], from_obj=galaxy.model.ToolTagAssociation.table
).distinct()
query = select(galaxy.model.ToolTagAssociation.table.c.tag_id).distinct()
result_set = self.sa_session.execute(query)

tags = []
Expand Down
Loading

0 comments on commit da965dc

Please sign in to comment.