Gitea source code — Actions log storage module (services/actions)

  1. // Copyright 2022 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package actions
import (
	"bufio"
	"context"
	"fmt"
	"io"
	"os"
	"strings"
	"time"
	"unicode/utf8"

	"code.gitea.io/gitea/models/dbfs"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/storage"
	"code.gitea.io/gitea/modules/zstd"

	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
	"google.golang.org/protobuf/types/known/timestamppb"
)
const (
	// MaxLineSize is the maximum size in bytes of a single log line's content;
	// FormatLog truncates longer content to this size.
	MaxLineSize = 64 * 1024
	// DBFSPrefix is the path prefix under which temporary log files live in DBFS.
	DBFSPrefix = "actions_log/"
	// timeFormat is the fixed-width timestamp layout used for each log line
	// (RFC 3339 with 7 fractional digits), so every timestamp prefix has the
	// same byte length.
	timeFormat = "2006-01-02T15:04:05.0000000Z07:00"
	// defaultBufSize is the bufio writer size used when appending log lines.
	defaultBufSize = MaxLineSize
)
  25. // WriteLogs appends logs to DBFS file for temporary storage.
  26. // It doesn't respect the file format in the filename like ".zst", since it's difficult to reopen a closed compressed file and append new content.
  27. // Why doesn't it store logs in object storage directly? Because it's not efficient to append content to object storage.
  28. func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) {
  29. flag := os.O_WRONLY
  30. if offset == 0 {
  31. // Create file only if offset is 0, or it could result in content holes if the file doesn't exist.
  32. flag |= os.O_CREATE
  33. }
  34. name := DBFSPrefix + filename
  35. f, err := dbfs.OpenFile(ctx, name, flag)
  36. if err != nil {
  37. return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err)
  38. }
  39. defer f.Close()
  40. stat, err := f.Stat()
  41. if err != nil {
  42. return nil, fmt.Errorf("dbfs Stat %q: %w", name, err)
  43. }
  44. if stat.Size() < offset {
  45. // If the size is less than offset, refuse to write, or it could result in content holes.
  46. // However, if the size is greater than offset, we can still write to overwrite the content.
  47. return nil, fmt.Errorf("size of %q is less than offset", name)
  48. }
  49. if _, err := f.Seek(offset, io.SeekStart); err != nil {
  50. return nil, fmt.Errorf("dbfs Seek %q: %w", name, err)
  51. }
  52. writer := bufio.NewWriterSize(f, defaultBufSize)
  53. ns := make([]int, 0, len(rows))
  54. for _, row := range rows {
  55. n, err := writer.WriteString(FormatLog(row.Time.AsTime(), row.Content) + "\n")
  56. if err != nil {
  57. return nil, err
  58. }
  59. ns = append(ns, n)
  60. }
  61. if err := writer.Flush(); err != nil {
  62. return nil, err
  63. }
  64. return ns, nil
  65. }
  66. func ReadLogs(ctx context.Context, inStorage bool, filename string, offset, limit int64) ([]*runnerv1.LogRow, error) {
  67. f, err := OpenLogs(ctx, inStorage, filename)
  68. if err != nil {
  69. return nil, err
  70. }
  71. defer f.Close()
  72. if _, err := f.Seek(offset, io.SeekStart); err != nil {
  73. return nil, fmt.Errorf("file seek: %w", err)
  74. }
  75. scanner := bufio.NewScanner(f)
  76. maxLineSize := len(timeFormat) + MaxLineSize + 1
  77. scanner.Buffer(make([]byte, maxLineSize), maxLineSize)
  78. var rows []*runnerv1.LogRow
  79. for scanner.Scan() && (int64(len(rows)) < limit || limit < 0) {
  80. t, c, err := ParseLog(scanner.Text())
  81. if err != nil {
  82. return nil, fmt.Errorf("parse log %q: %w", scanner.Text(), err)
  83. }
  84. rows = append(rows, &runnerv1.LogRow{
  85. Time: timestamppb.New(t),
  86. Content: c,
  87. })
  88. }
  89. if err := scanner.Err(); err != nil {
  90. return nil, fmt.Errorf("ReadLogs scan: %w", err)
  91. }
  92. return rows, nil
  93. }
