Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter Version 2.
Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

event_stream.go 2.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package events
  2. import (
  3. "context"
  4. "math/rand/v2"
  5. "runtime"
  6. "github.com/9seconds/mtg/v2/mtglib"
  7. "github.com/OneOfOne/xxhash"
  8. )
  9. // EventStream is a default implementation of the [mtglib.EventStream]
  10. // interface.
  11. //
  12. // EventStream manages a set of goroutines, observers. Main
  13. // responsibility of the event stream is to route an event to relevant
  14. // observer based on some hash so each observer will have all events
  15. // which belong to some stream id.
  16. //
  17. // Thus, EventStream can spawn many observers.
  18. type EventStream struct {
  19. ctx context.Context
  20. ctxCancel context.CancelFunc
  21. chans []chan mtglib.Event
  22. }
  23. // Send starts delivering of the message to observer with respect to a
  24. // given context If context is closed, message could be not delivered.
  25. func (e EventStream) Send(ctx context.Context, evt mtglib.Event) {
  26. var chanNo uint32
  27. if streamID := evt.StreamID(); streamID != "" {
  28. chanNo = xxhash.ChecksumString32(streamID)
  29. } else {
  30. chanNo = rand.Uint32()
  31. }
  32. select {
  33. case <-ctx.Done():
  34. case <-e.ctx.Done():
  35. case e.chans[chanNo%uint32(len(e.chans))] <- evt:
  36. }
  37. }
  38. // Shutdown stops an event stream pipeline.
  39. func (e EventStream) Shutdown() {
  40. e.ctxCancel()
  41. }
  42. // NewEventStream builds a new default event stream.
  43. //
  44. // If you give an empty array of observers, then NoopObserver is going
  45. // to be used. If you give many observers, then they will process a
  46. // message concurrently.
  47. func NewEventStream(observerFactories []ObserverFactory) EventStream {
  48. if len(observerFactories) == 0 {
  49. observerFactories = append(observerFactories, NewNoopObserver)
  50. }
  51. ctx, cancel := context.WithCancel(context.Background())
  52. rv := EventStream{
  53. ctx: ctx,
  54. ctxCancel: cancel,
  55. chans: make([]chan mtglib.Event, runtime.NumCPU()),
  56. }
  57. for i := range runtime.NumCPU() {
  58. rv.chans[i] = make(chan mtglib.Event, 1)
  59. if len(observerFactories) == 1 {
  60. go eventStreamProcessor(ctx, rv.chans[i], observerFactories[0]())
  61. } else {
  62. go eventStreamProcessor(ctx, rv.chans[i], newMultiObserver(observerFactories))
  63. }
  64. }
  65. return rv
  66. }
  67. func eventStreamProcessor(ctx context.Context, eventChan <-chan mtglib.Event, observer Observer) { //nolint: cyclop
  68. defer observer.Shutdown()
  69. for {
  70. select {
  71. case <-ctx.Done():
  72. return
  73. case evt := <-eventChan:
  74. switch typedEvt := evt.(type) {
  75. case mtglib.EventTraffic:
  76. observer.EventTraffic(typedEvt)
  77. case mtglib.EventStart:
  78. observer.EventStart(typedEvt)
  79. case mtglib.EventFinish:
  80. observer.EventFinish(typedEvt)
  81. case mtglib.EventConnectedToDC:
  82. observer.EventConnectedToDC(typedEvt)
  83. case mtglib.EventDomainFronting:
  84. observer.EventDomainFronting(typedEvt)
  85. case mtglib.EventIPBlocklisted:
  86. observer.EventIPBlocklisted(typedEvt)
  87. case mtglib.EventConcurrencyLimited:
  88. observer.EventConcurrencyLimited(typedEvt)
  89. case mtglib.EventReplayAttack:
  90. observer.EventReplayAttack(typedEvt)
  91. case mtglib.EventIPListSize:
  92. observer.EventIPListSize(typedEvt)
  93. }
  94. }
  95. }
  96. }