Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter "Version 2".
Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. package mtglib
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/9seconds/mtg/v2/essentials"
  11. "github.com/9seconds/mtg/v2/mtglib/internal/dc"
  12. "github.com/9seconds/mtg/v2/mtglib/internal/doppel"
  13. "github.com/9seconds/mtg/v2/mtglib/internal/obfuscation"
  14. "github.com/9seconds/mtg/v2/mtglib/internal/relay"
  15. "github.com/9seconds/mtg/v2/mtglib/internal/tls"
  16. "github.com/9seconds/mtg/v2/mtglib/internal/tls/fake"
  17. "github.com/panjf2000/ants/v2"
  18. )
  19. // Proxy is an MTPROTO proxy structure.
  20. type Proxy struct {
  21. ctx context.Context
  22. ctxCancel context.CancelFunc
  23. streamWaitGroup sync.WaitGroup
  24. allowFallbackOnUnknownDC bool
  25. tolerateTimeSkewness time.Duration
  26. idleTimeout time.Duration
  27. handshakeTimeout time.Duration
  28. domainFrontingPort int
  29. domainFrontingIP string
  30. domainFrontingProxyProtocol bool
  31. workerPool *ants.PoolWithFunc
  32. telegram *dc.Telegram
  33. configUpdater *dc.PublicConfigUpdater
  34. doppelGanger *doppel.Ganger
  35. clientObfuscatror obfuscation.Obfuscator
  36. secret Secret
  37. network Network
  38. antiReplayCache AntiReplayCache
  39. blocklist IPBlocklist
  40. allowlist IPBlocklist
  41. eventStream EventStream
  42. logger Logger
  43. }
  44. // DomainFrontingAddress returns a host:port pair for a fronting domain.
  45. // If DomainFrontingIP is set, it is used instead of resolving the hostname.
  46. func (p *Proxy) DomainFrontingAddress() string {
  47. host := p.secret.Host
  48. if p.domainFrontingIP != "" {
  49. host = p.domainFrontingIP
  50. }
  51. return net.JoinHostPort(host, strconv.Itoa(p.domainFrontingPort))
  52. }
  53. // ServeConn serves a connection. We do not check IP blocklist and concurrency
  54. // limit here.
  55. func (p *Proxy) ServeConn(conn essentials.Conn) {
  56. p.streamWaitGroup.Add(1)
  57. defer p.streamWaitGroup.Done()
  58. ctx := newStreamContext(p.ctx, p.logger, conn)
  59. defer ctx.Close()
  60. if err := ctx.clientConn.SetDeadline(time.Now().Add(p.handshakeTimeout)); err != nil {
  61. ctx.logger.WarningError("cannot set handshake timeout", err)
  62. return
  63. }
  64. stop := context.AfterFunc(ctx, func() {
  65. ctx.Close()
  66. })
  67. defer stop()
  68. p.eventStream.Send(ctx, NewEventStart(ctx.streamID, ctx.ClientIP()))
  69. ctx.logger.Info("Stream has been started")
  70. defer func() {
  71. p.eventStream.Send(ctx, NewEventFinish(ctx.streamID))
  72. ctx.logger.Info("Stream has been finished")
  73. }()
  74. if !p.doFakeTLSHandshake(ctx) {
  75. return
  76. }
  77. clientConn, err := p.doppelGanger.NewConn(ctx.clientConn)
  78. if err != nil {
  79. ctx.logger.InfoError("cannot wrap into doppelganger connection", err)
  80. return
  81. }
  82. defer clientConn.Stop()
  83. ctx.clientConn = clientConn
  84. if err := p.doObfuscatedHandshake(ctx); err != nil {
  85. ctx.logger.InfoError("obfuscated handshake is failed", err)
  86. return
  87. }
  88. if err := ctx.clientConn.SetDeadline(time.Time{}); err != nil {
  89. ctx.logger.WarningError("cannot set deadline", err)
  90. return
  91. }
  92. if err := p.doTelegramCall(ctx); err != nil {
  93. ctx.logger.WarningError("cannot dial to telegram", err)
  94. return
  95. }
  96. tracker := newIdleTracker(p.idleTimeout)
  97. relay.Relay(
  98. ctx,
  99. ctx.logger.Named("relay"),
  100. connIdleTimeout{Conn: ctx.telegramConn, tracker: tracker},
  101. connIdleTimeout{Conn: ctx.clientConn, tracker: tracker},
  102. )
  103. }
  104. // Serve starts a proxy on a given listener.
  105. func (p *Proxy) Serve(listener net.Listener) error {
  106. p.streamWaitGroup.Add(1)
  107. defer p.streamWaitGroup.Done()
  108. for {
  109. conn, err := listener.Accept()
  110. if err != nil {
  111. select {
  112. case <-p.ctx.Done():
  113. return nil
  114. default:
  115. return fmt.Errorf("cannot accept a new connection: %w", err)
  116. }
  117. }
  118. ipAddr := conn.RemoteAddr().(*net.TCPAddr).IP //nolint: forcetypeassert
  119. logger := p.logger.BindStr("ip", ipAddr.String())
  120. if !p.allowlist.Contains(ipAddr) {
  121. conn.Close() //nolint: errcheck
  122. logger.Info("ip was rejected by allowlist")
  123. p.eventStream.Send(p.ctx, NewEventIPAllowlisted(ipAddr))
  124. continue
  125. }
  126. if p.blocklist.Contains(ipAddr) {
  127. conn.Close() //nolint: errcheck
  128. logger.Info("ip was blacklisted")
  129. p.eventStream.Send(p.ctx, NewEventIPBlocklisted(ipAddr))
  130. continue
  131. }
  132. err = p.workerPool.Invoke(conn)
  133. switch {
  134. case err == nil:
  135. case errors.Is(err, ants.ErrPoolClosed):
  136. return nil
  137. case errors.Is(err, ants.ErrPoolOverload):
  138. conn.Close() //nolint: errcheck
  139. logger.Info("connection was concurrency limited")
  140. p.eventStream.Send(p.ctx, NewEventConcurrencyLimited())
  141. }
  142. }
  143. }
  144. // Shutdown 'gracefully' shutdowns all connections. Please remember that it
  145. // does not close an underlying listener.
  146. func (p *Proxy) Shutdown() {
  147. p.ctxCancel()
  148. p.streamWaitGroup.Wait()
  149. p.workerPool.Release()
  150. p.configUpdater.Wait()
  151. p.doppelGanger.Shutdown()
  152. p.allowlist.Shutdown()
  153. p.blocklist.Shutdown()
  154. }
  155. func (p *Proxy) doFakeTLSHandshake(ctx *streamContext) bool {
  156. rewind := newConnRewind(ctx.clientConn)
  157. clientHello, err := fake.ReadClientHello(
  158. rewind,
  159. p.secret.Key[:],
  160. p.secret.Host,
  161. p.tolerateTimeSkewness,
  162. )
  163. if err != nil {
  164. p.logger.InfoError("cannot read client hello", err)
  165. p.doDomainFronting(ctx, rewind)
  166. return false
  167. }
  168. if p.antiReplayCache.SeenBefore(clientHello.SessionID) {
  169. p.logger.Warning("replay attack has been detected!")
  170. p.eventStream.Send(p.ctx, NewEventReplayAttack(ctx.streamID))
  171. p.doDomainFronting(ctx, rewind)
  172. return false
  173. }
  174. gangerNoise := p.doppelGanger.NoiseParams()
  175. noiseParams := fake.NoiseParams{Mean: gangerNoise.Mean, Jitter: gangerNoise.Jitter}
  176. if err := fake.SendServerHello(ctx.clientConn, p.secret.Key[:], clientHello, noiseParams); err != nil {
  177. p.logger.InfoError("cannot send welcome packet", err)
  178. return false
  179. }
  180. ctx.clientConn = tls.New(ctx.clientConn, true, false)
  181. return true
  182. }
  183. func (p *Proxy) doObfuscatedHandshake(ctx *streamContext) error {
  184. dc, conn, err := p.clientObfuscatror.ReadHandshake(ctx.clientConn)
  185. if err != nil {
  186. return fmt.Errorf("cannot process client handshake: %w", err)
  187. }
  188. ctx.dc = dc
  189. ctx.clientConn = conn
  190. ctx.logger = ctx.logger.BindInt("dc", dc)
  191. return nil
  192. }
  193. func (p *Proxy) doTelegramCall(ctx *streamContext) error {
  194. dcid := ctx.dc
  195. addresses := p.telegram.GetAddresses(dcid)
  196. if len(addresses) == 0 && p.allowFallbackOnUnknownDC {
  197. ctx.logger = ctx.logger.BindInt("original_dc", dcid)
  198. ctx.logger.Warning("unknown DC, fallbacks")
  199. ctx.dc = dc.DefaultDC
  200. addresses = p.telegram.GetAddresses(dc.DefaultDC)
  201. }
  202. var (
  203. conn essentials.Conn
  204. err error
  205. foundAddr dc.Addr
  206. )
  207. for _, addr := range addresses {
  208. conn, err = p.network.Dial(addr.Network, addr.Address)
  209. if err == nil {
  210. foundAddr = addr
  211. break
  212. }
  213. }
  214. if err != nil {
  215. return fmt.Errorf("no addresses to call: %w", err)
  216. }
  217. if conn == nil {
  218. return fmt.Errorf("no available addresses for DC %d", ctx.dc)
  219. }
  220. tgConn, err := foundAddr.Obfuscator.SendHandshake(conn, ctx.dc)
  221. if err != nil {
  222. conn.Close() // nolint: errcheck
  223. return fmt.Errorf("cannot perform server handshake: %w", err)
  224. }
  225. ctx.telegramConn = connTraffic{
  226. Conn: tgConn,
  227. streamID: ctx.streamID,
  228. stream: p.eventStream,
  229. ctx: ctx,
  230. }
  231. telegramHost, _, err := net.SplitHostPort(foundAddr.Address)
  232. if err != nil {
  233. conn.Close() //nolint: errcheck
  234. return fmt.Errorf("cannot parse telegram address %s: %w", foundAddr.Address, err)
  235. }
  236. p.eventStream.Send(ctx,
  237. NewEventConnectedToDC(ctx.streamID,
  238. net.ParseIP(telegramHost),
  239. ctx.dc),
  240. )
  241. return nil
  242. }
  243. func (p *Proxy) doDomainFronting(ctx *streamContext, conn *connRewind) {
  244. p.eventStream.Send(p.ctx, NewEventDomainFronting(ctx.streamID))
  245. conn.Rewind()
  246. nativeDialer := p.network.NativeDialer()
  247. fConn, err := nativeDialer.DialContext(ctx, "tcp", p.DomainFrontingAddress())
  248. if err != nil {
  249. p.logger.WarningError("cannot dial to the fronting domain", err)
  250. return
  251. }
  252. frontConn := essentials.WrapNetConn(fConn)
  253. if p.domainFrontingProxyProtocol {
  254. frontConn = newConnProxyProtocol(ctx.clientConn, frontConn)
  255. }
  256. frontConn = connTraffic{
  257. Conn: frontConn,
  258. ctx: ctx,
  259. streamID: ctx.streamID,
  260. stream: p.eventStream,
  261. }
  262. tracker := newIdleTracker(p.idleTimeout)
  263. relay.Relay(
  264. ctx,
  265. ctx.logger.Named("domain-fronting"),
  266. connIdleTimeout{Conn: frontConn, tracker: tracker},
  267. connIdleTimeout{Conn: conn, tracker: tracker},
  268. )
  269. }
  270. // NewProxy makes a new proxy instance.
  271. func NewProxy(opts ProxyOpts) (*Proxy, error) {
  272. if err := opts.valid(); err != nil {
  273. return nil, fmt.Errorf("invalid settings: %w", err)
  274. }
  275. tg, err := dc.New(opts.getPreferIP())
  276. if err != nil {
  277. return nil, fmt.Errorf("cannot build telegram dc fetcher: %w", err)
  278. }
  279. ctx, cancel := context.WithCancel(context.Background())
  280. logger := opts.getLogger("proxy")
  281. updatersLogger := logger.Named("telegram-updaters")
  282. proxy := &Proxy{
  283. ctx: ctx,
  284. ctxCancel: cancel,
  285. secret: opts.Secret,
  286. network: opts.Network,
  287. antiReplayCache: opts.AntiReplayCache,
  288. blocklist: opts.IPBlocklist,
  289. allowlist: opts.IPAllowlist,
  290. eventStream: opts.EventStream,
  291. logger: logger,
  292. domainFrontingPort: opts.getDomainFrontingPort(),
  293. domainFrontingIP: opts.DomainFrontingIP,
  294. tolerateTimeSkewness: opts.getTolerateTimeSkewness(),
  295. idleTimeout: opts.getIdleTimeout(),
  296. handshakeTimeout: opts.getHandshakeTimeout(),
  297. allowFallbackOnUnknownDC: opts.AllowFallbackOnUnknownDC,
  298. telegram: tg,
  299. doppelGanger: doppel.NewGanger(
  300. ctx,
  301. opts.Network,
  302. logger.Named("doppelganger"),
  303. opts.DoppelGangerEach,
  304. int(opts.DoppelGangerPerRaid),
  305. opts.DoppelGangerURLs,
  306. opts.DoppelGangerDRS,
  307. ),
  308. configUpdater: dc.NewPublicConfigUpdater(
  309. tg,
  310. updatersLogger.Named("public-config"),
  311. opts.Network.MakeHTTPClient(nil),
  312. ),
  313. clientObfuscatror: obfuscation.Obfuscator{
  314. Secret: opts.Secret.Key[:],
  315. },
  316. domainFrontingProxyProtocol: opts.DomainFrontingProxyProtocol,
  317. }
  318. proxy.doppelGanger.Run()
  319. if opts.AutoUpdate {
  320. proxy.configUpdater.Run(ctx, dc.PublicConfigUpdateURLv4, "tcp4")
  321. proxy.configUpdater.Run(ctx, dc.PublicConfigUpdateURLv6, "tcp6")
  322. }
  323. pool, err := ants.NewPoolWithFunc(opts.getConcurrency(),
  324. func(arg any) {
  325. proxy.ServeConn(arg.(essentials.Conn)) //nolint: forcetypeassert
  326. },
  327. ants.WithLogger(opts.getLogger("ants")),
  328. ants.WithNonblocking(true))
  329. if err != nil {
  330. panic(err)
  331. }
  332. proxy.workerPool = pool
  333. return proxy, nil
  334. }