Skip to content

Commit

Permalink
Merge pull request #562 from wangxinbiao/main
Browse files Browse the repository at this point in the history
When a task fails, update the 'message' field in the dataset CR
  • Loading branch information
bjwswang authored Jan 16, 2024
2 parents a814e83 + 49ed6bc commit 0a8f671
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 60 deletions.
12 changes: 1 addition & 11 deletions apiserver/graph/generated/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion apiserver/graph/generated/models_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion apiserver/graph/schema/dataprocessing.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ input AddDataProcessInput {
post_data_set_name: String!
post_data_set_version: String!
data_process_config_info: [DataProcessConfigItem!]
bucket_name: String!
version_data_set_name: String!
namespace: String!
creator: String!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ async def add(request):
"post_data_set_name": "dataset1",
"post_data_set_version": "v2",
"version_data_set_name": "dataset1-v2",
"bucket_name": "system-tce",
"file_names": [
{
"name": "数据处理文件_小T.pdf"
}
],
"data_process_config_info": []
"data_process_config_info": [],
"creator": "",
"namespace": "abc"
}
"""
res = data_process_service.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def text_manipulate(
req_json is a dictionary object.
"""

bucket_name = req_json['bucket_name']
namespace = req_json['namespace']
support_type = req_json['data_process_config_info']
file_names = req_json['file_names']

