Просмотр исходного кода

Test event stream

tags/v2.0.0-rc1
9seconds 5 лет назад
Родитель
Сommit
7617aeadda
5 измененных файлов: 130 добавлений и 8 удалений
  1. 3
    5
      events/event_stream.go
  2. 122
    0
      events/event_stream_test.go
  3. 1
    1
      events/init.go
  4. 1
    1
      events/init_test.go
  5. 3
    1
      mtglib/events.go

+ 3
- 5
events/event_stream.go Просмотреть файл

@@ -18,12 +18,10 @@ type eventStream struct {
18 18
 func (e eventStream) Send(ctx context.Context, evt mtglib.Event) {
19 19
 	var chanNo uint32
20 20
 
21
-	streamID := evt.StreamID()
22
-
23
-	if streamID == "" {
24
-		chanNo = rand.Uint32()
25
-	} else {
21
+	if streamID := evt.StreamID(); streamID != "" {
26 22
 		chanNo = xxhash.ChecksumString32(streamID)
23
+	} else {
24
+		chanNo = rand.Uint32()
27 25
 	}
28 26
 
29 27
 	select {

+ 122
- 0
events/event_stream_test.go Просмотреть файл

@@ -0,0 +1,122 @@
1
+package events_test
2
+
3
+import (
4
+	"context"
5
+	"net"
6
+	"testing"
7
+	"time"
8
+
9
+	"github.com/9seconds/mtg/v2/events"
10
+	"github.com/9seconds/mtg/v2/mtglib"
11
+	"github.com/stretchr/testify/mock"
12
+	"github.com/stretchr/testify/suite"
13
+)
14
+
15
+type EventStreamTestSuite struct {
16
+	suite.Suite
17
+
18
+	ctx           context.Context
19
+	ctxCancel     context.CancelFunc
20
+	observerMock1 *ObserverMock
21
+	observerMock2 *ObserverMock
22
+	stream        mtglib.EventStream
23
+}
24
+
25
+func (suite *EventStreamTestSuite) SetupTest() {
26
+	suite.ctx, suite.ctxCancel = context.WithCancel(context.Background())
27
+
28
+	suite.observerMock1 = &ObserverMock{}
29
+	suite.observerMock2 = &ObserverMock{}
30
+
31
+	suite.observerMock1.On("Shutdown")
32
+	suite.observerMock2.On("Shutdown")
33
+
34
+	factories := make([]events.ObserverFactory, 2)
35
+	factories[0] = func() events.Observer { return suite.observerMock1 }
36
+	factories[1] = func() events.Observer { return suite.observerMock2 }
37
+
38
+	suite.stream = events.NewEventStream(factories)
39
+}
40
+
41
+func (suite *EventStreamTestSuite) TestEventStartOk() {
42
+	evt := mtglib.EventStart{
43
+		CreatedAt: time.Now(),
44
+		ConnID:    "connID",
45
+		RemoteIP:  net.ParseIP("10.0.0.1"),
46
+	}
47
+
48
+	for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
49
+		v.
50
+			On("EventStart", mock.Anything).
51
+			Once().
52
+			Run(func(args mock.Arguments) {
53
+				caught := args.Get(0).(mtglib.EventStart)
54
+
55
+				suite.Equal(evt.CreatedAt, caught.CreatedAt)
56
+				suite.Equal(evt.ConnID, caught.ConnID)
57
+				suite.Equal(evt.RemoteIP.String(), caught.RemoteIP.String())
58
+				suite.Equal(evt.StreamID(), caught.StreamID())
59
+			})
60
+	}
61
+
62
+	suite.stream.Send(suite.ctx, evt)
63
+	time.Sleep(100 * time.Millisecond)
64
+}
65
+
66
+func (suite *EventStreamTestSuite) TestEventFinishOk() {
67
+	evt := mtglib.EventFinish{
68
+		CreatedAt: time.Now(),
69
+		ConnID:    "connID",
70
+	}
71
+
72
+	for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
73
+		v.
74
+			On("EventFinish", mock.Anything).
75
+			Once().
76
+			Run(func(args mock.Arguments) {
77
+				caught := args.Get(0).(mtglib.EventFinish)
78
+
79
+				suite.Equal(evt.CreatedAt, caught.CreatedAt)
80
+				suite.Equal(evt.ConnID, caught.ConnID)
81
+				suite.Equal(evt.StreamID(), caught.StreamID())
82
+			})
83
+	}
84
+
85
+	suite.stream.Send(suite.ctx, evt)
86
+	time.Sleep(100 * time.Millisecond)
87
+}
88
+
89
+func (suite *EventStreamTestSuite) TestEventConcurrencyLimitedOk() {
90
+	evt := mtglib.EventConcurrencyLimited{
91
+		CreatedAt: time.Now(),
92
+	}
93
+
94
+	for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
95
+		v.
96
+			On("EventConcurrencyLimited", mock.Anything).
97
+			Once().
98
+			Run(func(args mock.Arguments) {
99
+				caught := args.Get(0).(mtglib.EventConcurrencyLimited)
100
+
101
+				suite.Equal(evt.CreatedAt, caught.CreatedAt)
102
+			})
103
+	}
104
+
105
+	suite.stream.Send(suite.ctx, evt)
106
+	time.Sleep(100 * time.Millisecond)
107
+}
108
+
109
+func (suite *EventStreamTestSuite) TearDownTest() {
110
+	suite.stream.Shutdown()
111
+	suite.ctxCancel()
112
+
113
+	time.Sleep(100 * time.Millisecond)
114
+
115
+	suite.observerMock1.AssertExpectations(suite.T())
116
+	suite.observerMock2.AssertExpectations(suite.T())
117
+}
118
+
119
+func TestEventStream(t *testing.T) {
120
+	t.Parallel()
121
+	suite.Run(t, &EventStreamTestSuite{})
122
+}

+ 1
- 1
events/init.go Просмотреть файл

@@ -5,7 +5,7 @@ import "github.com/9seconds/mtg/v2/mtglib"
5 5
 type Observer interface {
6 6
 	EventStart(mtglib.EventStart)
7 7
 	EventFinish(mtglib.EventFinish)
8
-    EventConcurrencyLimited(mtglib.EventConcurrencyLimited)
8
+	EventConcurrencyLimited(mtglib.EventConcurrencyLimited)
9 9
 
10 10
 	Shutdown()
11 11
 }

+ 1
- 1
events/init_test.go Просмотреть файл

@@ -13,7 +13,7 @@ func (o *ObserverMock) EventStart(evt mtglib.EventStart) {
13 13
 	o.Called(evt)
14 14
 }
15 15
 
16
-func (o *ObserverMock) EventFinish(evt mtglib.EventStart) {
16
+func (o *ObserverMock) EventFinish(evt mtglib.EventFinish) {
17 17
 	o.Called(evt)
18 18
 }
19 19
 

+ 3
- 1
mtglib/events.go Просмотреть файл

@@ -24,7 +24,9 @@ func (e EventFinish) StreamID() string {
24 24
 	return e.ConnID
25 25
 }
26 26
 
27
-type EventConcurrencyLimited struct{}
27
+type EventConcurrencyLimited struct {
28
+	CreatedAt time.Time
29
+}
28 30
 
29 31
 func (e EventConcurrencyLimited) StreamID() string {
30 32
 	return ""

Загрузка…
Отмена
Сохранить