Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

wrapper_conn.go 3.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package wrappers
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "go.uber.org/zap"
  8. "github.com/9seconds/mtg/config"
  9. "github.com/9seconds/mtg/conntypes"
  10. )
  11. type connPurpose uint8
  12. const (
  13. connPurposeClient connPurpose = 1 << iota
  14. connPurposeTelegram
  15. )
  16. const (
  17. connTimeoutRead = 2 * time.Minute
  18. connTimeoutWrite = 2 * time.Minute
  19. )
  20. type wrapperConn struct {
  21. parent net.Conn
  22. ctx context.Context
  23. cancel context.CancelFunc
  24. connID conntypes.ConnID
  25. logger *zap.SugaredLogger
  26. localAddr *net.TCPAddr
  27. remoteAddr *net.TCPAddr
  28. }
  29. func (w *wrapperConn) WriteTimeout(p []byte, timeout time.Duration) (int, error) {
  30. select {
  31. case <-w.ctx.Done():
  32. w.Close()
  33. return 0, fmt.Errorf("cannot write because context was closed: %w", w.ctx.Err())
  34. default:
  35. if err := w.parent.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
  36. w.Close() // nolint: gosec
  37. return 0, fmt.Errorf("cannot set write deadline to the socket: %w", err)
  38. }
  39. n, err := w.parent.Write(p)
  40. w.logger.Debugw("Write to stream", "bytes", n, "error", err)
  41. if err != nil {
  42. w.Close() // nolint: gosec
  43. }
  44. return n, err
  45. }
  46. }
  47. func (w *wrapperConn) Write(p []byte) (int, error) {
  48. return w.WriteTimeout(p, connTimeoutWrite)
  49. }
  50. func (w *wrapperConn) ReadTimeout(p []byte, timeout time.Duration) (int, error) {
  51. select {
  52. case <-w.ctx.Done():
  53. w.Close()
  54. return 0, fmt.Errorf("cannot read because context was closed: %w", w.ctx.Err())
  55. default:
  56. if err := w.parent.SetReadDeadline(time.Now().Add(timeout)); err != nil {
  57. w.Close()
  58. return 0, fmt.Errorf("cannot set read deadline to the socket: %w", err)
  59. }
  60. n, err := w.parent.Read(p)
  61. w.logger.Debugw("Read from stream", "bytes", n, "error", err)
  62. if err != nil {
  63. w.Close()
  64. }
  65. return n, err
  66. }
  67. }
  68. func (w *wrapperConn) Read(p []byte) (int, error) {
  69. return w.ReadTimeout(p, connTimeoutRead)
  70. }
  71. func (w *wrapperConn) Close() error {
  72. w.logger.Debugw("Close connection")
  73. w.cancel()
  74. return w.parent.Close()
  75. }
  76. func (w *wrapperConn) Conn() net.Conn {
  77. return w.parent
  78. }
  79. func (w *wrapperConn) Logger() *zap.SugaredLogger {
  80. return w.logger
  81. }
  82. func (w *wrapperConn) LocalAddr() *net.TCPAddr {
  83. return w.localAddr
  84. }
  85. func (w *wrapperConn) RemoteAddr() *net.TCPAddr {
  86. return w.remoteAddr
  87. }
  88. func newConn(ctx context.Context,
  89. cancel context.CancelFunc,
  90. parent net.Conn,
  91. connID conntypes.ConnID,
  92. purpose connPurpose) StreamReadWriteCloser {
  93. localAddr := *parent.LocalAddr().(*net.TCPAddr)
  94. if parent.RemoteAddr().(*net.TCPAddr).IP.To4() != nil {
  95. if config.C.PublicIPv4Addr.IP != nil {
  96. localAddr.IP = config.C.PublicIPv4Addr.IP
  97. }
  98. } else if config.C.PublicIPv6Addr.IP != nil {
  99. localAddr.IP = config.C.PublicIPv6Addr.IP
  100. }
  101. logger := zap.S().With(
  102. "local_address", localAddr,
  103. "remote_address", parent.RemoteAddr(),
  104. ).Named("conn")
  105. if purpose == connPurposeClient {
  106. logger = logger.With("connection_id", connID.String())
  107. }
  108. return &wrapperConn{
  109. parent: parent,
  110. ctx: ctx,
  111. cancel: cancel,
  112. connID: connID,
  113. logger: logger,
  114. remoteAddr: parent.RemoteAddr().(*net.TCPAddr),
  115. localAddr: &localAddr,
  116. }
  117. }
  118. func NewClientConn(ctx context.Context,
  119. cancel context.CancelFunc,
  120. parent net.Conn,
  121. connID conntypes.ConnID) StreamReadWriteCloser {
  122. return newConn(ctx, cancel, parent, connID, connPurposeClient)
  123. }
  124. func NewTelegramConn(ctx context.Context,
  125. cancel context.CancelFunc,
  126. parent net.Conn) StreamReadWriteCloser {
  127. return newConn(ctx, cancel, parent, conntypes.ConnID{}, connPurposeTelegram)
  128. }