Skip to content

Commit

Permalink
Merge pull request #549 from wangxinbiao/main
Browse files Browse the repository at this point in the history
feat:add dataprocessing task retry functionality
  • Loading branch information
nkwangleiGIT authored Jan 12, 2024
2 parents 39effcb + af8f14c commit 80366f4
Show file tree
Hide file tree
Showing 24 changed files with 1,471 additions and 59 deletions.
390 changes: 383 additions & 7 deletions apiserver/graph/generated/generated.go

Large diffs are not rendered by default.

26 changes: 20 additions & 6 deletions apiserver/graph/generated/models_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions apiserver/graph/impl/dataprocessing.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions apiserver/graph/schema/dataprocessing.gql
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,26 @@ query getLogInfo($input: DataProcessDetailsInput){
}
}

query dataProcessLogInfoByFileName($input: DataProcessFileLogInput){
dataProcess {
dataProcessLogInfoByFileName(input: $input) {
status
data
message
}
}
}

query dataProcessRetry($input: DataProcessRetryInput){
dataProcess {
dataProcessRetry(input: $input) {
status
data
message
}
}
}

mutation createDataProcessTask($input: AddDataProcessInput) {
dataProcess {
createDataProcessTask(input: $input) {
Expand Down
16 changes: 16 additions & 0 deletions apiserver/graph/schema/dataprocessing.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type DataProcessQuery {
checkDataProcessTaskName(input: CheckDataProcessTaskNameInput): DataProcessResponse
# 日志信息
getLogInfo(input: DataProcessDetailsInput): DataProcessResponse
# 获取文件处理的日志
dataProcessLogInfoByFileName(input: DataProcessFileLogInput): DataProcessResponse
# 任务重试
dataProcessRetry(input: DataProcessRetryInput): DataProcessResponse
}


Expand Down Expand Up @@ -71,6 +75,7 @@ input LLMConfigItem {
top_p: String
max_tokens: String
prompt_template: String
provider: String
}

input DeleteDataProcessInput {
Expand All @@ -86,6 +91,17 @@ input CheckDataProcessTaskNameInput {
namespace: String!
}

input DataProcessFileLogInput {
id: String!
file_name: String!
type: String!
}

input DataProcessRetryInput {
id: String!
creator: String!
}

# 数据处理列表分页
type PaginatedDataProcessItem {
status: Int!
Expand Down
56 changes: 56 additions & 0 deletions apiserver/pkg/dataprocessing/dataprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,59 @@ func GetLogInfo(ctx context.Context, c dynamic.Interface, obj *generated.DataPro
}
return data, nil
}

func DataProcessLogInfoByFileName(ctx context.Context, c dynamic.Interface, obj *generated.DataProcessQuery, input *generated.DataProcessFileLogInput) (*generated.DataProcessResponse, error) {
// prepare http request
jsonParams, err := json.Marshal(input)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", url+"/get-log-by-file-name", bytes.NewBuffer(jsonParams))
if err != nil {
return nil, err
}

// call dataprocessing server
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

// parse http response
data := &generated.DataProcessResponse{}
err = json.NewDecoder(resp.Body).Decode(data)
if err != nil {
return nil, err
}
return data, nil
}

func DataProcessRetry(ctx context.Context, c dynamic.Interface, obj *generated.DataProcessQuery, input *generated.DataProcessRetryInput) (*generated.DataProcessResponse, error) {
// prepare http request
jsonParams, err := json.Marshal(input)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", url+"/retry", bytes.NewBuffer(jsonParams))
if err != nil {
return nil, err
}

// call dataprocessing server
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

// parse http response
data := &generated.DataProcessResponse{}
err = json.NewDecoder(resp.Body).Decode(data)
if err != nil {
return nil, err
}
return data, nil
}
1 change: 1 addition & 0 deletions data-processing/data_manipulation/common/log_tag_const.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
MINIO = "Minio"
MINIO_STORE_PROCESS = "Minio Store Process"

DATA_PROCESS_SERVICE = "Data Process Service"
DATA_PROCESS_DETAIL = "Data Process Detail"

COMMON_HANDLE = "Common Handle"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@
# whitespaces in unicode can be found here:
# https://en.wikipedia.org/wiki/Whitespace_character
VARIOUS_WHITESPACES = {
' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', '​', '‌', '‍', '⁠', '', '„'
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,36 @@ async def get_log_info(request):
)
return json(res)

@data_process.route('get-log-by-file-name', methods=['POST'])
async def get_log_by_file_name(request):
"""check task name by name and namespace.
example for request.json
{
"id": "01HGWBE48DT3ADE9ZKA62SW4WS",
"file_name": "xx.pdf",
"type": "qa_split"
}
"""
res = data_process_service.get_log_by_file_name(
request.json,
pool=request.app.config['conn_pool']
)
return json(res)


@data_process.route('retry', methods=['POST'])
async def retry(request):
"""check task name by name and namespace.
example for request.json
{
"id": "01HGWBE48DT3ADE9ZKA62SW4WS"
}
"""
res = data_process_service.retry(
request.json,
pool=request.app.config['conn_pool']
)
return json(res)

Loading

0 comments on commit 80366f4

Please sign in to comment.