gitea源码

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // Copyright 2023 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package queue
  4. import (
  5. "path/filepath"
  6. "testing"
  7. "code.gitea.io/gitea/modules/setting"
  8. "github.com/stretchr/testify/assert"
  9. )
  10. func TestManager(t *testing.T) {
  11. oldAppDataPath := setting.AppDataPath
  12. setting.AppDataPath = t.TempDir()
  13. defer func() {
  14. setting.AppDataPath = oldAppDataPath
  15. }()
  16. newQueueFromConfig := func(name, cfg string) (*WorkerPoolQueue[int], error) {
  17. cfgProvider, err := setting.NewConfigProviderFromData(cfg)
  18. if err != nil {
  19. return nil, err
  20. }
  21. qs, err := setting.GetQueueSettings(cfgProvider, name)
  22. if err != nil {
  23. return nil, err
  24. }
  25. return newWorkerPoolQueueForTest(name, qs, func(s ...int) (unhandled []int) { return nil }, false)
  26. }
  27. // test invalid CONN_STR
  28. _, err := newQueueFromConfig("default", `
  29. [queue]
  30. DATADIR = temp-dir
  31. CONN_STR = redis://
  32. `)
  33. assert.ErrorContains(t, err, "invalid leveldb connection string")
  34. // test default config
  35. q, err := newQueueFromConfig("default", "")
  36. assert.NoError(t, err)
  37. assert.Equal(t, "default", q.GetName())
  38. assert.Equal(t, "level", q.GetType())
  39. assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/common"), q.baseConfig.DataFullDir)
  40. assert.Equal(t, 100000, q.baseConfig.Length)
  41. assert.Equal(t, 20, q.batchLength)
  42. assert.Empty(t, q.baseConfig.ConnStr)
  43. assert.Equal(t, "default_queue", q.baseConfig.QueueFullName)
  44. assert.Equal(t, "default_queue_unique", q.baseConfig.SetFullName)
  45. assert.NotZero(t, q.GetWorkerMaxNumber())
  46. assert.Equal(t, 0, q.GetWorkerNumber())
  47. assert.Equal(t, 0, q.GetWorkerActiveNumber())
  48. assert.Equal(t, 0, q.GetQueueItemNumber())
  49. assert.Equal(t, "int", q.GetItemTypeName())
  50. // test inherited config
  51. cfgProvider, err := setting.NewConfigProviderFromData(`
  52. [queue]
  53. TYPE = channel
  54. DATADIR = queues/dir1
  55. LENGTH = 100
  56. BATCH_LENGTH = 20
  57. CONN_STR = "addrs=127.0.0.1:6379 db=0"
  58. QUEUE_NAME = _queue1
  59. [queue.sub]
  60. TYPE = level
  61. DATADIR = queues/dir2
  62. LENGTH = 102
  63. BATCH_LENGTH = 22
  64. CONN_STR =
  65. QUEUE_NAME = _q2
  66. SET_NAME = _u2
  67. MAX_WORKERS = 123
  68. `)
  69. assert.NoError(t, err)
  70. q1 := createWorkerPoolQueue[string](t.Context(), "no-such", cfgProvider, nil, false)
  71. assert.Equal(t, "no-such", q1.GetName())
  72. assert.Equal(t, "dummy", q1.GetType()) // no handler, so it becomes dummy
  73. assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir1"), q1.baseConfig.DataFullDir)
  74. assert.Equal(t, 100, q1.baseConfig.Length)
  75. assert.Equal(t, 20, q1.batchLength)
  76. assert.Equal(t, "addrs=127.0.0.1:6379 db=0", q1.baseConfig.ConnStr)
  77. assert.Equal(t, "no-such_queue1", q1.baseConfig.QueueFullName)
  78. assert.Equal(t, "no-such_queue1_unique", q1.baseConfig.SetFullName)
  79. assert.NotZero(t, q1.GetWorkerMaxNumber())
  80. assert.Equal(t, 0, q1.GetWorkerNumber())
  81. assert.Equal(t, 0, q1.GetWorkerActiveNumber())
  82. assert.Equal(t, 0, q1.GetQueueItemNumber())
  83. assert.Equal(t, "string", q1.GetItemTypeName())
  84. qid1 := GetManager().qidCounter
  85. q2 := createWorkerPoolQueue(t.Context(), "sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false)
  86. assert.Equal(t, "sub", q2.GetName())
  87. assert.Equal(t, "level", q2.GetType())
  88. assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir2"), q2.baseConfig.DataFullDir)
  89. assert.Equal(t, 102, q2.baseConfig.Length)
  90. assert.Equal(t, 22, q2.batchLength)
  91. assert.Empty(t, q2.baseConfig.ConnStr)
  92. assert.Equal(t, "sub_q2", q2.baseConfig.QueueFullName)
  93. assert.Equal(t, "sub_q2_u2", q2.baseConfig.SetFullName)
  94. assert.Equal(t, 123, q2.GetWorkerMaxNumber())
  95. assert.Equal(t, 0, q2.GetWorkerNumber())
  96. assert.Equal(t, 0, q2.GetWorkerActiveNumber())
  97. assert.Equal(t, 0, q2.GetQueueItemNumber())
  98. assert.Equal(t, "int", q2.GetItemTypeName())
  99. qid2 := GetManager().qidCounter
  100. assert.Equal(t, q1, GetManager().ManagedQueues()[qid1])
  101. GetManager().GetManagedQueue(qid1).SetWorkerMaxNumber(120)
  102. assert.Equal(t, 120, q1.workerMaxNum)
  103. stop := runWorkerPoolQueue(q2)
  104. assert.NoError(t, GetManager().GetManagedQueue(qid2).FlushWithContext(t.Context(), 0))
  105. assert.NoError(t, GetManager().FlushAll(t.Context(), 0))
  106. stop()
  107. }