Gitea source code: Actions artifact chunk upload and merging

// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
	"crypto/md5"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"errors"
	"fmt"
	"hash"
	"io"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"code.gitea.io/gitea/models/actions"
	"code.gitea.io/gitea/models/db"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/storage"
)
func saveUploadChunkBase(st storage.ObjectStorage, ctx *ArtifactContext,
	artifact *actions.ActionArtifact,
	contentSize, runID, start, end, length int64, checkMd5 bool,
) (int64, error) {
	// build chunk store path
	storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end)
	var r io.Reader = ctx.Req.Body
	var hasher hash.Hash
	if checkMd5 {
		// use io.TeeReader to avoid reading all body to md5 sum.
		// it writes data to hasher after reading end
		// if hash is not matched, delete the read-end result
		hasher = md5.New()
		r = io.TeeReader(r, hasher)
	}
	// save chunk to storage
	writtenSize, err := st.Save(storagePath, r, contentSize)
	if err != nil {
		return -1, fmt.Errorf("save chunk to storage error: %v", err)
	}
	var checkErr error
	if checkMd5 {
		// check md5
		reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
		chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
		log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
		// if md5 not match, delete the chunk
		if reqMd5String != chunkMd5String {
			checkErr = errors.New("md5 not match")
		}
	}
	if writtenSize != contentSize {
		checkErr = errors.Join(checkErr, fmt.Errorf("writtenSize %d not match contentSize %d", writtenSize, contentSize))
	}
	if checkErr != nil {
		if err := st.Delete(storagePath); err != nil {
			log.Error("Error deleting chunk: %s, %v", storagePath, err)
		}
		return -1, checkErr
	}
	log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
		storagePath, contentSize, artifact.ID, start, end)
	// return chunk total size
	return length, nil
}
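
The MD5 check above never buffers the chunk in memory: io.TeeReader feeds the hasher with exactly the bytes that st.Save consumes, and a mismatch only triggers a delete afterwards. Below is a minimal standalone sketch of that pattern; it is not part of the Gitea file, and streamWithMD5 plus all values are made up for illustration.

package main

import (
	"bytes"
	"crypto/md5"
	"encoding/base64"
	"fmt"
	"io"
)

// streamWithMD5 copies body into dst while hashing every forwarded byte,
// then compares the base64-encoded MD5 with the value the client claimed.
func streamWithMD5(dst io.Writer, body io.Reader, wantB64MD5 string) (int64, error) {
	hasher := md5.New()
	// TeeReader writes to hasher whatever io.Copy reads from body.
	n, err := io.Copy(dst, io.TeeReader(body, hasher))
	if err != nil {
		return n, err
	}
	gotB64MD5 := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
	if gotB64MD5 != wantB64MD5 {
		return n, fmt.Errorf("md5 mismatch: got %s, want %s", gotB64MD5, wantB64MD5)
	}
	return n, nil
}

func main() {
	payload := []byte("chunk payload")
	sum := md5.Sum(payload)
	want := base64.StdEncoding.EncodeToString(sum[:])

	var dst bytes.Buffer
	n, err := streamWithMD5(&dst, bytes.NewReader(payload), want)
	fmt.Println(n, err) // 13 <nil>
}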
func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
	artifact *actions.ActionArtifact,
	contentSize, runID int64,
) (int64, error) {
	// parse content-range header, format: bytes 0-1023/146515
	contentRange := ctx.Req.Header.Get("Content-Range")
	start, end, length := int64(0), int64(0), int64(0)
	if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
		log.Warn("parse content range error: %v, content-range: %s", err, contentRange)
		return -1, fmt.Errorf("parse content range error: %v", err)
	}
	return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, length, true)
}
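
saveUploadChunk relies on the client-supplied Content-Range header of the form "bytes <start>-<end>/<total>". A tiny self-contained sketch of how fmt.Sscanf splits it into the three numbers (the header value here is only an example):

package main

import "fmt"

func main() {
	// Same format string as the handler above; the value is illustrative.
	contentRange := "bytes 0-1023/146515"
	var start, end, length int64
	if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
		panic(err)
	}
	fmt.Println(start, end, length) // 0 1023 146515
}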
func appendUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
	artifact *actions.ActionArtifact,
	start, contentSize, runID int64,
) (int64, error) {
	end := start + contentSize - 1
	return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, contentSize, false)
}
type chunkFileItem struct {
	RunID      int64
	ArtifactID int64
	Start      int64
	End        int64
	Path       string
}

func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
	storageDir := fmt.Sprintf("tmp%d", runID)
	var chunks []*chunkFileItem
	if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
		baseName := filepath.Base(fpath)
		// when read chunks from storage, it only contains storage dir and basename,
		// no matter the subdirectory setting in storage config
		item := chunkFileItem{Path: storageDir + "/" + baseName}
		if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
			return fmt.Errorf("parse content range error: %v", err)
		}
		chunks = append(chunks, &item)
		return nil
	}); err != nil {
		return nil, err
	}
	// chunks group by artifact id
	chunksMap := make(map[int64][]*chunkFileItem)
	for _, c := range chunks {
		chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
	}
	return chunksMap, nil
}
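
Each chunk object is named "<runID>-<artifactID>-<start>-<end>.chunk" inside "tmp<runID>/", so the artifact ID and byte range can be recovered from the file name alone. A short sketch of that parsing, with made-up IDs, not part of the Gitea file:

package main

import "fmt"

func main() {
	// Illustrative name in the scheme written by saveUploadChunkBase above.
	baseName := "792-33-0-1023.chunk"
	var runID, artifactID, start, end int64
	if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &runID, &artifactID, &start, &end); err != nil {
		panic(err)
	}
	fmt.Println(runID, artifactID, start, end) // 792 33 0 1023
}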
func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
	storageDir := fmt.Sprintf("tmpv4%d", runID)
	var chunks []*chunkFileItem
	chunkMap := map[string]*chunkFileItem{}
	dummy := &chunkFileItem{}
	for _, name := range blist.Latest {
		chunkMap[name] = dummy
	}
	if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
		baseName := filepath.Base(fpath)
		if !strings.HasPrefix(baseName, "block-") {
			return nil
		}
		// when read chunks from storage, it only contains storage dir and basename,
		// no matter the subdirectory setting in storage config
		item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID}
		var size int64
		var b64chunkName string
		if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil {
			return fmt.Errorf("parse content range error: %v", err)
		}
		rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
		if err != nil {
			return fmt.Errorf("failed to parse chunkName: %v", err)
		}
		chunkName := string(rchunkName)
		item.End = item.Start + size - 1
		if _, ok := chunkMap[chunkName]; ok {
			chunkMap[chunkName] = &item
		}
		return nil
	}); err != nil {
		return nil, err
	}
	for i, name := range blist.Latest {
		chunk, ok := chunkMap[name]
		if !ok || chunk.Path == "" {
			return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name)
		}
		chunks = append(chunks, chunk)
		if i > 0 {
			chunk.Start = chunkMap[blist.Latest[i-1]].End + 1
			chunk.End += chunk.Start
		}
	}
	return chunks, nil
}
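
For the v4 API, block objects are named "block-<runID>-<size>-<base64url(chunkName)>", and the original chunk name is recovered by URL-safe base64 decoding of the last field. A standalone sketch with invented values, separate from the Gitea code:

package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	// Build an example block name the same way it would be parsed above.
	b64 := base64.URLEncoding.EncodeToString([]byte("abcdef01-0001"))
	baseName := fmt.Sprintf("block-%d-%d-%s", 792, 1024, b64)

	var runID, size int64
	var b64chunkName string
	if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &runID, &size, &b64chunkName); err != nil {
		panic(err)
	}
	name, err := base64.URLEncoding.DecodeString(b64chunkName)
	if err != nil {
		panic(err)
	}
	fmt.Println(runID, size, string(name)) // 792 1024 abcdef01-0001
}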
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
	// read all db artifacts by name
	artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
		RunID:        runID,
		ArtifactName: artifactName,
	})
	if err != nil {
		return err
	}
	// read all uploading chunks from storage
	chunksMap, err := listChunksByRunID(st, runID)
	if err != nil {
		return err
	}
	// range db artifacts to merge chunks
	for _, art := range artifacts {
		chunks, ok := chunksMap[art.ID]
		if !ok {
			log.Debug("artifact %d chunks not found", art.ID)
			continue
		}
		if err := mergeChunksForArtifact(ctx, chunks, st, art, ""); err != nil {
			return err
		}
	}
	return nil
}
func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact, checksum string) error {
	sort.Slice(chunks, func(i, j int) bool {
		return chunks[i].Start < chunks[j].Start
	})
	allChunks := make([]*chunkFileItem, 0)
	startAt := int64(-1)
	// check if all chunks are uploaded and in order and clean repeated chunks
	for _, c := range chunks {
		// startAt is -1 means this is the first chunk
		// previous c.ChunkEnd + 1 == c.ChunkStart means this chunk is in order
		// StartAt is not -1 and c.ChunkStart is not startAt + 1 means there is a chunk missing
		if c.Start == (startAt + 1) {
			allChunks = append(allChunks, c)
			startAt = c.End
		}
	}
	// if the last chunk.End + 1 is not equal to chunk.ChunkLength, means chunks are not uploaded completely
	if startAt+1 != artifact.FileCompressedSize {
		log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
		return nil
	}
	// use multiReader
	readers := make([]io.Reader, 0, len(allChunks))
	closeReaders := func() {
		for _, r := range readers {
			_ = r.(io.Closer).Close() // it guarantees to be io.Closer by the following loop's Open function
		}
		readers = nil
	}
	defer closeReaders()
	for _, c := range allChunks {
		var readCloser io.ReadCloser
		var err error
		if readCloser, err = st.Open(c.Path); err != nil {
			return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
		}
		readers = append(readers, readCloser)
	}
	mergedReader := io.MultiReader(readers...)

	shaPrefix := "sha256:"
	var hash hash.Hash
	if strings.HasPrefix(checksum, shaPrefix) {
		hash = sha256.New()
	}
	if hash != nil {
		mergedReader = io.TeeReader(mergedReader, hash)
	}

	// if chunk is gzip, use gz as extension
	// download-artifact action will use content-encoding header to decide if it should decompress the file
	extension := "chunk"
	if artifact.ContentEncoding == "gzip" {
		extension = "chunk.gz"
	}

	// save merged file
	storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
	written, err := st.Save(storagePath, mergedReader, artifact.FileCompressedSize)
	if err != nil {
		return fmt.Errorf("save merged file error: %v", err)
	}
	if written != artifact.FileCompressedSize {
		return errors.New("merged file size is not equal to chunk length")
	}

	defer func() {
		closeReaders() // close before delete
		// drop chunks
		for _, c := range chunks {
			if err := st.Delete(c.Path); err != nil {
				log.Warn("Error deleting chunk: %s, %v", c.Path, err)
			}
		}
	}()

	if hash != nil {
		rawChecksum := hash.Sum(nil)
		actualChecksum := hex.EncodeToString(rawChecksum)
		if !strings.HasSuffix(checksum, actualChecksum) {
			return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
		}
	}

	// save storage path to artifact
	log.Debug("[artifact] merge chunks to artifact: %d, %s, old:%s", artifact.ID, storagePath, artifact.StoragePath)
	// if artifact is already uploaded, delete the old file
	if artifact.StoragePath != "" {
		if err := st.Delete(artifact.StoragePath); err != nil {
			log.Warn("Error deleting old artifact: %s, %v", artifact.StoragePath, err)
		}
	}
	artifact.StoragePath = storagePath
	artifact.Status = actions.ArtifactStatusUploadConfirmed
	if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
		return fmt.Errorf("update artifact error: %v", err)
	}
	return nil
}
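
The merge step concatenates the ordered chunk readers with io.MultiReader and, when a "sha256:<hex>" checksum was supplied, verifies it through the same hash-while-writing TeeReader pattern and a hex comparison. A compact, self-contained sketch of that flow; the data and checksum below are fabricated for the example.

package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

func main() {
	// Two in-memory "chunks" standing in for the opened chunk files.
	chunks := []io.Reader{
		strings.NewReader("hello "),
		strings.NewReader("world"),
	}

	// Pretend this checksum arrived with the upload.
	sum := sha256.Sum256([]byte("hello world"))
	checksum := "sha256:" + hex.EncodeToString(sum[:])

	// Hash the merged stream while copying it to its destination.
	hasher := sha256.New()
	merged := io.TeeReader(io.MultiReader(chunks...), hasher)

	var out bytes.Buffer
	if _, err := io.Copy(&out, merged); err != nil {
		panic(err)
	}

	actual := hex.EncodeToString(hasher.Sum(nil))
	fmt.Println(strings.HasSuffix(checksum, actual)) // true
}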