const (
	// logZstdBlockSize is the block size for seekable zstd compression.
	// 128KB keeps the compression ratio close to regular (non-seekable) zstd.
	// It also means each read from the underlying object storage covers at
	// least 128KB*(compression ratio). The ratio is about 30% for text files,
	// so the actual read size is about 38KB, which should be acceptable.
	logZstdBlockSize = 128 * 1024 // 128KB
)
  101. // TransferLogs transfers logs from DBFS to object storage.
  102. // It happens when the file is complete and no more logs will be appended.
  103. // It respects the file format in the filename like ".zst", and compresses the content if needed.
  104. func TransferLogs(ctx context.Context, filename string) (func(), error) {
  105. name := DBFSPrefix + filename
  106. remove := func() {
  107. if err := dbfs.Remove(ctx, name); err != nil {
  108. log.Warn("dbfs remove %q: %v", name, err)
  109. }
  110. }
  111. f, err := dbfs.Open(ctx, name)
  112. if err != nil {
  113. return nil, fmt.Errorf("dbfs open %q: %w", name, err)
  114. }
  115. defer f.Close()
  116. var reader io.Reader = f
  117. if strings.HasSuffix(filename, ".zst") {
  118. r, w := io.Pipe()
  119. reader = r
  120. zstdWriter, err := zstd.NewSeekableWriter(w, logZstdBlockSize)
  121. if err != nil {
  122. return nil, fmt.Errorf("zstd NewSeekableWriter: %w", err)
  123. }
  124. go func() {
  125. defer func() {
  126. _ = w.CloseWithError(zstdWriter.Close())
  127. }()
  128. if _, err := io.Copy(zstdWriter, f); err != nil {
  129. _ = w.CloseWithError(err)
  130. return
  131. }
  132. }()
  133. }
  134. if _, err := storage.Actions.Save(filename, reader, -1); err != nil {
  135. return nil, fmt.Errorf("storage save %q: %w", filename, err)
  136. }
  137. return remove, nil
  138. }
  139. func RemoveLogs(ctx context.Context, inStorage bool, filename string) error {
  140. if !inStorage {
  141. name := DBFSPrefix + filename
  142. err := dbfs.Remove(ctx, name)
  143. if err != nil {
  144. return fmt.Errorf("dbfs remove %q: %w", name, err)
  145. }
  146. return nil
  147. }
  148. err := storage.Actions.Delete(filename)
  149. if err != nil {
  150. return fmt.Errorf("storage delete %q: %w", filename, err)
  151. }
  152. return nil
  153. }
  154. func OpenLogs(ctx context.Context, inStorage bool, filename string) (io.ReadSeekCloser, error) {
  155. if !inStorage {
  156. name := DBFSPrefix + filename
  157. f, err := dbfs.Open(ctx, name)
  158. if err != nil {
  159. return nil, fmt.Errorf("dbfs open %q: %w", name, err)
  160. }
  161. return f, nil
  162. }
  163. f, err := storage.Actions.Open(filename)
  164. if err != nil {
  165. return nil, fmt.Errorf("storage open %q: %w", filename, err)
  166. }
  167. var reader io.ReadSeekCloser = f
  168. if strings.HasSuffix(filename, ".zst") {
  169. r, err := zstd.NewSeekableReader(f)
  170. if err != nil {
  171. return nil, fmt.Errorf("zstd NewSeekableReader: %w", err)
  172. }
  173. reader = r
  174. }
  175. return reader, nil
  176. }
  177. func FormatLog(timestamp time.Time, content string) string {
  178. // Content shouldn't contain new line, it will break log indexes, other control chars are safe.
  179. content = strings.ReplaceAll(content, "\n", `\n`)
  180. if len(content) > MaxLineSize {
  181. content = content[:MaxLineSize]
  182. }
  183. return fmt.Sprintf("%s %s", timestamp.UTC().Format(timeFormat), content)
  184. }
  185. func ParseLog(in string) (time.Time, string, error) {
  186. index := strings.IndexRune(in, ' ')
  187. if index < 0 {
  188. return time.Time{}, "", fmt.Errorf("invalid log: %q", in)
  189. }
  190. timestamp, err := time.Parse(timeFormat, in[:index])
  191. if err != nil {
  192. return time.Time{}, "", err
  193. }
  194. return timestamp, in[index+1:], nil
  195. }