Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow passing of install task variables to downstream tasks of a dag #218

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions dags/openshift_nightlies/scripts/install/ocm_gcp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ postinstall(){
gcloud compute firewall-rules create ${NETWORK_NAME}-hostnet --network ${NETWORK_NAME} --priority 105 --description 'scale-ci allow tcp,udp hostnetwork tests' --rules tcp:32768-60999,udp:32768-60999 --action allow
}

display_install_data(){
IFS='@'
# Read the xcom pushed by install task and store it into an array based on '@' as delimiter
read -ra newarr <<< "$Install_vars"
clustername="${newarr[0]}"
installuuid="${newarr[1]}"
echo "Cluster Name = ${clustername}"
echo "Install UUID = ${installuuid}"
}

cleanup(){
ocm delete cluster $(_get_cluster_id ${CLUSTER_NAME})
ocm logout
Expand Down Expand Up @@ -144,8 +154,10 @@ if [[ "$operation" == "install" ]]; then
printf "INFO: Cluster ${CLUSTER_NAME} already installed but not ready, exiting..."
exit 1
fi
echo "${CLUSTER_NAME}@${UUID}"

elif [[ "$operation" == "cleanup" ]]; then
printf "Running Cleanup Steps"
display_install_data
cleanup
fi
12 changes: 12 additions & 0 deletions dags/openshift_nightlies/scripts/install/rosa.sh
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,16 @@ EOF
return 0
}

display_install_data(){
IFS='@'
# Read the xcom pushed by install task and store it into an array based on '@' as delimiter
read -ra newarr <<< "$Install_vars"
clustername="${newarr[0]}"
installuuid="${newarr[1]}"
echo "Cluster Name = ${clustername}"
echo "Install UUID = ${installuuid}"
}

cleanup(){
if [[ $INSTALL_METHOD == "osd" ]]; then
ocm delete cluster $(_get_cluster_id ${CLUSTER_NAME})
Expand Down Expand Up @@ -377,9 +387,11 @@ if [[ "$operation" == "install" ]]; then
printf "INFO: Cluster ${CLUSTER_NAME} already installed but not ready, exiting..."
exit 1
fi
echo "${CLUSTER_NAME}@${UUID}"

elif [[ "$operation" == "cleanup" ]]; then
printf "Running Cleanup Steps"
display_install_data
cleanup
index_metadata
rosa logout
Expand Down
14 changes: 14 additions & 0 deletions dags/openshift_nightlies/scripts/run_benchmark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ EOF
exit 1
fi
}

display_install_data(){
if [[ $PLATFORM == "rosa" || $PLATFORM == "rogcp" ]]; then
IFS='@'
# Read the xcom pushed by install task and store it into an array based on '@' as delimiter
read -ra newarr <<< "$Install_vars"
clustername="${newarr[0]}"
installuuid="${newarr[1]}"
echo "Cluster Name = ${clustername}"
echo "Install UUID = ${installuuid}"
fi
}

export UUID=$(uuidgen | head -c8)-$AIRFLOW_CTX_TASK_ID-$(date '+%Y%m%d')
echo "############################################"
echo "# Benchmark UUID: ${UUID}"
Expand All @@ -80,6 +93,7 @@ if [[ $PLATFORM == "baremetal" ]]; then
run_baremetal_benchmark
echo $UUID
else
display_install_data
setup
cd /home/airflow/workspace/e2e-benchmarking/workloads/$workload

Expand Down
2 changes: 1 addition & 1 deletion dags/openshift_nightlies/tasks/benchmarks/e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def _add_indexer(self, benchmark):
benchmark >> indexer

def _get_benchmark(self, benchmark):
env = {**self.env, **benchmark.get('env', {}), **{"ES_SERVER": var_loader.get_secret('elasticsearch'), "KUBEADMIN_PASSWORD": environ.get("KUBEADMIN_PASSWORD", "")}}
env = {**self.env, "Install_vars": '{{ ti.xcom_pull(task_ids="install")}}', **benchmark.get('env', {}), **{"ES_SERVER": var_loader.get_secret('elasticsearch'), "KUBEADMIN_PASSWORD": environ.get("KUBEADMIN_PASSWORD", "")}}
# Fetch variables from a secret with the name <DAG_NAME>-<TASK_NAME>
task_variables = var_loader.get_secret(f"{self.dag.dag_id}-{benchmark['name']}", True, False)
env.update(task_variables)
Expand Down
3 changes: 2 additions & 1 deletion dags/openshift_nightlies/tasks/install/rogcp/rogcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(self, dag, config: DagConfig, release: OpenshiftRelease):
def _get_task(self, operation="install", trigger_rule="all_success"):
self._setup_task(operation=operation)
command=f"{constants.root_dag_dir}/scripts/install/ocm_gcp.sh -v {self.release.version} -j /tmp/{self.release_name}-{operation}-task.json -o {operation}"
env={ **self.env , "Install_vars": '{{ ti.xcom_pull(task_ids="install")}}'}
return BashOperator(
task_id=f"{operation}",
depends_on_past=False,
Expand All @@ -34,5 +35,5 @@ def _get_task(self, operation="install", trigger_rule="all_success"):
dag=self.dag,
trigger_rule=trigger_rule,
executor_config=self.exec_config,
env=self.env
env=env
)
4 changes: 2 additions & 2 deletions dags/openshift_nightlies/tasks/install/rosa/rosa.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, dag, config: DagConfig, release: OpenshiftRelease):
def _get_task(self, operation="install", trigger_rule="all_success"):
self._setup_task(operation=operation)
command=f"{constants.root_dag_dir}/scripts/install/rosa.sh -v {self.release.version} -j /tmp/{self.release_name}-{operation}-task.json -o {operation}"

env={ **self.env , "Install_vars": '{{ ti.xcom_pull(task_ids="install")}}'}
return BashOperator(
task_id=f"{operation}",
depends_on_past=False,
Expand All @@ -35,5 +35,5 @@ def _get_task(self, operation="install", trigger_rule="all_success"):
dag=self.dag,
trigger_rule=trigger_rule,
executor_config=self.exec_config,
env=self.env
env=env
)
1 change: 1 addition & 0 deletions dags/openshift_nightlies/tasks/utils/final_dag_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@


def final_status(**kwargs):
ti = kwargs['ti']
failed_tasks=[]
for task_instance in kwargs['dag_run'].get_task_instances():
if "index" in task_instance.task_id:
Expand Down