Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: implement "cicd_feedback" to integrate external cicd similar to integrated actions #31493

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
gitea.com/lunny/dingtalk_webhook v0.0.0-20171025031554-e3534c89ef96
gitea.com/lunny/levelqueue v0.4.2-0.20230414023320-3c0159fe0fe4
github.com/42wim/sshsig v0.0.0-20211121163825-841cf5bbc121
github.com/6543/cicd_feedback v0.0.0-20240625213231-0f894fa6f0f9
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a h1:lSA0F4e9A2NcQSqGq
gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0pAQhH8yz+DNjUbjppKQzKFAn28TMYPB6IU=
github.com/42wim/sshsig v0.0.0-20211121163825-841cf5bbc121 h1:r3qt8PCHnfjOv9PN3H+XXKmDA1dfFMIN1AislhlA/ps=
github.com/42wim/sshsig v0.0.0-20211121163825-841cf5bbc121/go.mod h1:Ock8XgA7pvULhIaHGAk/cDnRfNrF9Jey81nPcc403iU=
github.com/6543/cicd_feedback v0.0.0-20240625213231-0f894fa6f0f9 h1:/QqqEnwNhZk4GwAk0GE6F6lfdIUfjWTod+zCa/jBQSI=
github.com/6543/cicd_feedback v0.0.0-20240625213231-0f894fa6f0f9/go.mod h1:xJjRB6hyl1f8XMjBajWkP98wk4Jq+Kxm+eIGy+Piozo=
github.com/6543/go-version v1.3.1 h1:HvOp+Telns7HWJ2Xo/05YXQSB2bE0WmVgbHqwMPZT4U=
github.com/6543/go-version v1.3.1/go.mod h1:oqFAHCwtLVUTLdhQmVZWYvaHXTdsbB4SY85at64SQEo=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0 h1:1nGuui+4POelzDwI7RG56yfQJHCnKvwfMoU7VsEp+Zg=
Expand Down
6 changes: 6 additions & 0 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type ActionRun struct {
PreviousDuration time.Duration
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
// External is true if it's an cicd_feedback pipeline
External bool `xorm:"NOT NULL DEFAULT false"`
}

func init() {
Expand Down Expand Up @@ -286,6 +288,10 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
run.Repo = repo
}

if run.External {
return committer.Commit()
}

if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions models/actions/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ActionRunJob struct {
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated index"`
External bool `xorm:"-"`
}

func init() {
Expand Down
227 changes: 164 additions & 63 deletions routers/web/repo/actions/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"archive/zip"
"compress/gzip"
"context"
"encoding/json"

Check failure on line 10 in routers/web/repo/actions/view.go

View workflow job for this annotation

GitHub Actions / lint-backend

import 'encoding/json' is not allowed from list 'main': use gitea's modules/json instead of encoding/json (depguard)

Check failure on line 10 in routers/web/repo/actions/view.go

View workflow job for this annotation

GitHub Actions / lint-go-gogit

import 'encoding/json' is not allowed from list 'main': use gitea's modules/json instead of encoding/json (depguard)

Check failure on line 10 in routers/web/repo/actions/view.go

View workflow job for this annotation

GitHub Actions / lint-go-windows

import 'encoding/json' is not allowed from list 'main': use gitea's modules/json instead of encoding/json (depguard)
"errors"
"fmt"
"io"
Expand All @@ -28,7 +29,9 @@
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/modules/web"
actions_service "code.gitea.io/gitea/services/actions"
"code.gitea.io/gitea/services/cicdfeedback"
context_module "code.gitea.io/gitea/services/context"
"github.com/6543/cicd_feedback"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lint fails here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't expect the linter to not scream 😆

this pull is for demonstration ... but I thin some code parts need refactoring first anywa ... to make it an nice integration.

e.g. I think the "gitea action" code has some 🍝 vibes!!! witch we should solve first


"xorm.io/builder"
)
Expand Down Expand Up @@ -133,7 +136,7 @@
runIndex := ctx.PathParamInt64("run")
jobIndex := ctx.PathParamInt64("job")

