Просмотр исходного кода

Add documentation for event stream

tags/v2.0.0-rc1
9seconds 5 лет назад
Родитель
Сommit
04b88cc864
4 измененных файлов: 85 добавлений и 0 удалений
  1. 17
    0
      events/event_stream.go
  2. 51
    0
      events/init.go
  3. 2
    0
      events/noop.go
  4. 15
    0
      mtglib/init.go

+ 17
- 0
events/event_stream.go Просмотреть файл

@@ -9,12 +9,23 @@ import (
9 9
 	"github.com/OneOfOne/xxhash"
10 10
 )
11 11
 
12
+// EventStream is a default implementation of the mtglib.EventStream
13
+// interface.
14
+//
15
+// EventStream manages a set of goroutines, observers. Main
16
+// responsibility of the event stream is to route an event to relevant
17
+// observer based on some hash so each observer will have all events
18
+// which belong to some stream id.
19
+//
20
+// Thus, EventStream can spawn many observers.
12 21
 type EventStream struct {
13 22
 	ctx       context.Context
14 23
 	ctxCancel context.CancelFunc
15 24
 	chans     []chan mtglib.Event
16 25
 }
17 26
 
27
+// Send starts delivering of the message to observer with respect to a
28
+// given context If context is closed, message could be not delivered.
18 29
 func (e EventStream) Send(ctx context.Context, evt mtglib.Event) {
19 30
 	var chanNo uint32
20 31
 
@@ -31,10 +42,16 @@ func (e EventStream) Send(ctx context.Context, evt mtglib.Event) {
31 42
 	}
32 43
 }
33 44
 
45
+// Shutdown stops an event stream pipeline.
34 46
 func (e EventStream) Shutdown() {
35 47
 	e.ctxCancel()
36 48
 }
37 49
 
50
+// NewEventStream builds a new default event stream.
51
+//
52
+// If you give an empty array of observers, then NoopObserver is going
53
+// to be used. If you give many observers, then they will process a
54
+// message concurrently.
38 55
 func NewEventStream(observerFactories []ObserverFactory) EventStream {
39 56
 	if len(observerFactories) == 0 {
40 57
 		observerFactories = append(observerFactories, NewNoopObserver)

+ 51
- 0
events/init.go Просмотреть файл

@@ -1,18 +1,69 @@
1
+// Events has a default implementations of EventStream for mtglib.
2
+//
3
+// Please see documentation for mtglib.EventStream interface to get an
4
+// idea of such an abstraction. This package has implementations for the
5
+// default event stream.
6
+//
7
+// Default event stream has a list of its own concepts. First, all it
8
+// does is a routing of messages to known observers. It takes an event,
9
+// defines its type and pass this message to a method of the observer.
10
+//
11
+// There might be many observers, but default event stream has a
12
+// guarantee though. It uses StreamID as a sharding key and guarantees
13
+// that a message with the same StreamID will be devlivered to the same
14
+// observer instance. So, each producer is guarateed to get all relevant
15
+// messages related to the same session. It is not possible that it will
16
+// get EventFinish if it has not seen EventStart for that session yet.
1 17
 package events
2 18
 
3 19
 import "github.com/9seconds/mtg/v2/mtglib"
4 20
 
21
+// Observer is an instance that listens for the incoming events.
22
+//
23
+// As it is said in the package description, the default event stream
24
+// guarantees that all events with the same StreamID are going to be
25
+// routed to the same instance of the observer. So, there is no need
26
+// to synchronize information about streams between many observers
27
+// instances, they can have their local storage.
5 28
 type Observer interface {
29
+	// EventStart reacts on incoming mtglib.EventStart event.
6 30
 	EventStart(mtglib.EventStart)
31
+
32
+	// EventFinish reacts on incoming mtglib.EventFinish event.
7 33
 	EventFinish(mtglib.EventFinish)
34
+
35
+	// EventConnectedToDC reacts on incoming mtglib.EventConnectedToDC
36
+	// event.
8 37
 	EventConnectedToDC(mtglib.EventConnectedToDC)
38
+
39
+	// EventDomainFronting reacts on incoming mtglib.EventDomainFronting
40
+	// event.
9 41
 	EventDomainFronting(mtglib.EventDomainFronting)
42
+
43
+	// EventTraffic reacts on incoming mtglib.EventTraffic event.
10 44
 	EventTraffic(mtglib.EventTraffic)
45
+
46
+	// EventConcurrencyLimited reacts on incoming
47
+	// mtglib.EventConcurrencyLimited event.
11 48
 	EventConcurrencyLimited(mtglib.EventConcurrencyLimited)
49
+
50
+	// EventIPBlocklisted reacts on incoming mtglib.EventIPBlocklisted event.
12 51
 	EventIPBlocklisted(mtglib.EventIPBlocklisted)
52
+
53
+	// EventReplayAttack reacts on incoming mtglib.EventReplayAttack event.
13 54
 	EventReplayAttack(mtglib.EventReplayAttack)
14 55
 
56
+	// Shutdown stop observer. Default event stream guarantees:
57
+	//   1. If shutdown is executed, it is executed only once
58
+	//   2. Observer won't receieve any new message after this
59
+	//      function call.
15 60
 	Shutdown()
16 61
 }
17 62
 
63
+// ObserverFactory creates a new instance of the observer.
64
+//
65
+// Default event stream creates a small set of goroutines to manage
66
+// incoming messages. Each message is routed to an appropriate observer
67
+// based on a sharding key, stream id. So, it is possible that an
68
+// instance of mtg will have many observer instances, not a single one.
18 69
 type ObserverFactory func() Observer

+ 2
- 0
events/noop.go Просмотреть файл

@@ -10,6 +10,7 @@ type noop struct{}
10 10
 
11 11
 func (n noop) Send(ctx context.Context, evt mtglib.Event) {}
12 12
 
13
+// NewNoopStream creates a stream which discards each message.
13 14
 func NewNoopStream() mtglib.EventStream {
14 15
 	return noop{}
15 16
 }
@@ -26,6 +27,7 @@ func (n noopObserver) EventIPBlocklisted(_ mtglib.EventIPBlocklisted)
26 27
 func (n noopObserver) EventReplayAttack(_ mtglib.EventReplayAttack)             {}
27 28
 func (n noopObserver) Shutdown()                                                {}
28 29
 
30
+// NewNoopObserver creates an observer which discards each message.
29 31
 func NewNoopObserver() Observer {
30 32
 	return noopObserver{}
31 33
 }

+ 15
- 0
mtglib/init.go Просмотреть файл

@@ -78,7 +78,22 @@ type Event interface {
78 78
 	Timestamp() time.Time
79 79
 }
80 80
 
81
+// EventStream is an abstraction that accepts a set of events produced
82
+// by mtg. Its main goal is to inject your logging or monitoring system.
83
+//
84
+// The idea is simple. When mtg works, it emits a set of events during
85
+// a lifecycle of the requestor: EventStart, EventFinish etc. mtg is a
86
+// producer which puts these events into a stream. Responsibility of
87
+// the stream is to deliver this event to consumers/observers. There
88
+// might be many different observers (for example, you want to have both
89
+// statsd and prometheus), mtg should know nothing about them.
81 90
 type EventStream interface {
91
+	// Send delivers an event to observers. Given context has to be
92
+	// respected. If the context is closed, all blocking operations should
93
+	// be released ASAP.
94
+	//
95
+	// It is possible that context is closed but the message is delivered.
96
+	// EventStream implementations should solve this issue somehow.
82 97
 	Send(context.Context, Event)
83 98
 }
84 99
 

Загрузка…
Отмена
Сохранить