Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter "Version 2".
Vous ne pouvez pas sélectionner plus de 25 sujets. Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

circuit_breaker.go 4.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package network
  2. import (
  3. "context"
  4. "net"
  5. "sync/atomic"
  6. "time"
  7. )
  8. const (
  9. circuitBreakerStateClosed uint32 = iota
  10. circuitBreakerStateHalfOpened
  11. circuitBreakerStateOpened
  12. )
  13. type circuitBreakerDialer struct {
  14. Dialer
  15. stateMutexChan chan bool
  16. halfOpenTimer *time.Timer
  17. failuresCleanupTimer *time.Timer
  18. state uint32
  19. halfOpenAttempts uint32
  20. failuresCount uint32
  21. openThreshold uint32
  22. halfOpenTimeout time.Duration
  23. resetFailuresTimeout time.Duration
  24. }
  25. func (c *circuitBreakerDialer) Dial(network, address string) (net.Conn, error) {
  26. return c.DialContext(context.Background(), network, address)
  27. }
  28. func (c *circuitBreakerDialer) DialContext(ctx context.Context,
  29. network, address string) (net.Conn, error) {
  30. switch atomic.LoadUint32(&c.state) {
  31. case circuitBreakerStateClosed:
  32. return c.doClosed(ctx, network, address)
  33. case circuitBreakerStateHalfOpened:
  34. return c.doHalfOpened(ctx, network, address)
  35. default:
  36. return nil, ErrCircuitBreakerOpened
  37. }
  38. }
  39. func (c *circuitBreakerDialer) doClosed(ctx context.Context,
  40. network, address string) (net.Conn, error) {
  41. conn, err := c.Dialer.DialContext(ctx, network, address)
  42. select {
  43. case <-ctx.Done():
  44. if conn != nil {
  45. conn.Close()
  46. }
  47. return nil, ctx.Err()
  48. case c.stateMutexChan <- true:
  49. defer func() {
  50. <-c.stateMutexChan
  51. }()
  52. }
  53. if err == nil {
  54. c.switchState(circuitBreakerStateClosed)
  55. return conn, err // nolint: wrapcheck
  56. }
  57. c.failuresCount++
  58. if c.state == circuitBreakerStateClosed && c.failuresCount >= c.openThreshold {
  59. c.switchState(circuitBreakerStateOpened)
  60. }
  61. return conn, err // nolint: wrapcheck
  62. }
  63. func (c *circuitBreakerDialer) doHalfOpened(ctx context.Context, network, address string) (net.Conn, error) {
  64. if !atomic.CompareAndSwapUint32(&c.halfOpenAttempts, 0, 1) {
  65. return nil, ErrCircuitBreakerOpened
  66. }
  67. conn, err := c.Dialer.DialContext(ctx, network, address)
  68. select {
  69. case <-ctx.Done():
  70. if conn != nil {
  71. conn.Close()
  72. }
  73. return nil, ctx.Err()
  74. case c.stateMutexChan <- true:
  75. defer func() {
  76. <-c.stateMutexChan
  77. }()
  78. }
  79. if c.state != circuitBreakerStateHalfOpened {
  80. return conn, err // nolint: wrapcheck
  81. }
  82. if err == nil {
  83. c.switchState(circuitBreakerStateClosed)
  84. } else {
  85. c.switchState(circuitBreakerStateOpened)
  86. }
  87. return conn, err // nolint: wrapcheck
  88. }
  89. func (c *circuitBreakerDialer) switchState(state uint32) {
  90. switch state {
  91. case circuitBreakerStateClosed:
  92. c.stopTimer(&c.halfOpenTimer)
  93. c.ensureTimer(&c.failuresCleanupTimer, c.resetFailuresTimeout, c.resetFailures)
  94. case circuitBreakerStateHalfOpened:
  95. c.stopTimer(&c.failuresCleanupTimer)
  96. c.stopTimer(&c.halfOpenTimer)
  97. case circuitBreakerStateOpened:
  98. c.stopTimer(&c.failuresCleanupTimer)
  99. c.ensureTimer(&c.halfOpenTimer, c.halfOpenTimeout, c.tryHalfOpen)
  100. }
  101. c.failuresCount = 0
  102. atomic.StoreUint32(&c.halfOpenAttempts, 0)
  103. atomic.StoreUint32(&c.state, state)
  104. }
  105. func (c *circuitBreakerDialer) resetFailures() {
  106. c.stateMutexChan <- true
  107. defer func() {
  108. <-c.stateMutexChan
  109. }()
  110. c.stopTimer(&c.failuresCleanupTimer)
  111. if c.state == circuitBreakerStateClosed {
  112. c.switchState(circuitBreakerStateClosed)
  113. }
  114. }
  115. func (c *circuitBreakerDialer) tryHalfOpen() {
  116. c.stateMutexChan <- true
  117. defer func() {
  118. <-c.stateMutexChan
  119. }()
  120. if c.state == circuitBreakerStateOpened {
  121. c.switchState(circuitBreakerStateHalfOpened)
  122. }
  123. }
  124. func (c *circuitBreakerDialer) stopTimer(timerRef **time.Timer) {
  125. timer := *timerRef
  126. if timer == nil {
  127. return
  128. }
  129. timer.Stop()
  130. select {
  131. case <-timer.C:
  132. default:
  133. }
  134. *timerRef = nil
  135. }
  136. func (c *circuitBreakerDialer) ensureTimer(timerRef **time.Timer,
  137. timeout time.Duration, callback func()) {
  138. if *timerRef == nil {
  139. *timerRef = time.AfterFunc(timeout, callback)
  140. }
  141. }
  142. func newCircuitBreakerDialer(baseDialer Dialer,
  143. openThreshold uint32, halfOpenTimeout, resetFailuresTimeout time.Duration) Dialer {
  144. cb := &circuitBreakerDialer{
  145. Dialer: baseDialer,
  146. stateMutexChan: make(chan bool, 1),
  147. openThreshold: openThreshold,
  148. halfOpenTimeout: halfOpenTimeout,
  149. resetFailuresTimeout: resetFailuresTimeout,
  150. }
  151. cb.stateMutexChan <- true // to convince race detector we are good
  152. cb.switchState(circuitBreakerStateClosed)
  153. <-cb.stateMutexChan
  154. return cb
  155. }