Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

conn.go 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package wrappers
  2. import (
  3. "context"
  4. "net"
  5. "time"
  6. "go.uber.org/zap"
  7. "github.com/9seconds/mtg/stats"
  8. "github.com/juju/errors"
  9. )
  10. // ConnPurpose is intended to be identifier of connection purpose. We
  11. // sometimes want to treat client/telegram connection differently (for
  12. // logging for example).
  13. type ConnPurpose uint8
  14. func (c ConnPurpose) String() string {
  15. switch c {
  16. case ConnPurposeClient:
  17. return "client"
  18. case ConnPurposeTelegram:
  19. return "telegram"
  20. }
  21. return ""
  22. }
  23. // ConnPurpose* define different connection types.
  24. const (
  25. ConnPurposeClient = iota
  26. ConnPurposeTelegram
  27. )
  28. const (
  29. connTimeoutRead = 2 * time.Minute
  30. connTimeoutWrite = 2 * time.Minute
  31. )
  32. type ioResult struct {
  33. n int
  34. err error
  35. }
  36. type ioFunc func([]byte) (int, error)
  37. // Conn is a basic wrapper for net.Conn providing the most low-level
  38. // logic and management as possible.
  39. type Conn struct {
  40. conn net.Conn
  41. ctx context.Context
  42. cancel context.CancelFunc
  43. connID string
  44. logger *zap.SugaredLogger
  45. publicIPv4 net.IP
  46. publicIPv6 net.IP
  47. }
  48. func (c *Conn) Write(p []byte) (int, error) {
  49. select {
  50. case <-c.ctx.Done():
  51. return 0, errors.Annotate(c.ctx.Err(), "Cannot write because context was closed")
  52. default:
  53. n, err := c.doIO(c.conn.Write, p, connTimeoutWrite)
  54. c.logger.Debugw("Write to stream", "bytes", n, "error", err)
  55. stats.EgressTraffic(n)
  56. return n, err
  57. }
  58. }
  59. func (c *Conn) Read(p []byte) (int, error) {
  60. select {
  61. case <-c.ctx.Done():
  62. return 0, errors.Annotate(c.ctx.Err(), "Cannot read because context was closed")
  63. default:
  64. n, err := c.doIO(c.conn.Read, p, connTimeoutRead)
  65. c.logger.Debugw("Read from stream", "bytes", n, "error", err)
  66. stats.IngressTraffic(n)
  67. return n, err
  68. }
  69. }
  70. func (c *Conn) doIO(callback ioFunc, p []byte, timeout time.Duration) (int, error) {
  71. resChan := make(chan ioResult, 1)
  72. timer := time.NewTimer(timeout)
  73. go func() {
  74. n, err := callback(p)
  75. resChan <- ioResult{n: n, err: err}
  76. }()
  77. select {
  78. case res := <-resChan:
  79. timer.Stop()
  80. if res.err != nil {
  81. c.Close() // nolint: gosec
  82. }
  83. return res.n, res.err
  84. case <-c.ctx.Done():
  85. timer.Stop()
  86. c.Close() // nolint: gosec
  87. return 0, errors.Annotate(c.ctx.Err(), "Cannot do IO because context is closed")
  88. case <-timer.C:
  89. c.Close() // nolint: gosec
  90. return 0, errors.Annotate(c.ctx.Err(), "Timeout on IO operation")
  91. }
  92. }
  93. // Close closes underlying net.Conn instance.
  94. func (c *Conn) Close() error {
  95. defer c.logger.Debugw("Close connection")
  96. c.cancel()
  97. return c.conn.Close()
  98. }
  99. // Logger returns an instance of the logger for this wrapper.
  100. func (c *Conn) Logger() *zap.SugaredLogger {
  101. return c.logger
  102. }
  103. // LocalAddr returns local address of the underlying net.Conn.
  104. func (c *Conn) LocalAddr() *net.TCPAddr {
  105. addr := c.conn.LocalAddr().(*net.TCPAddr)
  106. newAddr := *addr
  107. if c.RemoteAddr().IP.To4() != nil {
  108. if c.publicIPv4 != nil {
  109. newAddr.IP = c.publicIPv4
  110. }
  111. } else if c.publicIPv6 != nil {
  112. newAddr.IP = c.publicIPv6
  113. }
  114. return &newAddr
  115. }
  116. // RemoteAddr returns remote address of the underlying net.Conn.
  117. func (c *Conn) RemoteAddr() *net.TCPAddr {
  118. return c.conn.RemoteAddr().(*net.TCPAddr)
  119. }
  120. // NewConn initializes Conn wrapper for net.Conn.
  121. func NewConn(ctx context.Context, cancel context.CancelFunc, conn net.Conn,
  122. connID string, purpose ConnPurpose, publicIPv4, publicIPv6 net.IP) StreamReadWriteCloser {
  123. logger := zap.S().With(
  124. "connection_id", connID,
  125. "local_address", conn.LocalAddr(),
  126. "remote_address", conn.RemoteAddr(),
  127. "purpose", purpose,
  128. ).Named("conn")
  129. wrapper := Conn{
  130. conn: conn,
  131. ctx: ctx,
  132. cancel: cancel,
  133. connID: connID,
  134. logger: logger,
  135. publicIPv4: publicIPv4,
  136. publicIPv6: publicIPv6,
  137. }
  138. wrapper.logger = logger.With("faked_local_addr", wrapper.LocalAddr())
  139. return &wrapper
  140. }