gitea源码

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. // Copyright 2019 Gitea. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package task
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. admin_model "code.gitea.io/gitea/models/admin"
  9. repo_model "code.gitea.io/gitea/models/repo"
  10. user_model "code.gitea.io/gitea/models/user"
  11. "code.gitea.io/gitea/modules/graceful"
  12. "code.gitea.io/gitea/modules/json"
  13. "code.gitea.io/gitea/modules/log"
  14. base "code.gitea.io/gitea/modules/migration"
  15. "code.gitea.io/gitea/modules/queue"
  16. "code.gitea.io/gitea/modules/secret"
  17. "code.gitea.io/gitea/modules/setting"
  18. "code.gitea.io/gitea/modules/structs"
  19. "code.gitea.io/gitea/modules/timeutil"
  20. "code.gitea.io/gitea/modules/util"
  21. repo_service "code.gitea.io/gitea/services/repository"
  22. )
  23. // taskQueue is a global queue of tasks
  24. var taskQueue *queue.WorkerPoolQueue[*admin_model.Task]
  25. // Run a task
  26. func Run(ctx context.Context, t *admin_model.Task) error {
  27. switch t.Type {
  28. case structs.TaskTypeMigrateRepo:
  29. return runMigrateTask(ctx, t)
  30. default:
  31. return fmt.Errorf("Unknown task type: %d", t.Type)
  32. }
  33. }
  34. // Init will start the service to get all unfinished tasks and run them
  35. func Init() error {
  36. taskQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "task", handler)
  37. if taskQueue == nil {
  38. return errors.New("unable to create task queue")
  39. }
  40. go graceful.GetManager().RunWithCancel(taskQueue)
  41. return nil
  42. }
  43. func handler(items ...*admin_model.Task) []*admin_model.Task {
  44. for _, task := range items {
  45. if err := Run(graceful.GetManager().ShutdownContext(), task); err != nil {
  46. log.Error("Run task failed: %v", err)
  47. }
  48. }
  49. return nil
  50. }
  51. // MigrateRepository add migration repository to task
  52. func MigrateRepository(ctx context.Context, doer, u *user_model.User, opts base.MigrateOptions) error {
  53. task, err := CreateMigrateTask(ctx, doer, u, opts)
  54. if err != nil {
  55. return err
  56. }
  57. return taskQueue.Push(task)
  58. }
  59. // CreateMigrateTask creates a migrate task
  60. func CreateMigrateTask(ctx context.Context, doer, u *user_model.User, opts base.MigrateOptions) (*admin_model.Task, error) {
  61. // encrypt credentials for persistence
  62. var err error
  63. opts.CloneAddrEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.CloneAddr)
  64. if err != nil {
  65. return nil, err
  66. }
  67. opts.CloneAddr = util.SanitizeCredentialURLs(opts.CloneAddr)
  68. opts.AuthPasswordEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.AuthPassword)
  69. if err != nil {
  70. return nil, err
  71. }
  72. opts.AuthPassword = ""
  73. opts.AuthTokenEncrypted, err = secret.EncryptSecret(setting.SecretKey, opts.AuthToken)
  74. if err != nil {
  75. return nil, err
  76. }
  77. opts.AuthToken = ""
  78. bs, err := json.Marshal(&opts)
  79. if err != nil {
  80. return nil, err
  81. }
  82. task := &admin_model.Task{
  83. DoerID: doer.ID,
  84. OwnerID: u.ID,
  85. Type: structs.TaskTypeMigrateRepo,
  86. Status: structs.TaskStatusQueued,
  87. PayloadContent: string(bs),
  88. }
  89. if err := admin_model.CreateTask(ctx, task); err != nil {
  90. return nil, err
  91. }
  92. repo, err := repo_service.CreateRepositoryDirectly(ctx, doer, u, repo_service.CreateRepoOptions{
  93. Name: opts.RepoName,
  94. Description: opts.Description,
  95. OriginalURL: opts.OriginalURL,
  96. GitServiceType: opts.GitServiceType,
  97. IsPrivate: opts.Private || setting.Repository.ForcePrivate,
  98. IsMirror: opts.Mirror,
  99. Status: repo_model.RepositoryBeingMigrated,
  100. }, false)
  101. if err != nil {
  102. task.EndTime = timeutil.TimeStampNow()
  103. task.Status = structs.TaskStatusFailed
  104. err2 := task.UpdateCols(ctx, "end_time", "status")
  105. if err2 != nil {
  106. log.Error("UpdateCols Failed: %v", err2.Error())
  107. }
  108. return nil, err
  109. }
  110. task.RepoID = repo.ID
  111. if err = task.UpdateCols(ctx, "repo_id"); err != nil {
  112. return nil, err
  113. }
  114. return task, nil
  115. }
  116. // RetryMigrateTask retry a migrate task
  117. func RetryMigrateTask(ctx context.Context, repoID int64) error {
  118. migratingTask, err := admin_model.GetMigratingTask(ctx, repoID)
  119. if err != nil {
  120. log.Error("GetMigratingTask: %v", err)
  121. return err
  122. }
  123. if migratingTask.Status == structs.TaskStatusQueued || migratingTask.Status == structs.TaskStatusRunning {
  124. return nil
  125. }
  126. // TODO Need to removing the storage/database garbage brought by the failed task
  127. // Reset task status and messages
  128. migratingTask.Status = structs.TaskStatusQueued
  129. migratingTask.Message = ""
  130. if err = migratingTask.UpdateCols(ctx, "status", "message"); err != nil {
  131. log.Error("task.UpdateCols failed: %v", err)
  132. return err
  133. }
  134. return taskQueue.Push(migratingTask)
  135. }