This repository contains the Airflow DAGs for the Stellar ETL project. These DAGs provide a workflow for exporting data from the Stellar network and uploading the data into BigQuery.
Cloud Composer is the preferred method of deployment. Cloud Composer is a managed service used for Airflow deployment that provides much of the infrastructure required to host an Airflow instance. The steps for setting up a Cloud Composer environment are detailed below.
- Download the Google Cloud SDK.
- Initialize the Cloud SDK and log in to your Google account
- Log in to the Google Cloud Console
- Create a new Google Project
NOTE: The project name you choose corresponds to the Airflow variable "bq_project".
- Log in to Google BigQuery
- Create a new dataset with the desired name
NOTE: The dataset name you choose corresponds to the Airflow variable "bq_dataset".
- Open the Cloud Storage browser
- Create a new Google Storage bucket that will store exported files
NOTE: The bucket name you choose corresponds to the Airflow variable "gcs_exported_data_bucket_name".
WARNING: Make sure that you adhere to the location requirements for Cloud Storage buckets and BigQuery datasets. Otherwise, it will not be possible to upload data to BigQuery.
Create a new Cloud Composer environment using the UI, or with the following gcloud commands:
gcloud composer environments create <environment_name> --location=<project_location> \
--zone=<project_zone> --disk-size=100GB --machine-type=n1-standard-4 \
--node-count=3 --python-version=3 --image-version=composer-<version>-airflow-<version> \
--service-account=<service_account>
gcloud composer environments update <environment_name> \
--location=<project_location> --update-pypi-package=docker==3.7.3
For AIRFLOW 2.x: Be wary of choosing "autopilot" for environment resource management. The ephemeral storage provided by autopilot-ed containers is capped at 10GB, which may not be enough for hefty tasks (such as state_table_dag's export_task), or any task that runs captive core. You can add a second node pool to your Composer 1 environment, and configure it to be managed by autopilot if desired. Composer 2 environments use autopilot exclusively for resource management.
Note: If no service account is provided, GCP will use the default GKE service account. For quick setup this is an easy option.
Remember to adjust the disk size, machine type, and node count to fit your needs. The Python version must be 3, and the image must be composer-1.16.11-airflow-1.10.14 or later. GCP deprecates support for older versions of Composer and Airflow, so it is recommended that you select the latest stable version to avoid an environment upgrade. See the command reference page for a detailed list of parameters.
TROUBLESHOOTING: If the environment creation fails with the error "Composer Backend timed out", try disabling and re-enabling the Cloud Composer API. If the creation fails again, try creating a service account with Owner permissions and use it to create the Composer environment.
Cloud Composer may take a while to set up the environment. Once the process is finished, you can view the environment by going to the Composer section of the Cloud Console.
NOTE: Creating an environment will also create a new Google Cloud Storage bucket. You can check this bucket's name by clicking on the DAGs folder link in the Composer section of the Cloud Console.
After the environment is created, select the environment and navigate to the environment configuration tab. Look for the value under DAGs folder. It will be of the form gs://airflow_bucket/dags. The airflow_bucket value will be used in this step and the next. Run the command below in order to upload the DAGs and schemas to your Airflow bucket.
> bash upload_static_to_gcs.sh <airflow_bucket>
Afterwards, you can navigate to the Airflow UI for your Cloud Composer environment. To do so, navigate to the Composer section of the Cloud Console, and click the link under Airflow Webserver. Then, pause the DAGs by clicking the on/off toggle to the left of their names. DAGs should remain paused until you have finished setting up the environment. Some DAGs may not show up due to errors that will be fixed as the following steps are completed.
The Airflow DAGs require service account keys to perform their operations. Generate a service account key for a service account that has access to BigQuery and Google Cloud Storage. Then, add this key file to the data folder in your <airflow_bucket>.
NOTE: The name of the key file corresponds to the Airflow variable "api_key_path". The data folder in Cloud Storage corresponds to the path "/home/airflow/gcs/data/", but ensure that the variable has the correct filename.
If the Kubernetes pods contain long-running or resource-intensive operations, it is best to create a separate node pool for task execution. Executing the tasks on the same node pool as the airflow-scheduler will contribute to resource starvation and transient failures in the DAG.
Find the Kubernetes cluster name that is used by your Cloud Composer environment. To do so, select the environment, navigate to environment configuration, and look for the value of GKE cluster. The cluster name is the final part of this path.
Then, run the command:
gcloud container node-pools create <pool_name> --cluster <cluster_name> \
--zone <composer_zone> --project <project_id>
Alternatively, node pools can be created through the UI with the Add Node Pool button. Security can only be applied upon pool creation, so ensure that your security account and scopes are correct. If they need to be updated, you will need to delete the node pool and recreate it.
NOTE: The name of the pool will be used in the Airflow variable "affinity".
A sample affinity configuration is below; it is also defined in airflow_variables.txt. The user must supply the node pool names in values.
"affinity": {
  "nodeAffinity": {
    "requiredDuringSchedulingIgnoredDuringExecution": {
      "nodeSelectorTerms": [{
        "matchExpressions": [{
          "key": "cloud.google.com/gke-nodepool",
          "operator": "In",
          "values": ["<node-pool-1>", "<node-pool-2>"]
        }]
      }]
    }
  }
},
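The affinity value can also be generated instead of edited by hand. The sketch below assumes nothing beyond the JSON layout shown above; build_affinity is a hypothetical helper, not a function in this repository, and "pool-1" is a placeholder pool name.

```python
import json


def build_affinity(node_pools):
    """Return a nodeAffinity dict that restricts pods to the given GKE node pools."""
    if not node_pools:
        raise ValueError("at least one node pool name is required")
    return {
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                "values": list(node_pools),
                            }
                        ]
                    }
                ]
            }
        }
    }


# "pool-1" is a placeholder; substitute the node pool name you created.
affinity_json = json.dumps(build_affinity(["pool-1"]), indent=2)
print(affinity_json)
```

The printed JSON can be pasted as the value of the "affinity" Airflow variable.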
Open the Google Cloud Shell. Run these commands:
gcloud container clusters get-credentials <cluster_name> --region=<composer_region>
kubectl create ns <namespace_name>
kubectl create clusterrolebinding default-admin --clusterrole cluster-admin \
--serviceaccount=<airflow_worker_namespace>:default --namespace <namespace_name>
The first command acquires credentials, allowing you to execute the next commands. The second command creates the new namespace, and the third allows the service account that executes tasks to act in the new namespace.
To find the value of <airflow_worker_namespace>, select your Cloud Composer environment, navigate to environment configuration, and look for the value of GKE cluster. Click on the link that says "view cluster workloads." A new page will open with a list of Kubernetes workloads. Click on airflow-worker in order to go to the details page for that Deployment. Look for the value of Namespace.
NOTE: The name of the newly created namespace corresponds to the Airflow variable "namespace".
There are a few extra hoops to jump through to configure Workload Identity, so that export tasks have permissions to upload files to GCS.
You will create a Kubernetes service account and bind it to the Google service account that your task authenticates as.
Steps taken from this doc.
- Create a namespace in the k8s cluster where the Composer env is running:
kubectl create namespace hubble-composer
- Create a k8s service account:
kubectl create serviceaccount hubble-composer-service-account \
--namespace hubble-composer
- Create a Google service account, if one doesn't already exist:
gcloud iam service-accounts create hubble-service-account \
--project=hubble-261722
- Grant the Google service account that you're using storage.objectAdmin permissions, if it doesn't already have them.
gcloud projects add-iam-policy-binding hubble-261722 \
--member "serviceAccount:hubble-service-account@hubble-261722.iam.gserviceaccount.com" \
--role "roles/storage.objectAdmin"
- Associate the Google and k8s service accounts:
gcloud iam service-accounts add-iam-policy-binding hubble-service-account@hubble-261722.iam.gserviceaccount.com \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:hubble-261722.svc.id.goog[hubble-composer/hubble-composer-service-account]"
- Annotate the k8s service account with the Google service account:
kubectl annotate serviceaccount hubble-composer-service-account \
--namespace hubble-composer \
iam.gke.io/gcp-service-account=hubble-service-account@hubble-261722.iam.gserviceaccount.com
- Set the corresponding Airflow variables (k8s_namespace and k8s_service_account) for tasks running on KubernetesPodOperator.
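The identifiers used in the binding and annotation steps follow fixed patterns, so they can be derived from four inputs. The sketch below reproduces the strings used in the commands above; both helper functions are hypothetical names introduced for illustration.

```python
def workload_identity_member(project_id, namespace, k8s_service_account):
    """Build the --member value that links a k8s service account to Workload Identity."""
    return f"serviceAccount:{project_id}.svc.id.goog[{namespace}/{k8s_service_account}]"


def gcp_service_account_email(account_name, project_id):
    """Build the email address of a Google service account in the given project."""
    return f"{account_name}@{project_id}.iam.gserviceaccount.com"


# Values taken from the example commands in this section.
member = workload_identity_member(
    "hubble-261722", "hubble-composer", "hubble-composer-service-account"
)
email = gcp_service_account_email("hubble-service-account", "hubble-261722")
print(member)
print(email)
```

Substituting your own project, namespace, and account names shows the exact values to pass to gcloud and kubectl.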
Find the Kubernetes cluster workloads that are used by your Cloud Composer environment. To do so, select the environment, navigate to environment configuration, and look for the GKE cluster section. Click on the link that says "view cluster workloads."
A new page will open with a list of Kubernetes workloads. Click on airflow-worker in order to go to the details page for that Deployment. Click the edit button. This will take you to a tab with a Kubernetes configuration. In subsequent steps, you will edit this file. For an example of a finalized config file, see this example file.
WARNING: You shouldn't copy the example file directly because it has environment variables and config values that are set up for a different project.
NOTE: This deployment file contains two separate containers: airflow-worker and gcs-syncd. Only the airflow-worker container should be edited.
Mount Docker on Airflow Workers
In this step, mount the Docker.sock and Docker. In addition, edit the security config so that the container runs as privileged, allowing it to access Docker. See [this commit](https://github.com/marc-chan/cloud_composer_examples/commit/f3e6a202ef0bfd2214385def7e36be33db191df6#diff-fc2e428a07c8d60059e54e5154f0c540) for an example of how to make these changes.
Add Volume for Local Files to Airflow Workers
In this step, add another volumeMount to airflow-workers. This local path will be used for temporary storage of exported files. In addition, make sure that you add the corresponding volume with the type DirectoryOrCreate.
Here is an example of what your volumeMounts and volumes should look like at the end of this step:
...
volumeMounts:
- mountPath: /etc/airflow/airflow_cfg
  name: airflow-config
- mountPath: /home/airflow/gcs
  name: gcsdir
- mountPath: /var/run/docker.sock
  name: docker-host
- mountPath: /bin/docker
  name: docker-app
- mountPath: /home/airflow/etlData
  name: etl-data
...
volumes:
- configMap:
    defaultMode: 420
    name: airflow-configmap
  name: airflow-config
- emptyDir: {}
  name: gcsdir
- hostPath:
    path: /var/run/docker.sock
    type: ""
  name: docker-host
- hostPath:
    path: /usr/bin/docker
    type: ""
  name: docker-app
- hostPath:
    path: /home/airflow/etlData
    type: DirectoryOrCreate
  name: etl-data
NOTE: The mount path chosen corresponds to the Airflow variable "local_output_path".
Add Poststart Script to Airflow Workers
Find the namespace name in the airflow-worker config file. It should be near the top of the file, and may look like "composer-1-12-0-airflow-1-10-10-2fca78f7". This value will be used in later commands.
Next, open the cloud shell. Keep your airflow-worker configuration file open, or save it. In the cloud shell, create a text file called poststart.sh by running the command: nano poststart.sh. Then, copy the text from the poststart.sh file in this repository into the newly opened file. If you changed the path for the local folder in the previous step, make sure that you edit line 13:
for file in /home/airflow/etlData/*
It should reflect the path changes you made. Once the file is finalized, run these commands:
gcloud container clusters get-credentials <cluster_name> --region=<composer_region>
kubectl create configmap start-config --from-file poststart.sh -n <namespace_name>
Return to the airflow-worker config file. Add a new volumeMount to /etc/scripts.
...
volumeMounts:
...
- mountPath: /etc/scripts
  name: config-volume
...
Then, add a new Volume that links to the configMap you created.
...
volumes:
...
- configMap:
    defaultMode: 511
    name: start-config
  name: config-volume
...
This will make the script available to the Airflow workers. In order for them to call it automatically, add a postStart hook to airflow-worker above the existing preStop hook.
...
lifecycle:
  postStart:
    exec:
      command:
      - /bin/bash
      - /etc/scripts/poststart.sh
  preStop:
    exec:
      command:
      - bash
      - -c
      - pkill -f "MainProcess"
...
What the script does:
The export tasks in the etl use Docker images with their own filesystems. Mounting a folder to the Docker image allows us to connect the airflow-worker filesystem to the Docker image filesystem. However, there are multiple airflow-worker instances, and tasks are distributed between them. This means that an export task may occur on one worker, and the subsequent task that needs that file could occur on a different worker instance. There needs to be some way to pool all the data from all the worker instances.
Fortunately, Cloud Composer provides a folder at /home/airflow/gcs/data. This folder is described in detail here. Essentially, the folder is synchronized between all the workers, and it also is linked to the data folder in the environment's Cloud Storage bucket. This means that data stored here will be available to all workers, solving the problem. Unfortunately, since this folder is already connected to a Cloud Storage bucket, it cannot also connect to a Docker image.
Instead, we connect a local folder defined in the previous step. The poststart.sh script runs constantly in the background. It moves files from the local folder to the gcs/data folder. The script is more complicated than a simple move command because it needs to ensure that no programs are writing to the files before they are moved.
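To illustrate the idea, here is a minimal Python sketch of a "move only when nobody is writing" pass. It is not a port of poststart.sh: it assumes that a file whose modification time is a few seconds old is no longer being written, which is only an approximation, and move_settled_files and the directory names are hypothetical.

```python
import os
import shutil
import tempfile
import time


def move_settled_files(src_dir, dst_dir, min_idle_seconds=5.0, now=None):
    """Move regular files from src_dir to dst_dir once they have been idle.

    A file counts as idle when its modification time is at least
    min_idle_seconds in the past. Returns the names of the moved files.
    """
    now = time.time() if now is None else now
    os.makedirs(dst_dir, exist_ok=True)
    moved = []
    for name in sorted(os.listdir(src_dir)):
        src = os.path.join(src_dir, name)
        if not os.path.isfile(src):
            continue
        if now - os.path.getmtime(src) < min_idle_seconds:
            continue  # possibly still being written; retry on the next pass
        shutil.move(src, os.path.join(dst_dir, name))
        moved.append(name)
    return moved


# Demo: temporary directories stand in for the local folder and gcs/data.
with tempfile.TemporaryDirectory() as root:
    local_dir = os.path.join(root, "etlData")
    shared_dir = os.path.join(root, "gcs_data")
    os.makedirs(local_dir)
    finished = os.path.join(local_dir, "finished.txt")
    in_progress = os.path.join(local_dir, "in_progress.txt")
    for path in (finished, in_progress):
        with open(path, "w") as handle:
            handle.write("{}\n")
    # Pretend the first file was last written a minute ago.
    minute_ago = time.time() - 60
    os.utime(finished, (minute_ago, minute_ago))
    demo_moved = move_settled_files(local_dir, shared_dir)
    demo_remaining = sorted(os.listdir(local_dir))
print(demo_moved, demo_remaining)
```

A background script would call such a pass in a loop with a short sleep between iterations.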
In order to add the Airflow variables and connections, navigate to the Airflow web server. To do so, navigate to the Composer section of the Cloud Console, and click the link under Airflow Webserver.
Click the Admin tab, then Connections. Click create, then:
- Set the Conn Id field to google_cloud_platform_connection.
- Set the Conn Type to Google Cloud Platform.
- Set the Project Id to your project id.
- Set the Keyfile Path to <api_key_path>.
  - The <api_key_path> should be the same as the Airflow variable "api_key_path".
Next, add the Airflow variables. Click the Admin tab, then Variables. Click the Choose file button, select your variables file, and click import variables.
The airflow_variables.txt file provides a set of default values for variables.
This section is currently unfinished as the Kubernetes setup is still in development.
- Install Airflow v1.10 or later: pip install apache-airflow
  - To confirm Airflow is installed, run airflow -h and ensure that you see a help screen
- Install the required packages: pip install -r requirements.txt
- Set up the Airflow database: airflow initdb
- Run Airflow scheduler: airflow scheduler
- Run Airflow web server: airflow webserver
- Add required Airflow variables through the CLI or the Airflow UI
- Add required Airflow connections through the CLI or the Airflow UI
  - google_cloud_platform_connection: connection of type google_cloud_platform that connects to a Google Cloud Platform API key for a specific project. See here for more information about API keys.
  - fs_default: connection with fs type that sets the default filepath
Variable name | Description | Should be changed? |
---|---|---|
affinity | JSON object that represents the pod's affinity | Yes, if you followed the optional step and made a new node pool. |
api_key_path | path to the Google Cloud Platform API key | No, unless your filename is different. |
bq_dataset | name of the BigQuery dataset | Yes. Change to your dataset name. |
bq_project | name of the BigQuery project | Yes. Change to your project name. |
gcs_exported_data_bucket_name | name of the Google Cloud Storage bucket that will store exported data | Yes. Change to the name of the bucket you made. |
image_name | name of the ETL's Docker image | No, unless you need a specific image version. |
image_output_path | local output path within the ETL image | No. |
image_pull_policy | Specifies the image pull behavior. Valid values are: "Always", "IfNotPresent", or "Never" | No, unless you handle image updates manually. |
local_output_path | local output path within the airflow-worker that is used for temporary storage | No, unless you changed the path when modifying the Kubernetes config. |
namespace | namespace name for ETL tasks that generate Kubernetes pods | Yes, if you followed the optional step and made a new namespace |
output_file_names | JSON object. Each key should be a data structure, and the value should be the name of the output file for that data structure | Yes, if desired. Make sure each type has a different filename. |
output_path | shared output path for exported data | No, unless you have a different shared storage solution. |
owner | the name of the owner of the Airflow DAGs | Yes. |
schema_filepath | file path to schema folder | No, unless schemas are in a different location |
table_ids | JSON object. Each key should be a data structure, and the value should be the name of the BigQuery table | Yes, if desired. Make sure each type has a different table name. |
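Before importing a variables file, it can help to confirm that every variable from the table above is present. The sketch below assumes the file is a JSON object mapping variable names to values; missing_variables is a hypothetical helper, and the sample input is made up for the demonstration.

```python
import json

# Variable names taken from the table above.
REQUIRED_VARIABLES = [
    "affinity",
    "api_key_path",
    "bq_dataset",
    "bq_project",
    "gcs_exported_data_bucket_name",
    "image_name",
    "image_output_path",
    "image_pull_policy",
    "local_output_path",
    "namespace",
    "output_file_names",
    "output_path",
    "owner",
    "schema_filepath",
    "table_ids",
]


def missing_variables(variables_json_text):
    """Return the sorted names of required variables absent from the JSON text."""
    variables = json.loads(variables_json_text)
    return sorted(name for name in REQUIRED_VARIABLES if name not in variables)


# Made-up input that omits "owner" to show what a report looks like.
sample_text = json.dumps({name: "" for name in REQUIRED_VARIABLES if name != "owner"})
print(missing_variables(sample_text))
```

To check a real file, read its contents with open(...).read() and pass the text to missing_variables.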
Variable name | Description | Should be changed? |
---|---|---|
resources | Resources to request and allocate to Kubernetes Pods. | No, unless pods need more resources |
kube_config_location | Location of the kubernetes config file. See here for a guide on finding the Kube config file. If you are running the pods in the same cluster as Airflow, you can leave this value blank. | No, unless the pods are in a different cluster than Airflow. |
kubernetes_sidecar_image | Image used for xcom sidecar | No, unless you want to pull a different alpine-based image. |
k8s_namespace | Namespace to run the task in | No, unless the pods are moved into a new namespace |
k8s_service_account | K8s service account the task runs as | No, unless k8s authentication is modified. This account is likely linked to the associated GCP service account. |
volume_config | JSON objects representing the configuration for your Kubernetes volume. | Yes. Change configs to match your volume (see below for example configs) |
volume_name | Name of the persistent ReadWriteMany volume associated with the claim. | Yes. Change to your volume name. |
Here are some example volume_config values. Note that a ReadWriteMany volume is required when tasks run in parallel.
- For an NFS volume, set volume_config={"nfs": {"path": "/", "server": "my-server.provider.cloud"}}.
- In order to set up a persistent volume claim, set volume_config={"persistentVolumeClaim": {"claimName": <claim>}}.
- In order to set up a host path volume, set volume_config={"hostPath": {"path": <path>, "type": "DirectoryOrCreate"}}.
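Since unbalanced braces are an easy mistake in hand-written JSON, the three volume_config shapes can be produced from Python dictionaries instead. This is a sketch; the three helper functions are hypothetical and "my-claim" is a placeholder claim name.

```python
import json


def nfs_volume(server, path="/"):
    """volume_config for an NFS volume."""
    return {"nfs": {"path": path, "server": server}}


def pvc_volume(claim_name):
    """volume_config for a persistent volume claim."""
    return {"persistentVolumeClaim": {"claimName": claim_name}}


def host_path_volume(path):
    """volume_config for a host path volume that is created if missing."""
    return {"hostPath": {"path": path, "type": "DirectoryOrCreate"}}


nfs_json = json.dumps(nfs_volume("my-server.provider.cloud"))
pvc_json = json.dumps(pvc_volume("my-claim"))
host_json = json.dumps(host_path_volume("/home/airflow/etlData"))
for value in (nfs_json, pvc_json, host_json):
    print(value)
```

Each printed line is valid JSON that can be used as the volume_config variable value.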
First, this image shows the Airflow web UI components for pausing and triggering DAGs:
- Ensure that the Airflow scheduler is running: airflow scheduler
- Ensure that the Airflow web server is running: airflow webserver -p <port>
- Enable the History Archive Export DAG
  - Use the command airflow unpause history_archive_export or use the Airflow UI
  - The DAG will export information every 5 minutes. It also will backfill by exporting information starting at the network's beginning up until the current time
- Enable the DAGs for exporting ledger changes
  - Unpause the DAGs using the Web UI or the commands below:
    airflow unpause unbounded_core_changes_export
    airflow unpause bucket_list_export
  - Manually trigger the bucket list DAG with airflow trigger_dag bucket_list_export <current_time> or the Web UI.
  - Once the bucket list has finished, trigger the unbounded_core_changes_export DAG with the same execution time as the bucket_list_export DAG. You can do this through the Web UI by going to Browse->DAG Runs->Create and setting the DAG id and execution date.
  - Unpause the processing DAG with airflow unpause process_unbounded_core_changes
- Enable the DAGs for exporting orderbooks
  - Unpause the DAGs using the Web UI or the commands below:
    airflow unpause unbounded_core_orderbook_export
    airflow unpause process_unbounded_core_orderbooks
You can clear failed tasks in the task-instance context menu in the Airflow UI. Clearing failed tasks gives them a chance to run again without requiring you to run the entire DAG again.
We have two long-running DAGs: the unbounded_core_changes_export DAG and the unbounded_core_orderbook_export DAG. Each one of these has a captive stellar-core instance, which it uses to export information perpetually. The information each DAG exports is picked up and loaded into BigQuery by the process_unbounded_core_changes DAG and the process_unbounded_core_orderbooks DAG.
If a long running pod fails, look at the most recent successful DAG run of the associated process DAG. Take a look at the log of the file sensors, where you will see a line:
Relative path of the earliest file is: changes_folder/<START>-<END>-accounts.txt
This line indicates the last ledger that was processed. Now you can go to Stellar Expert to get the close time of the end ledger. Then, manually create a DAG run at that time by going to Browse->DAG Runs->Create. Set the DAG id to the DAG that failed, and the Execution Time to the close time of the end ledger.
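When scanning long sensor logs, the ledger range can be pulled out of that line with a regular expression. The sketch below is illustrative only: parse_earliest_file_line is a hypothetical helper, and the ledger numbers and log prefix in the sample line are made up.

```python
import re

# Matches the file-sensor line and captures start ledger, end ledger, and data type.
EARLIEST_FILE_PATTERN = re.compile(
    r"Relative path of the earliest file is: (?:\S*/)?(\d+)-(\d+)-(\w+)\.txt"
)


def parse_earliest_file_line(log_line):
    """Return (start_ledger, end_ledger, data_type), or None if the line does not match."""
    match = EARLIEST_FILE_PATTERN.search(log_line)
    if match is None:
        return None
    start, end, data_type = match.groups()
    return int(start), int(end), data_type


# Made-up sample line; real logs carry their own prefix and ledger numbers.
sample_line = "INFO - Relative path of the earliest file is: changes_folder/1000-1063-accounts.txt"
parsed_range = parse_earliest_file_line(sample_line)
print(parsed_range)
```

The second element of the result is the end ledger whose close time is needed for the new DAG run.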
This section contains information about the Airflow setup. It includes our DAG diagrams and explanations of tasks. For general Airflow knowledge, check out the Airflow concepts overview or the Airflow tutorial.
This DAG exports ledgers, transactions, operations, trades, and assets from Stellar's history archives, loads them into Google Cloud Storage, and then sends the data to BigQuery.
This DAG connects to a stellar-core instance and exports accounts, offers, and trustlines. This DAG is a long-running process that continually exports new information as the Stellar network progresses.
This DAG connects to a stellar-core instance and exports accounts, offers, and trustlines. Unlike the unbounded version, this version is not long running. It stops once the range has been exported. Currently, this range is the ledger that includes the DAG's execution date.
This DAG processes the output of the unbounded changes DAG. File sensors watch the folder where the unbounded core DAG sends its exported information. Once a file is seen, it is loaded into Google Cloud Storage and applied to BigQuery. Once a batch has been exported completely, the DAG triggers itself again.
This DAG exports from Stellar's bucket list, which contains data on accounts, offers, and trustlines. Exports from this DAG always begin from the genesis ledger and end at the ledger that includes the DAG's execution date.
This DAG connects to a stellar-core instance and exports accounts, offers, and trustlines. This DAG is a long-running process that continually exports new information as the Stellar network progresses.
This DAG processes the output of the unbounded orderbook DAG. File sensors watch the folder where the unbounded core DAG sends its exported information. Once a file is seen, it is loaded into Google Cloud Storage and applied to BigQuery. Once a batch has been exported completely, the DAG triggers itself again.
This file contains methods for creating time tasks. Time tasks call the get_ledger_range_from_times function in the stellar-etl Docker image. The tasks receive the execution time of the current DAG run and the expected execution time of the next run. They convert this time range into a ledger range that can be passed to the export tasks.
This file contains methods for creating export tasks. Export tasks call export functions in the stellar-etl Docker image with a ledger range determined by the upstream time task. The data is exported in a newline-delimited JSON text file with a file name in the format [start ledger]-[end ledger]-[data type].txt.
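As a small illustration of the naming scheme and file format, the sketch below builds a file name and reads newline-delimited JSON. The helper names and the "sequence" field are hypothetical and are not taken from the stellar-etl output schema.

```python
import json


def export_file_name(start_ledger, end_ledger, data_type):
    """Build a name in the [start ledger]-[end ledger]-[data type].txt format."""
    return f"{start_ledger}-{end_ledger}-{data_type}.txt"


def parse_ndjson(text):
    """Parse newline-delimited JSON: one JSON document per non-empty line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


file_name = export_file_name(100, 163, "ledgers")
# Made-up records; real exports contain the fields defined by stellar-etl.
records = parse_ndjson('{"sequence": 100}\n{"sequence": 101}\n')
print(file_name, records)
```

Because each line is an independent JSON document, a partially read file can still be processed line by line.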
This file contains methods for creating load tasks. Load tasks load local files that were exported into Google Cloud Storage. In order to keep GCS files organized, exported data is loaded into a folder called exported. The exported folder contains folders for each of the exported data types.
This file contains methods for creating tasks that append information from a Google Cloud Storage file to a BigQuery table. These tasks will create a new table if one does not exist. These tasks are used for history archive data structures, as Stellar wants to keep a complete record of the ledger's entire history.
This file contains methods for creating a file sensor task. File sensors take in a file path, and continuously check that file path until a file or folder exists. Once the file is sensed, the task succeeds. This task is important because the unbounded core DAG exports batches at variable times. Using file sensors ensures that batches are detected and processed as soon as they are exported.
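The polling behavior described here can be sketched in plain Python. This is not the repository's sensor implementation; wait_for_file, its parameters, and the demo file name are assumptions chosen for illustration.

```python
import glob
import os
import tempfile
import time


def wait_for_file(pattern, poke_interval=0.5, timeout=5.0):
    """Poll until a path matches the glob pattern, then return the first match.

    Raises TimeoutError if nothing matches before the timeout elapses.
    """
    deadline = time.monotonic() + timeout
    while True:
        matches = sorted(glob.glob(pattern))
        if matches:
            return matches[0]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no file matched {pattern!r} within {timeout} seconds")
        time.sleep(poke_interval)


# Demo: the file already exists, so the first check succeeds.
with tempfile.TemporaryDirectory() as folder:
    open(os.path.join(folder, "100-163-accounts.txt"), "w").close()
    sensed = os.path.basename(wait_for_file(os.path.join(folder, "*-accounts.txt")))
print(sensed)
```

A short poke interval detects batches sooner at the cost of more filesystem checks.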
This file contains methods for creating apply tasks. Apply tasks are used to merge a file from Google Cloud Storage into a BigQuery table. Apply tasks differ from the other task that appends in that they apply changes. This means that they update, delete, and insert rows. These tasks are used for accounts, offers, and trustlines, as the BigQuery table represents the point in time state of these data structures. This means that, for example, a merge task could alter the account balance field in the table if a user performed a transaction, delete a row in the table if a user deleted their account, or add a new row if a new account was created.
Apply tasks can also be used to insert unique values only. This behavior is used for orderbook and history archive data structures. Instead of performing a merge operation, which would update or delete existing rows, the task will simply insert new rows if they don't already exist. This helps prevent duplicated data in a scenario where rows shouldn't change or be deleted. Essentially, this task replicates the behavior of a primary key in a database when used for orderbooks.
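The difference between the two modes can be shown with an in-memory model. The sketch below uses plain dictionaries rather than BigQuery, and the function names, record layout, and sample values are all hypothetical.

```python
def apply_changes(table, changes):
    """Merge change records into table, a dict keyed by primary key.

    A change with "deleted" set removes the row; any other change
    updates the existing row or inserts a new one.
    """
    for change in changes:
        key = change["key"]
        if change.get("deleted"):
            table.pop(key, None)
        else:
            table[key] = change["row"]
    return table


def insert_unique(table, rows, key_field):
    """Insert rows whose key is not present yet; existing rows stay untouched."""
    for row in rows:
        table.setdefault(row[key_field], row)
    return table


accounts = {"account_a": {"balance": 10}, "account_b": {"balance": 5}}
apply_changes(
    accounts,
    [
        {"key": "account_a", "row": {"balance": 7}},  # update
        {"key": "account_b", "deleted": True},        # delete
        {"key": "account_c", "row": {"balance": 1}},  # insert
    ],
)

trades = {1: {"id": 1, "price": "2.0"}}
insert_unique(trades, [{"id": 1, "price": "9.9"}, {"id": 2, "price": "3.0"}], "id")
print(accounts, trades)
```

In the second call the row with id 1 keeps its original price, which mirrors the primary-key-like behavior described above.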
This section details further areas of development. It covers a basic guide on how to add new features and test changes to existing features. It also contains a list of project TODOs (check the GitHub issues page for more!)
This section covers some possible extensions or further work that can be done.
Git can run special scripts at various places in the Git workflow (which the system calls “hooks”). These scripts can do whatever you want and, in theory, can help a team with their development flow.
pre-commit makes hook scripts extremely accessible to teams.
- Install pre-commit
# using pip
$ pip install pre-commit==3.2.1
- Set up the Git hook scripts
$ pre-commit install
pre-commit installed at .git/hooks/pre-commit
That's it. Now pre-commit will run automatically on git commit!
Adding new DAGs is a fairly straightforward process. Create a new Python file in the dags folder. Create your DAG object using the code below:
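from airflow import DAG

# get_default_dag_args() is this repository's helper for shared default arguments.
dag = DAG(
    'dag_id',
    default_args=get_default_dag_args(),
    description='DAG description.',
    schedule_interval=None,
)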
The get_default_dag_args() function is defined in the dags/stellar-etl-airflow/default.py file.
Feel free to add more arguments or customize the existing ones. The documentation for a DAG is available here.
If you have created a new DAG, or wish to extend an existing DAG, you can add tasks to it by calling the various create_X_task functions that are in the repository. See here for details on how to create dependencies between tasks.
Adding new tasks is a more involved process. You likely need to add a new Python file in the dags/stellar_etl_airflow folder. This file should include a function that creates and returns the new task, as well as any auxiliary functions related to the task.
Airflow has a variety of operators. The ones that are most likely to be used are:
- DockerOperator, which can be used to run stellar-etl commands
- KubernetesPodOperator, which can start new Kubernetes Pods
- PythonOperator, which can run Python functions
- GlobFileSensor, which can detect files
You may also find this list of Google-related operators useful for interacting with Google Cloud Storage or BigQuery.
An example of a simple task is the time task. This task converts a time into a ledger range using a stellar-etl command. Since it needs to use the stellar-etl, we need a DockerOperator. We provide the operator with the command, the task_id, the parent DAG, and some parameters specific to DockerOperators, like the volume.
More complex tasks might require a good amount of extra code to set up variables, authenticate, or check for errors. However, keep in mind that tasks should be idempotent. This means that tasks should produce the same output even if they are run multiple times. The same input should always produce the same output.
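One common way to keep a file-producing task idempotent is to overwrite its output atomically rather than append to it. The sketch below shows that pattern with the standard library only; write_export, the file name, and the records are hypothetical.

```python
import json
import os
import tempfile


def write_export(path, records):
    """Write records as newline-delimited JSON, replacing any previous output.

    The data goes to a temporary file first and is then renamed into place,
    so a rerun leaves exactly one complete copy instead of appended duplicates.
    """
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as handle:
        for record in records:
            handle.write(json.dumps(record, sort_keys=True) + "\n")
    os.replace(tmp_path, path)


with tempfile.TemporaryDirectory() as folder:
    target = os.path.join(folder, "100-163-ledgers.txt")
    sample_records = [{"sequence": 100}, {"sequence": 101}]
    write_export(target, sample_records)
    with open(target) as handle:
        first_run = handle.read()
    write_export(target, sample_records)  # rerun, as a cleared or retried task would
    with open(target) as handle:
        second_run = handle.read()
print(first_run == second_run)
```

An append-based writer would double the file on the second run, which is the failure mode idempotent tasks avoid.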
You may find that you need to pass small amounts of information, like filenames or numbers, from one task to another. You can do so with Airflow's XCOM system.
Once you make a change, you can test it using the Airflow command line interface. Here's a quick outline of how to test changes:
- Run kubectl get pods --all-namespaces. Look for a pod that starts with airflow-worker.
- Run kubectl -n <pod_namespace> exec -it airflow-worker-<rest_of_pod_name> -c airflow-worker -- /bin/bash to get inside the worker.
- Run airflow test history_archive_export <task_id> <test_date>. Note that if the task you changed has dependencies, you need to run airflow test on those upstream tasks for the exact same date.
- Run airflow test on the tasks that depend on the task you just changed. Ensure that they still perform as expected.
This guide can also be useful for testing deployment in a new environment. Follow this testing process for all the tasks in your DAGs to ensure that they work end-to-end.
Write documentation about custom Kubernetes setup (#43).