Gitea source code

workerqueue.go 7.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package queue
  4. import (
  5. "context"
  6. "fmt"
  7. "runtime/pprof"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "code.gitea.io/gitea/modules/json"
  12. "code.gitea.io/gitea/modules/log"
  13. "code.gitea.io/gitea/modules/process"
  14. "code.gitea.io/gitea/modules/setting"
  15. )
// WorkerPoolQueue is a queue that uses a pool of workers to process items
// It can use different underlying (base) queue types
type WorkerPoolQueue[T any] struct {
	ctxRun          context.Context    // run-loop context; canceled by Cancel / ShutdownWait
	ctxRunCancel    context.CancelFunc // cancels ctxRun
	shutdownDone    chan struct{}      // closed when shutdown has completed (ShutdownWait blocks on it)
	shutdownTimeout atomic.Int64       // in case some buggy handlers (workers) would hang forever, "shutdown" should finish in predictable time

	origHandler HandlerFuncT[T] // the caller-provided handler (may be nil)
	safeHandler HandlerFuncT[T] // origHandler wrapped with panic recovery (see NewWorkerPoolQueueWithContext)

	baseQueueType string      // canonical base queue type name, e.g. "channel", "redis", "level"
	baseConfig    *BaseConfig // configuration shared with the base queue
	baseQueue     baseQueue   // the underlying storage/transport for queue items

	batchChan chan []T       // batches of items; NOTE(review): consumed by the run loop (doRun), not visible in this chunk
	flushChan chan flushType // flush requests sent by FlushWithContext
	isFlushing atomic.Bool

	batchLength     int        // max number of items per batch
	workerNum       int        // current number of workers
	workerMaxNum    int        // upper bound on workers
	workerActiveNum int        // workers currently busy
	workerNumMu     sync.Mutex // guards workerNum, workerMaxNum, workerActiveNum
}
// flushType is a flush request: timeout bounds the flush, and "c" is closed
// by the queue's run loop when the flush has completed.
type flushType struct {
	timeout time.Duration
	c       chan struct{}
}

// Compile-time check that WorkerPoolQueue satisfies ManagedWorkerPoolQueue.
var _ ManagedWorkerPoolQueue = (*WorkerPoolQueue[any])(nil)
  42. func (q *WorkerPoolQueue[T]) GetName() string {
  43. return q.baseConfig.ManagedName
  44. }
  45. func (q *WorkerPoolQueue[T]) GetType() string {
  46. return q.baseQueueType
  47. }
  48. func (q *WorkerPoolQueue[T]) GetItemTypeName() string {
  49. var t T
  50. return fmt.Sprintf("%T", t)
  51. }
  52. func (q *WorkerPoolQueue[T]) GetWorkerNumber() int {
  53. q.workerNumMu.Lock()
  54. defer q.workerNumMu.Unlock()
  55. return q.workerNum
  56. }
  57. func (q *WorkerPoolQueue[T]) GetWorkerActiveNumber() int {
  58. q.workerNumMu.Lock()
  59. defer q.workerNumMu.Unlock()
  60. return q.workerActiveNum
  61. }
  62. func (q *WorkerPoolQueue[T]) GetWorkerMaxNumber() int {
  63. q.workerNumMu.Lock()
  64. defer q.workerNumMu.Unlock()
  65. return q.workerMaxNum
  66. }
  67. func (q *WorkerPoolQueue[T]) SetWorkerMaxNumber(num int) {
  68. q.workerNumMu.Lock()
  69. defer q.workerNumMu.Unlock()
  70. q.workerMaxNum = num
  71. }
  72. func (q *WorkerPoolQueue[T]) GetQueueItemNumber() int {
  73. cnt, err := q.baseQueue.Len(q.ctxRun)
  74. if err != nil {
  75. log.Error("Failed to get number of items in queue %q: %v", q.GetName(), err)
  76. }
  77. return cnt
  78. }
  79. func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.Duration) (err error) {
  80. if q.isBaseQueueDummy() {
  81. return nil
  82. }
  83. log.Debug("Try to flush queue %q with timeout %v", q.GetName(), timeout)
  84. defer log.Debug("Finish flushing queue %q, err: %v", q.GetName(), err)
  85. var after <-chan time.Time
  86. after = infiniteTimerC
  87. if timeout > 0 {
  88. after = time.After(timeout)
  89. }
  90. flush := flushType{timeout: timeout, c: make(chan struct{})}
  91. // send flush request
  92. // if it blocks, it means that there is a flush in progress or the queue hasn't been started yet
  93. select {
  94. case q.flushChan <- flush:
  95. case <-ctx.Done():
  96. return ctx.Err()
  97. case <-q.ctxRun.Done():
  98. return q.ctxRun.Err()
  99. case <-after:
  100. return context.DeadlineExceeded
  101. }
  102. // wait for flush to finish
  103. select {
  104. case <-flush.c:
  105. return nil
  106. case <-ctx.Done():
  107. return ctx.Err()
  108. case <-q.ctxRun.Done():
  109. return q.ctxRun.Err()
  110. case <-after:
  111. return context.DeadlineExceeded
  112. }
  113. }
