From af8f14c26247c28776d0167f4a730041cf0b901c Mon Sep 17 00:00:00 2001 From: wangxinbiao <1412146116@qq.com> Date: Fri, 12 Jan 2024 09:59:16 +0800 Subject: [PATCH] feat:add dataprocessing task retry functionality --- apiserver/graph/generated/generated.go | 390 ++++++++++++- apiserver/graph/generated/models_gen.go | 26 +- .../graph/impl/dataprocessing.resolvers.go | 18 + apiserver/graph/schema/dataprocessing.gql | 20 + .../graph/schema/dataprocessing.graphqls | 16 + .../pkg/dataprocessing/dataprocessing.go | 56 ++ .../data_manipulation/common/log_tag_const.py | 1 + .../common/special_characters.py | 2 +- .../controller/data_process_controller.py | 33 ++ .../data_store_process/minio_store_process.py | 520 +++++++++++++++++- .../data_process_db_operate.py | 40 ++ .../data_process_detail_db_operate.py | 32 ++ .../data_process_document_chunk_db_operate.py | 33 ++ .../data_process_document_db_operate.py | 33 ++ .../data_process_log_db_operate.py | 8 +- .../data_process_stage_log_db_operate.py | 33 ++ .../file_handle/common_handle.py | 132 +++++ .../file_handle/pdf_handle.py | 9 +- .../file_handle/word_handle.py | 7 +- .../service/data_process_service.py | 85 ++- .../transform/text/clean_transform.py | 23 +- .../db-scripts/init-database-schema.sql | 4 + .../templates/pg-init-data-configmap.yaml | 5 + gqlgen.yaml | 4 + 24 files changed, 1471 insertions(+), 59 deletions(-) diff --git a/apiserver/graph/generated/generated.go b/apiserver/graph/generated/generated.go index f8e58cc5b..7c3e1939a 100644 --- a/apiserver/graph/generated/generated.go +++ b/apiserver/graph/generated/generated.go @@ -199,12 +199,14 @@ type ComplexityRoot struct { } DataProcessQuery struct { - AllDataProcessListByCount func(childComplexity int, input *AllDataProcessListByCountInput) int - AllDataProcessListByPage func(childComplexity int, input *AllDataProcessListByPageInput) int - CheckDataProcessTaskName func(childComplexity int, input *CheckDataProcessTaskNameInput) int - DataProcessDetails func(childComplexity int, input *DataProcessDetailsInput) int - DataProcessSupportType func(childComplexity int) int - GetLogInfo func(childComplexity int, input *DataProcessDetailsInput) int + AllDataProcessListByCount func(childComplexity int, input *AllDataProcessListByCountInput) int + AllDataProcessListByPage func(childComplexity int, input *AllDataProcessListByPageInput) int + CheckDataProcessTaskName func(childComplexity int, input *CheckDataProcessTaskNameInput) int + DataProcessDetails func(childComplexity int, input *DataProcessDetailsInput) int + DataProcessLogInfoByFileName func(childComplexity int, input *DataProcessFileLogInput) int + DataProcessRetry func(childComplexity int, input *DataProcessRetryInput) int + DataProcessSupportType func(childComplexity int) int + GetLogInfo func(childComplexity int, input *DataProcessDetailsInput) int } DataProcessResponse struct { @@ -644,6 +646,8 @@ type DataProcessQueryResolver interface { DataProcessDetails(ctx context.Context, obj *DataProcessQuery, input *DataProcessDetailsInput) (*DataProcessDetails, error) CheckDataProcessTaskName(ctx context.Context, obj *DataProcessQuery, input *CheckDataProcessTaskNameInput) (*DataProcessResponse, error) GetLogInfo(ctx context.Context, obj *DataProcessQuery, input *DataProcessDetailsInput) (*DataProcessResponse, error) + DataProcessLogInfoByFileName(ctx context.Context, obj *DataProcessQuery, input *DataProcessFileLogInput) (*DataProcessResponse, error) + DataProcessRetry(ctx context.Context, obj *DataProcessQuery, input 
*DataProcessRetryInput) (*DataProcessResponse, error) } type DatasetResolver interface { Versions(ctx context.Context, obj *Dataset, input ListVersionedDatasetInput) (*PaginatedResult, error) @@ -1466,6 +1470,30 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.DataProcessQuery.DataProcessDetails(childComplexity, args["input"].(*DataProcessDetailsInput)), true + case "DataProcessQuery.dataProcessLogInfoByFileName": + if e.complexity.DataProcessQuery.DataProcessLogInfoByFileName == nil { + break + } + + args, err := ec.field_DataProcessQuery_dataProcessLogInfoByFileName_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.DataProcessQuery.DataProcessLogInfoByFileName(childComplexity, args["input"].(*DataProcessFileLogInput)), true + + case "DataProcessQuery.dataProcessRetry": + if e.complexity.DataProcessQuery.DataProcessRetry == nil { + break + } + + args, err := ec.field_DataProcessQuery_dataProcessRetry_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.DataProcessQuery.DataProcessRetry(childComplexity, args["input"].(*DataProcessRetryInput)), true + case "DataProcessQuery.dataProcessSupportType": if e.complexity.DataProcessQuery.DataProcessSupportType == nil { break @@ -3698,6 +3726,8 @@ func (e *executableSchema) Exec(ctx context.Context) graphql.ResponseHandler { ec.unmarshalInputCreateWorkerInput, ec.unmarshalInputDataProcessConfigItem, ec.unmarshalInputDataProcessDetailsInput, + ec.unmarshalInputDataProcessFileLogInput, + ec.unmarshalInputDataProcessRetryInput, ec.unmarshalInputDeleteCommonInput, ec.unmarshalInputDeleteDataProcessInput, ec.unmarshalInputDeleteVersionedDatasetInput, @@ -4163,6 +4193,10 @@ type DataProcessQuery { checkDataProcessTaskName(input: CheckDataProcessTaskNameInput): DataProcessResponse # 日志信息 getLogInfo(input: DataProcessDetailsInput): DataProcessResponse + # 获取文件处理的日志 + dataProcessLogInfoByFileName(input: DataProcessFileLogInput): DataProcessResponse + # 任务重试 + dataProcessRetry(input: DataProcessRetryInput): DataProcessResponse } @@ -4213,6 +4247,7 @@ input LLMConfigItem { top_p: String max_tokens: String prompt_template: String + provider: String } input DeleteDataProcessInput { @@ -4228,6 +4263,17 @@ input CheckDataProcessTaskNameInput { namespace: String! } +input DataProcessFileLogInput { + id: String! + file_name: String! + type: String! +} + +input DataProcessRetryInput { + id: String! + creator: String! +} + # 数据处理列表分页 type PaginatedDataProcessItem { status: Int! 
@@ -6387,6 +6433,36 @@ func (ec *executionContext) field_DataProcessQuery_dataProcessDetails_args(ctx c return args, nil } +func (ec *executionContext) field_DataProcessQuery_dataProcessLogInfoByFileName_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 *DataProcessFileLogInput + if tmp, ok := rawArgs["input"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("input")) + arg0, err = ec.unmarshalODataProcessFileLogInput2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDataProcessFileLogInput(ctx, tmp) + if err != nil { + return nil, err + } + } + args["input"] = arg0 + return args, nil +} + +func (ec *executionContext) field_DataProcessQuery_dataProcessRetry_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 *DataProcessRetryInput + if tmp, ok := rawArgs["input"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("input")) + arg0, err = ec.unmarshalODataProcessRetryInput2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDataProcessRetryInput(ctx, tmp) + if err != nil { + return nil, err + } + } + args["input"] = arg0 + return args, nil +} + func (ec *executionContext) field_DataProcessQuery_getLogInfo_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -11589,6 +11665,126 @@ func (ec *executionContext) fieldContext_DataProcessQuery_getLogInfo(ctx context return fc, nil } +func (ec *executionContext) _DataProcessQuery_dataProcessLogInfoByFileName(ctx context.Context, field graphql.CollectedField, obj *DataProcessQuery) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_DataProcessQuery_dataProcessLogInfoByFileName(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.DataProcessQuery().DataProcessLogInfoByFileName(rctx, obj, fc.Args["input"].(*DataProcessFileLogInput)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*DataProcessResponse) + fc.Result = res + return ec.marshalODataProcessResponse2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDataProcessResponse(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_DataProcessQuery_dataProcessLogInfoByFileName(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "DataProcessQuery", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "status": + return ec.fieldContext_DataProcessResponse_status(ctx, field) + case "data": + return ec.fieldContext_DataProcessResponse_data(ctx, field) + case "message": + return ec.fieldContext_DataProcessResponse_message(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type DataProcessResponse", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + 
ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_DataProcessQuery_dataProcessLogInfoByFileName_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return fc, err + } + return fc, nil +} + +func (ec *executionContext) _DataProcessQuery_dataProcessRetry(ctx context.Context, field graphql.CollectedField, obj *DataProcessQuery) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_DataProcessQuery_dataProcessRetry(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.DataProcessQuery().DataProcessRetry(rctx, obj, fc.Args["input"].(*DataProcessRetryInput)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*DataProcessResponse) + fc.Result = res + return ec.marshalODataProcessResponse2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDataProcessResponse(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_DataProcessQuery_dataProcessRetry(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "DataProcessQuery", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "status": + return ec.fieldContext_DataProcessResponse_status(ctx, field) + case "data": + return ec.fieldContext_DataProcessResponse_data(ctx, field) + case "message": + return ec.fieldContext_DataProcessResponse_message(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type DataProcessResponse", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_DataProcessQuery_dataProcessRetry_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return fc, err + } + return fc, nil +} + func (ec *executionContext) _DataProcessResponse_status(ctx context.Context, field graphql.CollectedField, obj *DataProcessResponse) (ret graphql.Marshaler) { fc, err := ec.fieldContext_DataProcessResponse_status(ctx, field) if err != nil { @@ -21311,6 +21507,10 @@ func (ec *executionContext) fieldContext_Query_dataProcess(ctx context.Context, return ec.fieldContext_DataProcessQuery_checkDataProcessTaskName(ctx, field) case "getLogInfo": return ec.fieldContext_DataProcessQuery_getLogInfo(ctx, field) + case "dataProcessLogInfoByFileName": + return ec.fieldContext_DataProcessQuery_dataProcessLogInfoByFileName(ctx, field) + case "dataProcessRetry": + return ec.fieldContext_DataProcessQuery_dataProcessRetry(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type DataProcessQuery", field.Name) }, @@ -28381,6 +28581,91 @@ func (ec *executionContext) unmarshalInputDataProcessDetailsInput(ctx context.Co return it, nil } +func (ec *executionContext) unmarshalInputDataProcessFileLogInput(ctx context.Context, obj interface{}) (DataProcessFileLogInput, error) { + var it DataProcessFileLogInput + asMap := map[string]interface{}{} + for k, v := 
range obj.(map[string]interface{}) { + asMap[k] = v + } + + fieldsInOrder := [...]string{"id", "file_name", "type"} + for _, k := range fieldsInOrder { + v, ok := asMap[k] + if !ok { + continue + } + switch k { + case "id": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("id")) + data, err := ec.unmarshalNString2string(ctx, v) + if err != nil { + return it, err + } + it.ID = data + case "file_name": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("file_name")) + data, err := ec.unmarshalNString2string(ctx, v) + if err != nil { + return it, err + } + it.FileName = data + case "type": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("type")) + data, err := ec.unmarshalNString2string(ctx, v) + if err != nil { + return it, err + } + it.Type = data + } + } + + return it, nil +} + +func (ec *executionContext) unmarshalInputDataProcessRetryInput(ctx context.Context, obj interface{}) (DataProcessRetryInput, error) { + var it DataProcessRetryInput + asMap := map[string]interface{}{} + for k, v := range obj.(map[string]interface{}) { + asMap[k] = v + } + + fieldsInOrder := [...]string{"id", "creator"} + for _, k := range fieldsInOrder { + v, ok := asMap[k] + if !ok { + continue + } + switch k { + case "id": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("id")) + data, err := ec.unmarshalNString2string(ctx, v) + if err != nil { + return it, err + } + it.ID = data + case "creator": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("creator")) + data, err := ec.unmarshalNString2string(ctx, v) + if err != nil { + return it, err + } + it.Creator = data + } + } + + return it, nil +} + func (ec *executionContext) unmarshalInputDeleteCommonInput(ctx context.Context, obj interface{}) (DeleteCommonInput, error) { var it DeleteCommonInput asMap := map[string]interface{}{} @@ -28699,7 +28984,7 @@ func (ec *executionContext) unmarshalInputLLMConfigItem(ctx context.Context, obj asMap[k] = v } - fieldsInOrder := [...]string{"name", "namespace", "model", "temperature", "top_p", "max_tokens", "prompt_template"} + fieldsInOrder := [...]string{"name", "namespace", "model", "temperature", "top_p", "max_tokens", "prompt_template", "provider"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -28769,6 +29054,15 @@ func (ec *executionContext) unmarshalInputLLMConfigItem(ctx context.Context, obj return it, err } it.PromptTemplate = data + case "provider": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("provider")) + data, err := ec.unmarshalOString2ᚖstring(ctx, v) + if err != nil { + return it, err + } + it.Provider = data } } @@ -31998,6 +32292,72 @@ func (ec *executionContext) _DataProcessQuery(ctx context.Context, sel ast.Selec continue } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + case "dataProcessLogInfoByFileName": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._DataProcessQuery_dataProcessLogInfoByFileName(ctx, field, obj) + return res + } + + if field.Deferrable != nil { + dfs, ok := deferred[field.Deferrable.Label] + di := 0 + if ok { + dfs.AddField(field) + di = len(dfs.Values) - 1 + } else { + dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) + deferred[field.Deferrable.Label] 
= dfs + } + dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { + return innerFunc(ctx, dfs) + }) + + // don't run the out.Concurrently() call below + out.Values[i] = graphql.Null + continue + } + + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + case "dataProcessRetry": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._DataProcessQuery_dataProcessRetry(ctx, field, obj) + return res + } + + if field.Deferrable != nil { + dfs, ok := deferred[field.Deferrable.Label] + di := 0 + if ok { + dfs.AddField(field) + di = len(dfs.Values) - 1 + } else { + dfs = graphql.NewFieldSet([]graphql.CollectedField{field}) + deferred[field.Deferrable.Label] = dfs + } + dfs.Concurrently(di, func(ctx context.Context) graphql.Marshaler { + return innerFunc(ctx, dfs) + }) + + // don't run the out.Concurrently() call below + out.Values[i] = graphql.Null + continue + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) default: panic("unknown field " + strconv.Quote(field.Name)) @@ -37592,6 +37952,14 @@ func (ec *executionContext) unmarshalODataProcessDetailsInput2ᚖgithubᚗcomᚋ return &res, graphql.ErrorOnPath(ctx, err) } +func (ec *executionContext) unmarshalODataProcessFileLogInput2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDataProcessFileLogInput(ctx context.Context, v interface{}) (*DataProcessFileLogInput, error) { + if v == nil { + return nil, nil + } + res, err := ec.unmarshalInputDataProcessFileLogInput(ctx, v) + return &res, graphql.ErrorOnPath(ctx, err) +} + func (ec *executionContext) marshalODataProcessItem2ᚕᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDataProcessItemᚄ(ctx context.Context, sel ast.SelectionSet, v []*DataProcessItem) graphql.Marshaler { if v == nil { return graphql.Null @@ -37660,6 +38028,14 @@ func (ec *executionContext) marshalODataProcessResponse2ᚖgithubᚗcomᚋkubeag return ec._DataProcessResponse(ctx, sel, v) } +func (ec *executionContext) unmarshalODataProcessRetryInput2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDataProcessRetryInput(ctx context.Context, v interface{}) (*DataProcessRetryInput, error) { + if v == nil { + return nil, nil + } + res, err := ec.unmarshalInputDataProcessRetryInput(ctx, v) + return &res, graphql.ErrorOnPath(ctx, err) +} + func (ec *executionContext) marshalODataProcessSupportType2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDataProcessSupportType(ctx context.Context, sel ast.SelectionSet, v *DataProcessSupportType) graphql.Marshaler { if v == nil { return graphql.Null diff --git a/apiserver/graph/generated/models_gen.go b/apiserver/graph/generated/models_gen.go index 2070f14b6..cbe263760 100644 --- a/apiserver/graph/generated/models_gen.go +++ b/apiserver/graph/generated/models_gen.go @@ -424,6 +424,12 @@ type DataProcessDetailsItem struct { Config []*DataProcessConfig `json:"config,omitempty"` } +type DataProcessFileLogInput struct { + ID string `json:"id"` + FileName string `json:"file_name"` + Type string `json:"type"` +} + type DataProcessItem struct { ID string `json:"id"` Name string `json:"name"` @@ -442,12 +448,14 @@ type DataProcessMutation struct { } type DataProcessQuery struct { - AllDataProcessListByPage *PaginatedDataProcessItem `json:"allDataProcessListByPage,omitempty"` - AllDataProcessListByCount *CountDataProcessItem 
`json:"allDataProcessListByCount,omitempty"` - DataProcessSupportType *DataProcessSupportType `json:"dataProcessSupportType,omitempty"` - DataProcessDetails *DataProcessDetails `json:"dataProcessDetails,omitempty"` - CheckDataProcessTaskName *DataProcessResponse `json:"checkDataProcessTaskName,omitempty"` - GetLogInfo *DataProcessResponse `json:"getLogInfo,omitempty"` + AllDataProcessListByPage *PaginatedDataProcessItem `json:"allDataProcessListByPage,omitempty"` + AllDataProcessListByCount *CountDataProcessItem `json:"allDataProcessListByCount,omitempty"` + DataProcessSupportType *DataProcessSupportType `json:"dataProcessSupportType,omitempty"` + DataProcessDetails *DataProcessDetails `json:"dataProcessDetails,omitempty"` + CheckDataProcessTaskName *DataProcessResponse `json:"checkDataProcessTaskName,omitempty"` + GetLogInfo *DataProcessResponse `json:"getLogInfo,omitempty"` + DataProcessLogInfoByFileName *DataProcessResponse `json:"dataProcessLogInfoByFileName,omitempty"` + DataProcessRetry *DataProcessResponse `json:"dataProcessRetry,omitempty"` } type DataProcessResponse struct { @@ -456,6 +464,11 @@ type DataProcessResponse struct { Message string `json:"message"` } +type DataProcessRetryInput struct { + ID string `json:"id"` + Creator string `json:"creator"` +} + type DataProcessSupportType struct { Status int `json:"status"` Data []*DataProcessSupportTypeItem `json:"data,omitempty"` @@ -815,6 +828,7 @@ type LLMConfigItem struct { TopP *string `json:"top_p,omitempty"` MaxTokens *string `json:"max_tokens,omitempty"` PromptTemplate *string `json:"prompt_template,omitempty"` + Provider *string `json:"provider,omitempty"` } type LLMQuery struct { diff --git a/apiserver/graph/impl/dataprocessing.resolvers.go b/apiserver/graph/impl/dataprocessing.resolvers.go index 5a72c557f..c2b5e4e8c 100644 --- a/apiserver/graph/impl/dataprocessing.resolvers.go +++ b/apiserver/graph/impl/dataprocessing.resolvers.go @@ -83,6 +83,24 @@ func (r *dataProcessQueryResolver) GetLogInfo(ctx context.Context, obj *generate return dataprocessing.GetLogInfo(ctx, c, obj, input) } +// DataProcessLogInfoByFileName is the resolver for the dataProcessLogInfoByFileName field. +func (r *dataProcessQueryResolver) DataProcessLogInfoByFileName(ctx context.Context, obj *generated.DataProcessQuery, input *generated.DataProcessFileLogInput) (*generated.DataProcessResponse, error) { + c, err := getClientFromCtx(ctx) + if err != nil { + return nil, err + } + return dataprocessing.DataProcessLogInfoByFileName(ctx, c, obj, input) +} + +// DataProcessRetry is the resolver for the dataProcessRetry field. +func (r *dataProcessQueryResolver) DataProcessRetry(ctx context.Context, obj *generated.DataProcessQuery, input *generated.DataProcessRetryInput) (*generated.DataProcessResponse, error) { + c, err := getClientFromCtx(ctx) + if err != nil { + return nil, err + } + return dataprocessing.DataProcessRetry(ctx, c, obj, input) +} + // DataProcess is the resolver for the dataProcess field. 
func (r *mutationResolver) DataProcess(ctx context.Context) (*generated.DataProcessMutation, error) { return &generated.DataProcessMutation{}, nil diff --git a/apiserver/graph/schema/dataprocessing.gql b/apiserver/graph/schema/dataprocessing.gql index 2fdd08959..518efc6cc 100644 --- a/apiserver/graph/schema/dataprocessing.gql +++ b/apiserver/graph/schema/dataprocessing.gql @@ -128,6 +128,26 @@ query getLogInfo($input: DataProcessDetailsInput){ } } +query dataProcessLogInfoByFileName($input: DataProcessFileLogInput){ + dataProcess { + dataProcessLogInfoByFileName(input: $input) { + status + data + message + } + } +} + +query dataProcessRetry($input: DataProcessRetryInput){ + dataProcess { + dataProcessRetry(input: $input) { + status + data + message + } + } +} + mutation createDataProcessTask($input: AddDataProcessInput) { dataProcess { createDataProcessTask(input: $input) { diff --git a/apiserver/graph/schema/dataprocessing.graphqls b/apiserver/graph/schema/dataprocessing.graphqls index c6180a86b..c47eb9299 100644 --- a/apiserver/graph/schema/dataprocessing.graphqls +++ b/apiserver/graph/schema/dataprocessing.graphqls @@ -21,6 +21,10 @@ type DataProcessQuery { checkDataProcessTaskName(input: CheckDataProcessTaskNameInput): DataProcessResponse # 日志信息 getLogInfo(input: DataProcessDetailsInput): DataProcessResponse + # 获取文件处理的日志 + dataProcessLogInfoByFileName(input: DataProcessFileLogInput): DataProcessResponse + # 任务重试 + dataProcessRetry(input: DataProcessRetryInput): DataProcessResponse } @@ -71,6 +75,7 @@ input LLMConfigItem { top_p: String max_tokens: String prompt_template: String + provider: String } input DeleteDataProcessInput { @@ -86,6 +91,17 @@ input CheckDataProcessTaskNameInput { namespace: String! } +input DataProcessFileLogInput { + id: String! + file_name: String! + type: String! +} + +input DataProcessRetryInput { + id: String! + creator: String! +} + # 数据处理列表分页 type PaginatedDataProcessItem { status: Int! 
diff --git a/apiserver/pkg/dataprocessing/dataprocessing.go b/apiserver/pkg/dataprocessing/dataprocessing.go index 6a430abad..09f4c23cd 100644 --- a/apiserver/pkg/dataprocessing/dataprocessing.go +++ b/apiserver/pkg/dataprocessing/dataprocessing.go @@ -259,3 +259,59 @@ func GetLogInfo(ctx context.Context, c dynamic.Interface, obj *generated.DataPro } return data, nil } + +func DataProcessLogInfoByFileName(ctx context.Context, c dynamic.Interface, obj *generated.DataProcessQuery, input *generated.DataProcessFileLogInput) (*generated.DataProcessResponse, error) { + // prepare http request + jsonParams, err := json.Marshal(input) + if err != nil { + return nil, err + } + req, err := http.NewRequest("POST", url+"/get-log-by-file-name", bytes.NewBuffer(jsonParams)) + if err != nil { + return nil, err + } + + // call dataprocessing server + client := http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // parse http response + data := &generated.DataProcessResponse{} + err = json.NewDecoder(resp.Body).Decode(data) + if err != nil { + return nil, err + } + return data, nil +} + +func DataProcessRetry(ctx context.Context, c dynamic.Interface, obj *generated.DataProcessQuery, input *generated.DataProcessRetryInput) (*generated.DataProcessResponse, error) { + // prepare http request + jsonParams, err := json.Marshal(input) + if err != nil { + return nil, err + } + req, err := http.NewRequest("POST", url+"/retry", bytes.NewBuffer(jsonParams)) + if err != nil { + return nil, err + } + + // call dataprocessing server + client := http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // parse http response + data := &generated.DataProcessResponse{} + err = json.NewDecoder(resp.Body).Decode(data) + if err != nil { + return nil, err + } + return data, nil +} diff --git a/data-processing/data_manipulation/common/log_tag_const.py b/data-processing/data_manipulation/common/log_tag_const.py index ba80ad2cb..71bd846e2 100644 --- a/data-processing/data_manipulation/common/log_tag_const.py +++ b/data-processing/data_manipulation/common/log_tag_const.py @@ -22,6 +22,7 @@ MINIO = "Minio" MINIO_STORE_PROCESS = "Minio Store Process" +DATA_PROCESS_SERVICE = "Data Process Service" DATA_PROCESS_DETAIL = "Data Process Detail" COMMON_HANDLE = "Common Handle" diff --git a/data-processing/data_manipulation/common/special_characters.py b/data-processing/data_manipulation/common/special_characters.py index 2c0d70795..ac104881d 100644 --- a/data-processing/data_manipulation/common/special_characters.py +++ b/data-processing/data_manipulation/common/special_characters.py @@ -34,6 +34,6 @@ # whitespaces in unicode can be found here: # https://en.wikipedia.org/wiki/Whitespace_character VARIOUS_WHITESPACES = { - ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', + ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', '​', '‌', '‍', '⁠', '', '„' } \ No newline at end of file diff --git a/data-processing/data_manipulation/controller/data_process_controller.py b/data-processing/data_manipulation/controller/data_process_controller.py index b48db9b73..41dd684a4 100644 --- a/data-processing/data_manipulation/controller/data_process_controller.py +++ b/data-processing/data_manipulation/controller/data_process_controller.py @@ -153,3 +153,36 @@ async def get_log_info(request): ) return json(res) +@data_process.route('get-log-by-file-name', methods=['POST']) +async def 
get_log_by_file_name(request): + """check task name by name and namespace. + + example for request.json + { + "id": "01HGWBE48DT3ADE9ZKA62SW4WS", + "file_name": "xx.pdf", + "type": "qa_split" + } + """ + res = data_process_service.get_log_by_file_name( + request.json, + pool=request.app.config['conn_pool'] + ) + return json(res) + + +@data_process.route('retry', methods=['POST']) +async def retry(request): + """check task name by name and namespace. + + example for request.json + { + "id": "01HGWBE48DT3ADE9ZKA62SW4WS" + } + """ + res = data_process_service.retry( + request.json, + pool=request.app.config['conn_pool'] + ) + return json(res) + diff --git a/data-processing/data_manipulation/data_store_process/minio_store_process.py b/data-processing/data_manipulation/data_store_process/minio_store_process.py index c6baa236c..b6d55b417 100644 --- a/data-processing/data_manipulation/data_store_process/minio_store_process.py +++ b/data-processing/data_manipulation/data_store_process/minio_store_process.py @@ -29,8 +29,12 @@ data_process_detail_db_operate, data_process_detail_preview_db_operate, data_process_log_db_operate, - data_process_stage_log_db_operate) -from file_handle import csv_handle, pdf_handle, word_handle + data_process_stage_log_db_operate, + data_process_document_chunk_db_operate) +from file_handle import (csv_handle, + pdf_handle, + word_handle, + common_handle) from kube import dataset_cr from utils import file_utils, date_time_utils, json_utils from pathlib import Path @@ -73,7 +77,8 @@ def text_manipulate( reason='processing', task_id=id, log_id=log_id, - creator=req_json.get('creator') + creator=req_json.get('creator'), + pool=pool ) if update_dataset['status'] != 200: return update_dataset @@ -195,7 +200,7 @@ def text_manipulate( error_msg = f"{file_extension} file type is not currently supported." 
break - # 新增阶段性日志-clean + # 新增阶段性日志-clean clean_stage_detail=_get_stage_detail( req_json, pool=pool, @@ -336,7 +341,8 @@ def text_manipulate( reason=task_status, task_id=id, log_id=log_id, - creator=req_json.get('creator') + creator=req_json.get('creator'), + pool=pool ) if update_dataset['status'] != 200: return update_dataset @@ -372,6 +378,203 @@ def text_manipulate( } +def text_manipulate_retry( + req_json, + pool +): + try: + task_id = req_json.get('id') + creator = req_json.get('creator') + + # 更新任务状态 + update_status_res = _update_status_and_log_id( + id=task_id, + current_log_id='', + status='processing', + end_datetime='', + creator=creator, + pool=pool + ) + if update_status_res.get('status') != 200: + return update_status_res + + # 新增数据处理任务日志 + log_id = ulid.ulid() + log_info = _insert_log_info( + id=log_id, + task_id=task_id, + execute_type='RETRY', + creator=creator, + pool=pool + ) + + # 根据id获取任务信息 + task_info = data_process_db_operate.info_by_id( + req_json, + pool=pool + ) + task_info_dict = task_info.get('data')[0] + + # 更新数据集状态 + update_dataset = _update_dateset_status( + bucket_name=task_info_dict.get('namespace'), + version_data_set_name=task_info_dict.get('pre_version_data_set_name'), + reason='processing', + task_id=task_id, + log_id=log_id, + creator=creator, + pool=pool + ) + if update_dataset['status'] != 200: + return update_dataset + + # 根据task_id查询处理未成功的文件 + document_list = data_process_document_db_operate.list_by_task_id_and_status( + req_json, + pool=pool + ) + + task_status = 'process_complete' + error_msg = '' + if len(document_list.get('data')) > 0: + # 文件处理 + # 存放每个文件对应的数据量 + data_volumes_file = [] + + for document in document_list.get('data'): + logger.debug(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} document retry \n", + f"file_name: {document.get('file_name')}" + ])) + result = _text_manipulate_retry_for_document( + document=document, + task_info=task_info_dict, + log_id=log_id, + creator=creator, + pool=pool + ) + + if result.get('status') != 200: + # 任务失败 + logger.error(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} Data process fail \n", + f"The file name: {document.get('file_name')}\n", + f"The error is: {result.get('message')}\n" + ])) + task_status = 'process_fail' + error_msg = result.get('message') + break + + data_volumes_file.append(result['data']) + + # 新增阶段性日志-finish + finish_stage_detail=f"{date_time_utils.now_str()} Task Finished!!!" 
+ insert_stage_log_params = { + 'task_id': task_id, + 'log_id': log_id, + 'file_name': '', + 'stage_name': 'finish', + 'stage_status': 'success', + 'stage_detail': finish_stage_detail, + 'creator': creator + } + data_process_stage_log_db_operate.insert( + insert_stage_log_params, + pool=pool + ) + + # insert QA list to detail preview + logger.debug(f"{log_tag_const.MINIO_STORE_PROCESS} Insert QA list for detail preview.") + list_qa_params = { + 'task_id': task_id + } + list_qa_res = data_process_detail_db_operate.top_n_list_qa_for_preview( + list_qa_params, + pool=pool + ) + + for item in list_qa_res.get('data'): + item['transform_type']='qa_split' + item['pre_content']=item['question'] + item['post_content']=item['answer'] + data_process_detail_preview_db_operate.insert( + item, + pool=pool + ) + + # 将清洗后的文件上传到MinIO中 + # 上传final文件夹下的文件,并添加tag + file_path = file_utils.get_temp_file_path() + minio_dataset_prefix = config.minio_dataset_prefix + folder_prefix = '/'.join([ + minio_dataset_prefix, + task_info_dict['pre_data_set_name'], + task_info_dict['pre_data_set_version'] + ]) + minio_client = minio_store_client.get_minio_client() + minio_store_client.upload_files_to_minio_with_tags( + minio_client=minio_client, + local_folder=file_path + 'final', + minio_bucket=task_info_dict.get('bucket_name'), + minio_prefix=folder_prefix, + support_type=task_info_dict.get('data_process_config_info'), + data_volumes_file=data_volumes_file + ) + + # 更新数据集状态 + update_dataset = _update_dateset_status( + bucket_name=task_info_dict.get('namespace'), + version_data_set_name=task_info_dict.get('pre_version_data_set_name'), + reason=task_status, + task_id=task_id, + log_id=log_id, + creator=creator, + pool=pool + ) + if update_dataset['status'] != 200: + return update_dataset + + # 更新数据处理任务日志 + update_log_item = { + 'id': log_id, + 'status': task_status, + 'error_msg': error_msg, + 'creator': creator + } + data_process_log_db_operate.update_status_by_id( + update_log_item, + pool=pool + ) + + # 数据库更新任务状态 + update_params = { + 'id': task_id, + 'current_log_id': log_id, + 'status': task_status, + 'user': creator + } + data_process_db_operate.update_status_by_id( + update_params, + pool=pool + ) + + return { + 'status': 200, + 'message': '', + 'data': '' + } + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} Data process fail \n", + f"{traceback.format_exc()}" + ])) + return { + 'status': 400, + 'message': str(ex), + 'data': traceback.format_exc() + } + + def _remove_local_file(file_name): try: remove_file_path = file_utils.get_temp_file_path() @@ -399,8 +602,16 @@ def _update_dateset_status( reason, task_id, log_id, - creator + creator, + pool ): + logger.debug(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} update dataset status \n", + f"task_id: {task_id}\n", + f"bucket_name: {bucket_name}\n", + f"version_data_set_name: {version_data_set_name}\n", + f"reason: {reason}" + ])) update_dataset = dataset_cr.update_dataset_k8s_cr( bucket_name=bucket_name, version_data_set_name=version_data_set_name, @@ -408,6 +619,13 @@ def _update_dateset_status( ) if update_dataset['status'] != 200: + logger.error(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} update dataset status \n", + f"task_id: {task_id}\n", + f"bucket_name: {bucket_name}\n", + f"version_data_set_name: {version_data_set_name}\n", + f"reason: {reason}" + ])) # 更新数据处理任务日志 update_log_item = { 'id': log_id, @@ -651,3 +869,293 @@ def _list_for_transform_type( params, pool=pool ) + + +def _update_status_and_log_id( + id, + 
current_log_id, + status, + end_datetime, + creator, + pool +): + try: + """update task status and current log id with task id""" + logger.debug(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} update task status \n", + f"task_id: {id}\n", + f"status: {status}\n" + ])) + update_task_params = { + 'id': id, + 'current_log_id': current_log_id, + 'status': status, + 'end_datetime': end_datetime, + 'user': creator + } + data_process_db_operate.update_status_and_log_id( + update_task_params, + pool=pool + ) + + return { + 'status': 200, + 'message': '', + 'data': '' + } + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} update task fail \n", + f"{traceback.format_exc()}" + ])) + return { + 'status': 400, + 'message': str(ex), + 'data': traceback.format_exc() + } + + +def _insert_log_info( + id, + task_id, + execute_type, + creator, + pool +): + try: + """insert task log info""" + logger.debug(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} insert task log \n", + f"task_id: {task_id}\n", + f"execute_type: {execute_type}\n" + ])) + insert_log_item = { + 'id': id, + 'task_id': task_id, + 'type': execute_type, + 'creator': creator + } + data_process_log_db_operate.add( + insert_log_item, + pool=pool + ) + + return { + 'status': 200, + 'message': '', + 'data': '' + } + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} insert task log info \n", + f"{traceback.format_exc()}" + ])) + return { + 'status': 400, + 'message': str(ex), + 'data': traceback.format_exc() + } + + +def _text_manipulate_retry_for_document( + document, + task_info, + log_id, + pool, + creator +): + file_name = document.get('file_name') + task_id = task_info.get('id') + document_id = document.get('id') + support_type = task_info.get('data_process_config_info') + + + # 新增阶段性日志-开始 + received_task={ + 'task_id': task_id, + 'pre_dataset_name': document.get('pre_data_set_name'), + 'pre_dataset_version': document.get('pre_data_set_version'), + 'file_names': document.get('file_names') + } + + start_stage_detail = '\n'.join([ + f"{date_time_utils.now_str()} Data Processing Task Retry Starts!!!", + f"Received Task: {json_utils.dumps(received_task)}", + f"Operations: {json_utils.dumps(support_type)}" + ]) + insert_stage_log_params = { + 'task_id': task_id, + 'log_id': log_id, + 'file_name': file_name, + 'stage_name': 'start', + 'stage_status': 'success', + 'stage_detail': start_stage_detail, + 'creator': creator + } + data_process_stage_log_db_operate.insert( + insert_stage_log_params, + pool=pool + ) + + logger.debug(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} text manipulate retry \n", + f"document status: {document.get('status')}" + ])) + result = None + # 判断文件状态 + if document.get('status') == 'not_start': + # 针对未开始的文件进行重试 + + # minio 数据集统一前缀 + minio_dataset_prefix = config.minio_dataset_prefix + folder_prefix = '/'.join([ + minio_dataset_prefix, + task_info.get('pre_data_set_name'), + task_info.get('pre_data_set_version') + ]) + + # get a minio client + minio_client = minio_store_client.get_minio_client() + # 将文件下载到本地 + minio_store_client.download( + minio_client, + bucket_name=task_info.get('bucket_name'), + folder_prefix=folder_prefix, + file_name=file_name + ) + + document_type = document.get('document_type') + if document_type in ['pdf']: + # 处理PDF文件 + result = pdf_handle.text_manipulate( + file_name=file_name, + document_id=document.get('id'), + support_type=support_type, + conn_pool=pool, + task_id=task_id, + create_user=creator + ) + + elif 
document_type in ['docx']: + # 处理.docx文件 + result = word_handle.docx_text_manipulate( + file_name=file_name, + document_id=document.get('id'), + support_type=support_type, + conn_pool=pool, + task_id=task_id, + create_user=creator + ) + + # 将下载的本地文件删除 + _remove_local_file(file_name) + + else: + # 针对进行中和失败的文件进行重试 + + # 获取未成功的chunk列表 + query_chunk_params = { + 'document_id': document.get('id') + } + document_chunk_dict = data_process_document_chunk_db_operate.list_by_status( + query_chunk_params, + pool=pool + ) + if len(document_chunk_dict.get('data')) > 0: + result = common_handle.text_manipulate( + file_name=file_name, + all_document_for_process=document_chunk_dict.get('data'), + support_type=support_type, + conn_pool=pool, + create_user=creator + ) + + # 判断是否存在qa拆分 + has_qa_split = any(item.get('type') == 'qa_split' for item in support_type) + + if result is None: + logger.error(''.join([ + f"{log_tag_const.MINIO_STORE_PROCESS} The file type is not supported \n", + f"The current file type is: {document_type}" + ])) + # 任务失败 + error_msg = f"{document_type} file type is not currently supported." + return { + 'status': 400, + 'message': error_msg, + 'data': '' + } + + # 新增阶段性日志-clean + clean_stage_detail=_get_stage_detail( + task_info, + pool=pool, + task_id=task_id, + document_id=document_id, + stage='clean', + file_name=file_name + ) + if clean_stage_detail.get('status') == 200: + insert_stage_log_params = { + 'task_id': task_id, + 'log_id': log_id, + 'file_name': file_name, + 'stage_name': 'clean', + 'stage_status': 'success', + 'stage_detail': clean_stage_detail.get('data'), + 'creator': creator + } + data_process_stage_log_db_operate.insert( + insert_stage_log_params, + pool=pool + ) + + # 新增阶段性日志-privacy + privacy_stage_detail=_get_stage_detail( + task_info, + pool=pool, + task_id=task_id, + document_id=document_id, + stage='privacy', + file_name=file_name + ) + if privacy_stage_detail.get('status') == 200: + insert_stage_log_params = { + 'task_id': task_id, + 'log_id': log_id, + 'file_name': file_name, + 'stage_name': 'privacy', + 'stage_status': 'success', + 'stage_detail': privacy_stage_detail.get('data'), + 'creator': creator + } + data_process_stage_log_db_operate.insert( + insert_stage_log_params, + pool=pool + ) + + # 新增阶段性日志-qa_split + if has_qa_split: + if result.get('status') != 200: + _get_qa_stage_detail( + task_id=task_id, + log_id=log_id, + status='fail', + file_name=file_name, + creator=creator, + result=result, + pool=pool + ) + else: + _get_qa_stage_detail( + task_id=task_id, + log_id=log_id, + status='success', + file_name=file_name, + creator=creator, + result=result, + pool=pool + ) + + return result + diff --git a/data-processing/data_manipulation/database_operate/data_process_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_db_operate.py index 1d554e33d..025525fea 100644 --- a/data-processing/data_manipulation/database_operate/data_process_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_db_operate.py @@ -123,6 +123,7 @@ def add( 'bucket_name': req_json['bucket_name'], 'pre_data_set_name': req_json['pre_data_set_name'], 'pre_data_set_version': req_json['pre_data_set_version'], + 'pre_version_data_set_name': req_json['version_data_set_name'], 'file_names': ujson.dumps(req_json['file_names']), 'post_data_set_name': req_json['post_data_set_name'], 'post_data_set_version': req_json['post_data_set_version'], @@ -151,6 +152,7 @@ def add( post_data_set_version, data_process_config_info, start_datetime, + 
pre_version_data_set_name, create_datetime, create_program, create_user, @@ -172,6 +174,7 @@ def add( %(post_data_set_version)s, %(data_process_config_info)s, %(start_datetime)s, + %(pre_version_data_set_name)s, %(create_datetime)s, %(create_program)s, %(create_user)s, @@ -243,6 +246,9 @@ def info_by_id( dpt.data_process_config_info, dpt.start_datetime, dpt.end_datetime, + dpt.bucket_name, + dpt.namespace, + dpt.pre_version_data_set_name, dpt.create_user, dpt.update_datetime, dptl.error_msg @@ -282,3 +288,37 @@ def count_by_name( res = postgresql_pool_client.execute_count_query(pool, sql, params) return res + +def update_status_and_log_id( + req_json, + pool +): + """Update the status and current log id with task id""" + user = req_json['user'] + program = '修改任务状态' + + params = { + 'id': req_json.get('id'), + 'status': req_json.get('status'), + 'current_log_id': req_json.get('current_log_id'), + 'end_datetime': req_json.get('end_datetime'), + 'update_datetime': req_json.get('end_datetime'), + 'update_program': program, + 'update_user': user + } + + sql = """ + update public.data_process_task set + status = %(status)s, + current_log_id = %(current_log_id)s, + end_datetime = %(end_datetime)s, + update_datetime = %(update_datetime)s, + update_program = %(update_program)s, + update_user = %(update_user)s + where + id = %(id)s + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + diff --git a/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py index e56f3e8cf..52b0c574b 100644 --- a/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py @@ -559,3 +559,35 @@ def list_for_transform_type( res = postgresql_pool_client.execute_query(pool, sql, params) return res + +def delete_transform_by_document_chunk( + req_json, + pool +): + """delete transform by task id and document id and chunk id. + + req_json is a dictionary object. 
for example: + { + "task_id": "01HGWBE48DT3ADE9ZKA62SW4WS", + "document_id": "01HGWBE48DT3ADE9ZKA62SW4WS", + "document_chunk_id": "01HGWBE48DT3ADE9ZKA62SW4WS" + } + pool: databasec connection pool; + """ + params = { + 'task_id': req_json.get('task_id'), + 'document_id': req_json.get('document_id'), + 'document_chunk_id': req_json.get('document_chunk_id') + } + + sql = """ + delete from public.data_process_task_detail + where + task_id = %(task_id)s and + document_id = %(document_id)s and + document_chunk_id = %(document_chunk_id)s + """.strip() + + res = postgresql_pool_client.execute_update(pool, sql, params) + return res + diff --git a/data-processing/data_manipulation/database_operate/data_process_document_chunk_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_document_chunk_db_operate.py index 8fe7e63bd..95611dac2 100644 --- a/data-processing/data_manipulation/database_operate/data_process_document_chunk_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_document_chunk_db_operate.py @@ -164,3 +164,36 @@ def delete_by_task_id( res = postgresql_pool_client.execute_update(pool, sql, params) return res +def list_by_status( + req_json, + pool +): + """Retrieve a list of statuses marked as in progress and failed.""" + params = { + 'document_id': req_json.get('document_id') + } + + sql = """ + select + id, + document_id, + status, + task_id, + content, + meta_info, + page_number, + create_datetime, + create_user, + create_program, + update_datetime, + update_user, + update_program + from + public.data_process_task_document_chunk + where + document_id = %(document_id)s and + status != 'success' + """.strip() + + res = postgresql_pool_client.execute_query(pool, sql, params) + return res diff --git a/data-processing/data_manipulation/database_operate/data_process_document_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_document_db_operate.py index 3d47b9677..c0bea85ab 100644 --- a/data-processing/data_manipulation/database_operate/data_process_document_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_document_db_operate.py @@ -218,3 +218,36 @@ def delete_by_task_id( res = postgresql_pool_client.execute_update(pool, sql, params) return res + + +def list_by_task_id_and_status( + req_json, + pool +): + """info with task id and status""" + params = { + 'task_id': req_json.get('id') + } + + sql = """ + select + id, + file_name, + status, + start_time, + end_time, + progress, + chunk_size, + task_id, + from_source_type, + from_source_path, + document_type + from + public.data_process_task_document + where + task_id = %(task_id)s and + status != 'success' + """.strip() + + res = postgresql_pool_client.execute_query(pool, sql, params) + return res diff --git a/data-processing/data_manipulation/database_operate/data_process_log_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_log_db_operate.py index c0305db4e..c50549265 100644 --- a/data-processing/data_manipulation/database_operate/data_process_log_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_log_db_operate.py @@ -28,11 +28,11 @@ def add( program = '数据处理任务日志-新增' params = { - 'id': req_json['id'], - 'task_id': req_json['task_id'], - 'type': req_json['type'], + 'id': req_json.get('id'), + 'task_id': req_json.get('task_id'), + 'type': req_json.get('type'), 'status': 'processing', - 'error_msg': req_json['error_msg'], + 'error_msg': req_json.get('error_msg'), 
'start_datetime': now, 'create_datetime': now, 'create_user': user, diff --git a/data-processing/data_manipulation/database_operate/data_process_stage_log_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_stage_log_db_operate.py index 474627b93..4c3463381 100644 --- a/data-processing/data_manipulation/database_operate/data_process_stage_log_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_stage_log_db_operate.py @@ -140,3 +140,36 @@ def delete_by_task_id( res = postgresql_pool_client.execute_update(pool, sql, params) return res + + +def info_by_stage_and_file_name( + req_json, + pool +): + params = { + 'task_id': req_json.get('id'), + 'stage_name': req_json.get('type'), + 'file_name': req_json.get('file_name') + } + + sql = """ + select + id, + task_id, + log_id, + log_datetime, + stage_name, + stage_status, + stage_detail, + error_msg + from + public.data_process_task_stage_log + where + task_id = %(task_id)s and + stage_name = %(stage_name)s and + file_name = %(file_name)s + order by log_datetime desc + """.strip() + + res = postgresql_pool_client.execute_query(pool, sql, params) + return res diff --git a/data-processing/data_manipulation/file_handle/common_handle.py b/data-processing/data_manipulation/file_handle/common_handle.py index a76ef9db7..3ea203431 100644 --- a/data-processing/data_manipulation/file_handle/common_handle.py +++ b/data-processing/data_manipulation/file_handle/common_handle.py @@ -212,6 +212,17 @@ def _data_clean( clean_data = result['data']['clean_data'] if len(clean_data) > 0: for item in clean_data: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -256,6 +267,17 @@ def _data_clean( clean_data = result['data']['clean_data'] if len(clean_data) > 0: for item in clean_data: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -298,6 +320,17 @@ def _data_clean( ) if result['status'] == 200: if result['data']['found'] > 0: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -340,6 +373,17 @@ def _data_clean( ) if result['status'] == 200: if result['data']['found'] > 0: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -382,6 +426,17 @@ def _data_clean( ) if result['status'] == 200: if result['data']['found'] > 0: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + 
delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -426,6 +481,17 @@ def _data_clean( clean_data = result['data']['clean_data'] if len(clean_data) > 0: for item in clean_data: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -510,6 +576,17 @@ def _remove_privacy_info( clean_data = result['data']['clean_data'] if len(clean_data) > 0: for item in clean_data: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -554,6 +631,17 @@ def _remove_privacy_info( clean_data = result['data']['clean_data'] if len(clean_data) > 0: for item in clean_data: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -598,6 +686,17 @@ def _remove_privacy_info( clean_data = result['data']['clean_data'] if len(clean_data) > 0: for item in clean_data: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -640,6 +739,17 @@ def _remove_privacy_info( clean_data = result['data']['clean_data'] if len(clean_data) > 0: for item in clean_data: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -682,6 +792,17 @@ def _remove_privacy_info( clean_data = result['data']['clean_data'] if len(clean_data) > 0: for item in clean_data: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, @@ -724,6 +845,17 @@ def _remove_privacy_info( clean_data = result['data']['clean_data'] if len(clean_data) > 0: for item in clean_data: + # 避免重试的时候,新增重复性数据 + delete_transform_item = { + 'task_id': task_id, + 'document_id': document_id, + 'document_chunk_id': document_chunk_id + } + data_process_detail_db_operate.delete_transform_by_document_chunk( + delete_transform_item, + pool=conn_pool + ) + task_detail_item = { 'id': ulid.ulid(), 'task_id': task_id, diff --git a/data-processing/data_manipulation/file_handle/pdf_handle.py b/data-processing/data_manipulation/file_handle/pdf_handle.py index d832037de..f9816aead 100644 --- a/data-processing/data_manipulation/file_handle/pdf_handle.py +++ 
b/data-processing/data_manipulation/file_handle/pdf_handle.py @@ -36,8 +36,8 @@ def text_manipulate( conn_pool, task_id, create_user, - chunk_size, - chunk_overlap + chunk_size=None, + chunk_overlap=None ): """Manipulate the text content from a pdf file. @@ -69,14 +69,15 @@ def text_manipulate( chunck_id = ulid.ulid() page = document.metadata.get('page') + 1 content = document.page_content.replace("\n", "") - meta_info = ujson.dumps(document.metadata, ensure_ascii=False) + meta_info = document.metadata + meta_info['source'] = file_name chunk_insert_item = { 'id': chunck_id, 'document_id': document_id, 'task_id': task_id, 'status': 'not_start', 'content': content, - 'meta_info': meta_info, + 'meta_info': ujson.dumps(meta_info, ensure_ascii=False), 'page_number': page, 'creator': create_user } diff --git a/data-processing/data_manipulation/file_handle/word_handle.py b/data-processing/data_manipulation/file_handle/word_handle.py index de124d946..8e8207700 100644 --- a/data-processing/data_manipulation/file_handle/word_handle.py +++ b/data-processing/data_manipulation/file_handle/word_handle.py @@ -15,6 +15,7 @@ import logging import traceback +import ulid from common import log_tag_const from common.config import config @@ -33,8 +34,8 @@ def docx_text_manipulate( conn_pool, task_id, create_user, - chunk_size, - chunk_overlap + chunk_size=None, + chunk_overlap=None ): """Manipulate the text content from a word file. @@ -122,6 +123,6 @@ def _get_documents_by_langchain( chunk_size=int(chunk_size), chunk_overlap=int(chunk_overlap) ) - documents = text_splitter.split_text(data) + documents = text_splitter.split_text(content) return documents diff --git a/data-processing/data_manipulation/service/data_process_service.py b/data-processing/data_manipulation/service/data_process_service.py index 457ed3d2d..3da1a5315 100644 --- a/data-processing/data_manipulation/service/data_process_service.py +++ b/data-processing/data_manipulation/service/data_process_service.py @@ -242,6 +242,70 @@ def get_log_info( } +def get_log_by_file_name( + req_json, + pool +): + try: + stage_log_info = data_process_stage_log_db_operate.info_by_stage_and_file_name( + req_json, + pool=pool + ) + + if stage_log_info.get('status') != 200: + return stage_log_info + + stage_detail = stage_log_info.get('data')[0].get('stage_detail') + + return { + 'status': 200, + 'message': '', + 'data': stage_detail + } + except Exception as ex: + return { + 'status': 400, + 'message': str(ex), + 'data': traceback.format_exc() + } + + +def retry( + req_json, + pool +): + """When a task fails, attempt a retry.""" + try: + logger.debug(f"{log_tag_const.DATA_PROCESS_SERVICE} The task retry start") + + async def async_text_manipulate_retry( + req_json, + pool + ): + minio_store_process.text_manipulate_retry(req_json, pool=pool) + + def execute_text_manipulate_task_retry(loop): + asyncio.set_event_loop(loop) + loop.run_until_complete(async_text_manipulate_retry(req_json, pool=pool)) + + thread_parallel.run_async_background_task( + execute_text_manipulate_task_retry, + 'execute text manipuate task retry' + ) + + return { + 'status': 200, + 'message': '任务开始重试!', + 'data': '' + } + except Exception as ex: + return { + 'status': 400, + 'message': str(ex), + 'data': traceback.format_exc() + } + + def _get_default_data_for_detail(): """Get the data for the detail""" return { @@ -418,9 +482,7 @@ def _set_children_info_for_config_map_for_result( 'enable': 'true', 'zh_name': 'QA拆分', 'description': '根据文件中的文档内容,自动将文件做 QA 拆分处理。', - 'llm_config': _get_llm_config( 
-                    qa_split_config = process_cofig_map.get('qa_split')
-                ),
+                'llm_config': process_cofig_map.get('qa_split').get('llm_config'),
                 'preview': _get_qa_list_preview(
                     task_id=task_id,
                     conn_pool=conn_pool
@@ -719,23 +781,6 @@ def _get_qa_split_status(
     return status
 
 
-def _get_llm_config(
-    qa_split_config
-):
-    llm_config = qa_split_config.get('llm_config')
-
-    # llms cr 中模型相关信息
-    llm_spec_info = model_cr.get_spec_for_llms_k8s_cr(
-        name=llm_config.get('name'),
-        namespace=llm_config.get('namespace')
-    )
-
-    if llm_spec_info.get('data').get('provider').get('worker'):
-        llm_config['provider'] = 'worker'
-    else:
-        llm_config['provider'] = '3rd_party'
-
-    return llm_config
 
 def _get_qa_process_file_num(
     task_id,
diff --git a/data-processing/data_manipulation/transform/text/clean_transform.py b/data-processing/data_manipulation/transform/text/clean_transform.py
index 793f5c0f6..9b8e18660 100644
--- a/data-processing/data_manipulation/transform/text/clean_transform.py
+++ b/data-processing/data_manipulation/transform/text/clean_transform.py
@@ -39,13 +39,15 @@ def remove_invisible_characters(text):
     try:
         pattern = r'[\x00-\x1F\x7F-\x9F\xAD\r\t\b\x0B\x1C\x1D\x1E]'
        find_pattern = r'[^,。!?,.!?]*[\x00-\x1F\x7F-\x9F\xAD\r\t\b\x0B\x1C\x1D\x1E][^,。!?,.!?]*'
+        replace_text = ''
 
-        clean_text = re.sub(pattern, '', text)
+        clean_text = re.sub(pattern, replace_text, text)
         clean_data = _find_clean_data(
             text=text,
             pattern=pattern,
-            find_pattern=find_pattern
+            find_pattern=find_pattern,
+            replace_text=replace_text
         )
         return {
             'status': 200,
@@ -83,13 +85,15 @@ def space_standardization(text):
         various_whitespaces = special_characters.VARIOUS_WHITESPACES
         pattern = '|'.join(re.escape(value) for value in various_whitespaces)
         find_pattern = '|'.join(f'[^,。!?,.!?]*{re.escape(value)}[^,。!?,.!?]*' for value in various_whitespaces)
+        replace_text = ' '
 
-        clean_text = re.sub(pattern, ' ', text)
+        clean_text = re.sub(pattern, replace_text, text)
         clean_data = _find_clean_data(
             text=text,
             pattern=pattern,
-            find_pattern=find_pattern
+            find_pattern=find_pattern,
+            replace_text=replace_text
         )
 
         return {
@@ -243,13 +247,15 @@ def remove_emojis(text):
         emojis = special_characters.EMOJI
         pattern = '|'.join(re.escape(value) for value in emojis)
         find_pattern = '|'.join(f'[^,。!?,.!?]*{re.escape(value)}[^,。!?,.!?]*' for value in emojis)
+        replace_text = ''
 
-        clean_text = re.sub(pattern, '', text)
+        clean_text = re.sub(pattern, replace_text, text)
         clean_data = _find_clean_data(
             text=text,
             pattern=pattern,
-            find_pattern=find_pattern
+            find_pattern=find_pattern,
+            replace_text=replace_text
         )
 
         return {
@@ -277,7 +283,8 @@ def remove_emojis(text):
 def _find_clean_data(
     text,
     pattern,
-    find_pattern
+    find_pattern,
+    replace_text
 ):
     """find clean data for pre_content and post_content.
@@ -290,7 +297,7 @@ def _find_clean_data(
     sentences = re.findall(find_pattern, text)
 
     for sentence in sentences:
-        post_content = re.sub(pattern, '', sentence)
+        post_content = re.sub(pattern, replace_text, sentence)
         clean_data.append({
             'pre_content': sentence,
             'post_content': post_content
diff --git a/data-processing/db-scripts/init-database-schema.sql b/data-processing/db-scripts/init-database-schema.sql
index a57939b00..29eac8b02 100644
--- a/data-processing/db-scripts/init-database-schema.sql
+++ b/data-processing/db-scripts/init-database-schema.sql
@@ -11,6 +11,7 @@
         status character varying(32) COLLATE pg_catalog."default",
         pre_data_set_name character varying(32) COLLATE pg_catalog."default",
         pre_data_set_version character varying(32) COLLATE pg_catalog."default",
+        pre_version_data_set_name character varying(64) COLLATE pg_catalog."default",
         file_names jsonb,
         post_data_set_name character varying(32) COLLATE pg_catalog."default",
         post_data_set_version character varying(32) COLLATE pg_catalog."default",
@@ -29,6 +30,9 @@
         CONSTRAINT data_process_task_pkey PRIMARY KEY (id)
     );
 
+    COMMENT ON COLUMN public.data_process_task.bucket_name IS 'bucket name';
+    COMMENT ON COLUMN public.data_process_task.current_log_id IS '当前日志Id';
+    COMMENT ON COLUMN public.data_process_task.pre_version_data_set_name IS '处理前数据集版本信息';
 
     CREATE TABLE IF NOT EXISTS public.data_process_task_detail
     (
diff --git a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml
index 2c11f133b..72fa6b55b 100644
--- a/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml
+++ b/deploy/charts/arcadia/templates/pg-init-data-configmap.yaml
@@ -15,6 +15,7 @@ data:
         status character varying(32) COLLATE pg_catalog."default",
         pre_data_set_name character varying(32) COLLATE pg_catalog."default",
         pre_data_set_version character varying(32) COLLATE pg_catalog."default",
+        pre_version_data_set_name character varying(64) COLLATE pg_catalog."default",
         file_names jsonb,
         post_data_set_name character varying(32) COLLATE pg_catalog."default",
         post_data_set_version character varying(32) COLLATE pg_catalog."default",
@@ -33,6 +34,10 @@ data:
         CONSTRAINT data_process_task_pkey PRIMARY KEY (id)
     );
 
+    COMMENT ON COLUMN public.data_process_task.bucket_name IS 'bucket name';
+    COMMENT ON COLUMN public.data_process_task.current_log_id IS '当前日志Id';
+    COMMENT ON COLUMN public.data_process_task.pre_version_data_set_name IS '处理前数据集版本信息';
+
     CREATE TABLE IF NOT EXISTS public.data_process_task_detail
     (
diff --git a/gqlgen.yaml b/gqlgen.yaml
index ac45d01e4..a2e2c1aaf 100644
--- a/gqlgen.yaml
+++ b/gqlgen.yaml
@@ -164,6 +164,10 @@ models:
         resolver: true
       getLogInfo:
         resolver: true
+      dataProcessLogInfoByFileName:
+        resolver: true
+      dataProcessRetry:
+        resolver: true
   DataProcessMutation:
     fields:
       createDataProcessTask: