Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter Version 2.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package cli
  2. import (
  3. "fmt"
  4. "net"
  5. "os"
  6. "github.com/9seconds/mtg/v2/antireplay"
  7. "github.com/9seconds/mtg/v2/events"
  8. "github.com/9seconds/mtg/v2/ipblocklist"
  9. "github.com/9seconds/mtg/v2/logger"
  10. "github.com/9seconds/mtg/v2/mtglib"
  11. "github.com/9seconds/mtg/v2/stats"
  12. "github.com/9seconds/mtg/v2/timeattack"
  13. "github.com/9seconds/mtg/v2/utils"
  14. "github.com/rs/zerolog"
  15. )
  16. type Proxy struct {
  17. base
  18. prometheusListener net.Listener
  19. prometheus *stats.PrometheusFactory
  20. statsdFactory *stats.StatsdFactory
  21. }
  22. func (c *Proxy) Run(cli *CLI, version string) error {
  23. if err := c.ReadConfig(version); err != nil {
  24. return fmt.Errorf("cannot init config: %w", err)
  25. }
  26. return c.Execute()
  27. }
  28. func (c *Proxy) Execute() error { // nolint: funlen
  29. zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
  30. zerolog.TimestampFieldName = "timestamp"
  31. zerolog.LevelFieldName = "level"
  32. ctx := utils.RootContext()
  33. opts := mtglib.ProxyOpts{
  34. Logger: logger.NewZeroLogger(zerolog.New(os.Stdout).With().Timestamp().Logger()),
  35. Network: c.Network,
  36. AntiReplayCache: antireplay.NewNoop(),
  37. IPBlocklist: ipblocklist.NewNoop(),
  38. TimeAttackDetector: timeattack.NewNoop(),
  39. EventStream: events.NewNoopStream(),
  40. Secret: c.Config.Secret,
  41. BufferSize: c.Config.TCPBuffer.Value(mtglib.DefaultBufferSize),
  42. CloakPort: c.Config.CloakPort.Value(mtglib.DefaultCloakPort),
  43. IdleTimeout: c.Config.Network.Timeout.Idle.Value(mtglib.DefaultIdleTimeout),
  44. PreferIP: c.Config.PreferIP.Value(mtglib.DefaultPreferIP),
  45. }
  46. defer func() {
  47. opts.AntiReplayCache.Shutdown()
  48. opts.IPBlocklist.Shutdown()
  49. opts.EventStream.Shutdown()
  50. }()
  51. if opts.Concurrency == 0 {
  52. opts.Concurrency = mtglib.DefaultConcurrency
  53. }
  54. opts.Logger.BindStr("configuration", c.Config.String()).Debug("configuration")
  55. c.setupAntiReplayCache(&opts)
  56. c.setupTimeAttackDetector(&opts)
  57. if err := c.setupIPBlocklist(&opts); err != nil {
  58. return fmt.Errorf("cannot setup ipblocklist: %w", err)
  59. }
  60. if err := c.setupEventStream(&opts); err != nil {
  61. return fmt.Errorf("cannot setup event stream: %w", err)
  62. }
  63. proxy, err := mtglib.NewProxy(opts)
  64. if err != nil {
  65. return fmt.Errorf("cannot create a proxy: %w", err)
  66. }
  67. listener, err := net.Listen("tcp", c.Config.BindTo.String())
  68. if err != nil {
  69. return fmt.Errorf("cannot start proxy: %w", err)
  70. }
  71. go proxy.Serve(listener) // nolint: errcheck
  72. <-ctx.Done()
  73. listener.Close()
  74. if c.prometheusListener != nil {
  75. c.prometheusListener.Close()
  76. }
  77. if c.prometheus != nil {
  78. c.prometheus.Close()
  79. }
  80. if c.statsdFactory != nil {
  81. c.statsdFactory.Close()
  82. }
  83. return nil
  84. }
  85. func (c *Proxy) setupAntiReplayCache(opts *mtglib.ProxyOpts) {
  86. if !c.Config.Defense.AntiReplay.Enabled {
  87. return
  88. }
  89. opts.AntiReplayCache = antireplay.NewStableBloomFilter(
  90. c.Config.Defense.AntiReplay.MaxSize.Value(antireplay.DefaultMaxSize),
  91. c.Config.Defense.AntiReplay.ErrorRate.Value(antireplay.DefaultErrorRate),
  92. )
  93. }
  94. func (c *Proxy) setupTimeAttackDetector(opts *mtglib.ProxyOpts) {
  95. if !c.Config.Defense.Time.Enabled {
  96. return
  97. }
  98. opts.TimeAttackDetector = timeattack.NewDetector(
  99. c.Config.Defense.Time.AllowSkewness.Value(timeattack.DefaultDuration),
  100. )
  101. }
  102. func (c *Proxy) setupIPBlocklist(opts *mtglib.ProxyOpts) error {
  103. if !c.Config.Defense.Blocklist.Enabled {
  104. return nil
  105. }
  106. remoteURLs := []string{}
  107. localFiles := []string{}
  108. for _, v := range c.Config.Defense.Blocklist.URLs {
  109. if v.IsRemote() {
  110. remoteURLs = append(remoteURLs, v.String())
  111. } else {
  112. localFiles = append(localFiles, v.String())
  113. }
  114. }
  115. firehol, err := ipblocklist.NewFirehol(opts.Logger.Named("ipblockist"),
  116. c.Network,
  117. c.Config.Defense.Blocklist.DownloadConcurrency,
  118. remoteURLs,
  119. localFiles)
  120. if err != nil {
  121. return err // nolint: wrapcheck
  122. }
  123. go firehol.Run(c.Config.Defense.Blocklist.UpdateEach.Value(ipblocklist.DefaultUpdateEach))
  124. opts.IPBlocklist = firehol
  125. return nil
  126. }
  127. func (c *Proxy) setupEventStream(opts *mtglib.ProxyOpts) error {
  128. factories := make([]events.ObserverFactory, 0, 2)
  129. if c.Config.Stats.StatsD.Enabled {
  130. statsdFactory, err := stats.NewStatsd(
  131. c.Config.Stats.StatsD.Address.String(),
  132. opts.Logger.Named("statsd"),
  133. c.Config.Stats.StatsD.MetricPrefix.Value(stats.DefaultStatsdMetricPrefix),
  134. c.Config.Stats.StatsD.TagFormat.Value(stats.DefaultStatsdTagFormat))
  135. if err != nil {
  136. return fmt.Errorf("cannot build statsd observer: %w", err)
  137. }
  138. c.statsdFactory = &statsdFactory
  139. factories = append(factories, statsdFactory.Make)
  140. }
  141. if c.Config.Stats.Prometheus.Enabled {
  142. prometheus := stats.NewPrometheus(
  143. c.Config.Stats.Prometheus.MetricPrefix.Value(stats.DefaultMetricPrefix),
  144. c.Config.Stats.Prometheus.HTTPPath.Value("/"),
  145. )
  146. listener, err := net.Listen("tcp", c.Config.Stats.Prometheus.BindTo.String())
  147. if err != nil {
  148. return fmt.Errorf("cannot start a listener for prometheus: %w", err)
  149. }
  150. go prometheus.Serve(listener) // nolint: errcheck
  151. c.prometheusListener = listener
  152. c.prometheus = prometheus
  153. factories = append(factories, prometheus.Make)
  154. }
  155. if len(factories) > 0 {
  156. opts.EventStream = events.NewEventStream(factories)
  157. }
  158. return nil
  159. }