gitea源码

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. // Copyright 2020 The Gitea Authors. All rights reserved.
  2. // SPDX-License-Identifier: MIT
  3. package events
  4. import (
  5. "net/http"
  6. "time"
  7. "code.gitea.io/gitea/modules/eventsource"
  8. "code.gitea.io/gitea/modules/graceful"
  9. "code.gitea.io/gitea/modules/log"
  10. "code.gitea.io/gitea/routers/web/auth"
  11. "code.gitea.io/gitea/services/context"
  12. )
  13. // Events listens for events
  14. func Events(ctx *context.Context) {
  15. // FIXME: Need to check if resp is actually a http.Flusher! - how though?
  16. // Set the headers related to event streaming.
  17. ctx.Resp.Header().Set("Content-Type", "text/event-stream")
  18. ctx.Resp.Header().Set("Cache-Control", "no-cache")
  19. ctx.Resp.Header().Set("Connection", "keep-alive")
  20. ctx.Resp.Header().Set("X-Accel-Buffering", "no")
  21. ctx.Resp.WriteHeader(http.StatusOK)
  22. if !ctx.IsSigned {
  23. // Return unauthorized status event
  24. event := &eventsource.Event{
  25. Name: "close",
  26. Data: "unauthorized",
  27. }
  28. _, _ = event.WriteTo(ctx)
  29. ctx.Resp.Flush()
  30. return
  31. }
  32. // Listen to connection close and un-register messageChan
  33. notify := ctx.Done()
  34. ctx.Resp.Flush()
  35. shutdownCtx := graceful.GetManager().ShutdownContext()
  36. uid := ctx.Doer.ID
  37. messageChan := eventsource.GetManager().Register(uid)
  38. unregister := func() {
  39. eventsource.GetManager().Unregister(uid, messageChan)
  40. // ensure the messageChan is closed
  41. for {
  42. _, ok := <-messageChan
  43. if !ok {
  44. break
  45. }
  46. }
  47. }
  48. if _, err := ctx.Resp.Write([]byte("\n")); err != nil {
  49. log.Error("Unable to write to EventStream: %v", err)
  50. unregister()
  51. return
  52. }
  53. timer := time.NewTicker(30 * time.Second)
  54. loop:
  55. for {
  56. select {
  57. case <-timer.C:
  58. event := &eventsource.Event{
  59. Name: "ping",
  60. }
  61. _, err := event.WriteTo(ctx.Resp)
  62. if err != nil {
  63. log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err)
  64. go unregister()
  65. break loop
  66. }
  67. ctx.Resp.Flush()
  68. case <-notify:
  69. go unregister()
  70. break loop
  71. case <-shutdownCtx.Done():
  72. go unregister()
  73. break loop
  74. case event, ok := <-messageChan:
  75. if !ok {
  76. break loop
  77. }
  78. // Handle logout
  79. if event.Name == "logout" {
  80. if ctx.Session.ID() == event.Data {
  81. _, _ = (&eventsource.Event{
  82. Name: "logout",
  83. Data: "here",
  84. }).WriteTo(ctx.Resp)
  85. ctx.Resp.Flush()
  86. go unregister()
  87. auth.HandleSignOut(ctx)
  88. break loop
  89. }
  90. // Replace the event - we don't want to expose the session ID to the user
  91. event = &eventsource.Event{
  92. Name: "logout",
  93. Data: "elsewhere",
  94. }
  95. }
  96. _, err := event.WriteTo(ctx.Resp)
  97. if err != nil {
  98. log.Error("Unable to write to EventStream for user %s: %v", ctx.Doer.Name, err)
  99. go unregister()
  100. break loop
  101. }
  102. ctx.Resp.Flush()
  103. }
  104. }
  105. timer.Stop()
  106. }