gitea源码

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package queue
  4. import (
  5. "context"
  6. "errors"
  7. "maps"
  8. "sync"
  9. "time"
  10. "code.gitea.io/gitea/modules/log"
  11. "code.gitea.io/gitea/modules/setting"
  12. )
  13. // Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues".
  14. type Manager struct {
  15. mu sync.Mutex
  16. qidCounter int64
  17. Queues map[int64]ManagedWorkerPoolQueue
  18. }
  // ManagedWorkerPoolQueue is the view of a queue that Manager stores and hands out.
  type ManagedWorkerPoolQueue interface {
  	// Descriptive accessors.
  	GetName() string
  	GetType() string
  	GetItemTypeName() string

  	// Worker counters, plus the setter for the maximum worker number.
  	GetWorkerNumber() int
  	GetWorkerActiveNumber() int
  	GetWorkerMaxNumber() int
  	SetWorkerMaxNumber(num int)

  	// Item counter.
  	GetQueueItemNumber() int

  	// FlushWithContext tries to make the handler process every item in the queue synchronously.
  	// It exists for testing only and is not designed to be used in a cluster.
  	// A negative timeout means that all items in the queue are discarded.
  	FlushWithContext(ctx context.Context, timeout time.Duration) error

  	// RemoveAllItems removes every item from the base queue; on-the-fly items are not affected.
  	RemoveAllItems(ctx context.Context) error
  }
  35. var manager *Manager
  36. func init() {
  37. manager = &Manager{
  38. Queues: make(map[int64]ManagedWorkerPoolQueue),
  39. }
  40. }
  41. func GetManager() *Manager {
  42. return manager
  43. }
  44. func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue) {
  45. m.mu.Lock()
  46. defer m.mu.Unlock()
  47. m.qidCounter++
  48. m.Queues[m.qidCounter] = managed
  49. }
  50. func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue {
  51. m.mu.Lock()
  52. defer m.mu.Unlock()
  53. return m.Queues[qid]
  54. }
  55. func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
  56. m.mu.Lock()
  57. defer m.mu.Unlock()
  58. queues := make(map[int64]ManagedWorkerPoolQueue, len(m.Queues))
  59. maps.Copy(queues, m.Queues)
  60. return queues
  61. }
  62. // FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty.
  63. // It is for testing purpose only. It's not designed to be used in a cluster.
  64. // Negative timeout means discarding all items in the queue.
  65. func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
  66. var finalErrors []error
  67. qs := m.ManagedQueues()
  68. for _, q := range qs {
  69. if err := q.FlushWithContext(ctx, timeout); err != nil {
  70. finalErrors = append(finalErrors, err)
  71. }
  72. }
  73. return errors.Join(finalErrors...)
  74. }
  75. // CreateSimpleQueue creates a simple queue from global setting config provider by name
  76. func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
  77. return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, false)
  78. }
  79. // CreateUniqueQueue creates a unique queue from global setting config provider by name
  80. func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
  81. return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, true)
  82. }
  83. func createWorkerPoolQueue[T any](ctx context.Context, name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
  84. queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
  85. if err != nil {
  86. log.Error("Failed to get queue settings for %q: %v", name, err)
  87. return nil
  88. }
  89. w, err := NewWorkerPoolQueueWithContext(ctx, name, queueSetting, handler, unique)
  90. if err != nil {
  91. log.Error("Failed to create queue %q: %v", name, err)
  92. return nil
  93. }
  94. GetManager().AddManagedQueue(w)
  95. return w
  96. }