gitea源码


  1. // Copyright 2016 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package code
  4. import (
  5. "context"
  6. "os"
  7. "runtime/pprof"
  8. "slices"
  9. "sync/atomic"
  10. "time"
  11. "code.gitea.io/gitea/models/db"
  12. repo_model "code.gitea.io/gitea/models/repo"
  13. "code.gitea.io/gitea/modules/graceful"
  14. "code.gitea.io/gitea/modules/indexer"
  15. "code.gitea.io/gitea/modules/indexer/code/bleve"
  16. "code.gitea.io/gitea/modules/indexer/code/elasticsearch"
  17. "code.gitea.io/gitea/modules/indexer/code/internal"
  18. "code.gitea.io/gitea/modules/log"
  19. "code.gitea.io/gitea/modules/process"
  20. "code.gitea.io/gitea/modules/queue"
  21. "code.gitea.io/gitea/modules/setting"
  22. )
  23. var (
  24. indexerQueue *queue.WorkerPoolQueue[*internal.IndexerData]
  25. // globalIndexer is the global indexer, it cannot be nil.
  26. // When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready.
  27. // So it's always safe use it as *globalIndexer.Load() and call its methods.
  28. globalIndexer atomic.Pointer[internal.Indexer]
  29. )
  30. func init() {
  31. dummyIndexer := internal.NewDummyIndexer()
  32. globalIndexer.Store(&dummyIndexer)
  33. }
  34. func index(ctx context.Context, indexer internal.Indexer, repoID int64) error {
  35. repo, err := repo_model.GetRepositoryByID(ctx, repoID)
  36. if repo_model.IsErrRepoNotExist(err) {
  37. return indexer.Delete(ctx, repoID)
  38. }
  39. if err != nil {
  40. return err
  41. }
  42. repoTypes := setting.Indexer.RepoIndexerRepoTypes
  43. if len(repoTypes) == 0 {
  44. repoTypes = []string{"sources"}
  45. }
  46. // skip forks from being indexed if unit is not present
  47. if !slices.Contains(repoTypes, "forks") && repo.IsFork {
  48. return nil
  49. }
  50. // skip mirrors from being indexed if unit is not present
  51. if !slices.Contains(repoTypes, "mirrors") && repo.IsMirror {
  52. return nil
  53. }
  54. // skip templates from being indexed if unit is not present
  55. if !slices.Contains(repoTypes, "templates") && repo.IsTemplate {
  56. return nil
  57. }
  58. // skip regular repos from being indexed if unit is not present
  59. if !slices.Contains(repoTypes, "sources") && !repo.IsFork && !repo.IsMirror && !repo.IsTemplate {
  60. return nil
  61. }
  62. sha, err := getDefaultBranchSha(ctx, repo)
  63. if err != nil {
  64. return err
  65. }
  66. changes, err := getRepoChanges(ctx, repo, sha)
  67. if err != nil {
  68. return err
  69. } else if changes == nil {
  70. return nil
  71. }
  72. if err := indexer.Index(ctx, repo, sha, changes); err != nil {
  73. return err
  74. }
  75. return repo_model.UpdateIndexerStatus(ctx, repo, repo_model.RepoIndexerTypeCode, sha)
  76. }
  77. // Init initialize the repo indexer
  78. func Init() {
  79. if !setting.Indexer.RepoIndexerEnabled {
  80. (*globalIndexer.Load()).Close()
  81. return
  82. }
  83. ctx, cancel, finished := process.GetManager().AddTypedContext(context.Background(), "Service: CodeIndexer", process.SystemProcessType, false)
  84. graceful.GetManager().RunAtTerminate(func() {
  85. select {
  86. case <-ctx.Done():
  87. return
  88. default:
  89. }
  90. cancel()
  91. log.Debug("Closing repository indexer")
  92. (*globalIndexer.Load()).Close()
  93. log.Info("PID: %d Repository Indexer closed", os.Getpid())
  94. finished()
  95. })
  96. waitChannel := make(chan time.Duration, 1)
  97. // Create the Queue
  98. switch setting.Indexer.RepoType {
  99. case "bleve", "elasticsearch":
  100. handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) {
  101. indexer := *globalIndexer.Load()
  102. for _, indexerData := range items {
  103. log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)
  104. if err := index(ctx, indexer, indexerData.RepoID); err != nil {
  105. if !setting.IsInTesting {
  106. log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err)
  107. }
  108. }
  109. }
  110. return nil // do not re-queue the failed items, otherwise some broken repo will block the queue
  111. }
  112. indexerQueue = queue.CreateUniqueQueue(ctx, "code_indexer", handler)
  113. if indexerQueue == nil {
  114. log.Fatal("Unable to create codes indexer queue")
  115. }
  116. default:
  117. log.Fatal("Unknown codes indexer type; %s", setting.Indexer.RepoType)
  118. }
  119. go func() {
  120. pprof.SetGoroutineLabels(ctx)
  121. start := time.Now()
  122. var (
  123. rIndexer internal.Indexer
  124. existed bool
  125. err error
  126. )
  127. switch setting.Indexer.RepoType {
  128. case "bleve":
  129. log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoPath)
  130. defer func() {
  131. if err := recover(); err != nil {
  132. log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2))
  133. log.Error("The indexer files are likely corrupted and may need to be deleted")
  134. log.Error("You can completely remove the \"%s\" directory to make Gitea recreate the indexes", setting.Indexer.RepoPath)
  135. }
  136. }()
  137. rIndexer = bleve.NewIndexer(setting.Indexer.RepoPath)
  138. existed, err = rIndexer.Init(ctx)
  139. if err != nil {
  140. cancel()
  141. (*globalIndexer.Load()).Close()
  142. close(waitChannel)
  143. log.Fatal("PID: %d Unable to initialize the bleve Repository Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.RepoPath, err)
  144. }
  145. case "elasticsearch":
  146. log.Info("PID: %d Initializing Repository Indexer at: %s", os.Getpid(), setting.Indexer.RepoConnStr)
  147. defer func() {
  148. if err := recover(); err != nil {
  149. log.Error("PANIC whilst initializing repository indexer: %v\nStacktrace: %s", err, log.Stack(2))
  150. log.Error("The indexer files are likely corrupted and may need to be deleted")
  151. log.Error("You can completely remove the \"%s\" index to make Gitea recreate the indexes", setting.Indexer.RepoConnStr)
  152. }
  153. }()
  154. rIndexer = elasticsearch.NewIndexer(setting.Indexer.RepoConnStr, setting.Indexer.RepoIndexerName)
  155. existed, err = rIndexer.Init(ctx)
  156. if err != nil {
  157. cancel()
  158. (*globalIndexer.Load()).Close()
  159. close(waitChannel)
  160. log.Fatal("PID: %d Unable to initialize the elasticsearch Repository Indexer connstr: %s Error: %v", os.Getpid(), setting.Indexer.RepoConnStr, err)
  161. }
  162. default:
  163. log.Fatal("PID: %d Unknown Indexer type: %s", os.Getpid(), setting.Indexer.RepoType)
  164. }
  165. globalIndexer.Store(&rIndexer)
  166. // Start processing the queue
  167. go graceful.GetManager().RunWithCancel(indexerQueue)
  168. if !existed { // populate the index because it's created for the first time
  169. go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)
  170. }
  171. select {
  172. case waitChannel <- time.Since(start):
  173. case <-graceful.GetManager().IsShutdown():
  174. }
  175. close(waitChannel)
  176. }()
  177. if setting.Indexer.StartupTimeout > 0 {
  178. go func() {
  179. pprof.SetGoroutineLabels(ctx)
  180. timeout := setting.Indexer.StartupTimeout
  181. if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
  182. timeout += setting.GracefulHammerTime
  183. }
  184. select {
  185. case <-graceful.GetManager().IsShutdown():
  186. log.Warn("Shutdown before Repository Indexer completed initialization")
  187. cancel()
  188. (*globalIndexer.Load()).Close()
  189. case duration, ok := <-waitChannel:
  190. if !ok {
  191. log.Warn("Repository Indexer Initialization failed")
  192. cancel()
  193. (*globalIndexer.Load()).Close()
  194. return
  195. }
  196. log.Info("Repository Indexer Initialization took %v", duration)
  197. case <-time.After(timeout):
  198. cancel()
  199. (*globalIndexer.Load()).Close()
  200. log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout)
  201. }
  202. }()
  203. }
  204. }
  205. // UpdateRepoIndexer update a repository's entries in the indexer
  206. func UpdateRepoIndexer(repo *repo_model.Repository) {
  207. indexData := &internal.IndexerData{RepoID: repo.ID}
  208. if err := indexerQueue.Push(indexData); err != nil {
  209. log.Error("Update repo index data %v failed: %v", indexData, err)
  210. }
  211. }
  212. // IsAvailable checks if issue indexer is available
  213. func IsAvailable(ctx context.Context) bool {
  214. return (*globalIndexer.Load()).Ping(ctx) == nil
  215. }
  216. // populateRepoIndexer populate the repo indexer with pre-existing data. This
  217. // should only be run when the indexer is created for the first time.
  218. func populateRepoIndexer(ctx context.Context) {
  219. log.Info("Populating the repo indexer with existing repositories")
  220. exist, err := db.IsTableNotEmpty("repository")
  221. if err != nil {
  222. log.Fatal("System error: %v", err)
  223. } else if !exist {
  224. return
  225. }
  226. // if there is any existing repo indexer metadata in the DB, delete it
  227. // since we are starting afresh. Also, xorm requires deletes to have a
  228. // condition, and we want to delete everything, thus 1=1.
  229. if err := db.DeleteAllRecords("repo_indexer_status"); err != nil {
  230. log.Fatal("System error: %v", err)
  231. }
  232. var maxRepoID int64
  233. if maxRepoID, err = db.GetMaxID("repository"); err != nil {
  234. log.Fatal("System error: %v", err)
  235. }
  236. // start with the maximum existing repo ID and work backwards, so that we
  237. // don't include repos that are created after gitea starts; such repos will
  238. // already be added to the indexer, and we don't need to add them again.
  239. for maxRepoID > 0 {
  240. select {
  241. case <-ctx.Done():
  242. log.Info("Repository Indexer population shutdown before completion")
  243. return
  244. default:
  245. }
  246. ids, err := repo_model.GetUnindexedRepos(ctx, repo_model.RepoIndexerTypeCode, maxRepoID, 0, 50)
  247. if err != nil {
  248. log.Error("populateRepoIndexer: %v", err)
  249. return
  250. } else if len(ids) == 0 {
  251. break
  252. }
  253. for _, id := range ids {
  254. select {
  255. case <-ctx.Done():
  256. log.Info("Repository Indexer population shutdown before completion")
  257. return
  258. default:
  259. }
  260. if err := indexerQueue.Push(&internal.IndexerData{RepoID: id}); err != nil {
  261. log.Error("indexerQueue.Push: %v", err)
  262. return
  263. }
  264. maxRepoID = id - 1
  265. }
  266. }
  267. log.Info("Done (re)populating the repo indexer with existing repositories")
  268. }
  269. func SupportedSearchModes() []indexer.SearchMode {
  270. gi := globalIndexer.Load()
  271. if gi == nil {
  272. return nil
  273. }
  274. return (*gi).SupportedSearchModes()
  275. }