Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:add dataprocessing task retry functionality #549

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
390 changes: 383 additions & 7 deletions apiserver/graph/generated/generated.go

Large diffs are not rendered by default.

26 changes: 20 additions & 6 deletions apiserver/graph/generated/models_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions apiserver/graph/impl/dataprocessing.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions apiserver/graph/schema/dataprocessing.gql
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,26 @@ query getLogInfo($input: DataProcessDetailsInput){
}
}

query dataProcessLogInfoByFileName($input: DataProcessFileLogInput){
dataProcess {
dataProcessLogInfoByFileName(input: $input) {
status
data
message
}
}
}

query dataProcessRetry($input: DataProcessRetryInput){
dataProcess {
dataProcessRetry(input: $input) {
status
data
message
}
}
}

mutation createDataProcessTask($input: AddDataProcessInput) {
dataProcess {
createDataProcessTask(input: $input) {
Expand Down
16 changes: 16 additions & 0 deletions apiserver/graph/schema/dataprocessing.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type DataProcessQuery {
checkDataProcessTaskName(input: CheckDataProcessTaskNameInput): DataProcessResponse
# 日志信息
getLogInfo(input: DataProcessDetailsInput): DataProcessResponse
# 获取文件处理的日志
dataProcessLogInfoByFileName(input: DataProcessFileLogInput): DataProcessResponse
# 任务重试
dataProcessRetry(input: DataProcessRetryInput): DataProcessResponse
}


Expand Down Expand Up @@ -71,6 +75,7 @@ input LLMConfigItem {
top_p: String
max_tokens: String
prompt_template: String
provider: String
}

input DeleteDataProcessInput {
Expand All @@ -86,6 +91,17 @@ input CheckDataProcessTaskNameInput {
namespace: String!
}

input DataProcessFileLogInput {
id: String!
file_name: String!
type: String!
}

input DataProcessRetryInput {
id: String!
creator: String!
}

# 数据处理列表分页
type PaginatedDataProcessItem {
status: Int!
Expand Down
56 changes: 56 additions & 0 deletions apiserver/pkg/dataprocessing/dataprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,59 @@ func GetLogInfo(ctx context.Context, c dynamic.Interface, obj *generated.DataPro
}
return data, nil
}

func DataProcessLogInfoByFileName(ctx context.Context, c dynamic.Interface, obj *generated.DataProcessQuery, input *generated.DataProcessFileLogInput) (*generated.DataProcessResponse, error) {
// prepare http request
jsonParams, err := json.Marshal(input)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", url+"/get-log-by-file-name", bytes.NewBuffer(jsonParams))
if err != nil {
return nil, err
}

// call dataprocessing server
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

// parse http response
data := &generated.DataProcessResponse{}
err = json.NewDecoder(resp.Body).Decode(data)
if err != nil {
return nil, err
}
return data, nil
}

func DataProcessRetry(ctx context.Context, c dynamic.Interface, obj *generated.DataProcessQuery, input *generated.DataProcessRetryInput) (*generated.DataProcessResponse, error) {
// prepare http request
jsonParams, err := json.Marshal(input)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", url+"/retry", bytes.NewBuffer(jsonParams))
if err != nil {
return nil, err
}

// call dataprocessing server
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

// parse http response
data := &generated.DataProcessResponse{}
err = json.NewDecoder(resp.Body).Decode(data)
if err != nil {
return nil, err
}
return data, nil
}
1 change: 1 addition & 0 deletions data-processing/data_manipulation/common/log_tag_const.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
MINIO = "Minio"
MINIO_STORE_PROCESS = "Minio Store Process"

DATA_PROCESS_SERVICE = "Data Process Service"
DATA_PROCESS_DETAIL = "Data Process Detail"

COMMON_HANDLE = "Common Handle"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@
# whitespaces in unicode can be found here:
# https://en.wikipedia.org/wiki/Whitespace_character
VARIOUS_WHITESPACES = {
' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', '​', '‌', '‍', '⁠', '', '„'
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,36 @@ async def get_log_info(request):
)
return json(res)

@data_process.route('get-log-by-file-name', methods=['POST'])
async def get_log_by_file_name(request):
"""check task name by name and namespace.
example for request.json
{
"id": "01HGWBE48DT3ADE9ZKA62SW4WS",
"file_name": "xx.pdf",
"type": "qa_split"
}
"""
res = data_process_service.get_log_by_file_name(
request.json,
pool=request.app.config['conn_pool']
)
return json(res)


@data_process.route('retry', methods=['POST'])
async def retry(request):
"""check task name by name and namespace.
example for request.json
{
"id": "01HGWBE48DT3ADE9ZKA62SW4WS"
}
"""
res = data_process_service.retry(
request.json,
pool=request.app.config['conn_pool']
)
return json(res)

Loading