Commit: Fetch release updates
keegansmith21 committed Jul 25, 2024
1 parent 3a0cf54 commit f9a2a2d
Showing 3 changed files with 33 additions and 13 deletions.
@@ -80,11 +80,11 @@ class DagParams:
     retries: int = 3
     gke_image = "us-docker.pkg.dev/keegan-dev/academic-observatory/academic-observatory:latest" # TODO: change this
     gke_namespace: str = "coki-astro"
-    gke_startup_timeout_seconds: int = 60
+    gke_startup_timeout_seconds: int = 300
     gke_volume_name: str = "crossref-metadata"
     gke_volume_path: str = "/data"
     gke_zone: str = "us-central1"
-    gke_volume_size: int = 1000
+    gke_volume_size: int = 2500
     kubernetes_conn_id: str = "gke_cluster"
     docker_astro_uid: int = 50000
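The two bumps above give pods longer to start (60 s to 300 s) and grow the Crossref volume (1000 to 2500). For context, a minimal sketch of how these fields could feed a PersistentVolumeClaim, assuming the kubernetes Python client and that gke_volume_size is in GiB; the helper below is illustrative, not part of this commit:

```python
from kubernetes import client


def make_crossref_pvc(params: "DagParams") -> client.V1PersistentVolumeClaim:
    """Illustrative only: build a PVC sized from DagParams (GiB assumed)."""
    return client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(name=params.gke_volume_name, namespace=params.gke_namespace),
        spec=client.V1PersistentVolumeClaimSpec(
            access_modes=["ReadWriteOnce"],
            # Newer kubernetes clients (>= v29) type this as V1VolumeResourceRequirements.
            resources=client.V1ResourceRequirements(requests={"storage": f"{params.gke_volume_size}Gi"}),
        ),
    )
```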

@@ -159,17 +159,17 @@ def _fetch_release(dag_params: DagParams, **context) -> dict:
         return fetch_release(
             run_id=context["run_id"],
             data_interval_start=context["data_interval_start"],
+            data_interval_end=context["data_interval_end"],
             cloud_workspace=dag_params.cloud_workspace,
             crossref_metadata_conn_id=dag_params.crossref_metadata_conn_id,
             dag_id=dag_params.dag_id,
             start_date=dag_params.start_date,
             batch_size=dag_params.batch_size,
         )

     @task.kubernetes(
         name="download",
         container_resources=V1ResourceRequirements(
-            requests={"memory": "2Gi", "cpu": "2"}, limits={"memory": "2Gi", "cpu": "2"}
+            requests={"memory": "2G", "cpu": "2000m"}, limits={"memory": "2G", "cpu": "2000m"}
         ),
         secrets=[Secret("env", "CROSSREF_METADATA_API_KEY", "crossref-metadata", "api-key")],
         **kubernetes_task_params,
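Note the unit change in the download task: "2Gi"/"2" are binary gibibytes and whole cores, while "2G"/"2000m" are decimal gigabytes and millicores. A quick check with the kubernetes client's quantity parser shows "2G" requests about 7% less memory, while the CPU values are equivalent:

```python
from kubernetes.utils import parse_quantity

print(parse_quantity("2Gi"))                           # 2147483648 (2 * 2**30 bytes)
print(parse_quantity("2G"))                            # 2000000000 (2 * 10**9 bytes)
print(parse_quantity("2000m") == parse_quantity("2"))  # True: 2000 millicores == 2 cores
```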
@@ -182,7 +182,7 @@ def _download(release: dict, **context):
     @task.kubernetes(
         name="upload_download",
         container_resources=V1ResourceRequirements(
-            requests={"memory": "2Gi", "cpu": "2"}, limits={"memory": "2Gi", "cpu": "2"}
+            requests={"memory": "4Gi", "cpu": "4"}, limits={"memory": "4Gi", "cpu": "4"}
         ),
         **kubernetes_task_params,
     )
@@ -195,7 +195,7 @@ def _upload_downloaded(release: dict, dag_params, **context):
     @task.kubernetes(
         name="extract",
         container_resources=V1ResourceRequirements(
-            requests={"memory": "2Gi", "cpu": "2"}, limits={"memory": "2Gi", "cpu": "2"}
+            requests={"memory": "8Gi", "cpu": "8"}, limits={"memory": "8Gi", "cpu": "8"}
         ),
         **kubernetes_task_params,
     )
@@ -207,7 +207,7 @@ def _extract(release: dict, **context):
     @task.kubernetes(
         name="transform",
         container_resources=V1ResourceRequirements(
-            requests={"memory": "8Gi", "cpu": "8"}, limits={"memory": "8Gi", "cpu": "8"}
+            requests={"memory": "16Gi", "cpu": "8"}, limits={"memory": "16Gi", "cpu": "8"}
         ),
         **kubernetes_task_params,
     )
@@ -11,33 +11,46 @@ def __init__(
         dag_id: str,
         run_id: str,
         snapshot_date: pendulum.DateTime,
+        data_interval_start: pendulum.DateTime,
+        data_interval_end: pendulum.DateTime,
         cloud_workspace: CloudWorkspace,
     ):
-        """Construct a RorRelease.
+        """Construct a Crossref Metadata Release Object.
         :param dag_id: the DAG id.
         :param run_id: the DAG run id.
+        :param data_interval_start: The start of the data interval
+        :param data_interval_end: The end of the data interval
         :param snapshot_date: the release date.
         :param cloud_workspace: the cloud workspace settings.
         """

         super().__init__(dag_id=dag_id, run_id=run_id, snapshot_date=snapshot_date)
         self.cloud_workspace = cloud_workspace
         self.download_file_name = "crossref_metadata.json.tar.gz"
-        self.download_file_path = os.path.join(self.download_folder, self.download_file_name)
         self.extract_files_regex = r".*\.json$"
         self.transform_files_regex = r".*\.jsonl$"
+        self.data_interval_start = data_interval_start
+        self.data_interval_end = data_interval_end

+    @property
+    def download_file_path(self):
+        return os.path.join(self.download_folder, self.download_file_name)
+
     @staticmethod
     def from_dict(dict_: dict):
         dag_id = dict_["dag_id"]
         run_id = dict_["run_id"]
         snapshot_date = pendulum.parse(dict_["snapshot_date"])
+        data_interval_start = pendulum.parse(dict_["data_interval_start"])
+        data_interval_end = pendulum.parse(dict_["data_interval_end"])
         cloud_workspace = CloudWorkspace.from_dict(dict_["cloud_workspace"])
         return CrossrefMetadataRelease(
             dag_id=dag_id,
             run_id=run_id,
             snapshot_date=snapshot_date,
+            data_interval_start=data_interval_start,
+            data_interval_end=data_interval_end,
             cloud_workspace=cloud_workspace,
         )

@@ -46,5 +59,7 @@ def to_dict(self) -> dict:
             dag_id=self.dag_id,
             run_id=self.run_id,
             snapshot_date=self.snapshot_date.to_datetime_string(),
+            data_interval_start=self.data_interval_start.to_datetime_string(),
+            data_interval_end=self.data_interval_end.to_datetime_string(),
             cloud_workspace=self.cloud_workspace.to_dict(),
         )
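Two behavioural points in this file: download_file_path is now a @property, so it is recomputed from download_folder whenever a worker rebuilds the release rather than being frozen into the serialised dict, and the data interval now survives the to_dict/from_dict round trip. A hypothetical round trip (all values below are made up):

```python
import pendulum

release = CrossrefMetadataRelease(
    dag_id="crossref_metadata",
    run_id="scheduled__2024-07-07T00:00:00+00:00",  # made-up run id
    snapshot_date=pendulum.datetime(2024, 7, 1),
    data_interval_start=pendulum.datetime(2024, 7, 7),
    data_interval_end=pendulum.datetime(2024, 7, 14),
    cloud_workspace=cloud_workspace,  # assumed to be defined elsewhere
)
restored = CrossrefMetadataRelease.from_dict(release.to_dict())

# The path is derived on access, so it always reflects the current download_folder.
assert restored.download_file_path.endswith("crossref_metadata.json.tar.gz")
assert restored.data_interval_end == release.data_interval_end
```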
@@ -15,7 +15,7 @@
 # Author: Aniek Roelofs, James Diprose, Keegan Smith

 from concurrent.futures import as_completed, ProcessPoolExecutor
-import datetime
+from datetime import datetime
 import functools
 import json
 import logging
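The import change is stylistic: it binds the datetime class directly, so call sites can presumably drop the module prefix, e.g.:

```python
from datetime import datetime

# With `import datetime` this would read datetime.datetime.strptime(...).
parsed = datetime.strptime("20240707", "%Y%m%d")
```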
@@ -53,6 +53,7 @@ def fetch_release(
     run_id: str,
     start_date: str,
     data_interval_start: pendulum.DateTime,
+    data_interval_end: pendulum.DateTime,
 ) -> dict:
     """Task to retrieve the release for the given start date
@@ -62,6 +63,7 @@ def fetch_release(
     :param run_id: The run ID for the dag run
     :param start_date: The earliest date to retrieve a release for
     :param data_interval_start: The start of the data interval for this dag run
+    :param data_interval_end: The end of the data interval for this dag run
     :return: The release object in dictionary form
     """

@@ -90,6 +92,8 @@ def fetch_release(
         dag_id=dag_id,
         run_id=run_id,
         snapshot_date=snapshot_date,
+        data_interval_start=data_interval_start,
+        data_interval_end=data_interval_end,
         cloud_workspace=cloud_workspace,
     ).to_dict()
@@ -249,15 +253,16 @@ def add_dataset_release(release: dict, *, dag_id: str, cloud_workspace: CloudWor

     api = DatasetAPI(bq_project_id=cloud_workspace.project_id, bq_dataset_id=api_bq_dataset_id)
     api.seed_db()
+    now = pendulum.now()
     dataset_release = DatasetRelease(
         dag_id=dag_id,
         entity_id="crossref_metadata",
         dag_run_id=release.run_id,
-        created=pendulum.now(),
-        modified=pendulum.now(),
+        created=now,
+        modified=now,
         data_interval_start=release.data_interval_start,
         data_interval_end=release.data_interval_end,
         partition_date=release.partition_date,
         snapshot_date=release.snapshot_date,
     )
     api.add_dataset_release(dataset_release)