Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/Lanture1064/arcadia into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanture1064 committed Jan 15, 2024
2 parents bfe8b90 + c23b163 commit 30590ee
Show file tree
Hide file tree
Showing 30 changed files with 1,709 additions and 62 deletions.
390 changes: 383 additions & 7 deletions apiserver/graph/generated/generated.go

Large diffs are not rendered by default.

26 changes: 20 additions & 6 deletions apiserver/graph/generated/models_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions apiserver/graph/impl/dataprocessing.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions apiserver/graph/schema/dataprocessing.gql
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,26 @@ query getLogInfo($input: DataProcessDetailsInput){
}
}

query dataProcessLogInfoByFileName($input: DataProcessFileLogInput){
dataProcess {
dataProcessLogInfoByFileName(input: $input) {
status
data
message
}
}
}

query dataProcessRetry($input: DataProcessRetryInput){
dataProcess {
dataProcessRetry(input: $input) {
status
data
message
}
}
}

mutation createDataProcessTask($input: AddDataProcessInput) {
dataProcess {
createDataProcessTask(input: $input) {
Expand Down
16 changes: 16 additions & 0 deletions apiserver/graph/schema/dataprocessing.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type DataProcessQuery {
checkDataProcessTaskName(input: CheckDataProcessTaskNameInput): DataProcessResponse
# 日志信息
getLogInfo(input: DataProcessDetailsInput): DataProcessResponse
# 获取文件处理的日志
dataProcessLogInfoByFileName(input: DataProcessFileLogInput): DataProcessResponse
# 任务重试
dataProcessRetry(input: DataProcessRetryInput): DataProcessResponse
}


Expand Down Expand Up @@ -71,6 +75,7 @@ input LLMConfigItem {
top_p: String
max_tokens: String
prompt_template: String
provider: String
}

input DeleteDataProcessInput {
Expand All @@ -86,6 +91,17 @@ input CheckDataProcessTaskNameInput {
namespace: String!
}

input DataProcessFileLogInput {
id: String!
file_name: String!
type: String!
}

input DataProcessRetryInput {
id: String!
creator: String!
}

# 数据处理列表分页
type PaginatedDataProcessItem {
status: Int!
Expand Down
56 changes: 56 additions & 0 deletions apiserver/pkg/dataprocessing/dataprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,59 @@ func GetLogInfo(ctx context.Context, c dynamic.Interface, obj *generated.DataPro
}
return data, nil
}

func DataProcessLogInfoByFileName(ctx context.Context, c dynamic.Interface, obj *generated.DataProcessQuery, input *generated.DataProcessFileLogInput) (*generated.DataProcessResponse, error) {
// prepare http request
jsonParams, err := json.Marshal(input)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", url+"/get-log-by-file-name", bytes.NewBuffer(jsonParams))
if err != nil {
return nil, err
}

// call dataprocessing server
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

// parse http response
data := &generated.DataProcessResponse{}
err = json.NewDecoder(resp.Body).Decode(data)
if err != nil {
return nil, err
}
return data, nil
}

func DataProcessRetry(ctx context.Context, c dynamic.Interface, obj *generated.DataProcessQuery, input *generated.DataProcessRetryInput) (*generated.DataProcessResponse, error) {
// prepare http request
jsonParams, err := json.Marshal(input)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", url+"/retry", bytes.NewBuffer(jsonParams))
if err != nil {
return nil, err
}

// call dataprocessing server
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

// parse http response
data := &generated.DataProcessResponse{}
err = json.NewDecoder(resp.Body).Decode(data)
if err != nil {
return nil, err
}
return data, nil
}
3 changes: 2 additions & 1 deletion apiserver/pkg/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,10 @@ func ModelFiles(ctx context.Context, c dynamic.Interface, modelName, namespace s
result := make([]generated.PageNode, 0)
for _, obj := range objectInfoList {
if keyword == "" || strings.Contains(obj.Key, keyword) {
lastModified := obj.LastModified
tf := generated.F{
Path: strings.TrimPrefix(obj.Key, prefix),
Time: &obj.LastModified,
Time: &lastModified,
}
size := utils.BytesToSizedStr(obj.Size)
tf.Size = &size
Expand Down
13 changes: 11 additions & 2 deletions apiserver/pkg/versioneddataset/versioned_dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,19 @@ func CreateVersionedDataset(ctx context.Context, c dynamic.Interface, input *gen
return nil, err
}
if v.Spec.Version == *input.InheritedFrom {
isReady := false
var errMessage error
for _, cond := range v.Status.Conditions {
if !(cond.Type == v1alpha1.TypeReady && cond.Status == v1.ConditionTrue) {
return nil, fmt.Errorf("inherit from a version with an incorrect synchronization state will not be created. reason: %s, errMsg: %s", cond.Reason, cond.Message)
if cond.Type == v1alpha1.TypeReady && cond.Status == v1.ConditionTrue {
isReady = true
break
}
if cond.Type == v1alpha1.TypeReady && cond.Status != v1.ConditionTrue {
errMessage = fmt.Errorf("inherit from a version with an incorrect synchronization state will not be created. reason: %s, errMsg: %s", cond.Reason, cond.Message)
}
}
if !isReady {
return nil, errMessage
}
}
}
Expand Down
1 change: 1 addition & 0 deletions data-processing/data_manipulation/common/log_tag_const.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
MINIO = "Minio"
MINIO_STORE_PROCESS = "Minio Store Process"

DATA_PROCESS_SERVICE = "Data Process Service"
DATA_PROCESS_DETAIL = "Data Process Detail"

COMMON_HANDLE = "Common Handle"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@
# whitespaces in unicode can be found here:
# https://en.wikipedia.org/wiki/Whitespace_character
VARIOUS_WHITESPACES = {
' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ',
' ', ' ', ' ', ' ', '​', '‌', '‍', '⁠', '', '„'
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,36 @@ async def get_log_info(request):
)
return json(res)

@data_process.route('get-log-by-file-name', methods=['POST'])
async def get_log_by_file_name(request):
"""check task name by name and namespace.
example for request.json
{
"id": "01HGWBE48DT3ADE9ZKA62SW4WS",
"file_name": "xx.pdf",
"type": "qa_split"
}
"""
res = data_process_service.get_log_by_file_name(
request.json,
pool=request.app.config['conn_pool']
)
return json(res)


@data_process.route('retry', methods=['POST'])
async def retry(request):
"""check task name by name and namespace.
example for request.json
{
"id": "01HGWBE48DT3ADE9ZKA62SW4WS"
}
"""
res = data_process_service.retry(
request.json,
pool=request.app.config['conn_pool']
)
return json(res)

Loading

0 comments on commit 30590ee

Please sign in to comment.