current, jobs := getRunJobs(ctx, runIndex, jobIndex)
current, jobs, externalInfo := getRunJobs(ctx, runIndex, jobIndex)
if ctx.Written() {
return
}
Expand Down Expand Up @@ -162,7 +165,7 @@
ID: v.ID,
Name: v.Name,
Status: v.Status.String(),
CanRerun: v.Status.IsDone() && ctx.Repo.CanWrite(unit.TypeActions),
CanRerun: v.Status.IsDone() && ctx.Repo.CanWrite(unit.TypeActions) && !v.External,
Duration: v.Duration().String(),
})
}
Expand Down Expand Up @@ -205,64 +208,133 @@
resp.State.CurrentJob.Steps = make([]*ViewJobStep, 0) // marshal to '[]' instead fo 'null' in json
resp.Logs.StepsLog = make([]*ViewStepLog, 0) // marshal to '[]' instead fo 'null' in json
if task != nil {
steps := actions.FullSteps(task)

for _, v := range steps {
resp.State.CurrentJob.Steps = append(resp.State.CurrentJob.Steps, &ViewJobStep{
Summary: v.Name,
Duration: v.Duration().String(),
Status: v.Status.String(),
})
loadTaskAttrib(ctx, req, resp, task)
if ctx.Written() {
return
}
}
if current.External {
loadExternalTask(ctx, req, resp, current, externalInfo)
if ctx.Written() {
return
}
}

for _, cursor := range req.LogCursors {
if !cursor.Expanded {
continue
}
ctx.JSON(http.StatusOK, resp)
}

step := steps[cursor.Step]

logLines := make([]*ViewStepLogLine, 0) // marshal to '[]' instead fo 'null' in json

index := step.LogIndex + cursor.Cursor
validCursor := cursor.Cursor >= 0 &&
// !(cursor.Cursor < step.LogLength) when the frontend tries to fetch next line before it's ready.
// So return the same cursor and empty lines to let the frontend retry.
cursor.Cursor < step.LogLength &&
// !(index < task.LogIndexes[index]) when task data is older than step data.
// It can be fixed by making sure write/read tasks and steps in the same transaction,
// but it's easier to just treat it as fetching the next line before it's ready.
index < int64(len(task.LogIndexes))

if validCursor {
length := step.LogLength - cursor.Cursor
offset := task.LogIndexes[index]
var err error
logRows, err := actions.ReadLogs(ctx, task.LogInStorage, task.LogFilename, offset, length)
if err != nil {
ctx.Error(http.StatusInternalServerError, err.Error())
return
}
func loadExternalTask(ctx *context_module.Context, req *ViewRequest, resp *ViewResponse, job *actions_model.ActionRunJob, info *cicdfeedback.WorkflowInfo) {
externalSteps := make([]*cicd_feedback.Step, 0, 4)
if err := json.Unmarshal(job.WorkflowPayload, &externalSteps); err != nil {
ctx.Error(http.StatusInternalServerError, err.Error())
return
}

for i, row := range logRows {
logLines = append(logLines, &ViewStepLogLine{
Index: cursor.Cursor + int64(i) + 1, // start at 1
Message: row.Content,
Timestamp: float64(row.Time.AsTime().UnixNano()) / float64(time.Second),
})
}
for _, v := range externalSteps {
status, _ := cicdfeedback.ConvertStatus(v.Status)

resp.State.CurrentJob.Steps = append(resp.State.CurrentJob.Steps, &ViewJobStep{
Summary: v.Name,
Status: status.String(),
})
}

for _, cursor := range req.LogCursors {
if !cursor.Expanded {
continue
}

logLines := make([]*ViewStepLogLine, 0) // marshal to '[]' instead fo 'null' in json

// we don't support live update atm
if job.Status != actions_model.StatusRunning {
if cursor.Step >= len(externalSteps) {
// out of bounds
ctx.Error(http.StatusForbidden, "out ouf bounds step cursor")
return
}
step := externalSteps[cursor.Step]
logs, err := cicdfeedback.LoadLogs(ctx, step, info)
if err != nil {
ctx.Error(http.StatusInternalServerError, "could not fetch external logs")
return
}
if logs[len(logs)-1] == '\n' {
logs = logs[:len(logs)-1]
}

resp.Logs.StepsLog = append(resp.Logs.StepsLog, &ViewStepLog{
Step: cursor.Step,
Cursor: cursor.Cursor + int64(len(logLines)),
Lines: logLines,
Started: int64(step.Started),
})
for i, line := range strings.Split(logs, "\n") {
logLines = append(logLines, &ViewStepLogLine{
Index: int64(i + 1),
Message: line,
})
}
}

resp.Logs.StepsLog = []*ViewStepLog{{
Step: cursor.Step,
Cursor: cursor.Cursor + int64(len(logLines)),
Lines: logLines,
}}
}
}

ctx.JSON(http.StatusOK, resp)
func loadTaskAttrib(ctx *context_module.Context, req *ViewRequest, resp *ViewResponse, task *actions_model.ActionTask) {
steps := actions.FullSteps(task)

for _, v := range steps {
resp.State.CurrentJob.Steps = append(resp.State.CurrentJob.Steps, &ViewJobStep{
Summary: v.Name,
Duration: v.Duration().String(),
Status: v.Status.String(),
})
}

for _, cursor := range req.LogCursors {
if !cursor.Expanded {
continue
}

step := steps[cursor.Step]

logLines := make([]*ViewStepLogLine, 0) // marshal to '[]' instead fo 'null' in json

index := step.LogIndex + cursor.Cursor
validCursor := cursor.Cursor >= 0 &&
// !(cursor.Cursor < step.LogLength) when the frontend tries to fetch next line before it's ready.
// So return the same cursor and empty lines to let the frontend retry.
cursor.Cursor < step.LogLength &&
// !(index < task.LogIndexes[index]) when task data is older than step data.
// It can be fixed by making sure write/read tasks and steps in the same transaction,
// but it's easier to just treat it as fetching the next line before it's ready.
index < int64(len(task.LogIndexes))

if validCursor {
length := step.LogLength - cursor.Cursor
offset := task.LogIndexes[index]
var err error
logRows, err := actions.ReadLogs(ctx, task.LogInStorage, task.LogFilename, offset, length)
if err != nil {
ctx.Error(http.StatusInternalServerError, err.Error())
return
}

for i, row := range logRows {
logLines = append(logLines, &ViewStepLogLine{
Index: cursor.Cursor + int64(i) + 1, // start at 1
Message: row.Content,
Timestamp: float64(row.Time.AsTime().UnixNano()) / float64(time.Second),
})
}
}

resp.Logs.StepsLog = append(resp.Logs.StepsLog, &ViewStepLog{
Step: cursor.Step,
Cursor: cursor.Cursor + int64(len(logLines)),
Lines: logLines,
Started: int64(step.Started),
})
}
}

// Rerun will rerun jobs in the given run
Expand All @@ -281,6 +353,11 @@
return
}

if run.External {
ctx.Error(http.StatusForbidden, "can not control external run")
return
}

// can not rerun job when workflow is disabled
cfgUnit := ctx.Repo.Repository.MustGetUnit(ctx, unit.TypeActions)
cfg := cfgUnit.ActionsConfig()
Expand All @@ -300,11 +377,15 @@
}
}

