Skip to content

Commit

Permalink
Moved some tests arounds
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Jul 25, 2024
1 parent f9a2a2d commit c1349eb
Show file tree
Hide file tree
Showing 21 changed files with 50 additions and 125 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ USER astro

# Install Observatory Platform
# RUN git clone --branch feature/astro-refactor https://github.com/The-Academic-Observatory/observatory-platform.git
RUN git clone --branch keegan-astro https://github.com/The-Academic-Observatory/observatory-platform.git
RUN git clone --branch feature/astro_kubernetes https://github.com/The-Academic-Observatory/observatory-platform.git
RUN pip install -e ./observatory-platform --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-no-providers-3.10.txt

# Install Academic Observatory Workflows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from dataclasses import dataclass
import os
from typing import Optional

import pendulum
from airflow import DAG
Expand All @@ -33,6 +34,37 @@
from observatory_platform.airflow.workflow import CloudWorkspace


class _TaskResources:
def __init__(self, overrides: Optional[dict] = None):
"""Describes the resources for task containers
:param overides: Optionally provide a custom resource definition for a task. For example, to override the
defaults for the download task, provide {"download": {"memory": "1G", "cpu": "500m"}}"""

task_resources = {
"download": {"memory": "2G", "cpu": "2000m"},
"upload_downloaded": {"memory": "4Gi", "cpu": "4"},
"extract": {"memory": "8Gi", "cpu": "8"},
"transform": {"memory": "16G", "cpu": "16"},
"upload_transformed": {"memory": "8G", "cpu": "8"},
}
if overrides:
for task, resources in overrides.items():
task_resources[task] = resources

self.download = V1ResourceRequirements(requests=task_resources["download"], limits=task_resources["download"])
self.upload_downloaded = V1ResourceRequirements(
requests=task_resources["upload_downloaded"], limits=task_resources["upload_downloaded"]
)
self.extract = V1ResourceRequirements(requests=task_resources["extract"], limits=task_resources["extract"])
self.transform = V1ResourceRequirements(
requests=task_resources["transform"], limits=task_resources["transform"]
)
self.upload_transformed = V1ResourceRequirements(
requests=task_resources["upload_transformed"], limits=task_resources["upload_transformed"]
)


@dataclass
class DagParams:
"""Parameters for the Crossref Metadata Telescope
Expand Down Expand Up @@ -85,35 +117,26 @@ class DagParams:
gke_volume_path: str = "/data"
gke_zone: str = "us-central1"
gke_volume_size: int = 2500
gke_resource_overrides: Optional[dict] = None
kubernetes_conn_id: str = "gke_cluster"
docker_astro_uid: int = 50000


def create_dag(*, dag_params: DagParams) -> DAG:
"""The Crossref Metadata DAG"""

task_resources = _TaskResources(overrides=dag_params.gke_resource_overrides)
# Common @task.kubernetes params
volume_mounts = [k8s.V1VolumeMount(mount_path=dag_params.gke_volume_path, name=dag_params.gke_volume_name)]
volumes = [
k8s.V1Volume(
name=dag_params.gke_volume_name,
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=dag_params.gke_volume_name),
)
]
kubernetes_task_params = dict(
image=dag_params.gke_image,
# init-container is used to apply the astro:astro owner to the /data directory
init_containers=[
k8s.V1Container(
name="init-container",
image="ubuntu",
command=[
"sh",
"-c",
f"useradd -u {dag_params.docker_astro_uid} astro && chown -R astro:astro /data",
],
volume_mounts=volume_mounts,
security_context=k8s.V1PodSecurityContext(fs_group=0, run_as_group=0, run_as_user=0),
)
],
# TODO: supposed to make makes pod run as astro user so that it has access to /data volume
# It doesn't seem to work
security_context=k8s.V1PodSecurityContext(
# fs_user=docker_astro_uid,
fs_group=dag_params.docker_astro_uid,
fs_group_change_policy="OnRootMismatch",
run_as_group=dag_params.docker_astro_uid,
Expand All @@ -127,12 +150,7 @@ def create_dag(*, dag_params: DagParams) -> DAG:
namespace=dag_params.gke_namespace,
startup_timeout_seconds=dag_params.gke_startup_timeout_seconds,
env_vars={"DATA_PATH": dag_params.gke_volume_path},
volumes=[
k8s.V1Volume(
name=dag_params.gke_volume_name,
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=dag_params.gke_volume_name),
)
],
volumes=volumes,
volume_mounts=volume_mounts,
)

Expand Down Expand Up @@ -168,9 +186,7 @@ def _fetch_release(dag_params: DagParams, **context) -> dict:

@task.kubernetes(
name="download",
container_resources=V1ResourceRequirements(
requests={"memory": "2G", "cpu": "2000m"}, limits={"memory": "2G", "cpu": "2000m"}
),
container_resources=task_resources.download,
secrets=[Secret("env", "CROSSREF_METADATA_API_KEY", "crossref-metadata", "api-key")],
**kubernetes_task_params,
)
Expand All @@ -181,9 +197,7 @@ def _download(release: dict, **context):

@task.kubernetes(
name="upload_download",
container_resources=V1ResourceRequirements(
requests={"memory": "4Gi", "cpu": "4"}, limits={"memory": "4Gi", "cpu": "4"}
),
container_resources=task_resources.upload_downloaded,
**kubernetes_task_params,
)
def _upload_downloaded(release: dict, dag_params, **context):
Expand All @@ -194,9 +208,7 @@ def _upload_downloaded(release: dict, dag_params, **context):

@task.kubernetes(
name="extract",
container_resources=V1ResourceRequirements(
requests={"memory": "8Gi", "cpu": "8"}, limits={"memory": "8Gi", "cpu": "8"}
),
container_resources=task_resources.extract,
**kubernetes_task_params,
)
def _extract(release: dict, **context):
Expand All @@ -206,9 +218,7 @@ def _extract(release: dict, **context):

@task.kubernetes(
name="transform",
container_resources=V1ResourceRequirements(
requests={"memory": "16Gi", "cpu": "8"}, limits={"memory": "16Gi", "cpu": "8"}
),
container_resources=task_resources.transform,
**kubernetes_task_params,
)
def _transform(release: dict, dag_params, **context):
Expand All @@ -220,9 +230,7 @@ def _transform(release: dict, dag_params, **context):

@task.kubernetes(
name="upload_transformed",
container_resources=V1ResourceRequirements(
requests={"memory": "4Gi", "cpu": "4"}, limits={"memory": "4Gi", "cpu": "4"}
),
container_resources=task_resources.upload_transformed,
**kubernetes_task_params,
)
def _upload_transformed(release: dict, dag_params, **context) -> None:
Expand Down Expand Up @@ -259,7 +267,7 @@ def _add_dataset_release(release: dict, dag_params: DagParams, **context) -> Non
def _cleanup_workflow(release: dict, dag_params: DagParams, **context) -> None:
from academic_observatory_workflows.crossref_metadata_telescope.tasks import cleanup_workflow

cleanup_workflow(release, dag_id=dag_params.dag_id, logical_date=context["logical_date"])
cleanup_workflow(release, dag_id=dag_params.dag_id)

# Define task connections
task_check_dependencies = check_dependencies(airflow_conns=[dag_params.crossref_metadata_conn_id])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,15 @@ def add_dataset_release(release: dict, *, dag_id: str, cloud_workspace: CloudWor
api.add_dataset_release(dataset_release)


def cleanup_workflow(release: dict, *, dag_id: str, logical_date: pendulum.DateTime) -> None:
def cleanup_workflow(release: dict, *, dag_id: str) -> None:
"""Task to delete all files, folders and XComs associated with this release.
:param dag_id: The ID of the DAG
:param logical_date: The DAG run's logical/execution date
"""

release = CrossrefMetadataRelease.from_dict(release)
cleanup(dag_id=dag_id, execution_date=logical_date, workflow_folder=release.workflow_folder)
cleanup(dag_id=dag_id, workflow_folder=release.workflow_folder)


def make_snapshot_url(snapshot_date: pendulum.DateTime) -> str:
Expand Down
Empty file.
File renamed without changes.
83 changes: 0 additions & 83 deletions tests/dags/test_dag_example.py

This file was deleted.

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit c1349eb

Please sign in to comment.