-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #56 from navikt/airflow-dokumentasjon
enkel dokumentasjon for kjøring av dbt i airflow
- Loading branch information
Showing
4 changed files
with
190 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
|
||
### DBT operator template | ||
|
||
[Her (dbt operator eksempel)](dbt_operator.md) er et eksempel på en dbt-operator som kan brukes i airflow. | ||
Operatoren er avhengig av følgende airflow-variabler: | ||
- TEAM_GCP_PROJECT | ||
- DVH_DB_ENVIRONMENT | ||
- SLACK_OPS_CHANNEL | ||
- DBT_DOCS_URL (kun når `publish_docs=True`) | ||
|
||
### DBT run | ||
|
||
[Her (dbt run eksempel)](dbt_run.md) er et eksempel på et main-script for å kjøre dbt som du kan peke på fra airflow. Denne filen plasseres sammen med dbt-kode og er generisk laget, slik at samme fil kan benyttes i alle dbt-prosjekter. | ||
|
||
```python | ||
from dbt.cli.main import dbtRunner | ||
``` | ||
Om du bytter ut `subprocess` med `dbtRunner` så vil du få logget dbt-kjøringen din live i airflow. | ||
|
||
|
||
### JSON-struktur for hemmeligheter i GSM - Google Secret Manager | ||
|
||
```json | ||
{ | ||
"DB_USER": "<DB_USER>", | ||
"DB_SCHEMA": "<DB_SCHEMA>", | ||
"DB_PASSWORD": "<DB_PASSWORD>", | ||
"DB_DSN": "<DB_DSN>" | ||
} | ||
``` | ||
|
||
Det viktige her er at DB_USER, DB_SCHEMA, DB_PASSWORD, DB_DSN er de samme som du bruker i [dbt_run](dbt_run.md). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
```python | ||
from typing import Optional | ||
|
||
from airflow.models import Variable | ||
from airflow import DAG | ||
|
||
from dataverk_airflow import python_operator | ||
|
||
|
||
# Eksempel dbt-operator | ||
# https://github.com/navikt/dv-a-team-dags/blob/main/operators/dbt_operator.py | ||
def dbt_operator(
    dag: DAG,
    name: str,
    repo: str,
    script_path: str,
    dbt_secret_name: str,
    branch: str = "main",
    retries: int = 2,
    startup_timeout_seconds: int = 60 * 10,
    dbt_command: str = "build",
    dbt_image: str = "ghcr.io/navikt/dvh-images/airflow-dbt:2024-10-21-0b5d929",  # updated manually when a new image is released
    dbt_models: Optional[str] = None,
    do_xcom_push: bool = False,
    allowlist: Optional[list] = None,
    publish_docs: bool = False,
    dbt_docs_project_name: Optional[str] = None,
):
    """Create a task that runs a dbt script via ``dataverk_airflow.python_operator``.

    The dbt configuration is handed to the script as environment variables
    (``extra_envs``). Airflow variables ``DVH_DB_ENVIRONMENT``,
    ``TEAM_GCP_PROJECT`` and ``SLACK_OPS_CHANNEL`` are always read;
    ``DBT_DOCS_URL`` is read only when ``publish_docs`` is true.

    Args:
        dag: DAG the task is attached to.
        name: Task name, forwarded to ``python_operator``.
        repo: Repository forwarded to ``python_operator``.
        script_path: Path of the script to run, forwarded to ``python_operator``.
        dbt_secret_name: Secret name used to build ``TEAM_GCP_SECRET_PATH``
            (``projects/<TEAM_GCP_PROJECT>/secrets/<name>/versions/latest``).
        branch: Repository branch, forwarded to ``python_operator``.
        retries: Retry count, forwarded to ``python_operator``.
        startup_timeout_seconds: Forwarded to ``python_operator``.
        dbt_command: Exported as ``DBT_COMMAND``.
        dbt_image: Image forwarded to ``python_operator``.
        dbt_models: When truthy, exported as ``DBT_MODELS``.
        do_xcom_push: Forwarded to ``python_operator``.
        allowlist: Forwarded to ``python_operator``; ``None`` means an empty
            list (a fresh one per call).
        publish_docs: When true, also export ``DBT_DOCS_PROJECT_NAME`` and
            ``DBT_DOCS_URL``.
        dbt_docs_project_name: Required when ``publish_docs`` is true.

    Returns:
        Whatever ``python_operator`` returns.

    Raises:
        ValueError: If ``publish_docs`` is true and ``dbt_docs_project_name``
            is empty or ``None``.
    """
    dvh_db_environment = Variable.get("DVH_DB_ENVIRONMENT")
    # Looked up once and reused for both the env var and the secret path.
    team_gcp_project = Variable.get("TEAM_GCP_PROJECT")

    env_vars = {
        "DBT_COMMAND": dbt_command,
        "TEAM_GCP_PROJECT": team_gcp_project,
        "DBT_DB_TARGET": dvh_db_environment,
        "TEAM_GCP_SECRET_PATH": f"projects/{team_gcp_project}/secrets/{dbt_secret_name}/versions/latest",
    }
    if dbt_models:
        env_vars["DBT_MODELS"] = dbt_models

    if publish_docs:
        if not dbt_docs_project_name:
            raise ValueError(
                "Provide a not-null value for dbt_docs_project_name when publish_docs=True"
            )
        env_vars["DBT_DOCS_PROJECT_NAME"] = dbt_docs_project_name
        env_vars["DBT_DOCS_URL"] = Variable.get("DBT_DOCS_URL")

    return python_operator(
        dag=dag,
        name=name,
        startup_timeout_seconds=startup_timeout_seconds,
        repo=repo,
        branch=branch,
        script_path=script_path,
        # A new list per call: a shared default list would leak state between
        # tasks if it were ever mutated downstream.
        allowlist=[] if allowlist is None else allowlist,
        slack_channel=Variable.get("SLACK_OPS_CHANNEL"),
        retries=retries,
        extra_envs=env_vars,
        image=dbt_image,
        do_xcom_push=do_xcom_push,
    )
|
||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
```python | ||
import requests | ||
import os | ||
import time | ||
import json | ||
import logging | ||
from pathlib import Path | ||
import shlex | ||
|
||
from dbt.cli.main import dbtRunner, dbtRunnerResult | ||
|
||
|
||
# Global dbt CLI flags placed in front of every command this script invokes.
DBT_BASE_COMMAND = [
    "--no-use-colors",
    "--log-format-file",
    "json",
]
|
||
|
||
def get_dbt_log(log_path) -> str:
    """Return the full text content of the file at ``log_path``."""
    with open(log_path) as handle:
        contents = handle.read()
    return contents
