Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter "Version 2".
Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package mtglib
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/9seconds/mtg/v2/mtglib/internal/obfuscated2"
  10. "github.com/9seconds/mtg/v2/mtglib/internal/relay"
  11. "github.com/9seconds/mtg/v2/mtglib/internal/telegram"
  12. "github.com/panjf2000/ants/v2"
  13. )
  14. type Proxy struct {
  15. ctx context.Context
  16. ctxCancel context.CancelFunc
  17. streamWaitGroup sync.WaitGroup
  18. idleTimeout time.Duration
  19. bufferSize int
  20. workerPool *ants.PoolWithFunc
  21. telegram *telegram.Telegram
  22. secret Secret
  23. antiReplayCache AntiReplayCache
  24. ipBlocklist IPBlocklist
  25. eventStream EventStream
  26. logger Logger
  27. }
  28. func (p *Proxy) ServeConn(conn net.Conn) {
  29. ctx := newStreamContext(p.ctx, p.logger, conn)
  30. defer ctx.Close()
  31. go func() {
  32. <-ctx.Done()
  33. ctx.Close()
  34. }()
  35. p.eventStream.Send(ctx, EventStart{
  36. CreatedAt: time.Now(),
  37. ConnID: ctx.connID,
  38. RemoteIP: ctx.ClientIP(),
  39. })
  40. ctx.logger.Info("Stream has been started")
  41. defer func() {
  42. p.eventStream.Send(ctx, EventFinish{
  43. CreatedAt: time.Now(),
  44. ConnID: ctx.connID,
  45. })
  46. ctx.logger.Info("Stream has been finished")
  47. }()
  48. if err := p.doObfuscated2Handshake(ctx); err != nil {
  49. p.logger.InfoError("obfuscated2 handshake is failed", err)
  50. return
  51. }
  52. if err := p.doTelegramCall(ctx); err != nil {
  53. p.logger.WarningError("cannot dial to telegram", err)
  54. return
  55. }
  56. rel := relay.AcquireRelay(ctx, p.logger.Named("relay"), p.bufferSize, p.idleTimeout)
  57. defer relay.ReleaseRelay(rel)
  58. if err := rel.Process(ctx.clientConn, ctx.telegramConn); err != nil {
  59. p.logger.DebugError("relay has been finished", err)
  60. }
  61. }
  62. func (p *Proxy) Serve(listener net.Listener) error {
  63. for {
  64. conn, err := listener.Accept()
  65. if err != nil {
  66. return fmt.Errorf("cannot accept a new connection: %w", err)
  67. }
  68. if addr := conn.RemoteAddr().(*net.TCPAddr).IP; p.ipBlocklist.Contains(addr) {
  69. conn.Close()
  70. p.eventStream.Send(p.ctx, EventIPBlocklisted{
  71. CreatedAt: time.Now(),
  72. RemoteIP: addr,
  73. })
  74. continue
  75. }
  76. err = p.workerPool.Invoke(conn)
  77. switch {
  78. case err == nil:
  79. case errors.Is(err, ants.ErrPoolClosed):
  80. return nil
  81. case errors.Is(err, ants.ErrPoolOverload):
  82. p.eventStream.Send(p.ctx, EventConcurrencyLimited{
  83. CreatedAt: time.Now(),
  84. })
  85. }
  86. }
  87. }
  88. func (p *Proxy) Shutdown() {
  89. p.ctxCancel()
  90. p.streamWaitGroup.Wait()
  91. p.workerPool.Release()
  92. }
  93. func (p *Proxy) doObfuscated2Handshake(ctx *streamContext) error {
  94. dc, encryptor, decryptor, err := obfuscated2.ClientHandshake(p.secret.Key[:], ctx.clientConn)
  95. if err != nil {
  96. return fmt.Errorf("cannot process client handshake: %w", err)
  97. }
  98. ctx.dc = dc
  99. ctx.logger = ctx.logger.BindInt("dc", dc)
  100. ctx.clientConn = &obfuscated2.Conn{
  101. Conn: ctx.clientConn,
  102. Encryptor: encryptor,
  103. Decryptor: decryptor,
  104. }
  105. return nil
  106. }
  107. func (p *Proxy) doTelegramCall(ctx *streamContext) error {
  108. conn, err := p.telegram.Dial(ctx, ctx.dc)
  109. if err != nil {
  110. return fmt.Errorf("cannot dial to Telegram: %w", err)
  111. }
  112. encryptor, decryptor, err := obfuscated2.ServerHandshake(conn)
  113. if err != nil {
  114. conn.Close()
  115. return fmt.Errorf("cannot perform obfuscated2 handshake: %w", err)
  116. }
  117. ctx.telegramConn = &obfuscated2.Conn{
  118. Conn: connTelegramTraffic{
  119. Conn: conn,
  120. connID: ctx.connID,
  121. stream: p.eventStream,
  122. ctx: ctx,
  123. },
  124. Encryptor: encryptor,
  125. Decryptor: decryptor,
  126. }
  127. p.eventStream.Send(ctx, EventConnectedToDC{
  128. CreatedAt: time.Now(),
  129. ConnID: ctx.connID,
  130. RemoteIP: conn.RemoteAddr().(*net.TCPAddr).IP,
  131. DC: ctx.dc,
  132. })
  133. return nil
  134. }
  135. func NewProxy(opts ProxyOpts) (*Proxy, error) { // nolint: cyclop
  136. switch {
  137. case opts.Network == nil:
  138. return nil, ErrNetworkIsNotDefined
  139. case opts.AntiReplayCache == nil:
  140. return nil, ErrAntiReplayCacheIsNotDefined
  141. case opts.IPBlocklist == nil:
  142. return nil, ErrIPBlocklistIsNotDefined
  143. case opts.EventStream == nil:
  144. return nil, ErrEventStreamIsNotDefined
  145. case opts.Logger == nil:
  146. return nil, ErrLoggerIsNotDefined
  147. case !opts.Secret.Valid():
  148. return nil, ErrSecretInvalid
  149. }
  150. tg, err := telegram.New(opts.Network, opts.PreferIP)
  151. if err != nil {
  152. return nil, fmt.Errorf("cannot build telegram dialer: %w", err)
  153. }
  154. concurrency := opts.Concurrency
  155. if concurrency == 0 {
  156. concurrency = DefaultConcurrency
  157. }
  158. idleTimeout := opts.IdleTimeout
  159. if idleTimeout < 1 {
  160. idleTimeout = DefaultIdleTimeout
  161. }
  162. bufferSize := opts.BufferSize
  163. if bufferSize < 1 {
  164. bufferSize = DefaultBufferSize
  165. }
  166. ctx, cancel := context.WithCancel(context.Background())
  167. proxy := &Proxy{
  168. ctx: ctx,
  169. ctxCancel: cancel,
  170. secret: opts.Secret,
  171. antiReplayCache: opts.AntiReplayCache,
  172. ipBlocklist: opts.IPBlocklist,
  173. eventStream: opts.EventStream,
  174. logger: opts.Logger.Named("proxy"),
  175. idleTimeout: idleTimeout,
  176. bufferSize: int(bufferSize),
  177. telegram: tg,
  178. }
  179. pool, err := ants.NewPoolWithFunc(int(concurrency), func(arg interface{}) {
  180. proxy.ServeConn(arg.(net.Conn))
  181. }, ants.WithLogger(opts.Logger.Named("ants")))
  182. if err != nil {
  183. return nil, fmt.Errorf("cannot initialize a pool: %w", err)
  184. }
  185. proxy.workerPool = pool
  186. return proxy, nil
  187. }