OpenAlex-based OA Dashboard #218

Draft
Wants to merge 4 commits into base: main
@@ -39,6 +39,7 @@
from airflow.sensors.external_task import ExternalTaskSensor
from glom import Coalesce, glom, SKIP
from jinja2 import Template

from observatory.platform.airflow import get_airflow_connection_password
from observatory.platform.bigquery import (
bq_create_dataset,
@@ -62,15 +63,16 @@
SnapshotRelease,
Workflow,
)

from academic_observatory_workflows.clearbit import clearbit_download_logo
from academic_observatory_workflows.config import project_path, Tag
from academic_observatory_workflows.github import trigger_repository_dispatch
from academic_observatory_workflows.oa_dashboard_workflow.institution_ids import INSTITUTION_IDS
from academic_observatory_workflows.wikipedia import fetch_wikipedia_descriptions
from academic_observatory_workflows.zenodo import make_draft_version, publish_new_version, Zenodo
from academic_observatory_workflows.doi_workflow.doi_workflow import ror_to_ror_hierarchy_index

INCLUSION_THRESHOLD = {"country": 5, "institution": 50}
AGGREGATION_FIELD = {"country": "country_codes", "institution": "ancestor_ror_ids"}
MAX_REPOSITORIES = 200
START_YEAR = 2000
END_YEAR = pendulum.now().year - 1
@@ -117,7 +119,7 @@ def __init__(
snapshot_date: pendulum.DateTime,
input_project_id: str,
output_project_id: str,
bq_agg_dataset_id: str,
bq_openalex_dataset_id: str,
bq_ror_dataset_id: str,
bq_settings_dataset_id: str,
bq_oa_dashboard_dataset_id: str,
@@ -129,7 +131,7 @@ def __init__(
:param snapshot_date: the release date.
:param input_project_id: the ID of the Google Cloud project where data will be pulled from.
:param output_project_id: the ID of the Google Cloud project where data will be written to.
:param bq_agg_dataset_id: the id of the BigQuery dataset where the Academic Observatory aggregated data lives.
:param bq_openalex_dataset_id: the id of the BigQuery dataset where the OpenAlex data lives.
:param bq_ror_dataset_id: the id of the BigQuery dataset containing the ROR table.
:param bq_settings_dataset_id: the id of the BigQuery settings dataset, which contains the country table.
:param bq_oa_dashboard_dataset_id: the id of the BigQuery dataset where the tables produced by this workflow will be created.
@@ -140,7 +142,7 @@ def __init__(
self.output_project_id = output_project_id
self.bq_ror_dataset_id = bq_ror_dataset_id
self.bq_settings_dataset_id = bq_settings_dataset_id
self.bq_agg_dataset_id = bq_agg_dataset_id
self.bq_openalex_dataset_id = bq_openalex_dataset_id
self.bq_oa_dashboard_dataset_id = bq_oa_dashboard_dataset_id

@property
@@ -169,23 +171,42 @@ def ror_table_id(self):
sharded=True,
)

@functools.cached_property
def openalex_sources_table_id(self):
return bq_table_id(self.input_project_id, self.bq_openalex_dataset_id, "sources")

@functools.cached_property
def openalex_institutions_table_id(self):
return bq_table_id(self.input_project_id, self.bq_openalex_dataset_id, "institutions")

@functools.cached_property
def openalex_works_table_id(self):
return bq_table_id(self.input_project_id, self.bq_openalex_dataset_id, "works")

@functools.cached_property
def repository_table_id(self):
return bq_table_id(self.input_project_id, self.bq_settings_dataset_id, "repository")

@functools.cached_property
def country_table_id(self):
return bq_table_id(self.input_project_id, self.bq_settings_dataset_id, "country")

def observatory_agg_table_id(self, table_name: str):
return bq_select_latest_table(
table_id=bq_table_id(self.input_project_id, self.bq_agg_dataset_id, table_name),
end_date=self.snapshot_date,
sharded=True,
)

@functools.cached_property
def institution_ids_table_id(self):
return bq_sharded_table_id(
self.output_project_id, self.bq_oa_dashboard_dataset_id, "institution_ids", self.snapshot_date
)

@functools.cached_property
def ror_hierarchy_table_id(self):
return bq_sharded_table_id(
self.output_project_id, self.bq_oa_dashboard_dataset_id, "ror_hierarchy", self.snapshot_date
)
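
Note on naming: the sharded table IDs above resolve to date-suffixed table names. A minimal sketch of the convention, assuming bq_sharded_table_id appends a YYYYMMDD shard suffix as in the observatory platform (project and dataset values below are made up):

import pendulum

# Assumption: bq_sharded_table_id joins project, dataset and table name,
# then appends the snapshot date as a YYYYMMDD shard suffix.
snapshot_date = pendulum.datetime(2023, 6, 11)
table_id = f"my-project.oa_dashboard.ror_hierarchy{snapshot_date.format('YYYYMMDD')}"
# -> "my-project.oa_dashboard.ror_hierarchy20230611"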

def oa_dashboard_table_id(self, table_name: str):
return bq_sharded_table_id(
self.output_project_id, self.bq_oa_dashboard_dataset_id, table_name, self.snapshot_date
@@ -257,9 +278,9 @@ def __init__(
cloud_workspace: CloudWorkspace,
data_bucket: str,
conceptrecid: int,
doi_dag_id: str = "doi",
external_dag_id: str = "openalex",
entity_types: List[str] = None,
bq_agg_dataset_id: str = "observatory",
bq_openalex_dataset_id: str = "openalex",
bq_ror_dataset_id: str = "ror",
bq_settings_dataset_id: str = "settings",
bq_oa_dashboard_dataset_id: str = "oa_dashboard",
@@ -278,9 +299,9 @@ def __init__(
:param data_bucket: the Google Cloud Storage bucket where image data should be stored.
:param conceptrecid: the Zenodo Concept Record ID for the COKI Open Access Dataset. The Concept Record ID is
the last set of numbers from the Concept DOI.
:param doi_dag_id: the DAG id to wait for.
:param external_dag_id: the DAG id to wait for.
:param entity_types: the entity types to process; these are also used as the table names.
:param bq_agg_dataset_id: the id of the BigQuery dataset where the Academic Observatory aggregated data lives.
:param bq_openalex_dataset_id: the id of the BigQuery dataset where the OpenAlex data lives.
:param bq_ror_dataset_id: the id of the BigQuery dataset containing the ROR table.
:param bq_settings_dataset_id: the id of the BigQuery settings dataset, which contains the country table.
:param bq_oa_dashboard_dataset_id: the id of the BigQuery dataset where the tables produced by this workflow will be created.
@@ -309,7 +330,7 @@ def __init__(
self.input_project_id = cloud_workspace.input_project_id
self.output_project_id = cloud_workspace.output_project_id
self.data_bucket = data_bucket
self.bq_agg_dataset_id = bq_agg_dataset_id
self.bq_openalex_dataset_id = bq_openalex_dataset_id
self.bq_ror_dataset_id = bq_ror_dataset_id
self.bq_settings_dataset_id = bq_settings_dataset_id
self.bq_oa_dashboard_dataset_id = bq_oa_dashboard_dataset_id
@@ -324,11 +345,12 @@ def __init__(
self.zenodo: Optional[Zenodo] = None

self.add_operator(
ExternalTaskSensor(task_id=f"{doi_dag_id}_sensor", external_dag_id=doi_dag_id, mode="reschedule")
ExternalTaskSensor(task_id=f"{external_dag_id}_sensor", external_dag_id=external_dag_id, mode="reschedule")
)
self.add_setup_task(self.check_dependencies)
self.add_task(self.make_bq_datasets)
self.add_task(self.upload_institution_ids)
self.add_task(self.create_ror_hierarchy_table)
self.add_task(self.create_entity_tables)
self.add_task(
self.add_wiki_descriptions,
@@ -378,7 +400,7 @@ def make_release(self, **kwargs) -> OaDashboardRelease:
output_project_id=self.output_project_id,
bq_ror_dataset_id=self.bq_ror_dataset_id,
bq_settings_dataset_id=self.bq_settings_dataset_id,
bq_agg_dataset_id=self.bq_agg_dataset_id,
bq_openalex_dataset_id=self.bq_openalex_dataset_id,
bq_oa_dashboard_dataset_id=self.bq_oa_dashboard_dataset_id,
)

@@ -403,6 +425,25 @@ def upload_institution_ids(self, release: OaDashboardRelease, **kwargs):
)
set_task_state(success, self.upload_institution_ids.__name__, release)

def create_ror_hierarchy_table(self, release: OaDashboardRelease, **kwargs):
"""Create the ROR hierarchy table."""

# Fetch latest ROR table
ror_table_id = release.ror_table_id
ror = [dict(row) for row in bq_run_query(f"SELECT * FROM {ror_table_id}")]

# Create ROR hierarchy table
index = ror_to_ror_hierarchy_index(ror)

# Convert to list of dictionaries
records = []
for child_id, ancestor_ids in index.items():
records.append({"child_id": child_id, "ror_ids": [child_id] + list(ancestor_ids)})

# Upload to intermediate table
success = bq_load_from_memory(release.ror_hierarchy_table_id, records)
set_task_state(success, self.create_ror_hierarchy_table.__name__)
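
For clarity, a minimal sketch of the record shape this task builds, assuming ror_to_ror_hierarchy_index returns a mapping from each child ROR id to the set of its ancestor ROR ids (the ids below are made up):

# Hypothetical output of ror_to_ror_hierarchy_index (assumption):
index = {
    "https://ror.org/child01": {"https://ror.org/parent01", "https://ror.org/grand01"},
}

records = [
    {"child_id": child_id, "ror_ids": [child_id] + list(ancestor_ids)}
    for child_id, ancestor_ids in index.items()
]
# Each record lists the child id first, then its ancestors, so downstream SQL
# can roll works up the hierarchy by matching any id in ror_ids.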

def create_entity_tables(self, release: OaDashboardRelease, **kwargs):
"""Create the country and institution tables"""

@@ -413,19 +454,29 @@ def create_entity_tables(self, release: OaDashboardRelease, **kwargs):
template_path = project_path("oa_dashboard_workflow", "sql", f"{entity_type}.sql.jinja2")
sql = render_template(
template_path,
agg_table_id=release.observatory_agg_table_id(entity_type),
start_year=START_YEAR,
end_year=END_YEAR,
# OpenAlex tables
openalex_sources_table_id=release.openalex_sources_table_id,
openalex_institutions_table_id=release.openalex_institutions_table_id,
openalex_works_table_id=release.openalex_works_table_id,
# Other tables
ror_table_id=release.ror_table_id,
repository_table_id=release.repository_table_id,
ror_hierarchy_table_id=release.ror_hierarchy_table_id,
country_table_id=release.country_table_id,
institution_ids_table_id=release.institution_ids_table_id,
# Fields
max_repositories=MAX_REPOSITORIES,
aggregation_field=AGGREGATION_FIELD[entity_type],
start_year=START_YEAR,
end_year=END_YEAR,
inclusion_threshold=INCLUSION_THRESHOLD[entity_type],
)
dst_table_id = release.oa_dashboard_table_id(entity_type)
queries.append((sql, dst_table_id))

# Run queries, saving to BigQuery
for (sql, dst_table_id) in queries:
print(sql)
success = bq_create_table_from_query(sql=sql, table_id=dst_table_id)
results.append(success)
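
render_template (used above) fills the Jinja2 SQL templates with table IDs and constants; a rough sketch of what it presumably does, using the jinja2 import at the top of this file. The real helper likely also configures a template loader so the {% include %} directives in the templates resolve:

from jinja2 import Template

def render_template_sketch(template_path: str, **kwargs) -> str:
    # Assumption: read the template file and render it with keyword arguments.
    with open(template_path) as f:
        return Template(f.read()).render(**kwargs)

sql = render_template_sketch(
    "country.sql.jinja2",  # hypothetical local path
    inclusion_threshold=INCLUSION_THRESHOLD["country"],
    start_year=START_YEAR,
    end_year=END_YEAR,
)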

@@ -529,7 +580,7 @@ def export_tables(self, release: OaDashboardRelease, **kwargs):
destination_uri = f"gs://{self.cloud_workspace.download_bucket}/{blob_prefix}/{entity_type}-data-*.jsonl.gz"
table_id = release.oa_dashboard_table_id(entity_type)
success = bq_query_to_gcs(
query=f"SELECT * FROM {table_id} ORDER BY stats.p_outputs_open DESC", # Uses a query to export data to make sure it is in the correct order
query=f"SELECT * FROM {table_id} ORDER BY oa_status.open.percent DESC", # Uses a query to export data to make sure it is in the correct order
project_id=self.output_project_id,
destination_uri=destination_uri,
)
@@ -745,18 +796,9 @@ def oa_dashboard_subset(item: Dict) -> Dict:
"country_code": Coalesce("country_code", default=SKIP),
"country_name": Coalesce("country_name", default=SKIP),
"institution_type": Coalesce("institution_type", default=SKIP),
"acronyms": "acronyms",
"stats": {
"n_outputs": "stats.n_outputs",
"n_outputs_open": "stats.n_outputs_open",
"n_outputs_black": "stats.n_outputs_black",
"p_outputs_open": "stats.p_outputs_open",
"p_outputs_publisher_open_only": "stats.p_outputs_publisher_open_only",
"p_outputs_both": "stats.p_outputs_both",
"p_outputs_other_platform_open_only": "stats.p_outputs_other_platform_open_only",
"p_outputs_closed": "stats.p_outputs_closed",
"p_outputs_black": "stats.p_outputs_black",
},
"n_citations": "n_citations",
"n_outputs": "n_outputs",
"oa_status": "oa_status",
}

return glom(item, subset_spec)
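
To illustrate the glom spec above: string paths copy values through, nested dicts reshape the output, and Coalesce(..., default=SKIP) drops a key entirely when it is missing. A self-contained example with made-up data:

from glom import Coalesce, glom, SKIP

item = {
    "id": "NZL",  # made-up example entity
    "n_citations": 100.0,
    "n_outputs": 50.0,
    "oa_status": {"open": {"total": 30.0, "percent": 60.0}},
}
spec = {
    "id": "id",
    "institution_type": Coalesce("institution_type", default=SKIP),
    "n_citations": "n_citations",
    "n_outputs": "n_outputs",
    "oa_status": "oa_status",
}
result = glom(item, spec)
# result has no "institution_type" key, because the key is absent from item;
# every other field is copied through unchanged.
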
@@ -774,8 +816,11 @@ def zenodo_subset(item: Dict):
"start_year": "start_year",
"end_year": "end_year",
"acronyms": "acronyms",
"stats": "stats",
"n_citations": "n_citations",
"n_outputs": "n_outputs",
"oa_status": "oa_status",
"years": "years",
"repositories": "repositories",
}

return glom(item, subset_spec)
@@ -898,9 +943,9 @@ def make_entity_stats(entities: List[Dict]) -> EntityStats:
:return: the entity stats object.
"""

p_outputs_open = np.array([entity["stats"]["p_outputs_open"] for entity in entities])
n_outputs = np.array([entity["stats"]["n_outputs"] for entity in entities])
n_outputs_open = np.array([entity["stats"]["n_outputs_open"] for entity in entities])
p_outputs_open = np.array([entity["oa_status"]["open"]["percent"] for entity in entities])
n_outputs = np.array([entity["n_outputs"] for entity in entities])
n_outputs_open = np.array([entity["oa_status"]["open"]["total"] for entity in entities])

# Make median, min and max values
stats_median = dict(p_outputs_open=statistics.median(p_outputs_open))
@@ -0,0 +1,83 @@
CAST(SUM(n_citations) AS FLOAT64) AS n_citations,
CAST(COUNT(*) AS FLOAT64) AS n_outputs,
STRUCT(
STRUCT(
CAST(SUM(CAST(oa_status.open AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.open AS INT64)), COUNT(*)) as percent
) AS open,

STRUCT(
CAST(SUM(CAST(oa_status.closed AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.closed AS INT64)), COUNT(*)) as percent
) AS closed,

STRUCT(
CAST(SUM(CAST(oa_status.publisher AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.publisher AS INT64)), COUNT(*)) as percent
) AS publisher,

STRUCT(
CAST(SUM(CAST(oa_status.other_platform AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.other_platform AS INT64)), COUNT(*)) as percent
) AS other_platform,

STRUCT(
CAST(SUM(CAST(oa_status.publisher_only AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.publisher_only AS INT64)), COUNT(*)) as percent
) AS publisher_only,

STRUCT(
CAST(SUM(CAST(oa_status.both AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.both AS INT64)), COUNT(*)) as percent
) AS both,

STRUCT(
CAST(SUM(CAST(oa_status.other_platform_only AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.other_platform_only AS INT64)), COUNT(*)) as percent
) AS other_platform_only,

STRUCT(
STRUCT(
CAST(SUM(CAST(oa_status.publisher_categories.oa_journal AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.publisher_categories.oa_journal AS INT64)), SUM(CAST(oa_status.publisher AS INT64))) as percent
) AS oa_journal,

STRUCT(
CAST(SUM(CAST(oa_status.publisher_categories.hybrid AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.publisher_categories.hybrid AS INT64)), SUM(CAST(oa_status.publisher AS INT64))) as percent
) AS hybrid,

STRUCT(
CAST(SUM(CAST(oa_status.publisher_categories.no_guarantees AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.publisher_categories.no_guarantees AS INT64)), SUM(CAST(oa_status.publisher AS INT64))) as percent
) AS no_guarantees
) AS publisher_categories,

STRUCT(
STRUCT(
CAST(SUM(CAST(oa_status.other_platform_categories.preprint AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.other_platform_categories.preprint AS INT64)), SUM(CAST(oa_status.other_platform AS INT64))) as percent
) AS preprint,

STRUCT(
CAST(SUM(CAST(oa_status.other_platform_categories.domain AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.other_platform_categories.domain AS INT64)), SUM(CAST(oa_status.other_platform AS INT64))) as percent
) AS domain,

STRUCT(
CAST(SUM(CAST(oa_status.other_platform_categories.institution AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.other_platform_categories.institution AS INT64)), SUM(CAST(oa_status.other_platform AS INT64))) as percent
) AS institution,

STRUCT(
CAST(SUM(CAST(oa_status.other_platform_categories.public AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.other_platform_categories.public AS INT64)), SUM(CAST(oa_status.other_platform AS INT64))) as percent
) AS public,

STRUCT(
CAST(SUM(CAST(oa_status.other_platform_categories.other_internet AS INT64)) AS FLOAT64) as total,
calcPercent(SUM(CAST(oa_status.other_platform_categories.other_internet AS INT64)), SUM(CAST(oa_status.other_platform AS INT64))) as percent
) AS other_internet
) as other_platform_categories

) AS oa_status
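
calcPercent is not defined in this hunk; it presumably comes from a shared template (functions.sql.jinja2, which the country query below includes). A plausible definition, stated as an assumption, consistent with how it is called above:

-- Hypothetical sketch only; the real UDF is expected to live in functions.sql.jinja2.
-- SAFE_DIVIDE returns NULL instead of erroring when the denominator is zero.
CREATE TEMP FUNCTION calcPercent(numerator FLOAT64, denominator FLOAT64) AS (
  SAFE_DIVIDE(numerator, denominator) * 100
);
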
@@ -1,3 +1,4 @@
{% include 'functions.sql.jinja2' with context %}
{% include 'data.sql.jinja2' with context %}

SELECT
@@ -15,19 +16,22 @@ SELECT
REGEXP_REPLACE(country.wikipedia_url, r'^http://', 'https://') AS wikipedia_url,
country.region as region,
country.subregion as subregion,
(SELECT MIN(year_struct.year) FROM UNNEST(years_agg.years) as year_struct) as start_year,
(SELECT MAX(year_struct.year) FROM UNNEST(years_agg.years) as year_struct) as end_year,
aggregate_data.stats as stats,
(SELECT MIN(publication_year) FROM UNNEST(years_agg.years)) as start_year,
(SELECT MAX(publication_year) FROM UNNEST(years_agg.years)) as end_year,
aggregate_data.n_citations,
aggregate_data.n_outputs,
aggregate_data.oa_status,
years_agg.years,
CASE
WHEN country.alpha3 = 'GBR' THEN ARRAY<STRING>[country.alpha3, country.alpha2, 'UK']
ELSE ARRAY<STRING>[country.alpha3, country.alpha2]
END as acronyms,
repositories.repositories
entity_repositories.repositories
FROM `{{ country_table_id }}` as country
LEFT JOIN aggregate_data ON aggregate_data.id = country.alpha3
LEFT JOIN years_agg ON years_agg.id = country.alpha3
LEFT JOIN repositories ON repositories.id = country.alpha3
WHERE ARRAY_LENGTH(years) > 0
AND stats.n_outputs >= {{ inclusion_threshold }}
ORDER BY stats.p_outputs_open DESC
LEFT JOIN aggregate_data ON aggregate_data.id = country.alpha2
LEFT JOIN years_agg ON years_agg.id = country.alpha2
LEFT JOIN entity_repositories ON entity_repositories.id = country.alpha2
WHERE
ARRAY_LENGTH(years) > 0
AND n_outputs >= {{ inclusion_threshold }}
ORDER BY oa_status.open.percent DESC