diff --git a/apiserver/graph/generated/generated.go b/apiserver/graph/generated/generated.go index 72ab28e29..108cd63b0 100644 --- a/apiserver/graph/generated/generated.go +++ b/apiserver/graph/generated/generated.go @@ -167,6 +167,7 @@ type ComplexityRoot struct { Config func(childComplexity int) int Creator func(childComplexity int) int EndTime func(childComplexity int) int + ErrorMsg func(childComplexity int) int FileNum func(childComplexity int) int FileType func(childComplexity int) int ID func(childComplexity int) int @@ -180,6 +181,7 @@ type ComplexityRoot struct { } DataProcessItem struct { + ErrorMsg func(childComplexity int) int ID func(childComplexity int) int Name func(childComplexity int) int PostDataSetName func(childComplexity int) int @@ -1224,6 +1226,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.DataProcessDetailsItem.EndTime(childComplexity), true + case "DataProcessDetailsItem.error_msg": + if e.complexity.DataProcessDetailsItem.ErrorMsg == nil { + break + } + + return e.complexity.DataProcessDetailsItem.ErrorMsg(childComplexity), true + case "DataProcessDetailsItem.file_num": if e.complexity.DataProcessDetailsItem.FileNum == nil { break @@ -1294,6 +1303,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.DataProcessDetailsItem.Status(childComplexity), true + case "DataProcessItem.error_msg": + if e.complexity.DataProcessItem.ErrorMsg == nil { + break + } + + return e.complexity.DataProcessItem.ErrorMsg(childComplexity), true + case "DataProcessItem.id": if e.complexity.DataProcessItem.ID == nil { break @@ -4112,6 +4128,8 @@ type DataProcessItem { post_data_set_version: String # 开始时间 start_datetime: String! + # 错误日志 + error_msg: String } # 数据处理支持类型 @@ -4165,6 +4183,7 @@ type DataProcessDetailsItem { start_time: String! end_time: String! creator: String! + error_msg: String config: [DataProcessConfig!] 
} @@ -9794,6 +9813,8 @@ func (ec *executionContext) fieldContext_DataProcessDetails_data(ctx context.Con return ec.fieldContext_DataProcessDetailsItem_end_time(ctx, field) case "creator": return ec.fieldContext_DataProcessDetailsItem_creator(ctx, field) + case "error_msg": + return ec.fieldContext_DataProcessDetailsItem_error_msg(ctx, field) case "config": return ec.fieldContext_DataProcessDetailsItem_config(ctx, field) } @@ -10375,6 +10396,47 @@ func (ec *executionContext) fieldContext_DataProcessDetailsItem_creator(ctx cont return fc, nil } +func (ec *executionContext) _DataProcessDetailsItem_error_msg(ctx context.Context, field graphql.CollectedField, obj *DataProcessDetailsItem) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_DataProcessDetailsItem_error_msg(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ErrorMsg, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*string) + fc.Result = res + return ec.marshalOString2ᚖstring(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_DataProcessDetailsItem_error_msg(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "DataProcessDetailsItem", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _DataProcessDetailsItem_config(ctx context.Context, field graphql.CollectedField, obj *DataProcessDetailsItem) (ret graphql.Marshaler) { fc, err := ec.fieldContext_DataProcessDetailsItem_config(ctx, field) if err != nil { @@ -10777,6 +10839,47 @@ func (ec *executionContext) fieldContext_DataProcessItem_start_datetime(ctx cont return fc, nil } +func (ec *executionContext) _DataProcessItem_error_msg(ctx context.Context, field graphql.CollectedField, obj *DataProcessItem) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_DataProcessItem_error_msg(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ErrorMsg, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*string) + fc.Result = res + return ec.marshalOString2ᚖstring(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_DataProcessItem_error_msg(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "DataProcessItem", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child 
fields") + }, + } + return fc, nil +} + func (ec *executionContext) _DataProcessMutation_createDataProcessTask(ctx context.Context, field graphql.CollectedField, obj *DataProcessMutation) (ret graphql.Marshaler) { fc, err := ec.fieldContext_DataProcessMutation_createDataProcessTask(ctx, field) if err != nil { @@ -20362,6 +20465,8 @@ func (ec *executionContext) fieldContext_PaginatedDataProcessItem_data(ctx conte return ec.fieldContext_DataProcessItem_post_data_set_version(ctx, field) case "start_datetime": return ec.fieldContext_DataProcessItem_start_datetime(ctx, field) + case "error_msg": + return ec.fieldContext_DataProcessItem_error_msg(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type DataProcessItem", field.Name) }, @@ -30536,6 +30641,8 @@ func (ec *executionContext) _DataProcessDetailsItem(ctx context.Context, sel ast if out.Values[i] == graphql.Null { out.Invalids++ } + case "error_msg": + out.Values[i] = ec._DataProcessDetailsItem_error_msg(ctx, field, obj) case "config": out.Values[i] = ec._DataProcessDetailsItem_config(ctx, field, obj) default: @@ -30609,6 +30716,8 @@ func (ec *executionContext) _DataProcessItem(ctx context.Context, sel ast.Select if out.Values[i] == graphql.Null { out.Invalids++ } + case "error_msg": + out.Values[i] = ec._DataProcessItem_error_msg(ctx, field, obj) default: panic("unknown field " + strconv.Quote(field.Name)) } diff --git a/apiserver/graph/generated/models_gen.go b/apiserver/graph/generated/models_gen.go index 58f27b22e..7424c63bf 100644 --- a/apiserver/graph/generated/models_gen.go +++ b/apiserver/graph/generated/models_gen.go @@ -416,6 +416,7 @@ type DataProcessDetailsItem struct { StartTime string `json:"start_time"` EndTime string `json:"end_time"` Creator string `json:"creator"` + ErrorMsg *string `json:"error_msg,omitempty"` Config []*DataProcessConfig `json:"config,omitempty"` } @@ -428,6 +429,7 @@ type DataProcessItem struct { PostDataSetName string `json:"post_data_set_name"` PostDataSetVersion *string `json:"post_data_set_version,omitempty"` StartDatetime string `json:"start_datetime"` + ErrorMsg *string `json:"error_msg,omitempty"` } type DataProcessMutation struct { diff --git a/apiserver/graph/schema/dataprocessing.gql b/apiserver/graph/schema/dataprocessing.gql index 166b9be98..83a8c86c9 100644 --- a/apiserver/graph/schema/dataprocessing.gql +++ b/apiserver/graph/schema/dataprocessing.gql @@ -11,6 +11,7 @@ query allDataProcessListByPage($input: AllDataProcessListByPageInput!){ post_data_set_name post_data_set_version start_datetime + error_msg } message } @@ -63,6 +64,7 @@ query dataProcessDetails($input: DataProcessDetailsInput){ start_time end_time creator + error_msg config { name description diff --git a/apiserver/graph/schema/dataprocessing.graphqls b/apiserver/graph/schema/dataprocessing.graphqls index e92633696..3ea0365b7 100644 --- a/apiserver/graph/schema/dataprocessing.graphqls +++ b/apiserver/graph/schema/dataprocessing.graphqls @@ -116,6 +116,8 @@ type DataProcessItem { post_data_set_version: String # 开始时间 start_datetime: String! + # 错误日志 + error_msg: String } # 数据处理支持类型 @@ -169,6 +171,7 @@ type DataProcessDetailsItem { start_time: String! end_time: String! creator: String! + error_msg: String config: [DataProcessConfig!] 
} diff --git a/data-processing/data_manipulation/data_store_process/minio_store_process.py b/data-processing/data_manipulation/data_store_process/minio_store_process.py index 9e29425cf..f807194b1 100644 --- a/data-processing/data_manipulation/data_store_process/minio_store_process.py +++ b/data-processing/data_manipulation/data_store_process/minio_store_process.py @@ -17,15 +17,21 @@ import logging import os import ulid +import traceback import pandas as pd from common import log_tag_const from common.config import config from data_store_clients import minio_store_client -from database_operate import data_process_db_operate, data_process_document_db_operate, data_process_detail_db_operate, data_process_detail_preview_db_operate +from database_operate import (data_process_db_operate, + data_process_document_db_operate, + data_process_detail_db_operate, + data_process_detail_preview_db_operate, + data_process_log_db_operate) from file_handle import csv_handle, pdf_handle, word_handle from kube import dataset_cr from utils import file_utils +from pathlib import Path logger = logging.getLogger(__name__) @@ -44,6 +50,20 @@ def text_manipulate( support_type = req_json['data_process_config_info'] file_names = req_json['file_names'] + # 新增数据处理任务日志 + log_id = ulid.ulid() + insert_log_item = { + 'id': log_id, + 'task_id': id, + 'type': 'NOW', + 'error_msg': '', + 'creator': req_json.get('creator') + } + data_process_log_db_operate.add( + insert_log_item, + pool=pool + ) + # minio 数据集统一前缀 minio_dataset_prefix = config.minio_dataset_prefix @@ -56,26 +76,21 @@ def text_manipulate( # get a minio client minio_client = minio_store_client.get_minio_client() - # 将文件都下载到本地 - for file_name in file_names: - minio_store_client.download( - minio_client, - bucket_name=bucket_name, - folder_prefix=folder_prefix, - file_name=file_name['name'] - ) - # 将文件信息存入data_process_task_document表中 for file_name in file_names: # 新增文档处理进度信息 document_id = ulid.ulid() + extension = file_utils.get_file_extension(file_name['name']) document_insert_item = { 'id': document_id, 'task_id': id, 'file_name': file_name['name'], 'status': 'not_start', 'progress': '0', - 'creator': req_json['creator'] + 'creator': req_json['creator'], + 'from_source_type': 'MinIO', + 'from_source_path': config.minio_api_url, + 'document_type': extension } data_process_document_db_operate.add( document_insert_item, @@ -85,14 +100,23 @@ def text_manipulate( # 文件处理 task_status = 'process_complete' + error_msg = '' # 存放每个文件对应的数据量 data_volumes_file = [] for item in file_names: result = None file_name = item['name'] - file_extension = file_name.split('.')[-1].lower() + + # 将文件下载到本地 + minio_store_client.download( + minio_client, + bucket_name=bucket_name, + folder_prefix=folder_prefix, + file_name=file_name + ) + + file_extension = file_utils.get_file_extension(file_name) if file_extension in ['pdf']: # 处理PDF文件 result = pdf_handle.text_manipulate( @@ -118,11 +142,18 @@ def text_manipulate( task_id=id, create_user=req_json['creator'] ) + + # 将下载的本地文件删除 + _remove_local_file(file_name) if result is None: - logger.error(f"{log_tag_const.MINIO_STORE_PROCESS} The file type is not supported. The current file type is: {file_extension}") + logger.error(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} The file type is not supported \n", + f"The current file type is: {file_extension}" + ])) # 任务失败 task_status = 'process_fail' + error_msg = f"{file_extension} file type is not currently supported."
break if result.get('status') != 200: @@ -133,6 +164,7 @@ def text_manipulate( f"The error is: {result.get('message')}\n" ])) task_status = 'process_fail' + error_msg = result.get('message') break data_volumes_file.append(result['data']) @@ -168,15 +200,22 @@ def text_manipulate( data_volumes_file=data_volumes_file ) - # 将本地临时文件删除 - for item in file_names: - remove_file_path = file_utils.get_temp_file_path() - local_file_path = remove_file_path + 'original/' + item['name'] - file_utils.delete_file(local_file_path) + # 更新数据处理任务日志 + update_log_item = { + 'id': log_id, + 'status': task_status, + 'error_msg': error_msg, + 'creator': req_json['creator'] + } + data_process_log_db_operate.update_status_by_id( + update_log_item, + pool=pool + ) # 数据库更新任务状态 update_params = { 'id': id, + 'current_log_id': log_id, 'status': task_status, 'user': req_json['creator'] } @@ -199,7 +238,26 @@ def text_manipulate( } - +def _remove_local_file(file_name): + try: + remove_file_path = file_utils.get_temp_file_path() + local_file_path = remove_file_path + 'original/' + file_name + file_utils.delete_file(local_file_path) + return { + 'status': 200, + 'message': '删除成功', + 'data': '' + } + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} remove local file fail \n", + f"the error. \n{traceback.format_exc()}" + ])) + return { + 'status': 400, + 'message': str(ex), + 'data': traceback.format_exc() + } diff --git a/data-processing/data_manipulation/database_operate/data_process_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_db_operate.py index bebd384bf..1d554e33d 100644 --- a/data-processing/data_manipulation/database_operate/data_process_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_db_operate.py @@ -34,21 +34,26 @@ def list_by_page( sql = """ select - id, - name, - status, - namespace, - pre_data_set_name, - pre_data_set_version, - post_data_set_name, - post_data_set_version, - start_datetime + dpt.id, + dpt.name, + dpt.status, + dpt.namespace, + dpt.pre_data_set_name, + dpt.pre_data_set_version, + dpt.post_data_set_name, + dpt.post_data_set_version, + dpt.start_datetime, + dptl.error_msg from - public.data_process_task + public.data_process_task dpt + left join + public.data_process_task_log dptl + on + dpt.current_log_id = dptl.id where - name like %(keyword)s and - namespace = %(namespace)s - order by start_datetime desc + dpt.name like %(keyword)s and + dpt.namespace = %(namespace)s + order by dpt.start_datetime desc limit %(pageSize)s offset %(pageIndex)s """.strip() @@ -115,6 +120,7 @@ def add( 'file_type': req_json['file_type'], 'status': 'processing', 'namespace': req_json['namespace'], + 'bucket_name': req_json['bucket_name'], 'pre_data_set_name': req_json['pre_data_set_name'], 'pre_data_set_version': req_json['pre_data_set_version'], 'file_names': ujson.dumps(req_json['file_names']), @@ -137,6 +143,7 @@ def add( file_type, status, namespace, + bucket_name, pre_data_set_name, pre_data_set_version, file_names, @@ -157,6 +164,7 @@ def add( %(file_type)s, %(status)s, %(namespace)s, + %(bucket_name)s, %(pre_data_set_name)s, %(pre_data_set_version)s, %(file_names)s, @@ -189,6 +197,7 @@ def update_status_by_id( params = { 'id': req_json['id'], 'status': req_json['status'], + 'current_log_id': req_json['current_log_id'], 'end_datetime': now, 'update_datetime': now, 'update_program': program, @@ -198,6 +207,7 @@ def update_status_by_id( sql = """ update public.data_process_task set status = %(status)s, + 
current_log_id = %(current_log_id)s, update_datetime = %(update_datetime)s, end_datetime = %(end_datetime)s, update_program = %(update_program)s, @@ -221,24 +231,29 @@ def info_by_id( sql = """ select - id, - name, - file_type, - status, - pre_data_set_name, - pre_data_set_version, - post_data_set_name, - post_data_set_version, - file_names, - data_process_config_info, - start_datetime, - end_datetime, - create_user, - update_datetime + dpt.id, + dpt.name, + dpt.file_type, + dpt.status, + dpt.pre_data_set_name, + dpt.pre_data_set_version, + dpt.post_data_set_name, + dpt.post_data_set_version, + dpt.file_names, + dpt.data_process_config_info, + dpt.start_datetime, + dpt.end_datetime, + dpt.create_user, + dpt.update_datetime, + dptl.error_msg from - public.data_process_task + public.data_process_task dpt + left join + public.data_process_task_log dptl + on + dpt.current_log_id = dptl.id where - id = %(id)s + dpt.id = %(id)s """.strip() res = postgresql_pool_client.execute_query(pool, sql, params) diff --git a/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py index 1041e06ec..dc50b22f0 100644 --- a/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py @@ -29,6 +29,8 @@ def insert_transform_info( params = { 'id': req_json['id'], 'task_id': req_json['task_id'], + 'document_id': req_json['document_id'], + 'document_chunk_id': req_json['document_chunk_id'], 'file_name': req_json['file_name'], 'transform_type': req_json['transform_type'], 'pre_content': req_json['pre_content'], @@ -45,6 +47,8 @@ def insert_transform_info( insert into public.data_process_task_detail ( id, task_id, + document_id, + document_chunk_id, file_name, transform_type, pre_content, @@ -59,6 +63,8 @@ def insert_transform_info( values ( %(id)s, %(task_id)s, + %(document_id)s, + %(document_chunk_id)s, %(file_name)s, %(transform_type)s, %(pre_content)s, @@ -88,6 +94,8 @@ def insert_question_answer_info( params = { 'id': req_json['id'], 'task_id': req_json['task_id'], + 'document_id': req_json['document_id'], + 'document_chunk_id': req_json['document_chunk_id'], 'file_name': req_json['file_name'], 'question': req_json['question'], 'answer': req_json['answer'], @@ -103,6 +111,8 @@ def insert_question_answer_info( insert into public.data_process_task_question_answer ( id, task_id, + document_id, + document_chunk_id, file_name, question, answer, @@ -116,6 +126,8 @@ def insert_question_answer_info( values ( %(id)s, %(task_id)s, + %(document_id)s, + %(document_chunk_id)s, %(file_name)s, %(question)s, %(answer)s, @@ -328,6 +340,7 @@ def delete_qa_by_task_id( res = postgresql_pool_client.execute_update(pool, sql, params) return res + def list_file_name_for_clean( req_json, pool @@ -352,12 +365,107 @@ def list_file_name_for_clean( task_id = %(task_id)s and transform_type in ('remove_invisible_characters', 'space_standardization', 'remove_garbled_text', 'traditional_to_simplified', 'remove_html_tag', 'remove_emojis') group by file_name - """.strip() + """.strip() res = postgresql_pool_client.execute_query(pool, sql, params) return res +def insert_question_answer_clean_info( + req_json, + pool +): + """Insert a question answer clean info""" + now = date_time_utils.now_str() + user = req_json['create_user'] + program = '数据处理任务问题和答案-新增' + + params = { + 'id': req_json['id'], + 'task_id': 
req_json['task_id'], + 'document_id': req_json['document_id'], + 'document_chunk_id': req_json['document_chunk_id'], + 'file_name': req_json['file_name'], + 'question': req_json['question'], + 'answer': req_json['answer'], + 'create_datetime': now, + 'create_user': user, + 'create_program': program, + 'update_datetime': now, + 'update_user': user, + 'update_program': program + } + + sql = """ + insert into public.data_process_task_question_answer_clean ( + id, + task_id, + document_id, + document_chunk_id, + file_name, + question, + answer, + create_datetime, + create_user, + create_program, + update_datetime, + update_program, + update_user + ) + values ( + %(id)s, + %(task_id)s, + %(document_id)s, + %(document_chunk_id)s, + %(file_name)s, + %(question)s, + %(answer)s, + %(create_datetime)s, + %(create_program)s, + %(create_user)s, + %(update_datetime)s, + %(update_program)s, + %(update_user)s + ) + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + + +def query_question_answer_list( + document_id, + pool +): + """List question answer with document id. + + req_json is a dictionary object. for example: + { + "document_id": "01HGWBE48DT3ADE9ZKA62SW4WS" + } + pool: databasec connection pool; + """ + params = { + 'document_id': document_id + } + + sql = """ + select + id, + task_id, + document_id, + document_chunk_id, + file_name, + question, + answer + from public.data_process_task_question_answer_clean + where + document_id = %(document_id)s + """.strip() + + res = postgresql_pool_client.execute_query(pool, sql, params) + return res + def list_file_name_for_privacy( req_json, pool @@ -386,3 +494,28 @@ def list_file_name_for_privacy( res = postgresql_pool_client.execute_query(pool, sql, params) return res + +def delete_qa_clean_by_task_id( + req_json, + pool +): + """delete qa clean info by task id. + + req_json is a dictionary object. for example: + { + "id": "01HGWBE48DT3ADE9ZKA62SW4WS" + } + pool: databasec connection pool; + """ + params = { + 'task_id': req_json['id'] + } + + sql = """ + delete from public.data_process_task_question_answer_clean + where + task_id = %(task_id)s + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res diff --git a/data-processing/data_manipulation/database_operate/data_process_document_chunk_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_document_chunk_db_operate.py new file mode 100644 index 000000000..8fe7e63bd --- /dev/null +++ b/data-processing/data_manipulation/database_operate/data_process_document_chunk_db_operate.py @@ -0,0 +1,166 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import ulid +from database_clients import postgresql_pool_client +from utils import date_time_utils + +def add( + req_json, + pool +): + """Add a new record""" + now = date_time_utils.now_str() + user = req_json['creator'] + program = '数据处理文件拆分-新增' + + params = { + 'id': req_json.get('id'), + 'document_id': req_json.get('document_id'), + 'status': req_json.get('status'), + 'task_id': req_json.get('task_id'), + 'content': req_json.get('content'), + 'meta_info': req_json.get('meta_info'), + 'page_number': req_json.get('page_number'), + 'create_datetime': now, + 'create_user': user, + 'create_program': program, + 'update_datetime': now, + 'update_user': user, + 'update_program': program + } + + sql = """ + insert into public.data_process_task_document_chunk ( + id, + document_id, + status, + task_id, + content, + meta_info, + page_number, + create_datetime, + create_user, + create_program, + update_datetime, + update_user, + update_program + ) + values ( + %(id)s, + %(document_id)s, + %(status)s, + %(task_id)s, + %(content)s, + %(meta_info)s, + %(page_number)s, + %(create_datetime)s, + %(create_user)s, + %(create_program)s, + %(update_datetime)s, + %(update_user)s, + %(update_program)s + ) + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + +def update_document_chunk_status_and_start_time( + req_json, + pool +): + """Update the status and start time with id""" + now = req_json['start_time'] + program = '开始处理chunk后的内容' + + params = { + 'id': req_json['id'], + 'status': req_json['status'], + 'start_time': now, + 'update_datetime': now, + 'update_user': req_json['update_user'], + 'update_program': program + } + + sql = """ + update public.data_process_task_document_chunk set + status = %(status)s, + start_time = %(start_time)s, + update_datetime = %(update_datetime)s, + update_user = %(update_user)s, + update_program = %(update_program)s + where + id = %(id)s + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + +def update_document_chunk_status_and_end_time( + req_json, + pool +): + """Update the status and end time with id""" + now = req_json['end_time'] + program = 'chunk后的内容处理完成' + + params = { + 'id': req_json['id'], + 'status': req_json['status'], + 'end_time': now, + 'update_datetime': now, + 'update_user': req_json['update_user'], + 'update_program': program + } + + sql = """ + update public.data_process_task_document_chunk set + status = %(status)s, + end_time = %(end_time)s, + update_datetime = %(update_datetime)s, + update_user = %(update_user)s, + update_program = %(update_program)s + where + id = %(id)s + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + +def delete_by_task_id( + req_json, + pool +): + """delete info by task id. + + req_json is a dictionary object. 
for example: + { + "id": "01HGWBE48DT3ADE9ZKA62SW4WS" + } + pool: databasec connection pool; + """ + params = { + 'task_id': req_json['id'] + } + + sql = """ + delete from public.data_process_task_document_chunk + where + task_id = %(task_id)s + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + diff --git a/data-processing/data_manipulation/database_operate/data_process_document_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_document_db_operate.py index b6a414f4a..3d47b9677 100644 --- a/data-processing/data_manipulation/database_operate/data_process_document_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_document_db_operate.py @@ -26,11 +26,14 @@ def add( program = '数据处理文件进度-新增' params = { - 'id': req_json['id'], - 'file_name': req_json['file_name'], - 'status': req_json['status'], - 'progress': req_json['progress'], - 'task_id': req_json['task_id'], + 'id': req_json.get('id'), + 'file_name': req_json.get('file_name'), + 'status': req_json.get('status'), + 'progress': req_json.get('progress'), + 'task_id': req_json.get('task_id'), + 'from_source_type': req_json.get('from_source_type'), + 'from_source_path': req_json.get('from_source_path'), + 'document_type': req_json.get('document_type'), 'create_datetime': now, 'create_user': user, 'create_program': program, @@ -46,6 +49,9 @@ def add( status, progress, task_id, + from_source_type, + from_source_path, + document_type, create_datetime, create_user, create_program, @@ -59,6 +65,9 @@ def add( %(status)s, %(progress)s, %(task_id)s, + %(from_source_type)s, + %(from_source_path)s, + %(document_type)s, %(create_datetime)s, %(create_user)s, %(create_program)s, @@ -106,7 +115,7 @@ def update_document_status_and_end_time( req_json, pool ): - """Update the status and start time with id""" + """Update the status and end time with id""" now = req_json['end_time'] program = '文件处理完成-修改' @@ -143,6 +152,7 @@ def update_document_progress( 'id': req_json['id'], 'progress': req_json['progress'], 'update_datetime': now, + 'update_user': req_json['update_user'], 'update_program': program } diff --git a/data-processing/data_manipulation/database_operate/data_process_log_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_log_db_operate.py new file mode 100644 index 000000000..5caead1d4 --- /dev/null +++ b/data-processing/data_manipulation/database_operate/data_process_log_db_operate.py @@ -0,0 +1,116 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import ujson +import ulid +from database_clients import postgresql_pool_client +from sanic.response import json +from utils import date_time_utils + + +def add( + req_json, + pool +): + """Add a new record""" + now = date_time_utils.now_str() + user = req_json['creator'] + program = '数据处理任务日志-新增' + + params = { + 'id': req_json['id'], + 'task_id': req_json['task_id'], + 'type': req_json['type'], + 'status': 'processing', + 'error_msg': req_json['error_msg'], + 'start_datetime': now, + 'create_datetime': now, + 'create_user': user, + 'create_program': program, + 'update_datetime': now, + 'update_user': user, + 'update_program': program + } + + sql = """ + insert into public.data_process_task_log ( + id, + task_id, + type, + status, + error_msg, + start_datetime, + create_datetime, + create_program, + create_user, + update_datetime, + update_program, + update_user + ) + values ( + %(id)s, + %(task_id)s, + %(type)s, + %(status)s, + %(error_msg)s, + %(start_datetime)s, + %(create_datetime)s, + %(create_program)s, + %(create_user)s, + %(update_datetime)s, + %(update_program)s, + %(update_user)s + ) + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + + +def update_status_by_id( + req_json, + pool +): + """Update the status with id""" + now = date_time_utils.now_str() + user = req_json['creator'] + program = '添加错误日志信息' + + params = { + 'id': req_json['id'], + 'status': req_json['status'], + 'error_msg': req_json['error_msg'], + 'end_datetime': now, + 'update_datetime': now, + 'update_program': program, + 'update_user': user + } + + sql = """ + update public.data_process_task_log set + status = %(status)s, + end_datetime = %(end_datetime)s, + error_msg = %(error_msg)s, + update_datetime = %(update_datetime)s, + update_program = %(update_program)s, + update_user = %(update_user)s + where + id = %(id)s + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + + diff --git a/data-processing/data_manipulation/file_handle/common_handle.py b/data-processing/data_manipulation/file_handle/common_handle.py index 3ab7cccc2..67469561b 100644 --- a/data-processing/data_manipulation/file_handle/common_handle.py +++ b/data-processing/data_manipulation/file_handle/common_handle.py @@ -22,7 +22,9 @@ import ulid from common import log_tag_const from common.config import config -from database_operate import data_process_detail_db_operate, data_process_document_db_operate +from database_operate import (data_process_detail_db_operate, + data_process_document_db_operate, + data_process_document_chunk_db_operate) from langchain.text_splitter import SpacyTextSplitter from llm_api_service.qa_provider_open_ai import QAProviderOpenAI from llm_api_service.qa_provider_zhi_pu_ai_online import QAProviderZhiPuAIOnline @@ -34,120 +36,125 @@ def text_manipulate( + all_document_for_process, file_name, - document_id, - content, support_type, conn_pool, - task_id, - create_user, - chunk_size, - chunk_overlap + create_user ): """Manipulate the text content. + all_document_for_process: document info file_name: file name; support_type: support type; conn_pool: database connection pool; - task_id: data process task id; - chunk_size: chunk size; - chunk_overlap: chunk overlap; + create_user: creator; """ logger.debug(f"{log_tag_const.COMMON_HANDLE} Start to manipulate the text") try: support_type_map = _convert_support_type_to_map(support_type) - - # Clean the data such as removing invisible characters. 
- clean_result = _data_clean( - support_type_map=support_type_map, - file_name=file_name, - data=content, - conn_pool=conn_pool, - task_id=task_id, - create_user=create_user + document_chunk_size = len(all_document_for_process) + + # 更新文件状态为开始 + task_id = all_document_for_process[0].get('task_id') + document_id = all_document_for_process[0].get('document_id') + _update_document_status_and_start_time( + id=all_document_for_process[0].get('document_id'), + chunk_size=document_chunk_size, + conn_pool=conn_pool ) - if clean_result['status'] == 200: - content = clean_result['data'] - - # Remove the privacy info such as removing email. - clean_result = _remove_privacy_info( - support_type_map=support_type_map, - file_name=file_name, - data=content, - conn_pool=conn_pool, - task_id=task_id, - create_user=create_user - ) + text_process_success_num = 0 + for document in all_document_for_process: + document_chunk_id = document.get('id') + # Clean the data such as removing invisible characters. + clean_result = _data_clean( + support_type_map=support_type_map, + file_name=file_name, + data=document.get('content'), + conn_pool=conn_pool, + task_id=task_id, + document_id=document_id, + document_chunk_id=document_chunk_id, + create_user=create_user + ) - if clean_result['status'] == 200: - content = clean_result['data'] + if clean_result['status'] == 200: + content = clean_result['data'] - - # 数据量 - object_count = 0 - object_name = '' - if support_type_map.get('qa_split'): - logger.debug(f"{log_tag_const.QA_SPLIT} Start to split QA.") - qa_list_dict = support_type_map.get('qa_split') - llm_config = qa_list_dict.get('llm_config') - - qa_response = _generate_qa_list( - chunk_size=chunk_size, - chunk_overlap=chunk_overlap, - data=content, - document_id=document_id, + # Remove the privacy info such as removing email. + clean_result = _remove_privacy_info( + support_type_map=support_type_map, + file_name=file_name, + data=document.get('content'), conn_pool=conn_pool, - llm_config=llm_config + task_id=task_id, + document_id=document_id, + document_chunk_id=document_chunk_id, + create_user=create_user ) - if qa_response.get('status') != 200: - return qa_response + if clean_result['status'] == 200: + content = clean_result['data'] - logger.debug(f"{log_tag_const.QA_SPLIT} The QA data is: \n{qa_response}\n") + if support_type_map.get('qa_split'): + logger.debug(f"{log_tag_const.QA_SPLIT} Start to split QA.") + text_process_success_num += 1 - # start to insert qa data - qa_data = qa_response.get('data') - for i in range(len(qa_data)): - if i == 0: - continue - qa_insert_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'question': qa_data[i][0], - 'answer': qa_data[i][1], - 'create_user': create_user - } - - data_process_detail_db_operate.insert_question_answer_info( - qa_insert_item, - pool=conn_pool + qa_response = _qa_split( + support_type_map=support_type_map, + task_id=task_id, + document_chunk_size=document_chunk_size, + document_chunk_id=document_chunk_id, + file_name=file_name, + content=content, + document_id=document_id, + text_process_success_num=text_process_success_num, + conn_pool=conn_pool, + create_user=create_user ) - # Save the csv file. 
- file_name_without_extension = file_name.rsplit('.', 1)[0] + '_final' - csv_utils.save_csv( - file_name=file_name_without_extension + '.csv', - phase_value='final', - data=qa_data - ) - - object_name = file_name_without_extension + '.csv' - # 减 1 是为了去除表头 - object_count = len(qa_data) - 1 - - logger.debug(f"{log_tag_const.QA_SPLIT} Finish splitting QA.") + if qa_response.get('status') != 200: + return qa_response + + # 文件处理成功,更新data_process_task_document中的文件状态 + _updata_document_status_and_end_time( + id=document_id, + status='success', + conn_pool=conn_pool + ) + + # 通过documentId查询生成的所有QA数据 + qa_list = data_process_detail_db_operate.query_question_answer_list( + document_id=document_id, + pool=conn_pool + ) + + qa_data_dict = [['q', 'a']] + for item in qa_list.get('data'): + qa_data_dict.append([ + item.get('question'), + item.get('answer') + ]) + + # Save the csv file. + file_name_without_extension = file_utils.get_file_name_without_extension(file_name) + file_name_csv = file_name_without_extension + '.csv' + csv_utils.save_csv( + file_name=file_name_csv, + phase_value='final', + data=qa_data_dict + ) logger.debug(f"{log_tag_const.COMMON_HANDLE} Finish manipulating the text") return { 'status': 200, 'message': '', 'data': { - 'object_name': object_name, - 'object_count': object_count + 'object_name': file_name_csv, + 'object_count': len(qa_list.get('data')) } } except Exception as ex: @@ -166,6 +173,8 @@ def _data_clean( support_type_map, data, task_id, + document_id, + document_chunk_id, file_name, create_user, conn_pool @@ -206,6 +215,8 @@ def _data_clean( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'remove_invisible_characters', 'pre_content': item['pre_content'], @@ -231,6 +242,8 @@ def _data_clean( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'space_standardization', 'pre_content': item['pre_content'], @@ -254,6 +267,8 @@ def _data_clean( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'remove_garbled_text', 'pre_content': data, @@ -277,6 +292,8 @@ def _data_clean( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'traditional_to_simplified', 'pre_content': data, @@ -300,6 +317,8 @@ def _data_clean( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'remove_html_tag', 'pre_content': data, @@ -325,6 +344,8 @@ def _data_clean( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'remove_emojis', 'pre_content': item['pre_content'], @@ -348,6 +369,8 @@ def _remove_privacy_info( support_type_map, data, task_id, + document_id, + document_chunk_id, file_name, create_user, conn_pool @@ -388,6 +411,8 @@ def _remove_privacy_info( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'remove_email', 'pre_content': item['pre_content'], @@ 
-413,6 +438,8 @@ def _remove_privacy_info( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'remove_ip_address', 'pre_content': item['pre_content'], @@ -438,6 +465,8 @@ def _remove_privacy_info( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'remove_number', 'pre_content': item['pre_content'], @@ -461,6 +490,8 @@ def _remove_privacy_info( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'remove_number', 'pre_content': item['pre_content'], @@ -484,6 +515,8 @@ def _remove_privacy_info( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'remove_number', 'pre_content': item['pre_content'], @@ -507,6 +540,8 @@ def _remove_privacy_info( task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, 'file_name': file_name, 'transform_type': 'remove_number', 'pre_content': item['pre_content'], @@ -526,23 +561,102 @@ def _remove_privacy_info( } - -def _generate_qa_list( - chunk_size, - chunk_overlap, - data, +def _qa_split( + support_type_map, + task_id, + document_chunk_size, + document_chunk_id, + file_name, + content, document_id, + text_process_success_num, conn_pool, + create_user +): + qa_list_dict = support_type_map.get('qa_split') + llm_config = qa_list_dict.get('llm_config') + + # 更新chunk状态为开始 + _update_document_chunk_status_and_start_time( + id=document_chunk_id, + update_user=create_user, + conn_pool=conn_pool + ) + + qa_response = _generate_qa_list( + content=content, + llm_config=llm_config + ) + + if qa_response.get('status') != 200: + # 处理失败 + # 更新data_process_task_document_chunk中的状态 + _updata_document_chunk_status_and_end_time( + id=document_chunk_id, + update_user=create_user, + status='fail', + conn_pool=conn_pool + ) + + # 更新data_process_task_document中的文件状态 + _updata_document_status_and_end_time( + id=document_id, + status='fail', + conn_pool=conn_pool + ) + else: + # 将QA数据存入表中 + qa_data = qa_response.get('data') + for i in range(len(qa_data)): + qa_insert_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id, + 'file_name': file_name, + 'question': qa_data[i][0], + 'answer': qa_data[i][1], + 'create_user': create_user + } + + data_process_detail_db_operate.insert_question_answer_info( + qa_insert_item, + pool=conn_pool + ) + + data_process_detail_db_operate.insert_question_answer_clean_info( + qa_insert_item, + pool=conn_pool + ) + + # 更新data_process_task_document_chunk中的状态 + _updata_document_chunk_status_and_end_time( + id=document_chunk_id, + update_user=create_user, + status='success', + conn_pool=conn_pool + ) + + # 更新文件处理进度 + progress = int(text_process_success_num / document_chunk_size * 100) + _updata_document_progress( + id=document_id, + progress=progress, + update_user=create_user, + conn_pool=conn_pool + ) + + return qa_response + + +def _generate_qa_list( + content, llm_config ): """Generate the Question and Answer list. 
- - chunk_size: chunk size; - chunk_overlap: chunk overlap; - data: the text used to generate QA; - name: llms cr name; - namespace: llms cr namespace; - model: model id or model version; + + content: the text used to generate QA; + llm_config: llms config info; """ name=llm_config.get('name') namespace=llm_config.get('namespace') @@ -552,33 +666,6 @@ def _generate_qa_list( top_p=llm_config.get('top_p') max_tokens=llm_config.get('max_tokens') - # Split the text. - if chunk_size is None: - chunk_size = config.knowledge_chunk_size - - if chunk_overlap is None: - chunk_overlap = config.knowledge_chunk_overlap - - text_splitter = SpacyTextSplitter( - separator="\n\n", - pipeline="zh_core_web_sm", - chunk_size=int(chunk_size), - chunk_overlap=int(chunk_overlap) - ) - texts = text_splitter.split_text(data) - - logger.debug(''.join([ - f"original text is: \n{data}\n", - f"split text is: \n{texts}\n" - ])) - - # 更新文件状态为开始 - _update_document_status_and_start_time( - id=document_id, - texts=texts, - conn_pool=conn_pool - ) - # llms cr 中模型相关信息 llm_spec_info = model_cr.get_spec_for_llms_k8s_cr( name=name, @@ -586,7 +673,7 @@ def _generate_qa_list( ) # Generate the QA list. - qa_list = [['q', 'a']] + qa_list = [] if llm_spec_info.get('data').get('provider').get('worker'): # get base url for configmap base_url = model_cr.get_worker_base_url_k8s_configmap( @@ -609,43 +696,17 @@ def _generate_qa_list( temperature=temperature, max_tokens=max_tokens ) - # text process success number - text_process_success_num=0 - - for item in texts: - text = item.replace("\n", "") - data = qa_provider.generate_qa_list( - text=text, - prompt_template=prompt_template - ) - if data.get('status') != 200: - # 文件处理失败,更新data_process_task_document中的文件状态 - _updata_document_status_and_end_time( - id=document_id, - status='fail', - conn_pool=conn_pool - ) + data = qa_provider.generate_qa_list( + text=content, + prompt_template=prompt_template + ) - return data + if data.get('status') != 200: + # 文件处理失败 + return data - qa_list.extend(data.get('data')) - - # 更新文件处理进度 - text_process_success_num += 1 - progress = int(text_process_success_num / len(texts) * 100) - _updata_document_progress( - id=document_id, - progress=progress, - conn_pool=conn_pool - ) - - # 文件处理成功,更新data_process_task_document中的文件状态 - _updata_document_status_and_end_time( - id=document_id, - status='success', - conn_pool=conn_pool - ) + qa_list.extend(data.get('data')) else: endpoint = llm_spec_info.get('data').get('provider').get('endpoint') base_url = endpoint.get('url') @@ -657,7 +718,6 @@ def _generate_qa_list( namespace=namespace ) api_key = secret_info.get('apiKey') - llm_type = llm_spec_info.get('data').get('type') logger.debug(''.join([ @@ -671,46 +731,19 @@ def _generate_qa_list( if llm_type == 'zhipuai': zhipuai_api_key = base64.b64decode(api_key).decode('utf-8') qa_provider = QAProviderZhiPuAIOnline(api_key=zhipuai_api_key) - # text process success number - text_process_success_num=0 # generate QA list - for item in texts: - text = item.replace("\n", "") - data = qa_provider.generate_qa_list( - text=text, - model=model, - prompt_template=prompt_template, - top_p=top_p, - temperature=temperature - ) - if data.get('status') != 200: - # 文件处理失败,更新data_process_task_document中的文件状态 - _updata_document_status_and_end_time( - id=document_id, - status='fail', - conn_pool=conn_pool - ) - - return data - - qa_list.extend(data.get('data')) - - # 更新文件处理进度 - text_process_success_num += 1 - progress = int(text_process_success_num / len(texts) * 100) - 
_updata_document_progress( - id=document_id, - progress=progress, - conn_pool=conn_pool - ) - - # 文件处理成功,更新data_process_task_document中的文件状态 - _updata_document_status_and_end_time( - id=document_id, - status='success', - conn_pool=conn_pool + data = qa_provider.generate_qa_list( + text=content, + model=model, + prompt_template=prompt_template, + top_p=top_p, + temperature=temperature ) + if data.get('status') != 200: + return data + + qa_list.extend(data.get('data')) else: return { 'status': 1000, @@ -753,7 +786,7 @@ def _convert_support_type_to_map(supprt_type): def _update_document_status_and_start_time( id, - texts, + chunk_size, conn_pool ): try: @@ -762,7 +795,7 @@ def _update_document_status_and_start_time( 'id': id, 'status': 'doing', 'start_time': now, - 'chunk_size': len(texts) + 'chunk_size': chunk_size } data_process_document_db_operate.update_document_status_and_start_time( document_update_item, @@ -821,12 +854,14 @@ def _updata_document_status_and_end_time( def _updata_document_progress( id, progress, + update_user, conn_pool ): try: now = date_time_utils.now_str() document_update_item = { 'id': id, + 'update_user': update_user, 'progress': progress } data_process_document_db_operate.update_document_progress( @@ -850,3 +885,71 @@ def _updata_document_progress( 'data': traceback.format_exc() } +def _update_document_chunk_status_and_start_time( + id, + update_user, + conn_pool +): + try: + now = date_time_utils.now_str() + document_chunk_update_item = { + 'id': id, + 'status': 'doing', + 'update_user': update_user, + 'start_time': now + } + data_process_document_chunk_db_operate.update_document_chunk_status_and_start_time( + document_chunk_update_item, + pool=conn_pool + ) + + return { + 'status': 200, + 'message': '', + 'data': '' + } + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.COMMON_HANDLE} update chunk document status ", + f"\n{traceback.format_exc()}" + ])) + return { + 'status': 1000, + 'message': str(ex), + 'data': traceback.format_exc() + } + +def _updata_document_chunk_status_and_end_time( + id, + status, + update_user, + conn_pool +): + try: + now = date_time_utils.now_str() + document_chunk_update_item = { + 'id': id, + 'status': status, + 'update_user': update_user, + 'end_time': now + } + data_process_document_chunk_db_operate.update_document_chunk_status_and_end_time( + document_chunk_update_item, + pool=conn_pool + ) + + return { + 'status': 200, + 'message': '', + 'data': '' + } + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.COMMON_HANDLE} update document status ", + f"\n{traceback.format_exc()}" + ])) + return { + 'status': 1000, + 'message': str(ex), + 'data': traceback.format_exc() + } diff --git a/data-processing/data_manipulation/file_handle/pdf_handle.py b/data-processing/data_manipulation/file_handle/pdf_handle.py index 8ea024098..d832037de 100644 --- a/data-processing/data_manipulation/file_handle/pdf_handle.py +++ b/data-processing/data_manipulation/file_handle/pdf_handle.py @@ -15,10 +15,16 @@ import logging import traceback +import ulid +import ujson from common import log_tag_const +from common.config import config from file_handle import common_handle -from utils import file_utils, pdf_utils +from utils import file_utils +from langchain.document_loaders import PyPDFLoader +from langchain.text_splitter import SpacyTextSplitter +from database_operate import data_process_document_chunk_db_operate logger = logging.getLogger(__name__) @@ -49,21 +55,44 @@ def text_manipulate( pdf_file_path = 
file_utils.get_temp_file_path() file_path = pdf_file_path + 'original/' + file_name - # step 1 - # Get the content from the pdf fild. - content = pdf_utils.get_content(file_path) - logger.debug(f"{log_tag_const.PDF_HANDLE} The pdf content is\n {content}") + # Text splitter + documents = _get_documents_by_langchain( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + file_path=file_path + ) + + # step 2 + # save all chunk info to database + all_document_for_process = [] + for document in documents: + chunck_id = ulid.ulid() + page = document.metadata.get('page') + 1 + content = document.page_content.replace("\n", "") + meta_info = ujson.dumps(document.metadata, ensure_ascii=False) + chunk_insert_item = { + 'id': chunck_id, + 'document_id': document_id, + 'task_id': task_id, + 'status': 'not_start', + 'content': content, + 'meta_info': meta_info, + 'page_number': page, + 'creator': create_user + } + all_document_for_process.append(chunk_insert_item) + + data_process_document_chunk_db_operate.add( + chunk_insert_item, + pool=conn_pool + ) response = common_handle.text_manipulate( file_name=file_name, - document_id=document_id, - content=content, + all_document_for_process=all_document_for_process, support_type=support_type, conn_pool=conn_pool, - task_id=task_id, - create_user=create_user, - chunk_size=chunk_size, - chunk_overlap=chunk_overlap + create_user=create_user ) return response @@ -78,3 +107,26 @@ 'message': str(ex), 'data': traceback.format_exc() } + +def _get_documents_by_langchain( + chunk_size, + chunk_overlap, + file_path +): + # Split the text. + if chunk_size is None: + chunk_size = config.knowledge_chunk_size + + if chunk_overlap is None: + chunk_overlap = config.knowledge_chunk_overlap + + source_reader = PyPDFLoader(file_path) + text_splitter = SpacyTextSplitter( + separator="\n\n", + pipeline="zh_core_web_sm", + chunk_size=int(chunk_size), + chunk_overlap=int(chunk_overlap) + ) + documents = source_reader.load_and_split(text_splitter) + + return documents diff --git a/data-processing/data_manipulation/file_handle/word_handle.py b/data-processing/data_manipulation/file_handle/word_handle.py index 42e3f9436..de124d946 100644 --- a/data-processing/data_manipulation/file_handle/word_handle.py +++ b/data-processing/data_manipulation/file_handle/word_handle.py @@ -17,8 +17,12 @@ import traceback +import ulid from common import log_tag_const +from common.config import config from file_handle import common_handle -from utils import csv_utils, file_utils, docx_utils +from utils import file_utils, docx_utils +from langchain.text_splitter import SpacyTextSplitter +from database_operate import data_process_document_chunk_db_operate logger = logging.getLogger(__name__) @@ -49,21 +53,42 @@ def docx_text_manipulate( word_file_path = file_utils.get_temp_file_path() file_path = word_file_path + 'original/' + file_name - # step 1 - # Get the content from the word fild.
- content = docx_utils.get_content(file_path) - logger.debug(f"{log_tag_const.WORD_HANDLE} The word content is\n {content}") + # Text splitter + documents = _get_documents_by_langchain( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + file_path=file_path + ) + + # step 2 + # save all chunk info to database + all_document_for_process = [] + for document in documents: + chunck_id = ulid.ulid() + content = document.replace("\n", "") + chunk_insert_item = { + 'id': chunck_id, + 'document_id': document_id, + 'task_id': task_id, + 'status': 'not_start', + 'content': content, + 'meta_info': '', + 'page_number': '', + 'creator': create_user + } + all_document_for_process.append(chunk_insert_item) + + data_process_document_chunk_db_operate.add( + chunk_insert_item, + pool=conn_pool + ) response = common_handle.text_manipulate( file_name=file_name, - document_id=document_id, - content=content, + all_document_for_process=all_document_for_process, support_type=support_type, conn_pool=conn_pool, - task_id=task_id, - create_user=create_user, - chunk_size=chunk_size, - chunk_overlap=chunk_overlap + create_user=create_user ) return response @@ -78,3 +103,26 @@ 'message': str(ex), 'data': traceback.format_exc() } + +def _get_documents_by_langchain( + chunk_size, + chunk_overlap, + file_path +): + # Split the text. + if chunk_size is None: + chunk_size = config.knowledge_chunk_size + + if chunk_overlap is None: + chunk_overlap = config.knowledge_chunk_overlap + + content = docx_utils.get_content(file_path) + text_splitter = SpacyTextSplitter( + separator="\n\n", + pipeline="zh_core_web_sm", + chunk_size=int(chunk_size), + chunk_overlap=int(chunk_overlap) + ) + documents = text_splitter.split_text(content) + + return documents diff --git a/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py b/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py index 8f4a775ef..d7668d993 100644 --- a/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py +++ b/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py @@ -93,31 +93,21 @@ def generate_qa_list( ])) status = 1000 - message = traceback.format_exc() - break else: response = llm_chain.run(text=text) result = self.__get_qa_list_from_response(response) if len(result) > 0: break - elif invoke_count > int(config.llm_qa_retry_count): - logger.error(''.join([ - f"{log_tag_const.OPEN_AI} Cannot access the open ai service.\n", - f"The tracing error is: \n{traceback.format_exc()}\n" - ])) - - status = 1000 - message = traceback.format_exc() - break else: logger.warn('failed to get QA list, wait for 10 seconds and retry') time.sleep(10) # sleep 10 seconds invoke_count += 1 + message = '模型调用成功,生成的QA格式不对,请更换prompt' except Exception as ex: time.sleep(10) invoke_count += 1 + message = '调用本地模型失败,请检查模型是否可用' return { 'status': status, @@ -141,22 +131,25 @@ the response from open ai service """ result = [] - - pattern = re.compile(r'Q\d+:(\s*)(.*?)(\s*)A\d+:(\s*)([\s\S]*?)(?=Q|$)') - - - # 移除换行符 - response_text = response.replace('\\n', '') - matches = pattern.findall(response_text) - - result = [] - for match in matches: - q = match[1] - a = match[4] - if q and a: - a = re.sub(r'[\n]', '', a).strip() - result.append([q, a]) - + try: + pattern = re.compile(r'Q\d+:(\s*)(.*?)(\s*)A\d+:(\s*)([\s\S]*?)(?=Q|$)') + + + # 移除换行符 + response_text = response.replace('\\n', '') + matches = pattern.findall(response_text) + + for match in matches: + q =
match[1] + a = match[4] + if q and a: + a = re.sub(r'[\n]', '', a).strip() + result.append([q, a]) + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.OPEN_AI} 从结果中提取QA失败\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) return result diff --git a/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py b/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py index e32357e12..078c2877b 100644 --- a/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py +++ b/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py @@ -82,8 +82,6 @@ def generate_qa_list( ])) status = 1000 - message = traceback.format_exc() - break else: response = zhipuai.model_api.invoke( @@ -95,21 +93,12 @@ if response['success']: result = self.__format_response_to_qa_list(response) if len(result) > 0: break - elif invoke_count > int(config.llm_qa_retry_count): - logger.error(''.join([ - f"{log_tag_const.ZHI_PU_AI} Cannot access the open ai service.\n", - f"The tracing error is: \n{traceback.format_exc()}\n" - ])) - - status = 1000 - message = traceback.format_exc() - break else: logger.warn(f"failed to get QA list, wait for {wait_seconds} seconds and retry") time.sleep(wait_seconds) # sleep 120 seconds invoke_count += 1 + message = '模型调用成功,生成的QA格式不对,请更换prompt' else: logger.error(''.join([ f"{log_tag_const.ZHI_PU_AI} Cannot access the ZhiPuAI service.\n", @@ -118,10 +107,12 @@ logger.warn(f"zhipuai request failed, wait for {wait_seconds} seconds and retry") time.sleep(wait_seconds) # sleep 120 seconds invoke_count += 1 + message = '模型调用失败,失败原因: ' + response['msg'] except Exception as ex: logger.warn(f"zhipuai request exception, wait for {wait_seconds} seconds and retry") time.sleep(wait_seconds) invoke_count += 1 + message = '模型调用失败,请检查模型是否可用!'
return { 'status': status, @@ -133,18 +124,23 @@ def generate_qa_list( def __format_response_to_qa_list(self, response): """Format the response to the QA list.""" text = response['data']['choices'][0]['content'] - - pattern = re.compile(r'Q\d+:(\s*)(.*?)(\s*)A\d+:(\s*)([\s\S]*?)(?=Q|$)') - # 移除换行符 - text = text.replace('\\n', '') - matches = pattern.findall(text) - result = [] - for match in matches: - q = match[1] - a = match[4] - if q and a: - result.append([q, a]) + try: + pattern = re.compile(r'Q\d+:(\s*)(.*?)(\s*)A\d+:(\s*)([\s\S]*?)(?=Q|$)') + # 移除换行符 + text = text.replace('\\n', '') + matches = pattern.findall(text) + + for match in matches: + q = match[1] + a = match[4] + if q and a: + result.append([q, a]) + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.ZHI_PU_AI} 从结果中提取QA失败\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) return result \ No newline at end of file diff --git a/data-processing/data_manipulation/service/data_process_service.py b/data-processing/data_manipulation/service/data_process_service.py index e5ca0cd36..d699ccc5b 100644 --- a/data-processing/data_manipulation/service/data_process_service.py +++ b/data-processing/data_manipulation/service/data_process_service.py @@ -23,7 +23,8 @@ from database_operate import (data_process_db_operate, data_process_detail_db_operate, data_process_document_db_operate, - data_process_detail_preview_db_operate) + data_process_detail_preview_db_operate, + data_process_document_chunk_db_operate) from kube import dataset_cr from parallel import thread_parallel from utils import date_time_utils @@ -131,8 +132,11 @@ def delete_by_id( # 删除生成的QA信息 data_process_detail_db_operate.delete_qa_by_task_id(req_json, pool=pool) data_process_detail_preview_db_operate.delete_qa_by_task_id(req_json, pool=pool) + data_process_detail_db_operate.delete_qa_clean_by_task_id(req_json, pool=pool) # 删除对应的进度信息 data_process_document_db_operate.delete_by_task_id(req_json, pool=pool) + # 删除chunk的信息 + data_process_document_chunk_db_operate.delete_by_task_id(req_json, pool=pool) return data_process_db_operate.delete_by_id(req_json, pool=pool) @@ -279,6 +283,7 @@ def _get_and_set_basic_detail_info( from_result['start_time'] = detail_info_data['start_datetime'] from_result['end_time'] = detail_info_data['end_datetime'] from_result['creator'] = detail_info_data['create_user'] + from_result['error_msg'] = detail_info_data['error_msg'] from_result['data_process_config_info'] = detail_info_data['data_process_config_info'] else: from_result['id'] = '' diff --git a/data-processing/data_manipulation/utils/file_utils.py b/data-processing/data_manipulation/utils/file_utils.py index 2478d881d..b3460e10b 100644 --- a/data-processing/data_manipulation/utils/file_utils.py +++ b/data-processing/data_manipulation/utils/file_utils.py @@ -14,6 +14,7 @@ import os +from pathlib import Path def get_file_name( @@ -40,3 +41,19 @@ def get_temp_file_path(): def delete_file(file_path): """Delete file""" os.remove(file_path) + + +def get_file_extension(file_name): + """Get file extension""" + path = Path(file_name) + extension = path.suffix + file_extension = extension[1:].lower() + + return file_extension + +def get_file_name_without_extension(file_name): + """Get file name without extension""" + path = Path(file_name) + file_name_without_extension = path.stem + + return file_name_without_extension diff --git a/data-processing/db-scripts/init-database-schema.sql b/data-processing/db-scripts/init-database-schema.sql index 52b9b4b91..093f1d653 100644 --- 
a/data-processing/db-scripts/init-database-schema.sql +++ b/data-processing/db-scripts/init-database-schema.sql @@ -1,6 +1,6 @@ -- **************************************************************************** -- If any update to this file, MUST sync the content to - -- configMap under deploy/charts/arcadia/pg-init-data-configmap.yaml + -- configMap under deploy/charts/arcadia/templates/pg-init-data-configmap.yaml -- **************************************************************************** -- Table: public.data_process_task @@ -28,6 +28,7 @@ update_user character varying(32) COLLATE pg_catalog."default", update_program character varying(64) COLLATE pg_catalog."default", namespace character varying(64) COLLATE pg_catalog."default", + bucket_name character varying(64) COLLATE pg_catalog."default", CONSTRAINT data_process_task_pkey PRIMARY KEY (id) ); @@ -49,6 +50,8 @@ update_datetime character varying(32) COLLATE pg_catalog."default", update_user character varying(32) COLLATE pg_catalog."default", update_program character varying(32) COLLATE pg_catalog."default", + document_id character varying(32) COLLATE pg_catalog."default", + document_chunk_id character varying(32) COLLATE pg_catalog."default", CONSTRAINT data_process_detail_pkey PRIMARY KEY (id) ); @@ -65,6 +68,8 @@ COMMENT ON COLUMN public.data_process_task_detail.update_datetime IS '更新时间'; COMMENT ON COLUMN public.data_process_task_detail.update_user IS '更新用户'; COMMENT ON COLUMN public.data_process_task_detail.update_program IS '更新程序'; + COMMENT ON COLUMN public.data_process_task_detail.document_id IS '文档id'; + COMMENT ON COLUMN public.data_process_task_detail.document_chunk_id IS '文档chunk id'; CREATE TABLE public.data_process_task_question_answer ( id varchar(32) NOT NULL, -- 主键 @@ -78,6 +83,8 @@ update_datetime varchar(32) NULL, -- 更新时间 update_user varchar(32) NULL, -- 更新用户 update_program varchar(32) NULL, -- 更新程序 + document_id character varying(32) COLLATE pg_catalog."default", + document_chunk_id character varying(32) COLLATE pg_catalog."default", CONSTRAINT data_process_task_question_answer_pkey PRIMARY KEY (id) ); COMMENT ON TABLE public.data_process_task_question_answer IS '数据处理问题答案'; @@ -95,6 +102,8 @@ COMMENT ON COLUMN public.data_process_task_question_answer.update_datetime IS '更新时间'; COMMENT ON COLUMN public.data_process_task_question_answer.update_user IS '更新用户'; COMMENT ON COLUMN public.data_process_task_question_answer.update_program IS '更新程序'; + COMMENT ON COLUMN public.data_process_task_question_answer.document_id IS '文档id'; + COMMENT ON COLUMN public.data_process_task_question_answer.document_chunk_id IS '文档chunk id'; CREATE TABLE IF NOT EXISTS public.data_process_task_document ( @@ -113,6 +122,9 @@ update_datetime character varying(32) COLLATE pg_catalog."default", update_user character varying(32) COLLATE pg_catalog."default", update_program character varying(32) COLLATE pg_catalog."default", + from_source_type character(64) COLLATE pg_catalog."default", + from_source_path character varying(4096) COLLATE pg_catalog."default", + document_type character varying(64) COLLATE pg_catalog."default", CONSTRAINT data_process_task_document_pkey PRIMARY KEY (id) ); @@ -132,6 +144,9 @@ COMMENT ON COLUMN public.data_process_task_document.update_datetime IS '更新时间'; COMMENT ON COLUMN public.data_process_task_document.update_user IS '更新用户'; COMMENT ON COLUMN public.data_process_task_document.update_program IS '更新程序'; + COMMENT ON COLUMN public.data_process_task_document.from_source_type IS '如minio等'; + COMMENT ON COLUMN 
public.data_process_task_document.from_source_path IS '文件路径, minio的需要包括bucket的名称'; + COMMENT ON COLUMN public.data_process_task_document.document_type IS '文档类型 如txt web_url pdf等'; CREATE TABLE IF NOT EXISTS public.data_process_task_detail_preview ( @@ -163,3 +178,78 @@ COMMENT ON COLUMN public.data_process_task_detail_preview.update_datetime IS '更新时间'; COMMENT ON COLUMN public.data_process_task_detail_preview.update_user IS '更新用户'; COMMENT ON COLUMN public.data_process_task_detail_preview.update_program IS '更新程序'; + + CREATE TABLE IF NOT EXISTS public.data_process_task_document_chunk + ( + id character varying(32) COLLATE pg_catalog."default" NOT NULL, + document_id character varying(32) COLLATE pg_catalog."default", + status character varying(32) COLLATE pg_catalog."default", + start_time character varying(32) COLLATE pg_catalog."default", + end_time character varying(32) COLLATE pg_catalog."default", + create_datetime character varying(32) COLLATE pg_catalog."default", + create_user character varying(32) COLLATE pg_catalog."default", + create_program character varying(64) COLLATE pg_catalog."default", + update_datetime character varying(32) COLLATE pg_catalog."default", + update_user character varying(32) COLLATE pg_catalog."default", + update_program character varying(32) COLLATE pg_catalog."default", + content text COLLATE pg_catalog."default", + task_id character varying(32) COLLATE pg_catalog."default", + content_clean text COLLATE pg_catalog."default", + content_privacy text COLLATE pg_catalog."default", + meta_info text COLLATE pg_catalog."default", + process_info text COLLATE pg_catalog."default", + page_number character varying(64) COLLATE pg_catalog."default", + CONSTRAINT data_process_task_document_chunk_pkey PRIMARY KEY (id) + ); + + COMMENT ON COLUMN public.data_process_task_document_chunk.id IS '主键id'; + COMMENT ON COLUMN public.data_process_task_document_chunk.document_id IS '文档id'; + COMMENT ON COLUMN public.data_process_task_document_chunk.status IS '状态'; + COMMENT ON COLUMN public.data_process_task_document_chunk.start_time IS '开始时间'; + COMMENT ON COLUMN public.data_process_task_document_chunk.end_time IS '结束时间'; + COMMENT ON COLUMN public.data_process_task_document_chunk.create_datetime IS '创建时间'; + COMMENT ON COLUMN public.data_process_task_document_chunk.create_user IS '创建用户'; + COMMENT ON COLUMN public.data_process_task_document_chunk.create_program IS '创建程序'; + COMMENT ON COLUMN public.data_process_task_document_chunk.update_datetime IS '更新时间'; + COMMENT ON COLUMN public.data_process_task_document_chunk.update_user IS '更新用户'; + COMMENT ON COLUMN public.data_process_task_document_chunk.update_program IS '更新程序'; + COMMENT ON COLUMN public.data_process_task_document_chunk.content IS 'chunk内容'; + COMMENT ON COLUMN public.data_process_task_document_chunk.task_id IS '数据处理任务 id'; + COMMENT ON COLUMN public.data_process_task_document_chunk.content_clean IS '清洗过之后的chunk内容'; + COMMENT ON COLUMN public.data_process_task_document_chunk.content_privacy IS '对清洗后的内容进行去隐私处理'; + COMMENT ON COLUMN public.data_process_task_document_chunk.meta_info IS 'json结构, 信息包括文档名称,所在页数等。'; + COMMENT ON COLUMN public.data_process_task_document_chunk.process_info IS 'json结构 数据处理信息如进行了哪些清洗'; + COMMENT ON COLUMN public.data_process_task_document_chunk.page_number IS '所在页数'; + + CREATE TABLE IF NOT EXISTS public.data_process_task_question_answer_clean + ( + id character varying(32) COLLATE pg_catalog."default" NOT NULL, + task_id character varying(32) COLLATE pg_catalog."default", + document_id 
character varying(32) COLLATE pg_catalog."default", + document_chunk_id character varying(32) COLLATE pg_catalog."default", + file_name character varying(512) COLLATE pg_catalog."default", + question text COLLATE pg_catalog."default", + answer text COLLATE pg_catalog."default", + create_datetime character varying(32) COLLATE pg_catalog."default", + create_user character varying(32) COLLATE pg_catalog."default", + create_program character varying(64) COLLATE pg_catalog."default", + update_datetime character varying(32) COLLATE pg_catalog."default", + update_user character varying(32) COLLATE pg_catalog."default", + update_program character varying(32) COLLATE pg_catalog."default", + CONSTRAINT data_process_task_question_answer_clean_pkey PRIMARY KEY (id) + ); + + COMMENT ON TABLE public.data_process_task_question_answer_clean IS '数据处理问题和答案'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.id IS '主键'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.task_id IS '任务Id'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.document_id IS '文档id'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.document_chunk_id IS '文档chunk id'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.file_name IS '文件名称'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.question IS '问题'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.answer IS '答案'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.create_datetime IS '创建时间'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.create_user IS '创建用户'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.create_program IS '创建程序'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.update_datetime IS '更新时间'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.update_user IS '更新用户'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.update_program IS '更新程序'; diff --git a/deploy/charts/arcadia/Chart.yaml b/deploy/charts/arcadia/Chart.yaml index c7c66c3b2..c4a2bce21 100644 --- a/deploy/charts/arcadia/Chart.yaml +++ b/deploy/charts/arcadia/Chart.yaml @@ -2,7 +2,7 @@ apiVersion: v2 name: arcadia description: A Helm chart(KubeBB Component) for KubeAGI Arcadia type: application -version: 0.2.1 +version: 0.2.2 appVersion: "0.1.0" keywords: diff --git a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml index c2f8febfd..45bc5c899 100644 --- a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml +++ b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml @@ -3,7 +3,7 @@ data: init-database-schema.sql: |2+ -- **************************************************************************** -- If any update to this file, MUST sync the content to - -- configMap under deploy/charts/arcadia/pg-init-data-configmap.yaml + -- configMap under deploy/charts/arcadia/templates/pg-init-data-configmap.yaml -- **************************************************************************** -- Table: public.data_process_task @@ -31,6 +31,7 @@ data: update_user character varying(32) COLLATE pg_catalog."default", update_program character varying(64) COLLATE pg_catalog."default", namespace character varying(64) COLLATE pg_catalog."default", + bucket_name character varying(64) COLLATE pg_catalog."default", CONSTRAINT data_process_task_pkey PRIMARY KEY (id) ); @@ -52,6 +53,8 @@ data: update_datetime character 
varying(32) COLLATE pg_catalog."default", update_user character varying(32) COLLATE pg_catalog."default", update_program character varying(32) COLLATE pg_catalog."default", + document_id character varying(32) COLLATE pg_catalog."default", + document_chunk_id character varying(32) COLLATE pg_catalog."default", CONSTRAINT data_process_detail_pkey PRIMARY KEY (id) ); @@ -68,6 +71,8 @@ data: COMMENT ON COLUMN public.data_process_task_detail.update_datetime IS '更新时间'; COMMENT ON COLUMN public.data_process_task_detail.update_user IS '更新用户'; COMMENT ON COLUMN public.data_process_task_detail.update_program IS '更新程序'; + COMMENT ON COLUMN public.data_process_task_detail.document_id IS '文档id'; + COMMENT ON COLUMN public.data_process_task_detail.document_chunk_id IS '文档chunk id'; CREATE TABLE public.data_process_task_question_answer ( id varchar(32) NOT NULL, -- 主键 @@ -81,6 +86,8 @@ data: update_datetime varchar(32) NULL, -- 更新时间 update_user varchar(32) NULL, -- 更新用户 update_program varchar(32) NULL, -- 更新程序 + document_id character varying(32) COLLATE pg_catalog."default", + document_chunk_id character varying(32) COLLATE pg_catalog."default", CONSTRAINT data_process_task_question_answer_pkey PRIMARY KEY (id) ); COMMENT ON TABLE public.data_process_task_question_answer IS '数据处理问题答案'; @@ -98,6 +105,8 @@ data: COMMENT ON COLUMN public.data_process_task_question_answer.update_datetime IS '更新时间'; COMMENT ON COLUMN public.data_process_task_question_answer.update_user IS '更新用户'; COMMENT ON COLUMN public.data_process_task_question_answer.update_program IS '更新程序'; + COMMENT ON COLUMN public.data_process_task_question_answer.document_id IS '文档id'; + COMMENT ON COLUMN public.data_process_task_question_answer.document_chunk_id IS '文档chunk id'; CREATE TABLE IF NOT EXISTS public.data_process_task_document ( @@ -116,6 +125,9 @@ data: update_datetime character varying(32) COLLATE pg_catalog."default", update_user character varying(32) COLLATE pg_catalog."default", update_program character varying(32) COLLATE pg_catalog."default", + from_source_type character(64) COLLATE pg_catalog."default", + from_source_path character varying(4096) COLLATE pg_catalog."default", + document_type character varying(64) COLLATE pg_catalog."default", CONSTRAINT data_process_task_document_pkey PRIMARY KEY (id) ); @@ -135,6 +147,9 @@ data: COMMENT ON COLUMN public.data_process_task_document.update_datetime IS '更新时间'; COMMENT ON COLUMN public.data_process_task_document.update_user IS '更新用户'; COMMENT ON COLUMN public.data_process_task_document.update_program IS '更新程序'; + COMMENT ON COLUMN public.data_process_task_document.from_source_type IS '如minio等'; + COMMENT ON COLUMN public.data_process_task_document.from_source_path IS '文件路径, minio的需要包括bucket的名称'; + COMMENT ON COLUMN public.data_process_task_document.document_type IS '文档类型 如txt web_url pdf等'; CREATE TABLE IF NOT EXISTS public.data_process_task_detail_preview ( @@ -167,6 +182,82 @@ data: COMMENT ON COLUMN public.data_process_task_detail_preview.update_user IS '更新用户'; COMMENT ON COLUMN public.data_process_task_detail_preview.update_program IS '更新程序'; + CREATE TABLE IF NOT EXISTS public.data_process_task_document_chunk + ( + id character varying(32) COLLATE pg_catalog."default" NOT NULL, + document_id character varying(32) COLLATE pg_catalog."default", + status character varying(32) COLLATE pg_catalog."default", + start_time character varying(32) COLLATE pg_catalog."default", + end_time character varying(32) COLLATE pg_catalog."default", + create_datetime character varying(32) COLLATE 
pg_catalog."default", + create_user character varying(32) COLLATE pg_catalog."default", + create_program character varying(64) COLLATE pg_catalog."default", + update_datetime character varying(32) COLLATE pg_catalog."default", + update_user character varying(32) COLLATE pg_catalog."default", + update_program character varying(32) COLLATE pg_catalog."default", + content text COLLATE pg_catalog."default", + task_id character varying(32) COLLATE pg_catalog."default", + content_clean text COLLATE pg_catalog."default", + content_privacy text COLLATE pg_catalog."default", + meta_info text COLLATE pg_catalog."default", + process_info text COLLATE pg_catalog."default", + page_number character varying(64) COLLATE pg_catalog."default", + CONSTRAINT data_process_task_document_chunk_pkey PRIMARY KEY (id) + ); + + COMMENT ON COLUMN public.data_process_task_document_chunk.id IS '主键id'; + COMMENT ON COLUMN public.data_process_task_document_chunk.document_id IS '文档id'; + COMMENT ON COLUMN public.data_process_task_document_chunk.status IS '状态'; + COMMENT ON COLUMN public.data_process_task_document_chunk.start_time IS '开始时间'; + COMMENT ON COLUMN public.data_process_task_document_chunk.end_time IS '结束时间'; + COMMENT ON COLUMN public.data_process_task_document_chunk.create_datetime IS '创建时间'; + COMMENT ON COLUMN public.data_process_task_document_chunk.create_user IS '创建用户'; + COMMENT ON COLUMN public.data_process_task_document_chunk.create_program IS '创建程序'; + COMMENT ON COLUMN public.data_process_task_document_chunk.update_datetime IS '更新时间'; + COMMENT ON COLUMN public.data_process_task_document_chunk.update_user IS '更新用户'; + COMMENT ON COLUMN public.data_process_task_document_chunk.update_program IS '更新程序'; + COMMENT ON COLUMN public.data_process_task_document_chunk.content IS 'chunk内容'; + COMMENT ON COLUMN public.data_process_task_document_chunk.task_id IS '数据处理任务 id'; + COMMENT ON COLUMN public.data_process_task_document_chunk.content_clean IS '清洗过之后的chunk内容'; + COMMENT ON COLUMN public.data_process_task_document_chunk.content_privacy IS '对清洗后的内容进行去隐私处理'; + COMMENT ON COLUMN public.data_process_task_document_chunk.meta_info IS 'json结构, 信息包括文档名称,所在页数等。'; + COMMENT ON COLUMN public.data_process_task_document_chunk.process_info IS 'json结构 数据处理信息如进行了哪些清洗'; + COMMENT ON COLUMN public.data_process_task_document_chunk.page_number IS '所在页数'; + + CREATE TABLE IF NOT EXISTS public.data_process_task_question_answer_clean + ( + id character varying(32) COLLATE pg_catalog."default" NOT NULL, + task_id character varying(32) COLLATE pg_catalog."default", + document_id character varying(32) COLLATE pg_catalog."default", + document_chunk_id character varying(32) COLLATE pg_catalog."default", + file_name character varying(512) COLLATE pg_catalog."default", + question text COLLATE pg_catalog."default", + answer text COLLATE pg_catalog."default", + create_datetime character varying(32) COLLATE pg_catalog."default", + create_user character varying(32) COLLATE pg_catalog."default", + create_program character varying(64) COLLATE pg_catalog."default", + update_datetime character varying(32) COLLATE pg_catalog."default", + update_user character varying(32) COLLATE pg_catalog."default", + update_program character varying(32) COLLATE pg_catalog."default", + CONSTRAINT data_process_task_question_answer_clean_pkey PRIMARY KEY (id) + ); + + COMMENT ON TABLE public.data_process_task_question_answer_clean IS '数据处理问题和答案'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.id IS '主键'; + COMMENT ON COLUMN 
public.data_process_task_question_answer_clean.task_id IS '任务Id'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.document_id IS '文档id'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.document_chunk_id IS '文档chunk id'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.file_name IS '文件名称'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.question IS '问题'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.answer IS '答案'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.create_datetime IS '创建时间'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.create_user IS '创建用户'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.create_program IS '创建程序'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.update_datetime IS '更新时间'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.update_user IS '更新用户'; + COMMENT ON COLUMN public.data_process_task_question_answer_clean.update_program IS '更新程序'; + + kind: ConfigMap metadata: name: pg-init-data
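
For reference, the chunk-splitting flow introduced in the docx handler above can be sketched in isolation roughly as follows. This is an illustrative sketch rather than code from the patch: the helper names (split_into_chunks, build_chunk_rows), the default chunk sizes, and the in-memory list standing in for data_process_document_chunk_db_operate.add are all assumptions, as is the langchain import path; the real handler reads the text via docx_utils.get_content and hands the rows on to common_handle.text_manipulate.

# Illustrative sketch only -- helper names, defaults, and the import path are assumptions, not the patch's API.
import ulid  # the patch generates chunk ids with ulid.ulid()
from langchain.text_splitter import SpacyTextSplitter  # assumed import path

def split_into_chunks(content, chunk_size=500, chunk_overlap=10):
    # Mirrors _get_documents_by_langchain: spaCy-based splitting with the zh_core_web_sm pipeline.
    splitter = SpacyTextSplitter(
        separator="\n\n",
        pipeline="zh_core_web_sm",
        chunk_size=int(chunk_size),
        chunk_overlap=int(chunk_overlap)
    )
    return splitter.split_text(content)

def build_chunk_rows(content, document_id, task_id, create_user):
    # One dict per chunk, shaped like the rows inserted into data_process_task_document_chunk;
    # the real code persists each row via data_process_document_chunk_db_operate.add.
    rows = []
    for document in split_into_chunks(content):
        rows.append({
            'id': ulid.ulid(),
            'document_id': document_id,
            'task_id': task_id,
            'status': 'not_start',
            'content': document.replace("\n", ""),
            'meta_info': '',
            'page_number': '',
            'creator': create_user
        })
    return rows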