Skip to content

Commit

Permalink
Added training DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
romanoa77 committed Jul 7, 2024
1 parent 5dc9bef commit 83b84da
Showing 1 changed file with 31 additions and 2 deletions.
33 changes: 31 additions & 2 deletions dags_airflow_ale_trainmod.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
#baseapiurl="http://gflapi.glitchflow.svc.cluster.local:8000/"
#apifrzendp="train"

WAITMSG="WAITING"
FRZMSG="FROZEN"

dag = DAG(
dag_id="Trainpipe",
start_date=airflow.utils.dates.days_ago(3),
Expand All @@ -19,6 +22,22 @@

)


def pick_branch(**context):

jsmsg=context["task_instance"].xcom_pull(
task_ids="send_frz_sign", key="return_value"
)

respob=json.loads(jsmsg)

if(respob["resp"]==WAITMSG):
return "next_sensor"
else:
return "next_metrics"



IniTrain = DummyOperator(task_id="start_training", dag=dag)

sign_train = SimpleHttpOperator(
Expand All @@ -29,11 +48,21 @@
data=json.dumps({"user":"airflow","token":"airflow"}),
headers={"Content-Type": "application/json"},


retries=3,
retry_delay=timedelta(minutes=5),
dag=dag,
)

Next = DummyOperator(task_id="next", dag=dag)
chech_train_resp=BranchPythonOperator(
task_id="check_frz_sign",
python_callable=pick_branch,
)

next_sens = DummyOperator(task_id="next_sensor", dag=dag)
next_metrics = DummyOperator(task_id="next_sensor", dag=dag)




IniTrain>>sign_train>>Next
IniTrain>>sign_train

0 comments on commit 83b84da

Please sign in to comment.