From 08cd35a587c8869a659237e070a932e709660a45 Mon Sep 17 00:00:00 2001 From: Heera ballabh Date: Wed, 21 Apr 2021 19:52:08 +0530 Subject: [PATCH] chore(analysis): If batch failed in between it should retry only for failed one. --- .../pipeline_workflows/src/main/python/dags/helper_dag.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/pipeline_workflows/src/main/python/dags/helper_dag.py b/packages/pipeline_workflows/src/main/python/dags/helper_dag.py index c8828823..4b03f597 100644 --- a/packages/pipeline_workflows/src/main/python/dags/helper_dag.py +++ b/packages/pipeline_workflows/src/main/python/dags/helper_dag.py @@ -180,9 +180,10 @@ def find_all_batch_without_npz(bucket_name,destination_path,source): bucket_name, full_path_for_embeddings ) + has_npz_file = False for filename in all_batch_txt_npz: print(filename.name) - has_npz_file = False + if 'txt' in filename.name: filename_without_extension = filename.name.replace('.txt','') print(filename_without_extension,"when it is txt") @@ -219,6 +220,7 @@ def generate_splitted_batches_for_audio_analysis( all_batch_set,has_npz_file = find_all_batch_without_npz(bucket_name,destination_path,source) if len(all_batch_set) > 0: + print(all_batch_set,"All batch set") # list_of_batches = list(all_batch_set) add_txt_in_path = [f'{bucket_name}/{file_path}.txt' for file_path in all_batch_set ] @@ -228,9 +230,9 @@ def generate_splitted_batches_for_audio_analysis( Variable.set("embedding_batch_file_list", batch_file_path_dict) return - if len(all_batch_set) = 0 and has_npz_file: + if len(all_batch_set) == 0 and has_npz_file: - batch_file_path_dict[source] = add_txt_in_path + batch_file_path_dict[source] = all_batch_set batch_file_path_dict = MyDict(batch_file_path_dict) Variable.set("embedding_batch_file_list", batch_file_path_dict) return