job, jobs := getRunJobs(ctx, runIndex, jobIndex)
job, jobs, _ := getRunJobs(ctx, runIndex, jobIndex)
if ctx.Written() {
return
}

if job.External {
ctx.Error(http.StatusForbidden, "can not control external run")
}

if jobIndexStr == "" { // rerun all jobs
for _, j := range jobs {
// if the job has needs, it should be set to "blocked" status to wait for other jobs
Expand Down Expand Up @@ -361,7 +442,7 @@
runIndex := ctx.PathParamInt64("run")
jobIndex := ctx.PathParamInt64("job")

job, _ := getRunJobs(ctx, runIndex, jobIndex)
job, _, externalInfo := getRunJobs(ctx, runIndex, jobIndex)
if ctx.Written() {
return
}
Expand All @@ -370,6 +451,11 @@
return
}

if job.External || externalInfo != nil {
ctx.Error(http.StatusForbidden, "streaming of external logs not implemented")
return
}

err := job.LoadRun(ctx)
if err != nil {
ctx.Error(http.StatusInternalServerError, err.Error())
Expand Down Expand Up @@ -409,10 +495,13 @@
func Cancel(ctx *context_module.Context) {
runIndex := ctx.PathParamInt64("run")

_, jobs := getRunJobs(ctx, runIndex, -1)
_, jobs, _ := getRunJobs(ctx, runIndex, -1)
if ctx.Written() {
return
}
if jobs[0].External {
ctx.Error(http.StatusForbidden, "can not control external run")
}

if err := db.WithTx(ctx, func(ctx context.Context) error {
for _, job := range jobs {
Expand Down Expand Up @@ -450,10 +539,15 @@
func Approve(ctx *context_module.Context) {
runIndex := ctx.PathParamInt64("run")

current, jobs := getRunJobs(ctx, runIndex, -1)
current, jobs, _ := getRunJobs(ctx, runIndex, -1)
if ctx.Written() {
return
}

if current.External {
ctx.Error(http.StatusForbidden, "can not control external run")
}

run := current.Run
doer := ctx.Doer

Expand Down Expand Up @@ -486,36 +580,43 @@
// getRunJobs gets the jobs of runIndex, and returns jobs[jobIndex], jobs.
// Any error will be written to the ctx.
// It never returns a nil job of an empty jobs, if the jobIndex is out of range, it will be treated as 0.
func getRunJobs(ctx *context_module.Context, runIndex, jobIndex int64) (*actions_model.ActionRunJob, []*actions_model.ActionRunJob) {
func getRunJobs(ctx *context_module.Context, runIndex, jobIndex int64) (*actions_model.ActionRunJob, []*actions_model.ActionRunJob, *cicdfeedback.WorkflowInfo) {
run, err := actions_model.GetRunByIndex(ctx, ctx.Repo.Repository.ID, runIndex)
if err != nil {
if errors.Is(err, util.ErrNotExist) {
ctx.Error(http.StatusNotFound, err.Error())
return nil, nil
return nil, nil, nil
}
ctx.Error(http.StatusInternalServerError, err.Error())
return nil, nil
return nil, nil, nil
}
run.Repo = ctx.Repo.Repository

jobs, err := actions_model.GetRunJobsByRunID(ctx, run.ID)
var jobs []*actions_model.ActionRunJob
var externalInfo *cicdfeedback.WorkflowInfo
if run.External {
jobs, externalInfo, err = cicdfeedback.GetExternalRunJobs(ctx, run)
} else {
jobs, err = actions_model.GetRunJobsByRunID(ctx, run.ID)
}
if err != nil {
ctx.Error(http.StatusInternalServerError, err.Error())
return nil, nil
return nil, nil, nil
}

if len(jobs) == 0 {
ctx.Error(http.StatusNotFound)
return nil, nil
return nil, nil, nil
}

for _, v := range jobs {
v.Run = run
}

if jobIndex >= 0 && jobIndex < int64(len(jobs)) {
return jobs[jobIndex], jobs
return jobs[jobIndex], jobs, externalInfo
}
return jobs[0], jobs
return jobs[0], jobs, externalInfo
}

type ArtifactsViewResponse struct {
Expand Down
Loading
Loading