Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter Version 2.
Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. package mtglib
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/9seconds/mtg/v2/essentials"
  11. "github.com/9seconds/mtg/v2/mtglib/internal/dc"
  12. "github.com/9seconds/mtg/v2/mtglib/internal/doppel"
  13. "github.com/9seconds/mtg/v2/mtglib/internal/obfuscation"
  14. "github.com/9seconds/mtg/v2/mtglib/internal/relay"
  15. "github.com/9seconds/mtg/v2/mtglib/internal/tls"
  16. "github.com/9seconds/mtg/v2/mtglib/internal/tls/fake"
  17. "github.com/panjf2000/ants/v2"
  18. )
  19. // Proxy is an MTPROTO proxy structure.
  20. type Proxy struct {
  21. ctx context.Context
  22. ctxCancel context.CancelFunc
  23. streamWaitGroup sync.WaitGroup
  24. allowFallbackOnUnknownDC bool
  25. tolerateTimeSkewness time.Duration
  26. idleTimeout time.Duration
  27. handshakeTimeout time.Duration
  28. domainFrontingPort int
  29. domainFrontingHost string
  30. domainFrontingProxyProtocol bool
  31. workerPool *ants.PoolWithFunc
  32. telegram *dc.Telegram
  33. configUpdater *dc.PublicConfigUpdater
  34. doppelGanger *doppel.Ganger
  35. clientObfuscatror obfuscation.Obfuscator
  36. secret Secret
  37. network Network
  38. antiReplayCache AntiReplayCache
  39. blocklist IPBlocklist
  40. allowlist IPBlocklist
  41. eventStream EventStream
  42. logger Logger
  43. }
  44. // DomainFrontingAddress returns a host:port pair for a fronting domain.
  45. // If a fronting host (literal IP or hostname) is configured, it is used
  46. // instead of resolving the secret's hostname.
  47. func (p *Proxy) DomainFrontingAddress() string {
  48. host := p.secret.Host
  49. if p.domainFrontingHost != "" {
  50. host = p.domainFrontingHost
  51. }
  52. return net.JoinHostPort(host, strconv.Itoa(p.domainFrontingPort))
  53. }
  54. // ServeConn serves a connection. We do not check IP blocklist and concurrency
  55. // limit here.
  56. func (p *Proxy) ServeConn(conn essentials.Conn) {
  57. p.streamWaitGroup.Add(1)
  58. defer p.streamWaitGroup.Done()
  59. ctx := newStreamContext(p.ctx, p.logger, conn)
  60. defer ctx.Close()
  61. if err := ctx.clientConn.SetDeadline(time.Now().Add(p.handshakeTimeout)); err != nil {
  62. ctx.logger.WarningError("cannot set handshake timeout", err)
  63. return
  64. }
  65. stop := context.AfterFunc(ctx, func() {
  66. ctx.Close()
  67. })
  68. defer stop()
  69. p.eventStream.Send(ctx, NewEventStart(ctx.streamID, ctx.ClientIP()))
  70. ctx.logger.Info("Stream has been started")
  71. defer func() {
  72. p.eventStream.Send(ctx, NewEventFinish(ctx.streamID))
  73. ctx.logger.Info("Stream has been finished")
  74. }()
  75. if !p.doFakeTLSHandshake(ctx) {
  76. return
  77. }
  78. clientConn, err := p.doppelGanger.NewConn(ctx.clientConn)
  79. if err != nil {
  80. ctx.logger.InfoError("cannot wrap into doppelganger connection", err)
  81. return
  82. }
  83. defer clientConn.Stop()
  84. ctx.clientConn = clientConn
  85. if err := p.doObfuscatedHandshake(ctx); err != nil {
  86. ctx.logger.InfoError("obfuscated handshake is failed", err)
  87. return
  88. }
  89. if err := ctx.clientConn.SetDeadline(time.Time{}); err != nil {
  90. ctx.logger.WarningError("cannot set deadline", err)
  91. return
  92. }
  93. if err := p.doTelegramCall(ctx); err != nil {
  94. ctx.logger.WarningError("cannot dial to telegram", err)
  95. return
  96. }
  97. tracker := newIdleTracker(p.idleTimeout)
  98. relay.Relay(
  99. ctx,
  100. ctx.logger.Named("relay"),
  101. connIdleTimeout{Conn: ctx.telegramConn, tracker: tracker},
  102. connIdleTimeout{Conn: ctx.clientConn, tracker: tracker},
  103. )
  104. }
  105. // Serve starts a proxy on a given listener.
  106. func (p *Proxy) Serve(listener net.Listener) error {
  107. p.streamWaitGroup.Add(1)
  108. defer p.streamWaitGroup.Done()
  109. for {
  110. conn, err := listener.Accept()
  111. if err != nil {
  112. select {
  113. case <-p.ctx.Done():
  114. return nil
  115. default:
  116. return fmt.Errorf("cannot accept a new connection: %w", err)
  117. }
  118. }
  119. ipAddr := conn.RemoteAddr().(*net.TCPAddr).IP //nolint: forcetypeassert
  120. logger := p.logger.BindStr("ip", ipAddr.String())
  121. if !p.allowlist.Contains(ipAddr) {
  122. conn.Close() //nolint: errcheck
  123. logger.Info("ip was rejected by allowlist")
  124. p.eventStream.Send(p.ctx, NewEventIPAllowlisted(ipAddr))
  125. continue
  126. }
  127. if p.blocklist.Contains(ipAddr) {
  128. conn.Close() //nolint: errcheck
  129. logger.Info("ip was blacklisted")
  130. p.eventStream.Send(p.ctx, NewEventIPBlocklisted(ipAddr))
  131. continue
  132. }
  133. err = p.workerPool.Invoke(conn)
  134. switch {
  135. case err == nil:
  136. case errors.Is(err, ants.ErrPoolClosed):
  137. return nil
  138. case errors.Is(err, ants.ErrPoolOverload):
  139. conn.Close() //nolint: errcheck
  140. logger.Info("connection was concurrency limited")
  141. p.eventStream.Send(p.ctx, NewEventConcurrencyLimited())
  142. }
  143. }
  144. }
  145. // Shutdown 'gracefully' shutdowns all connections. Please remember that it
  146. // does not close an underlying listener.
  147. func (p *Proxy) Shutdown() {
  148. p.ctxCancel()
  149. p.streamWaitGroup.Wait()
  150. p.workerPool.Release()
  151. p.configUpdater.Wait()
  152. p.doppelGanger.Shutdown()
  153. p.allowlist.Shutdown()
  154. p.blocklist.Shutdown()
  155. }
  156. func (p *Proxy) doFakeTLSHandshake(ctx *streamContext) bool {
  157. rewind := newConnRewind(ctx.clientConn)
  158. clientHello, err := fake.ReadClientHello(
  159. rewind,
  160. p.secret.Key[:],
  161. p.secret.Host,
  162. p.tolerateTimeSkewness,
  163. )
  164. if err != nil {
  165. p.logger.InfoError("cannot read client hello", err)
  166. p.doDomainFronting(ctx, rewind)
  167. return false
  168. }
  169. if p.antiReplayCache.SeenBefore(clientHello.SessionID) {
  170. p.logger.Warning("replay attack has been detected!")
  171. p.eventStream.Send(p.ctx, NewEventReplayAttack(ctx.streamID))
  172. p.doDomainFronting(ctx, rewind)
  173. return false
  174. }
  175. gangerNoise := p.doppelGanger.NoiseParams()
  176. noiseParams := fake.NoiseParams{Mean: gangerNoise.Mean, Jitter: gangerNoise.Jitter}
  177. if err := fake.SendServerHello(ctx.clientConn, p.secret.Key[:], clientHello, noiseParams); err != nil {
  178. p.logger.InfoError("cannot send welcome packet", err)
  179. return false
  180. }
  181. ctx.clientConn = tls.New(ctx.clientConn, true, false)
  182. return true
  183. }
  184. func (p *Proxy) doObfuscatedHandshake(ctx *streamContext) error {
  185. dc, conn, err := p.clientObfuscatror.ReadHandshake(ctx.clientConn)
  186. if err != nil {
  187. return fmt.Errorf("cannot process client handshake: %w", err)
  188. }
  189. ctx.dc = dc
  190. ctx.clientConn = conn
  191. ctx.logger = ctx.logger.BindInt("dc", dc)
  192. return nil
  193. }
  194. func (p *Proxy) doTelegramCall(ctx *streamContext) error {
  195. dcid := ctx.dc
  196. addresses := p.telegram.GetAddresses(dcid)
  197. if len(addresses) == 0 && p.allowFallbackOnUnknownDC {
  198. ctx.logger = ctx.logger.BindInt("original_dc", dcid)
  199. ctx.logger.Warning("unknown DC, fallbacks")
  200. ctx.dc = dc.DefaultDC
  201. addresses = p.telegram.GetAddresses(dc.DefaultDC)
  202. }
  203. var (
  204. conn essentials.Conn
  205. err error
  206. foundAddr dc.Addr
  207. )
  208. for _, addr := range addresses {
  209. conn, err = p.network.Dial(addr.Network, addr.Address)
  210. if err == nil {
  211. foundAddr = addr
  212. break
  213. }
  214. }
  215. if err != nil {
  216. return fmt.Errorf("no addresses to call: %w", err)
  217. }
  218. if conn == nil {
  219. return fmt.Errorf("no available addresses for DC %d", ctx.dc)
  220. }
  221. tgConn, err := foundAddr.Obfuscator.SendHandshake(conn, ctx.dc)
  222. if err != nil {
  223. conn.Close() // nolint: errcheck
  224. return fmt.Errorf("cannot perform server handshake: %w", err)
  225. }
  226. ctx.telegramConn = connTraffic{
  227. Conn: tgConn,
  228. streamID: ctx.streamID,
  229. stream: p.eventStream,
  230. ctx: ctx,
  231. }
  232. telegramHost, _, err := net.SplitHostPort(foundAddr.Address)
  233. if err != nil {
  234. conn.Close() //nolint: errcheck
  235. return fmt.Errorf("cannot parse telegram address %s: %w", foundAddr.Address, err)
  236. }
  237. p.eventStream.Send(ctx,
  238. NewEventConnectedToDC(ctx.streamID,
  239. net.ParseIP(telegramHost),
  240. ctx.dc),
  241. )
  242. return nil
  243. }
  244. func (p *Proxy) doDomainFronting(ctx *streamContext, conn *connRewind) {
  245. p.eventStream.Send(p.ctx, NewEventDomainFronting(ctx.streamID))
  246. conn.Rewind()
  247. nativeDialer := p.network.NativeDialer()
  248. fConn, err := nativeDialer.DialContext(ctx, "tcp", p.DomainFrontingAddress())
  249. if err != nil {
  250. p.logger.WarningError("cannot dial to the fronting domain", err)
  251. return
  252. }
  253. frontConn := essentials.WrapNetConn(fConn)
  254. if p.domainFrontingProxyProtocol {
  255. frontConn = newConnProxyProtocol(ctx.clientConn, frontConn)
  256. }
  257. frontConn = connTraffic{
  258. Conn: frontConn,
  259. ctx: ctx,
  260. streamID: ctx.streamID,
  261. stream: p.eventStream,
  262. }
  263. tracker := newIdleTracker(p.idleTimeout)
  264. relay.Relay(
  265. ctx,
  266. ctx.logger.Named("domain-fronting"),
  267. connIdleTimeout{Conn: frontConn, tracker: tracker},
  268. connIdleTimeout{Conn: conn, tracker: tracker},
  269. )
  270. }
  271. // NewProxy makes a new proxy instance.
  272. func NewProxy(opts ProxyOpts) (*Proxy, error) {
  273. if err := opts.valid(); err != nil {
  274. return nil, fmt.Errorf("invalid settings: %w", err)
  275. }
  276. tg, err := dc.New(opts.getPreferIP())
  277. if err != nil {
  278. return nil, fmt.Errorf("cannot build telegram dc fetcher: %w", err)
  279. }
  280. ctx, cancel := context.WithCancel(context.Background())
  281. logger := opts.getLogger("proxy")
  282. updatersLogger := logger.Named("telegram-updaters")
  283. if opts.DomainFrontingIP != "" {
  284. logger.Warning("mtglib.ProxyOpts.DomainFrontingIP is deprecated and ignored; use DomainFrontingHost instead")
  285. }
  286. proxy := &Proxy{
  287. ctx: ctx,
  288. ctxCancel: cancel,
  289. secret: opts.Secret,
  290. network: opts.Network,
  291. antiReplayCache: opts.AntiReplayCache,
  292. blocklist: opts.IPBlocklist,
  293. allowlist: opts.IPAllowlist,
  294. eventStream: opts.EventStream,
  295. logger: logger,
  296. domainFrontingPort: opts.getDomainFrontingPort(),
  297. domainFrontingHost: opts.DomainFrontingHost,
  298. tolerateTimeSkewness: opts.getTolerateTimeSkewness(),
  299. idleTimeout: opts.getIdleTimeout(),
  300. handshakeTimeout: opts.getHandshakeTimeout(),
  301. allowFallbackOnUnknownDC: opts.AllowFallbackOnUnknownDC,
  302. telegram: tg,
  303. doppelGanger: doppel.NewGanger(
  304. ctx,
  305. opts.Network,
  306. logger.Named("doppelganger"),
  307. opts.DoppelGangerEach,
  308. int(opts.DoppelGangerPerRaid),
  309. opts.DoppelGangerURLs,
  310. opts.DoppelGangerDRS,
  311. ),
  312. configUpdater: dc.NewPublicConfigUpdater(
  313. tg,
  314. updatersLogger.Named("public-config"),
  315. opts.Network.MakeHTTPClient(nil),
  316. ),
  317. clientObfuscatror: obfuscation.Obfuscator{
  318. Secret: opts.Secret.Key[:],
  319. },
  320. domainFrontingProxyProtocol: opts.DomainFrontingProxyProtocol,
  321. }
  322. proxy.doppelGanger.Run()
  323. if opts.AutoUpdate {
  324. proxy.configUpdater.Run(ctx, dc.PublicConfigUpdateURLv4, "tcp4")
  325. proxy.configUpdater.Run(ctx, dc.PublicConfigUpdateURLv6, "tcp6")
  326. }
  327. pool, err := ants.NewPoolWithFunc(opts.getConcurrency(),
  328. func(arg any) {
  329. proxy.ServeConn(arg.(essentials.Conn)) //nolint: forcetypeassert
  330. },
  331. ants.WithLogger(opts.getLogger("ants")),
  332. ants.WithNonblocking(true))
  333. if err != nil {
  334. panic(err)
  335. }
  336. proxy.workerPool = pool
  337. return proxy, nil
  338. }