Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

circuit_breaker.go 4.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package network
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "time"
  6. "github.com/9seconds/mtg/v2/essentials"
  7. )
  8. const (
  9. circuitBreakerStateClosed uint32 = iota
  10. circuitBreakerStateHalfOpened
  11. circuitBreakerStateOpened
  12. )
  13. type circuitBreakerDialer struct {
  14. Dialer
  15. stateMutexChan chan bool
  16. halfOpenTimer *time.Timer
  17. failuresCleanupTimer *time.Timer
  18. state uint32
  19. halfOpenAttempts uint32
  20. failuresCount uint32
  21. openThreshold uint32
  22. halfOpenTimeout time.Duration
  23. resetFailuresTimeout time.Duration
  24. }
  25. func (c *circuitBreakerDialer) Dial(network, address string) (essentials.Conn, error) {
  26. return c.DialContext(context.Background(), network, address)
  27. }
  28. func (c *circuitBreakerDialer) DialContext(ctx context.Context,
  29. network, address string,
  30. ) (essentials.Conn, error) {
  31. switch atomic.LoadUint32(&c.state) {
  32. case circuitBreakerStateClosed:
  33. return c.doClosed(ctx, network, address)
  34. case circuitBreakerStateHalfOpened:
  35. return c.doHalfOpened(ctx, network, address)
  36. default:
  37. return nil, ErrCircuitBreakerOpened
  38. }
  39. }
  40. func (c *circuitBreakerDialer) doClosed(ctx context.Context,
  41. network, address string,
  42. ) (essentials.Conn, error) {
  43. conn, err := c.Dialer.DialContext(ctx, network, address)
  44. select {
  45. case <-ctx.Done():
  46. if conn != nil {
  47. conn.Close()
  48. }
  49. return nil, ctx.Err() //nolint: wrapcheck
  50. case c.stateMutexChan <- true:
  51. defer func() {
  52. <-c.stateMutexChan
  53. }()
  54. }
  55. if err == nil {
  56. c.switchState(circuitBreakerStateClosed)
  57. return conn, nil
  58. }
  59. c.failuresCount++
  60. if c.state == circuitBreakerStateClosed && c.failuresCount >= c.openThreshold {
  61. c.switchState(circuitBreakerStateOpened)
  62. }
  63. return conn, err //nolint: wrapcheck
  64. }
  65. func (c *circuitBreakerDialer) doHalfOpened(ctx context.Context,
  66. network, address string,
  67. ) (essentials.Conn, error) {
  68. if !atomic.CompareAndSwapUint32(&c.halfOpenAttempts, 0, 1) {
  69. return nil, ErrCircuitBreakerOpened
  70. }
  71. conn, err := c.Dialer.DialContext(ctx, network, address)
  72. select {
  73. case <-ctx.Done():
  74. if conn != nil {
  75. conn.Close()
  76. }
  77. return nil, ctx.Err() //nolint: wrapcheck
  78. case c.stateMutexChan <- true:
  79. defer func() {
  80. <-c.stateMutexChan
  81. }()
  82. }
  83. if c.state != circuitBreakerStateHalfOpened {
  84. return conn, err //nolint: wrapcheck
  85. }
  86. if err == nil {
  87. c.switchState(circuitBreakerStateClosed)
  88. } else {
  89. c.switchState(circuitBreakerStateOpened)
  90. }
  91. return conn, err //nolint: wrapcheck
  92. }
  93. func (c *circuitBreakerDialer) switchState(state uint32) {
  94. switch state {
  95. case circuitBreakerStateClosed:
  96. c.stopTimer(&c.halfOpenTimer)
  97. c.ensureTimer(&c.failuresCleanupTimer, c.resetFailuresTimeout, c.resetFailures)
  98. case circuitBreakerStateHalfOpened:
  99. c.stopTimer(&c.failuresCleanupTimer)
  100. c.stopTimer(&c.halfOpenTimer)
  101. case circuitBreakerStateOpened:
  102. c.stopTimer(&c.failuresCleanupTimer)
  103. c.ensureTimer(&c.halfOpenTimer, c.halfOpenTimeout, c.tryHalfOpen)
  104. }
  105. c.failuresCount = 0
  106. atomic.StoreUint32(&c.halfOpenAttempts, 0)
  107. atomic.StoreUint32(&c.state, state)
  108. }
  109. func (c *circuitBreakerDialer) resetFailures() {
  110. c.stateMutexChan <- true
  111. defer func() {
  112. <-c.stateMutexChan
  113. }()
  114. c.stopTimer(&c.failuresCleanupTimer)
  115. if c.state == circuitBreakerStateClosed {
  116. c.switchState(circuitBreakerStateClosed)
  117. }
  118. }
  119. func (c *circuitBreakerDialer) tryHalfOpen() {
  120. c.stateMutexChan <- true
  121. defer func() {
  122. <-c.stateMutexChan
  123. }()
  124. if c.state == circuitBreakerStateOpened {
  125. c.switchState(circuitBreakerStateHalfOpened)
  126. }
  127. }
  128. func (c *circuitBreakerDialer) stopTimer(timerRef **time.Timer) {
  129. timer := *timerRef
  130. if timer == nil {
  131. return
  132. }
  133. timer.Stop()
  134. select {
  135. case <-timer.C:
  136. default:
  137. }
  138. *timerRef = nil
  139. }
  140. func (c *circuitBreakerDialer) ensureTimer(timerRef **time.Timer,
  141. timeout time.Duration, callback func(),
  142. ) {
  143. if *timerRef == nil {
  144. *timerRef = time.AfterFunc(timeout, callback)
  145. }
  146. }
  147. func newCircuitBreakerDialer(baseDialer Dialer,
  148. openThreshold uint32, halfOpenTimeout, resetFailuresTimeout time.Duration,
  149. ) Dialer {
  150. cb := &circuitBreakerDialer{
  151. Dialer: baseDialer,
  152. stateMutexChan: make(chan bool, 1),
  153. openThreshold: openThreshold,
  154. halfOpenTimeout: halfOpenTimeout,
  155. resetFailuresTimeout: resetFailuresTimeout,
  156. }
  157. cb.stateMutexChan <- true // to convince race detector we are good
  158. cb.switchState(circuitBreakerStateClosed)
  159. <-cb.stateMutexChan
  160. return cb
  161. }