gitea源码

archiver.go 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package archiver
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "os"
  11. "strings"
  12. "time"
  13. "code.gitea.io/gitea/models/db"
  14. repo_model "code.gitea.io/gitea/models/repo"
  15. "code.gitea.io/gitea/modules/git"
  16. "code.gitea.io/gitea/modules/gitrepo"
  17. "code.gitea.io/gitea/modules/graceful"
  18. "code.gitea.io/gitea/modules/httplib"
  19. "code.gitea.io/gitea/modules/log"
  20. "code.gitea.io/gitea/modules/process"
  21. "code.gitea.io/gitea/modules/queue"
  22. "code.gitea.io/gitea/modules/setting"
  23. "code.gitea.io/gitea/modules/storage"
  24. gitea_context "code.gitea.io/gitea/services/context"
  25. )
  26. // ArchiveRequest defines the parameters of an archive request, which notably
  27. // includes the specific repository being archived as well as the commit, the
  28. // name by which it was requested, and the kind of archive being requested.
  29. // This is entirely opaque to external entities, though, and mostly used as a
  30. // handle elsewhere.
  31. type ArchiveRequest struct {
  32. RepoID int64
  33. Type git.ArchiveType
  34. CommitID string
  35. archiveRefShortName string // the ref short name to download the archive, for example: "master", "v1.0.0", "commit id"
  36. }
  // ErrUnknownArchiveFormat reports that the requested archive format is not
  // supported. RequestNameType holds the name exactly as it was requested.
  type ErrUnknownArchiveFormat struct {
  	RequestNameType string
  }

  // Error implements the error interface.
  func (err ErrUnknownArchiveFormat) Error() string {
  	const prefix = "unknown format: "
  	return prefix + err.RequestNameType
  }

  // Is reports whether err is an ErrUnknownArchiveFormat value, regardless of
  // the request name it carries. It lets errors.Is match on the type alone.
  func (ErrUnknownArchiveFormat) Is(err error) bool {
  	switch err.(type) {
  	case ErrUnknownArchiveFormat:
  		return true
  	}
  	return false
  }
  // RepoRefNotFoundError reports that a requested reference (commit, tag, ...)
  // could not be found. RefShortName is the name that failed to resolve.
  type RepoRefNotFoundError struct {
  	RefShortName string
  }

  // Error implements the error interface.
  func (e RepoRefNotFoundError) Error() string {
  	const prefix = "unrecognized repository reference: "
  	return prefix + e.RefShortName
  }

  // Is reports whether err is a RepoRefNotFoundError value, regardless of the
  // ref name it carries. It lets errors.Is match on the type alone.
  func (e RepoRefNotFoundError) Is(err error) bool {
  	switch err.(type) {
  	case RepoRefNotFoundError:
  		return true
  	}
  	return false
  }
  62. // NewRequest creates an archival request, based on the URI. The
  63. // resulting ArchiveRequest is suitable for being passed to Await()
  64. // if it's determined that the request still needs to be satisfied.
  65. func NewRequest(repoID int64, repo *git.Repository, archiveRefExt string) (*ArchiveRequest, error) {
  66. // here the archiveRefShortName is not a clear ref, it could be a tag, branch or commit id
  67. archiveRefShortName, archiveType := git.SplitArchiveNameType(archiveRefExt)
  68. if archiveType == git.ArchiveUnknown {
  69. return nil, ErrUnknownArchiveFormat{archiveRefExt}
  70. }
  71. // Get corresponding commit.
  72. commitID, err := repo.ConvertToGitID(archiveRefShortName)
  73. if err != nil {
  74. return nil, RepoRefNotFoundError{RefShortName: archiveRefShortName}
  75. }
  76. r := &ArchiveRequest{RepoID: repoID, archiveRefShortName: archiveRefShortName, Type: archiveType}
  77. r.CommitID = commitID.String()
  78. return r, nil
  79. }
  80. // GetArchiveName returns the name of the caller, based on the ref used by the
  81. // caller to create this request.
  82. func (aReq *ArchiveRequest) GetArchiveName() string {
  83. return strings.ReplaceAll(aReq.archiveRefShortName, "/", "-") + "." + aReq.Type.String()
  84. }
  85. // Await awaits the completion of an ArchiveRequest. If the archive has
  86. // already been prepared the method returns immediately. Otherwise, an archiver
  87. // process will be started and its completion awaited. On success the returned
  88. // RepoArchiver may be used to download the archive. Note that even if the
  89. // context is cancelled/times out a started archiver will still continue to run
  90. // in the background.
  91. func (aReq *ArchiveRequest) Await(ctx context.Context) (*repo_model.RepoArchiver, error) {
  92. archiver, err := repo_model.GetRepoArchiver(ctx, aReq.RepoID, aReq.Type, aReq.CommitID)
  93. if err != nil {
  94. return nil, fmt.Errorf("models.GetRepoArchiver: %w", err)
  95. }
  96. if archiver != nil && archiver.Status == repo_model.ArchiverReady {
  97. // Archive already generated, we're done.
  98. return archiver, nil
  99. }
  100. if err := StartArchive(aReq); err != nil {
  101. return nil, fmt.Errorf("archiver.StartArchive: %w", err)
  102. }
  103. poll := time.NewTicker(time.Second * 1)
  104. defer poll.Stop()
  105. for {
  106. select {
  107. case <-graceful.GetManager().HammerContext().Done():
  108. // System stopped.
  109. return nil, graceful.GetManager().HammerContext().Err()
  110. case <-ctx.Done():
  111. return nil, ctx.Err()
  112. case <-poll.C:
  113. archiver, err = repo_model.GetRepoArchiver(ctx, aReq.RepoID, aReq.Type, aReq.CommitID)
  114. if err != nil {
  115. return nil, fmt.Errorf("repo_model.GetRepoArchiver: %w", err)
  116. }
  117. if archiver != nil && archiver.Status == repo_model.ArchiverReady {
  118. return archiver, nil
  119. }
  120. }
  121. }
  122. }
  123. // Stream satisfies the ArchiveRequest being passed in. Processing
  124. // will occur directly in this routine.
  125. func (aReq *ArchiveRequest) Stream(ctx context.Context, gitRepo *git.Repository, w io.Writer) error {
  126. if aReq.Type == git.ArchiveBundle {
  127. return gitRepo.CreateBundle(
  128. ctx,
  129. aReq.CommitID,
  130. w,
  131. )
  132. }
  133. return gitRepo.CreateArchive(
  134. ctx,
  135. aReq.Type,
  136. w,
  137. setting.Repository.PrefixArchiveFiles,
  138. aReq.CommitID,
  139. )
  140. }
  141. // doArchive satisfies the ArchiveRequest being passed in. Processing
  142. // will occur in a separate goroutine, as this phase may take a while to
  143. // complete. If the archive already exists, doArchive will not do
  144. // anything. In all cases, the caller should be examining the *ArchiveRequest
  145. // being returned for completion, as it may be different than the one they passed
  146. // in.
  147. func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver, error) {
  148. ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("ArchiveRequest[%d]: %s", r.RepoID, r.GetArchiveName()))
  149. defer finished()
  150. archiver, err := repo_model.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID)
  151. if err != nil {
  152. return nil, err
  153. }
  154. if archiver != nil {
  155. // FIXME: If another process are generating it, we think it's not ready and just return
  156. // Or we should wait until the archive generated.
  157. if archiver.Status == repo_model.ArchiverGenerating {
  158. return nil, nil
  159. }
  160. } else {
  161. archiver = &repo_model.RepoArchiver{
  162. RepoID: r.RepoID,
  163. Type: r.Type,
  164. CommitID: r.CommitID,
  165. Status: repo_model.ArchiverGenerating,
  166. }
  167. if err := db.Insert(ctx, archiver); err != nil {
  168. return nil, err
  169. }
  170. }
  171. rPath := archiver.RelativePath()
  172. _, err = storage.RepoArchives.Stat(rPath)
  173. if err == nil {
  174. if archiver.Status == repo_model.ArchiverGenerating {
  175. archiver.Status = repo_model.ArchiverReady
  176. if err = repo_model.UpdateRepoArchiverStatus(ctx, archiver); err != nil {
  177. return nil, err
  178. }
  179. }
  180. return archiver, nil
  181. }
  182. if !errors.Is(err, os.ErrNotExist) {
  183. return nil, fmt.Errorf("unable to stat archive: %w", err)
  184. }
  185. rd, w := io.Pipe()
  186. defer func() {
  187. _ = w.Close()
  188. _ = rd.Close()
  189. }()
  190. done := make(chan error, 1) // Ensure that there is some capacity which will ensure that the goroutine below can always finish
  191. repo, err := repo_model.GetRepositoryByID(ctx, archiver.RepoID)
  192. if err != nil {
  193. return nil, fmt.Errorf("archiver.LoadRepo failed: %w", err)
  194. }
  195. gitRepo, err := gitrepo.OpenRepository(ctx, repo)
  196. if err != nil {
  197. return nil, err
  198. }
  199. defer gitRepo.Close()
  200. go func(done chan error, w *io.PipeWriter, archiveReq *ArchiveRequest, gitRepo *git.Repository) {
  201. defer func() {
  202. if r := recover(); r != nil {
  203. done <- fmt.Errorf("%v", r)
  204. }
  205. }()
  206. err := archiveReq.Stream(ctx, gitRepo, w)
  207. _ = w.CloseWithError(err)
  208. done <- err
  209. }(done, w, r, gitRepo)
  210. // TODO: add lfs data to zip
  211. // TODO: add submodule data to zip
  212. if _, err := storage.RepoArchives.Save(rPath, rd, -1); err != nil {
  213. return nil, fmt.Errorf("unable to write archive: %w", err)
  214. }
  215. err = <-done
  216. if err != nil {
  217. return nil, err
  218. }
  219. if archiver.Status == repo_model.ArchiverGenerating {
  220. archiver.Status = repo_model.ArchiverReady
  221. if err = repo_model.UpdateRepoArchiverStatus(ctx, archiver); err != nil {
  222. return nil, err
  223. }
  224. }
  225. return archiver, nil
  226. }
  227. var archiverQueue *queue.WorkerPoolQueue[*ArchiveRequest]
  228. // Init initializes archiver
  229. func Init(ctx context.Context) error {
  230. handler := func(items ...*ArchiveRequest) []*ArchiveRequest {
  231. for _, archiveReq := range items {
  232. log.Trace("ArchiverData Process: %#v", archiveReq)
  233. if archiver, err := doArchive(ctx, archiveReq); err != nil {
  234. log.Error("Archive %v failed: %v", archiveReq, err)
  235. } else {
  236. log.Trace("ArchiverData Success: %#v", archiver)
  237. }
  238. }
  239. return nil
  240. }
  241. archiverQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo-archive", handler)
  242. if archiverQueue == nil {
  243. return errors.New("unable to create repo-archive queue")
  244. }
  245. go graceful.GetManager().RunWithCancel(archiverQueue)
  246. return nil
  247. }
  248. // StartArchive push the archive request to the queue
  249. func StartArchive(request *ArchiveRequest) error {
  250. has, err := archiverQueue.Has(request)
  251. if err != nil {
  252. return err
  253. }
  254. if has {
  255. return nil
  256. }
  257. return archiverQueue.Push(request)
  258. }
  259. func deleteOldRepoArchiver(ctx context.Context, archiver *repo_model.RepoArchiver) error {
  260. if _, err := db.DeleteByID[repo_model.RepoArchiver](ctx, archiver.ID); err != nil {
  261. return err
  262. }
  263. p := archiver.RelativePath()
  264. if err := storage.RepoArchives.Delete(p); err != nil {
  265. log.Error("delete repo archive file failed: %v", err)
  266. }
  267. return nil
  268. }
  269. // DeleteOldRepositoryArchives deletes old repository archives.
  270. func DeleteOldRepositoryArchives(ctx context.Context, olderThan time.Duration) error {
  271. log.Trace("Doing: ArchiveCleanup")
  272. for {
  273. archivers, err := db.Find[repo_model.RepoArchiver](ctx, repo_model.FindRepoArchiversOption{
  274. ListOptions: db.ListOptions{
  275. PageSize: 100,
  276. Page: 1,
  277. },
  278. OlderThan: olderThan,
  279. })
  280. if err != nil {
  281. log.Trace("Error: ArchiveClean: %v", err)
  282. return err
  283. }
  284. for _, archiver := range archivers {
  285. if err := deleteOldRepoArchiver(ctx, archiver); err != nil {
  286. return err
  287. }
  288. }
  289. if len(archivers) < 100 {
  290. break
  291. }
  292. }
  293. log.Trace("Finished: ArchiveCleanup")
  294. return nil
  295. }
  296. // DeleteRepositoryArchives deletes all repositories' archives.
  297. func DeleteRepositoryArchives(ctx context.Context) error {
  298. if err := repo_model.DeleteAllRepoArchives(ctx); err != nil {
  299. return err
  300. }
  301. return storage.Clean(storage.RepoArchives)
  302. }
  303. func ServeRepoArchive(ctx *gitea_context.Base, repo *repo_model.Repository, gitRepo *git.Repository, archiveReq *ArchiveRequest) {
  304. // Add nix format link header so tarballs lock correctly:
  305. // https://github.com/nixos/nix/blob/56763ff918eb308db23080e560ed2ea3e00c80a7/doc/manual/src/protocols/tarball-fetcher.md
  306. ctx.Resp.Header().Add("Link", fmt.Sprintf(`<%s/archive/%s.%s?rev=%s>; rel="immutable"`,
  307. repo.APIURL(),
  308. archiveReq.CommitID,
  309. archiveReq.Type.String(),
  310. archiveReq.CommitID,
  311. ))
  312. downloadName := repo.Name + "-" + archiveReq.GetArchiveName()
  313. if setting.Repository.StreamArchives {
  314. httplib.ServeSetHeaders(ctx.Resp, &httplib.ServeHeaderOptions{Filename: downloadName})
  315. if err := archiveReq.Stream(ctx, gitRepo, ctx.Resp); err != nil && !ctx.Written() {
  316. log.Error("Archive %v streaming failed: %v", archiveReq, err)
  317. ctx.HTTPError(http.StatusInternalServerError)
  318. }
  319. return
  320. }
  321. archiver, err := archiveReq.Await(ctx)
  322. if err != nil {
  323. log.Error("Archive %v await failed: %v", archiveReq, err)
  324. ctx.HTTPError(http.StatusInternalServerError)
  325. return
  326. }
  327. rPath := archiver.RelativePath()
  328. if setting.RepoArchive.Storage.ServeDirect() {
  329. // If we have a signed url (S3, object storage), redirect to this directly.
  330. u, err := storage.RepoArchives.URL(rPath, downloadName, ctx.Req.Method, nil)
  331. if u != nil && err == nil {
  332. ctx.Redirect(u.String())
  333. return
  334. }
  335. }
  336. fr, err := storage.RepoArchives.Open(rPath)
  337. if err != nil {
  338. log.Error("Archive %v open file failed: %v", archiveReq, err)
  339. ctx.HTTPError(http.StatusInternalServerError)
  340. return
  341. }
  342. defer fr.Close()
  343. ctx.ServeContent(fr, &gitea_context.ServeHeaderOptions{
  344. Filename: downloadName,
  345. LastModified: archiver.CreatedUnix.AsLocalTime(),
  346. })
  347. }