Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter Version 2.
Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

event_stream_test.go 5.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package events_test
  2. import (
  3. "context"
  4. "net"
  5. "testing"
  6. "time"
  7. "github.com/9seconds/mtg/v2/events"
  8. "github.com/9seconds/mtg/v2/mtglib"
  9. "github.com/stretchr/testify/mock"
  10. "github.com/stretchr/testify/suite"
  11. )
  12. type EventStreamTestSuite struct {
  13. suite.Suite
  14. ctx context.Context
  15. ctxCancel context.CancelFunc
  16. observerMock1 *ObserverMock
  17. observerMock2 *ObserverMock
  18. stream mtglib.EventStream
  19. }
  20. func (suite *EventStreamTestSuite) SetupTest() {
  21. suite.ctx, suite.ctxCancel = context.WithCancel(context.Background())
  22. suite.observerMock1 = &ObserverMock{}
  23. suite.observerMock2 = &ObserverMock{}
  24. suite.observerMock1.On("Shutdown")
  25. suite.observerMock2.On("Shutdown")
  26. factories := make([]events.ObserverFactory, 2)
  27. factories[0] = func() events.Observer { return suite.observerMock1 }
  28. factories[1] = func() events.Observer { return suite.observerMock2 }
  29. suite.stream = events.NewEventStream(factories)
  30. }
  31. func (suite *EventStreamTestSuite) TestEventStart() {
  32. evt := mtglib.EventStart{
  33. CreatedAt: time.Now(),
  34. ConnID: "connID",
  35. RemoteIP: net.ParseIP("10.0.0.1"),
  36. }
  37. for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
  38. v.
  39. On("EventStart", mock.Anything).
  40. Once().
  41. Run(func(args mock.Arguments) {
  42. caught := args.Get(0).(mtglib.EventStart)
  43. suite.Equal(evt.CreatedAt, caught.CreatedAt)
  44. suite.Equal(evt.ConnID, caught.ConnID)
  45. suite.Equal(evt.RemoteIP.String(), caught.RemoteIP.String())
  46. suite.Equal(evt.StreamID(), caught.StreamID())
  47. })
  48. }
  49. suite.stream.Send(suite.ctx, evt)
  50. time.Sleep(100 * time.Millisecond)
  51. }
  52. func (suite *EventStreamTestSuite) TestEventConnectedToDC() {
  53. evt := mtglib.EventConnectedToDC{
  54. CreatedAt: time.Now(),
  55. ConnID: "connID",
  56. RemoteIP: net.ParseIP("10.0.0.1"),
  57. DC: 3,
  58. }
  59. for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
  60. v.
  61. On("EventConnectedToDC", mock.Anything).
  62. Once().
  63. Run(func(args mock.Arguments) {
  64. caught := args.Get(0).(mtglib.EventConnectedToDC)
  65. suite.Equal(evt.CreatedAt, caught.CreatedAt)
  66. suite.Equal(evt.ConnID, caught.ConnID)
  67. suite.Equal(evt.RemoteIP.String(), caught.RemoteIP.String())
  68. suite.Equal(evt.StreamID(), caught.StreamID())
  69. suite.Equal(evt.DC, caught.DC)
  70. })
  71. }
  72. suite.stream.Send(suite.ctx, evt)
  73. time.Sleep(100 * time.Millisecond)
  74. }
  75. func (suite *EventStreamTestSuite) TestEventTraffic() {
  76. evt := mtglib.EventTraffic{
  77. CreatedAt: time.Now(),
  78. ConnID: "connID",
  79. Traffic: 1024,
  80. IsRead: true,
  81. }
  82. for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
  83. v.
  84. On("EventTraffic", mock.Anything).
  85. Once().
  86. Run(func(args mock.Arguments) {
  87. caught := args.Get(0).(mtglib.EventTraffic)
  88. suite.Equal(evt.CreatedAt, caught.CreatedAt)
  89. suite.Equal(evt.ConnID, caught.ConnID)
  90. suite.Equal(evt.StreamID(), caught.StreamID())
  91. suite.Equal(evt.Traffic, caught.Traffic)
  92. suite.Equal(evt.IsRead, caught.IsRead)
  93. })
  94. }
  95. suite.stream.Send(suite.ctx, evt)
  96. time.Sleep(100 * time.Millisecond)
  97. }
  98. func (suite *EventStreamTestSuite) TestEventFinish() {
  99. evt := mtglib.EventFinish{
  100. CreatedAt: time.Now(),
  101. ConnID: "connID",
  102. }
  103. for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
  104. v.
  105. On("EventFinish", mock.Anything).
  106. Once().
  107. Run(func(args mock.Arguments) {
  108. caught := args.Get(0).(mtglib.EventFinish)
  109. suite.Equal(evt.CreatedAt, caught.CreatedAt)
  110. suite.Equal(evt.ConnID, caught.ConnID)
  111. suite.Equal(evt.StreamID(), caught.StreamID())
  112. })
  113. }
  114. suite.stream.Send(suite.ctx, evt)
  115. time.Sleep(100 * time.Millisecond)
  116. }
  117. func (suite *EventStreamTestSuite) TestEventConcurrencyLimited() {
  118. evt := mtglib.EventConcurrencyLimited{
  119. CreatedAt: time.Now(),
  120. }
  121. for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
  122. v.
  123. On("EventConcurrencyLimited", mock.Anything).
  124. Once().
  125. Run(func(args mock.Arguments) {
  126. caught := args.Get(0).(mtglib.EventConcurrencyLimited)
  127. suite.Equal(evt.CreatedAt, caught.CreatedAt)
  128. })
  129. }
  130. suite.stream.Send(suite.ctx, evt)
  131. time.Sleep(100 * time.Millisecond)
  132. }
  133. func (suite *EventStreamTestSuite) TestEventIPBlocklisted() {
  134. evt := mtglib.EventIPBlocklisted{
  135. CreatedAt: time.Now(),
  136. RemoteIP: net.ParseIP("10.0.0.10"),
  137. }
  138. for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
  139. v.
  140. On("EventIPBlocklisted", mock.Anything).
  141. Once().
  142. Run(func(args mock.Arguments) {
  143. caught := args.Get(0).(mtglib.EventIPBlocklisted)
  144. suite.Equal(evt.CreatedAt, caught.CreatedAt)
  145. suite.Equal(evt.StreamID(), caught.StreamID())
  146. suite.Equal(evt.RemoteIP.String(), caught.RemoteIP.String())
  147. })
  148. }
  149. suite.stream.Send(suite.ctx, evt)
  150. time.Sleep(100 * time.Millisecond)
  151. }
  152. func (suite *EventStreamTestSuite) TearDownTest() {
  153. suite.stream.Shutdown()
  154. suite.ctxCancel()
  155. time.Sleep(100 * time.Millisecond)
  156. suite.observerMock1.AssertExpectations(suite.T())
  157. suite.observerMock2.AssertExpectations(suite.T())
  158. }
  159. func TestEventStream(t *testing.T) {
  160. t.Parallel()
  161. suite.Run(t, &EventStreamTestSuite{})
  162. }