Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gdi 95 add typing and comments in the fdp gdi repo #5

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ckanext/fairdatapoint/harvesters/civity_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def import_stage(self, harvest_object):
str(package_dict['title'])))
self._save_object_error(
'TypeError: error generating package name. Package title [%s] is not a string.' %
(str(package_dict['title']), harvest_object)
str(package_dict['title']), harvest_object
)
return False

Expand Down
38 changes: 26 additions & 12 deletions ckanext/fairdatapoint/harvesters/domain/fair_data_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,49 @@
# File modified by Stichting Health-RI in January 2024 to remove a debugging graph-print function
# All changes are © Stichting Health-RI and are licensed under the AGPLv3 license

import logging
import requests

from rdflib import Graph
from rdflib import Graph, URIRef
from rdflib.exceptions import ParserError
from requests.exceptions import HTTPError, ConnectionError, Timeout, RequestException
from typing import Union

log = logging.getLogger(__name__)


class FairDataPoint:
"""Class to connect and get data from FDP"""

def __init__(self, fdp_end_point):
def __init__(self, fdp_end_point: str):
self.fdp_end_point = fdp_end_point

def get_graph(self, path):
def get_graph(self, path: Union[str, URIRef]) -> Graph:
"""
Get graph from FDP at specified path. Not using function to load graph from endpoint directly since this
function fails because of a certificate error. The library it uses probably has no certificates which would
have to be added to a trust store. But this is inconvenient in case of a new harvester which refers to an
endpoint whose certificate is not in the trust store yet.
"""
try:
g = Graph().parse(data=self._get_data(path))
except ParserError as e:
g = None

return g
graph = Graph()
data = self._get_data(path)
if data is None:
log.warning(f"No data was received from FDP {self.fdp_end_point} request {path}")
else:
try:
graph.parse(data=data)
except ParserError as e:
log.error(f"Record {data} could not be parsed: {e}")
return graph

@staticmethod
def _get_data(path):
def _get_data(path: Union[str, URIRef]) -> Union[str, None]:
headers = {
'Accept': 'text/turtle'
}
response = requests.request("GET", path, headers=headers)
return response.text
try:
response = requests.request("GET", path, headers=headers)
response.raise_for_status()
return response.text
except (HTTPError, ConnectionError, Timeout, RequestException) as e:
log.error(f"FDP query {path} was not successful: {e}")
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,25 @@
from ckanext.fairdatapoint.harvesters.domain.identifier import Identifier
from ckanext.fairdatapoint.harvesters.domain.fair_data_point import FairDataPoint

from rdflib import Namespace, URIRef, Literal
from rdflib.namespace import RDF
from rdflib import Namespace, URIRef, Literal, DCAT, DCTERMS, Graph, RDF
from rdflib.term import Node
from typing import Dict, Iterable, Union

DC_TERMS = 'http://purl.org/dc/terms/'
DC_TERMS_DESCRIPTION = DC_TERMS + 'description'
DC_TERMS_FORMAT = DC_TERMS + 'format'
DC_TERMS_LICENSE = DC_TERMS + 'license'
DC_TERMS_TITLE = DC_TERMS + 'title'

DCAT = 'http://www.w3.org/ns/dcat#'
DCAT_ACCESS_URL = DCAT + 'accessURL'
DCAT_CONTACT_POINT = DCAT + 'contactPoint'

LDP = 'http://www.w3.org/ns/ldp#'
LDP = Namespace('http://www.w3.org/ns/ldp#')
VCARD = Namespace('http://www.w3.org/2006/vcard/ns#')

log = logging.getLogger(__name__)


class FairDataPointRecordProvider:



class FairDataPointRecordProvider():
dcat = Namespace(DCAT)
ldp = Namespace(LDP)

def __init__(self, fdp_end_point):
def __init__(self, fdp_end_point: str):
self.fair_data_point = FairDataPoint(fdp_end_point)

def get_record_ids(self):
def get_record_ids(self) -> Dict.keys:
"""
Return all the FDP records which should end up as packages in CKAN to populate the "guids_in_harvest" list
Returns all the FDP records which should end up as packages in CKAN to populate the "guids_in_harvest" list
https://rdflib.readthedocs.io/en/stable/intro_to_parsing.html
"""
log.debug('FAIR Data Point get_records from {}'.format(self.fair_data_point.fdp_end_point))
Expand All @@ -50,44 +38,43 @@ def get_record_ids(self):

fdp_graph = self.fair_data_point.get_graph(self.fair_data_point.fdp_end_point)

contains_predicate = URIRef(LDP + 'contains')
contains_predicate = LDP.contains
for contains_object in fdp_graph.objects(predicate=contains_predicate):
result.update(self._process_catalog(str(contains_object)))

return result.keys()

def _process_catalog(self, path):
def _process_catalog(self, path: Union[str, URIRef]) -> Dict:
result = dict()

catalogs_graph = self.fair_data_point.get_graph(path)

if catalogs_graph is not None:
for catalog_subject in catalogs_graph.subjects(RDF.type, self.dcat.Catalog):
identifier = Identifier('')
for catalog_subject in catalogs_graph.subjects(RDF.type, DCAT.Catalog):
identifier = Identifier('')

identifier.add('catalog', str(catalog_subject))
identifier.add('catalog', str(catalog_subject))

result[identifier.guid] = catalog_subject
result[identifier.guid] = catalog_subject

catalog_graph = self.fair_data_point.get_graph(catalog_subject)
catalog_graph = self.fair_data_point.get_graph(catalog_subject)

dataset_predicate = URIRef(DCAT + 'dataset')
for dataset_subject in catalog_graph.objects(predicate=dataset_predicate):
identifier = Identifier('')
for dataset_subject in catalog_graph.objects(predicate=DCAT.dataset):
identifier = Identifier('')

identifier.add('catalog', str(catalog_subject))
identifier.add('catalog', str(catalog_subject))

identifier.add('dataset', str(dataset_subject))
identifier.add('dataset', str(dataset_subject))

result[identifier.guid] = dataset_subject
result[identifier.guid] = dataset_subject

return result

def get_record_by_id(self, guid):
def get_record_by_id(self, guid: str) -> str:
"""
Get additional information for FDP record.
"""
log.debug('FAIR data point get_record_by_id from {} for {}'.format(self.fair_data_point.fdp_end_point, guid))
log.debug(
'FAIR data point get_record_by_id from {} for {}'.format(self.fair_data_point.fdp_end_point, guid))

identifier = Identifier(guid)

Expand All @@ -97,41 +84,38 @@ def get_record_by_id(self, guid):

subject_uri = URIRef(subject_url)

distribution_predicate_uri = URIRef(DCAT + 'distribution')

# Add information from distribution to graph
for distribution_uri in g.objects(subject=subject_uri, predicate=distribution_predicate_uri):
for distribution_uri in g.objects(subject=subject_uri, predicate=DCAT.distribution):
distribution_g = self.fair_data_point.get_graph(distribution_uri)

distribution = URIRef(distribution_uri)

for predicate in [
DC_TERMS_DESCRIPTION,
DC_TERMS_FORMAT,
DC_TERMS_LICENSE,
DC_TERMS_TITLE,
DCAT_ACCESS_URL
DCTERMS.description,
DCTERMS.format,
DCTERMS.license,
DCTERMS.title,
DCAT.accessURL
]:
for literal in self.get_values(distribution_g, distribution_uri, predicate):
g.add((distribution, URIRef(predicate), literal))
for distr_attribute_value in self.get_values(distribution_g, distribution_uri, predicate):
g.add((distribution_uri, predicate, distr_attribute_value))

# Look-up contact information
contact_point_predicate_uri = URIRef(DCAT_CONTACT_POINT)
for contact_point_uri in self.get_values(g, subject_uri, contact_point_predicate_uri):
for contact_point_uri in self.get_values(g, subject_uri, DCAT.contactPoint):
if 'orcid' in contact_point_uri:
orcid_response = requests.get(contact_point_uri + '/public-record.json')
orcid_response = requests.get(str(contact_point_uri) + '/public-record.json')
json_orcid_response = orcid_response.json()
name = json_orcid_response['displayName']
name_literal = Literal(name)
g.add((subject_uri, URIRef('http://www.w3.org/2006/vcard/ns#fn'), name_literal))
g.add((subject_uri, VCARD.fn, name_literal))
# TODO add original Orcid URL in a field

result = g.serialize(format='ttl')

return result

@staticmethod
def get_values(graph, subject, predicate):
def get_values(graph: Graph,
subject: Union[str, URIRef, Node],
predicate: Union[str, URIRef, Node]) -> Iterable[Node]:
subject_uri = URIRef(subject)
predicate_uri = URIRef(predicate)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
log = logging.getLogger(__name__)


class FairDataPointRecordToPackageConverter():
class FairDataPointRecordToPackageConverter:

def __init__(self, profile):
def __init__(self, profile: str):
self.profile = profile

def record_to_package(self, guid, record):
def record_to_package(self, guid: str, record: str):
parser = FairDataPointRDFParser(profiles=[self.profile])

try:
Expand Down
14 changes: 7 additions & 7 deletions ckanext/fairdatapoint/harvesters/domain/identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,27 @@ class IdentifierException(Exception):

class Identifier:

def __init__(self, guid):
self.guid = guid
def __init__(self, guid: str):
self.guid: str = guid

def add(self, id_type, id_value):
def add(self, id_type: str, id_value: str):
if len(self.guid) > 0:
self.guid += SEPARATOR

self.guid += id_type + KEY_VALUE_SEPARATOR + id_value

def get_id_type(self):
def get_id_type(self) -> str:
return self.get_part(0)

def get_id_value(self):
def get_id_value(self) -> str:
return self.get_part(1)

def get_part(self, index):
def get_part(self, index: int) -> str:
key_values = self.guid.split(SEPARATOR)

if len(key_values) > 0:
# Get the last one, that's the one we are interested in
key_value = key_values[len(key_values) - 1].split(KEY_VALUE_SEPARATOR)
key_value = key_values[-1].split(KEY_VALUE_SEPARATOR)
if len(key_value) == 2:
result = key_value[index]
else:
Expand Down
10 changes: 5 additions & 5 deletions ckanext/fairdatapoint/processors.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from rdflib import RDF, Namespace
from rdflib import RDF, DCAT
from rdflib.term import Node
from typing import Iterable, Dict

from ckanext.dcat.processors import RDFParser

DCAT = Namespace("http://www.w3.org/ns/dcat#")


class FairDataPointRDFParser(RDFParser):

def _catalogs(self):
def _catalogs(self) -> Iterable[Node]:
"""
Generator that returns all DCAT catalogs on the graph

Expand All @@ -17,7 +17,7 @@ def _catalogs(self):
for catalog in self.g.subjects(RDF.type, DCAT.Catalog):
yield catalog

def catalogs(self):
def catalogs(self) -> Iterable[Dict]:
"""
Generator that returns CKAN catalogs parsed from the RDF graph

Expand Down
16 changes: 10 additions & 6 deletions ckanext/fairdatapoint/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,26 @@
from ckan.plugins import toolkit
from ckan import model
import json
from typing import Dict
from rdflib import URIRef

def _convert_extras_to_declared_schema_fields(dataset_dict):
'''

def _convert_extras_to_declared_schema_fields(dataset_dict: Dict) -> Dict:
"""
Compares the extras dictionary with the declared schema.
Updates the declared schema fields with the values that match from the extras.
Remove the extras that are present on the declared schema.
:param dataset_dict:
:return: dataset_dict - Updated dataset_dict
'''
"""
# Use the correct dataset type, Defaults to 'dataset'
dataset_type = dataset_dict.get('type', 'dataset')
# Gets the full Schema definition of the correct dataset type
context = {'model': model, 'session': model.Session}
data_dict = {'type': dataset_type}
full_schema_dict = toolkit.get_action('scheming_dataset_schema_show')(context, data_dict)

dataset_fields = { x.get('field_name') : x.get('preset') for x in full_schema_dict.get('dataset_fields', []) }
dataset_fields = {x.get('field_name'): x.get('preset') for x in full_schema_dict.get('dataset_fields', [])}

# Populate the declared schema fields, if they are present in the extras
for extra_dict in dataset_dict.get('extras', []):
Expand All @@ -46,14 +49,15 @@ class FAIRDataPointDCATAPProfile(EuropeanDCATAP2Profile):
An RDF profile for FAIR data points
"""

def parse_dataset(self, dataset_dict, dataset_ref):
def parse_dataset(self, dataset_dict: Dict, dataset_ref: URIRef) -> Dict:
super(FAIRDataPointDCATAPProfile, self).parse_dataset(dataset_dict, dataset_ref)

dataset_dict = _convert_extras_to_declared_schema_fields(dataset_dict)

# Example of adding a field
dataset_dict['extras'].append({'key': 'hello',
'value': "Hello from the FAIR data point profile. Use this function to do FAIR data point specific stuff during the import stage"})
'value': "Hello from the FAIR data point profile. Use this function to do "
"FAIR data point specific stuff during the import stage"})

return dataset_dict

Expand Down
20 changes: 10 additions & 10 deletions ckanext/fairdatapoint/tests/test_fair_data_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_fdp_get_graph_parsing_error(self, mocker):
actual = fdp.get_graph("some_path")
assert fdp_get_data.call_count == 1
assert pytest.raises(ParserError)
assert actual is None
assert to_isomorphic(actual) == to_isomorphic(Graph())

def test_fdp_get_graph_pass_empty(self, mocker):
fdp_get_data = mocker.MagicMock(name="get_data")
Expand All @@ -70,13 +70,13 @@ def test_fdp_get_graph_pass_empty(self, mocker):
assert fdp_get_data.call_count == 1
assert to_isomorphic(actual) == to_isomorphic(Graph())

@pytest.mark.xfail(raises=ValueError)
# @pytest.mark.xpassed(raises=ValueError)
a-nayden marked this conversation as resolved.
Show resolved Hide resolved
def test_fdp_get_graph_pass_none(self, mocker):
with pytest.raises(ValueError):
fdp_get_data = mocker.MagicMock(name="get_data")
mocker.patch("ckanext.fairdatapoint.harvesters.domain.fair_data_point.FairDataPoint._get_data",
new=fdp_get_data)
fdp_get_data.return_value = None
fdp = FairDataPoint("some endpoint")
actual = fdp.get_graph("some_path")
assert fdp_get_data.call_count == 1
fdp_get_data = mocker.MagicMock(name="get_data")
mocker.patch("ckanext.fairdatapoint.harvesters.domain.fair_data_point.FairDataPoint._get_data",
new=fdp_get_data)
fdp_get_data.return_value = None
fdp = FairDataPoint("some endpoint")
actual = fdp.get_graph("some_path")
assert fdp_get_data.call_count == 1
assert to_isomorphic(actual) == to_isomorphic(Graph())
Loading