Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0, or an upgrade broke your proxy, please read the chapter "Version 2".
Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

relay.go 1.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package relay
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "time"
  7. )
  8. type Relay struct {
  9. ctx context.Context
  10. ctxCancel context.CancelFunc
  11. logger Logger
  12. eastBuffer []byte
  13. westBuffer []byte
  14. tickChannel chan struct{}
  15. errorChannel chan error
  16. tickTimeout time.Duration
  17. }
  18. func (r *Relay) Process(eastConn, westConn io.ReadWriteCloser) error {
  19. eastConn = conn{
  20. ReadWriteCloser: eastConn,
  21. relay: r,
  22. }
  23. westConn = conn{
  24. ReadWriteCloser: westConn,
  25. relay: r,
  26. }
  27. defer func() {
  28. r.ctxCancel()
  29. eastConn.Close()
  30. westConn.Close()
  31. }()
  32. wg := &sync.WaitGroup{}
  33. wg.Add(3) // nolint: gomnd
  34. go r.runObserver(r.ctx, wg)
  35. go r.transmit(eastConn, westConn, r.westBuffer, "west", wg)
  36. r.transmit(westConn, eastConn, r.eastBuffer, "east", wg)
  37. wg.Wait()
  38. select {
  39. case err := <-r.errorChannel:
  40. return err
  41. default:
  42. return nil
  43. }
  44. }
  45. func (r *Relay) transmit(src io.ReadCloser, dst io.WriteCloser,
  46. buffer []byte, direction string, wg *sync.WaitGroup) {
  47. defer func() {
  48. wg.Done()
  49. src.Close()
  50. dst.Close()
  51. }()
  52. if _, err := io.CopyBuffer(dst, src, buffer); err != nil {
  53. r.logger.Printf("error '%v' happened on direction %s", err, direction)
  54. select {
  55. case <-r.ctx.Done():
  56. case r.errorChannel <- err:
  57. default:
  58. }
  59. }
  60. }
  61. func (r *Relay) runObserver(ctx context.Context, wg *sync.WaitGroup) {
  62. ticker := time.NewTicker(time.Second)
  63. defer func() {
  64. ticker.Stop()
  65. select {
  66. case <-ticker.C:
  67. default:
  68. }
  69. wg.Done()
  70. }()
  71. lastTickAt := time.Now()
  72. for {
  73. select {
  74. case <-ctx.Done():
  75. return
  76. case <-r.tickChannel:
  77. lastTickAt = time.Now()
  78. case <-ticker.C:
  79. if time.Since(lastTickAt) > r.tickTimeout {
  80. r.logger.Printf("exit due to a timeout")
  81. r.ctxCancel()
  82. return
  83. }
  84. }
  85. }
  86. }