// RemoveAllItems removes all items in the base queue
func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error {
	return q.baseQueue.RemoveAll(ctx)
}
  118. func (q *WorkerPoolQueue[T]) marshal(data T) []byte {
  119. bs, err := json.Marshal(data)
  120. if err != nil {
  121. log.Error("Failed to marshal item for queue %q: %v", q.GetName(), err)
  122. return nil
  123. }
  124. return bs
  125. }
  126. func (q *WorkerPoolQueue[T]) unmarshal(data []byte) (t T, ok bool) {
  127. if err := json.Unmarshal(data, &t); err != nil {
  128. log.Error("Failed to unmarshal item from queue %q: %v", q.GetName(), err)
  129. return t, false
  130. }
  131. return t, true
  132. }
  133. func (q *WorkerPoolQueue[T]) isBaseQueueDummy() bool {
  134. _, isDummy := q.baseQueue.(*baseDummy)
  135. return isDummy
  136. }
  137. // Push adds an item to the queue, it may block for a while and then returns an error if the queue is full
  138. func (q *WorkerPoolQueue[T]) Push(data T) error {
  139. if q.isBaseQueueDummy() && q.safeHandler != nil {
  140. // FIXME: the "immediate" queue is only for testing, but it really causes problems because its behavior is different from a real queue.
  141. // Even if tests pass, it doesn't mean that there is no bug in code.
  142. if data, ok := q.unmarshal(q.marshal(data)); ok {
  143. q.safeHandler(data)
  144. }
  145. }
  146. return q.baseQueue.PushItem(q.ctxRun, q.marshal(data))
  147. }
  148. // Has only works for unique queues. Keep in mind that this check may not be reliable (due to lacking of proper transaction support)
  149. // There could be a small chance that duplicate items appear in the queue
  150. func (q *WorkerPoolQueue[T]) Has(data T) (bool, error) {
  151. return q.baseQueue.HasItem(q.ctxRun, q.marshal(data))
  152. }
// Run starts the queue by invoking its internal run loop.
// NOTE(review): doRun is defined elsewhere in this file; presumably it blocks
// until the queue shuts down — confirm against the rest of the file.
func (q *WorkerPoolQueue[T]) Run() {
	q.doRun()
}
// Cancel cancels the queue's run context, requesting the run loop to stop.
// Unlike ShutdownWait, it does not wait for shutdown to complete.
func (q *WorkerPoolQueue[T]) Cancel() {
	q.ctxRunCancel()
}
// ShutdownWait shuts down the queue, waits for all workers to finish their jobs, and pushes the unhandled items back to the base queue
// It waits for all workers (handlers) to finish their jobs, in case some buggy handlers would hang forever, a reasonable timeout is needed
func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration) {
	// the timeout must be stored before canceling, so the run loop sees it during shutdown
	q.shutdownTimeout.Store(int64(timeout))
	q.ctxRunCancel()
	<-q.shutdownDone // closed by the run loop once shutdown has fully completed
}
  166. func getNewQueueFn(t string) (string, func(cfg *BaseConfig, unique bool) (baseQueue, error)) {
  167. switch t {
  168. case "dummy", "immediate":
  169. return t, newBaseDummy
  170. case "channel":
  171. return t, newBaseChannelGeneric
  172. case "redis":
  173. return t, newBaseRedisGeneric
  174. default: // level(leveldb,levelqueue,persistable-channel)
  175. return "level", newBaseLevelQueueGeneric
  176. }
  177. }
// newWorkerPoolQueueForTest creates a worker pool queue bound to a background
// context; intended for tests only.
func newWorkerPoolQueueForTest[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
	return NewWorkerPoolQueueWithContext(context.Background(), name, queueSetting, handler, unique)
}
// NewWorkerPoolQueueWithContext creates a WorkerPoolQueue from the given queue
// settings and handler. A nil handler forces the "dummy" queue type, so pushed
// items are accepted but never processed. "unique" is forwarded to the base
// queue constructor (unique queues support the Has check).
func NewWorkerPoolQueueWithContext[T any](ctx context.Context, name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
	if handler == nil {
		log.Debug("Use dummy queue for %q because handler is nil and caller doesn't want to process the queue items", name)
		queueSetting.Type = "dummy"
	}

	var w WorkerPoolQueue[T]
	var err error
	queueType, newQueueFn := getNewQueueFn(queueSetting.Type)
	w.baseQueueType = queueType
	w.baseConfig = toBaseConfig(name, queueSetting)
	w.baseQueue, err = newQueueFn(w.baseConfig, unique)
	if err != nil {
		return nil, err
	}
	log.Trace("Created queue %q of type %q", name, queueType)

	// register the run context with the process manager so the queue is visible as a system process
	w.ctxRun, _, w.ctxRunCancel = process.GetManager().AddTypedContext(ctx, "Queue: "+w.GetName(), process.SystemProcessType, false)
	w.batchChan = make(chan []T)
	w.flushChan = make(chan flushType)
	w.shutdownDone = make(chan struct{})
	w.shutdownTimeout.Store(int64(shutdownDefaultTimeout))
	w.workerMaxNum = queueSetting.MaxWorkers
	w.batchLength = queueSetting.BatchLength

	w.origHandler = handler
	// safeHandler wraps the caller's handler with panic recovery so a buggy
	// handler cannot kill a worker goroutine; recovered panics are logged.
	w.safeHandler = func(t ...T) (unhandled []T) {
		defer func() {
			// FIXME: there is no ctx support in the handler, so process manager is unable to restore the labels
			// so here we explicitly set the "queue ctx" labels again after the handler is done
			pprof.SetGoroutineLabels(w.ctxRun)
			err := recover()
			if err != nil {
				log.Error("Recovered from panic in queue %q handler: %v\n%s", name, err, log.Stack(2))
			}
		}()
		if w.origHandler != nil {
			return w.origHandler(t...)
		}
		return nil
	}
	return &w, nil
}