Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
Du kannst nicht mehr als 25 Themen auswählen Themen müssen mit entweder einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

proxy.go 4.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package proxy
  2. import (
  3. "io"
  4. "net"
  5. "sync"
  6. "github.com/juju/errors"
  7. uuid "github.com/satori/go.uuid"
  8. "go.uber.org/zap"
  9. "github.com/9seconds/mtg/client"
  10. "github.com/9seconds/mtg/config"
  11. "github.com/9seconds/mtg/mtproto"
  12. "github.com/9seconds/mtg/stats"
  13. "github.com/9seconds/mtg/telegram"
  14. "github.com/9seconds/mtg/wrappers"
  15. )
  16. // Proxy is a core of this program.
  17. type Proxy struct {
  18. clientInit client.Init
  19. tg telegram.Telegram
  20. conf *config.Config
  21. }
  22. // Serve runs TCP proxy server.
  23. func (p *Proxy) Serve() error {
  24. lsock, err := net.Listen("tcp", p.conf.BindAddr())
  25. if err != nil {
  26. return errors.Annotate(err, "Cannot create listen socket")
  27. }
  28. for {
  29. if conn, err := lsock.Accept(); err != nil {
  30. zap.S().Errorw("Cannot allocate incoming connection", "error", err)
  31. } else {
  32. go p.accept(conn)
  33. }
  34. }
  35. }
  36. func (p *Proxy) accept(conn net.Conn) {
  37. connID := uuid.NewV4().String()
  38. log := zap.S().With("connection_id", connID).Named("main")
  39. defer func() {
  40. conn.Close() // nolint: errcheck
  41. if err := recover(); err != nil {
  42. stats.NewCrash()
  43. log.Errorw("Crash of accept handler", "error", err)
  44. }
  45. }()
  46. log.Infow("Client connected", "addr", conn.RemoteAddr())
  47. clientConn, opts, err := p.clientInit(conn, connID, p.conf)
  48. if err != nil {
  49. log.Errorw("Cannot initialize client connection", "error", err)
  50. return
  51. }
  52. defer clientConn.(io.Closer).Close() // nolint: errcheck
  53. stats.ClientConnected(opts.ConnectionType, clientConn.RemoteAddr())
  54. defer stats.ClientDisconnected(opts.ConnectionType, clientConn.RemoteAddr())
  55. serverConn, err := p.getTelegramConn(opts, connID)
  56. if err != nil {
  57. log.Errorw("Cannot initialize server connection", "error", err)
  58. return
  59. }
  60. defer serverConn.(io.Closer).Close() // nolint: errcheck
  61. wait := &sync.WaitGroup{}
  62. wait.Add(2)
  63. if p.conf.UseMiddleProxy() {
  64. clientPacket := clientConn.(wrappers.PacketReadWriteCloser)
  65. serverPacket := serverConn.(wrappers.PacketReadWriteCloser)
  66. go p.middlePipe(clientPacket, serverPacket, wait, &opts.ReadHacks)
  67. go p.middlePipe(serverPacket, clientPacket, wait, &opts.WriteHacks)
  68. } else {
  69. clientStream := clientConn.(wrappers.StreamReadWriteCloser)
  70. serverStream := serverConn.(wrappers.StreamReadWriteCloser)
  71. go p.directPipe(clientStream, serverStream, wait)
  72. go p.directPipe(serverStream, clientStream, wait)
  73. }
  74. wait.Wait()
  75. log.Infow("Client disconnected", "addr", conn.RemoteAddr())
  76. }
  77. func (p *Proxy) getTelegramConn(opts *mtproto.ConnectionOpts, connID string) (wrappers.Wrap, error) {
  78. streamConn, err := p.tg.Dial(connID, opts)
  79. if err != nil {
  80. return nil, errors.Annotate(err, "Cannot dial to Telegram")
  81. }
  82. packetConn, err := p.tg.Init(opts, streamConn)
  83. if err != nil {
  84. return nil, errors.Annotate(err, "Cannot handshake telegram")
  85. }
  86. return packetConn, nil
  87. }
  88. func (p *Proxy) middlePipe(src wrappers.PacketReadCloser, dst io.WriteCloser,
  89. wait *sync.WaitGroup, hacks *mtproto.Hacks) {
  90. defer func() {
  91. src.Close() // nolint: errcheck
  92. dst.Close() // nolint: errcheck
  93. wait.Done()
  94. }()
  95. for {
  96. hacks.SimpleAck = false
  97. hacks.QuickAck = false
  98. packet, err := src.Read()
  99. if err != nil {
  100. src.Logger().Warnw("Cannot read packet", "error", err)
  101. return
  102. }
  103. if _, err = dst.Write(packet); err != nil {
  104. src.Logger().Warnw("Cannot write packet", "error", err)
  105. return
  106. }
  107. }
  108. }
  109. func (p *Proxy) directPipe(src wrappers.StreamReadCloser, dst io.WriteCloser, wait *sync.WaitGroup) {
  110. defer func() {
  111. src.Close() // nolint: errcheck
  112. dst.Close() // nolint: errcheck
  113. wait.Done()
  114. }()
  115. if _, err := io.Copy(dst, src); err != nil {
  116. src.Logger().Warnw("Cannot pump sockets", "error", err)
  117. }
  118. }
  119. // NewProxy returns new proxy instance.
  120. func NewProxy(conf *config.Config) *Proxy {
  121. var clientInit client.Init
  122. var tg telegram.Telegram
  123. if conf.UseMiddleProxy() {
  124. clientInit = client.MiddleInit
  125. tg = telegram.NewMiddleTelegram(conf)
  126. } else {
  127. clientInit = client.DirectInit
  128. tg = telegram.NewDirectTelegram(conf)
  129. }
  130. return &Proxy{
  131. conf: conf,
  132. clientInit: clientInit,
  133. tg: tg,
  134. }
  135. }