Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter Version 2.
Vous ne pouvez pas sélectionner plus de 25 sujets. Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

circuit_breaker.go 4.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package network
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "time"
  6. "github.com/9seconds/mtg/v2/essentials"
  7. )
  // States of the circuit breaker, numbered 0..2 in declaration order.
  // DialContext dispatches on the current state of the dialer.
  const (
      // circuitBreakerStateClosed: dials are forwarded to the wrapped Dialer
      // and failures are counted.
      circuitBreakerStateClosed uint32 = iota

      // circuitBreakerStateHalfOpened: a single probing dial is let through,
      // every other dial is rejected with ErrCircuitBreakerOpened.
      circuitBreakerStateHalfOpened

      // circuitBreakerStateOpened: every dial is rejected with
      // ErrCircuitBreakerOpened.
      circuitBreakerStateOpened
  )
  13. type circuitBreakerDialer struct {
  14. Dialer
  15. stateMutexChan chan bool
  16. halfOpenTimer *time.Timer
  17. failuresCleanupTimer *time.Timer
  18. state uint32
  19. halfOpenAttempts uint32
  20. failuresCount uint32
  21. openThreshold uint32
  22. halfOpenTimeout time.Duration
  23. resetFailuresTimeout time.Duration
  24. }
  25. func (c *circuitBreakerDialer) Dial(network, address string) (essentials.Conn, error) {
  26. return c.DialContext(context.Background(), network, address)
  27. }
  28. func (c *circuitBreakerDialer) DialContext(ctx context.Context,
  29. network, address string) (essentials.Conn, error) {
  30. switch atomic.LoadUint32(&c.state) {
  31. case circuitBreakerStateClosed:
  32. return c.doClosed(ctx, network, address)
  33. case circuitBreakerStateHalfOpened:
  34. return c.doHalfOpened(ctx, network, address)
  35. default:
  36. return nil, ErrCircuitBreakerOpened
  37. }
  38. }
  39. func (c *circuitBreakerDialer) doClosed(ctx context.Context,
  40. network, address string) (essentials.Conn, error) {
  41. conn, err := c.Dialer.DialContext(ctx, network, address)
  42. select {
  43. case <-ctx.Done():
  44. if conn != nil {
  45. conn.Close()
  46. }
  47. return nil, ctx.Err() // nolint: wrapcheck
  48. case c.stateMutexChan <- true:
  49. defer func() {
  50. <-c.stateMutexChan
  51. }()
  52. }
  53. if err == nil {
  54. c.switchState(circuitBreakerStateClosed)
  55. return conn, nil
  56. }
  57. c.failuresCount++
  58. if c.state == circuitBreakerStateClosed && c.failuresCount >= c.openThreshold {
  59. c.switchState(circuitBreakerStateOpened)
  60. }
  61. return conn, err // nolint: wrapcheck
  62. }
  63. func (c *circuitBreakerDialer) doHalfOpened(ctx context.Context,
  64. network, address string) (essentials.Conn, error) {
  65. if !atomic.CompareAndSwapUint32(&c.halfOpenAttempts, 0, 1) {
  66. return nil, ErrCircuitBreakerOpened
  67. }
  68. conn, err := c.Dialer.DialContext(ctx, network, address)
  69. select {
  70. case <-ctx.Done():
  71. if conn != nil {
  72. conn.Close()
  73. }
  74. return nil, ctx.Err() // nolint: wrapcheck
  75. case c.stateMutexChan <- true:
  76. defer func() {
  77. <-c.stateMutexChan
  78. }()
  79. }
  80. if c.state != circuitBreakerStateHalfOpened {
  81. return conn, err // nolint: wrapcheck
  82. }
  83. if err == nil {
  84. c.switchState(circuitBreakerStateClosed)
  85. } else {
  86. c.switchState(circuitBreakerStateOpened)
  87. }
  88. return conn, err // nolint: wrapcheck
  89. }
  90. func (c *circuitBreakerDialer) switchState(state uint32) {
  91. switch state {
  92. case circuitBreakerStateClosed:
  93. c.stopTimer(&c.halfOpenTimer)
  94. c.ensureTimer(&c.failuresCleanupTimer, c.resetFailuresTimeout, c.resetFailures)
  95. case circuitBreakerStateHalfOpened:
  96. c.stopTimer(&c.failuresCleanupTimer)
  97. c.stopTimer(&c.halfOpenTimer)
  98. case circuitBreakerStateOpened:
  99. c.stopTimer(&c.failuresCleanupTimer)
  100. c.ensureTimer(&c.halfOpenTimer, c.halfOpenTimeout, c.tryHalfOpen)
  101. }
  102. c.failuresCount = 0
  103. atomic.StoreUint32(&c.halfOpenAttempts, 0)
  104. atomic.StoreUint32(&c.state, state)
  105. }
  106. func (c *circuitBreakerDialer) resetFailures() {
  107. c.stateMutexChan <- true
  108. defer func() {
  109. <-c.stateMutexChan
  110. }()
  111. c.stopTimer(&c.failuresCleanupTimer)
  112. if c.state == circuitBreakerStateClosed {
  113. c.switchState(circuitBreakerStateClosed)
  114. }
  115. }
  116. func (c *circuitBreakerDialer) tryHalfOpen() {
  117. c.stateMutexChan <- true
  118. defer func() {
  119. <-c.stateMutexChan
  120. }()
  121. if c.state == circuitBreakerStateOpened {
  122. c.switchState(circuitBreakerStateHalfOpened)
  123. }
  124. }
  125. func (c *circuitBreakerDialer) stopTimer(timerRef **time.Timer) {
  126. timer := *timerRef
  127. if timer == nil {
  128. return
  129. }
  130. timer.Stop()
  131. select {
  132. case <-timer.C:
  133. default:
  134. }
  135. *timerRef = nil
  136. }
  137. func (c *circuitBreakerDialer) ensureTimer(timerRef **time.Timer,
  138. timeout time.Duration, callback func()) {
  139. if *timerRef == nil {
  140. *timerRef = time.AfterFunc(timeout, callback)
  141. }
  142. }
  143. func newCircuitBreakerDialer(baseDialer Dialer,
  144. openThreshold uint32, halfOpenTimeout, resetFailuresTimeout time.Duration) Dialer {
  145. cb := &circuitBreakerDialer{
  146. Dialer: baseDialer,
  147. stateMutexChan: make(chan bool, 1),
  148. openThreshold: openThreshold,
  149. halfOpenTimeout: halfOpenTimeout,
  150. resetFailuresTimeout: resetFailuresTimeout,
  151. }
  152. cb.stateMutexChan <- true // to convince race detector we are good
  153. cb.switchState(circuitBreakerStateClosed)
  154. <-cb.stateMutexChan
  155. return cb
  156. }