From fc03782c4d4588e12bc1ec4990ecca3cc6fbd7d3 Mon Sep 17 00:00:00 2001 From: wangxinbiao <1412146116@qq.com> Date: Fri, 15 Mar 2024 18:29:13 +0800 Subject: [PATCH] feat:add document chunk --- apiserver/graph/generated/generated.go | 126 +++++++++++++++++- apiserver/graph/generated/models_gen.go | 4 + apiserver/graph/schema/dataprocessing.gql | 2 + .../graph/schema/dataprocessing.graphqls | 4 + .../templates/pg-init-data-configmap.yaml | 4 +- .../db-scripts/init-database-schema.sql | 4 +- .../data_store_process/minio_store_process.py | 31 ++--- .../data_process_document_chunk_db_operate.py | 38 ++++++ .../src/file_handle/common_handle.py | 26 ++++ .../src/file_handle/pdf_handle.py | 27 ++-- .../src/file_handle/web_handle.py | 18 +-- .../src/file_handle/word_handle.py | 18 +-- .../src/service/data_process_service.py | 51 ++++++- 13 files changed, 292 insertions(+), 61 deletions(-) diff --git a/apiserver/graph/generated/generated.go b/apiserver/graph/generated/generated.go index fdedf6dba..6c591459c 100644 --- a/apiserver/graph/generated/generated.go +++ b/apiserver/graph/generated/generated.go @@ -139,6 +139,8 @@ type ComplexityRoot struct { } DataProcessConfigChildren struct { + ChunkOverlap func(childComplexity int) int + ChunkSize func(childComplexity int) int Description func(childComplexity int) int Enable func(childComplexity int) int FileProgress func(childComplexity int) int @@ -1298,6 +1300,20 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.DataProcessConfig.Status(childComplexity), true + case "DataProcessConfigChildren.chunk_overlap": + if e.complexity.DataProcessConfigChildren.ChunkOverlap == nil { + break + } + + return e.complexity.DataProcessConfigChildren.ChunkOverlap(childComplexity), true + + case "DataProcessConfigChildren.chunk_size": + if e.complexity.DataProcessConfigChildren.ChunkSize == nil { + break + } + + return e.complexity.DataProcessConfigChildren.ChunkSize(childComplexity), true + case "DataProcessConfigChildren.description": if e.complexity.DataProcessConfigChildren.Description == nil { break @@ -5141,6 +5157,8 @@ input FileItem { # 数据处理配置条目 input DataProcessConfigItem { type: String! + chunk_size: Int + chunk_overlap: Int llm_config: LLMConfigItem remove_duplicate_config: RemoveDuplicateConfig } @@ -5310,6 +5328,8 @@ type DataProcessConfigChildren { enable: String zh_name: String description: String + chunk_size: Int + chunk_overlap: Int llm_config: LLMConfig preview: [DataProcessConfigpreView] file_progress: [DataProcessConfigpreFileProgress] @@ -11111,6 +11131,10 @@ func (ec *executionContext) fieldContext_DataProcessConfig_children(ctx context. 
return ec.fieldContext_DataProcessConfigChildren_zh_name(ctx, field) case "description": return ec.fieldContext_DataProcessConfigChildren_description(ctx, field) + case "chunk_size": + return ec.fieldContext_DataProcessConfigChildren_chunk_size(ctx, field) + case "chunk_overlap": + return ec.fieldContext_DataProcessConfigChildren_chunk_overlap(ctx, field) case "llm_config": return ec.fieldContext_DataProcessConfigChildren_llm_config(ctx, field) case "preview": @@ -11288,6 +11312,88 @@ func (ec *executionContext) fieldContext_DataProcessConfigChildren_description(c return fc, nil } +func (ec *executionContext) _DataProcessConfigChildren_chunk_size(ctx context.Context, field graphql.CollectedField, obj *DataProcessConfigChildren) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_DataProcessConfigChildren_chunk_size(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ChunkSize, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*int) + fc.Result = res + return ec.marshalOInt2ᚖint(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_DataProcessConfigChildren_chunk_size(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "DataProcessConfigChildren", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _DataProcessConfigChildren_chunk_overlap(ctx context.Context, field graphql.CollectedField, obj *DataProcessConfigChildren) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_DataProcessConfigChildren_chunk_overlap(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ChunkOverlap, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*int) + fc.Result = res + return ec.marshalOInt2ᚖint(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_DataProcessConfigChildren_chunk_overlap(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "DataProcessConfigChildren", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _DataProcessConfigChildren_llm_config(ctx context.Context, field graphql.CollectedField, obj *DataProcessConfigChildren) (ret graphql.Marshaler) { fc, err := 
ec.fieldContext_DataProcessConfigChildren_llm_config(ctx, field) if err != nil { @@ -34572,7 +34678,7 @@ func (ec *executionContext) unmarshalInputDataProcessConfigItem(ctx context.Cont asMap[k] = v } - fieldsInOrder := [...]string{"type", "llm_config", "remove_duplicate_config"} + fieldsInOrder := [...]string{"type", "chunk_size", "chunk_overlap", "llm_config", "remove_duplicate_config"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -34586,6 +34692,20 @@ func (ec *executionContext) unmarshalInputDataProcessConfigItem(ctx context.Cont return it, err } it.Type = data + case "chunk_size": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("chunk_size")) + data, err := ec.unmarshalOInt2ᚖint(ctx, v) + if err != nil { + return it, err + } + it.ChunkSize = data + case "chunk_overlap": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("chunk_overlap")) + data, err := ec.unmarshalOInt2ᚖint(ctx, v) + if err != nil { + return it, err + } + it.ChunkOverlap = data case "llm_config": ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("llm_config")) data, err := ec.unmarshalOLLMConfigItem2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐLLMConfigItem(ctx, v) @@ -38121,6 +38241,10 @@ func (ec *executionContext) _DataProcessConfigChildren(ctx context.Context, sel out.Values[i] = ec._DataProcessConfigChildren_zh_name(ctx, field, obj) case "description": out.Values[i] = ec._DataProcessConfigChildren_description(ctx, field, obj) + case "chunk_size": + out.Values[i] = ec._DataProcessConfigChildren_chunk_size(ctx, field, obj) + case "chunk_overlap": + out.Values[i] = ec._DataProcessConfigChildren_chunk_overlap(ctx, field, obj) case "llm_config": out.Values[i] = ec._DataProcessConfigChildren_llm_config(ctx, field, obj) case "preview": diff --git a/apiserver/graph/generated/models_gen.go b/apiserver/graph/generated/models_gen.go index 96a2a4b20..6a5caa577 100644 --- a/apiserver/graph/generated/models_gen.go +++ b/apiserver/graph/generated/models_gen.go @@ -417,6 +417,8 @@ type DataProcessConfigChildren struct { Enable *string `json:"enable,omitempty"` ZhName *string `json:"zh_name,omitempty"` Description *string `json:"description,omitempty"` + ChunkSize *int `json:"chunk_size,omitempty"` + ChunkOverlap *int `json:"chunk_overlap,omitempty"` LlmConfig *LLMConfig `json:"llm_config,omitempty"` Preview []*DataProcessConfigpreView `json:"preview,omitempty"` FileProgress []*DataProcessConfigpreFileProgress `json:"file_progress,omitempty"` @@ -430,6 +432,8 @@ type DataProcessConfigInfo struct { type DataProcessConfigItem struct { Type string `json:"type"` + ChunkSize *int `json:"chunk_size,omitempty"` + ChunkOverlap *int `json:"chunk_overlap,omitempty"` LlmConfig *LLMConfigItem `json:"llm_config,omitempty"` RemoveDuplicateConfig *RemoveDuplicateConfig `json:"remove_duplicate_config,omitempty"` } diff --git a/apiserver/graph/schema/dataprocessing.gql b/apiserver/graph/schema/dataprocessing.gql index 2187bf073..f9a4115c5 100644 --- a/apiserver/graph/schema/dataprocessing.gql +++ b/apiserver/graph/schema/dataprocessing.gql @@ -95,6 +95,8 @@ query dataProcessDetails($input: DataProcessDetailsInput){ enable zh_name description + chunk_size + chunk_overlap llm_config { name namespace diff --git a/apiserver/graph/schema/dataprocessing.graphqls b/apiserver/graph/schema/dataprocessing.graphqls index 14ec58796..4ccc5fa79 100644 --- a/apiserver/graph/schema/dataprocessing.graphqls +++ b/apiserver/graph/schema/dataprocessing.graphqls @@ -62,6 +62,8 @@ input FileItem { # 
数据处理配置条目 input DataProcessConfigItem { type: String! + chunk_size: Int + chunk_overlap: Int llm_config: LLMConfigItem remove_duplicate_config: RemoveDuplicateConfig } @@ -231,6 +233,8 @@ type DataProcessConfigChildren { enable: String zh_name: String description: String + chunk_size: Int + chunk_overlap: Int llm_config: LLMConfig preview: [DataProcessConfigpreView] file_progress: [DataProcessConfigpreFileProgress] diff --git a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml index 7be0d0193..c17fdcf19 100644 --- a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml +++ b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml @@ -305,7 +305,7 @@ data: task_id character varying(32) COLLATE pg_catalog."default", log_id character varying(32) COLLATE pg_catalog."default", log_datetime character varying(32) COLLATE pg_catalog."default", - file_name character varying(64) COLLATE pg_catalog."default", + file_name character varying(512) COLLATE pg_catalog."default", stage_name character varying(1024) COLLATE pg_catalog."default", stage_status character varying(64) COLLATE pg_catalog."default", stage_detail text COLLATE pg_catalog."default", @@ -385,7 +385,7 @@ data: task_id varchar(32), document_id varchar(32), document_chunk_id varchar(32), - file_name varchar(64), + file_name varchar(512), question text, answer text, question_vector vector, diff --git a/pypi/data-processing/db-scripts/init-database-schema.sql b/pypi/data-processing/db-scripts/init-database-schema.sql index 9b33ca07c..adc2290f7 100644 --- a/pypi/data-processing/db-scripts/init-database-schema.sql +++ b/pypi/data-processing/db-scripts/init-database-schema.sql @@ -300,7 +300,7 @@ task_id character varying(32) COLLATE pg_catalog."default", log_id character varying(32) COLLATE pg_catalog."default", log_datetime character varying(32) COLLATE pg_catalog."default", - file_name character varying(64) COLLATE pg_catalog."default", + file_name character varying(512) COLLATE pg_catalog."default", stage_name character varying(1024) COLLATE pg_catalog."default", stage_status character varying(64) COLLATE pg_catalog."default", stage_detail text COLLATE pg_catalog."default", @@ -380,7 +380,7 @@ task_id varchar(32), document_id varchar(32), document_chunk_id varchar(32), - file_name varchar(64), + file_name varchar(512), question text, answer text, question_vector vector, diff --git a/pypi/data-processing/src/data_store_process/minio_store_process.py b/pypi/data-processing/src/data_store_process/minio_store_process.py index 772a2cb6b..3f814fb6d 100644 --- a/pypi/data-processing/src/data_store_process/minio_store_process.py +++ b/pypi/data-processing/src/data_store_process/minio_store_process.py @@ -149,8 +149,6 @@ async def text_manipulate( if file_extension in ["pdf"]: # 处理PDF文件 pdf_handle = PDFHandle( - chunk_size=req_json.get("chunk_size"), - chunk_overlap=req_json.get("chunk_overlap"), file_name=file_name, document_id=item.get("document_id"), support_type=support_type, @@ -163,8 +161,6 @@ async def text_manipulate( elif file_extension in ["docx"]: # 处理.docx文件 result = word_handle.docx_manipulate( - chunk_size=req_json.get("chunk_size"), - chunk_overlap=req_json.get("chunk_overlap"), file_name=file_name, document_id=item.get("document_id"), support_type=support_type, @@ -175,8 +171,6 @@ async def text_manipulate( elif file_extension == "web": # 处理.web文件 result = await web_handle.web_manipulate( - chunk_size=req_json.get("chunk_size"), - 
chunk_overlap=req_json.get("chunk_overlap"),
                 file_name=file_name,
                 document_id=item.get("document_id"),
                 support_type=support_type,
@@ -510,19 +504,20 @@ def text_manipulate_retry(req_json, pool):
         data_process_stage_log_db_operate.insert(insert_stage_log_params, pool=pool)

         # insert QA list to detail preview
-        logger.debug(
-            f"{log_tag_const.MINIO_STORE_PROCESS} Insert QA list for detail preview."
-        )
-        list_qa_params = {"task_id": task_id}
-        list_qa_res = data_process_detail_db_operate.top_n_list_qa_for_preview(
-            list_qa_params, pool=pool
-        )
+        if any(d.get("type") == "qa_split" for d in support_type):
+            logger.debug(
+                f"{log_tag_const.MINIO_STORE_PROCESS} Insert QA list for detail preview."
+            )
+            list_qa_params = {"task_id": task_id}
+            list_qa_res = data_process_detail_db_operate.top_n_list_qa_for_preview(
+                list_qa_params, pool=pool
+            )

-        for item in list_qa_res.get("data"):
-            item["transform_type"] = "qa_split"
-            item["pre_content"] = item["question"]
-            item["post_content"] = item["answer"]
-            data_process_detail_preview_db_operate.insert(item, pool=pool)
+            for item in list_qa_res.get("data"):
+                item["transform_type"] = "qa_split"
+                item["pre_content"] = item["question"]
+                item["post_content"] = item["answer"]
+                data_process_detail_preview_db_operate.insert(item, pool=pool)

         # 将清洗后的文件上传到MinIO中
         # 上传final文件夹下的文件,并添加tag
diff --git a/pypi/data-processing/src/database_operate/data_process_document_chunk_db_operate.py b/pypi/data-processing/src/database_operate/data_process_document_chunk_db_operate.py
index 655f42baa..30bee21b8 100644
--- a/pypi/data-processing/src/database_operate/data_process_document_chunk_db_operate.py
+++ b/pypi/data-processing/src/database_operate/data_process_document_chunk_db_operate.py
@@ -182,3 +182,41 @@ def list_by_status(req_json, pool):

     res = postgresql_pool_client.execute_query(pool, sql, params)
     return res
+
+def top_n_list_for_preview(req_json, pool):
+    """List document chunks of a task for preview.
+
+    Ten chunks are selected at random. req_json is a dictionary object,
+    for example:
+    {
+        "task_id": "01HGWBE48DT3ADE9ZKA62SW4WS"
+    }
+    pool: database connection pool;
+    """
+    params = {"task_id": req_json["task_id"]}
+
+    sql = """
+        select
+            id,
+            document_id,
+            status,
+            task_id,
+            content,
+            meta_info,
+            page_number,
+            create_datetime,
+            create_user,
+            create_program,
+            update_datetime,
+            update_user,
+            update_program
+        from
+            public.data_process_task_document_chunk
+        where
+            task_id = %(task_id)s
+        order by random()
+        limit 10
+    """.strip()
+
+    res = postgresql_pool_client.execute_query(pool, sql, params)
+    return res
diff --git a/pypi/data-processing/src/file_handle/common_handle.py b/pypi/data-processing/src/file_handle/common_handle.py
index ab7b96a24..acc1d892a 100644
--- a/pypi/data-processing/src/file_handle/common_handle.py
+++ b/pypi/data-processing/src/file_handle/common_handle.py
@@ -180,6 +180,32 @@ def text_manipulate(
                     "object_count": len(qa_list.get("data")),
                 },
             }
+        elif support_type_map.get("document_chunk"):
+            chunk_data = [["chunk_content", "file_name", "page_number"]]
+            for document in all_document_for_process:
+                chunk_data.append(
+                    [
+                        document.get("content"),
+                        file_name,
+                        document.get("page_number"),
+                    ]
+                )
+
+            # Save the CSV file.
+            file_name_without_extension = file_utils.get_file_name_without_extension(
+                file_name
+            )
+            file_name_csv = file_name_without_extension + ".csv"
+            csv_utils.save_csv(
+                file_name=file_name_csv, phase_value="final", data=chunk_data
+            )
+
+            logger.debug(f"{log_tag_const.COMMON_HANDLE} Finish manipulating the text")
+            return {
+                "status": 200,
+                "message": "",
+                "data": "",
+            }

         return {"status": 200, "message": "", "data": ""}
     except Exception as ex:
diff --git a/pypi/data-processing/src/file_handle/pdf_handle.py b/pypi/data-processing/src/file_handle/pdf_handle.py
index 85975d3e9..83ba6dcf3 100644
--- a/pypi/data-processing/src/file_handle/pdf_handle.py
+++ b/pypi/data-processing/src/file_handle/pdf_handle.py
@@ -39,8 +39,6 @@ def __init__(
         conn_pool,
         task_id,
         create_user,
-        chunk_size=None,
-        chunk_overlap=None,
     ):
         """
         Initialize the pdf handle.
@@ -52,22 +50,13 @@ def __init__(
            conn_pool: PostgreSQL connect pool.
            task_id: data processing task id.
            create_user: create user.
-           chunk_size: chunk size.
-           chunk_overlap: chunk overlap.
        """
-        if chunk_size is None:
-            chunk_size = config.knowledge_chunk_size
-
-        if chunk_overlap is None:
-            chunk_overlap = config.knowledge_chunk_overlap
        self._file_name = file_name
        self._document_id = document_id
        self._support_type = support_type
        self._conn_pool = conn_pool
        self._task_id = task_id
        self._create_user = create_user
-        self._chunk_size = chunk_size
-        self._chunk_overlap = chunk_overlap

    def handle(
        self,
@@ -81,7 +70,15 @@ def handle(
            file_path = pdf_file_path + "original/" + self._file_name

            # Text splitter
-            documents = self._get_documents(file_path=file_path)
+            chunk_item = [item for item in self._support_type if item.get("type") == "document_chunk"]
+            chunk_config = chunk_item[0] if chunk_item else {}
+            # chunk_size/chunk_overlap are optional in the request; fall back to the
+            # configured defaults so the splitter never receives None.
+            chunk_size = chunk_config.get("chunk_size") or config.knowledge_chunk_size
+            chunk_overlap = chunk_config.get("chunk_overlap") or config.knowledge_chunk_overlap
+
+            documents = self._get_documents(chunk_size=chunk_size, chunk_overlap=chunk_overlap, file_path=file_path)

            # step 2
            # save all chunk info to database
@@ -127,15 +124,15 @@ def handle(

            return {"status": 400, "message": str(ex), "data": traceback.format_exc()}

-    def _get_documents(self, file_path):
+    def _get_documents(self, chunk_size, chunk_overlap, file_path):
        pdf_loader = PDFLoader(file_path)
        docs = pdf_loader.load()

        text_splitter = SpacyTextSplitter(
            separator="\n\n",
            pipeline="zh_core_web_sm",
-            chunk_size=int(self._chunk_size),
-            chunk_overlap=int(self._chunk_overlap),
+            chunk_size=int(chunk_size),
+            chunk_overlap=int(chunk_overlap),
        )

        documents = text_splitter.split_documents(docs)
diff --git a/pypi/data-processing/src/file_handle/web_handle.py b/pypi/data-processing/src/file_handle/web_handle.py
index 3c6fe5b34..6488d1571 100644
--- a/pypi/data-processing/src/file_handle/web_handle.py
+++ b/pypi/data-processing/src/file_handle/web_handle.py
@@ -35,8 +35,6 @@ async def web_manipulate(
     conn_pool,
     task_id,
     create_user,
-    chunk_size=None,
-    chunk_overlap=None,
 ):
     """Manipulate the text content from a web file.
@@ -44,8 +42,6 @@
     support_type: support type;
     conn_pool: database connection pool;
     task_id: data process task id;
-    chunk_size: chunk size;
-    chunk_overlap: chunk overlap;
     """
     logger.debug(f"{log_tag_const.PDF_HANDLE} Start to manipulate the text in web")
@@ -54,6 +50,13 @@
     file_path = pdf_file_path + "original/" + file_name

     # Text splitter
+    chunk_item = [item for item in support_type if item.get("type") == "document_chunk"]
+    chunk_config = chunk_item[0] if chunk_item else {}
+    # chunk_size/chunk_overlap are optional in the request; fall back to the
+    # configured defaults so the splitter never receives None.
+    chunk_size = chunk_config.get("chunk_size") or config.knowledge_chunk_size
+    chunk_overlap = chunk_config.get("chunk_overlap") or config.knowledge_chunk_overlap
+
     documents = await _get_documents(
         chunk_size=chunk_size, chunk_overlap=chunk_overlap, file_path=file_path
     )
@@ -103,13 +106,6 @@ async def _get_documents(chunk_size, chunk_overlap, file_path):
-    # Split the text.
-    if chunk_size is None:
-        chunk_size = config.knowledge_chunk_size
-
-    if chunk_overlap is None:
-        chunk_overlap = config.knowledge_chunk_overlap
-
     with open(file_path, "r", encoding="utf-8") as file:
         # 读取文件内容
         file_content = file.read()
diff --git a/pypi/data-processing/src/file_handle/word_handle.py b/pypi/data-processing/src/file_handle/word_handle.py
index bd3e10722..383c60d31 100644
--- a/pypi/data-processing/src/file_handle/word_handle.py
+++ b/pypi/data-processing/src/file_handle/word_handle.py
@@ -36,8 +36,6 @@ def docx_manipulate(
     conn_pool,
     task_id,
     create_user,
-    chunk_size=None,
-    chunk_overlap=None,
 ):
     """Manipulate the text content from a word file.

     support_type: support type;
     conn_pool: database connection pool;
     task_id: data process task id;
-    chunk_size: chunk size;
-    chunk_overlap: chunk overlap;
     """
     logger.debug(f"{log_tag_const.WORD_HANDLE} Start to manipulate the text in word")
@@ -56,6 +52,13 @@
     file_path = word_file_path + "original/" + file_name

     # Text splitter
+    chunk_item = [item for item in support_type if item.get("type") == "document_chunk"]
+    chunk_config = chunk_item[0] if chunk_item else {}
+    # chunk_size/chunk_overlap are optional in the request; fall back to the
+    # configured defaults so the splitter never receives None.
+    chunk_size = chunk_config.get("chunk_size") or config.knowledge_chunk_size
+    chunk_overlap = chunk_config.get("chunk_overlap") or config.knowledge_chunk_overlap
+
     documents = _get_documents(
         chunk_size=chunk_size, chunk_overlap=chunk_overlap, file_path=file_path
     )
@@ -107,13 +110,6 @@ def docx_manipulate(

 def _get_documents(chunk_size, chunk_overlap, file_path):
-    # Split the text.
-    if chunk_size is None:
-        chunk_size = config.knowledge_chunk_size
-
-    if chunk_overlap is None:
-        chunk_overlap = config.knowledge_chunk_overlap
-
     docx_loader = DocxLoader(file_path)
     docs = docx_loader.load()

     text_splitter = SpacyTextSplitter(
diff --git a/pypi/data-processing/src/service/data_process_service.py b/pypi/data-processing/src/service/data_process_service.py
index c70c5413d..048a54d35 100644
--- a/pypi/data-processing/src/service/data_process_service.py
+++ b/pypi/data-processing/src/service/data_process_service.py
@@ -29,6 +29,7 @@
                                data_process_log_db_operate,
                                data_process_stage_log_db_operate)
 from parallel import thread_parallel
+from utils import json_utils

 logger = logging.getLogger(__name__)

@@ -315,7 +316,10 @@ def _set_basic_info_for_config_map_for_result(
         process_config_map: process config map
     """
     # chunk processing
-    if process_cofig_map.get("qa_split"):
+    if (
+        process_cofig_map.get("qa_split")
+        or process_cofig_map.get("document_chunk")
+    ):
         if from_result.get("chunk_processing") is None:
             from_result["chunk_processing"] = {
                 "name": "chunk_processing",
@@ -391,6 +395,20 @@ def _set_children_info_for_config_map_for_result(
             }
         )

+    # document chunk
+    if process_cofig_map.get("document_chunk"):
+        from_result["chunk_processing"]["children"].append(
+            {
+                "name": "document_chunk",
+                "enable": "true",
+                "zh_name": "文本分段",
+                "description": "根据配置,自动将文件做分段处理。",
+                "chunk_size": process_cofig_map.get("document_chunk").get("chunk_size"),
+                "chunk_overlap": process_cofig_map.get("document_chunk").get("chunk_overlap"),
+                "preview": _get_document_chunk_preview(task_id=task_id, conn_pool=conn_pool),
+            }
+        )
+
     # remove invisible characters
     if process_cofig_map.get("remove_invisible_characters"):
         from_result["clean"]["children"].append(
@@ -714,3 +732,29 @@ def _get_privacy_process_file_num(task_id, conn_pool):
             )
         )
         return 0
+
+def _get_document_chunk_preview(task_id, conn_pool):
+    """Get the document chunk list preview.
+
+    task_id: task id;
+    conn_pool: database connection pool
+    """
+    logger.debug(f"{log_tag_const.MINIO_STORE_PROCESS} Get preview for document chunk.")
+    chunk_list_preview = []
+
+    # List the top 10 document chunks of the task.
+    list_params = {"task_id": task_id}
+    list_chunk_res = data_process_document_chunk_db_operate.top_n_list_for_preview(
+        list_params, pool=conn_pool
+    )
+    if list_chunk_res["status"] == 200:
+        for item in list_chunk_res["data"]:
+            meta_json = json_utils.loads(item.get("meta_info"))
+            chunk_list_preview.append(
+                {
+                    "file_name": meta_json.get("source"),
+                    "content": [{"post": item.get("content")}],
+                }
+            )
+
+    return chunk_list_preview
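
Reviewer note: a minimal sketch of how the new document_chunk config item flows through support_type into the file handlers. Only "type", "chunk_size", and "chunk_overlap" are defined by this patch; every other value below is a hypothetical placeholder, and the literal fallbacks 500/100 stand in for config.knowledge_chunk_size / config.knowledge_chunk_overlap, which live outside this diff.

    # Hypothetical request fragment: one document_chunk entry in support_type.
    support_type = [
        {
            "type": "document_chunk",
            "chunk_size": 500,     # optional; handlers fall back to config.knowledge_chunk_size
            "chunk_overlap": 100,  # optional; handlers fall back to config.knowledge_chunk_overlap
        }
    ]

    # Mirrors the resolution logic added to pdf_handle/web_handle/word_handle:
    # pick the first document_chunk entry, then default any missing value.
    chunk_item = [item for item in support_type if item.get("type") == "document_chunk"]
    chunk_config = chunk_item[0] if chunk_item else {}
    chunk_size = chunk_config.get("chunk_size") or 500        # config.knowledge_chunk_size in the handlers
    chunk_overlap = chunk_config.get("chunk_overlap") or 100  # config.knowledge_chunk_overlap in the handlers
    assert (chunk_size, chunk_overlap) == (500, 100)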