Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package proxy
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "sync"
  7. "github.com/juju/errors"
  8. uuid "github.com/satori/go.uuid"
  9. "go.uber.org/zap"
  10. "github.com/9seconds/mtg/config"
  11. "github.com/9seconds/mtg/obfuscated2"
  12. "github.com/9seconds/mtg/telegram"
  13. "github.com/9seconds/mtg/wrappers"
  14. )
  15. // Server is an insgtance of MTPROTO proxy.
  16. type Server struct {
  17. conf *config.Config
  18. logger *zap.SugaredLogger
  19. stats *Stats
  20. tg telegram.Telegram
  21. }
  22. // Serve does MTPROTO proxying.
  23. func (s *Server) Serve() error {
  24. lsock, err := net.Listen("tcp", s.conf.BindAddr())
  25. if err != nil {
  26. return errors.Annotate(err, "Cannot create listen socket")
  27. }
  28. for {
  29. if conn, err := lsock.Accept(); err != nil {
  30. s.logger.Warn("Cannot allocate incoming connection", "error", err)
  31. } else {
  32. go s.accept(conn)
  33. }
  34. }
  35. }
  36. func (s *Server) accept(conn net.Conn) {
  37. defer func() {
  38. s.stats.closeConnection()
  39. conn.Close() // nolint: errcheck
  40. if r := recover(); r != nil {
  41. s.logger.Errorw("Crash of accept handler", "error", r)
  42. }
  43. }()
  44. s.stats.newConnection()
  45. ctx, cancel := context.WithCancel(context.Background())
  46. socketID := uuid.NewV4().String()
  47. s.logger.Debugw("Client connected",
  48. "addr", conn.RemoteAddr().String(),
  49. "socketid", socketID,
  50. )
  51. clientConn, dc, err := s.getClientStream(ctx, cancel, conn, socketID)
  52. if err != nil {
  53. s.logger.Warnw("Cannot initialize client connection",
  54. "addr", conn.RemoteAddr().String(),
  55. "socketid", socketID,
  56. "error", err,
  57. )
  58. return
  59. }
  60. defer clientConn.Close() // nolint: errcheck
  61. tgConn, err := s.getTelegramStream(ctx, cancel, dc, socketID)
  62. if err != nil {
  63. s.logger.Warnw("Cannot initialize Telegram connection",
  64. "socketid", socketID,
  65. "error", err,
  66. )
  67. return
  68. }
  69. defer tgConn.Close() // nolint: errcheck
  70. wait := &sync.WaitGroup{}
  71. wait.Add(2)
  72. go func() {
  73. defer wait.Done()
  74. io.Copy(clientConn, tgConn) // nolint: errcheck
  75. }()
  76. go func() {
  77. defer wait.Done()
  78. io.Copy(tgConn, clientConn) // nolint: errcheck
  79. }()
  80. <-ctx.Done()
  81. wait.Wait()
  82. s.logger.Debugw("Client disconnected",
  83. "addr", conn.RemoteAddr().String(),
  84. "socketid", socketID,
  85. )
  86. }
  87. func (s *Server) getClientStream(ctx context.Context, cancel context.CancelFunc, conn net.Conn, socketID string) (io.ReadWriteCloser, int16, error) {
  88. wConn := wrappers.NewTimeoutRWC(conn, s.conf.TimeoutRead, s.conf.TimeoutWrite)
  89. wConn = wrappers.NewTrafficRWC(wConn, s.stats.addIncomingTraffic, s.stats.addOutgoingTraffic)
  90. frame, err := obfuscated2.ExtractFrame(wConn)
  91. if err != nil {
  92. return nil, 0, errors.Annotate(err, "Cannot create client stream")
  93. }
  94. obfs2, dc, err := obfuscated2.ParseObfuscated2ClientFrame(s.conf.Secret, frame)
  95. if err != nil {
  96. return nil, 0, errors.Annotate(err, "Cannot create client stream")
  97. }
  98. wConn = wrappers.NewLogRWC(wConn, s.logger, socketID, "client")
  99. wConn = wrappers.NewStreamCipherRWC(wConn, obfs2.Encryptor, obfs2.Decryptor)
  100. wConn = wrappers.NewCtxRWC(ctx, cancel, wConn)
  101. return wConn, dc, nil
  102. }
  103. func (s *Server) getTelegramStream(ctx context.Context, cancel context.CancelFunc, dc int16, socketID string) (io.ReadWriteCloser, error) {
  104. conn, err := s.tg.Dial(dc)
  105. if err != nil {
  106. return nil, errors.Annotate(err, "Cannot connect to Telegram")
  107. }
  108. conn = wrappers.NewTrafficRWC(conn, s.stats.addIncomingTraffic, s.stats.addOutgoingTraffic)
  109. conn, err = s.tg.Init(conn)
  110. if err != nil {
  111. return nil, errors.Annotate(err, "Cannot handshake Telegram")
  112. }
  113. conn = wrappers.NewLogRWC(conn, s.logger, socketID, "telegram")
  114. conn = wrappers.NewCtxRWC(ctx, cancel, conn)
  115. return conn, nil
  116. }
  117. // NewServer creates new instance of MTPROTO proxy.
  118. func NewServer(conf *config.Config, logger *zap.SugaredLogger, stat *Stats) *Server {
  119. return &Server{
  120. conf: conf,
  121. logger: logger,
  122. stats: stat,
  123. tg: telegram.NewDirectTelegram(conf),
  124. }
  125. }