diff --git a/academic-observatory-workflows/academic_observatory_workflows/orcid_telescope/tasks.py b/academic-observatory-workflows/academic_observatory_workflows/orcid_telescope/tasks.py index 696411628..962b36c1e 100644 --- a/academic-observatory-workflows/academic_observatory_workflows/orcid_telescope/tasks.py +++ b/academic-observatory-workflows/academic_observatory_workflows/orcid_telescope/tasks.py @@ -24,8 +24,8 @@ import time from concurrent.futures import as_completed, ProcessPoolExecutor, ThreadPoolExecutor import datetime -from os import PathLike import json +import shutil from typing import Dict, Optional, Tuple, Union import pendulum @@ -110,9 +110,8 @@ def fetch_release( "fetch_releases: there should be at least 1 DatasetRelease in the Observatory API after the first DAG run" ) prev_release = api.get_latest_dataset_release(dag_id=dag_id, entity_id="orcid", date_key="changefile_end_date") - logging.info(f"Extra: {prev_release.extra}") - logging.info(f"Type: {type(prev_release.extra)}") - prev_latest_modified_record = pendulum.parse(json.loads(prev_release.extra["latest_modified_record_date"])) + prev_latest_modified_record = pendulum.parse(json.loads(prev_release.extra)["latest_modified_record_date"]) + logging.info(f"Proceeding with previous latest modified record date: {prev_latest_modified_record}") prev_release_end = prev_release.changefile_end_date return OrcidRelease( @@ -314,7 +313,7 @@ def transform(release: dict, max_workers: Optional[int] = None): release = OrcidRelease.from_dict(release) if not max_workers: - max_workers = os.cpu_count() * 2 + max_workers = os.cpu_count() logging.info(f"Using {max_workers} processes for transform operation") total_upsert_records = 0 @@ -368,6 +367,14 @@ def transform(release: dict, max_workers: Optional[int] = None): ) +def clean_downloads(release: dict): """Removes all files in the downloads folder of the release""" + release = OrcidRelease.from_dict(release) + + logging.info(f"Removing folder: {release.download_folder}") + shutil.rmtree(release.download_folder) + + + def upload_transformed(release: dict): """Uploads the upsert and delete files to the transform bucket.""" diff --git a/academic-observatory-workflows/academic_observatory_workflows/orcid_telescope/telescope.py b/academic-observatory-workflows/academic_observatory_workflows/orcid_telescope/telescope.py index 746781cb4..4194aa873 100644 --- a/academic-observatory-workflows/academic_observatory_workflows/orcid_telescope/telescope.py +++ b/academic-observatory-workflows/academic_observatory_workflows/orcid_telescope/telescope.py @@ -217,30 +217,21 @@ def latest_modified_record_date(release: dict, **context): return tasks.latest_modified_record_date(release) @task.kubernetes( - name="download", + name="download_transform", container_resources=gke_make_container_resources( - {"memory": "8G", "cpu": "8"}, dag_params.gke_params.gke_resource_overrides.get("download") + {"memory": "64G", "cpu": "16"}, dag_params.gke_params.gke_resource_overrides.get("download_transform") ), **kubernetes_task_params, ) - def download(release: dict, **context): - """Reads each batch's manifest and downloads the files from the gcs bucket.""" - from academic_observatory_workflows.orcid_telescope import tasks + def download_transform(release: dict, dag_params, **context): + """Reads each batch's manifest and downloads the files from the gcs bucket. 
+ The download and transform tasks are merged to avoid mounting a disk with millions of files.""" - tasks.download(release) - - @task.kubernetes( - name="transform", - container_resources=gke_make_container_resources( - {"memory": "32G", "cpu": "16"}, dag_params.gke_params.gke_resource_overrides.get("transform") - ), - **kubernetes_task_params, - ) - def transform(release: dict, dag_params, **context): - """Transforms the downloaded files into serveral bigquery-compatible .jsonl files""" from academic_observatory_workflows.orcid_telescope import tasks + tasks.download(release) tasks.transform(release, max_workers=dag_params.max_workers) + tasks.clean_downloads(release) @task.kubernetes( name="upload_transformed", @@ -312,8 +303,7 @@ def cleanup_workflow(release: dict, **context) -> None: task_bq_create_main_table_snapshot = bq_create_main_table_snapshot(xcom_release, dag_params) task_create_manifests = create_manifests(xcom_release, dag_params) xcom_latest_modified_date = latest_modified_record_date(xcom_release) - task_download = download(xcom_release) - task_transform = transform(xcom_release, dag_params) + task_download_transform = download_transform(xcom_release, dag_params) task_upload_transformed = upload_transformed(xcom_release) task_bq_load_main_table = bq_load_main_table(xcom_release, dag_params) task_bq_load_upsert_table = bq_load_upsert_table(xcom_release, dag_params) @@ -349,8 +339,7 @@ def cleanup_workflow(release: dict, **context) -> None: >> task_create_storage >> task_create_manifests >> xcom_latest_modified_date - >> task_download - >> task_transform + >> task_download_transform >> task_upload_transformed >> task_delete_storage >> task_bq_load_main_table