| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 |
- // Copyright 2020 The Gitea Authors. All rights reserved.
- // SPDX-License-Identifier: MIT
-
- package archiver
-
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "net/http"
- "os"
- "strings"
- "time"
-
- "code.gitea.io/gitea/models/db"
- repo_model "code.gitea.io/gitea/models/repo"
- "code.gitea.io/gitea/modules/git"
- "code.gitea.io/gitea/modules/gitrepo"
- "code.gitea.io/gitea/modules/graceful"
- "code.gitea.io/gitea/modules/httplib"
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/process"
- "code.gitea.io/gitea/modules/queue"
- "code.gitea.io/gitea/modules/setting"
- "code.gitea.io/gitea/modules/storage"
- gitea_context "code.gitea.io/gitea/services/context"
- )
-
- // ArchiveRequest defines the parameters of an archive request, which notably
- // includes the specific repository being archived as well as the commit, the
- // name by which it was requested, and the kind of archive being requested.
- // This is entirely opaque to external entities, though, and mostly used as a
- // handle elsewhere.
- type ArchiveRequest struct {
- RepoID int64
- Type git.ArchiveType
- CommitID string
-
- archiveRefShortName string // the ref short name to download the archive, for example: "master", "v1.0.0", "commit id"
- }
-
// ErrUnknownArchiveFormat is returned when the requested archive format is
// not supported.
type ErrUnknownArchiveFormat struct {
	// RequestNameType is the requested archive name that could not be mapped
	// to a known archive type.
	RequestNameType string
}

// Error implements error; it reports the offending request name.
func (e ErrUnknownArchiveFormat) Error() string {
	const prefix = "unknown format: "
	return prefix + e.RequestNameType
}

// Is implements the errors.Is hook: any ErrUnknownArchiveFormat value matches,
// regardless of the request name it carries.
func (ErrUnknownArchiveFormat) Is(target error) bool {
	switch target.(type) {
	case ErrUnknownArchiveFormat:
		return true
	default:
		return false
	}
}
-
// RepoRefNotFoundError is returned when a requested reference (commit, tag)
// was not found.
type RepoRefNotFoundError struct {
	// RefShortName is the reference name that could not be resolved.
	RefShortName string
}

// Error implements error; it reports the reference that was not recognized.
func (e RepoRefNotFoundError) Error() string {
	const prefix = "unrecognized repository reference: "
	return prefix + e.RefShortName
}

