gitea源码

schedule_tasks.go 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package actions
  4. import (
  5. "context"
  6. "fmt"
  7. "time"
  8. actions_model "code.gitea.io/gitea/models/actions"
  9. "code.gitea.io/gitea/models/db"
  10. repo_model "code.gitea.io/gitea/models/repo"
  11. "code.gitea.io/gitea/models/unit"
  12. "code.gitea.io/gitea/modules/log"
  13. "code.gitea.io/gitea/modules/timeutil"
  14. webhook_module "code.gitea.io/gitea/modules/webhook"
  15. notify_service "code.gitea.io/gitea/services/notify"
  16. "github.com/nektos/act/pkg/jobparser"
  17. )
  18. // StartScheduleTasks start the task
  19. func StartScheduleTasks(ctx context.Context) error {
  20. return startTasks(ctx)
  21. }
  22. // startTasks retrieves specifications in pages, creates a schedule task for each specification,
  23. // and updates the specification's next run time and previous run time.
  24. // The function returns an error if there's an issue with finding or updating the specifications.
  25. func startTasks(ctx context.Context) error {
  26. // Set the page size
  27. pageSize := 50
  28. // Retrieve specs in pages until all specs have been retrieved
  29. now := time.Now()
  30. for page := 1; ; page++ {
  31. // Retrieve the specs for the current page
  32. specs, _, err := actions_model.FindSpecs(ctx, actions_model.FindSpecOptions{
  33. ListOptions: db.ListOptions{
  34. Page: page,
  35. PageSize: pageSize,
  36. },
  37. Next: now.Unix(),
  38. })
  39. if err != nil {
  40. return fmt.Errorf("find specs: %w", err)
  41. }
  42. if err := specs.LoadRepos(ctx); err != nil {
  43. return fmt.Errorf("LoadRepos: %w", err)
  44. }
  45. // Loop through each spec and create a schedule task for it
  46. for _, row := range specs {
  47. // cancel running jobs if the event is push
  48. if row.Schedule.Event == webhook_module.HookEventPush {
  49. // cancel running jobs of the same workflow
  50. if err := CancelPreviousJobs(
  51. ctx,
  52. row.RepoID,
  53. row.Schedule.Ref,
  54. row.Schedule.WorkflowID,
  55. webhook_module.HookEventSchedule,
  56. ); err != nil {
  57. log.Error("CancelPreviousJobs: %v", err)
  58. }
  59. }
  60. if row.Repo.IsArchived {
  61. // Skip if the repo is archived
  62. continue
  63. }
  64. cfg, err := row.Repo.GetUnit(ctx, unit.TypeActions)
  65. if err != nil {
  66. if repo_model.IsErrUnitTypeNotExist(err) {
  67. // Skip the actions unit of this repo is disabled.
  68. continue
  69. }
  70. return fmt.Errorf("GetUnit: %w", err)
  71. }
  72. if cfg.ActionsConfig().IsWorkflowDisabled(row.Schedule.WorkflowID) {
  73. continue
  74. }
  75. if err := CreateScheduleTask(ctx, row.Schedule); err != nil {
  76. log.Error("CreateScheduleTask: %v", err)
  77. return err
  78. }
  79. // Parse the spec
  80. schedule, err := row.Parse()
  81. if err != nil {
  82. log.Error("Parse: %v", err)
  83. return err
  84. }
  85. // Update the spec's next run time and previous run time
  86. row.Prev = row.Next
  87. row.Next = timeutil.TimeStamp(schedule.Next(now.Add(1 * time.Minute)).Unix())
  88. if err := actions_model.UpdateScheduleSpec(ctx, row, "prev", "next"); err != nil {
  89. log.Error("UpdateScheduleSpec: %v", err)
  90. return err
  91. }
  92. }
  93. // Stop if all specs have been retrieved
  94. if len(specs) < pageSize {
  95. break
  96. }
  97. }
  98. return nil
  99. }
  100. // CreateScheduleTask creates a scheduled task from a cron action schedule.
  101. // It creates an action run based on the schedule, inserts it into the database, and creates commit statuses for each job.
  102. func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) error {
  103. // Create a new action run based on the schedule
  104. run := &actions_model.ActionRun{
  105. Title: cron.Title,
  106. RepoID: cron.RepoID,
  107. OwnerID: cron.OwnerID,
  108. WorkflowID: cron.WorkflowID,
  109. TriggerUserID: cron.TriggerUserID,
  110. Ref: cron.Ref,
  111. CommitSHA: cron.CommitSHA,
  112. Event: cron.Event,
  113. EventPayload: cron.EventPayload,
  114. TriggerEvent: string(webhook_module.HookEventSchedule),
  115. ScheduleID: cron.ID,
  116. Status: actions_model.StatusWaiting,
  117. }
  118. vars, err := actions_model.GetVariablesOfRun(ctx, run)
  119. if err != nil {
  120. log.Error("GetVariablesOfRun: %v", err)
  121. return err
  122. }
  123. // Parse the workflow specification from the cron schedule
  124. workflows, err := jobparser.Parse(cron.Content, jobparser.WithVars(vars))
  125. if err != nil {
  126. return err
  127. }
  128. // Insert the action run and its associated jobs into the database
  129. if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
  130. return err
  131. }
  132. allJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
  133. if err != nil {
  134. log.Error("FindRunJobs: %v", err)
  135. }
  136. err = run.LoadAttributes(ctx)
  137. if err != nil {
  138. log.Error("LoadAttributes: %v", err)
  139. }
  140. notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
  141. for _, job := range allJobs {
  142. notify_service.WorkflowJobStatusUpdate(ctx, run.Repo, run.TriggerUser, job, nil)
  143. }
  144. // Return nil if no errors occurred
  145. return nil
  146. }