Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter Version 2.
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

conns.go 3.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package mtglib
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "sync/atomic"
  10. "time"
  11. "github.com/9seconds/mtg/v2/essentials"
  12. "github.com/pires/go-proxyproto"
  13. )
  14. type connTraffic struct {
  15. essentials.Conn
  16. streamID string
  17. stream EventStream
  18. ctx context.Context
  19. }
  20. func (c connTraffic) Read(b []byte) (int, error) {
  21. n, err := c.Conn.Read(b)
  22. if n > 0 {
  23. c.stream.Send(c.ctx, NewEventTraffic(c.streamID, uint(n), true))
  24. }
  25. return n, err //nolint: wrapcheck
  26. }
  27. func (c connTraffic) Write(b []byte) (int, error) {
  28. n, err := c.Conn.Write(b)
  29. if n > 0 {
  30. c.stream.Send(c.ctx, NewEventTraffic(c.streamID, uint(n), false))
  31. }
  32. return n, err //nolint: wrapcheck
  33. }
  34. type connRewind struct {
  35. essentials.Conn
  36. buf bytes.Buffer
  37. active io.Reader
  38. }
  39. func (c *connRewind) Read(p []byte) (int, error) {
  40. return c.active.Read(p)
  41. }
  42. func (c *connRewind) Rewind() {
  43. c.active = io.MultiReader(&c.buf, c.Conn)
  44. }
  45. func newConnRewind(conn essentials.Conn) *connRewind {
  46. rv := &connRewind{
  47. Conn: conn,
  48. }
  49. rv.active = io.TeeReader(conn, &rv.buf)
  50. return rv
  51. }
  52. type connProxyProtocol struct {
  53. essentials.Conn
  54. sourceAddr net.Addr
  55. headersWritten bool
  56. }
  57. func (c *connProxyProtocol) Write(p []byte) (int, error) {
  58. if !c.headersWritten {
  59. headers := proxyproto.HeaderProxyFromAddrs(2, c.sourceAddr, c.RemoteAddr())
  60. toSend, err := headers.Format()
  61. if err != nil {
  62. panic(err)
  63. }
  64. if _, err := c.Conn.Write(toSend); err != nil {
  65. return 0, fmt.Errorf("cannot send proxy protocol header: %w", err)
  66. }
  67. c.headersWritten = true
  68. }
  69. return c.Conn.Write(p)
  70. }
  71. func newConnProxyProtocol(source, target essentials.Conn) *connProxyProtocol {
  72. return &connProxyProtocol{
  73. Conn: target,
  74. sourceAddr: source.RemoteAddr(),
  75. }
  76. }
  // idleTrackerEpoch is the fixed reference instant against which idleTracker
  // measures activity. time.Now carries a monotonic clock reading, so offsets
  // computed with time.Since are not affected by wall-clock adjustments.
  var idleTrackerEpoch = time.Now()

  // idleTracker is a shared idle tracker for a pair of relay connections.
  // Both directions update the same timestamp so that activity in one direction
  // prevents the other (idle) direction from timing out.
  //
  // The moment of the last activity is stored as a nanosecond offset from
  // idleTrackerEpoch in an atomic integer. touch runs on every successful read
  // and write, and this representation lets it record the time without
  // allocating. A tracker that was never touched holds offset 0 and behaves as
  // if its last activity happened at idleTrackerEpoch.
  type idleTracker struct {
  	// lastActive is the last activity, in nanoseconds since idleTrackerEpoch.
  	lastActive atomic.Int64
  	// timeout is how long the pair may stay silent before it counts as idle.
  	timeout time.Duration
  }

  // newIdleTracker returns a tracker with the given timeout whose last
  // activity is set to the moment of the call.
  func newIdleTracker(timeout time.Duration) *idleTracker {
  	t := &idleTracker{timeout: timeout}
  	t.touch()

  	return t
  }

  // touch records the current moment as the last activity.
  func (t *idleTracker) touch() {
  	t.lastActive.Store(int64(time.Since(idleTrackerEpoch)))
  }

  // isIdle reports whether at least timeout has elapsed since the last
  // recorded activity.
  func (t *idleTracker) isIdle() bool {
  	last := time.Duration(t.lastActive.Load())

  	return time.Since(idleTrackerEpoch)-last >= t.timeout
  }
  96. type connIdleTimeout struct {
  97. essentials.Conn
  98. tracker *idleTracker
  99. }
  100. func (c connIdleTimeout) Read(b []byte) (int, error) {
  101. var netErr net.Error
  102. for {
  103. c.SetReadDeadline(time.Now().Add(c.tracker.timeout)) //nolint: errcheck
  104. n, err := c.Conn.Read(b)
  105. switch {
  106. case err == nil:
  107. c.tracker.touch()
  108. return n, nil
  109. case errors.As(err, &netErr) && netErr.Timeout() && !c.tracker.isIdle():
  110. continue
  111. }
  112. return n, err
  113. }
  114. }
  115. func (c connIdleTimeout) Write(b []byte) (int, error) {
  116. c.SetWriteDeadline(time.Now().Add(c.tracker.timeout)) //nolint: errcheck
  117. n, err := c.Conn.Write(b)
  118. if n > 0 {
  119. c.tracker.touch()
  120. }
  121. return n, err //nolint: wrapcheck
  122. }