9seconds 5 лет назад
Родитель
Сommit
bc2bd4510a

+ 2
- 2
events/event_stream.go Просмотреть файл

@@ -73,8 +73,8 @@ func eventStreamProcessor(ctx context.Context, eventChan <-chan mtglib.Event, ob
73 73
 				observer.EventStart(typedEvt)
74 74
 			case mtglib.EventConnectedToDC:
75 75
 				observer.EventConnectedToDC(typedEvt)
76
-			case mtglib.EventTraffic:
77
-				observer.EventTraffic(typedEvt)
76
+			case mtglib.EventTelegramTraffic:
77
+				observer.EventTelegramTraffic(typedEvt)
78 78
 			case mtglib.EventFinish:
79 79
 				observer.EventFinish(typedEvt)
80 80
 			case mtglib.EventIPBlocklisted:

+ 4
- 4
events/event_stream_test.go Просмотреть файл

@@ -90,8 +90,8 @@ func (suite *EventStreamTestSuite) TestEventConnectedToDC() {
90 90
 	time.Sleep(100 * time.Millisecond)
91 91
 }
92 92
 
93
-func (suite *EventStreamTestSuite) TestEventTraffic() {
94
-	evt := mtglib.EventTraffic{
93
+func (suite *EventStreamTestSuite) TestEventTelegramTraffic() {
94
+	evt := mtglib.EventTelegramTraffic{
95 95
 		CreatedAt: time.Now(),
96 96
 		ConnID:    "connID",
97 97
 		Traffic:   1024,
@@ -100,10 +100,10 @@ func (suite *EventStreamTestSuite) TestEventTraffic() {
100 100
 
101 101
 	for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
102 102
 		v.
103
-			On("EventTraffic", mock.Anything).
103
+			On("EventTelegramTraffic", mock.Anything).
104 104
 			Once().
105 105
 			Run(func(args mock.Arguments) {
106
-				caught := args.Get(0).(mtglib.EventTraffic)
106
+				caught := args.Get(0).(mtglib.EventTelegramTraffic)
107 107
 
108 108
 				suite.Equal(evt.CreatedAt, caught.CreatedAt)
109 109
 				suite.Equal(evt.ConnID, caught.ConnID)

+ 1
- 1
events/init.go Просмотреть файл

@@ -6,7 +6,7 @@ type Observer interface {
6 6
 	EventStart(mtglib.EventStart)
7 7
 	EventFinish(mtglib.EventFinish)
8 8
 	EventConnectedToDC(mtglib.EventConnectedToDC)
9
-	EventTraffic(mtglib.EventTraffic)
9
+	EventTelegramTraffic(mtglib.EventTelegramTraffic)
10 10
 	EventConcurrencyLimited(mtglib.EventConcurrencyLimited)
11 11
 	EventIPBlocklisted(mtglib.EventIPBlocklisted)
12 12
 

+ 1
- 1
events/init_test.go Просмотреть файл

@@ -17,7 +17,7 @@ func (o *ObserverMock) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
17 17
 	o.Called(evt)
18 18
 }
19 19
 
20
-func (o *ObserverMock) EventTraffic(evt mtglib.EventTraffic) {
20
+func (o *ObserverMock) EventTelegramTraffic(evt mtglib.EventTelegramTraffic) {
21 21
 	o.Called(evt)
22 22
 }
23 23
 

+ 2
- 2
events/multi_observer.go Просмотреть файл

@@ -40,7 +40,7 @@ func (m multiObserver) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
40 40
 	wg.Wait()
41 41
 }
42 42
 
43
-func (m multiObserver) EventTraffic(evt mtglib.EventTraffic) {
43
+func (m multiObserver) EventTelegramTraffic(evt mtglib.EventTelegramTraffic) {
44 44
 	wg := &sync.WaitGroup{}
45 45
 	wg.Add(len(m.observers))
46 46
 
@@ -48,7 +48,7 @@ func (m multiObserver) EventTraffic(evt mtglib.EventTraffic) {
48 48
 		go func(obs Observer) {
49 49
 			defer wg.Done()
50 50
 
51
-			obs.EventTraffic(evt)
51
+			obs.EventTelegramTraffic(evt)
52 52
 		}(v)
53 53
 	}
54 54
 

+ 1
- 1
events/noop.go Просмотреть файл

@@ -19,7 +19,7 @@ type noopObserver struct{}
19 19
 
20 20
 func (n noopObserver) EventStart(_ mtglib.EventStart)                           {}
21 21
 func (n noopObserver) EventConnectedToDC(_ mtglib.EventConnectedToDC)           {}
22
-func (n noopObserver) EventTraffic(_ mtglib.EventTraffic)                       {}
22
+func (n noopObserver) EventTelegramTraffic(_ mtglib.EventTelegramTraffic)       {}
23 23
 func (n noopObserver) EventFinish(_ mtglib.EventFinish)                         {}
24 24
 func (n noopObserver) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {}
25 25
 func (n noopObserver) EventIPBlocklisted(_ mtglib.EventIPBlocklisted)           {}

+ 1
- 1
events/noop_test.go Просмотреть файл

@@ -31,7 +31,7 @@ func (suite *NoopTestSuite) SetupSuite() {
31 31
 			RemoteIP:  net.ParseIP("127.1.0.1"),
32 32
 			DC:        2,
33 33
 		},
34
-		"traffic": mtglib.EventTraffic{
34
+		"telegram-traffic": mtglib.EventTelegramTraffic{
35 35
 			CreatedAt: time.Now(),
36 36
 			ConnID:    "connID",
37 37
 			Traffic:   1000,

+ 2
- 2
mtglib/conns.go Просмотреть файл

@@ -21,7 +21,7 @@ func (c connTelegramTraffic) Read(b []byte) (int, error) {
21 21
 	n, err := c.Conn.Read(b)
22 22
 
23 23
 	if n > 0 {
24
-		c.stream.Send(c.ctx, EventTraffic{
24
+		c.stream.Send(c.ctx, EventTelegramTraffic{
25 25
 			CreatedAt: time.Now(),
26 26
 			ConnID:    c.connID,
27 27
 			Traffic:   uint(n),
@@ -36,7 +36,7 @@ func (c connTelegramTraffic) Write(b []byte) (int, error) {
36 36
 	n, err := c.Conn.Write(b)
37 37
 
38 38
 	if n > 0 {
39
-		c.stream.Send(c.ctx, EventTraffic{
39
+		c.stream.Send(c.ctx, EventTelegramTraffic{
40 40
 			CreatedAt: time.Now(),
41 41
 			ConnID:    c.connID,
42 42
 			Traffic:   uint(n),

+ 26
- 2
mtglib/events.go Просмотреть файл

@@ -15,6 +15,10 @@ func (e EventStart) StreamID() string {
15 15
 	return e.ConnID
16 16
 }
17 17
 
18
+func (e EventStart) Timestamp() time.Time {
19
+	return e.CreatedAt
20
+}
21
+
18 22
 type EventConnectedToDC struct {
19 23
 	CreatedAt time.Time
20 24
 	ConnID    string
@@ -26,17 +30,25 @@ func (e EventConnectedToDC) StreamID() string {
26 30
 	return e.ConnID
27 31
 }
28 32
 
29
-type EventTraffic struct {
33
+func (e EventConnectedToDC) Timestamp() time.Time {
34
+	return e.CreatedAt
35
+}
36
+
37
+type EventTelegramTraffic struct {
30 38
 	CreatedAt time.Time
31 39
 	ConnID    string
32 40
 	Traffic   uint
33 41
 	IsRead    bool
34 42
 }
35 43
 
36
-func (e EventTraffic) StreamID() string {
44
+func (e EventTelegramTraffic) StreamID() string {
37 45
 	return e.ConnID
38 46
 }
39 47
 
48
+func (e EventTelegramTraffic) Timestamp() time.Time {
49
+	return e.CreatedAt
50
+}
51
+
40 52
 type EventFinish struct {
41 53
 	CreatedAt time.Time
42 54
 	ConnID    string
@@ -46,6 +58,10 @@ func (e EventFinish) StreamID() string {
46 58
 	return e.ConnID
47 59
 }
48 60
 
61
+func (e EventFinish) Timestamp() time.Time {
62
+	return e.CreatedAt
63
+}
64
+
49 65
 type EventConcurrencyLimited struct {
50 66
 	CreatedAt time.Time
51 67
 }
@@ -54,6 +70,10 @@ func (e EventConcurrencyLimited) StreamID() string {
54 70
 	return ""
55 71
 }
56 72
 
73
+func (e EventConcurrencyLimited) Timestamp() time.Time {
74
+	return e.CreatedAt
75
+}
76
+
57 77
 type EventIPBlocklisted struct {
58 78
 	CreatedAt time.Time
59 79
 	RemoteIP  net.IP
@@ -62,3 +82,7 @@ type EventIPBlocklisted struct {
62 82
 func (e EventIPBlocklisted) StreamID() string {
63 83
 	return ""
64 84
 }
85
+
86
+func (e EventIPBlocklisted) Timestamp() time.Time {
87
+	return e.CreatedAt
88
+}

+ 19
- 4
mtglib/events_test.go Просмотреть файл

@@ -21,6 +21,7 @@ func (suite *EventsTestSuite) TestEventStart() {
21 21
 	}
22 22
 
23 23
 	suite.Equal("CONNID", evt.StreamID())
24
+	suite.WithinDuration(time.Now(), evt.Timestamp(), 10*time.Millisecond)
24 25
 }
25 26
 
26 27
 func (suite *EventsTestSuite) TestEventFinish() {
@@ -30,6 +31,7 @@ func (suite *EventsTestSuite) TestEventFinish() {
30 31
 	}
31 32
 
32 33
 	suite.Equal("CONNID", evt.StreamID())
34
+	suite.WithinDuration(time.Now(), evt.Timestamp(), 10*time.Millisecond)
33 35
 }
34 36
 
35 37
 func (suite *EventsTestSuite) TestEventConnectedToDC() {
@@ -41,10 +43,11 @@ func (suite *EventsTestSuite) TestEventConnectedToDC() {
41 43
 	}
42 44
 
43 45
 	suite.Equal("CONNID", evt.StreamID())
46
+	suite.WithinDuration(time.Now(), evt.Timestamp(), 10*time.Millisecond)
44 47
 }
45 48
 
46
-func (suite *EventsTestSuite) TestEventTraffic() {
47
-	evt := mtglib.EventTraffic{
49
+func (suite *EventsTestSuite) TestEventTelegramTraffic() {
50
+	evt := mtglib.EventTelegramTraffic{
48 51
 		CreatedAt: time.Now(),
49 52
 		ConnID:    "CONNID",
50 53
 		Traffic:   3,
@@ -52,14 +55,26 @@ func (suite *EventsTestSuite) TestEventTraffic() {
52 55
 	}
53 56
 
54 57
 	suite.Equal("CONNID", evt.StreamID())
58
+	suite.WithinDuration(time.Now(), evt.Timestamp(), 10*time.Millisecond)
55 59
 }
56 60
 
57 61
 func (suite *EventsTestSuite) TestEventConcurrencyLimited() {
58
-	suite.Empty(mtglib.EventConcurrencyLimited{}.StreamID())
62
+	evt := mtglib.EventConcurrencyLimited{
63
+		CreatedAt: time.Now(),
64
+	}
65
+
66
+	suite.Empty(evt.StreamID())
67
+	suite.WithinDuration(time.Now(), evt.Timestamp(), 10*time.Millisecond)
59 68
 }
60 69
 
61 70
 func (suite *EventsTestSuite) TestEventIPBlocklisted() {
62
-	suite.Empty(mtglib.EventIPBlocklisted{}.StreamID())
71
+	evt := mtglib.EventIPBlocklisted{
72
+		CreatedAt: time.Now(),
73
+		RemoteIP:  net.ParseIP("10.0.0.10"),
74
+	}
75
+
76
+	suite.Empty(evt.StreamID())
77
+	suite.WithinDuration(time.Now(), evt.Timestamp(), 10*time.Millisecond)
63 78
 }
64 79
 
65 80
 func TestEvents(t *testing.T) {

+ 1
- 0
mtglib/init.go Просмотреть файл

@@ -45,6 +45,7 @@ type IPBlocklist interface {
45 45
 
46 46
 type Event interface {
47 47
 	StreamID() string
48
+	Timestamp() time.Time
48 49
 }
49 50
 
50 51
 type EventStream interface {

+ 6
- 0
mtglib/proxy.go Просмотреть файл

@@ -95,6 +95,9 @@ func (p *Proxy) Serve(listener net.Listener) error {
95 95
 
96 96
 		if addr := conn.RemoteAddr().(*net.TCPAddr).IP; p.ipBlocklist.Contains(addr) {
97 97
 			conn.Close()
98
+			p.logger.
99
+				BindStr("ip", conn.RemoteAddr().(*net.TCPAddr).IP.String()).
100
+				Info("ip was blacklisted")
98 101
 			p.eventStream.Send(p.ctx, EventIPBlocklisted{
99 102
 				CreatedAt: time.Now(),
100 103
 				RemoteIP:  addr,
@@ -110,6 +113,9 @@ func (p *Proxy) Serve(listener net.Listener) error {
110 113
 		case errors.Is(err, ants.ErrPoolClosed):
111 114
 			return nil
112 115
 		case errors.Is(err, ants.ErrPoolOverload):
116
+			p.logger.
117
+				BindStr("ip", conn.RemoteAddr().(*net.TCPAddr).IP.String()).
118
+				Info("connection was concurrency limited")
113 119
 			p.eventStream.Send(p.ctx, EventConcurrencyLimited{
114 120
 				CreatedAt: time.Now(),
115 121
 			})

+ 23
- 17
stats/init.go Просмотреть файл

@@ -6,21 +6,27 @@ const (
6 6
 	DefaultStatsdMetricPrefix = DefaultMetricPrefix + "."
7 7
 	DefaultStatsdTagFormat    = "datadog"
8 8
 
9
-	MetricClientConnections   = "client_connections"
10
-	MetricTelegramConnections = "telegram_connections"
11
-	MetricTraffic             = "traffic"
12
-	MetricSessionDuration     = "session_duration"
13
-	MetricSessionTraffic      = "session_traffic"
14
-	MetricConcurrencyLimited  = "concurrency_limited"
15
-	MetricIPBlocklisted       = "ip_blocklisted"
16
-
17
-	TagIPType     = "ip_type"
18
-	TagTelegramIP = "ip"
19
-	TagDC         = "dc"
20
-	TagDirection  = "direction"
21
-
22
-	TagIPTypeIPv4        = "ipv4"
23
-	TagIPTypeIPv6        = "ipv6"
24
-	TagDirectionTelegram = "telegram"
25
-	TagDirectionClient   = "client"
9
+	MetricClientConnections           = "client_connections"
10
+	MetricTelegramConnections         = "telegram_connections"
11
+	MetricDomainDisguisingConnections = "domain_disguising_connections"
12
+
13
+	MetricTelegramTraffic         = "telegram_traffic"
14
+	MetricDomainDisguisingTraffic = "domain_disguising_traffic"
15
+
16
+	MetricDomainDisguising   = "domain_disguising"
17
+	MetricConcurrencyLimited = "concurrency_limited"
18
+	MetricIPBlocklisted      = "ip_blocklisted"
19
+	MetricReplayAttacks      = "replay_attacks"
20
+
21
+	TagIPFamily     = "ip_family"
22
+	TagIPFamilyIPv4 = "ipv4"
23
+	TagIPFamilyIPv6 = "ipv6"
24
+
25
+	TagTelegramIP = "telegram_ip"
26
+
27
+	TagDC = "dc"
28
+
29
+	TagDirection           = "direction"
30
+	TagDirectionToClient   = "to_client"
31
+	TagDirectionFromClient = "from_client"
26 32
 )

+ 20
- 0
stats/pools.go Просмотреть файл

@@ -0,0 +1,20 @@
1
+package stats
2
+
3
+import "sync"
4
+
5
+var streamInfoPool = sync.Pool{
6
+	New: func() interface{} {
7
+		return &streamInfo{
8
+			tags: map[string]string{},
9
+		}
10
+	},
11
+}
12
+
13
+func acquireStreamInfo() *streamInfo {
14
+	return streamInfoPool.Get().(*streamInfo)
15
+}
16
+
17
+func releaseStreamInfo(info *streamInfo) {
18
+	info.Reset()
19
+	streamInfoPool.Put(info)
20
+}

+ 80
- 117
stats/prometheus.go Просмотреть файл

@@ -4,8 +4,6 @@ import (
4 4
 	"context"
5 5
 	"net"
6 6
 	"net/http"
7
-	"strconv"
8
-	"time"
9 7
 
10 8
 	"github.com/9seconds/mtg/v2/events"
11 9
 	"github.com/9seconds/mtg/v2/mtglib"
@@ -19,89 +17,61 @@ type prometheusProcessor struct {
19 17
 }
20 18
 
21 19
 func (p prometheusProcessor) EventStart(evt mtglib.EventStart) {
22
-	sInfo := &streamInfo{
23
-		createdAt: evt.CreatedAt,
24
-		clientIP:  evt.RemoteIP,
25
-	}
26
-	p.streams[evt.StreamID()] = sInfo
27
-
28
-	p.factory.metricClientConnections.WithLabelValues(sInfo.GetClientIPType()).Inc()
20
+	info := acquireStreamInfo()
21
+	info.SetStartTime(evt.CreatedAt)
22
+	info.SetClientIP(evt.RemoteIP)
23
+	p.streams[evt.StreamID()] = info
24
+
25
+	p.factory.metricClientConnections.
26
+		WithLabelValues(info.V(TagIPFamily)).
27
+		Inc()
29 28
 }
30 29
 
31 30
 func (p prometheusProcessor) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
32
-	sInfo, ok := p.streams[evt.StreamID()]
31
+	info, ok := p.streams[evt.StreamID()]
33 32
 	if !ok {
34 33
 		return
35 34
 	}
36 35
 
37
-	sInfo.remoteIP = evt.RemoteIP
38
-	sInfo.dc = evt.DC
36
+	info.SetTelegramIP(evt.RemoteIP)
37
+	info.SetDC(evt.DC)
39 38
 
40
-	p.factory.metricTelegramConnections.WithLabelValues(
41
-		sInfo.GetRemoteIPType(),
42
-		sInfo.remoteIP.String(),
43
-		strconv.Itoa(sInfo.dc)).Inc()
39
+	p.factory.metricTelegramConnections.
40
+		WithLabelValues(info.V(TagTelegramIP), info.V(TagDC)).
41
+		Inc()
44 42
 }
45 43
 
46
-func (p prometheusProcessor) EventTraffic(evt mtglib.EventTraffic) {
47
-	sInfo, ok := p.streams[evt.StreamID()]
44
+func (p prometheusProcessor) EventTelegramTraffic(evt mtglib.EventTelegramTraffic) {
45
+	info, ok := p.streams[evt.StreamID()]
48 46
 	if !ok {
49 47
 		return
50 48
 	}
51 49
 
52
-	labels := []string{
53
-		sInfo.GetRemoteIPType(),
54
-		sInfo.remoteIP.String(),
55
-		strconv.Itoa(sInfo.dc),
56
-	}
57
-
58
-	if evt.IsRead {
59
-		sInfo.bytesRecvFromTelegram += evt.Traffic
60
-
61
-		labels = append(labels, TagDirectionClient)
62
-	} else {
63
-		sInfo.bytesSentToTelegram += evt.Traffic
64
-
65
-		labels = append(labels, TagDirectionTelegram)
66
-	}
67
-
68
-	p.factory.metricTraffic.WithLabelValues(labels...).Add(float64(evt.Traffic))
50
+	p.factory.metricTelegramTraffic.
51
+		WithLabelValues(info.V(TagTelegramIP), info.V(TagDC), getDirection(evt.IsRead)).
52
+		Add(float64(evt.Traffic))
69 53
 }
70 54
 
71 55
 func (p prometheusProcessor) EventFinish(evt mtglib.EventFinish) {
72
-	sInfo, ok := p.streams[evt.StreamID()]
56
+	info, ok := p.streams[evt.StreamID()]
73 57
 	if !ok {
74 58
 		return
75 59
 	}
76 60
 
77
-	defer delete(p.streams, evt.StreamID())
78
-
79
-	duration := evt.CreatedAt.Sub(sInfo.createdAt)
61
+	defer func() {
62
+		delete(p.streams, evt.StreamID())
63
+		releaseStreamInfo(info)
64
+	}()
80 65
 
81
-	p.factory.metricClientConnections.WithLabelValues(sInfo.GetClientIPType()).Dec()
82
-	p.factory.metricSessionDuration.Observe(float64(duration) / float64(time.Second))
66
+	p.factory.metricClientConnections.
67
+		WithLabelValues(info.V(TagIPFamily)).
68
+		Dec()
83 69
 
84
-	if sInfo.remoteIP == nil {
85
-		return
70
+	if info.V(TagTelegramIP) != "" {
71
+		p.factory.metricTelegramConnections.
72
+			WithLabelValues(info.V(TagTelegramIP), info.V(TagDC)).
73
+			Dec()
86 74
 	}
87
-
88
-	labels := []string{
89
-		sInfo.GetRemoteIPType(),
90
-		sInfo.remoteIP.String(),
91
-		strconv.Itoa(sInfo.dc),
92
-	}
93
-
94
-	p.factory.metricTelegramConnections.WithLabelValues(labels...).Dec()
95
-
96
-	labels = append(labels, TagDirectionClient)
97
-	p.factory.metricSessionTraffic.
98
-		WithLabelValues(labels...).
99
-		Observe(float64(sInfo.bytesRecvFromTelegram))
100
-
101
-	labels[3] = TagDirectionTelegram
102
-	p.factory.metricSessionTraffic.
103
-		WithLabelValues(labels...).
104
-		Observe(float64(sInfo.bytesSentToTelegram))
105 75
 }
106 76
 
107 77
 func (p prometheusProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {
@@ -109,11 +79,7 @@ func (p prometheusProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLi
109 79
 }
110 80
 
111 81
 func (p prometheusProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
112
-	if evt.RemoteIP.To4() == nil {
113
-		p.factory.metricIPBlocklisted.WithLabelValues(TagIPTypeIPv6).Inc()
114
-	} else {
115
-		p.factory.metricIPBlocklisted.WithLabelValues(TagIPTypeIPv4).Inc()
116
-	}
82
+	p.factory.metricIPBlocklisted.Inc()
117 83
 }
118 84
 
119 85
 func (p prometheusProcessor) Shutdown() {
@@ -123,13 +89,17 @@ func (p prometheusProcessor) Shutdown() {
123 89
 type PrometheusFactory struct {
124 90
 	httpServer *http.Server
125 91
 
126
-	metricClientConnections   *prometheus.GaugeVec
127
-	metricTelegramConnections *prometheus.GaugeVec
128
-	metricTraffic             *prometheus.CounterVec
129
-	metricIPBlocklisted       *prometheus.CounterVec
130
-	metricSessionTraffic      *prometheus.HistogramVec
131
-	metricConcurrencyLimited  prometheus.Counter
132
-	metricSessionDuration     prometheus.Histogram
92
+	metricClientConnections           *prometheus.GaugeVec
93
+	metricTelegramConnections         *prometheus.GaugeVec
94
+	metricDomainDisguisingConnections *prometheus.GaugeVec
95
+
96
+	metricTelegramTraffic         *prometheus.CounterVec
97
+	metricDomainDisguisingTraffic *prometheus.CounterVec
98
+
99
+	metricDomainDisguising   prometheus.Counter
100
+	metricConcurrencyLimited prometheus.Counter
101
+	metricIPBlocklisted      prometheus.Counter
102
+	metricReplayAttacks      prometheus.Counter
133 103
 }
134 104
 
135 105
 func (p *PrometheusFactory) Make() events.Observer {
@@ -164,70 +134,63 @@ func NewPrometheus(metricPrefix, httpPath string) *PrometheusFactory { // nolint
164 134
 		metricClientConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
165 135
 			Namespace: metricPrefix,
166 136
 			Name:      MetricClientConnections,
167
-			Help:      "A number of connections under active processing.",
168
-		}, []string{TagIPType}),
137
+			Help:      "A number of actively processing client connections.",
138
+		}, []string{TagIPFamily}),
169 139
 		metricTelegramConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
170 140
 			Namespace: metricPrefix,
171 141
 			Name:      MetricTelegramConnections,
172 142
 			Help:      "A number of connections to Telegram servers.",
173
-		}, []string{TagIPType, TagTelegramIP, TagDC}),
174
-		metricSessionDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
143
+		}, []string{TagTelegramIP, TagDC}),
144
+		metricDomainDisguisingConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
175 145
 			Namespace: metricPrefix,
176
-			Name:      MetricSessionDuration,
177
-			Help:      "Session duration.",
178
-			Buckets: []float64{ // per 30 seconds
179
-				30,
180
-				60,
181
-				90,
182
-				120,
183
-				150,
184
-				180,
185
-				210,
186
-				240,
187
-				270,
188
-				300,
189
-			},
190
-		}),
191
-		metricSessionTraffic: prometheus.NewHistogramVec(prometheus.HistogramOpts{
146
+			Name:      MetricDomainDisguisingConnections,
147
+			Help:      "A number of connections which talk with disguising domain.",
148
+		}, []string{TagIPFamily}),
149
+
150
+		metricTelegramTraffic: prometheus.NewCounterVec(prometheus.CounterOpts{
151
+			Namespace: metricPrefix,
152
+			Name:      MetricTelegramTraffic,
153
+			Help:      "Traffic which is generated talking with Telegram servers.",
154
+		}, []string{TagTelegramIP, TagDC, TagDirection}),
155
+		metricDomainDisguisingTraffic: prometheus.NewCounterVec(prometheus.CounterOpts{
192 156
 			Namespace: metricPrefix,
193
-			Name:      MetricSessionTraffic,
194
-			Help:      "A traffic size which flew via proxy within a single session.",
195
-			Buckets: []float64{ // per 1mb
196
-				1 * 1024 * 1024,
197
-				2 * 1024 * 1024,
198
-				3 * 1024 * 1024,
199
-				4 * 1024 * 1024,
200
-				5 * 1024 * 1024,
201
-				6 * 1024 * 1024,
202
-				7 * 1024 * 1024,
203
-				8 * 1024 * 1024,
204
-				9 * 1024 * 1024,
205
-			},
206
-		}, []string{TagIPType, TagTelegramIP, TagDC, TagDirection}),
207
-		metricTraffic: prometheus.NewCounterVec(prometheus.CounterOpts{
157
+			Name:      MetricDomainDisguisingTraffic,
158
+			Help:      "Traffic which is generated talking with disguising domain.",
159
+		}, []string{TagDirection}),
160
+
161
+		metricDomainDisguising: prometheus.NewCounter(prometheus.CounterOpts{
208 162
 			Namespace: metricPrefix,
209
-			Name:      MetricTraffic,
210
-			Help:      "Traffic which is sent through this proxy.",
211
-		}, []string{TagIPType, TagTelegramIP, TagDC, TagDirection}),
163
+			Name:      MetricDomainDisguising,
164
+			Help:      "A number of routings to disguising domain.",
165
+		}),
212 166
 		metricConcurrencyLimited: prometheus.NewCounter(prometheus.CounterOpts{
213 167
 			Namespace: metricPrefix,
214 168
 			Name:      MetricConcurrencyLimited,
215 169
 			Help:      "A number of sessions that were rejected by concurrency limiter.",
216 170
 		}),
217
-		metricIPBlocklisted: prometheus.NewCounterVec(prometheus.CounterOpts{
171
+		metricIPBlocklisted: prometheus.NewCounter(prometheus.CounterOpts{
218 172
 			Namespace: metricPrefix,
219 173
 			Name:      MetricIPBlocklisted,
220
-			Help:      "A number of rejected sessions due to ip blocklisting",
221
-		}, []string{TagIPType}),
174
+			Help:      "A number of rejected sessions due to ip blocklisting.",
175
+		}),
176
+		metricReplayAttacks: prometheus.NewCounter(prometheus.CounterOpts{
177
+			Namespace: metricPrefix,
178
+			Name:      MetricReplayAttacks,
179
+			Help:      "A number of detected replay attacks.",
180
+		}),
222 181
 	}
223 182
 
224 183
 	registry.MustRegister(factory.metricClientConnections)
225 184
 	registry.MustRegister(factory.metricTelegramConnections)
226
-	registry.MustRegister(factory.metricTraffic)
227
-	registry.MustRegister(factory.metricSessionTraffic)
228
-	registry.MustRegister(factory.metricSessionDuration)
185
+	registry.MustRegister(factory.metricDomainDisguisingConnections)
186
+
187
+	registry.MustRegister(factory.metricTelegramTraffic)
188
+	registry.MustRegister(factory.metricDomainDisguisingTraffic)
189
+
190
+	registry.MustRegister(factory.metricDomainDisguising)
229 191
 	registry.MustRegister(factory.metricConcurrencyLimited)
230 192
 	registry.MustRegister(factory.metricIPBlocklisted)
193
+	registry.MustRegister(factory.metricReplayAttacks)
231 194
 
232 195
 	return factory
233 196
 }

+ 9
- 11
stats/prometheus_test.go Просмотреть файл

@@ -64,7 +64,7 @@ func (suite *PrometheusTestSuite) TestEventStartFinish() {
64 64
 
65 65
 	data, err := suite.Get()
66 66
 	suite.NoError(err)
67
-	suite.Contains(data, `mtg_client_connections{ip_type="ipv4"} 1`)
67
+	suite.Contains(data, `mtg_client_connections{ip_family="ipv4"} 1`)
68 68
 
69 69
 	suite.prometheus.EventConnectedToDC(mtglib.EventConnectedToDC{
70 70
 		CreatedAt: time.Now(),
@@ -76,9 +76,9 @@ func (suite *PrometheusTestSuite) TestEventStartFinish() {
76 76
 
77 77
 	data, err = suite.Get()
78 78
 	suite.NoError(err)
79
-	suite.Contains(data, `mtg_telegram_connections{dc="4",ip="10.0.0.1",ip_type="ipv4"} 1`)
79
+	suite.Contains(data, `mtg_telegram_connections{dc="4",telegram_ip="10.0.0.1"} 1`)
80 80
 
81
-	suite.prometheus.EventTraffic(mtglib.EventTraffic{
81
+	suite.prometheus.EventTelegramTraffic(mtglib.EventTelegramTraffic{
82 82
 		CreatedAt: time.Now(),
83 83
 		ConnID:    "connID",
84 84
 		Traffic:   200,
@@ -88,9 +88,9 @@ func (suite *PrometheusTestSuite) TestEventStartFinish() {
88 88
 
89 89
 	data, err = suite.Get()
90 90
 	suite.NoError(err)
91
-	suite.Contains(data, `mtg_traffic{dc="4",direction="client",ip="10.0.0.1",ip_type="ipv4"} 200`)
91
+	suite.Contains(data, `mtg_telegram_traffic{dc="4",direction="to_client",telegram_ip="10.0.0.1"} 200`)
92 92
 
93
-	suite.prometheus.EventTraffic(mtglib.EventTraffic{
93
+	suite.prometheus.EventTelegramTraffic(mtglib.EventTelegramTraffic{
94 94
 		CreatedAt: time.Now(),
95 95
 		ConnID:    "connID",
96 96
 		Traffic:   100,
@@ -100,7 +100,7 @@ func (suite *PrometheusTestSuite) TestEventStartFinish() {
100 100
 
101 101
 	data, err = suite.Get()
102 102
 	suite.NoError(err)
103
-	suite.Contains(data, `mtg_traffic{dc="4",direction="telegram",ip="10.0.0.1",ip_type="ipv4"} 100`)
103
+	suite.Contains(data, `mtg_telegram_traffic{dc="4",direction="from_client",telegram_ip="10.0.0.1"} 100`)
104 104
 
105 105
 	suite.prometheus.EventFinish(mtglib.EventFinish{
106 106
 		CreatedAt: time.Now(),
@@ -110,10 +110,8 @@ func (suite *PrometheusTestSuite) TestEventStartFinish() {
110 110
 
111 111
 	data, err = suite.Get()
112 112
 	suite.NoError(err)
113
-	suite.Contains(data, `mtg_client_connections{ip_type="ipv4"} 0`)
114
-	suite.Contains(data, `mtg_telegram_connections{dc="4",ip="10.0.0.1",ip_type="ipv4"} 0`)
115
-	suite.Contains(data, `mtg_traffic{dc="4",direction="client",ip="10.0.0.1",ip_type="ipv4"} 200`)
116
-	suite.Contains(data, `mtg_traffic{dc="4",direction="telegram",ip="10.0.0.1",ip_type="ipv4"} 100`)
113
+	suite.Contains(data, `mtg_client_connections{ip_family="ipv4"} 0`)
114
+	suite.Contains(data, `mtg_telegram_connections{dc="4",telegram_ip="10.0.0.1"} 0`)
117 115
 }
118 116
 
119 117
 func (suite *PrometheusTestSuite) TestEventConcurrencyLimited() {
@@ -138,7 +136,7 @@ func (suite *PrometheusTestSuite) TestEventIPBlocklisted() {
138 136
 
139 137
 	data, err := suite.Get()
140 138
 	suite.NoError(err)
141
-	suite.Contains(data, `mtg_ip_blocklisted{ip_type="ipv6"} 1`)
139
+	suite.Contains(data, `mtg_ip_blocklisted 1`)
142 140
 }
143 141
 
144 142
 func TestPrometheus(t *testing.T) {

+ 27
- 45
stats/statsd.go Просмотреть файл

@@ -17,74 +17,64 @@ type statsdProcessor struct {
17 17
 }
18 18
 
19 19
 func (s statsdProcessor) EventStart(evt mtglib.EventStart) {
20
-	sInfo := &streamInfo{
21
-		createdAt: evt.CreatedAt,
22
-		clientIP:  evt.RemoteIP,
23
-	}
24
-	s.streams[evt.StreamID()] = sInfo
20
+	info := acquireStreamInfo()
21
+	info.SetStartTime(evt.CreatedAt)
22
+	info.SetClientIP(evt.RemoteIP)
23
+	s.streams[evt.StreamID()] = info
25 24
 
26 25
 	s.client.GaugeDelta(MetricClientConnections,
27 26
 		1,
28
-		statsd.StringTag(TagIPType, sInfo.GetClientIPType()))
27
+		info.TV(TagIPFamily))
29 28
 }
30 29
 
31 30
 func (s statsdProcessor) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
32
-	sInfo, ok := s.streams[evt.StreamID()]
31
+	info, ok := s.streams[evt.StreamID()]
33 32
 	if !ok {
34 33
 		return
35 34
 	}
36 35
 
37
-	sInfo.remoteIP = evt.RemoteIP
38
-	sInfo.dc = evt.DC
36
+	info.SetTelegramIP(evt.RemoteIP)
37
+	info.SetDC(evt.DC)
39 38
 
40 39
 	s.client.GaugeDelta(MetricTelegramConnections,
41 40
 		1,
42
-		statsd.StringTag(TagIPType, sInfo.GetRemoteIPType()),
43
-		statsd.StringTag(TagTelegramIP, sInfo.remoteIP.String()),
44
-		statsd.IntTag(TagDC, sInfo.dc))
41
+		info.TV(TagTelegramIP),
42
+		info.TV(TagDC))
45 43
 }
46 44
 
47
-func (s statsdProcessor) EventTraffic(evt mtglib.EventTraffic) {
48
-	sInfo, ok := s.streams[evt.StreamID()]
45
+func (s statsdProcessor) EventTelegramTraffic(evt mtglib.EventTelegramTraffic) {
46
+	info, ok := s.streams[evt.StreamID()]
49 47
 	if !ok {
50 48
 		return
51 49
 	}
52 50
 
53
-	tags := []statsd.Tag{
54
-		statsd.StringTag(TagIPType, sInfo.GetRemoteIPType()),
55
-		statsd.StringTag(TagTelegramIP, sInfo.remoteIP.String()),
56
-		statsd.IntTag(TagDC, sInfo.dc),
57
-	}
58
-
59
-	if evt.IsRead {
60
-		tags = append(tags, statsd.StringTag(TagDirection, TagDirectionClient))
61
-		s.client.Incr(MetricTraffic, int64(evt.Traffic), tags...)
62
-	} else {
63
-		tags = append(tags, statsd.StringTag(TagDirection, TagDirectionTelegram))
64
-		s.client.Incr(MetricTraffic, int64(evt.Traffic), tags...)
65
-	}
51
+	s.client.Incr(MetricTelegramTraffic,
52
+		int64(evt.Traffic),
53
+		info.TV(TagTelegramIP),
54
+		info.TV(TagDC),
55
+		statsd.StringTag(TagDirection, getDirection(evt.IsRead)))
66 56
 }
67 57
 
68 58
 func (s statsdProcessor) EventFinish(evt mtglib.EventFinish) {
69
-	sInfo, ok := s.streams[evt.StreamID()]
59
+	info, ok := s.streams[evt.StreamID()]
70 60
 	if !ok {
71 61
 		return
72 62
 	}
73 63
 
74
-	defer delete(s.streams, evt.StreamID())
64
+	defer func() {
65
+		delete(s.streams, evt.StreamID())
66
+		releaseStreamInfo(info)
67
+	}()
75 68
 
76 69
 	s.client.GaugeDelta(MetricClientConnections,
77 70
 		-1,
78
-		statsd.StringTag(TagIPType, sInfo.GetClientIPType()))
79
-	s.client.PrecisionTiming(MetricSessionDuration,
80
-		evt.CreatedAt.Sub(sInfo.createdAt))
71
+		info.TV(TagIPFamily))
81 72
 
82
-	if sInfo.remoteIP != nil {
73
+	if info.V(TagTelegramIP) != "" {
83 74
 		s.client.GaugeDelta(MetricTelegramConnections,
84 75
 			-1,
85
-			statsd.StringTag(TagIPType, sInfo.GetRemoteIPType()),
86
-			statsd.StringTag(TagTelegramIP, sInfo.remoteIP.String()),
87
-			statsd.IntTag(TagDC, sInfo.dc))
76
+			info.TV(TagTelegramIP),
77
+			info.TV(TagDC))
88 78
 	}
89 79
 }
90 80
 
@@ -93,15 +83,7 @@ func (s statsdProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimite
93 83
 }
94 84
 
95 85
 func (s statsdProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
96
-	var tag statsd.Tag
97
-
98
-	if evt.RemoteIP.To4() == nil {
99
-		tag = statsd.StringTag(TagIPType, TagIPTypeIPv6)
100
-	} else {
101
-		tag = statsd.StringTag(TagIPType, TagIPTypeIPv4)
102
-	}
103
-
104
-	s.client.Incr(MetricIPBlocklisted, 1, tag)
86
+	s.client.Incr(MetricIPBlocklisted, 1)
105 87
 }
106 88
 
107 89
 func (s statsdProcessor) Shutdown() {

+ 9
- 10
stats/statsd_test.go Просмотреть файл

@@ -111,7 +111,7 @@ func (suite *StatsdTestSuite) TestEventStartFinish() {
111 111
 		RemoteIP:  net.ParseIP("10.0.0.10"),
112 112
 	})
113 113
 	time.Sleep(statsdSleepTime)
114
-	suite.Equal("mtg.client_connections:+1|g|#ip_type:ipv4", suite.statsdServer.String())
114
+	suite.Equal("mtg.client_connections:+1|g|#ip_family:ipv4", suite.statsdServer.String())
115 115
 
116 116
 	suite.statsd.EventConnectedToDC(mtglib.EventConnectedToDC{
117 117
 		CreatedAt: time.Now(),
@@ -121,9 +121,9 @@ func (suite *StatsdTestSuite) TestEventStartFinish() {
121 121
 	})
122 122
 	time.Sleep(statsdSleepTime)
123 123
 	suite.Contains(suite.statsdServer.String(),
124
-		"mtg.telegram_connections:+1|g|#ip_type:ipv4,ip:10.1.0.10,dc:2")
124
+		"mtg.telegram_connections:+1|g|#telegram_ip:10.1.0.10,dc:2")
125 125
 
126
-	suite.statsd.EventTraffic(mtglib.EventTraffic{
126
+	suite.statsd.EventTelegramTraffic(mtglib.EventTelegramTraffic{
127 127
 		CreatedAt: time.Now(),
128 128
 		ConnID:    "connID",
129 129
 		Traffic:   30,
@@ -131,9 +131,9 @@ func (suite *StatsdTestSuite) TestEventStartFinish() {
131 131
 	})
132 132
 	time.Sleep(statsdSleepTime)
133 133
 	suite.Contains(suite.statsdServer.String(),
134
-		"mtg.traffic:30|c|#ip_type:ipv4,ip:10.1.0.10,dc:2,direction:client")
134
+		"mtg.telegram_traffic:30|c|#telegram_ip:10.1.0.10,dc:2,direction:to_client")
135 135
 
136
-	suite.statsd.EventTraffic(mtglib.EventTraffic{
136
+	suite.statsd.EventTelegramTraffic(mtglib.EventTelegramTraffic{
137 137
 		CreatedAt: time.Now(),
138 138
 		ConnID:    "connID",
139 139
 		Traffic:   90,
@@ -141,18 +141,17 @@ func (suite *StatsdTestSuite) TestEventStartFinish() {
141 141
 	})
142 142
 	time.Sleep(statsdSleepTime)
143 143
 	suite.Contains(suite.statsdServer.String(),
144
-		"mtg.traffic:90|c|#ip_type:ipv4,ip:10.1.0.10,dc:2,direction:telegram")
144
+		"mtg.telegram_traffic:90|c|#telegram_ip:10.1.0.10,dc:2,direction:from_client")
145 145
 
146 146
 	suite.statsd.EventFinish(mtglib.EventFinish{
147 147
 		CreatedAt: time.Now(),
148 148
 		ConnID:    "connID",
149 149
 	})
150 150
 	time.Sleep(statsdSleepTime)
151
-	suite.Contains(suite.statsdServer.String(), "mtg.session_duration")
152 151
 	suite.Contains(suite.statsdServer.String(),
153
-		"mtg.telegram_connections:-1|g|#ip_type:ipv4,ip:10.1.0.10,dc:2")
152
+		"mtg.telegram_connections:-1|g|#telegram_ip:10.1.0.10,dc:2")
154 153
 	suite.Contains(suite.statsdServer.String(),
155
-		"mtg.client_connections:-1|g|#ip_type:ipv4")
154
+		"mtg.client_connections:-1|g|#ip_family:ipv4")
156 155
 }
157 156
 
158 157
 func (suite *StatsdTestSuite) TestEventConcurrencyLimited() {
@@ -171,7 +170,7 @@ func (suite *StatsdTestSuite) TestEventIPBlocklisted() {
171 170
 	})
172 171
 
173 172
 	time.Sleep(statsdSleepTime)
174
-	suite.Equal("mtg.ip_blocklisted:1|c|#ip_type:ipv4", suite.statsdServer.String())
173
+	suite.Equal("mtg.ip_blocklisted:1|c", suite.statsdServer.String())
175 174
 }
176 175
 
177 176
 func TestStatsd(t *testing.T) {

+ 41
- 14
stats/stream_info.go Просмотреть файл

@@ -2,30 +2,57 @@ package stats
2 2
 
3 3
 import (
4 4
 	"net"
5
+	"strconv"
5 6
 	"time"
7
+
8
+	statsd "github.com/smira/go-statsd"
6 9
 )
7 10
 
8 11
 type streamInfo struct {
9
-	createdAt             time.Time
10
-	clientIP              net.IP
11
-	remoteIP              net.IP
12
-	dc                    int
13
-	bytesSentToTelegram   uint
14
-	bytesRecvFromTelegram uint
12
+	startTime time.Time
13
+	tags      map[string]string
14
+}
15
+
16
+func (s *streamInfo) SetStartTime(tme time.Time) {
17
+	s.startTime = tme
18
+}
19
+
20
+func (s *streamInfo) SetClientIP(ip net.IP) {
21
+	if ip.To4() != nil {
22
+		s.tags[TagIPFamily] = TagIPFamilyIPv4
23
+	} else {
24
+		s.tags[TagIPFamily] = TagIPFamilyIPv6
25
+	}
26
+}
27
+
28
+func (s *streamInfo) SetTelegramIP(ip net.IP) {
29
+	s.tags[TagTelegramIP] = ip.String()
30
+}
31
+
32
+func (s *streamInfo) SetDC(dc int) {
33
+	s.tags[TagDC] = strconv.Itoa(dc)
15 34
 }
16 35
 
17
-func (s *streamInfo) GetClientIPType() string {
18
-	return s.getIPType(s.clientIP)
36
+func (s *streamInfo) V(key string) string {
37
+	return s.tags[key]
19 38
 }
20 39
 
21
-func (s *streamInfo) GetRemoteIPType() string {
22
-	return s.getIPType(s.remoteIP)
40
+func (s *streamInfo) TV(key string) statsd.Tag {
41
+	return statsd.StringTag(key, s.tags[key])
42
+}
43
+
44
+func (s *streamInfo) Reset() {
45
+	s.startTime = time.Time{}
46
+
47
+	for k := range s.tags {
48
+		delete(s.tags, k)
49
+	}
23 50
 }
24 51
 
25
-func (s *streamInfo) getIPType(ip net.IP) string {
26
-	if ip.To4() == nil {
27
-		return TagIPTypeIPv6
52
+func getDirection(isRead bool) string {
53
+	if isRead { // for telegram
54
+		return TagDirectionToClient
28 55
 	}
29 56
 
30
-	return TagIPTypeIPv4
57
+	return TagDirectionFromClient
31 58
 }

Загрузка…
Отмена
Сохранить