Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package relay
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "time"
  7. )
  8. type Relay struct {
  9. ctx context.Context
  10. ctxCancel context.CancelFunc
  11. logger Logger
  12. eastBuffer []byte
  13. westBuffer []byte
  14. tickChannel chan struct{}
  15. errorChannel chan error
  16. tickTimeout time.Duration
  17. }
  18. func (r *Relay) Process(eastConn, westConn io.ReadWriteCloser) error {
  19. eastConn = conn{
  20. ReadWriteCloser: eastConn,
  21. ctx: r.ctx,
  22. tickChannel: r.tickChannel,
  23. }
  24. westConn = conn{
  25. ReadWriteCloser: westConn,
  26. ctx: r.ctx,
  27. tickChannel: r.tickChannel,
  28. }
  29. defer func() {
  30. r.ctxCancel()
  31. eastConn.Close()
  32. westConn.Close()
  33. }()
  34. wg := &sync.WaitGroup{}
  35. wg.Add(3) // nolint: gomnd
  36. go r.runObserver(wg)
  37. go r.transmit(eastConn, westConn, r.westBuffer, "west", wg)
  38. r.transmit(westConn, eastConn, r.eastBuffer, "east", wg)
  39. wg.Wait()
  40. select {
  41. case err := <-r.errorChannel:
  42. return err
  43. default:
  44. return nil
  45. }
  46. }
  47. func (r *Relay) transmit(src io.ReadCloser, dst io.WriteCloser,
  48. buffer []byte, direction string, wg *sync.WaitGroup) {
  49. defer func() {
  50. wg.Done()
  51. src.Close()
  52. dst.Close()
  53. }()
  54. if _, err := io.CopyBuffer(dst, src, buffer); err != nil {
  55. r.logger.Printf("error '%v' happened on direction %s", err, direction)
  56. select {
  57. case <-r.ctx.Done():
  58. err = r.ctx.Err()
  59. default:
  60. }
  61. select {
  62. case r.errorChannel <- err:
  63. default:
  64. }
  65. }
  66. }
  67. func (r *Relay) runObserver(wg *sync.WaitGroup) {
  68. ticker := time.NewTicker(time.Second)
  69. defer func() {
  70. ticker.Stop()
  71. select {
  72. case <-ticker.C:
  73. default:
  74. }
  75. wg.Done()
  76. }()
  77. lastTickAt := time.Now()
  78. for {
  79. select {
  80. case <-r.ctx.Done():
  81. return
  82. case <-r.tickChannel:
  83. lastTickAt = time.Now()
  84. case <-ticker.C:
  85. if time.Since(lastTickAt) > r.tickTimeout {
  86. r.logger.Printf("exit due to a timeout")
  87. r.ctxCancel()
  88. return
  89. }
  90. }
  91. }
  92. }