Include source hash in job search #19112

Draft
wants to merge 6 commits into base: dev
Changes from 5 commits
2 changes: 1 addition & 1 deletion lib/galaxy/celery/tasks.py
@@ -199,7 +199,7 @@ def set_metadata(
     try:
         if overwrite:
             hda_manager.overwrite_metadata(dataset_instance)
-        dataset_instance.datatype.set_meta(dataset_instance) # type:ignore [arg-type]
+        dataset_instance.datatype.set_meta(dataset_instance)
         dataset_instance.set_peek()
         # Reset SETTING_METADATA state so the dataset instance getter picks the dataset state
         dataset_instance.set_metadata_success_state()
12 changes: 9 additions & 3 deletions lib/galaxy/datatypes/protocols.py
@@ -2,10 +2,16 @@
 Location of protocols used in datatypes
 """

-from typing import Any
+from typing import (
+    Any,
+    TYPE_CHECKING,
+)

 from typing_extensions import Protocol

+if TYPE_CHECKING:
+    from sqlalchemy.orm import Mapped
+

 class HasClearAssociatedFiles(Protocol):
     def clear_associated_files(self, metadata_safe: bool = False, purge: bool = False) -> None: ...
@@ -17,7 +23,7 @@ def creating_job(self): ...


 class HasDeleted(Protocol):
-    deleted: bool
+    deleted: "Mapped[bool]"


 class HasExt(Protocol):
@@ -39,7 +45,7 @@ class HasHid(Protocol):


 class HasId(Protocol):
-    id: int
+    id: "Mapped[int]"


 class HasInfo(Protocol):
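Side note on the typing change above (not part of the diff): annotating the protocol attributes as `Mapped[...]` is what lets SQLAlchemy 2.0-style mapped classes satisfy these protocols under mypy, since declarative models expose `Mapped[bool]`/`Mapped[int]` descriptors rather than plain `bool`/`int` attributes. A minimal standalone sketch of the idea, with illustrative names rather than Galaxy code (the PR guards the import behind `TYPE_CHECKING` and uses string annotations; here `Mapped` is imported directly so the file runs on its own):

```python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from typing_extensions import Protocol


class HasDeleted(Protocol):
    # A plain `deleted: bool` here would not structurally match the
    # `Mapped[bool]` attribute that a 2.0-style mapped class declares.
    deleted: Mapped[bool]


class Base(DeclarativeBase):
    pass


class Item(Base):
    __tablename__ = "item"
    id: Mapped[int] = mapped_column(primary_key=True)
    deleted: Mapped[bool] = mapped_column(default=False)


def mark_deleted(obj: HasDeleted) -> None:
    # Item satisfies HasDeleted structurally, so mapped instances can be
    # passed wherever the protocol is expected without per-call ignores.
    obj.deleted = True


if __name__ == "__main__":
    item = Item(deleted=False)
    mark_deleted(item)
    print(item.deleted)  # True
```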
4 changes: 2 additions & 2 deletions lib/galaxy/managers/hdas.py
@@ -371,7 +371,7 @@ def get_discarded_summary(self, user: model.User) -> CleanableItemsSummary:
             .where(
                 and_(
                     HistoryDatasetAssociation.deleted == true(),
-                    HistoryDatasetAssociation.purged == false(), # type:ignore[arg-type]
+                    HistoryDatasetAssociation.purged == false(),
                     model.History.user_id == user.id,
                 )
             )
@@ -401,7 +401,7 @@ def get_discarded(
             .where(
                 and_(
                     HistoryDatasetAssociation.deleted == true(),
-                    HistoryDatasetAssociation.purged == false(), # type:ignore[arg-type]
+                    HistoryDatasetAssociation.purged == false(),
                     model.History.user_id == user.id,
                 )
             )
41 changes: 35 additions & 6 deletions lib/galaxy/managers/jobs.py
@@ -10,6 +10,8 @@
     Dict,
     List,
     Optional,
+    Type,
+    TypeVar,
     Union,
 )

@@ -29,6 +31,7 @@
 )
 from sqlalchemy.orm import aliased
 from sqlalchemy.sql import select
+from sqlalchemy.sql.selectable import Select
 from typing_extensions import TypedDict

 from galaxy import model
@@ -117,6 +120,32 @@ def get_path_key(path_tuple):
     return path_key


+S = TypeVar("S", bound=Select)
+
+
+def has_same_hash(stmt: S, a: Type[model.HistoryDatasetAssociation], b: Type[model.HistoryDatasetAssociation]) -> S:
+    a_hash = aliased(model.DatasetHash)
+    b_hash = aliased(model.DatasetHash)
+    stmt = (
+        stmt.outerjoin(a_hash, a.dataset_id == a_hash.dataset_id)
+        .outerjoin(
+            b_hash,
+            and_(
+                a_hash.hash_function == b_hash.hash_function,
+                a_hash.hash_value == b_hash.hash_value,
+            ),
+        )
+        .join(
+            b,
+            or_(
+                b.dataset_id == a.dataset_id,
+                b_hash.dataset_id == b.dataset_id,
+            ),
+        )
+    )
+    return stmt
+
+
 class JobManager:
     def __init__(self, app: StructuredApp):
         self.app = app
@@ -398,7 +427,6 @@ def replace_dataset_ids(path, key, value):
         stmt_sq = self._build_job_subquery(tool_id, user.id, tool_version, job_state, wildcard_param_dump)

         stmt = select(Job.id).select_from(Job.table.join(stmt_sq, stmt_sq.c.id == Job.id))
-
         data_conditions: List = []

         # We now build the stmt filters that relate to the input datasets
@@ -561,20 +589,21 @@ def _build_job_subquery(

         return stmt.subquery()

-    def _build_stmt_for_hda(self, stmt, data_conditions, used_ids, k, v, identifier):
+    def _build_stmt_for_hda(self, stmt: S, data_conditions, used_ids, k, v, identifier) -> S:
         a = aliased(model.JobToInputDatasetAssociation)
         b = aliased(model.HistoryDatasetAssociation)
         c = aliased(model.HistoryDatasetAssociation)
         d = aliased(model.JobParameter)
         e = aliased(model.HistoryDatasetAssociationHistory)
-        stmt = stmt.add_columns(a.dataset_id)
+        stmt = cast(S, stmt.add_columns(a.dataset_id))
         used_ids.append(a.dataset_id)
         stmt = stmt.join(a, a.job_id == model.Job.id)
         hda_stmt = select(model.HistoryDatasetAssociation.id).where(
             model.HistoryDatasetAssociation.id == e.history_dataset_association_id
         )
         # b is the HDA used for the job
-        stmt = stmt.join(b, a.dataset_id == b.id).join(c, c.dataset_id == b.dataset_id) # type:ignore[attr-defined]
+        stmt = stmt.join(b, a.dataset_id == b.id)
+        stmt = has_same_hash(stmt, b, c)
         name_condition = []
         if identifier:
             stmt = stmt.join(d)
@@ -672,7 +701,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v):
                 ),
             )
             .outerjoin(d, d.id == c.hda_id)
-            .outerjoin(e, e.dataset_id == d.dataset_id) # type:ignore[attr-defined]
+            .outerjoin(e, e.dataset_id == d.dataset_id)
         )
         data_conditions.append(
             and_(
@@ -682,7 +711,7 @@ def _build_stmt_for_dce(self, stmt, data_conditions, used_ids, k, v):
                     and_(
                         c.hda_id == b.hda_id,
                         d.id == c.hda_id,
-                        e.dataset_id == d.dataset_id, # type:ignore[attr-defined]
+                        e.dataset_id == d.dataset_id,
                     ),
                 ),
                 c.id == v,
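Review aid, not part of the diff: the `has_same_hash` helper above widens the job-search statement so that a candidate input matches either because it is backed by the very same dataset row, or because its dataset shares a recorded hash (same hash function and same value) with the original input's dataset. A rough standalone sketch of that join pattern over simplified stand-in tables follows; the names are illustrative, not Galaxy's actual schema:

```python
from sqlalchemy import Integer, String, and_, or_, select
from sqlalchemy.orm import DeclarativeBase, Mapped, aliased, mapped_column


class Base(DeclarativeBase):
    pass


class HDA(Base):  # stand-in for HistoryDatasetAssociation
    __tablename__ = "hda"
    id: Mapped[int] = mapped_column(primary_key=True)
    dataset_id: Mapped[int] = mapped_column(Integer)


class DatasetHash(Base):  # stand-in for model.DatasetHash
    __tablename__ = "dataset_hash"
    id: Mapped[int] = mapped_column(primary_key=True)
    dataset_id: Mapped[int] = mapped_column(Integer)
    hash_function: Mapped[str] = mapped_column(String)
    hash_value: Mapped[str] = mapped_column(String)


a = aliased(HDA)  # the HDA the earlier (cached) job actually used
b = aliased(HDA)  # a candidate equivalent HDA supplied to the new job
a_hash = aliased(DatasetHash)
b_hash = aliased(DatasetHash)

stmt = (
    select(a.id, b.id)
    # hashes recorded for the original input's dataset (possibly none, hence the outer join)
    .outerjoin(a_hash, a.dataset_id == a_hash.dataset_id)
    # hashes with the same function and value, possibly recorded for a different dataset
    .outerjoin(
        b_hash,
        and_(
            a_hash.hash_function == b_hash.hash_function,
            a_hash.hash_value == b_hash.hash_value,
        ),
    )
    # candidates match on the same dataset row, or on a dataset whose hash matches
    .join(
        b,
        or_(
            b.dataset_id == a.dataset_id,
            b_hash.dataset_id == b.dataset_id,
        ),
    )
)

print(stmt)  # inspect the generated SQL without needing a database
```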
40 changes: 20 additions & 20 deletions lib/galaxy/model/__init__.py
@@ -3152,8 +3152,8 @@ class History(Base, HasTags, Dictifiable, UsesAnnotations, HasName, Serializable
     active_datasets: Mapped[List["HistoryDatasetAssociation"]] = relationship(
         primaryjoin=(
             lambda: and_(
-                HistoryDatasetAssociation.history_id == History.id, # type: ignore[arg-type]
-                not_(HistoryDatasetAssociation.deleted), # type: ignore[has-type]
+                HistoryDatasetAssociation.history_id == History.id,
+                not_(HistoryDatasetAssociation.deleted),
             )
         ),
         order_by=lambda: asc(HistoryDatasetAssociation.hid), # type: ignore[has-type]
@@ -3165,7 +3165,7 @@ class History(Base, HasTags, Dictifiable, UsesAnnotations, HasName, Serializable
             lambda: (
                 and_(
                     HistoryDatasetCollectionAssociation.history_id == History.id,
-                    not_(HistoryDatasetCollectionAssociation.deleted), # type: ignore[arg-type]
+                    not_(HistoryDatasetCollectionAssociation.deleted),
                 )
             )
         ),
@@ -3175,8 +3175,8 @@ class History(Base, HasTags, Dictifiable, UsesAnnotations, HasName, Serializable
     visible_datasets: Mapped[List["HistoryDatasetAssociation"]] = relationship(
         primaryjoin=(
             lambda: and_(
-                HistoryDatasetAssociation.history_id == History.id, # type: ignore[arg-type]
-                not_(HistoryDatasetAssociation.deleted), # type: ignore[has-type]
+                HistoryDatasetAssociation.history_id == History.id,
+                not_(HistoryDatasetAssociation.deleted),
                 HistoryDatasetAssociation.visible, # type: ignore[has-type]
             )
         ),
@@ -3187,7 +3187,7 @@ class History(Base, HasTags, Dictifiable, UsesAnnotations, HasName, Serializable
         primaryjoin=(
             lambda: and_(
                 HistoryDatasetCollectionAssociation.history_id == History.id,
-                not_(HistoryDatasetCollectionAssociation.deleted), # type: ignore[arg-type]
+                not_(HistoryDatasetCollectionAssociation.deleted),
                 HistoryDatasetCollectionAssociation.visible, # type: ignore[arg-type]
             )
         ),
@@ -4110,27 +4110,27 @@ class Dataset(Base, StorableObject, Serializable):
     active_history_associations: Mapped[List["HistoryDatasetAssociation"]] = relationship(
         primaryjoin=(
             lambda: and_(
-                Dataset.id == HistoryDatasetAssociation.dataset_id, # type: ignore[attr-defined]
-                HistoryDatasetAssociation.deleted == false(), # type: ignore[has-type]
-                HistoryDatasetAssociation.purged == false(), # type: ignore[arg-type]
+                Dataset.id == HistoryDatasetAssociation.dataset_id,
+                HistoryDatasetAssociation.deleted == false(),
+                HistoryDatasetAssociation.purged == false(),
             )
         ),
         viewonly=True,
     )
     purged_history_associations: Mapped[List["HistoryDatasetAssociation"]] = relationship(
         primaryjoin=(
             lambda: and_(
-                Dataset.id == HistoryDatasetAssociation.dataset_id, # type: ignore[attr-defined]
-                HistoryDatasetAssociation.purged == true(), # type: ignore[arg-type]
+                Dataset.id == HistoryDatasetAssociation.dataset_id,
+                HistoryDatasetAssociation.purged == true(),
             )
         ),
         viewonly=True,
     )
     active_library_associations: Mapped[List["LibraryDatasetDatasetAssociation"]] = relationship(
         primaryjoin=(
             lambda: and_(
-                Dataset.id == LibraryDatasetDatasetAssociation.dataset_id, # type: ignore[attr-defined]
-                LibraryDatasetDatasetAssociation.deleted == false(), # type: ignore[has-type]
+                Dataset.id == LibraryDatasetDatasetAssociation.dataset_id,
+                LibraryDatasetDatasetAssociation.deleted == false(),
             )
         ),
         viewonly=True,
@@ -4596,11 +4596,13 @@ def datatype_for_extension(extension, datatypes_registry=None) -> "Data":
 class DatasetInstance(RepresentById, UsesCreateAndUpdateTime, _HasTable):
     """A base class for all 'dataset instances', HDAs, LDDAs, etc"""

+    purged: Mapped[Optional[bool]]
+    deleted: Mapped[bool]
+    dataset_id: Mapped[Optional[int]]
+    _state: Mapped[Optional[str]]
     states = Dataset.states
-    _state: Optional[str]
     conversion_messages = Dataset.conversion_messages
     permitted_actions = Dataset.permitted_actions
-    purged: bool
     creating_job_associations: List[Union[JobToOutputDatasetCollectionAssociation, JobToOutputDatasetAssociation]]
     copied_from_history_dataset_association: Optional["HistoryDatasetAssociation"]
     copied_from_library_dataset_dataset_association: Optional["LibraryDatasetDatasetAssociation"]
@@ -5055,9 +5057,7 @@ def find_conversion_destination(
         self, accepted_formats: List[str], **kwd
     ) -> Tuple[bool, Optional[str], Optional["DatasetInstance"]]:
         """Returns ( target_ext, existing converted dataset )"""
-        return self.datatype.find_conversion_destination(
-            self, accepted_formats, _get_datatypes_registry(), **kwd # type:ignore[arg-type]
-        )
+        return self.datatype.find_conversion_destination(self, accepted_formats, _get_datatypes_registry(), **kwd)

     def add_validation_error(self, validation_error):
         self.validation_errors.append(validation_error)
@@ -5260,7 +5260,7 @@ class HistoryDatasetAssociation(DatasetInstance, HasTags, Dictifiable, UsesAnnot
     Resource class that creates a relation between a dataset and a user history.
     """

-    history_id: Optional[int]
+    history_id: Mapped[Optional[int]]

     def __init__(
         self,
@@ -6947,7 +6947,7 @@ class HistoryDatasetCollectionAssociation(
     name: Mapped[Optional[str]] = mapped_column(TrimmedString(255))
     hid: Mapped[Optional[int]]
     visible: Mapped[Optional[bool]]
-    deleted: Mapped[Optional[bool]] = mapped_column(default=False)
+    deleted: Mapped[bool] = mapped_column(default=False)
     copied_from_history_dataset_collection_association_id: Mapped[Optional[int]] = mapped_column(
         ForeignKey("history_dataset_collection_association.id")
     )
2 changes: 1 addition & 1 deletion lib/galaxy/model/store/__init__.py
@@ -718,7 +718,7 @@ def handle_dataset_object_edit(dataset_instance, dataset_attrs):
             # Try to set metadata directly. @mvdbeek thinks we should only record the datasets
             try:
                 if dataset_instance.has_metadata_files:
-                    dataset_instance.datatype.set_meta(dataset_instance) # type:ignore[arg-type]
+                    dataset_instance.datatype.set_meta(dataset_instance)
             except Exception:
                 log.debug(f"Metadata setting failed on {dataset_instance}", exc_info=True)
                 dataset_instance.state = dataset_instance.dataset.states.FAILED_METADATA
37 changes: 30 additions & 7 deletions lib/galaxy_test/api/test_tools.py
@@ -2378,8 +2378,8 @@ def _check_implicit_collection_populated(self, run_response):
         for implicit_collection in implicit_collections:
             assert implicit_collection["populated_state"] == "ok"

-    def _cat1_outputs(self, history_id, inputs):
-        return self._run_outputs(self._run_cat1(history_id, inputs))
+    def _cat1_outputs(self, history_id, inputs, use_cached_job=False):
+        return self._run_outputs(self._run_cat1(history_id, inputs, use_cached_job=use_cached_job))

     def _run_and_get_outputs(self, tool_id, history_id, inputs=None, tool_version=None):
         if inputs is None:
@@ -2436,22 +2436,45 @@ def test_group_tag_selection_multiple(self, history_id):
         output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
         assert output_content.strip() == "123\n456\n456\n0ab"

-    @skip_without_tool("cat1")
-    def test_run_deferred_dataset(self, history_id):
+    def _run_deferred(self, history_id: str, use_cached_job=False, expect_cached_job=False, include_correct_hash=True):
+        hashes: Optional[List[Dict[str, str]]] = None
+        if include_correct_hash:
+            hashes = [{"hash_function": "SHA-1", "hash_value": "2d7dcdb10964872752bd6d081725792b3f729ac9"}]
         details = self.dataset_populator.create_deferred_hda(
-            history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed", ext="bed"
+            history_id,
+            "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed",
+            ext="bed",
+            hashes=hashes,
         )
         inputs = {
             "input1": dataset_to_param(details),
         }
-        outputs = self._cat1_outputs(history_id, inputs=inputs)
-        output = outputs[0]
+        job_response = self._run_cat1(history_id, inputs, use_cached_job=use_cached_job)
+        job_response.raise_for_status()
+        job_data = job_response.json()
+        output = job_data["outputs"][0]
+        job_id = job_data["jobs"][0]["id"]
         details = self.dataset_populator.get_history_dataset_details(
             history_id, dataset=output, wait=True, assert_ok=True
         )
         assert details["state"] == "ok"
         output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
         assert output_content.startswith("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")
+        if use_cached_job:
+            job = self.dataset_populator.get_job_details(job_id, full=True).json()
+            assert bool(job["copied_from_job_id"]) == expect_cached_job
+
+    @skip_without_tool("cat1")
+    def test_run_deferred_dataset(self, history_id):
+        self._run_deferred(history_id)
+
+    @skip_without_tool("cat1")
+    def test_run_deferred_dataset_with_cached_input(self, history_id):
+        self._run_deferred(history_id)
+        # Should just work because input is deferred
+        self._run_deferred(history_id, use_cached_job=True, expect_cached_job=True)
+        # Should fail because we don't have a hash
+        self._run_deferred(history_id, use_cached_job=True, expect_cached_job=False, include_correct_hash=False)

     @skip_without_tool("metadata_bam")
     def test_run_deferred_dataset_with_metadata_options_filter(self, history_id):
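For orientation (not part of the diff): the `use_cached_job` flag these test helpers thread through ends up in the tool execution request handled by the populator's run helpers. A rough sketch of what that payload could look like; the exact field placement is an assumption here and worth checking against `_run_cat1` and the tools API:

```python
# Hypothetical tool-run payload; `deferred_hda_id` stands in for the deferred HDA's id.
payload = {
    "history_id": history_id,  # history that receives the outputs
    "tool_id": "cat1",
    "inputs": {"input1": {"src": "hda", "id": deferred_hda_id}},
    "use_cached_job": True,  # ask Galaxy to search for an equivalent completed job to copy
}
```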
6 changes: 5 additions & 1 deletion lib/galaxy_test/base/populators.py
@@ -523,14 +523,18 @@ def fetch_hda(self, history_id: str, item: Dict[str, Any], wait: bool = True) ->
         assert len(hdas) == 1
         return hdas[0]

-    def create_deferred_hda(self, history_id, uri: str, ext: Optional[str] = None) -> Dict[str, Any]:
+    def create_deferred_hda(
+        self, history_id, uri: str, ext: Optional[str] = None, hashes: Optional[List[Dict[str, str]]] = None
+    ) -> Dict[str, Any]:
         item = {
             "src": "url",
             "url": uri,
             "deferred": True,
         }
         if ext:
             item["ext"] = ext
+        if hashes:
+            item["hashes"] = hashes
         output = self.fetch_hda(history_id, item)
         details = self.get_history_dataset_details(history_id, dataset=output)
         return details
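For reference, with the new `hashes` argument the fetch item that `create_deferred_hda` builds would look roughly like this (values taken from the test above; only the item dict is shown, not the surrounding fetch request):

```python
item = {
    "src": "url",
    "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed",
    "deferred": True,
    "ext": "bed",
    # Recorded source hashes let job search match this deferred input against
    # previously materialized datasets with the same content.
    "hashes": [{"hash_function": "SHA-1", "hash_value": "2d7dcdb10964872752bd6d081725792b3f729ac9"}],
}
```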