Skip to content

Commit

Permalink
orcid download_transform merge
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Oct 16, 2024
1 parent 0dcb14e commit 67e64c6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import time
from concurrent.futures import as_completed, ProcessPoolExecutor, ThreadPoolExecutor
import datetime
from os import PathLike
import json
import shutil
from typing import Dict, Optional, Tuple, Union

import pendulum
Expand Down Expand Up @@ -110,9 +110,8 @@ def fetch_release(
"fetch_releases: there should be at least 1 DatasetRelease in the Observatory API after the first DAG run"
)
prev_release = api.get_latest_dataset_release(dag_id=dag_id, entity_id="orcid", date_key="changefile_end_date")
logging.info(f"Extra: {prev_release.extra}")
logging.info(f"Type: {type(prev_release.extra)}")
prev_latest_modified_record = pendulum.parse(json.loads(prev_release.extra["latest_modified_record_date"]))
prev_latest_modified_record = pendulum.parse(json.loads(prev_release.extra)["latest_modified_record_date"])
logging.info(f"Proceeding with previous latest modified record date: {prev_latest_modified_record}")
prev_release_end = prev_release.changefile_end_date

return OrcidRelease(
Expand Down Expand Up @@ -314,7 +313,7 @@ def transform(release: dict, max_workers: Optional[int] = None):

release = OrcidRelease.from_dict(release)
if not max_workers:
max_workers = os.cpu_count() * 2
max_workers = os.cpu_count()
logging.info(f"Using {max_workers} processes for transform operation")

total_upsert_records = 0
Expand Down Expand Up @@ -368,6 +367,14 @@ def transform(release: dict, max_workers: Optional[int] = None):
)


def clean_downloads(release: dict):
    """Delete the release's download folder and everything inside it.

    :param release: The release in dictionary form; it is converted back into
        an OrcidRelease with ``OrcidRelease.from_dict`` before use.
    """
    orcid_release = OrcidRelease.from_dict(release)
    folder = orcid_release.download_folder

    # Log the target first so the removed path is visible in the task logs.
    logging.info(f"Removing folder: {folder}")
    shutil.rmtree(folder)


def upload_transformed(release: dict):
"""Uploads the upsert and delete files to the transform bucket."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,30 +217,21 @@ def latest_modified_record_date(release: dict, **context):
return tasks.latest_modified_record_date(release)

@task.kubernetes(
name="download",
name="download_transform",
container_resources=gke_make_container_resources(
{"memory": "8G", "cpu": "8"}, dag_params.gke_params.gke_resource_overrides.get("download")
{"memory": "64G", "cpu": "16"}, dag_params.gke_params.gke_resource_overrides.get("download_transform")
),
**kubernetes_task_params,
)
def download(release: dict, **context):
"""Reads each batch's manifest and downloads the files from the gcs bucket."""
from academic_observatory_workflows.orcid_telescope import tasks
def download_transform(release: dict, dag_params, **context):
"""Reads each batch's manifest and downloads the files from the gcs bucket.
The download and transform tasks are merged to avoid mounting a disk with millions of files."""

tasks.download(release)

@task.kubernetes(
name="transform",
container_resources=gke_make_container_resources(
{"memory": "32G", "cpu": "16"}, dag_params.gke_params.gke_resource_overrides.get("transform")
),
**kubernetes_task_params,
)
def transform(release: dict, dag_params, **context):
"""Transforms the downloaded files into several bigquery-compatible .jsonl files"""
from academic_observatory_workflows.orcid_telescope import tasks

tasks.download(release)
tasks.transform(release, max_workers=dag_params.max_workers)
tasks.clean_downloads(release)

@task.kubernetes(
name="upload_transformed",
Expand Down Expand Up @@ -312,8 +303,7 @@ def cleanup_workflow(release: dict, **context) -> None:
task_bq_create_main_table_snapshot = bq_create_main_table_snapshot(xcom_release, dag_params)
task_create_manifests = create_manifests(xcom_release, dag_params)
xcom_latest_modified_date = latest_modified_record_date(xcom_release)
task_download = download(xcom_release)
task_transform = transform(xcom_release, dag_params)
task_download_transform = download_transform(xcom_release, dag_params)
task_upload_transformed = upload_transformed(xcom_release)
task_bq_load_main_table = bq_load_main_table(xcom_release, dag_params)
task_bq_load_upsert_table = bq_load_upsert_table(xcom_release, dag_params)
Expand Down Expand Up @@ -349,8 +339,7 @@ def cleanup_workflow(release: dict, **context) -> None:
>> task_create_storage
>> task_create_manifests
>> xcom_latest_modified_date
>> task_download
>> task_transform
>> task_download_transform
>> task_upload_transformed
>> task_delete_storage
>> task_bq_load_main_table
Expand Down

0 comments on commit 67e64c6

Please sign in to comment.