Pārlūkot izejas kodu

Add EventIPBlocklisted

tags/v2.0.0-rc1
9seconds 5 gadus atpakaļ
vecāks
revīzija
2408f1530f

+ 2
- 0
events/event_stream.go Parādīt failu

@@ -73,6 +73,8 @@ func eventStreamProcessor(ctx context.Context, eventChan <-chan mtglib.Event, ob
73 73
 				observer.EventStart(typedEvt)
74 74
 			case mtglib.EventFinish:
75 75
 				observer.EventFinish(typedEvt)
76
+			case mtglib.EventIPBlocklisted:
77
+				observer.EventIPBlocklisted(typedEvt)
76 78
 			case mtglib.EventConcurrencyLimited:
77 79
 				observer.EventConcurrencyLimited(typedEvt)
78 80
 			}

+ 23
- 0
events/event_stream_test.go Parādīt failu

@@ -106,6 +106,29 @@ func (suite *EventStreamTestSuite) TestEventConcurrencyLimitedOk() {
106 106
 	time.Sleep(100 * time.Millisecond)
107 107
 }
108 108
 
109
+func (suite *EventStreamTestSuite) TestEventIPBlocklistedOk() {
110
+	evt := mtglib.EventIPBlocklisted{
111
+		CreatedAt: time.Now(),
112
+		RemoteIP:  net.ParseIP("10.0.0.10"),
113
+	}
114
+
115
+	for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
116
+		v.
117
+			On("EventIPBlocklisted", mock.Anything).
118
+			Once().
119
+			Run(func(args mock.Arguments) {
120
+				caught := args.Get(0).(mtglib.EventIPBlocklisted)
121
+
122
+				suite.Equal(evt.CreatedAt, caught.CreatedAt)
123
+				suite.Equal(evt.StreamID(), caught.StreamID())
124
+				suite.Equal(evt.RemoteIP.String(), caught.RemoteIP.String())
125
+			})
126
+	}
127
+
128
+	suite.stream.Send(suite.ctx, evt)
129
+	time.Sleep(100 * time.Millisecond)
130
+}
131
+
109 132
 func (suite *EventStreamTestSuite) TearDownTest() {
110 133
 	suite.stream.Shutdown()
111 134
 	suite.ctxCancel()

+ 1
- 0
events/init.go Parādīt failu

@@ -6,6 +6,7 @@ type Observer interface {
6 6
 	EventStart(mtglib.EventStart)
7 7
 	EventFinish(mtglib.EventFinish)
8 8
 	EventConcurrencyLimited(mtglib.EventConcurrencyLimited)
9
+	EventIPBlocklisted(mtglib.EventIPBlocklisted)
9 10
 
10 11
 	Shutdown()
11 12
 }

+ 4
- 0
events/init_test.go Parādīt failu

@@ -21,6 +21,10 @@ func (o *ObserverMock) EventConcurrencyLimited(evt mtglib.EventConcurrencyLimite
21 21
 	o.Called(evt)
22 22
 }
23 23
 
24
+func (o *ObserverMock) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
25
+	o.Called(evt)
26
+}
27
+
24 28
 func (o *ObserverMock) Shutdown() {
25 29
 	o.Called()
26 30
 }

+ 15
- 0
events/multi_observer.go Parādīt failu

@@ -55,6 +55,21 @@ func (m multiObserver) EventConcurrencyLimited(evt mtglib.EventConcurrencyLimite
55 55
 	wg.Wait()
56 56
 }
57 57
 
58
+func (m multiObserver) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
59
+	wg := &sync.WaitGroup{}
60
+	wg.Add(len(m.observers))
61
+
62
+	for _, v := range m.observers {
63
+		go func(obs Observer) {
64
+			defer wg.Done()
65
+
66
+			obs.EventIPBlocklisted(evt)
67
+		}(v)
68
+	}
69
+
70
+	wg.Wait()
71
+}
72
+
58 73
 func (m multiObserver) Shutdown() {
59 74
 	for _, v := range m.observers {
60 75
 		v.Shutdown()

+ 1
- 0
events/noop.go Parādīt failu

@@ -20,6 +20,7 @@ type noopObserver struct{}
20 20
 func (n noopObserver) EventStart(_ mtglib.EventStart)                           {}
21 21
 func (n noopObserver) EventFinish(_ mtglib.EventFinish)                         {}
22 22
 func (n noopObserver) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {}
23
+func (n noopObserver) EventIPBlocklisted(_ mtglib.EventIPBlocklisted)           {}
23 24
 func (n noopObserver) Shutdown()                                                {}
24 25
 
25 26
 func NewNoopObserver() Observer {

+ 6
- 0
events/noop_test.go Parādīt failu

@@ -30,6 +30,10 @@ func (suite *NoopTestSuite) SetupSuite() {
30 30
 			ConnID:    "connID",
31 31
 		},
32 32
 		"concurrency-limited": mtglib.EventConcurrencyLimited{},
33
+		"ip-blacklisted": mtglib.EventIPBlocklisted{
34
+			RemoteIP:  net.ParseIP("10.0.0.10"),
35
+			CreatedAt: time.Now(),
36
+		},
33 37
 	}
34 38
 	suite.ctx = context.Background()
35 39
 }
@@ -62,6 +66,8 @@ func (suite *NoopTestSuite) TestObserver() {
62 66
 				observer.EventFinish(typedEvt)
63 67
 			case mtglib.EventConcurrencyLimited:
64 68
 				observer.EventConcurrencyLimited(typedEvt)
69
+			case mtglib.EventIPBlocklisted:
70
+				observer.EventIPBlocklisted(typedEvt)
65 71
 			}
66 72
 		})
