Procházet zdrojové kódy

Add tests for noop event stream

tags/v2.0.0-rc1
9seconds před 5 roky
rodič
revize
e009d05a90
8 změnil soubory, kde provedl 158 přidání a 11 odebrání
  1. 22
    3
      events/event_stream.go
  2. 1
    0
      events/init.go
  3. 4
    0
      events/init_test.go
  4. 15
    0
      events/multi_observer.go
  5. 27
    0
      events/noop.go
  6. 75
    0
      events/noop_test.go
  7. 13
    7
      mtglib/events.go
  8. 1
    1
      mtglib/init.go

+ 22
- 3
events/event_stream.go Zobrazit soubor

@@ -2,6 +2,7 @@ package events
2 2
 
3 3
 import (
4 4
 	"context"
5
+	"math/rand"
5 6
 	"runtime"
6 7
 
7 8
 	"github.com/9seconds/mtg/v2/mtglib"
@@ -15,12 +16,20 @@ type eventStream struct {
15 16
 }
16 17
 
17 18
 func (e eventStream) Send(ctx context.Context, evt mtglib.Event) {
18
-	chanNo := int(xxhash.ChecksumString32(evt.ConnectionID())) % len(e.chans)
19
+	var chanNo uint32
20
+
21
+	streamID := evt.StreamID()
22
+
23
+	if streamID == "" {
24
+		chanNo = rand.Uint32()
25
+	} else {
26
+		chanNo = xxhash.ChecksumString32(streamID)
27
+	}
19 28
 
20 29
 	select {
21 30
 	case <-ctx.Done():
22 31
 	case <-e.ctx.Done():
23
-	case e.chans[chanNo] <- evt:
32
+	case e.chans[int(chanNo)%len(e.chans)] <- evt:
24 33
 	}
25 34
 }
26 35
 
@@ -29,6 +38,10 @@ func (e eventStream) Shutdown() {
29 38
 }
30 39
 
31 40
 func NewEventStream(observerFactories []ObserverFactory) mtglib.EventStream {
41
+	if len(observerFactories) == 0 {
42
+		observerFactories = append(observerFactories, NewNoopObserver)
43
+	}
44
+
32 45
 	ctx, cancel := context.WithCancel(context.Background())
33 46
 	rv := eventStream{
34 47
 		ctx:       ctx,
@@ -39,7 +52,11 @@ func NewEventStream(observerFactories []ObserverFactory) mtglib.EventStream {
39 52
 	for i := 0; i < runtime.NumCPU(); i++ {
40 53
 		rv.chans[i] = make(chan mtglib.Event, 1)
41 54
 
42
-		go eventStreamProcessor(ctx, rv.chans[i], newMultiObserver(observerFactories))
55
+		if len(observerFactories) == 1 {
56
+			go eventStreamProcessor(ctx, rv.chans[i], observerFactories[0]())
57
+		} else {
58
+			go eventStreamProcessor(ctx, rv.chans[i], newMultiObserver(observerFactories))
59
+		}
43 60
 	}
44 61
 
45 62
 	return rv
@@ -58,6 +75,8 @@ func eventStreamProcessor(ctx context.Context, eventChan <-chan mtglib.Event, ob
58 75
 				observer.EventStart(typedEvt)
59 76
 			case mtglib.EventFinish:
60 77
 				observer.EventFinish(typedEvt)
78
+			case mtglib.EventConcurrencyLimited:
79
+				observer.EventConcurrencyLimited(typedEvt)
61 80
 			}
62 81
 		}
63 82
 	}

+ 1
- 0
events/init.go Zobrazit soubor

@@ -5,6 +5,7 @@ import "github.com/9seconds/mtg/v2/mtglib"
5 5
 type Observer interface {
6 6
 	EventStart(mtglib.EventStart)
7 7
 	EventFinish(mtglib.EventFinish)
8
+    EventConcurrencyLimited(mtglib.EventConcurrencyLimited)
8 9
 
9 10
 	Shutdown()
10 11
 }

+ 4
- 0
events/init_test.go Zobrazit soubor

@@ -17,6 +17,10 @@ func (o *ObserverMock) EventFinish(evt mtglib.EventStart) {
17 17
 	o.Called(evt)
18 18
 }
19 19
 
20
+func (o *ObserverMock) EventConcurrencyLimited(evt mtglib.EventConcurrencyLimited) {
21
+	o.Called(evt)
22
+}
23
+
20 24
 func (o *ObserverMock) Shutdown() {
21 25
 	o.Called()
22 26
 }

+ 15
- 0
events/multi_observer.go Zobrazit soubor

@@ -40,6 +40,21 @@ func (m multiObserver) EventFinish(evt mtglib.EventFinish) {
40 40
 	wg.Wait()
41 41
 }
42 42
 
43
+func (m multiObserver) EventConcurrencyLimited(evt mtglib.EventConcurrencyLimited) {
44
+	wg := &sync.WaitGroup{}
45
+	wg.Add(len(m.observers))
46
+
47
+	for _, v := range m.observers {
48
+		go func(obs Observer) {
49
+			defer wg.Done()
50
+
51
+			obs.EventConcurrencyLimited(evt)
52
+		}(v)
53
+	}
54
+
55
+	wg.Wait()
56
+}
57
+
43 58
 func (m multiObserver) Shutdown() {
44 59
 	for _, v := range m.observers {
45 60
 		v.Shutdown()

+ 27
- 0
events/noop.go Zobrazit soubor

@@ -0,0 +1,27 @@
1
+package events
2
+
3
+import (
4
+	"context"
5
+
6
+	"github.com/9seconds/mtg/v2/mtglib"
7
+)
8
+
9
+type noop struct{}
10
+
11
+func (n noop) Send(ctx context.Context, evt mtglib.Event) {}
12
+func (n noop) Shutdown()                                  {}
13
+
14
+func NewNoopStream() mtglib.EventStream {
15
+	return noop{}
16
+}
17
+
18
+type noopObserver struct{}
19
+
20
+func (n noopObserver) EventStart(_ mtglib.EventStart)                           {}
21
+func (n noopObserver) EventFinish(_ mtglib.EventFinish)                         {}
22
+func (n noopObserver) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {}
23
+func (n noopObserver) Shutdown()                                                {}
24
+
25
+func NewNoopObserver() Observer {
26
+	return noopObserver{}
27
+}

+ 75
- 0
events/noop_test.go Zobrazit soubor

@@ -0,0 +1,75 @@
1
+package events_test
2
+
3
+import (
4
+	"context"
5
+	"net"
6
+	"testing"
7
+	"time"
8
+
9
+	"github.com/9seconds/mtg/v2/events"
10
+	"github.com/9seconds/mtg/v2/mtglib"
11
+	"github.com/stretchr/testify/suite"
12
+)
13
+
14
+type NoopTestSuite struct {
15
+	suite.Suite
16
+
17
+	testData map[string]mtglib.Event
18
+	ctx      context.Context
19
+}
20
+
21
+func (suite *NoopTestSuite) SetupSuite() {
22
+	suite.testData = map[string]mtglib.Event{
23
+		"start": mtglib.EventStart{
24
+			CreatedAt: time.Now(),
25
+			ConnID:    "connID",
26
+			RemoteIP:  net.ParseIP("127.0.0.1"),
27
+		},
28
+		"finish": mtglib.EventFinish{
29
+			CreatedAt: time.Now(),
30
+			ConnID:    "connID",
31
+		},
32
+		"concurrency-limited": mtglib.EventConcurrencyLimited{},
33
+	}
34
+	suite.ctx = context.Background()
35
+}
36
+
37
+func (suite *NoopTestSuite) TestStream() {
38
+	stream := events.NewNoopStream()
39
+
40
+	for name, v := range suite.testData {
41
+		value := v
42
+
43
+		suite.T().Run(name, func(t *testing.T) {
44
+			stream.Send(suite.ctx, value)
45
+		})
46
+	}
47
+
48
+	stream.Shutdown()
49
+}
50
+
51
+func (suite *NoopTestSuite) TestObserver() {
52
+	observer := events.NewNoopObserver()
53
+
54
+	for name, v := range suite.testData {
55
+		value := v
56
+
57
+		suite.T().Run(name, func(t *testing.T) {
58
+			switch typedEvt := value.(type) {
59
+			case mtglib.EventStart:
60
+				observer.EventStart(typedEvt)
61
+			case mtglib.EventFinish:
62
+				observer.EventFinish(typedEvt)
63
+			case mtglib.EventConcurrencyLimited:
64
+				observer.EventConcurrencyLimited(typedEvt)
65
+			}
66
+		})
67
+	}
68
+
69
+	observer.Shutdown()
70
+}
71
+
72
+func TestNoop(t *testing.T) {
73
+	t.Parallel()
74
+	suite.Run(t, &NoopTestSuite{})
75
+}

+ 13
- 7
mtglib/events.go Zobrazit soubor

@@ -5,21 +5,27 @@ import (
5 5
 	"time"
6 6
 )
7 7
 
8
-type eventBase struct {
8
+type EventStart struct {
9 9
 	CreatedAt time.Time
10 10
 	ConnID    string
11
+	RemoteIP  net.IP
11 12
 }
12 13
 
13
-func (e eventBase) ConnectionID() string {
14
+func (e EventStart) StreamID() string {
14 15
 	return e.ConnID
15 16
 }
16 17
 
17
-type EventStart struct {
18
-	eventBase
18
+type EventFinish struct {
19
+	CreatedAt time.Time
20
+	ConnID    string
21
+}
19 22
 
20
-	RemoteIP net.IP
23
+func (e EventFinish) StreamID() string {
24
+	return e.ConnID
21 25
 }
22 26
 
23
-type EventFinish struct {
24
-	eventBase
27
+type EventConcurrencyLimited struct{}
28
+
29
+func (e EventConcurrencyLimited) StreamID() string {
30
+	return ""
25 31
 }

+ 1
- 1
mtglib/init.go Zobrazit soubor

@@ -28,7 +28,7 @@ type IPBlocklist interface {
28 28
 }
29 29
 
30 30
 type Event interface {
31
-	ConnectionID() string
31
+	StreamID() string
32 32
 }
33 33
 
34 34
 type EventStream interface {

Načítá se…
Zrušit
Uložit