[ENG-6266][ENG-6241] send monthly usage to trove as supplementary metadata #10764

Merged
124 changes: 78 additions & 46 deletions api/share/utils.py
@@ -3,6 +3,7 @@
SHARE/Trove accepts metadata records as "indexcards" in turtle format: https://www.w3.org/TR/turtle/
"""
from functools import partial
from http import HTTPStatus
import logging
import random
from urllib.parse import urljoin
@@ -17,15 +18,19 @@
from framework.encryption import ensure_bytes
from framework.sentry import log_exception
from osf import models as osf_db
from osf.metadata.tools import pls_gather_metadata_file
from osf.metadata.osf_gathering import (
OsfmapPartition,
pls_get_magic_metadata_basket,
)
from osf.metadata.serializers import get_metadata_serializer
from website import settings


logger = logging.getLogger(__name__)


def shtrove_ingest_url():
return f'{settings.SHARE_URL}api/v3/ingest'
return f'{settings.SHARE_URL}trove/ingest'


def sharev2_push_url():
@@ -69,83 +74,100 @@ def _enqueue_update_share(osfresource):
enqueue_task(async_update_resource_share.s(_osfguid_value))


@celery_app.task(bind=True, max_retries=4, acks_late=True)
def task__update_share(self, guid: str, is_backfill=False):
@celery_app.task(
bind=True,
acks_late=True,
max_retries=4,
retry_backoff=True,
)
def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name='MAIN'):
"""
This function updates share takes Preprints, Projects and Registrations.
:param self:
:param guid:
:return:
Send SHARE/trove current metadata record(s) for the osf-guid-identified object
"""
resp = _do_update_share(guid, is_backfill=is_backfill)
_osfmap_partition = OsfmapPartition[osfmap_partition_name]
_osfid_instance = apps.get_model('osf.Guid').load(guid)
if _osfid_instance is None:
raise ValueError(f'unknown osfguid "{guid}"')
_resource = _osfid_instance.referent
_is_deletion = _should_delete_indexcard(_resource)
_response = (
pls_delete_trove_record(_resource, osfmap_partition=_osfmap_partition)
if _is_deletion
else pls_send_trove_record(
_resource,
is_backfill=is_backfill,
osfmap_partition=_osfmap_partition,
)
)
try:
resp.raise_for_status()
_response.raise_for_status()
except Exception as e:
if self.request.retries == self.max_retries:
log_exception(e)
elif resp.status_code >= 500:
try:
self.retry(
exc=e,
countdown=(random.random() + 1) * min(60 + settings.CELERY_RETRY_BACKOFF_BASE ** self.request.retries, 60 * 10),
log_exception(e)
if HTTPStatus(_response.status_code).is_server_error:
raise self.retry(exc=e)
else: # success response
if not _is_deletion:
# enqueue followup task for supplementary metadata
_next_partition = _next_osfmap_partition(_osfmap_partition)
if _next_partition is not None:
task__update_share.delay(
guid,
is_backfill=is_backfill,
osfmap_partition_name=_next_partition.name,
)
except Retry as e: # Retry is only raised after > 5 retries
log_exception(e)
else:
log_exception(e)

return resp


def pls_send_trove_indexcard(osf_item, *, is_backfill=False):
def pls_send_trove_record(osf_item, *, is_backfill: bool, osfmap_partition: OsfmapPartition):
try:
_iri = osf_item.get_semantic_iri()
except (AttributeError, ValueError):
raise ValueError(f'could not get iri for {osf_item}')
_metadata_record = pls_gather_metadata_file(osf_item, 'turtle')
_basket = pls_get_magic_metadata_basket(osf_item)
_serializer = get_metadata_serializer(
format_key='turtle',
basket=_basket,
serializer_config={'osfmap_partition': osfmap_partition},
)
_serialized_record = _serializer.serialize()
_queryparams = {
'focus_iri': _iri,
'record_identifier': _shtrove_record_identifier(osf_item),
'record_identifier': _shtrove_record_identifier(osf_item, osfmap_partition),
}
if is_backfill:
_queryparams['nonurgent'] = True
_queryparams['nonurgent'] = ''
if osfmap_partition.is_supplementary:
_queryparams['is_supplementary'] = ''
_expiration_date = osfmap_partition.get_expiration_date(_basket)
if _expiration_date is not None:
_queryparams['expiration_date'] = str(_expiration_date)
return requests.post(
shtrove_ingest_url(),
params=_queryparams,
headers={
'Content-Type': _metadata_record.mediatype,
'Content-Type': _serializer.mediatype,
**_shtrove_auth_headers(osf_item),
},
data=ensure_bytes(_metadata_record.serialized_metadata),
data=ensure_bytes(_serialized_record),
)


def pls_delete_trove_indexcard(osf_item):
def pls_delete_trove_record(osf_item, osfmap_partition: OsfmapPartition):
return requests.delete(
shtrove_ingest_url(),
params={
'record_identifier': _shtrove_record_identifier(osf_item),
'record_identifier': _shtrove_record_identifier(osf_item, osfmap_partition),
},
headers=_shtrove_auth_headers(osf_item),
)


def _do_update_share(osfguid: str, *, is_backfill=False):
logger.debug('%s._do_update_share("%s", is_backfill=%s)', __name__, osfguid, is_backfill)
_guid_instance = apps.get_model('osf.Guid').load(osfguid)
if _guid_instance is None:
raise ValueError(f'unknown osfguid "{osfguid}"')
_resource = _guid_instance.referent
_response = (
pls_delete_trove_indexcard(_resource)
if _should_delete_indexcard(_resource)
else pls_send_trove_indexcard(_resource, is_backfill=is_backfill)
def _shtrove_record_identifier(osf_item, osfmap_partition: OsfmapPartition):
_id = osf_item.guids.values_list('_id', flat=True).first()
return (
f'{_id}/{osfmap_partition.name}'
if osfmap_partition.is_supplementary
else _id
)
return _response


def _shtrove_record_identifier(osf_item):
return osf_item.guids.values_list('_id', flat=True).first()


def _shtrove_auth_headers(osf_item):
@@ -182,6 +204,16 @@ def _is_item_public(guid_referent) -> bool:
return getattr(guid_referent, 'is_public', False) # quacks like AbstractNode


def _next_osfmap_partition(partition: OsfmapPartition) -> OsfmapPartition | None:
match partition:
case OsfmapPartition.MAIN:
return OsfmapPartition.SUPPLEMENT
case OsfmapPartition.SUPPLEMENT:
return OsfmapPartition.MONTHLY_SUPPLEMENT
case _:
return None


###
# BEGIN soon-to-be-deleted (🤞) legacy sharev2 push
# (until dust has settled on iri-centric (rdf-based) search)
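For orientation, a minimal, self-contained sketch (not OSF code) of the partition-chaining pattern the task above implements: a successful non-delete ingest enqueues a follow-up for the next partition until none remain. The enum members mirror OsfmapPartition as used in the diff; the helper names here are assumptions.

    from enum import Enum, auto

    class OsfmapPartition(Enum):
        # assumed members, mirroring osf.metadata.osf_gathering.OsfmapPartition
        MAIN = auto()
        SUPPLEMENT = auto()
        MONTHLY_SUPPLEMENT = auto()

    def next_partition(partition):
        # same shape as _next_osfmap_partition above
        if partition is OsfmapPartition.MAIN:
            return OsfmapPartition.SUPPLEMENT
        if partition is OsfmapPartition.SUPPLEMENT:
            return OsfmapPartition.MONTHLY_SUPPLEMENT
        return None

    def send_all_partitions(guid, send_one):
        # send the MAIN record, then each supplementary partition in turn
        partition = OsfmapPartition.MAIN
        while partition is not None:
            send_one(guid, partition)              # in the real task: an HTTP POST to trove
            partition = next_partition(partition)  # in the real task: a follow-up .delay()

    send_all_partitions('abcde', lambda guid, p: print(f'ingest {guid} ({p.name})'))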
@@ -76,7 +76,7 @@ def populate_counts(self, institution, user, user2):
department='Biology dept',
public_project_count=6,
private_project_count=5,
).save()
).save(refresh=True)

UserInstitutionProjectCounts(
user_id=user2._id,
@@ -101,7 +101,7 @@ def populate_more_counts(self, institution, user, user2, user3, populate_counts)
department='Psychology dept',
public_project_count=int(10 * random()),
private_project_count=int(10 * random()),
).save()
).save(refresh=True)

UserInstitutionProjectCounts(
user_id=user3._id,
53 changes: 43 additions & 10 deletions api_tests/share/_utils.py
@@ -12,6 +12,7 @@
)
from website import settings as website_settings
from api.share.utils import shtrove_ingest_url, sharev2_push_url
from osf.metadata.osf_gathering import OsfmapPartition


@contextlib.contextmanager
@@ -40,36 +41,67 @@ def mock_update_share():


@contextlib.contextmanager
def expect_ingest_request(mock_share_responses, osfguid, *, token=None, delete=False, count=1):
def expect_ingest_request(mock_share_responses, osfguid, *, token=None, delete=False, count=1, error_response=False):
mock_share_responses._calls.reset()
yield
_double_count = count * 2 # pushing to share two ways
assert len(mock_share_responses.calls) == _double_count, (
f'expected {_double_count} call(s), got {len(mock_share_responses.calls)}: {list(mock_share_responses.calls)}'
_legacy_count_per_item = 1
_trove_main_count_per_item = 1
_trove_supplementary_count_per_item = (
0
if (error_response or delete)
else (len(OsfmapPartition) - 1)
)
_total_count = count * (
_legacy_count_per_item
+ _trove_main_count_per_item
+ _trove_supplementary_count_per_item
)
assert len(mock_share_responses.calls) == _total_count, (
f'expected {_total_count} call(s), got {len(mock_share_responses.calls)}: {list(mock_share_responses.calls)}'
)
_trove_ingest_calls = []
_trove_supp_ingest_calls = []
_legacy_push_calls = []
for _call in mock_share_responses.calls:
if _call.request.url.startswith(shtrove_ingest_url()):
assert_ingest_request(_call.request, osfguid, token=token, delete=delete)
if 'is_supplementary' in _call.request.url:
_trove_supp_ingest_calls.append(_call)
else:
_trove_ingest_calls.append(_call)
else:
assert _call.request.url.startswith(sharev2_push_url())
_legacy_push_calls.append(_call)
assert len(_trove_ingest_calls) == count
assert len(_trove_supp_ingest_calls) == count * _trove_supplementary_count_per_item
assert len(_legacy_push_calls) == count
for _call in _trove_ingest_calls:
assert_ingest_request(_call.request, osfguid, token=token, delete=delete)
for _call in _trove_supp_ingest_calls:
assert_ingest_request(_call.request, osfguid, token=token, delete=delete, supp=True)
for _call in _legacy_push_calls:
assert _call.request.url.startswith(sharev2_push_url())


def assert_ingest_request(request, expected_osfguid, *, token=None, delete=False):
def assert_ingest_request(request, expected_osfguid, *, token=None, delete=False, supp=False):
_querydict = QueryDict(urlsplit(request.path_url).query)
assert _querydict['record_identifier'] == expected_osfguid
if supp:
assert _querydict['record_identifier'].startswith(expected_osfguid)
assert _querydict['record_identifier'] != expected_osfguid
else:
assert _querydict['record_identifier'] == expected_osfguid
if delete:
assert request.method == 'DELETE'
else:
assert request.method == 'POST'
_focus_iri = _querydict['focus_iri']
assert _focus_iri == f'{website_settings.DOMAIN}{expected_osfguid}'
assert _focus_iri in request.body.decode('utf-8')
_request_body = request.body.decode('utf-8')
assert (_focus_iri in _request_body) or (supp and not _request_body.strip())
_token = token or website_settings.SHARE_API_TOKEN
assert request.headers['Authorization'] == f'Bearer {_token}'


@contextlib.contextmanager
def expect_preprint_ingest_request(mock_share_responses, preprint, *, delete=False, count=1):
def expect_preprint_ingest_request(mock_share_responses, preprint, *, delete=False, count=1, error_response=False):
# same as expect_ingest_request, but with convenience for preprint specifics
# and postcommit-task handling (so on_preprint_updated actually runs)
with expect_ingest_request(
Expand All @@ -78,6 +110,7 @@ def expect_preprint_ingest_request(mock_share_responses, preprint, *, delete=Fal
token=preprint.provider.access_token,
delete=delete,
count=count,
error_response=error_response,
):
# clear out postcommit tasks from factories
postcommit_queue().clear()
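The expected call count per item in expect_ingest_request above breaks down as one legacy sharev2 push, one main trove ingest, and one supplementary ingest per non-MAIN partition, with the supplementary ingests skipped on deletes and error responses. A quick sanity check of that arithmetic, assuming the three-member OsfmapPartition shown in the diff:

    partition_count = 3                                # MAIN, SUPPLEMENT, MONTHLY_SUPPLEMENT (assumed)
    legacy_pushes = 1
    trove_main_ingests = 1
    trove_supplementary_ingests = partition_count - 1  # becomes 0 when delete or error_response
    total_calls_per_item = legacy_pushes + trove_main_ingests + trove_supplementary_ingests
    assert total_calls_per_item == 4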
2 changes: 1 addition & 1 deletion api_tests/share/test_share_preprint.py
@@ -133,7 +133,7 @@ def test_no_call_async_update_on_400_failure(self, mock_share_responses, preprin
mock_share_responses.replace(responses.POST, shtrove_ingest_url(), status=400)
mock_share_responses.replace(responses.POST, sharev2_push_url(), status=400)
preprint.set_published(True, auth=auth, save=True)
with expect_preprint_ingest_request(mock_share_responses, preprint, count=1):
with expect_preprint_ingest_request(mock_share_responses, preprint, count=1, error_response=True):
preprint.update_search()

def test_delete_from_share(self, mock_share_responses):
6 changes: 4 additions & 2 deletions osf/management/commands/make_dummy_pageviews_for_metrics.py
@@ -74,6 +74,8 @@ def _generate_random_countedusage(self, n, max_age):
item_guid=ITEM_GUID,
session_id='freshen by key',
user_is_authenticated=bool(random.randint(0, 1)),
item_public=bool(random.randint(0, 1)),
action_labels=[['view', 'download'][random.randint(0, 1)]],
)

def _run_date_query(self, time_range_filter):
@@ -103,8 +105,8 @@ def _run_date_query(self, time_range_filter):
},
})
return {
'min': result.aggs['min-timestamp'].value_as_string,
'max': result.aggs['max-timestamp'].value_as_string,
'min': result.aggs['min-timestamp'].value,
'max': result.aggs['max-timestamp'].value,
**{
str(bucket.key.date()): bucket.doc_count
for bucket in result.aggs['by-date']
7 changes: 5 additions & 2 deletions osf/management/commands/monthly_reporters_go.py
@@ -44,8 +44,11 @@ def monthly_reporters_go(report_year=None, report_month=None):
)
def monthly_reporter_go(task, reporter_key: str, yearmonth: str):
_reporter_class = AllMonthlyReporters[reporter_key].value
_parsed_yearmonth = YearMonth.from_str(yearmonth)
_reporter_class().run_and_record_for_month(_parsed_yearmonth)
_reporter = _reporter_class(YearMonth.from_str(yearmonth))
_reporter.run_and_record_for_month()
_followup = _reporter.followup_task()
if _followup is not None:
_followup.apply_async()


class Command(BaseCommand):
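A minimal sketch of the reporter interface implied by this change: the reporter is now constructed with the YearMonth, run_and_record_for_month() takes no arguments, and followup_task() may return a celery signature (or None). Everything besides those method names is assumed for illustration.

    class ExampleMonthlyReporter:
        def __init__(self, yearmonth):
            self.yearmonth = yearmonth  # e.g. YearMonth.from_str('2024-07')

        def run_and_record_for_month(self):
            # compute and store this reporter's metrics for self.yearmonth
            ...

        def followup_task(self):
            # a celery signature for monthly_reporter_go to apply_async(), or None
            return None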
19 changes: 12 additions & 7 deletions osf/metadata/gather/basket.py
@@ -19,15 +19,14 @@ class Basket:
def __init__(self, focus: Focus):
assert isinstance(focus, Focus)
self.focus = focus
self.reset() # start with an empty basket (except the focus itself)
self.reset() # start with an empty basket

def reset(self):
self._gathertasks_done = set()
self._known_focus_dict = {}
self._known_focus_dict = {self.focus.iri: {self.focus}}
self.gathered_metadata = rdfutils.contextualized_graph()
self._add_focus_reference(self.focus)

def pls_gather(self, predicate_map): # TODO: async
def pls_gather(self, predicate_map, *, include_defaults=True): # TODO: async
'''go gatherers, go!

@predicate_map: dict with rdflib.URIRef keys
@@ -48,7 +47,7 @@
},
})
'''
self._do_gather(self.focus, predicate_map)
self._do_gather(self.focus, predicate_map, include_defaults=include_defaults)

def __getitem__(self, slice_or_arg) -> typing.Iterable[rdflib.term.Node]:
'''convenience for getting values from the basket
@@ -98,14 +97,20 @@ def _maybe_gather_for_predicate_map(self, iri_or_focus, predicate_map):
else:
raise ValueError(f'expected `iri_or_focus` to be Focus or URIRef (got {iri_or_focus})')

def _do_gather(self, focus, predicate_map):
def _do_gather(self, focus, predicate_map, *, include_defaults=True):
if include_defaults:
self._add_focus_reference(focus)
if not isinstance(predicate_map, dict):
# allow iterable of predicates with no deeper paths
predicate_map = {
predicate_iri: None
for predicate_iri in predicate_map
}
for gatherer in get_gatherers(focus.rdftype, predicate_map.keys()):
for gatherer in get_gatherers(
focus.rdftype,
predicate_map.keys(),
include_focustype_defaults=include_defaults,
):
for (subj, pred, obj) in self._do_a_gathertask(gatherer, focus):
if isinstance(obj, Focus):
self._add_focus_reference(obj)
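A minimal, self-contained sketch (not the OSF gathering registry) of what the new include_defaults flag controls: with defaults included, the focus-type's default gatherers run alongside the predicate-specific ones; without them, only the explicitly requested predicates are gathered, which suits supplementary-only records.

    def get_gatherers(requested_predicates, registry, *, include_focustype_defaults=True):
        # registry maps predicate -> gatherer; the None key holds focus-type defaults
        gatherers = [registry[p] for p in requested_predicates if p in registry]
        if include_focustype_defaults:
            gatherers.extend(registry.get(None, []))
        return gatherers

    registry = {
        'osf:title': lambda: [('osf:title', 'An example title')],
        None: [lambda: [('rdf:type', 'osf:Project')]],  # default gatherers
    }
    for gatherer in get_gatherers(['osf:title'], registry, include_focustype_defaults=False):
        print(gatherer())  # only the requested predicate is gathered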