Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter Version 2
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

proxy.go 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. package mtglib
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "sort"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "github.com/dolonet/mtg-multi/essentials"
  12. "github.com/dolonet/mtg-multi/mtglib/internal/dc"
  13. "github.com/dolonet/mtg-multi/mtglib/internal/doppel"
  14. "github.com/dolonet/mtg-multi/mtglib/internal/obfuscation"
  15. "github.com/dolonet/mtg-multi/mtglib/internal/relay"
  16. "github.com/dolonet/mtg-multi/mtglib/internal/tls"
  17. "github.com/dolonet/mtg-multi/mtglib/internal/tls/fake"
  18. "github.com/panjf2000/ants/v2"
  19. )
  20. // Proxy is an MTPROTO proxy structure.
  21. type Proxy struct {
  22. ctx context.Context
  23. ctxCancel context.CancelFunc
  24. streamWaitGroup sync.WaitGroup
  25. allowFallbackOnUnknownDC bool
  26. tolerateTimeSkewness time.Duration
  27. idleTimeout time.Duration
  28. handshakeTimeout time.Duration
  29. domainFrontingPort int
  30. domainFrontingIP string
  31. domainFrontingProxyProtocol bool
  32. workerPool *ants.PoolWithFunc
  33. telegram *dc.Telegram
  34. configUpdater *dc.PublicConfigUpdater
  35. doppelGanger *doppel.Ganger
  36. stats *ProxyStats
  37. secrets []Secret
  38. secretNames []string
  39. network Network
  40. antiReplayCache AntiReplayCache
  41. blocklist IPBlocklist
  42. allowlist IPBlocklist
  43. eventStream EventStream
  44. logger Logger
  45. }
  46. // DomainFrontingAddress returns a host:port pair for a fronting domain.
  47. // If DomainFrontingIP is set, it is used instead of resolving the hostname.
  48. func (p *Proxy) DomainFrontingAddress() string {
  49. // All secrets share the same host (enforced by validation),
  50. // so we use the first one.
  51. host := p.secrets[0].Host
  52. if p.domainFrontingIP != "" {
  53. host = p.domainFrontingIP
  54. }
  55. return net.JoinHostPort(host, strconv.Itoa(p.domainFrontingPort))
  56. }
  57. // ServeConn serves a connection. We do not check IP blocklist and concurrency
  58. // limit here.
  59. func (p *Proxy) ServeConn(conn essentials.Conn) {
  60. p.streamWaitGroup.Add(1)
  61. defer p.streamWaitGroup.Done()
  62. ctx := newStreamContext(p.ctx, p.logger, conn)
  63. defer ctx.Close()
  64. if err := ctx.clientConn.SetDeadline(time.Now().Add(p.handshakeTimeout)); err != nil {
  65. ctx.logger.WarningError("cannot set handshake timeout", err)
  66. return
  67. }
  68. stop := context.AfterFunc(ctx, func() {
  69. ctx.Close()
  70. })
  71. defer stop()
  72. p.eventStream.Send(ctx, NewEventStart(ctx.streamID, ctx.ClientIP()))
  73. ctx.logger.Info("Stream has been started")
  74. defer func() {
  75. p.eventStream.Send(ctx, NewEventFinish(ctx.streamID))
  76. ctx.logger.Info("Stream has been finished")
  77. }()
  78. if !p.doFakeTLSHandshake(ctx) {
  79. return
  80. }
  81. if !p.stats.CanConnect(ctx.secretName) {
  82. ctx.logger.Info("connection throttled")
  83. p.eventStream.Send(ctx, NewEventThrottled(ctx.streamID, ctx.secretName))
  84. return
  85. }
  86. p.stats.OnConnect(ctx.secretName)
  87. p.stats.UpdateLastSeen(ctx.secretName)
  88. defer p.stats.OnDisconnect(ctx.secretName)
  89. clientConn, err := p.doppelGanger.NewConn(ctx.clientConn)
  90. if err != nil {
  91. ctx.logger.InfoError("cannot wrap into doppelganger connection", err)
  92. return
  93. }
  94. defer clientConn.Stop()
  95. ctx.clientConn = clientConn
  96. if err := p.doObfuscatedHandshake(ctx); err != nil {
  97. ctx.logger.InfoError("obfuscated handshake is failed", err)
  98. return
  99. }
  100. if err := ctx.clientConn.SetDeadline(time.Time{}); err != nil {
  101. ctx.logger.WarningError("cannot set deadline", err)
  102. return
  103. }
  104. if err := p.doTelegramCall(ctx); err != nil {
  105. ctx.logger.WarningError("cannot dial to telegram", err)
  106. return
  107. }
  108. tracker := newIdleTracker(p.idleTimeout)
  109. relay.Relay(
  110. ctx,
  111. ctx.logger.Named("relay"),
  112. connIdleTimeout{Conn: ctx.telegramConn, tracker: tracker},
  113. newCountingConn(connIdleTimeout{Conn: ctx.clientConn, tracker: tracker}, p.stats, ctx.secretName),
  114. )
  115. }
  116. // Serve starts a proxy on a given listener.
  117. func (p *Proxy) Serve(listener net.Listener) error {
  118. p.streamWaitGroup.Add(1)
  119. defer p.streamWaitGroup.Done()
  120. for {
  121. conn, err := listener.Accept()
  122. if err != nil {
  123. select {
  124. case <-p.ctx.Done():
  125. return nil
  126. default:
  127. return fmt.Errorf("cannot accept a new connection: %w", err)
  128. }
  129. }
  130. ipAddr := conn.RemoteAddr().(*net.TCPAddr).IP //nolint: forcetypeassert
  131. logger := p.logger.BindStr("ip", ipAddr.String())
  132. if !p.allowlist.Contains(ipAddr) {
  133. conn.Close() //nolint: errcheck
  134. logger.Info("ip was rejected by allowlist")
  135. p.eventStream.Send(p.ctx, NewEventIPAllowlisted(ipAddr))
  136. continue
  137. }
  138. if p.blocklist.Contains(ipAddr) {
  139. conn.Close() //nolint: errcheck
  140. logger.Info("ip was blacklisted")
  141. p.eventStream.Send(p.ctx, NewEventIPBlocklisted(ipAddr))
  142. continue
  143. }
  144. err = p.workerPool.Invoke(conn)
  145. switch {
  146. case err == nil:
  147. case errors.Is(err, ants.ErrPoolClosed):
  148. return nil
  149. case errors.Is(err, ants.ErrPoolOverload):
  150. conn.Close() //nolint: errcheck
  151. logger.Info("connection was concurrency limited")
  152. p.eventStream.Send(p.ctx, NewEventConcurrencyLimited())
  153. }
  154. }
  155. }
  156. // Shutdown 'gracefully' shutdowns all connections. Please remember that it
  157. // does not close an underlying listener.
  158. func (p *Proxy) Shutdown() {
  159. p.ctxCancel()
  160. p.streamWaitGroup.Wait()
  161. p.workerPool.Release()
  162. p.configUpdater.Wait()
  163. p.doppelGanger.Shutdown()
  164. p.allowlist.Shutdown()
  165. p.blocklist.Shutdown()
  166. }
  167. func (p *Proxy) doFakeTLSHandshake(ctx *streamContext) bool {
  168. rewind := newConnRewind(ctx.clientConn)
  169. // Build a slice of secret keys to try during HMAC validation.
  170. secretKeys := make([][]byte, len(p.secrets))
  171. for i := range p.secrets {
  172. secretKeys[i] = p.secrets[i].Key[:]
  173. }
  174. result, err := fake.ReadClientHelloMulti(
  175. rewind,
  176. secretKeys,
  177. p.secrets[0].Host,
  178. p.tolerateTimeSkewness,
  179. )
  180. if err != nil {
  181. p.logger.InfoError("cannot read client hello", err)
  182. p.doDomainFronting(ctx, rewind)
  183. return false
  184. }
  185. if p.antiReplayCache.SeenBefore(result.Hello.SessionID) {
  186. p.logger.Warning("replay attack has been detected!")
  187. p.eventStream.Send(p.ctx, NewEventReplayAttack(ctx.streamID))
  188. p.doDomainFronting(ctx, rewind)
  189. return false
  190. }
  191. matchedSecret := p.secrets[result.MatchedIndex]
  192. ctx.matchedSecretKey = matchedSecret.Key[:]
  193. ctx.secretName = p.secretNames[result.MatchedIndex]
  194. ctx.logger = ctx.logger.BindStr("secret_name", ctx.secretName)
  195. gangerNoise := p.doppelGanger.NoiseParams()
  196. noiseParams := fake.NoiseParams{Mean: gangerNoise.Mean, Jitter: gangerNoise.Jitter}
  197. if err := fake.SendServerHello(ctx.clientConn, matchedSecret.Key[:], result.Hello, noiseParams); err != nil {
  198. p.logger.InfoError("cannot send welcome packet", err)
  199. return false
  200. }
  201. ctx.clientConn = tls.New(ctx.clientConn, true, false)
  202. return true
  203. }
  204. func (p *Proxy) doObfuscatedHandshake(ctx *streamContext) error {
  205. // Use the secret key that was matched during the FakeTLS handshake.
  206. obfs := obfuscation.Obfuscator{
  207. Secret: ctx.matchedSecretKey,
  208. }
  209. dc, conn, err := obfs.ReadHandshake(ctx.clientConn)
  210. if err != nil {
  211. return fmt.Errorf("cannot process client handshake: %w", err)
  212. }
  213. ctx.dc = dc
  214. ctx.clientConn = conn
  215. ctx.logger = ctx.logger.BindInt("dc", dc)
  216. return nil
  217. }
  218. func (p *Proxy) doTelegramCall(ctx *streamContext) error {
  219. dcid := ctx.dc
  220. addresses := p.telegram.GetAddresses(dcid)
  221. if len(addresses) == 0 && p.allowFallbackOnUnknownDC {
  222. ctx.logger = ctx.logger.BindInt("original_dc", dcid)
  223. ctx.logger.Warning("unknown DC, fallbacks")
  224. ctx.dc = dc.DefaultDC
  225. addresses = p.telegram.GetAddresses(dc.DefaultDC)
  226. }
  227. var (
  228. conn essentials.Conn
  229. err error
  230. foundAddr dc.Addr
  231. )
  232. for _, addr := range addresses {
  233. conn, err = p.network.Dial(addr.Network, addr.Address)
  234. if err == nil {
  235. foundAddr = addr
  236. break
  237. }
  238. }
  239. if err != nil {
  240. return fmt.Errorf("no addresses to call: %w", err)
  241. }
  242. if conn == nil {
  243. return fmt.Errorf("no available addresses for DC %d", ctx.dc)
  244. }
  245. tgConn, err := foundAddr.Obfuscator.SendHandshake(conn, ctx.dc)
  246. if err != nil {
  247. conn.Close() // nolint: errcheck
  248. return fmt.Errorf("cannot perform server handshake: %w", err)
  249. }
  250. ctx.telegramConn = connTraffic{
  251. Conn: tgConn,
  252. streamID: ctx.streamID,
  253. stream: p.eventStream,
  254. ctx: ctx,
  255. }
  256. telegramHost, _, err := net.SplitHostPort(foundAddr.Address)
  257. if err != nil {
  258. conn.Close() //nolint: errcheck
  259. return fmt.Errorf("cannot parse telegram address %s: %w", foundAddr.Address, err)
  260. }
  261. p.eventStream.Send(ctx,
  262. NewEventConnectedToDC(ctx.streamID,
  263. net.ParseIP(telegramHost),
  264. ctx.dc),
  265. )
  266. return nil
  267. }
  268. func (p *Proxy) doDomainFronting(ctx *streamContext, conn *connRewind) {
  269. p.eventStream.Send(p.ctx, NewEventDomainFronting(ctx.streamID))
  270. conn.Rewind()
  271. nativeDialer := p.network.NativeDialer()
  272. fConn, err := nativeDialer.DialContext(ctx, "tcp", p.DomainFrontingAddress())
  273. if err != nil {
  274. p.logger.WarningError("cannot dial to the fronting domain", err)
  275. return
  276. }
  277. frontConn := essentials.WrapNetConn(fConn)
  278. if p.domainFrontingProxyProtocol {
  279. frontConn = newConnProxyProtocol(ctx.clientConn, frontConn)
  280. }
  281. frontConn = connTraffic{
  282. Conn: frontConn,
  283. ctx: ctx,
  284. streamID: ctx.streamID,
  285. stream: p.eventStream,
  286. }
  287. tracker := newIdleTracker(p.idleTimeout)
  288. relay.Relay(
  289. ctx,
  290. ctx.logger.Named("domain-fronting"),
  291. connIdleTimeout{Conn: frontConn, tracker: tracker},
  292. connIdleTimeout{Conn: conn, tracker: tracker},
  293. )
  294. }
  295. // NewProxy makes a new proxy instance.
  296. func NewProxy(opts ProxyOpts) (*Proxy, error) {
  297. if err := opts.valid(); err != nil {
  298. return nil, fmt.Errorf("invalid settings: %w", err)
  299. }
  300. tg, err := dc.New(opts.getPreferIP())
  301. if err != nil {
  302. return nil, fmt.Errorf("cannot build telegram dc fetcher: %w", err)
  303. }
  304. ctx, cancel := context.WithCancel(context.Background())
  305. logger := opts.getLogger("proxy")
  306. updatersLogger := logger.Named("telegram-updaters")
  307. secretsMap := opts.getSecrets()
  308. secretNames := make([]string, 0, len(secretsMap))
  309. for name := range secretsMap {
  310. secretNames = append(secretNames, name)
  311. }
  312. sort.Strings(secretNames)
  313. secretsList := make([]Secret, 0, len(secretsMap))
  314. for _, name := range secretNames {
  315. secretsList = append(secretsList, secretsMap[name])
  316. }
  317. stats := NewProxyStats()
  318. for _, name := range secretNames {
  319. stats.PreRegister(name)
  320. }
  321. if opts.APIBindTo != "" {
  322. stats.StartServer(ctx, opts.APIBindTo, logger)
  323. }
  324. if opts.ThrottleMaxConnections > 0 {
  325. stats.SetThrottle(int64(opts.ThrottleMaxConnections), opts.getThrottleCheckInterval())
  326. stats.startThrottleLoop(ctx, logger)
  327. }
  328. proxy := &Proxy{
  329. ctx: ctx,
  330. ctxCancel: cancel,
  331. stats: stats,
  332. secrets: secretsList,
  333. secretNames: secretNames,
  334. network: opts.Network,
  335. antiReplayCache: opts.AntiReplayCache,
  336. blocklist: opts.IPBlocklist,
  337. allowlist: opts.IPAllowlist,
  338. eventStream: opts.EventStream,
  339. logger: logger,
  340. domainFrontingPort: opts.getDomainFrontingPort(),
  341. domainFrontingIP: opts.DomainFrontingIP,
  342. tolerateTimeSkewness: opts.getTolerateTimeSkewness(),
  343. idleTimeout: opts.getIdleTimeout(),
  344. handshakeTimeout: opts.getHandshakeTimeout(),
  345. allowFallbackOnUnknownDC: opts.AllowFallbackOnUnknownDC,
  346. telegram: tg,
  347. doppelGanger: doppel.NewGanger(
  348. ctx,
  349. opts.Network,
  350. logger.Named("doppelganger"),
  351. opts.DoppelGangerEach,
  352. int(opts.DoppelGangerPerRaid),
  353. opts.DoppelGangerURLs,
  354. opts.DoppelGangerDRS,
  355. ),
  356. configUpdater: dc.NewPublicConfigUpdater(
  357. tg,
  358. updatersLogger.Named("public-config"),
  359. opts.Network.MakeHTTPClient(nil),
  360. ),
  361. domainFrontingProxyProtocol: opts.DomainFrontingProxyProtocol,
  362. }
  363. proxy.doppelGanger.Run()
  364. if opts.AutoUpdate {
  365. proxy.configUpdater.Run(ctx, dc.PublicConfigUpdateURLv4, "tcp4")
  366. proxy.configUpdater.Run(ctx, dc.PublicConfigUpdateURLv6, "tcp6")
  367. }
  368. pool, err := ants.NewPoolWithFunc(opts.getConcurrency(),
  369. func(arg any) {
  370. proxy.ServeConn(arg.(essentials.Conn)) //nolint: forcetypeassert
  371. },
  372. ants.WithLogger(opts.getLogger("ants")),
  373. ants.WithNonblocking(true))
  374. if err != nil {
  375. panic(err)
  376. }
  377. proxy.workerPool = pool
  378. return proxy, nil
  379. }