| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- package events
-
- import (
- "context"
- "math/rand/v2"
- "runtime"
-
- "github.com/9seconds/mtg/v2/mtglib"
- "github.com/OneOfOne/xxhash"
- )
-
- // EventStream is a default implementation of the [mtglib.EventStream]
- // interface.
- //
- // EventStream manages a set of goroutines, observers. Main
- // responsibility of the event stream is to route an event to relevant
- // observer based on some hash so each observer will have all events
- // which belong to some stream id.
- //
- // Thus, EventStream can spawn many observers.
- type EventStream struct {
- ctx context.Context
- ctxCancel context.CancelFunc
- chans []chan mtglib.Event
- }
-
- // Send starts delivering of the message to observer with respect to a
- // given context If context is closed, message could be not delivered.
- func (e EventStream) Send(ctx context.Context, evt mtglib.Event) {
- var chanNo uint32
-
- if streamID := evt.StreamID(); streamID != "" {
- chanNo = xxhash.ChecksumString32(streamID)
- } else {
- chanNo = rand.Uint32()
- }
-
- select {
- case <-ctx.Done():
- case <-e.ctx.Done():
- case e.chans[int(chanNo)%len(e.chans)] <- evt:
- }
- }
-
- // Shutdown stops an event stream pipeline.
- func (e EventStream) Shutdown() {
- e.ctxCancel()
- }
-
- // NewEventStream builds a new default event stream.
- //
- // If you give an empty array of observers, then NoopObserver is going
- // to be used. If you give many observers, then they will process a
- // message concurrently.
- func NewEventStream(observerFactories []ObserverFactory) EventStream {
- if len(observerFactories) == 0 {
- observerFactories = append(observerFactories, NewNoopObserver)
- }
-
- ctx, cancel := context.WithCancel(context.Background())
- rv := EventStream{
- ctx: ctx,
- ctxCancel: cancel,
- chans: make([]chan mtglib.Event, runtime.NumCPU()),
- }
-
- for i := range runtime.NumCPU() {
- rv.chans[i] = make(chan mtglib.Event, 1)
-
- if len(observerFactories) == 1 {
- go eventStreamProcessor(ctx, rv.chans[i], observerFactories[0]())
- } else {
- go eventStreamProcessor(ctx, rv.chans[i], newMultiObserver(observerFactories))
- }
- }
-
- return rv
- }
-
- func eventStreamProcessor(ctx context.Context, eventChan <-chan mtglib.Event, observer Observer) { //nolint: cyclop
- defer observer.Shutdown()
-
- for {
- select {
- case <-ctx.Done():
- return
- case evt := <-eventChan:
- switch typedEvt := evt.(type) {
- case mtglib.EventTraffic:
- observer.EventTraffic(typedEvt)
- case mtglib.EventStart:
- observer.EventStart(typedEvt)
- case mtglib.EventFinish:
- observer.EventFinish(typedEvt)
- case mtglib.EventConnectedToDC:
- observer.EventConnectedToDC(typedEvt)
- case mtglib.EventDomainFronting:
- observer.EventDomainFronting(typedEvt)
- case mtglib.EventIPBlocklisted:
- observer.EventIPBlocklisted(typedEvt)
- case mtglib.EventConcurrencyLimited:
- observer.EventConcurrencyLimited(typedEvt)
- case mtglib.EventReplayAttack:
- observer.EventReplayAttack(typedEvt)
- case mtglib.EventIPListSize:
- observer.EventIPListSize(typedEvt)
- }
- }
- }
- }
|