// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
	"crypto/md5"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"errors"
	"fmt"
	"hash"
	"io"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"code.gitea.io/gitea/models/actions"
	"code.gitea.io/gitea/models/db"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/storage"
)
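
// saveUploadChunkBase streams the request body into object storage as one
// chunk file named "tmp{runID}/{runID}-{artifactID}-{start}-{end}.chunk".
// When checkMd5 is set it verifies the streamed body against the md5 header
// sent by the client and deletes the chunk again on mismatch. On success it
// returns the length value supplied by the caller.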
func saveUploadChunkBase(st storage.ObjectStorage, ctx *ArtifactContext,
	artifact *actions.ActionArtifact,
	contentSize, runID, start, end, length int64, checkMd5 bool,
) (int64, error) {
	// build chunk store path
	storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end)
	var r io.Reader = ctx.Req.Body
	var hasher hash.Hash
	if checkMd5 {
		// use io.TeeReader to compute the md5 sum while the body streams to
		// storage, instead of buffering the whole body first;
		// if the hash does not match, the stored chunk is deleted below
		hasher = md5.New()
		r = io.TeeReader(r, hasher)
	}
	// save chunk to storage
	writtenSize, err := st.Save(storagePath, r, contentSize)
	if err != nil {
		return -1, fmt.Errorf("save chunk to storage error: %v", err)
	}
	var checkErr error
	if checkMd5 {
		// compare the streamed md5 sum with the one sent by the client
		reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
		chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
		log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
		// if the md5 does not match, delete the chunk
		if reqMd5String != chunkMd5String {
			checkErr = errors.New("md5 does not match")
		}
	}
	if writtenSize != contentSize {
		checkErr = errors.Join(checkErr, fmt.Errorf("written size %d does not match content size %d", writtenSize, contentSize))
	}
	if checkErr != nil {
		if err := st.Delete(storagePath); err != nil {
			log.Error("Error deleting chunk: %s, %v", storagePath, err)
		}
		return -1, checkErr
	}
	log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
		storagePath, contentSize, artifact.ID, start, end)
	// return the length reported by the caller
	return length, nil
}
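
// saveUploadChunk stores one chunk of an artifact upload, taking the byte
// range from the Content-Range header. For example, "bytes 0-1023/146515"
// stores bytes 0 through 1023 and reports the total size 146515.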
func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
	artifact *actions.ActionArtifact,
	contentSize, runID int64,
) (int64, error) {
	// parse content-range header, format: bytes 0-1023/146515
	contentRange := ctx.Req.Header.Get("Content-Range")
	start, end, length := int64(0), int64(0), int64(0)
	if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
		log.Warn("parse content range error: %v, content-range: %s", err, contentRange)
		return -1, fmt.Errorf("parse content range error: %v", err)
	}
	return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, length, true)
}
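
// appendUploadChunk stores a chunk whose start offset is already known,
// deriving the end offset from the content size; the md5 check is skipped.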
func appendUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
	artifact *actions.ActionArtifact,
	start, contentSize, runID int64,
) (int64, error) {
	end := start + contentSize - 1
	return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, contentSize, false)
}
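
// chunkFileItem describes one chunk file found in storage: the run and
// artifact it belongs to, the byte range [Start, End] it covers, and its
// storage path.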
type chunkFileItem struct {
	RunID      int64
	ArtifactID int64
	Start      int64
	End        int64
	Path       string
}
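
// listChunksByRunID scans "tmp{runID}/" in storage, parses each chunk file
// name back into its run id, artifact id and byte range, and groups the
// chunks by artifact id.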
func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
	storageDir := fmt.Sprintf("tmp%d", runID)
	var chunks []*chunkFileItem
	if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
		baseName := filepath.Base(fpath)
		// paths read back from storage contain only the storage dir and base name,
		// regardless of any subdirectory setting in the storage config
		item := chunkFileItem{Path: storageDir + "/" + baseName}
		if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
			return fmt.Errorf("parse chunk file name error: %v", err)
		}
		chunks = append(chunks, &item)
		return nil
	}); err != nil {
		return nil, err
	}
	// group chunks by artifact id
	chunksMap := make(map[int64][]*chunkFileItem)
	for _, c := range chunks {
		chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
	}
	return chunksMap, nil
}
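
// listChunksByRunIDV4 scans "tmpv4{runID}/" for v4 block files named
// "block-{runID}-{size}-{base64Name}" and returns the chunks in the order
// given by the block list, computing each chunk's byte range cumulatively.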
func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
	storageDir := fmt.Sprintf("tmpv4%d", runID)
	var chunks []*chunkFileItem
	chunkMap := map[string]*chunkFileItem{}
	dummy := &chunkFileItem{}
	for _, name := range blist.Latest {
		chunkMap[name] = dummy
	}
	if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
		baseName := filepath.Base(fpath)
		if !strings.HasPrefix(baseName, "block-") {
			return nil
		}
		// paths read back from storage contain only the storage dir and base name,
		// regardless of any subdirectory setting in the storage config
		item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID}
		var size int64
		var b64chunkName string
		if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil {
			return fmt.Errorf("parse block file name error: %v", err)
		}
		rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
		if err != nil {
			return fmt.Errorf("failed to decode chunk name: %v", err)
		}
		chunkName := string(rchunkName)
		item.End = item.Start + size - 1
		if _, ok := chunkMap[chunkName]; ok {
			chunkMap[chunkName] = &item
		}
		return nil
	}); err != nil {
		return nil, err
	}
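	// stitch the blocks together in the order given by the block list:
	// each chunk's Start is the previous chunk's End+1, and End (parsed above
	// as size-1) is shifted by the same offset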
	for i, name := range blist.Latest {
		chunk, ok := chunkMap[name]
		if !ok || chunk.Path == "" {
			return nil, fmt.Errorf("missing chunk (%d/%d): %s", i, len(blist.Latest), name)
		}
		chunks = append(chunks, chunk)
		if i > 0 {
			chunk.Start = chunkMap[blist.Latest[i-1]].End + 1
			chunk.End += chunk.Start
		}
	}
	return chunks, nil
}
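
// mergeChunksForRun merges the uploaded chunks of every artifact with the
// given name in a run: it loads the matching artifact rows from the database,
// lists the pending chunks in storage, and merges them artifact by artifact.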
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
	// read all db artifacts by name
	artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
		RunID:        runID,
		ArtifactName: artifactName,
	})
	if err != nil {
		return err
	}
	// read all uploading chunks from storage
	chunksMap, err := listChunksByRunID(st, runID)
	if err != nil {
		return err
	}
	// range db artifacts to merge chunks
	for _, art := range artifacts {
		chunks, ok := chunksMap[art.ID]
		if !ok {
			log.Debug("artifact %d chunks not found", art.ID)
			continue
		}
		if err := mergeChunksForArtifact(ctx, chunks, st, art, ""); err != nil {
			return err
		}
	}
	return nil
}
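
// mergeChunksForArtifact concatenates an artifact's chunks into a single
// object in storage, optionally verifying a "sha256:..." checksum, then
// deletes the chunk files and marks the artifact as upload-confirmed. It is
// a no-op if the chunks do not yet cover the whole compressed file.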
func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact, checksum string) error {
	sort.Slice(chunks, func(i, j int) bool {
		return chunks[i].Start < chunks[j].Start
	})
	allChunks := make([]*chunkFileItem, 0)
	startAt := int64(-1)
	// check that all chunks are uploaded and in order, and drop repeated chunks
	for _, c := range chunks {
		// startAt == -1 means this is the first chunk;
		// c.Start == startAt+1 means this chunk directly follows the previous one;
		// chunks whose c.Start is not startAt+1 are duplicates or out of order and are skipped
		if c.Start == (startAt + 1) {
			allChunks = append(allChunks, c)
			startAt = c.End
		}
	}
	// if the last chunk's End+1 does not equal artifact.FileCompressedSize, the chunks are not uploaded completely
	if startAt+1 != artifact.FileCompressedSize {
		log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
		return nil
	}
	// concatenate all chunks with a MultiReader
	readers := make([]io.Reader, 0, len(allChunks))
	closeReaders := func() {
		for _, r := range readers {
			_ = r.(io.Closer).Close() // each reader is an io.ReadCloser returned by st.Open in the loop below
		}
		readers = nil
	}
	defer closeReaders()
	for _, c := range allChunks {
		var readCloser io.ReadCloser
		var err error
		if readCloser, err = st.Open(c.Path); err != nil {
			return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
		}
		readers = append(readers, readCloser)
	}
	mergedReader := io.MultiReader(readers...)
	shaPrefix := "sha256:"
	var hasher hash.Hash // named hasher to avoid shadowing the hash package
	if strings.HasPrefix(checksum, shaPrefix) {
		hasher = sha256.New()
	}
	if hasher != nil {
		mergedReader = io.TeeReader(mergedReader, hasher)
	}

	// if the artifact is gzip-compressed, use .gz as the extension;
	// the download-artifact action uses the content-encoding header to decide whether to decompress the file
	extension := "chunk"
	if artifact.ContentEncoding == "gzip" {
		extension = "chunk.gz"
	}

	// save merged file
	storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
	written, err := st.Save(storagePath, mergedReader, artifact.FileCompressedSize)
	if err != nil {
		return fmt.Errorf("save merged file error: %v", err)
	}
	if written != artifact.FileCompressedSize {
		return errors.New("merged file size does not match the artifact's compressed size")
	}

	defer func() {
		closeReaders() // close before delete
		// drop chunks
		for _, c := range chunks {
			if err := st.Delete(c.Path); err != nil {
				log.Warn("Error deleting chunk: %s, %v", c.Path, err)
			}
		}
	}()

	if hasher != nil {
		rawChecksum := hasher.Sum(nil)
		actualChecksum := hex.EncodeToString(rawChecksum)
		if !strings.HasSuffix(checksum, actualChecksum) {
			return fmt.Errorf("update artifact error, checksum is invalid: %v vs %v", checksum, actualChecksum)
		}
	}

	// save the new storage path to the artifact
	log.Debug("[artifact] merge chunks to artifact: %d, %s, old: %s", artifact.ID, storagePath, artifact.StoragePath)
	// if the artifact was already uploaded, delete the old file
	if artifact.StoragePath != "" {
		if err := st.Delete(artifact.StoragePath); err != nil {
			log.Warn("Error deleting old artifact: %s, %v", artifact.StoragePath, err)
		}
	}

	artifact.StoragePath = storagePath
	artifact.Status = actions.ArtifactStatusUploadConfirmed
	if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
		return fmt.Errorf("update artifact error: %v", err)
	}

	return nil
}
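
// The function below is a hypothetical, self-contained sketch (not called by
// the upload flow) illustrating the chunk naming scheme that
// saveUploadChunkBase writes and listChunksByRunID parses back; the ids and
// offsets are made-up values.
func exampleChunkNameRoundTrip() {
	name := fmt.Sprintf("%d-%d-%d-%d.chunk", 4711, 9, 0, 1023) // "4711-9-0-1023.chunk"
	var runID, artifactID, start, end int64
	if _, err := fmt.Sscanf(name, "%d-%d-%d-%d.chunk", &runID, &artifactID, &start, &end); err != nil {
		log.Error("parse chunk file name error: %v", err)
		return
	}
	// runID=4711, artifactID=9 and the chunk covers bytes 0 through 1023
	log.Info("[artifact] example chunk %s covers bytes %d-%d", name, start, end)
}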