gitea源码

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. // Copyright 2022 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package container
  4. import (
  5. "context"
  6. "errors"
  7. "io"
  8. "os"
  9. packages_model "code.gitea.io/gitea/models/packages"
  10. packages_module "code.gitea.io/gitea/modules/packages"
  11. "code.gitea.io/gitea/modules/setting"
  12. "code.gitea.io/gitea/modules/tempdir"
  13. )
  var (
  	// errWriteAfterRead is returned by Append once the uploader has switched
  	// to reading; writes are rejected from then on.
  	errWriteAfterRead = errors.New("write is unsupported after a read operation")

  	// errOffsetMissmatch is returned by Append when the size of the file on
  	// disk differs from the byte count recorded in the model.
  	errOffsetMissmatch = errors.New("offset mismatch between file and model")
  )
  20. // BlobUploader handles chunked blob uploads
  21. type BlobUploader struct {
  22. *packages_model.PackageBlobUpload
  23. *packages_module.MultiHasher
  24. file *os.File
  25. reading bool
  26. }
  27. func uploadPathTempDir() *tempdir.TempDir {
  28. return setting.AppDataTempDir("package-upload")
  29. }
  30. func buildFilePath(uploadPath *tempdir.TempDir, id string) string {
  31. return uploadPath.JoinPath(id)
  32. }
  33. // NewBlobUploader creates a new blob uploader for the given id
  34. func NewBlobUploader(ctx context.Context, id string) (*BlobUploader, error) {
  35. model, err := packages_model.GetBlobUploadByID(ctx, id)
  36. if err != nil {
  37. return nil, err
  38. }
  39. hash := packages_module.NewMultiHasher()
  40. if len(model.HashStateBytes) != 0 {
  41. if err := hash.UnmarshalBinary(model.HashStateBytes); err != nil {
  42. return nil, err
  43. }
  44. }
  45. uploadPath := uploadPathTempDir()
  46. _, err = uploadPath.MkdirAllSub("")
  47. if err != nil {
  48. return nil, err
  49. }
  50. f, err := os.OpenFile(buildFilePath(uploadPath, model.ID), os.O_RDWR|os.O_CREATE, 0o666)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return &BlobUploader{
  55. model,
  56. hash,
  57. f,
  58. false,
  59. }, nil
  60. }
  61. // Close implements io.Closer
  62. func (u *BlobUploader) Close() error {
  63. return u.file.Close()
  64. }
  65. // Append appends a chunk of data and updates the model
  66. func (u *BlobUploader) Append(ctx context.Context, r io.Reader) error {
  67. if u.reading {
  68. return errWriteAfterRead
  69. }
  70. offset, err := u.file.Seek(0, io.SeekEnd)
  71. if err != nil {
  72. return err
  73. }
  74. if offset != u.BytesReceived {
  75. return errOffsetMissmatch
  76. }
  77. n, err := io.Copy(io.MultiWriter(u.file, u.MultiHasher), r)
  78. if err != nil {
  79. return err
  80. }
  81. // fast path if nothing was written
  82. if n == 0 {
  83. return nil
  84. }
  85. u.BytesReceived += n
  86. u.HashStateBytes, err = u.MultiHasher.MarshalBinary()
  87. if err != nil {
  88. return err
  89. }
  90. return packages_model.UpdateBlobUpload(ctx, u.PackageBlobUpload)
  91. }
  92. func (u *BlobUploader) Size() int64 {
  93. return u.BytesReceived
  94. }
  95. // Read implements io.Reader
  96. func (u *BlobUploader) Read(p []byte) (int, error) {
  97. if !u.reading {
  98. _, err := u.file.Seek(0, io.SeekStart)
  99. if err != nil {
  100. return 0, err
  101. }
  102. u.reading = true
  103. }
  104. return u.file.Read(p)
  105. }
  106. // RemoveBlobUploadByID Remove deletes the data and the model of a blob upload
  107. func RemoveBlobUploadByID(ctx context.Context, id string) error {
  108. if err := packages_model.DeleteBlobUploadByID(ctx, id); err != nil {
  109. return err
  110. }
  111. err := os.Remove(buildFilePath(uploadPathTempDir(), id))
  112. if err != nil && !os.IsNotExist(err) {
  113. return err
  114. }
  115. return nil
  116. }