Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter "Version 2".
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

connection.go 3.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package hub
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "sync"
  6. "go.uber.org/zap"
  7. "github.com/9seconds/mtg/conntypes"
  8. "github.com/9seconds/mtg/mtproto"
  9. "github.com/9seconds/mtg/mtproto/rpc"
  10. "github.com/9seconds/mtg/protocol"
  11. )
  12. type connection struct {
  13. conn conntypes.PacketReadWriteCloser
  14. proxyConns map[string]*ProxyConn
  15. closeOnce sync.Once
  16. proxyConnsMutex sync.RWMutex
  17. id int
  18. logger *zap.SugaredLogger
  19. channelDone chan struct{}
  20. channelWrite chan conntypes.Packet
  21. channelRead chan *rpc.ProxyResponse
  22. channelConnAttach chan *ProxyConn
  23. channelConnDetach chan conntypes.ConnID
  24. }
  25. func (c *connection) run() {
  26. defer c.Close()
  27. for {
  28. select {
  29. case <-c.channelDone:
  30. for _, v := range c.proxyConns {
  31. v.Close()
  32. }
  33. return
  34. case resp := <-c.channelRead:
  35. if channel, ok := c.proxyConns[string(resp.ConnID[:])]; ok {
  36. if resp.Type == rpc.ProxyResponseTypeCloseExt {
  37. channel.Close()
  38. } else {
  39. channel.put(resp)
  40. }
  41. }
  42. case packet := <-c.channelWrite:
  43. if err := c.conn.Write(packet); err != nil {
  44. c.logger.Debugw("Cannot write packet", "error", err)
  45. c.Close()
  46. }
  47. case conn := <-c.channelConnAttach:
  48. c.proxyConnsMutex.Lock()
  49. c.proxyConns[string(conn.req.ConnID[:])] = conn
  50. c.proxyConnsMutex.Unlock()
  51. conn.channelWrite = c.channelWrite
  52. case connID := <-c.channelConnDetach:
  53. if conn, ok := c.proxyConns[string(connID[:])]; ok {
  54. c.proxyConnsMutex.Lock()
  55. delete(c.proxyConns, string(connID[:]))
  56. c.proxyConnsMutex.Unlock()
  57. conn.Close()
  58. }
  59. }
  60. }
  61. }
  62. func (c *connection) readLoop() {
  63. for {
  64. packet, err := c.conn.Read()
  65. if err != nil {
  66. c.logger.Debugw("Cannot read packet", "error", err)
  67. c.Close()
  68. return
  69. }
  70. response, err := rpc.ParseProxyResponse(packet)
  71. if err != nil {
  72. c.logger.Debugw("Failed response", "error", err)
  73. continue
  74. }
  75. select {
  76. case <-c.channelDone:
  77. return
  78. case c.channelRead <- response:
  79. }
  80. }
  81. }
  82. func (c *connection) Close() {
  83. c.closeOnce.Do(func() {
  84. c.logger.Debugw("Closing connection")
  85. close(c.channelDone)
  86. c.conn.Close()
  87. })
  88. }
  89. func (c *connection) Done() bool {
  90. select {
  91. case <-c.channelDone:
  92. return true
  93. default:
  94. return c.Len() == 0
  95. }
  96. }
  97. func (c *connection) Len() int {
  98. c.proxyConnsMutex.RLock()
  99. defer c.proxyConnsMutex.RUnlock()
  100. return len(c.proxyConns)
  101. }
  102. func (c *connection) Attach(conn *ProxyConn) error {
  103. select {
  104. case <-c.channelDone:
  105. return ErrClosed
  106. case c.channelConnAttach <- conn:
  107. return nil
  108. }
  109. }
  110. func (c *connection) Detach(connID conntypes.ConnID) {
  111. select {
  112. case <-c.channelDone:
  113. case c.channelConnDetach <- connID:
  114. }
  115. }
  116. func newConnection(req *protocol.TelegramRequest) (*connection, error) {
  117. conn, err := mtproto.TelegramProtocol(req)
  118. if err != nil {
  119. return nil, fmt.Errorf("cannot create a new connection: %w", err)
  120. }
  121. id := rand.Int() // nolint: gosec
  122. rv := &connection{
  123. conn: conn,
  124. id: id,
  125. logger: zap.S().Named("hub-connection").With("id", id,
  126. "dc", req.ClientProtocol.DC(),
  127. "protocol", req.ClientProtocol.ConnectionProtocol()),
  128. proxyConns: make(map[string]*ProxyConn),
  129. channelRead: make(chan *rpc.ProxyResponse, 1),
  130. channelDone: make(chan struct{}),
  131. channelWrite: make(chan conntypes.Packet),
  132. channelConnAttach: make(chan *ProxyConn),
  133. channelConnDetach: make(chan conntypes.ConnID),
  134. }
  135. go rv.readLoop()
  136. go rv.run()
  137. return rv, nil
  138. }