Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

proxy.go 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. package mtglib
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/9seconds/mtg/v2/essentials"
  11. "github.com/9seconds/mtg/v2/mtglib/internal/dc"
  12. "github.com/9seconds/mtg/v2/mtglib/internal/doppel"
  13. "github.com/9seconds/mtg/v2/mtglib/internal/relay"
  14. "github.com/9seconds/mtg/v2/mtglib/internal/tls"
  15. "github.com/9seconds/mtg/v2/mtglib/internal/tls/fake"
  16. "github.com/9seconds/mtg/v2/mtglib/obfuscation"
  17. "github.com/panjf2000/ants/v2"
  18. )
  19. // Restore a large receive window after the tiny pre-handshake DPI desync clamp.
  20. const dpiDesyncPostHandshakeWindowClamp = 1 << 30
  21. // Proxy is an MTPROTO proxy structure.
  22. type Proxy struct {
  23. ctx context.Context
  24. ctxCancel context.CancelFunc
  25. streamWaitGroup sync.WaitGroup
  26. allowFallbackOnUnknownDC bool
  27. tolerateTimeSkewness time.Duration
  28. idleTimeout time.Duration
  29. handshakeTimeout time.Duration
  30. domainFrontingPort int
  31. domainFrontingHost string
  32. domainFrontingProxyProtocol bool
  33. dpiDesync bool
  34. workerPool *ants.PoolWithFunc
  35. telegram *dc.Telegram
  36. configUpdater *dc.PublicConfigUpdater
  37. doppelGanger *doppel.Ganger
  38. clientObfuscatror obfuscation.Obfuscator
  39. secret Secret
  40. network Network
  41. antiReplayCache AntiReplayCache
  42. blocklist IPBlocklist
  43. allowlist IPBlocklist
  44. eventStream EventStream
  45. logger Logger
  46. }
  47. // DomainFrontingAddress returns a host:port pair for a fronting domain.
  48. // If a fronting host (literal IP or hostname) is configured, it is used
  49. // instead of resolving the secret's hostname.
  50. func (p *Proxy) DomainFrontingAddress() string {
  51. host := p.secret.Host
  52. if p.domainFrontingHost != "" {
  53. host = p.domainFrontingHost
  54. }
  55. return net.JoinHostPort(host, strconv.Itoa(p.domainFrontingPort))
  56. }
  57. // ServeConn serves a connection. We do not check IP blocklist and concurrency
  58. // limit here.
  59. func (p *Proxy) ServeConn(conn essentials.Conn) {
  60. p.streamWaitGroup.Add(1)
  61. defer p.streamWaitGroup.Done()
  62. ctx := newStreamContext(p.ctx, p.logger, conn)
  63. defer ctx.Close()
  64. if err := ctx.clientConn.SetDeadline(time.Now().Add(p.handshakeTimeout)); err != nil {
  65. ctx.logger.WarningError("cannot set handshake timeout", err)
  66. return
  67. }
  68. stop := context.AfterFunc(ctx, func() {
  69. ctx.Close()
  70. })
  71. defer stop()
  72. p.eventStream.Send(ctx, NewEventStart(ctx.streamID, ctx.ClientIP()))
  73. ctx.logger.Info("Stream has been started")
  74. defer func() {
  75. p.eventStream.Send(ctx, NewEventFinish(ctx.streamID))
  76. ctx.logger.Info("Stream has been finished")
  77. }()
  78. if !p.doFakeTLSHandshake(ctx) {
  79. return
  80. }
  81. clientConn, err := p.doppelGanger.NewConn(ctx.clientConn)
  82. if err != nil {
  83. ctx.logger.InfoError("cannot wrap into doppelganger connection", err)
  84. return
  85. }
  86. defer clientConn.Stop()
  87. ctx.clientConn = clientConn
  88. if err := p.doObfuscatedHandshake(ctx); err != nil {
  89. ctx.logger.InfoError("obfuscated handshake is failed", err)
  90. return
  91. }
  92. if err := ctx.clientConn.SetDeadline(time.Time{}); err != nil {
  93. ctx.logger.WarningError("cannot set deadline", err)
  94. return
  95. }
  96. if err := p.doTelegramCall(ctx); err != nil {
  97. ctx.logger.WarningError("cannot dial to telegram", err)
  98. return
  99. }
  100. tracker := newIdleTracker(p.idleTimeout)
  101. relay.Relay(
  102. ctx,
  103. ctx.logger.Named("relay"),
  104. connIdleTimeout{Conn: ctx.telegramConn, tracker: tracker},
  105. connIdleTimeout{Conn: ctx.clientConn, tracker: tracker},
  106. )
  107. }
  108. // Serve starts a proxy on a given listener.
  109. func (p *Proxy) Serve(listener net.Listener) error {
  110. p.streamWaitGroup.Add(1)
  111. defer p.streamWaitGroup.Done()
  112. for {
  113. conn, err := listener.Accept()
  114. if err != nil {
  115. select {
  116. case <-p.ctx.Done():
  117. return nil
  118. default:
  119. return fmt.Errorf("cannot accept a new connection: %w", err)
  120. }
  121. }
  122. ipAddr := conn.RemoteAddr().(*net.TCPAddr).IP //nolint: forcetypeassert
  123. logger := p.logger.BindStr("ip", ipAddr.String())
  124. if !p.allowlist.Contains(ipAddr) {
  125. conn.Close() //nolint: errcheck
  126. logger.Info("ip was rejected by allowlist")
  127. p.eventStream.Send(p.ctx, NewEventIPAllowlisted(ipAddr))
  128. continue
  129. }
  130. if p.blocklist.Contains(ipAddr) {
  131. conn.Close() //nolint: errcheck
  132. logger.Info("ip was blacklisted")
  133. p.eventStream.Send(p.ctx, NewEventIPBlocklisted(ipAddr))
  134. continue
  135. }
  136. err = p.workerPool.Invoke(conn)
  137. switch {
  138. case err == nil:
  139. case errors.Is(err, ants.ErrPoolClosed):
  140. return nil
  141. case errors.Is(err, ants.ErrPoolOverload):
  142. conn.Close() //nolint: errcheck
  143. logger.Info("connection was concurrency limited")
  144. p.eventStream.Send(p.ctx, NewEventConcurrencyLimited())
  145. }
  146. }
  147. }
  148. // Shutdown 'gracefully' shutdowns all connections. Please remember that it
  149. // does not close an underlying listener.
  150. func (p *Proxy) Shutdown() {
  151. p.ctxCancel()
  152. p.streamWaitGroup.Wait()
  153. p.workerPool.Release()
  154. p.configUpdater.Wait()
  155. p.doppelGanger.Shutdown()
  156. p.allowlist.Shutdown()
  157. p.blocklist.Shutdown()
  158. }
  159. func (p *Proxy) doFakeTLSHandshake(ctx *streamContext) bool {
  160. rewind := newConnRewind(ctx.clientConn)
  161. clientHello, err := fake.ReadClientHello(
  162. rewind,
  163. p.secret.Key[:],
  164. p.secret.Host,
  165. p.tolerateTimeSkewness,
  166. )
  167. if err != nil {
  168. p.logger.InfoError("cannot read client hello", err)
  169. p.doDomainFronting(ctx, rewind)
  170. return false
  171. }
  172. if p.antiReplayCache.SeenBefore(clientHello.SessionID) {
  173. p.logger.Warning("replay attack has been detected!")
  174. p.eventStream.Send(p.ctx, NewEventReplayAttack(ctx.streamID))
  175. p.doDomainFronting(ctx, rewind)
  176. return false
  177. }
  178. gangerNoise := p.doppelGanger.NoiseParams()
  179. noiseParams := fake.NoiseParams{Mean: gangerNoise.Mean, Jitter: gangerNoise.Jitter}
  180. if err := fake.SendServerHello(ctx.clientConn, p.secret.Key[:], clientHello, noiseParams); err != nil {
  181. p.logger.InfoError("cannot send welcome packet", err)
  182. return false
  183. }
  184. if p.dpiDesync {
  185. if err := essentials.SetTCPWindowClamp(ctx.clientConn, dpiDesyncPostHandshakeWindowClamp); err != nil {
  186. ctx.logger.DebugError("cannot restore TCP window clamp after handshake", err)
  187. }
  188. }
  189. ctx.clientConn = tls.New(ctx.clientConn, true, false)
  190. return true
  191. }
  192. func (p *Proxy) doObfuscatedHandshake(ctx *streamContext) error {
  193. dc, conn, err := p.clientObfuscatror.ReadHandshake(ctx.clientConn)
  194. if err != nil {
  195. return fmt.Errorf("cannot process client handshake: %w", err)
  196. }
  197. ctx.dc = dc
  198. ctx.clientConn = conn
  199. ctx.logger = ctx.logger.BindInt("dc", dc)
  200. return nil
  201. }
  202. func (p *Proxy) doTelegramCall(ctx *streamContext) error {
  203. dcid := ctx.dc
  204. addresses := p.telegram.GetAddresses(dcid)
  205. if len(addresses) == 0 && p.allowFallbackOnUnknownDC {
  206. ctx.logger = ctx.logger.BindInt("original_dc", dcid)
  207. ctx.logger.Warning("unknown DC, fallbacks")
  208. ctx.dc = dc.DefaultDC
  209. addresses = p.telegram.GetAddresses(dc.DefaultDC)
  210. }
  211. var (
  212. conn essentials.Conn
  213. err error
  214. foundAddr dc.Addr
  215. )
  216. for _, addr := range addresses {
  217. conn, err = p.network.Dial(addr.Network, addr.Address)
  218. if err == nil {
  219. foundAddr = addr
  220. break
  221. }
  222. }
  223. if err != nil {
  224. return fmt.Errorf("no addresses to call: %w", err)
  225. }
  226. if conn == nil {
  227. return fmt.Errorf("no available addresses for DC %d", ctx.dc)
  228. }
  229. tgConn, err := foundAddr.Obfuscator.SendHandshake(conn, ctx.dc)
  230. if err != nil {
  231. conn.Close() // nolint: errcheck
  232. return fmt.Errorf("cannot perform server handshake: %w", err)
  233. }
  234. ctx.telegramConn = connTraffic{
  235. Conn: tgConn,
  236. streamID: ctx.streamID,
  237. stream: p.eventStream,
  238. ctx: ctx,
  239. }
  240. telegramHost, _, err := net.SplitHostPort(foundAddr.Address)
  241. if err != nil {
  242. conn.Close() //nolint: errcheck
  243. return fmt.Errorf("cannot parse telegram address %s: %w", foundAddr.Address, err)
  244. }
  245. p.eventStream.Send(
  246. ctx,
  247. NewEventConnectedToDC(ctx.streamID,
  248. net.ParseIP(telegramHost),
  249. ctx.dc),
  250. )
  251. return nil
  252. }
  253. func (p *Proxy) doDomainFronting(ctx *streamContext, conn *connRewind) {
  254. p.eventStream.Send(p.ctx, NewEventDomainFronting(ctx.streamID))
  255. conn.Rewind()
  256. nativeDialer := p.network.NativeDialer()
  257. fConn, err := nativeDialer.DialContext(ctx, "tcp", p.DomainFrontingAddress())
  258. if err != nil {
  259. p.logger.WarningError("cannot dial to the fronting domain", err)
  260. return
  261. }
  262. frontConn := essentials.WrapNetConn(fConn)
  263. if p.domainFrontingProxyProtocol {
  264. frontConn = newConnProxyProtocol(ctx.clientConn, frontConn)
  265. }
  266. frontConn = connTraffic{
  267. Conn: frontConn,
  268. ctx: ctx,
  269. streamID: ctx.streamID,
  270. stream: p.eventStream,
  271. }
  272. tracker := newIdleTracker(p.idleTimeout)
  273. relay.Relay(
  274. ctx,
  275. ctx.logger.Named("domain-fronting"),
  276. connIdleTimeout{Conn: frontConn, tracker: tracker},
  277. connIdleTimeout{Conn: conn, tracker: tracker},
  278. )
  279. }
  280. // NewProxy makes a new proxy instance.
  281. func NewProxy(opts ProxyOpts) (*Proxy, error) {
  282. if err := opts.valid(); err != nil {
  283. return nil, fmt.Errorf("invalid settings: %w", err)
  284. }
  285. tg, err := dc.New(opts.getPreferIP())
  286. if err != nil {
  287. return nil, fmt.Errorf("cannot build telegram dc fetcher: %w", err)
  288. }
  289. ctx, cancel := context.WithCancel(context.Background())
  290. logger := opts.getLogger("proxy")
  291. updatersLogger := logger.Named("telegram-updaters")
  292. if opts.DomainFrontingIP != "" {
  293. logger.Warning("mtglib.ProxyOpts.DomainFrontingIP is deprecated and ignored; use DomainFrontingHost instead")
  294. }
  295. proxy := &Proxy{
  296. ctx: ctx,
  297. ctxCancel: cancel,
  298. secret: opts.Secret,
  299. network: opts.Network,
  300. antiReplayCache: opts.AntiReplayCache,
  301. blocklist: opts.IPBlocklist,
  302. allowlist: opts.IPAllowlist,
  303. eventStream: opts.EventStream,
  304. logger: logger,
  305. domainFrontingPort: opts.getDomainFrontingPort(),
  306. domainFrontingHost: opts.DomainFrontingHost,
  307. dpiDesync: opts.DPIDesync,
  308. tolerateTimeSkewness: opts.getTolerateTimeSkewness(),
  309. idleTimeout: opts.getIdleTimeout(),
  310. handshakeTimeout: opts.getHandshakeTimeout(),
  311. allowFallbackOnUnknownDC: opts.AllowFallbackOnUnknownDC,
  312. telegram: tg,
  313. doppelGanger: doppel.NewGanger(
  314. ctx,
  315. opts.Network,
  316. logger.Named("doppelganger"),
  317. opts.DoppelGangerEach,
  318. int(opts.DoppelGangerPerRaid),
  319. opts.DoppelGangerURLs,
  320. opts.DoppelGangerDRS,
  321. ),
  322. configUpdater: dc.NewPublicConfigUpdater(
  323. tg,
  324. updatersLogger.Named("public-config"),
  325. opts.Network.MakeHTTPClient(nil),
  326. ),
  327. clientObfuscatror: obfuscation.Obfuscator{
  328. Secret: opts.Secret.Key[:],
  329. },
  330. domainFrontingProxyProtocol: opts.DomainFrontingProxyProtocol,
  331. }
  332. proxy.doppelGanger.Run()
  333. if opts.AutoUpdate {
  334. proxy.configUpdater.Run(ctx, dc.PublicConfigUpdateURLv4, "tcp4")
  335. proxy.configUpdater.Run(ctx, dc.PublicConfigUpdateURLv6, "tcp6")
  336. }
  337. pool, err := ants.NewPoolWithFunc(opts.getConcurrency(),
  338. func(arg any) {
  339. proxy.ServeConn(arg.(essentials.Conn)) //nolint: forcetypeassert
  340. },
  341. ants.WithLogger(opts.getLogger("ants")),
  342. ants.WithNonblocking(true))
  343. if err != nil {
  344. panic(err)
  345. }
  346. proxy.workerPool = pool
  347. return proxy, nil
  348. }