67 73
 	}

+ 9
- 0
mtglib/events.go Parādīt failu

@@ -31,3 +31,12 @@ type EventConcurrencyLimited struct {
31 31
 func (e EventConcurrencyLimited) StreamID() string {
32 32
 	return ""
33 33
 }
34
+
35
+type EventIPBlocklisted struct {
36
+	CreatedAt time.Time
37
+	RemoteIP  net.IP
38
+}
39
+
40
+func (e EventIPBlocklisted) StreamID() string {
41
+	return ""
42
+}

+ 4
- 0
mtglib/events_test.go Parādīt failu

@@ -36,6 +36,10 @@ func (suite *EventsTestSuite) TestEventConcurrencyLimited() {
36 36
 	suite.Empty(mtglib.EventConcurrencyLimited{}.StreamID())
37 37
 }
38 38
 
39
+func (suite *EventsTestSuite) TestEventIPBlocklisted() {
40
+	suite.Empty(mtglib.EventIPBlocklisted{}.StreamID())
41
+}
42
+
39 43
 func TestEvents(t *testing.T) {
40 44
 	t.Parallel()
41 45
 	suite.Run(t, &EventsTestSuite{})

+ 10
- 0
mtglib/proxy.go Parādīt failu

@@ -52,6 +52,16 @@ func (p *Proxy) Serve(listener net.Listener) error {
52 52
 			return fmt.Errorf("cannot accept a new connection: %w", err)
53 53
 		}
54 54
 
55
+		if addr := conn.RemoteAddr().(*net.TCPAddr).IP; p.ipBlocklist.Contains(addr) {
56
+			conn.Close()
57
+			p.eventStream.Send(p.ctx, EventIPBlocklisted{
58
+				CreatedAt: time.Now(),
59
+				RemoteIP:  addr,
60
+			})
61
+
62
+			continue
63
+		}
64
+
55 65
 		err = p.workerPool.Invoke(conn)
56 66
 
57 67
 		switch {

+ 2
- 1
stats/init.go Parādīt failu

@@ -4,9 +4,10 @@ const (
4 4
 	MetricActiveConnection   = "active_connections"
5 5
 	MetricSessionDuration    = "session_duration"
6 6
 	MetricConcurrencyLimited = "concurrency_limited"
7
+	MetricIPBlocklisted      = "ip_blocklisted"
7 8
 
8 9
 	TagIPType = "ip_type"
9 10
 
10 11
 	TagIPTypeIPv4 = "ipv4"
11
-	TagIPTypeIPv6 = "ipv4"
12
+	TagIPTypeIPv6 = "ipv6"
12 13
 )

+ 16
- 1
stats/prometheus.go Parādīt failu

@@ -41,10 +41,18 @@ func (p prometheusProcessor) EventFinish(evt mtglib.EventFinish) {
41 41
 	p.factory.metricSessionDuration.Observe(float64(duration) / float64(time.Second))
42 42
 }
43 43
 
44
-func (p prometheusProcessor) EventConcurrencyLimited(evt mtglib.EventConcurrencyLimited) {
44
+func (p prometheusProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {
45 45
 	p.factory.metricConcurrencyLimited.Inc()
46 46
 }
47 47
 
48
+func (p prometheusProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
49
+	if evt.RemoteIP.To4() == nil {
50
+		p.factory.metricIPBlocklisted.WithLabelValues(TagIPTypeIPv6).Inc()
51
+	} else {
52
+		p.factory.metricIPBlocklisted.WithLabelValues(TagIPTypeIPv4).Inc()
53
+	}
54
+}
55
+
48 56
 func (p prometheusProcessor) Shutdown() {
49 57
 	p.streams = make(map[string]*streamInfo)
50 58
 }
@@ -53,6 +61,7 @@ type PrometheusFactory struct {
53 61
 	httpServer *http.Server
54 62
 
55 63
 	metricActiveConnections  *prometheus.GaugeVec
64
+	metricIPBlocklisted      *prometheus.CounterVec
56 65
 	metricConcurrencyLimited prometheus.Counter
57 66
 	metricSessionDuration    prometheus.Histogram
58 67
 }
@@ -113,11 +122,17 @@ func NewPrometheus(metricPrefix, httpPath string) *PrometheusFactory {
113 122
 			Name:      MetricConcurrencyLimited,
114 123
 			Help:      "A number of sessions that were rejected by concurrency limiter.",
115 124
 		}),
125
+		metricIPBlocklisted: prometheus.NewCounterVec(prometheus.CounterOpts{
126
+			Namespace: metricPrefix,
127
+			Name:      MetricIPBlocklisted,
128
+			Help:      "A number of rejected sessions due to ip blocklisting",
129
+		}, []string{TagIPType}),
116 130
 	}
117 131
 
118 132
 	registry.MustRegister(factory.metricActiveConnections)
119 133
 	registry.MustRegister(factory.metricSessionDuration)
120 134
 	registry.MustRegister(factory.metricConcurrencyLimited)
135
+	registry.MustRegister(factory.metricIPBlocklisted)
121 136
 
122 137
 	return factory
123 138
 }

+ 13
- 0
stats/prometheus_test.go Parādīt failu

@@ -91,6 +91,19 @@ func (suite *PrometheusTestSuite) TestEventConcurrencyLimited() {
91 91
 	suite.Contains(data, `mtg_concurrency_limited 1`)
92 92
 }
93 93
 
94
+func (suite *PrometheusTestSuite) TestEventIPBlocklisted() {
95
+	suite.prometheus.EventIPBlocklisted(mtglib.EventIPBlocklisted{
96
+		CreatedAt: time.Now(),
97
+		RemoteIP:  net.ParseIP("2001:db8::68"),
98
+	})
99
+
100
+	time.Sleep(100 * time.Millisecond)
101
+
102
+	data, err := suite.Get()
103
+	suite.NoError(err)
104
+	suite.Contains(data, `mtg_ip_blocklisted{ip_type="ipv6"} 1`)
105
+}
106
+
94 107
 func TestPrometheus(t *testing.T) {
95 108
 	t.Parallel()
96 109
 	suite.Run(t, &PrometheusTestSuite{})

+ 12
- 0
stats/statsd.go Parādīt failu

@@ -49,6 +49,18 @@ func (s statsdProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimite
49 49
 	s.client.Incr(MetricConcurrencyLimited, 1)
50 50
 }
51 51
 
52
+func (s statsdProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
53
+	var tag statsd.Tag
54
+
55
+	if evt.RemoteIP.To4() == nil {
56
+		tag = statsd.StringTag(TagIPType, TagIPTypeIPv6)
57
+	} else {
58
+		tag = statsd.StringTag(TagIPType, TagIPTypeIPv4)
59
+	}
60
+
61
+	s.client.Incr(MetricIPBlocklisted, 1, tag)
62
+}
63
+
52 64
 func (s statsdProcessor) Shutdown() {
53 65
 	now := time.Now()
54 66
 	events := make([]mtglib.EventFinish, 0, len(s.streams))

+ 10
- 0
stats/statsd_test.go Parādīt failu

@@ -121,6 +121,16 @@ func (suite *StatsdTestSuite) TestEventConcurrencyLimited() {
121 121
 	suite.Equal("mtg.concurrency_limited:1|c", suite.statsdServer.String())
122 122
 }
123 123
 
124
+func (suite *StatsdTestSuite) TestEventIPBlocklisted() {
125
+	suite.statsd.EventIPBlocklisted(mtglib.EventIPBlocklisted{
126
+		CreatedAt: time.Now(),
127
+		RemoteIP:  net.ParseIP("10.0.0.10"),
128
+	})
129
+
130
+	time.Sleep(2 * statsd.DefaultFlushInterval)
131
+	suite.Equal("mtg.ip_blocklisted:1|c|#ip_type:ipv4", suite.statsdServer.String())
132
+}
133
+
124 134
 func TestStatsd(t *testing.T) {
125 135
 	t.Parallel()
126 136
 	suite.Run(t, &StatsdTestSuite{})

Notiek ielāde…
Atcelt
Saglabāt