Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter Version 2.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

connection.go 3.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package hub
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "sync"
  6. "time"
  7. "github.com/9seconds/mtg/conntypes"
  8. "github.com/9seconds/mtg/mtproto"
  9. "github.com/9seconds/mtg/mtproto/rpc"
  10. "github.com/9seconds/mtg/protocol"
  11. "go.uber.org/zap"
  12. )
  // connectionTTL is the duration run arms its timer with; once the timer
  // fires, the connection is closed.
  const connectionTTL time.Duration = time.Hour
  14. type connection struct {
  15. conn conntypes.PacketReadWriteCloser
  16. proxyConns map[string]*ProxyConn
  17. closeOnce sync.Once
  18. proxyConnsMutex sync.RWMutex
  19. id int
  20. logger *zap.SugaredLogger
  21. channelDone chan struct{}
  22. channelWrite chan conntypes.Packet
  23. channelRead chan *rpc.ProxyResponse
  24. channelConnAttach chan *ProxyConn
  25. channelConnDetach chan conntypes.ConnID
  26. }
  27. func (c *connection) run() {
  28. defer c.Close()
  29. ttl := time.NewTimer(connectionTTL)
  30. defer ttl.Stop()
  31. for {
  32. select {
  33. case <-c.channelDone:
  34. for _, v := range c.proxyConns {
  35. v.Close()
  36. }
  37. return
  38. case <-ttl.C:
  39. c.logger.Debugw("Closing connection by TTL")
  40. c.Close()
  41. case resp := <-c.channelRead:
  42. if channel, ok := c.proxyConns[string(resp.ConnID[:])]; ok {
  43. if resp.Type == rpc.ProxyResponseTypeCloseExt {
  44. channel.Close()
  45. } else {
  46. channel.put(resp)
  47. }
  48. }
  49. case packet := <-c.channelWrite:
  50. if err := c.conn.Write(packet); err != nil {
  51. c.logger.Debugw("Cannot write packet", "error", err)
  52. c.Close()
  53. }
  54. case conn := <-c.channelConnAttach:
  55. c.proxyConnsMutex.Lock()
  56. c.proxyConns[string(conn.req.ConnID[:])] = conn
  57. c.proxyConnsMutex.Unlock()
  58. conn.channelWrite = c.channelWrite
  59. case connID := <-c.channelConnDetach:
  60. if conn, ok := c.proxyConns[string(connID[:])]; ok {
  61. c.proxyConnsMutex.Lock()
  62. delete(c.proxyConns, string(connID[:]))
  63. c.proxyConnsMutex.Unlock()
  64. conn.Close()
  65. }
  66. }
  67. }
  68. }
  69. func (c *connection) readLoop() {
  70. for {
  71. packet, err := c.conn.Read()
  72. if err != nil {
  73. c.logger.Debugw("Cannot read packet", "error", err)
  74. c.Close()
  75. return
  76. }
  77. response, err := rpc.ParseProxyResponse(packet)
  78. if err != nil {
  79. c.logger.Debugw("Failed response", "error", err)
  80. continue
  81. }
  82. select {
  83. case <-c.channelDone:
  84. return
  85. case c.channelRead <- response:
  86. }
  87. }
  88. }
  89. func (c *connection) Close() {
  90. c.closeOnce.Do(func() {
  91. c.logger.Debugw("Closing connection")
  92. close(c.channelDone)
  93. c.conn.Close()
  94. })
  95. }
  96. func (c *connection) Done() bool {
  97. select {
  98. case <-c.channelDone:
  99. return true
  100. default:
  101. return c.Len() == 0
  102. }
  103. }
  104. func (c *connection) Len() int {
  105. c.proxyConnsMutex.RLock()
  106. defer c.proxyConnsMutex.RUnlock()
  107. return len(c.proxyConns)
  108. }
  109. func (c *connection) Attach(conn *ProxyConn) error {
  110. select {
  111. case <-c.channelDone:
  112. return ErrClosed
  113. case c.channelConnAttach <- conn:
  114. return nil
  115. }
  116. }
  117. func (c *connection) Detach(connID conntypes.ConnID) {
  118. select {
  119. case <-c.channelDone:
  120. case c.channelConnDetach <- connID:
  121. }
  122. }
  123. func newConnection(req *protocol.TelegramRequest) (*connection, error) {
  124. conn, err := mtproto.TelegramProtocol(req)
  125. if err != nil {
  126. return nil, fmt.Errorf("cannot create a new connection: %w", err)
  127. }
  128. id := rand.Int() // nolint: gosec
  129. rv := &connection{
  130. conn: conn,
  131. id: id,
  132. logger: zap.S().Named("hub-connection").With("id", id,
  133. "dc", req.ClientProtocol.DC(),
  134. "protocol", req.ClientProtocol.ConnectionProtocol()),
  135. proxyConns: make(map[string]*ProxyConn),
  136. channelRead: make(chan *rpc.ProxyResponse, 1),
  137. channelDone: make(chan struct{}),
  138. channelWrite: make(chan conntypes.Packet),
  139. channelConnAttach: make(chan *ProxyConn),
  140. channelConnDetach: make(chan conntypes.ConnID),
  141. }
  142. go rv.readLoop()
  143. go rv.run()
  144. return rv, nil
  145. }