// Is implements the errors.Is hook: any RepoRefNotFoundError value matches,
// regardless of the reference name it carries.
func (RepoRefNotFoundError) Is(target error) bool {
	switch target.(type) {
	case RepoRefNotFoundError:
		return true
	default:
		return false
	}
}
-
- // NewRequest creates an archival request, based on the URI. The
- // resulting ArchiveRequest is suitable for being passed to Await()
- // if it's determined that the request still needs to be satisfied.
- func NewRequest(repoID int64, repo *git.Repository, archiveRefExt string) (*ArchiveRequest, error) {
- // here the archiveRefShortName is not a clear ref, it could be a tag, branch or commit id
- archiveRefShortName, archiveType := git.SplitArchiveNameType(archiveRefExt)
- if archiveType == git.ArchiveUnknown {
- return nil, ErrUnknownArchiveFormat{archiveRefExt}
- }
-
- // Get corresponding commit.
- commitID, err := repo.ConvertToGitID(archiveRefShortName)
- if err != nil {
- return nil, RepoRefNotFoundError{RefShortName: archiveRefShortName}
- }
-
- r := &ArchiveRequest{RepoID: repoID, archiveRefShortName: archiveRefShortName, Type: archiveType}
- r.CommitID = commitID.String()
- return r, nil
- }
-
- // GetArchiveName returns the name of the caller, based on the ref used by the
- // caller to create this request.
- func (aReq *ArchiveRequest) GetArchiveName() string {
- return strings.ReplaceAll(aReq.archiveRefShortName, "/", "-") + "." + aReq.Type.String()
- }
-
- // Await awaits the completion of an ArchiveRequest. If the archive has
- // already been prepared the method returns immediately. Otherwise, an archiver
- // process will be started and its completion awaited. On success the returned
- // RepoArchiver may be used to download the archive. Note that even if the
- // context is cancelled/times out a started archiver will still continue to run
- // in the background.
- func (aReq *ArchiveRequest) Await(ctx context.Context) (*repo_model.RepoArchiver, error) {
- archiver, err := repo_model.GetRepoArchiver(ctx, aReq.RepoID, aReq.Type, aReq.CommitID)
- if err != nil {
- return nil, fmt.Errorf("models.GetRepoArchiver: %w", err)
- }
-
- if archiver != nil && archiver.Status == repo_model.ArchiverReady {
- // Archive already generated, we're done.
- return archiver, nil
- }
-
- if err := StartArchive(aReq); err != nil {
- return nil, fmt.Errorf("archiver.StartArchive: %w", err)
- }
-
- poll := time.NewTicker(time.Second * 1)
- defer poll.Stop()
-
- for {
- select {
- case <-graceful.GetManager().HammerContext().Done():
- // System stopped.
- return nil, graceful.GetManager().HammerContext().Err()
- case <-ctx.Done():
- return nil, ctx.Err()
- case <-poll.C:
- archiver, err = repo_model.GetRepoArchiver(ctx, aReq.RepoID, aReq.Type, aReq.CommitID)
- if err != nil {
- return nil, fmt.Errorf("repo_model.GetRepoArchiver: %w", err)
- }
- if archiver != nil && archiver.Status == repo_model.ArchiverReady {
- return archiver, nil
- }
- }
- }
- }
-
- // Stream satisfies the ArchiveRequest being passed in. Processing
- // will occur directly in this routine.
- func (aReq *ArchiveRequest) Stream(ctx context.Context, gitRepo *git.Repository, w io.Writer) error {
- if aReq.Type == git.ArchiveBundle {
- return gitRepo.CreateBundle(
- ctx,
- aReq.CommitID,
- w,
- )
- }
- return gitRepo.CreateArchive(
- ctx,
- aReq.Type,
- w,
- setting.Repository.PrefixArchiveFiles,
- aReq.CommitID,
- )
- }
-
- // doArchive satisfies the ArchiveRequest being passed in. Processing
- // will occur in a separate goroutine, as this phase may take a while to
- // complete. If the archive already exists, doArchive will not do
- // anything. In all cases, the caller should be examining the *ArchiveRequest
- // being returned for completion, as it may be different than the one they passed
- // in.
- func doArchive(ctx context.Context, r *ArchiveRequest) (*repo_model.RepoArchiver, error) {
- ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("ArchiveRequest[%d]: %s", r.RepoID, r.GetArchiveName()))
- defer finished()
-
- archiver, err := repo_model.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID)
- if err != nil {
- return nil, err
- }
-
- if archiver != nil {
- // FIXME: If another process are generating it, we think it's not ready and just return
- // Or we should wait until the archive generated.
- if archiver.Status == repo_model.ArchiverGenerating {
- return nil, nil
- }
- } else {
- archiver = &repo_model.RepoArchiver{
- RepoID: r.RepoID,
- Type: r.Type,
- CommitID: r.CommitID,
- Status: repo_model.ArchiverGenerating,
- }
- if err := db.Insert(ctx, archiver); err != nil {
- return nil, err
- }
- }
-
- rPath := archiver.RelativePath()
- _, err = storage.RepoArchives.Stat(rPath)
- if err == nil {
- if archiver.Status == repo_model.ArchiverGenerating {
- archiver.Status = repo_model.ArchiverReady
- if err = repo_model.UpdateRepoArchiverStatus(ctx, archiver); err != nil {
- return nil, err
- }
- }
- return archiver, nil
- }
-
- if !errors.Is(err, os.ErrNotExist) {
- return nil, fmt.Errorf("unable to stat archive: %w", err)
- }
-
- rd, w := io.Pipe()
- defer func() {
- _ = w.Close()
- _ = rd.Close()
- }()
- done := make(chan error, 1) // Ensure that there is some capacity which will ensure that the goroutine below can always finish
- repo, err := repo_model.GetRepositoryByID(ctx, archiver.RepoID)
- if err != nil {
- return nil, fmt.Errorf("archiver.LoadRepo failed: %w", err)
- }
-
- gitRepo, err := gitrepo.OpenRepository(ctx, repo)
- if err != nil {
- return nil, err
- }
- defer gitRepo.Close()
-
- go func(done chan error, w *io.PipeWriter, archiveReq *ArchiveRequest, gitRepo *git.Repository) {
- defer func() {
- if r := recover(); r != nil {
- done <- fmt.Errorf("%v", r)
- }
- }()
-
- err := archiveReq.Stream(ctx, gitRepo, w)
- _ = w.CloseWithError(err)
- done <- err
- }(done, w, r, gitRepo)
-
- // TODO: add lfs data to zip
- // TODO: add submodule data to zip
-
- if _, err := storage.RepoArchives.Save(rPath, rd, -1); err != nil {
- return nil, fmt.Errorf("unable to write archive: %w", err)
- }
-
- err = <-done
- if err != nil {
- return nil, err
- }
-
- if archiver.Status == repo_model.ArchiverGenerating {
- archiver.Status = repo_model.ArchiverReady
- if err = repo_model.UpdateRepoArchiverStatus(ctx, archiver); err != nil {
- return nil, err
- }
- }
-
- return archiver, nil
- }
-
- var archiverQueue *queue.WorkerPoolQueue[*ArchiveRequest]
-
- // Init initializes archiver
- func Init(ctx context.Context) error {
- handler := func(items ...*ArchiveRequest) []*ArchiveRequest {
- for _, archiveReq := range items {
- log.Trace("ArchiverData Process: %#v", archiveReq)
- if archiver, err := doArchive(ctx, archiveReq); err != nil {
- log.Error("Archive %v failed: %v", archiveReq, err)
- } else {
- log.Trace("ArchiverData Success: %#v", archiver)
- }
- }
- return nil
- }
-
- archiverQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo-archive", handler)
- if archiverQueue == nil {
- return errors.New("unable to create repo-archive queue")
- }
- go graceful.GetManager().RunWithCancel(archiverQueue)
-
- return nil
- }
-
- // StartArchive push the archive request to the queue
- func StartArchive(request *ArchiveRequest) error {
- has, err := archiverQueue.Has(request)
- if err != nil {
- return err
- }
- if has {
- return nil
- }
- return archiverQueue.Push(request)
- }
-
- func deleteOldRepoArchiver(ctx context.Context, archiver *repo_model.RepoArchiver) error {
- if _, err := db.DeleteByID[repo_model.RepoArchiver](ctx, archiver.ID); err != nil {
- return err
- }
- p := archiver.RelativePath()
- if err := storage.RepoArchives.Delete(p); err != nil {
- log.Error("delete repo archive file failed: %v", err)
- }
- return nil
- }
-
- // DeleteOldRepositoryArchives deletes old repository archives.
- func DeleteOldRepositoryArchives(ctx context.Context, olderThan time.Duration) error {
- log.Trace("Doing: ArchiveCleanup")
-
- for {
- archivers, err := db.Find[repo_model.RepoArchiver](ctx, repo_model.FindRepoArchiversOption{
- ListOptions: db.ListOptions{
- PageSize: 100,
- Page: 1,
- },
- OlderThan: olderThan,
- })
- if err != nil {
- log.Trace("Error: ArchiveClean: %v", err)
- return err
- }
-
- for _, archiver := range archivers {
- if err := deleteOldRepoArchiver(ctx, archiver); err != nil {
- return err
- }
- }
- if len(archivers) < 100 {
- break
- }
- }
-
- log.Trace("Finished: ArchiveCleanup")
- return nil
- }
-
- // DeleteRepositoryArchives deletes all repositories' archives.
- func DeleteRepositoryArchives(ctx context.Context) error {
- if err := repo_model.DeleteAllRepoArchives(ctx); err != nil {
- return err
- }
- return storage.Clean(storage.RepoArchives)
- }
-
- func ServeRepoArchive(ctx *gitea_context.Base, repo *repo_model.Repository, gitRepo *git.Repository, archiveReq *ArchiveRequest) {
- // Add nix format link header so tarballs lock correctly:
- // https://github.com/nixos/nix/blob/56763ff918eb308db23080e560ed2ea3e00c80a7/doc/manual/src/protocols/tarball-fetcher.md
- ctx.Resp.Header().Add("Link", fmt.Sprintf(`<%s/archive/%s.%s?rev=%s>; rel="immutable"`,
- repo.APIURL(),
- archiveReq.CommitID,
- archiveReq.Type.String(),
- archiveReq.CommitID,
- ))
- downloadName := repo.Name + "-" + archiveReq.GetArchiveName()
-
- if setting.Repository.StreamArchives {
- httplib.ServeSetHeaders(ctx.Resp, &httplib.ServeHeaderOptions{Filename: downloadName})
- if err := archiveReq.Stream(ctx, gitRepo, ctx.Resp); err != nil && !ctx.Written() {
- log.Error("Archive %v streaming failed: %v", archiveReq, err)
- ctx.HTTPError(http.StatusInternalServerError)
- }
- return
- }
-
- archiver, err := archiveReq.Await(ctx)
- if err != nil {
- log.Error("Archive %v await failed: %v", archiveReq, err)
- ctx.HTTPError(http.StatusInternalServerError)
- return
- }
-
- rPath := archiver.RelativePath()
- if setting.RepoArchive.Storage.ServeDirect() {
- // If we have a signed url (S3, object storage), redirect to this directly.
- u, err := storage.RepoArchives.URL(rPath, downloadName, ctx.Req.Method, nil)
- if u != nil && err == nil {
- ctx.Redirect(u.String())
- return
- }
- }
-
- fr, err := storage.RepoArchives.Open(rPath)
- if err != nil {
- log.Error("Archive %v open file failed: %v", archiveReq, err)
- ctx.HTTPError(http.StatusInternalServerError)
- return
- }
- defer fr.Close()
-
- ctx.ServeContent(fr, &gitea_context.ServeHeaderOptions{
- Filename: downloadName,
- LastModified: archiver.CreatedUnix.AsLocalTime(),
- })
- }
|