|
||
|
||
def set_secrets_as_envs(secret_name: str):
    """Load a JSON secret and export its entries as environment variables.

    The secret version named by ``secret_name`` is read through the Google
    Secret Manager client, decoded as UTF-8, parsed as JSON and merged into
    ``os.environ``.

    Args:
        secret_name: Resource name passed to ``access_secret_version``.
    """
    from google.cloud import secretmanager

    client = secretmanager.SecretManagerServiceClient()
    response = client.access_secret_version(name=secret_name)
    payload_text = response.payload.data.decode("UTF-8")
    os.environ.update(json.loads(payload_text))
|
||
|
||
def publish_docs(dbt_project: str, timeout: float = 300.0):
    """Upload the generated dbt docs artifacts with a single HTTP PUT.

    The target URL is the ``DBT_DOCS_URL`` environment variable with
    ``dbt_project`` appended directly (no separator is inserted). The files
    ``target/manifest.json``, ``target/catalog.json`` and ``target/index.html``
    are read relative to the current working directory and sent as multipart
    form fields keyed by their base file name.

    Args:
        dbt_project: Project name appended to ``DBT_DOCS_URL``.
        timeout: Timeout in seconds passed to ``requests.put`` so a
            non-responding docs server cannot hang the task forever.

    Raises:
        KeyError: If ``DBT_DOCS_URL`` is not set.
        OSError: If one of the docs files cannot be read.
        requests.HTTPError: If the server answers with an error status.
    """
    # Connection information for pushing the dbt docs
    dbt_docs_url = f'{os.environ["DBT_DOCS_URL"]}{dbt_project}'
    files = [
        "target/manifest.json",
        "target/catalog.json",
        "target/index.html",
    ]
    multipart_form_data = {}
    for file_path in files:
        file_name = os.path.basename(file_path)
        with open(file_path, "rb") as file:
            multipart_form_data[file_name] = (file_name, file.read())

    res = requests.put(dbt_docs_url, files=multipart_form_data, timeout=timeout)
    res.raise_for_status()
|
||
|
||
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")


def _raise_if_failed(result: "dbtRunnerResult") -> None:
    """Raise if a dbt invocation reported an exception or did not succeed."""
    # Exit code 2: error outside dbt
    if result.exception:
        raise result.exception
    # Exit code 1: failure in dbt (test or during the run)
    if not result.success:
        raise Exception(result.result)


def main() -> None:
    """Load secrets, run ``dbt deps`` and the configured dbt command.

    Reads ``TEAM_GCP_SECRET_PATH``, ``DBT_COMMAND`` (default ``build``),
    ``DBT_MODELS`` and, when the command contains ``docs``,
    ``DBT_DOCS_PROJECT_NAME`` from the environment.

    Raises:
        ValueError: If ``TEAM_GCP_SECRET_PATH`` is missing or empty.
        Exception: If ``dbt deps`` or the dbt command fails.
    """
    secret_name = os.getenv("TEAM_GCP_SECRET_PATH")
    if not secret_name:
        raise ValueError("missing environment variable TEAM_GCP_SECRET_PATH")
    set_secrets_as_envs(secret_name=secret_name)

    # Not used yet; kept for the xcom TODO at the end of this function.
    log_path = Path(__file__).parent / "logs/dbt.log"

    os.environ["TZ"] = "Europe/Oslo"
    time.tzset()

    # Map the entries loaded from the secret onto the names the dbt setup reads.
    schema = os.environ["DB_SCHEMA"]
    os.environ["DBT_ENV_SECRET_USER"] = f"{os.environ['DB_USER']}[{schema}]"
    os.environ["DBT_DB_SCHEMA"] = schema
    os.environ["DBT_DB_DSN"] = os.environ["DB_DSN"]
    os.environ["DBT_ENV_SECRET_PASS"] = os.environ["DB_PASSWORD"]
    logging.info("DBT miljøvariabler er lastet inn")

    # The default dbt command is build
    command = shlex.split(os.getenv("DBT_COMMAND", "build"))
    if dbt_models := os.getenv("DBT_MODELS", None):
        command = command + ["--select", dbt_models]

    dbt = dbtRunner()
    # Fail fast if installing dependencies fails, instead of ignoring the
    # result and running the main command anyway.
    _raise_if_failed(dbt.invoke(DBT_BASE_COMMAND + ["deps"]))
    _raise_if_failed(dbt.invoke(DBT_BASE_COMMAND + command))

    if "docs" in command:
        dbt_project = os.environ["DBT_DOCS_PROJECT_NAME"]
        logging.info("publiserer dbt docs")
        publish_docs(dbt_project=dbt_project)

    # TODO: add logic for writing the log to xcom


if __name__ == "__main__":
    main()
|
||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters