Просмотр исходного кода

Add EventReplayAttack

tags/v2.0.0-rc1
9seconds 5 лет назад
Родитель
Сommit
daa8b9c798

+ 2
- 0
events/event_stream.go Просмотреть файл

83
 				observer.EventIPBlocklisted(typedEvt)
83
 				observer.EventIPBlocklisted(typedEvt)
84
 			case mtglib.EventConcurrencyLimited:
84
 			case mtglib.EventConcurrencyLimited:
85
 				observer.EventConcurrencyLimited(typedEvt)
85
 				observer.EventConcurrencyLimited(typedEvt)
86
+			case mtglib.EventReplayAttack:
87
+				observer.EventReplayAttack(typedEvt)
86
 			}
88
 			}
87
 		}
89
 		}
88
 	}
90
 	}

+ 22
- 0
events/event_stream_test.go Просмотреть файл

206
 	time.Sleep(100 * time.Millisecond)
206
 	time.Sleep(100 * time.Millisecond)
207
 }
207
 }
208
 
208
 
209
+func (suite *EventStreamTestSuite) TestEventReplayAttack() {
210
+	evt := mtglib.EventReplayAttack{
211
+		CreatedAt: time.Now(),
212
+		ConnID:    "CONNID",
213
+	}
214
+
215
+	for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
216
+		v.
217
+			On("EventReplayAttack", mock.Anything).
218
+			Once().
219
+			Run(func(args mock.Arguments) {
220
+				caught := args.Get(0).(mtglib.EventReplayAttack)
221
+
222
+				suite.Equal(evt.CreatedAt, caught.CreatedAt)
223
+				suite.Equal(evt.StreamID(), caught.StreamID())
224
+			})
225
+	}
226
+
227
+	suite.stream.Send(suite.ctx, evt)
228
+	time.Sleep(100 * time.Millisecond)
229
+}
230
+
209
 func (suite *EventStreamTestSuite) TearDownTest() {
231
 func (suite *EventStreamTestSuite) TearDownTest() {
210
 	suite.stream.Shutdown()
232
 	suite.stream.Shutdown()
211
 	suite.ctxCancel()
233
 	suite.ctxCancel()

+ 1
- 0
events/init.go Просмотреть файл

10
 	EventTraffic(mtglib.EventTraffic)
10
 	EventTraffic(mtglib.EventTraffic)
11
 	EventConcurrencyLimited(mtglib.EventConcurrencyLimited)
11
 	EventConcurrencyLimited(mtglib.EventConcurrencyLimited)
12
 	EventIPBlocklisted(mtglib.EventIPBlocklisted)
12
 	EventIPBlocklisted(mtglib.EventIPBlocklisted)
13
+	EventReplayAttack(mtglib.EventReplayAttack)
13
 
14
 
14
 	Shutdown()
15
 	Shutdown()
15
 }
16
 }

+ 4
- 0
events/init_test.go Просмотреть файл

37
 	o.Called(evt)
37
 	o.Called(evt)
38
 }
38
 }
39
 
39
 
40
+func (o *ObserverMock) EventReplayAttack(evt mtglib.EventReplayAttack) {
41
+	o.Called(evt)
42
+}
43
+
40
 func (o *ObserverMock) Shutdown() {
44
 func (o *ObserverMock) Shutdown() {
41
 	o.Called()
45
 	o.Called()
42
 }
46
 }

+ 15
- 0
events/multi_observer.go Просмотреть файл

115
 	wg.Wait()
115
 	wg.Wait()
116
 }
116
 }
117
 
117
 
118
+func (m multiObserver) EventReplayAttack(evt mtglib.EventReplayAttack) {
119
+	wg := &sync.WaitGroup{}
120
+	wg.Add(len(m.observers))
121
+
122
+	for _, v := range m.observers {
123
+		go func(obs Observer) {
124
+			defer wg.Done()
125
+
126
+			obs.EventReplayAttack(evt)
127
+		}(v)
128
+	}
129
+
130
+	wg.Wait()
131
+}
132
+
118
 func (m multiObserver) Shutdown() {
133
 func (m multiObserver) Shutdown() {
119
 	for _, v := range m.observers {
134
 	for _, v := range m.observers {
120
 		v.Shutdown()
135
 		v.Shutdown()

+ 1
- 0
events/noop.go Просмотреть файл

24
 func (n noopObserver) EventFinish(_ mtglib.EventFinish)                         {}
24
 func (n noopObserver) EventFinish(_ mtglib.EventFinish)                         {}
25
 func (n noopObserver) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {}
25
 func (n noopObserver) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {}
26
 func (n noopObserver) EventIPBlocklisted(_ mtglib.EventIPBlocklisted)           {}
26
 func (n noopObserver) EventIPBlocklisted(_ mtglib.EventIPBlocklisted)           {}
27
+func (n noopObserver) EventReplayAttack(_ mtglib.EventReplayAttack)             {}
27
 func (n noopObserver) Shutdown()                                                {}
28
 func (n noopObserver) Shutdown()                                                {}
28
 
29
 
29
 func NewNoopObserver() Observer {
30
 func NewNoopObserver() Observer {

+ 9
- 1
events/noop_test.go Просмотреть файл

45
 			CreatedAt: time.Now(),
45
 			CreatedAt: time.Now(),
46
 			ConnID:    "connID",
46
 			ConnID:    "connID",
47
 		},
47
 		},
48
-		"concurrency-limited": mtglib.EventConcurrencyLimited{},
48
+		"concurrency-limited": mtglib.EventConcurrencyLimited{
49
+			CreatedAt: time.Now(),
50
+		},
49
 		"ip-blacklisted": mtglib.EventIPBlocklisted{
51
 		"ip-blacklisted": mtglib.EventIPBlocklisted{
50
 			RemoteIP:  net.ParseIP("10.0.0.10"),
52
 			RemoteIP:  net.ParseIP("10.0.0.10"),
51
 			CreatedAt: time.Now(),
53
 			CreatedAt: time.Now(),
52
 		},
54
 		},
55
+		"replay-attack": mtglib.EventReplayAttack{
56
+			CreatedAt: time.Now(),
57
+			ConnID:    "connID",
58
+		},
53
 	}
59
 	}
54
 	suite.ctx = context.Background()
60
 	suite.ctx = context.Background()
55
 }
61
 }
88
 				observer.EventConcurrencyLimited(typedEvt)
94
 				observer.EventConcurrencyLimited(typedEvt)
89
 			case mtglib.EventIPBlocklisted:
95
 			case mtglib.EventIPBlocklisted:
90
 				observer.EventIPBlocklisted(typedEvt)
96
 				observer.EventIPBlocklisted(typedEvt)
97
+			case mtglib.EventReplayAttack:
98
+				observer.EventReplayAttack(typedEvt)
91
 			}
99
 			}
92
 		})
100
 		})
93
 	}
101
 	}

+ 13
- 0
mtglib/events.go Просмотреть файл

99
 func (e EventIPBlocklisted) Timestamp() time.Time {
99
 func (e EventIPBlocklisted) Timestamp() time.Time {
100
 	return e.CreatedAt
100
 	return e.CreatedAt
101
 }
101
 }
102
+
103
+type EventReplayAttack struct {
104
+	CreatedAt time.Time
105
+	ConnID    string
106
+}
107
+
108
+func (e EventReplayAttack) StreamID() string {
109
+	return e.ConnID
110
+}
111
+
112
+func (e EventReplayAttack) Timestamp() time.Time {
113
+	return e.CreatedAt
114
+}

+ 10
- 0
mtglib/events_test.go Просмотреть файл

87
 	suite.WithinDuration(time.Now(), evt.Timestamp(), 10*time.Millisecond)
87
 	suite.WithinDuration(time.Now(), evt.Timestamp(), 10*time.Millisecond)
88
 }
88
 }
89
 
89
 
90
+func (suite *EventsTestSuite) TestEventReplayAttack() {
91
+	evt := mtglib.EventReplayAttack{
92
+		CreatedAt: time.Now(),
93
+		ConnID:    "CONNID",
94
+	}
95
+
96
+	suite.Equal("CONNID", evt.StreamID())
97
+	suite.WithinDuration(time.Now(), evt.Timestamp(), 10*time.Millisecond)
98
+}
99
+
90
 func TestEvents(t *testing.T) {
100
 func TestEvents(t *testing.T) {
91
 	t.Parallel()
101
 	t.Parallel()
92
 	suite.Run(t, &EventsTestSuite{})
102
 	suite.Run(t, &EventsTestSuite{})

+ 4
- 0
mtglib/proxy.go Просмотреть файл

166
 
166
 
167
 	if p.antiReplayCache.SeenBefore(hello.SessionID) {
167
 	if p.antiReplayCache.SeenBefore(hello.SessionID) {
168
 		p.logger.Warning("replay attack has been detected!")
168
 		p.logger.Warning("replay attack has been detected!")
169
+		p.eventStream.Send(p.ctx, EventReplayAttack{
170
+			CreatedAt: time.Now(),
171
+			ConnID:    ctx.connID,
172
+		})
169
 		p.doDomainFronting(ctx, rewind)
173
 		p.doDomainFronting(ctx, rewind)
170
 
174
 
171
 		return false
175
 		return false

+ 5
- 1
stats/prometheus.go Просмотреть файл

110
 	p.factory.metricConcurrencyLimited.Inc()
110
 	p.factory.metricConcurrencyLimited.Inc()
111
 }
111
 }
112
 
112
 
113
-func (p prometheusProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
113
+func (p prometheusProcessor) EventIPBlocklisted(_ mtglib.EventIPBlocklisted) {
114
 	p.factory.metricIPBlocklisted.Inc()
114
 	p.factory.metricIPBlocklisted.Inc()
115
 }
115
 }
116
 
116
 
117
+func (p prometheusProcessor) EventReplayAttack(_ mtglib.EventReplayAttack) {
118
+	p.factory.metricReplayAttacks.Inc()
119
+}
120
+
117
 func (p prometheusProcessor) Shutdown() {
121
 func (p prometheusProcessor) Shutdown() {
118
 	for _, v := range p.streams {
122
 	for _, v := range p.streams {
119
 		releaseStreamInfo(v)
123
 		releaseStreamInfo(v)

+ 13
- 0
stats/prometheus_test.go Просмотреть файл

198
 	suite.Contains(data, `mtg_ip_blocklisted 1`)
198
 	suite.Contains(data, `mtg_ip_blocklisted 1`)
199
 }
199
 }
200
 
200
 
201
+func (suite *PrometheusTestSuite) TestEventReplayAttack() {
202
+	suite.prometheus.EventReplayAttack(mtglib.EventReplayAttack{
203
+		CreatedAt: time.Now(),
204
+		ConnID:    "connID",
205
+	})
206
+
207
+	time.Sleep(100 * time.Millisecond)
208
+
209
+	data, err := suite.Get()
210
+	suite.NoError(err)
211
+	suite.Contains(data, `mtg_replay_attacks 1`)
212
+}
213
+
201
 func TestPrometheus(t *testing.T) {
214
 func TestPrometheus(t *testing.T) {
202
 	t.Parallel()
215
 	t.Parallel()
203
 	suite.Run(t, &PrometheusTestSuite{})
216
 	suite.Run(t, &PrometheusTestSuite{})

+ 5
- 1
stats/statsd.go Просмотреть файл

114
 	s.client.Incr(MetricConcurrencyLimited, 1)
114
 	s.client.Incr(MetricConcurrencyLimited, 1)
115
 }
115
 }
116
 
116
 
117
-func (s statsdProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
117
+func (s statsdProcessor) EventIPBlocklisted(_ mtglib.EventIPBlocklisted) {
118
 	s.client.Incr(MetricIPBlocklisted, 1)
118
 	s.client.Incr(MetricIPBlocklisted, 1)
119
 }
119
 }
120
 
120
 
121
+func (s statsdProcessor) EventReplayAttack(_ mtglib.EventReplayAttack) {
122
+	s.client.Incr(MetricReplayAttacks, 1)
123
+}
124
+
121
 func (s statsdProcessor) Shutdown() {
125
 func (s statsdProcessor) Shutdown() {
122
 	now := time.Now()
126
 	now := time.Now()
123
 	events := make([]mtglib.EventFinish, 0, len(s.streams))
127
 	events := make([]mtglib.EventFinish, 0, len(s.streams))

+ 10
- 0
stats/statsd_test.go Просмотреть файл

228
 	suite.Equal("mtg.ip_blocklisted:1|c", suite.statsdServer.String())
228
 	suite.Equal("mtg.ip_blocklisted:1|c", suite.statsdServer.String())
229
 }
229
 }
230
 
230
 
231
+func (suite *StatsdTestSuite) TestEventReplayAttack() {
232
+	suite.statsd.EventReplayAttack(mtglib.EventReplayAttack{
233
+		CreatedAt: time.Now(),
234
+		ConnID:    "connID",
235
+	})
236
+
237
+	time.Sleep(statsdSleepTime)
238
+	suite.Equal("mtg.replay_attacks:1|c", suite.statsdServer.String())
239
+}
240
+
231
 func TestStatsd(t *testing.T) {
241
 func TestStatsd(t *testing.T) {
232
 	t.Parallel()
242
 	t.Parallel()
233
 	suite.Run(t, &StatsdTestSuite{})
243
 	suite.Run(t, &StatsdTestSuite{})

Загрузка…
Отмена
Сохранить