Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

ganger.go 3.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package doppel
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/9seconds/mtg/v2/essentials"
  7. )
  8. const (
  9. DoppelGangerMaxDurations = 4096
  10. DoppelGangerScoutRaidEach = 6 * time.Hour
  11. DoppelGangerScoutRepeats = 10
  12. )
  13. type gangerConnRequest struct {
  14. ret chan<- Conn
  15. payload essentials.Conn
  16. }
  17. type Ganger struct {
  18. ctx context.Context
  19. ctxCancel context.CancelFunc
  20. logger Logger
  21. wg sync.WaitGroup
  22. scout Scout
  23. scoutRaidEach time.Duration
  24. scoutRaidRepeats int
  25. drs bool
  26. stats *Stats
  27. durations []time.Duration
  28. connRequests chan gangerConnRequest
  29. }
  30. func (g *Ganger) Shutdown() {
  31. g.ctxCancel()
  32. g.wg.Wait()
  33. }
  34. func (g *Ganger) Run() {
  35. g.wg.Go(func() {
  36. g.run()
  37. })
  38. }
  39. func (g *Ganger) NewConn(conn essentials.Conn) (Conn, error) {
  40. rvChan := make(chan Conn)
  41. req := gangerConnRequest{
  42. ret: rvChan,
  43. payload: conn,
  44. }
  45. defer close(req.ret)
  46. select {
  47. case <-g.ctx.Done():
  48. return Conn{}, context.Cause(g.ctx)
  49. case g.connRequests <- req:
  50. }
  51. select {
  52. case <-g.ctx.Done():
  53. return Conn{}, context.Cause(g.ctx)
  54. case conn := <-rvChan:
  55. return conn, nil
  56. }
  57. }
  58. func (g *Ganger) run() {
  59. scoutTicker := time.NewTicker(g.scoutRaidEach)
  60. defer func() {
  61. scoutTicker.Stop()
  62. select {
  63. case <-scoutTicker.C:
  64. default:
  65. }
  66. }()
  67. scoutCollectedChan := make(chan []time.Duration)
  68. currentScoutCollectedChan := scoutCollectedChan
  69. updatedStatsChan := make(chan *Stats)
  70. g.wg.Go(func() {
  71. g.runScoutRaid(scoutCollectedChan)
  72. })
  73. for {
  74. select {
  75. case <-g.ctx.Done():
  76. return
  77. case durations := <-currentScoutCollectedChan:
  78. g.durations = append(g.durations, durations...)
  79. if len(g.durations) > DoppelGangerMaxDurations {
  80. g.durations = g.durations[len(g.durations)-DoppelGangerMaxDurations:]
  81. }
  82. if len(g.durations) < MinDurationsToCalculate {
  83. continue
  84. }
  85. currentScoutCollectedChan = nil
  86. g.wg.Go(func() {
  87. select {
  88. case <-g.ctx.Done():
  89. case updatedStatsChan <- NewStats(durations, g.drs):
  90. }
  91. })
  92. case stats := <-updatedStatsChan:
  93. g.stats = stats
  94. currentScoutCollectedChan = scoutCollectedChan
  95. case <-scoutTicker.C:
  96. g.wg.Go(func() {
  97. g.runScoutRaid(scoutCollectedChan)
  98. })
  99. case req := <-g.connRequests:
  100. select {
  101. case <-g.ctx.Done():
  102. case req.ret <- NewConn(g.ctx, req.payload, g.stats):
  103. }
  104. }
  105. }
  106. }
  107. func (g *Ganger) runScoutRaid(rvChan chan<- []time.Duration) {
  108. durations := []time.Duration{}
  109. for range g.scoutRaidRepeats {
  110. learned, err := g.scout.Learn(g.ctx)
  111. if err != nil {
  112. g.logger.WarningError("cannot learn", err)
  113. continue
  114. }
  115. durations = append(durations, learned...)
  116. }
  117. select {
  118. case <-g.ctx.Done():
  119. return
  120. case rvChan <- durations:
  121. }
  122. }
  123. func NewGanger(
  124. ctx context.Context,
  125. network Network,
  126. logger Logger,
  127. scoutEach time.Duration,
  128. scoutRepeats int,
  129. urls []string,
  130. drs bool,
  131. ) *Ganger {
  132. ctx, cancel := context.WithCancel(ctx)
  133. if scoutEach == 0 {
  134. scoutEach = DoppelGangerScoutRaidEach
  135. }
  136. if scoutRepeats == 0 {
  137. scoutRepeats = DoppelGangerScoutRepeats
  138. }
  139. return &Ganger{
  140. ctx: ctx,
  141. ctxCancel: cancel,
  142. logger: logger,
  143. scoutRaidEach: scoutEach,
  144. scoutRaidRepeats: scoutRepeats,
  145. drs: drs,
  146. stats: &Stats{
  147. k: StatsDefaultK,
  148. lambda: StatsDefaultLambda,
  149. drs: drs,
  150. },
  151. scout: NewScout(network, urls),
  152. connRequests: make(chan gangerConnRequest),
  153. }
  154. }