diff --git a/apiserver/graph/generated/generated.go b/apiserver/graph/generated/generated.go index b871e26d9..4948de501 100644 --- a/apiserver/graph/generated/generated.go +++ b/apiserver/graph/generated/generated.go @@ -4237,7 +4237,6 @@ input AddDataProcessInput { post_data_set_name: String! post_data_set_version: String! data_process_config_info: [DataProcessConfigItem!] - bucket_name: String! version_data_set_name: String! namespace: String! creator: String! @@ -27410,7 +27409,7 @@ func (ec *executionContext) unmarshalInputAddDataProcessInput(ctx context.Contex asMap[k] = v } - fieldsInOrder := [...]string{"name", "file_type", "pre_data_set_name", "pre_data_set_version", "file_names", "post_data_set_name", "post_data_set_version", "data_process_config_info", "bucket_name", "version_data_set_name", "namespace", "creator"} + fieldsInOrder := [...]string{"name", "file_type", "pre_data_set_name", "pre_data_set_version", "file_names", "post_data_set_name", "post_data_set_version", "data_process_config_info", "version_data_set_name", "namespace", "creator"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -27489,15 +27488,6 @@ func (ec *executionContext) unmarshalInputAddDataProcessInput(ctx context.Contex return it, err } it.DataProcessConfigInfo = data - case "bucket_name": - var err error - - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("bucket_name")) - data, err := ec.unmarshalNString2string(ctx, v) - if err != nil { - return it, err - } - it.BucketName = data case "version_data_set_name": var err error diff --git a/apiserver/graph/generated/models_gen.go b/apiserver/graph/generated/models_gen.go index 35f212917..339bbc922 100644 --- a/apiserver/graph/generated/models_gen.go +++ b/apiserver/graph/generated/models_gen.go @@ -19,7 +19,6 @@ type AddDataProcessInput struct { PostDataSetName string `json:"post_data_set_name"` PostDataSetVersion string `json:"post_data_set_version"` DataProcessConfigInfo []*DataProcessConfigItem `json:"data_process_config_info,omitempty"` - BucketName string `json:"bucket_name"` VersionDataSetName string `json:"version_data_set_name"` Namespace string `json:"namespace"` Creator string `json:"creator"` diff --git a/apiserver/graph/schema/dataprocessing.graphqls b/apiserver/graph/schema/dataprocessing.graphqls index c47eb9299..447d4a56b 100644 --- a/apiserver/graph/schema/dataprocessing.graphqls +++ b/apiserver/graph/schema/dataprocessing.graphqls @@ -49,7 +49,6 @@ input AddDataProcessInput { post_data_set_name: String! post_data_set_version: String! data_process_config_info: [DataProcessConfigItem!] - bucket_name: String! version_data_set_name: String! namespace: String! creator: String! diff --git a/data-processing/data_manipulation/controller/data_process_controller.py b/data-processing/data_manipulation/controller/data_process_controller.py index 41dd684a4..a816d0692 100644 --- a/data-processing/data_manipulation/controller/data_process_controller.py +++ b/data-processing/data_manipulation/controller/data_process_controller.py @@ -54,13 +54,14 @@ async def add(request): "post_data_set_name": "dataset1", "post_data_set_version": "v2", "version_data_set_name": "dataset1-v2", - "bucket_name": "system-tce", "file_names": [ { "name": "数据处理文件_小T.pdf" } ], - "data_process_config_info": [] + "data_process_config_info": [], + "creator": "", + "namespace": "abc" } """ res = data_process_service.add( diff --git a/data-processing/data_manipulation/data_store_process/minio_store_process.py b/data-processing/data_manipulation/data_store_process/minio_store_process.py index b6d55b417..5b554663e 100644 --- a/data-processing/data_manipulation/data_store_process/minio_store_process.py +++ b/data-processing/data_manipulation/data_store_process/minio_store_process.py @@ -52,7 +52,7 @@ def text_manipulate( req_json is a dictionary object. """ - bucket_name = req_json['bucket_name'] + namespace = req_json['namespace'] support_type = req_json['data_process_config_info'] file_names = req_json['file_names'] @@ -72,9 +72,10 @@ def text_manipulate( # update the dataset status update_dataset = _update_dateset_status( - bucket_name=req_json['bucket_name'], + namespace=req_json['namespace'], version_data_set_name=req_json['version_data_set_name'], reason='processing', + message='Data processing in progress', task_id=id, log_id=log_id, creator=req_json.get('creator'), @@ -130,7 +131,7 @@ def text_manipulate( # 将文件下载到本地 minio_store_client.download( minio_client, - bucket_name=bucket_name, + bucket_name=namespace, folder_prefix=folder_prefix, file_name=file_name ) @@ -328,7 +329,7 @@ def text_manipulate( minio_store_client.upload_files_to_minio_with_tags( minio_client=minio_client, local_folder=file_path + 'final', - minio_bucket=bucket_name, + minio_bucket=namespace, minio_prefix=folder_prefix, support_type=support_type, data_volumes_file=data_volumes_file @@ -336,9 +337,10 @@ def text_manipulate( # update the dataset status update_dataset = _update_dateset_status( - bucket_name=req_json['bucket_name'], + namespace=req_json['namespace'], version_data_set_name=req_json['version_data_set_name'], reason=task_status, + message=error_msg, task_id=id, log_id=log_id, creator=req_json.get('creator'), @@ -417,9 +419,10 @@ def text_manipulate_retry( # 更新数据集状态 update_dataset = _update_dateset_status( - bucket_name=task_info_dict.get('namespace'), + namespace=task_info_dict.get('namespace'), version_data_set_name=task_info_dict.get('pre_version_data_set_name'), reason='processing', + message='Data processing in progress', task_id=task_id, log_id=log_id, creator=creator, @@ -515,7 +518,7 @@ def text_manipulate_retry( minio_store_client.upload_files_to_minio_with_tags( minio_client=minio_client, local_folder=file_path + 'final', - minio_bucket=task_info_dict.get('bucket_name'), + minio_bucket=task_info_dict.get('namespace'), minio_prefix=folder_prefix, support_type=task_info_dict.get('data_process_config_info'), data_volumes_file=data_volumes_file @@ -523,9 +526,10 @@ def text_manipulate_retry( # 更新数据集状态 update_dataset = _update_dateset_status( - bucket_name=task_info_dict.get('namespace'), + namespace=task_info_dict.get('namespace'), version_data_set_name=task_info_dict.get('pre_version_data_set_name'), reason=task_status, + message=error_msg, task_id=task_id, log_id=log_id, creator=creator, @@ -597,9 +601,10 @@ def _remove_local_file(file_name): } def _update_dateset_status( - bucket_name, + namespace, version_data_set_name, reason, + message, task_id, log_id, creator, @@ -608,21 +613,22 @@ def _update_dateset_status( logger.debug(''.join([ f"{log_tag_const.MINIO_STORE_PROCESS} update dataset status \n", f"task_id: {task_id}\n", - f"bucket_name: {bucket_name}\n", + f"namespace: {namespace}\n", f"version_data_set_name: {version_data_set_name}\n", f"reason: {reason}" ])) update_dataset = dataset_cr.update_dataset_k8s_cr( - bucket_name=bucket_name, + namespace=namespace, version_data_set_name=version_data_set_name, - reason=reason + reason=reason, + message=message ) if update_dataset['status'] != 200: logger.error(''.join([ f"{log_tag_const.MINIO_STORE_PROCESS} update dataset status \n", f"task_id: {task_id}\n", - f"bucket_name: {bucket_name}\n", + f"namespace: {namespace}\n", f"version_data_set_name: {version_data_set_name}\n", f"reason: {reason}" ])) @@ -1019,7 +1025,7 @@ def _text_manipulate_retry_for_document( # 将文件下载到本地 minio_store_client.download( minio_client, - bucket_name=task_info.get('bucket_name'), + bucket_name=task_info.get('namespace'), folder_prefix=folder_prefix, file_name=file_name ) diff --git a/data-processing/data_manipulation/database_operate/data_process_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_db_operate.py index 025525fea..92dee60de 100644 --- a/data-processing/data_manipulation/database_operate/data_process_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_db_operate.py @@ -120,7 +120,6 @@ def add( 'file_type': req_json['file_type'], 'status': 'processing', 'namespace': req_json['namespace'], - 'bucket_name': req_json['bucket_name'], 'pre_data_set_name': req_json['pre_data_set_name'], 'pre_data_set_version': req_json['pre_data_set_version'], 'pre_version_data_set_name': req_json['version_data_set_name'], @@ -144,7 +143,6 @@ def add( file_type, status, namespace, - bucket_name, pre_data_set_name, pre_data_set_version, file_names, @@ -166,7 +164,6 @@ def add( %(file_type)s, %(status)s, %(namespace)s, - %(bucket_name)s, %(pre_data_set_name)s, %(pre_data_set_version)s, %(file_names)s, @@ -246,7 +243,6 @@ def info_by_id( dpt.data_process_config_info, dpt.start_datetime, dpt.end_datetime, - dpt.bucket_name, dpt.namespace, dpt.pre_version_data_set_name, dpt.create_user, diff --git a/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py index 52b0c574b..631167634 100644 --- a/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py @@ -457,16 +457,21 @@ def query_question_answer_list( sql = """ select - id, - task_id, - document_id, - document_chunk_id, - file_name, - question, - answer - from public.data_process_task_question_answer_clean + dptqa.id, + dptqa.task_id, + dptqa.document_id, + dptqa.document_chunk_id, + dptqa.file_name, + dptqa.question, + dptqa.answer, + dptdc.content, + dptdc.page_number + from public.data_process_task_question_answer dptqa + left join public.data_process_task_document_chunk dptdc + on + dptdc.id = dptqa.document_chunk_id where - document_id = %(document_id)s + dptqa.document_id = %(document_id)s """.strip() res = postgresql_pool_client.execute_query(pool, sql, params) diff --git a/data-processing/data_manipulation/file_handle/common_handle.py b/data-processing/data_manipulation/file_handle/common_handle.py index 3ea203431..45ec1d63f 100644 --- a/data-processing/data_manipulation/file_handle/common_handle.py +++ b/data-processing/data_manipulation/file_handle/common_handle.py @@ -132,11 +132,14 @@ def text_manipulate( pool=conn_pool ) - qa_data_dict = [['q', 'a']] + qa_data_dict = [['q', 'a', 'file_name', 'page_number', 'chunk_content']] for item in qa_list.get('data'): qa_data_dict.append([ item.get('question'), - item.get('answer') + item.get('answer'), + item.get('file_name'), + item.get('page_number'), + item.get('content') ]) # Save the csv file. diff --git a/data-processing/data_manipulation/kube/dataset_cr.py b/data-processing/data_manipulation/kube/dataset_cr.py index b38381e41..54bc7b436 100644 --- a/data-processing/data_manipulation/kube/dataset_cr.py +++ b/data-processing/data_manipulation/kube/dataset_cr.py @@ -21,13 +21,14 @@ logger = logging.getLogger(__name__) def update_dataset_k8s_cr( - bucket_name, + namespace, version_data_set_name, - reason + reason, + message ): """ Update the condition info for the dataset. - bucket_name: bucket name; + namespace: namespace; version_data_set_name: version dataset name; reason: the update reason; """ @@ -35,7 +36,7 @@ def update_dataset_k8s_cr( kube = client.KubeEnv() one_cr_datasets = kube.get_versioneddatasets_status( - bucket_name, + namespace, version_data_set_name ) @@ -56,18 +57,20 @@ def update_dataset_k8s_cr( 'lastTransitionTime': now_utc_str, 'reason': reason, 'status': "True", - "type": "DataProcessing" + "type": "DataProcessing", + "message": message }) else: conditions[found_index] = { 'lastTransitionTime': now_utc_str, 'reason': reason, 'status': "True", - "type": "DataProcessing" + "type": "DataProcessing", + "message": message } kube.patch_versioneddatasets_status( - bucket_name, + namespace, version_data_set_name, { 'status': { @@ -90,12 +93,12 @@ def update_dataset_k8s_cr( } def get_dataset_status_k8s_cr( - bucket_name, + namespace, version_data_set_name ): """ get the condition info for the dataset. - bucket_name: bucket name; + namespace: namespace; version_data_set_name: version dataset name; """ try: @@ -103,7 +106,7 @@ def get_dataset_status_k8s_cr( kube = client.KubeEnv() one_cr_datasets = kube.get_versioneddatasets_status( - bucket_name, + namespace, version_data_set_name ) diff --git a/data-processing/data_manipulation/service/data_process_service.py b/data-processing/data_manipulation/service/data_process_service.py index 3da1a5315..a60f9c60f 100644 --- a/data-processing/data_manipulation/service/data_process_service.py +++ b/data-processing/data_manipulation/service/data_process_service.py @@ -65,7 +65,7 @@ def add( "post_data_set_name": "dataset1", "post_data_set_version": "v2", "version_data_set_name": "dataset1-v2", - "bucket_name": "system-tce", + "namespace": "system-tce", "file_names": [ { "name": "数据处理文件_小T.pdf" diff --git a/data-processing/db-scripts/init-database-schema.sql b/data-processing/db-scripts/init-database-schema.sql index 29eac8b02..ba7ff471d 100644 --- a/data-processing/db-scripts/init-database-schema.sql +++ b/data-processing/db-scripts/init-database-schema.sql @@ -25,12 +25,10 @@ update_user character varying(32) COLLATE pg_catalog."default", update_program character varying(64) COLLATE pg_catalog."default", namespace character varying(64) COLLATE pg_catalog."default", - bucket_name character varying(64) COLLATE pg_catalog."default", current_log_id character varying(32) COLLATE pg_catalog."default", CONSTRAINT data_process_task_pkey PRIMARY KEY (id) ); - COMMENT ON COLUMN public.data_process_task.bucket_name IS 'bucket name'; COMMENT ON COLUMN public.data_process_task.current_log_id IS '当前日志Id'; COMMENT ON COLUMN public.data_process_task.pre_version_data_set_name IS '处理前数据集版本信息'; diff --git a/deploy/charts/arcadia/Chart.yaml b/deploy/charts/arcadia/Chart.yaml index 04e38f5bd..6b0700271 100644 --- a/deploy/charts/arcadia/Chart.yaml +++ b/deploy/charts/arcadia/Chart.yaml @@ -2,7 +2,7 @@ apiVersion: v2 name: arcadia description: A Helm chart(KubeBB Component) for KubeAGI Arcadia type: application -version: 0.2.10 +version: 0.2.11 appVersion: "0.1.0" keywords: diff --git a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml index 72fa6b55b..56e5cb198 100644 --- a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml +++ b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml @@ -29,12 +29,10 @@ data: update_user character varying(32) COLLATE pg_catalog."default", update_program character varying(64) COLLATE pg_catalog."default", namespace character varying(64) COLLATE pg_catalog."default", - bucket_name character varying(64) COLLATE pg_catalog."default", current_log_id character varying(32) COLLATE pg_catalog."default", CONSTRAINT data_process_task_pkey PRIMARY KEY (id) ); - COMMENT ON COLUMN public.data_process_task.bucket_name IS 'bucket name'; COMMENT ON COLUMN public.data_process_task.current_log_id IS '当前日志Id'; COMMENT ON COLUMN public.data_process_task.pre_version_data_set_name IS '处理前数据集版本信息';