gitea源码

redis_locker.go 3.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. // Copyright 2024 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package globallock
  4. import (
  5. "context"
  6. "errors"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "code.gitea.io/gitea/modules/nosql"
  11. "github.com/go-redsync/redsync/v4"
  12. "github.com/go-redsync/redsync/v4/redis/goredis/v9"
  13. )
  14. const redisLockKeyPrefix = "gitea:globallock:"
  15. // redisLockExpiry is the default expiry time for a lock.
  16. // Define it as a variable to make it possible to change it in tests.
  17. var redisLockExpiry = 30 * time.Second
  18. type redisLocker struct {
  19. rs *redsync.Redsync
  20. mutexM sync.Map
  21. closed atomic.Bool
  22. extendWg sync.WaitGroup
  23. }
  24. var _ Locker = &redisLocker{}
  25. func NewRedisLocker(connection string) Locker {
  26. l := &redisLocker{
  27. rs: redsync.New(
  28. goredis.NewPool(
  29. nosql.GetManager().GetRedisClient(connection),
  30. ),
  31. ),
  32. }
  33. l.extendWg.Add(1)
  34. l.startExtend()
  35. return l
  36. }
  37. func (l *redisLocker) Lock(ctx context.Context, key string) (ReleaseFunc, error) {
  38. return l.lock(ctx, key, 0)
  39. }
  40. func (l *redisLocker) TryLock(ctx context.Context, key string) (bool, ReleaseFunc, error) {
  41. f, err := l.lock(ctx, key, 1)
  42. var (
  43. errTaken *redsync.ErrTaken
  44. errNodeTaken *redsync.ErrNodeTaken
  45. )
  46. if errors.As(err, &errTaken) || errors.As(err, &errNodeTaken) {
  47. return false, f, nil
  48. }
  49. return err == nil, f, err
  50. }
  51. // Close closes the locker.
  52. // It will stop extending the locks and refuse to acquire new locks.
  53. // In actual use, it is not necessary to call this function.
  54. // But it's useful in tests to release resources.
  55. // It could take some time since it waits for the extending goroutine to finish.
  56. func (l *redisLocker) Close() error {
  57. l.closed.Store(true)
  58. l.extendWg.Wait()
  59. return nil
  60. }
  61. func (l *redisLocker) lock(ctx context.Context, key string, tries int) (ReleaseFunc, error) {
  62. if l.closed.Load() {
  63. return func() {}, errors.New("locker is closed")
  64. }
  65. options := []redsync.Option{
  66. redsync.WithExpiry(redisLockExpiry),
  67. }
  68. if tries > 0 {
  69. options = append(options, redsync.WithTries(tries))
  70. }
  71. mutex := l.rs.NewMutex(redisLockKeyPrefix+key, options...)
  72. if err := mutex.LockContext(ctx); err != nil {
  73. return func() {}, err
  74. }
  75. l.mutexM.Store(key, mutex)
  76. releaseOnce := sync.Once{}
  77. return func() {
  78. releaseOnce.Do(func() {
  79. l.mutexM.Delete(key)
  80. // It's safe to ignore the error here,
  81. // if it failed to unlock, it will be released automatically after the lock expires.
  82. // Do not call mutex.UnlockContext(ctx) here, or it will fail to release when ctx has timed out.
  83. _, _ = mutex.Unlock()
  84. })
  85. }, nil
  86. }
  87. func (l *redisLocker) startExtend() {
  88. if l.closed.Load() {
  89. l.extendWg.Done()
  90. return
  91. }
  92. toExtend := make([]*redsync.Mutex, 0)
  93. l.mutexM.Range(func(_, value any) bool {
  94. m := value.(*redsync.Mutex)
  95. // Extend the lock if it is not expired.
  96. // Although the mutex will be removed from the map before it is released,
  97. // it still can be expired because of a failed extension.
  98. // If it happens, it does not need to be extended anymore.
  99. if time.Now().After(m.Until()) {
  100. return true
  101. }
  102. toExtend = append(toExtend, m)
  103. return true
  104. })
  105. for _, v := range toExtend {
  106. // If it failed to extend, it will be released automatically after the lock expires.
  107. _, _ = v.Extend()
  108. }
  109. time.AfterFunc(redisLockExpiry/2, l.startExtend)
  110. }