Expand All @@ -72,9 +72,10 @@ def text_manipulate(

# update the dataset status
update_dataset = _update_dateset_status(
bucket_name=req_json['bucket_name'],
namespace=req_json['namespace'],
version_data_set_name=req_json['version_data_set_name'],
reason='processing',
message='Data processing in progress',
task_id=id,
log_id=log_id,
creator=req_json.get('creator'),
Expand Down Expand Up @@ -130,7 +131,7 @@ def text_manipulate(
# 将文件下载到本地
minio_store_client.download(
minio_client,
bucket_name=bucket_name,
bucket_name=namespace,
folder_prefix=folder_prefix,
file_name=file_name
)
Expand Down Expand Up @@ -328,17 +329,18 @@ def text_manipulate(
minio_store_client.upload_files_to_minio_with_tags(
minio_client=minio_client,
local_folder=file_path + 'final',
minio_bucket=bucket_name,
minio_bucket=namespace,
minio_prefix=folder_prefix,
support_type=support_type,
data_volumes_file=data_volumes_file
)

# update the dataset status
update_dataset = _update_dateset_status(
bucket_name=req_json['bucket_name'],
namespace=req_json['namespace'],
version_data_set_name=req_json['version_data_set_name'],
reason=task_status,
message=error_msg,
task_id=id,
log_id=log_id,
creator=req_json.get('creator'),
Expand Down Expand Up @@ -417,9 +419,10 @@ def text_manipulate_retry(

# 更新数据集状态
update_dataset = _update_dateset_status(
bucket_name=task_info_dict.get('namespace'),
namespace=task_info_dict.get('namespace'),
version_data_set_name=task_info_dict.get('pre_version_data_set_name'),
reason='processing',
message='Data processing in progress',
task_id=task_id,
log_id=log_id,
creator=creator,
Expand Down Expand Up @@ -515,17 +518,18 @@ def text_manipulate_retry(
minio_store_client.upload_files_to_minio_with_tags(
minio_client=minio_client,
local_folder=file_path + 'final',
minio_bucket=task_info_dict.get('bucket_name'),
minio_bucket=task_info_dict.get('namespace'),
minio_prefix=folder_prefix,
support_type=task_info_dict.get('data_process_config_info'),
data_volumes_file=data_volumes_file
)

# 更新数据集状态
update_dataset = _update_dateset_status(
bucket_name=task_info_dict.get('namespace'),
namespace=task_info_dict.get('namespace'),
version_data_set_name=task_info_dict.get('pre_version_data_set_name'),
reason=task_status,
message=error_msg,
task_id=task_id,
log_id=log_id,
creator=creator,
Expand Down Expand Up @@ -597,9 +601,10 @@ def _remove_local_file(file_name):
}

def _update_dateset_status(
bucket_name,
namespace,
version_data_set_name,
reason,
message,
task_id,
log_id,
creator,
Expand All @@ -608,21 +613,22 @@ def _update_dateset_status(
logger.debug(''.join([
f"{log_tag_const.MINIO_STORE_PROCESS} update dataset status \n",
f"task_id: {task_id}\n",
f"bucket_name: {bucket_name}\n",
f"namespace: {namespace}\n",
f"version_data_set_name: {version_data_set_name}\n",
f"reason: {reason}"
]))
update_dataset = dataset_cr.update_dataset_k8s_cr(
bucket_name=bucket_name,
namespace=namespace,
version_data_set_name=version_data_set_name,
reason=reason
reason=reason,
message=message
)

if update_dataset['status'] != 200:
logger.error(''.join([
f"{log_tag_const.MINIO_STORE_PROCESS} update dataset status \n",
f"task_id: {task_id}\n",
f"bucket_name: {bucket_name}\n",
f"namespace: {namespace}\n",
f"version_data_set_name: {version_data_set_name}\n",
f"reason: {reason}"
]))
Expand Down Expand Up @@ -1019,7 +1025,7 @@ def _text_manipulate_retry_for_document(
# 将文件下载到本地
minio_store_client.download(
minio_client,
bucket_name=task_info.get('bucket_name'),
bucket_name=task_info.get('namespace'),
folder_prefix=folder_prefix,
file_name=file_name
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ def add(
'file_type': req_json['file_type'],
'status': 'processing',
'namespace': req_json['namespace'],
'bucket_name': req_json['bucket_name'],
'pre_data_set_name': req_json['pre_data_set_name'],
'pre_data_set_version': req_json['pre_data_set_version'],
'pre_version_data_set_name': req_json['version_data_set_name'],
Expand All @@ -144,7 +143,6 @@ def add(
file_type,
status,
namespace,
bucket_name,
pre_data_set_name,
pre_data_set_version,
file_names,
Expand All @@ -166,7 +164,6 @@ def add(
%(file_type)s,
%(status)s,
%(namespace)s,
%(bucket_name)s,
%(pre_data_set_name)s,
%(pre_data_set_version)s,
%(file_names)s,
Expand Down Expand Up @@ -246,7 +243,6 @@ def info_by_id(
dpt.data_process_config_info,
dpt.start_datetime,
dpt.end_datetime,
dpt.bucket_name,
dpt.namespace,
dpt.pre_version_data_set_name,
dpt.create_user,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,21 @@ def query_question_answer_list(

sql = """
select
id,
task_id,
document_id,
document_chunk_id,
file_name,
question,
answer
from public.data_process_task_question_answer_clean
dptqa.id,
dptqa.task_id,
dptqa.document_id,
dptqa.document_chunk_id,
dptqa.file_name,
dptqa.question,
dptqa.answer,
dptdc.content,
dptdc.page_number
from public.data_process_task_question_answer dptqa
left join public.data_process_task_document_chunk dptdc
on
dptdc.id = dptqa.document_chunk_id
where
document_id = %(document_id)s
dptqa.document_id = %(document_id)s
""".strip()

res = postgresql_pool_client.execute_query(pool, sql, params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,14 @@ def text_manipulate(
pool=conn_pool
)

qa_data_dict = [['q', 'a']]
qa_data_dict = [['q', 'a', 'file_name', 'page_number', 'chunk_content']]
for item in qa_list.get('data'):
qa_data_dict.append([
item.get('question'),
item.get('answer')
item.get('answer'),
item.get('file_name'),
item.get('page_number'),
item.get('content')
])

# Save the csv file.
Expand Down
23 changes: 13 additions & 10 deletions data-processing/data_manipulation/kube/dataset_cr.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@
logger = logging.getLogger(__name__)

def update_dataset_k8s_cr(
bucket_name,
namespace,
version_data_set_name,
reason
reason,
message
):
""" Update the condition info for the dataset.
bucket_name: bucket name;
namespace: namespace;
version_data_set_name: version dataset name;
reason: the update reason;
"""
try:
kube = client.KubeEnv()

one_cr_datasets = kube.get_versioneddatasets_status(
bucket_name,
namespace,
version_data_set_name
)

Expand All @@ -56,18 +57,20 @@ def update_dataset_k8s_cr(
'lastTransitionTime': now_utc_str,
'reason': reason,
'status': "True",
"type": "DataProcessing"
"type": "DataProcessing",
"message": message
})
else:
conditions[found_index] = {
'lastTransitionTime': now_utc_str,
'reason': reason,
'status': "True",
"type": "DataProcessing"
"type": "DataProcessing",
"message": message
}

kube.patch_versioneddatasets_status(
bucket_name,
namespace,
version_data_set_name,
{
'status': {
Expand All @@ -90,20 +93,20 @@ def update_dataset_k8s_cr(
}

def get_dataset_status_k8s_cr(
bucket_name,
namespace,
version_data_set_name
):
""" get the condition info for the dataset.
bucket_name: bucket name;
namespace: namespace;
version_data_set_name: version dataset name;
"""
try:
dataset_status = None
kube = client.KubeEnv()

one_cr_datasets = kube.get_versioneddatasets_status(
bucket_name,
namespace,
version_data_set_name
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def add(
"post_data_set_name": "dataset1",
"post_data_set_version": "v2",
"version_data_set_name": "dataset1-v2",
"bucket_name": "system-tce",
"namespace": "system-tce",
"file_names": [
{
"name": "数据处理文件_小T.pdf"
Expand Down
2 changes: 0 additions & 2 deletions data-processing/db-scripts/init-database-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
update_user character varying(32) COLLATE pg_catalog."default",
update_program character varying(64) COLLATE pg_catalog."default",
namespace character varying(64) COLLATE pg_catalog."default",
bucket_name character varying(64) COLLATE pg_catalog."default",
current_log_id character varying(32) COLLATE pg_catalog."default",
CONSTRAINT data_process_task_pkey PRIMARY KEY (id)
);

COMMENT ON COLUMN public.data_process_task.bucket_name IS 'bucket name';
COMMENT ON COLUMN public.data_process_task.current_log_id IS '当前日志Id';
COMMENT ON COLUMN public.data_process_task.pre_version_data_set_name IS '处理前数据集版本信息';

Expand Down
2 changes: 1 addition & 1 deletion deploy/charts/arcadia/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: v2
name: arcadia
description: A Helm chart(KubeBB Component) for KubeAGI Arcadia
type: application
version: 0.2.10
version: 0.2.11
appVersion: "0.1.0"

keywords:
Expand Down
2 changes: 0 additions & 2 deletions deploy/charts/arcadia/templates/pg-init-data-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@ data:
update_user character varying(32) COLLATE pg_catalog."default",
update_program character varying(64) COLLATE pg_catalog."default",
namespace character varying(64) COLLATE pg_catalog."default",
bucket_name character varying(64) COLLATE pg_catalog."default",
current_log_id character varying(32) COLLATE pg_catalog."default",
CONSTRAINT data_process_task_pkey PRIMARY KEY (id)
);
COMMENT ON COLUMN public.data_process_task.bucket_name IS 'bucket name';
COMMENT ON COLUMN public.data_process_task.current_log_id IS '当前日志Id';
COMMENT ON COLUMN public.data_process_task.pre_version_data_set_name IS '处理前数据集版本信息';
Expand Down

0 comments on commit 0a8f671

Please sign in to comment.