Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

conns.go 2.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package mtglib
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "net"
  8. "time"
  9. "github.com/9seconds/mtg/v2/essentials"
  10. "github.com/pires/go-proxyproto"
  11. )
  12. type connTraffic struct {
  13. essentials.Conn
  14. streamID string
  15. stream EventStream
  16. ctx context.Context
  17. }
  18. func (c connTraffic) Read(b []byte) (int, error) {
  19. n, err := c.Conn.Read(b)
  20. if n > 0 {
  21. c.stream.Send(c.ctx, NewEventTraffic(c.streamID, uint(n), true))
  22. }
  23. return n, err //nolint: wrapcheck
  24. }
  25. func (c connTraffic) Write(b []byte) (int, error) {
  26. n, err := c.Conn.Write(b)
  27. if n > 0 {
  28. c.stream.Send(c.ctx, NewEventTraffic(c.streamID, uint(n), false))
  29. }
  30. return n, err //nolint: wrapcheck
  31. }
  32. type connRewind struct {
  33. essentials.Conn
  34. buf bytes.Buffer
  35. active io.Reader
  36. }
  37. func (c *connRewind) Read(p []byte) (int, error) {
  38. return c.active.Read(p)
  39. }
  40. func (c *connRewind) Rewind() {
  41. c.active = io.MultiReader(&c.buf, c.Conn)
  42. }
  43. func newConnRewind(conn essentials.Conn) *connRewind {
  44. rv := &connRewind{
  45. Conn: conn,
  46. }
  47. rv.active = io.TeeReader(conn, &rv.buf)
  48. return rv
  49. }
  50. type connProxyProtocol struct {
  51. essentials.Conn
  52. sourceAddr net.Addr
  53. headersWritten bool
  54. }
  55. func (c *connProxyProtocol) Write(p []byte) (int, error) {
  56. if !c.headersWritten {
  57. headers := proxyproto.HeaderProxyFromAddrs(2, c.sourceAddr, c.RemoteAddr())
  58. toSend, err := headers.Format()
  59. if err != nil {
  60. panic(err)
  61. }
  62. if _, err := c.Conn.Write(toSend); err != nil {
  63. return 0, fmt.Errorf("cannot send proxy protocol header: %w", err)
  64. }
  65. c.headersWritten = true
  66. }
  67. return c.Conn.Write(p)
  68. }
  69. func newConnProxyProtocol(source, target essentials.Conn) *connProxyProtocol {
  70. return &connProxyProtocol{
  71. Conn: target,
  72. sourceAddr: source.RemoteAddr(),
  73. }
  74. }
  75. type connIdleTimeout struct {
  76. essentials.Conn
  77. timeout time.Duration
  78. }
  79. func (c connIdleTimeout) Read(b []byte) (int, error) {
  80. c.SetReadDeadline(time.Now().Add(c.timeout)) //nolint: errcheck
  81. return c.Conn.Read(b) //nolint: wrapcheck
  82. }
  83. func (c connIdleTimeout) Write(b []byte) (int, error) {
  84. c.SetWriteDeadline(time.Now().Add(c.timeout)) //nolint: errcheck
  85. return c.Conn.Write(b) //nolint: wrapcheck
  86. }