Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
Du kannst nicht mehr als 25 Themen auswählen Themen müssen mit entweder einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

ganger.go 3.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package doppel
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/9seconds/mtg/v2/essentials"
  7. )
  8. const (
  9. DoppelGangerMaxDurations = 4096
  10. DoppelGangerScoutRaidEach = 30 * time.Minute
  11. DoppelGangerScoutRepeats = 10
  12. )
  13. type gangerConnRequest struct {
  14. ret chan<- Conn
  15. payload essentials.Conn
  16. }
  17. type Ganger struct {
  18. ctx context.Context
  19. ctxCancel context.CancelFunc
  20. logger Logger
  21. wg sync.WaitGroup
  22. scout Scout
  23. scoutRaidEach time.Duration
  24. scoutRaidRepeats int
  25. stats *Stats
  26. durations []time.Duration
  27. connRequests chan gangerConnRequest
  28. }
  29. func (g *Ganger) Shutdown() {
  30. g.ctxCancel()
  31. g.wg.Wait()
  32. }
  33. func (g *Ganger) Run() {
  34. g.wg.Go(func() {
  35. g.run()
  36. })
  37. }
  38. func (g *Ganger) NewConn(conn essentials.Conn) (Conn, error) {
  39. rvChan := make(chan Conn)
  40. req := gangerConnRequest{
  41. ret: rvChan,
  42. payload: conn,
  43. }
  44. defer close(req.ret)
  45. select {
  46. case <-g.ctx.Done():
  47. return Conn{}, context.Cause(g.ctx)
  48. case g.connRequests <- req:
  49. }
  50. select {
  51. case <-g.ctx.Done():
  52. return Conn{}, context.Cause(g.ctx)
  53. case conn := <-rvChan:
  54. return conn, nil
  55. }
  56. }
  57. func (g *Ganger) run() {
  58. scoutTicker := time.NewTicker(g.scoutRaidEach)
  59. defer func() {
  60. scoutTicker.Stop()
  61. select {
  62. case <-scoutTicker.C:
  63. default:
  64. }
  65. }()
  66. scoutCollectedChan := make(chan []time.Duration)
  67. currentScoutCollectedChan := scoutCollectedChan
  68. updatedStatsChan := make(chan *Stats)
  69. g.wg.Go(func() {
  70. g.runScoutRaid(scoutCollectedChan)
  71. })
  72. for {
  73. select {
  74. case <-g.ctx.Done():
  75. return
  76. case durations := <-currentScoutCollectedChan:
  77. g.durations = append(g.durations, durations...)
  78. if len(g.durations) > DoppelGangerMaxDurations {
  79. g.durations = g.durations[len(g.durations)-DoppelGangerMaxDurations:]
  80. }
  81. currentScoutCollectedChan = nil
  82. g.wg.Go(func() {
  83. select {
  84. case <-g.ctx.Done():
  85. case updatedStatsChan <- NewStats(durations):
  86. }
  87. })
  88. case stats := <-updatedStatsChan:
  89. g.stats = stats
  90. currentScoutCollectedChan = scoutCollectedChan
  91. case <-scoutTicker.C:
  92. g.wg.Go(func() {
  93. g.runScoutRaid(scoutCollectedChan)
  94. })
  95. case req := <-g.connRequests:
  96. select {
  97. case <-g.ctx.Done():
  98. case req.ret <- NewConn(g.ctx, req.payload, g.stats):
  99. }
  100. }
  101. }
  102. }
  103. func (g *Ganger) runScoutRaid(rvChan chan<- []time.Duration) {
  104. durations := []time.Duration{}
  105. for range g.scoutRaidRepeats {
  106. learned, err := g.scout.Learn(g.ctx)
  107. if err != nil {
  108. g.logger.WarningError("cannot learn", err)
  109. continue
  110. }
  111. durations = append(durations, learned...)
  112. }
  113. select {
  114. case <-g.ctx.Done():
  115. return
  116. case rvChan <- durations:
  117. }
  118. }
  119. func NewGanger(
  120. ctx context.Context,
  121. network Network,
  122. logger Logger,
  123. scoutEach time.Duration,
  124. scoutRepeats int,
  125. urls []string,
  126. ) *Ganger {
  127. ctx, cancel := context.WithCancel(ctx)
  128. if scoutEach == 0 {
  129. scoutEach = DoppelGangerScoutRaidEach
  130. }
  131. if scoutRepeats == 0 {
  132. scoutRepeats = DoppelGangerScoutRepeats
  133. }
  134. return &Ganger{
  135. ctx: ctx,
  136. ctxCancel: cancel,
  137. logger: logger,
  138. scoutRaidEach: scoutEach,
  139. scoutRaidRepeats: scoutRepeats,
  140. stats: &Stats{
  141. k: StatsDefaultK,
  142. lambda: StatsDefaultLambda,
  143. },
  144. scout: NewScout(network, urls),
  145. connRequests: make(chan gangerConnRequest),
  146. }
  147. }