| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- // Copyright 2022 The Gitea Authors. All rights reserved.
- // SPDX-License-Identifier: MIT
-
- package actions
-
- import (
- "context"
- "errors"
- "fmt"
-
- actions_model "code.gitea.io/gitea/models/actions"
- "code.gitea.io/gitea/models/db"
- "code.gitea.io/gitea/modules/graceful"
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/queue"
- notify_service "code.gitea.io/gitea/services/notify"
-
- "github.com/nektos/act/pkg/jobparser"
- "xorm.io/builder"
- )
-
- var jobEmitterQueue *queue.WorkerPoolQueue[*jobUpdate]
-
- type jobUpdate struct {
- RunID int64
- }
-
- func EmitJobsIfReady(runID int64) error {
- err := jobEmitterQueue.Push(&jobUpdate{
- RunID: runID,
- })
- if errors.Is(err, queue.ErrAlreadyInQueue) {
- return nil
- }
- return err
- }
-
- func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate {
- ctx := graceful.GetManager().ShutdownContext()
- var ret []*jobUpdate
- for _, update := range items {
- if err := checkJobsOfRun(ctx, update.RunID); err != nil {
- ret = append(ret, update)
- }
- }
- return ret
- }
-
- func checkJobsOfRun(ctx context.Context, runID int64) error {
- jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID})
- if err != nil {
- return err
- }
- var updatedjobs []*actions_model.ActionRunJob
- if err := db.WithTx(ctx, func(ctx context.Context) error {
- idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
- for _, job := range jobs {
- idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
- }
-
- updates := newJobStatusResolver(jobs).Resolve()
- for _, job := range jobs {
- if status, ok := updates[job.ID]; ok {
- job.Status = status
- if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
- return err
- } else if n != 1 {
- return fmt.Errorf("no affected for updating blocked job %v", job.ID)
- }
- updatedjobs = append(updatedjobs, job)
- }
- }
- return nil
- }); err != nil {
- return err
- }
- CreateCommitStatus(ctx, jobs...)
- for _, job := range updatedjobs {
- _ = job.LoadAttributes(ctx)
- notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
- }
- if len(jobs) > 0 {
- runUpdated := true
- for _, job := range jobs {
- if !job.Status.IsDone() {
- runUpdated = false
- break
- }
- }
- if runUpdated {
- NotifyWorkflowRunStatusUpdateWithReload(ctx, jobs[0])
- }
- }
- return nil
- }
-
- func NotifyWorkflowRunStatusUpdateWithReload(ctx context.Context, job *actions_model.ActionRunJob) {
- job.Run = nil
- if err := job.LoadAttributes(ctx); err != nil {
- log.Error("LoadAttributes: %v", err)
- return
- }
- notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run)
- }
-
- type jobStatusResolver struct {
- statuses map[int64]actions_model.Status
- needs map[int64][]int64
- jobMap map[int64]*actions_model.ActionRunJob
- }
-
- func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
- idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
- jobMap := make(map[int64]*actions_model.ActionRunJob)
- for _, job := range jobs {
- idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
- jobMap[job.ID] = job
- }
-
- statuses := make(map[int64]actions_model.Status, len(jobs))
- needs := make(map[int64][]int64, len(jobs))
- for _, job := range jobs {
- statuses[job.ID] = job.Status
- for _, need := range job.Needs {
- for _, v := range idToJobs[need] {
- needs[job.ID] = append(needs[job.ID], v.ID)
- }
- }
- }
- return &jobStatusResolver{
- statuses: statuses,
- needs: needs,
- jobMap: jobMap,
- }
- }
-
- func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
- ret := map[int64]actions_model.Status{}
- for i := 0; i < len(r.statuses); i++ {
- updated := r.resolve()
- if len(updated) == 0 {
- return ret
- }
- for k, v := range updated {
- ret[k] = v
- r.statuses[k] = v
- }
- }
- return ret
- }
-
- func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
- ret := map[int64]actions_model.Status{}
- for id, status := range r.statuses {
- if status != actions_model.StatusBlocked {
- continue
- }
- allDone, allSucceed := true, true
- for _, need := range r.needs[id] {
- needStatus := r.statuses[need]
- if !needStatus.IsDone() {
- allDone = false
- }
- if needStatus.In(actions_model.StatusFailure, actions_model.StatusCancelled, actions_model.StatusSkipped) {
- allSucceed = false
- }
- }
- if allDone {
- if allSucceed {
- ret[id] = actions_model.StatusWaiting
- } else {
- // Check if the job has an "if" condition
- hasIf := false
- if wfJobs, _ := jobparser.Parse(r.jobMap[id].WorkflowPayload); len(wfJobs) == 1 {
- _, wfJob := wfJobs[0].Job()
- hasIf = len(wfJob.If.Value) > 0
- }
-
- if hasIf {
- // act_runner will check the "if" condition
- ret[id] = actions_model.StatusWaiting
- } else {
- // If the "if" condition is empty and not all dependent jobs completed successfully,
- // the job should be skipped.
- ret[id] = actions_model.StatusSkipped
- }
- }
- }
- }
- return ret
- }
|