Explorar el Código

Add EventTraffic

tags/v2.0.0-rc1
9seconds hace 5 años
padre
commit
42160a08fe

+ 4
- 0
events/event_stream.go Ver fichero

@@ -71,6 +71,10 @@ func eventStreamProcessor(ctx context.Context, eventChan <-chan mtglib.Event, ob
71 71
 			switch typedEvt := evt.(type) {
72 72
 			case mtglib.EventStart:
73 73
 				observer.EventStart(typedEvt)
74
+			case mtglib.EventConnectedToDC:
75
+				observer.EventConnectedToDC(typedEvt)
76
+			case mtglib.EventTraffic:
77
+				observer.EventTraffic(typedEvt)
74 78
 			case mtglib.EventFinish:
75 79
 				observer.EventFinish(typedEvt)
76 80
 			case mtglib.EventIPBlocklisted:

+ 58
- 4
events/event_stream_test.go Ver fichero

@@ -38,7 +38,7 @@ func (suite *EventStreamTestSuite) SetupTest() {
38 38
 	suite.stream = events.NewEventStream(factories)
39 39
 }
40 40
 
41
-func (suite *EventStreamTestSuite) TestEventStartOk() {
41
+func (suite *EventStreamTestSuite) TestEventStart() {
42 42
 	evt := mtglib.EventStart{
43 43
 		CreatedAt: time.Now(),
44 44
 		ConnID:    "connID",
@@ -63,7 +63,61 @@ func (suite *EventStreamTestSuite) TestEventStartOk() {
63 63
 	time.Sleep(100 * time.Millisecond)
64 64
 }
65 65
 
66
-func (suite *EventStreamTestSuite) TestEventFinishOk() {
66
+func (suite *EventStreamTestSuite) TestEventConnectedToDC() {
67
+	evt := mtglib.EventConnectedToDC{
68
+		CreatedAt: time.Now(),
69
+		ConnID:    "connID",
70
+		RemoteIP:  net.ParseIP("10.0.0.1"),
71
+		DC:        3,
72
+	}
73
+
74
+	for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
75
+		v.
76
+			On("EventConnectedToDC", mock.Anything).
77
+			Once().
78
+			Run(func(args mock.Arguments) {
79
+				caught := args.Get(0).(mtglib.EventConnectedToDC)
80
+
81
+				suite.Equal(evt.CreatedAt, caught.CreatedAt)
82
+				suite.Equal(evt.ConnID, caught.ConnID)
83
+				suite.Equal(evt.RemoteIP.String(), caught.RemoteIP.String())
84
+				suite.Equal(evt.StreamID(), caught.StreamID())
85
+				suite.Equal(evt.DC, caught.DC)
86
+			})
87
+	}
88
+
89
+	suite.stream.Send(suite.ctx, evt)
90
+	time.Sleep(100 * time.Millisecond)
91
+}
92
+
93
+func (suite *EventStreamTestSuite) TestEventTraffic() {
94
+	evt := mtglib.EventTraffic{
95
+		CreatedAt: time.Now(),
96
+		ConnID:    "connID",
97
+		Traffic:   1024,
98
+		IsRead:    true,
99
+	}
100
+
101
+	for _, v := range []*ObserverMock{suite.observerMock1, suite.observerMock2} {
102
+		v.
103
+			On("EventTraffic", mock.Anything).
104
+			Once().
105
+			Run(func(args mock.Arguments) {
106
+				caught := args.Get(0).(mtglib.EventTraffic)
107
+
108
+				suite.Equal(evt.CreatedAt, caught.CreatedAt)
109
+				suite.Equal(evt.ConnID, caught.ConnID)
110
+				suite.Equal(evt.StreamID(), caught.StreamID())
111
+				suite.Equal(evt.Traffic, caught.Traffic)
112
+				suite.Equal(evt.IsRead, caught.IsRead)
113
+			})
114
+	}
115
+
116
+	suite.stream.Send(suite.ctx, evt)
117
+	time.Sleep(100 * time.Millisecond)
118
+}
119
+
120
+func (suite *EventStreamTestSuite) TestEventFinish() {
67 121
 	evt := mtglib.EventFinish{
68 122
 		CreatedAt: time.Now(),
69 123
 		ConnID:    "connID",
@@ -86,7 +140,7 @@ func (suite *EventStreamTestSuite) TestEventFinishOk() {
86 140
 	time.Sleep(100 * time.Millisecond)
87 141
 }
88 142
 
89
-func (suite *EventStreamTestSuite) TestEventConcurrencyLimitedOk() {
143
+func (suite *EventStreamTestSuite) TestEventConcurrencyLimited() {
90 144
 	evt := mtglib.EventConcurrencyLimited{
91 145
 		CreatedAt: time.Now(),
92 146
 	}
@@ -106,7 +160,7 @@ func (suite *EventStreamTestSuite) TestEventConcurrencyLimitedOk() {
106 160
 	time.Sleep(100 * time.Millisecond)
107 161
 }
108 162
 
109
-func (suite *EventStreamTestSuite) TestEventIPBlocklistedOk() {
163
+func (suite *EventStreamTestSuite) TestEventIPBlocklisted() {
110 164
 	evt := mtglib.EventIPBlocklisted{
111 165
 		CreatedAt: time.Now(),
112 166
 		RemoteIP:  net.ParseIP("10.0.0.10"),

+ 2
- 0
events/init.go Ver fichero

@@ -5,6 +5,8 @@ import "github.com/9seconds/mtg/v2/mtglib"
5 5
 type Observer interface {
6 6
 	EventStart(mtglib.EventStart)
7 7
 	EventFinish(mtglib.EventFinish)
8
+	EventConnectedToDC(mtglib.EventConnectedToDC)
9
+	EventTraffic(mtglib.EventTraffic)
8 10
 	EventConcurrencyLimited(mtglib.EventConcurrencyLimited)
9 11
 	EventIPBlocklisted(mtglib.EventIPBlocklisted)
10 12
 

+ 8
- 0
events/init_test.go Ver fichero

@@ -13,6 +13,14 @@ func (o *ObserverMock) EventStart(evt mtglib.EventStart) {
13 13
 	o.Called(evt)
14 14
 }
15 15
 
16
+func (o *ObserverMock) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
17
+	o.Called(evt)
18
+}
19
+
20
+func (o *ObserverMock) EventTraffic(evt mtglib.EventTraffic) {
21
+	o.Called(evt)
22
+}
23
+
16 24
 func (o *ObserverMock) EventFinish(evt mtglib.EventFinish) {
17 25
 	o.Called(evt)
18 26
 }

+ 30
- 0
events/multi_observer.go Ver fichero

@@ -25,6 +25,36 @@ func (m multiObserver) EventStart(evt mtglib.EventStart) {
25 25
 	wg.Wait()
26 26
 }
27 27
 
28
+func (m multiObserver) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
29
+	wg := &sync.WaitGroup{}
30
+	wg.Add(len(m.observers))
31
+
32
+	for _, v := range m.observers {
33
+		go func(obs Observer) {
34
+			defer wg.Done()
35
+
36
+			obs.EventConnectedToDC(evt)
37
+		}(v)
38
+	}
39
+
40
+	wg.Wait()
41
+}
42
+
43
+func (m multiObserver) EventTraffic(evt mtglib.EventTraffic) {
44
+	wg := &sync.WaitGroup{}
45
+	wg.Add(len(m.observers))
46
+
47
+	for _, v := range m.observers {
48
+		go func(obs Observer) {
49
+			defer wg.Done()
50
+
51
+			obs.EventTraffic(evt)
52
+		}(v)
53
+	}
54
+
55
+	wg.Wait()
56
+}
57
+
28 58
 func (m multiObserver) EventFinish(evt mtglib.EventFinish) {
29 59
 	wg := &sync.WaitGroup{}
30 60
 	wg.Add(len(m.observers))

+ 2
- 0
events/noop.go Ver fichero

@@ -18,6 +18,8 @@ func NewNoopStream() mtglib.EventStream {
18 18
 type noopObserver struct{}
19 19
 
20 20
 func (n noopObserver) EventStart(_ mtglib.EventStart)                           {}
21
+func (n noopObserver) EventConnectedToDC(_ mtglib.EventConnectedToDC)           {}
22
+func (n noopObserver) EventTraffic(_ mtglib.EventTraffic)                       {}
21 23
 func (n noopObserver) EventFinish(_ mtglib.EventFinish)                         {}
22 24
 func (n noopObserver) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {}
23 25
 func (n noopObserver) EventIPBlocklisted(_ mtglib.EventIPBlocklisted)           {}

+ 14
- 0
events/noop_test.go Ver fichero

@@ -25,6 +25,18 @@ func (suite *NoopTestSuite) SetupSuite() {
25 25
 			ConnID:    "connID",
26 26
 			RemoteIP:  net.ParseIP("127.0.0.1"),
27 27
 		},
28
+		"connected-to-dc": mtglib.EventConnectedToDC{
29
+			CreatedAt: time.Now(),
30
+			ConnID:    "connID",
31
+			RemoteIP:  net.ParseIP("127.1.0.1"),
32
+			DC:        2,
33
+		},
34
+		"traffic": mtglib.EventTraffic{
35
+			CreatedAt: time.Now(),
36
+			ConnID:    "connID",
37
+			Traffic:   1000,
38
+			IsRead:    true,
39
+		},
28 40
 		"finish": mtglib.EventFinish{
29 41
 			CreatedAt: time.Now(),
30 42
 			ConnID:    "connID",
@@ -62,6 +74,8 @@ func (suite *NoopTestSuite) TestObserver() {
62 74
 			switch typedEvt := value.(type) {
63 75
 			case mtglib.EventStart:
64 76
 				observer.EventStart(typedEvt)
77
+			case mtglib.EventConnectedToDC:
78
+				observer.EventConnectedToDC(typedEvt)
65 79
 			case mtglib.EventFinish:
66 80
 				observer.EventFinish(typedEvt)
67 81
 			case mtglib.EventConcurrencyLimited:

+ 91
- 0
mtglib/conns.go Ver fichero

@@ -0,0 +1,91 @@
1
+package mtglib
2
+
3
+import (
4
+	"context"
5
+	"fmt"
6
+	"net"
7
+	"time"
8
+)
9
+
10
+type connStandard struct {
11
+	conn        net.Conn
12
+	idleTimeout time.Duration
13
+}
14
+
15
+func (c connStandard) Read(b []byte) (int, error) {
16
+	if err := c.conn.SetReadDeadline(time.Now().Add(c.idleTimeout)); err != nil {
17
+		return 0, fmt.Errorf("cannot set read deadline: %w", err)
18
+	}
19
+
20
+	return c.conn.Read(b)
21
+}
22
+
23
+func (c connStandard) Write(b []byte) (int, error) {
24
+	if err := c.conn.SetWriteDeadline(time.Now().Add(c.idleTimeout)); err != nil {
25
+		return 0, fmt.Errorf("cannot set write deadline: %w", err)
26
+	}
27
+
28
+	return c.conn.Write(b)
29
+}
30
+
31
+func (c connStandard) Close() error {
32
+	return c.conn.Close()
33
+}
34
+
35
+func (c connStandard) LocalAddr() net.Addr {
36
+	return c.conn.LocalAddr()
37
+}
38
+
39
+func (c connStandard) RemoteAddr() net.Addr {
40
+	return c.conn.RemoteAddr()
41
+}
42
+
43
+func (c connStandard) SetDeadline(t time.Time) error {
44
+	return c.conn.SetDeadline(t)
45
+}
46
+
47
+func (c connStandard) SetReadDeadline(t time.Time) error {
48
+	return c.conn.SetReadDeadline(t)
49
+}
50
+
51
+func (c connStandard) SetWriteDeadline(t time.Time) error {
52
+	return c.conn.SetWriteDeadline(t)
53
+}
54
+
55
+type connEventTraffic struct {
56
+	net.Conn
57
+
58
+	connID string
59
+	stream EventStream
60
+	ctx    context.Context
61
+}
62
+
63
+func (c connEventTraffic) Read(b []byte) (int, error) {
64
+	n, err := c.Conn.Read(b)
65
+
66
+	if n > 0 {
67
+		c.stream.Send(c.ctx, EventTraffic{
68
+			CreatedAt: time.Now(),
69
+			ConnID:    c.connID,
70
+			Traffic:   uint(n),
71
+			IsRead:    true,
72
+		})
73
+	}
74
+
75
+	return n, err // nolint: wrapcheck
76
+}
77
+
78
+func (c connEventTraffic) Write(b []byte) (int, error) {
79
+	n, err := c.Conn.Write(b)
80
+
81
+	if n > 0 {
82
+		c.stream.Send(c.ctx, EventTraffic{
83
+			CreatedAt: time.Now(),
84
+			ConnID:    c.connID,
85
+			Traffic:   uint(n),
86
+			IsRead:    false,
87
+		})
88
+	}
89
+
90
+	return n, err // nolint: wrapcheck
91
+}

+ 22
- 0
mtglib/events.go Ver fichero

@@ -15,6 +15,28 @@ func (e EventStart) StreamID() string {
15 15
 	return e.ConnID
16 16
 }
17 17
 
18
+type EventConnectedToDC struct {
19
+	CreatedAt time.Time
20
+	ConnID    string
21
+	RemoteIP  net.IP
22
+	DC        int
23
+}
24
+
25
+func (e EventConnectedToDC) StreamID() string {
26
+	return e.ConnID
27
+}
28
+
29
+type EventTraffic struct {
30
+	CreatedAt time.Time
31
+	ConnID    string
32
+	Traffic   uint
33
+	IsRead    bool
34
+}
35
+
36
+func (e EventTraffic) StreamID() string {
37
+	return e.ConnID
38
+}
39
+
18 40
 type EventFinish struct {
19 41
 	CreatedAt time.Time
20 42
 	ConnID    string

+ 2
- 2
mtglib/internal/obfuscated2/conn.go Ver fichero

@@ -14,7 +14,7 @@ type Conn struct {
14 14
 	writeBuf []byte
15 15
 }
16 16
 
17
-func (c *Conn) Read(p []byte) (int, error) {
17
+func (c Conn) Read(p []byte) (int, error) {
18 18
 	n, err := c.Conn.Read(p)
19 19
 	if err != nil {
20 20
 		return n, err // nolint: wrapcheck
@@ -25,7 +25,7 @@ func (c *Conn) Read(p []byte) (int, error) {
25 25
 	return n, nil
26 26
 }
27 27
 
28
-func (c *Conn) Write(p []byte) (int, error) {
28
+func (c Conn) Write(p []byte) (int, error) {
29 29
 	c.writeBuf = append(c.writeBuf[:0], p...)
30 30
 	c.Encryptor.XORKeyStream(c.writeBuf, c.writeBuf)
31 31
 

+ 57
- 8
mtglib/proxy.go Ver fichero

@@ -9,6 +9,7 @@ import (
9 9
 	"time"
10 10
 
11 11
 	"github.com/9seconds/mtg/v2/mtglib/internal/obfuscated2"
12
+	"github.com/9seconds/mtg/v2/mtglib/internal/telegram"
12 13
 	"github.com/panjf2000/ants/v2"
13 14
 )
14 15
 
@@ -16,10 +17,12 @@ type Proxy struct {
16 17
 	ctx             context.Context
17 18
 	ctxCancel       context.CancelFunc
18 19
 	streamWaitGroup sync.WaitGroup
19
-	workerPool      *ants.PoolWithFunc
20
+
21
+	idleTimeout time.Duration
22
+	workerPool  *ants.PoolWithFunc
23
+	telegram    *telegram.Telegram
20 24
 
21 25
 	secret          Secret
22
-	network         Network
23 26
 	antiReplayCache AntiReplayCache
24 27
 	ipBlocklist     IPBlocklist
25 28
 	eventStream     EventStream
@@ -55,6 +58,12 @@ func (p *Proxy) ServeConn(conn net.Conn) {
55 58
 
56 59
 		return
57 60
 	}
61
+
62
+	if err := p.doTelegramCall(ctx); err != nil {
63
+		p.logger.WarningError("cannot dial to telegram", err)
64
+
65
+		return
66
+	}
58 67
 }
59 68
 
60 69
 func (p *Proxy) Serve(listener net.Listener) error {
@@ -102,16 +111,45 @@ func (p *Proxy) doObfuscated2Handshake(ctx *streamContext) error {
102 111
 
103 112
 	ctx.dc = dc
104 113
 	ctx.logger = ctx.logger.BindInt("dc", dc)
105
-	ctx.clientConn = &obfuscated2.Conn{
106
-		Conn:      ctx.clientConn,
107
-		Encryptor: encryptor,
108
-		Decryptor: decryptor,
114
+	ctx.clientConn = connStandard{
115
+		conn: obfuscated2.Conn{
116
+			Conn:      ctx.clientConn,
117
+			Encryptor: encryptor,
118
+			Decryptor: decryptor,
119
+		},
120
+		idleTimeout: p.idleTimeout,
121
+	}
122
+
123
+	return nil
124
+}
125
+
126
+func (p *Proxy) doTelegramCall(ctx *streamContext) error {
127
+	conn, err := p.telegram.Dial(ctx, ctx.dc)
128
+	if err != nil {
129
+		return fmt.Errorf("cannot dial to Telegram: %w", err)
109 130
 	}
110 131
 
132
+	ctx.telegramConn = connEventTraffic{
133
+		Conn: connStandard{
134
+			conn:        conn,
135
+			idleTimeout: p.idleTimeout,
136
+		},
137
+		connID: ctx.connID,
138
+		stream: p.eventStream,
139
+		ctx:    ctx,
140
+	}
141
+
142
+	p.eventStream.Send(ctx, EventConnectedToDC{
143
+		CreatedAt: time.Now(),
144
+		ConnID:    ctx.connID,
145
+		RemoteIP:  conn.RemoteAddr().(*net.TCPAddr).IP,
146
+		DC:        ctx.dc,
147
+	})
148
+
111 149
 	return nil
112 150
 }
113 151
 
114
-func NewProxy(opts ProxyOpts) (*Proxy, error) {
152
+func NewProxy(opts ProxyOpts) (*Proxy, error) { // nolint: cyclop
115 153
 	switch {
116 154
 	case opts.Network == nil:
117 155
 		return nil, ErrNetworkIsNotDefined
@@ -127,21 +165,32 @@ func NewProxy(opts ProxyOpts) (*Proxy, error) {
127 165
 		return nil, ErrSecretInvalid
128 166
 	}
129 167
 
168
+	tg, err := telegram.New(opts.Network, opts.PreferIP)
169
+	if err != nil {
170
+		return nil, fmt.Errorf("cannot build telegram dialer: %w", err)
171
+	}
172
+
130 173
 	concurrency := opts.Concurrency
131 174
 	if concurrency == 0 {
132 175
 		concurrency = DefaultConcurrency
133 176
 	}
134 177
 
178
+	idleTimeout := opts.IdleTimeout
179
+	if idleTimeout < 1 {
180
+		idleTimeout = DefaultIdleTimeout
181
+	}
182
+
135 183
 	ctx, cancel := context.WithCancel(context.Background())
136 184
 	proxy := &Proxy{
137 185
 		ctx:             ctx,
138 186
 		ctxCancel:       cancel,
139 187
 		secret:          opts.Secret,
140
-		network:         opts.Network,
141 188
 		antiReplayCache: opts.AntiReplayCache,
142 189
 		ipBlocklist:     opts.IPBlocklist,
143 190
 		eventStream:     opts.EventStream,
144 191
 		logger:          opts.Logger.Named("proxy"),
192
+		idleTimeout:     idleTimeout,
193
+		telegram:        tg,
145 194
 	}
146 195
 
147 196
 	pool, err := ants.NewPoolWithFunc(int(concurrency), func(arg interface{}) {

+ 15
- 7
mtglib/stream_context.go Ver fichero

@@ -9,12 +9,13 @@ import (
9 9
 )
10 10
 
11 11
 type streamContext struct {
12
-	ctx        context.Context
13
-	ctxCancel  context.CancelFunc
14
-	clientConn net.Conn
15
-	connID     string
16
-	dc         int
17
-	logger     Logger
12
+	ctx          context.Context
13
+	ctxCancel    context.CancelFunc
14
+	clientConn   net.Conn
15
+	telegramConn net.Conn
16
+	connID       string
17
+	dc           int
18
+	logger       Logger
18 19
 }
19 20
 
20 21
 func (s *streamContext) Deadline() (time.Time, bool) {
@@ -35,7 +36,14 @@ func (s *streamContext) Value(key interface{}) interface{} {
35 36
 
36 37
 func (s *streamContext) Close() {
37 38
 	s.ctxCancel()
38
-	s.clientConn.Close()
39
+
40
+	if s.clientConn != nil {
41
+		s.clientConn.Close()
42
+	}
43
+
44
+	if s.telegramConn != nil {
45
+		s.telegramConn.Close()
46
+	}
39 47
 }
40 48
 
41 49
 func (s *streamContext) ClientIP() net.IP {

+ 15
- 7
stats/init.go Ver fichero

@@ -6,13 +6,21 @@ const (
6 6
 	DefaultStatsdMetricPrefix = DefaultMetricPrefix + "."
7 7
 	DefaultStatsdTagFormat    = "datadog"
8 8
 
9
-	MetricActiveConnection   = "active_connections"
10
-	MetricSessionDuration    = "session_duration"
11
-	MetricConcurrencyLimited = "concurrency_limited"
12
-	MetricIPBlocklisted      = "ip_blocklisted"
9
+	MetricClientConnections   = "client_connections"
10
+	MetricTelegramConnections = "telegram_connections"
11
+	MetricTraffic             = "traffic"
12
+	MetricSessionDuration     = "session_duration"
13
+	MetricSessionTraffic      = "session_traffic"
14
+	MetricConcurrencyLimited  = "concurrency_limited"
15
+	MetricIPBlocklisted       = "ip_blocklisted"
13 16
 
14
-	TagIPType = "ip_type"
17
+	TagIPType     = "ip_type"
18
+	TagTelegramIP = "ip"
19
+	TagDC         = "dc"
20
+	TagDirection  = "direction"
15 21
 
16
-	TagIPTypeIPv4 = "ipv4"
17
-	TagIPTypeIPv6 = "ipv6"
22
+	TagIPTypeIPv4        = "ipv4"
23
+	TagIPTypeIPv6        = "ipv6"
24
+	TagDirectionTelegram = "telegram"
25
+	TagDirectionClient   = "client"
18 26
 )

+ 105
- 10
stats/prometheus.go Ver fichero

@@ -4,6 +4,7 @@ import (
4 4
 	"context"
5 5
 	"net"
6 6
 	"net/http"
7
+	"strconv"
7 8
 	"time"
8 9
 
9 10
 	"github.com/9seconds/mtg/v2/events"
@@ -24,7 +25,47 @@ func (p prometheusProcessor) EventStart(evt mtglib.EventStart) {
24 25
 	}
25 26
 	p.streams[evt.StreamID()] = sInfo
26 27
 
27
-	p.factory.metricActiveConnections.WithLabelValues(sInfo.IPType()).Inc()
28
+	p.factory.metricClientConnections.WithLabelValues(sInfo.GetClientIPType()).Inc()
29
+}
30
+
31
+func (p prometheusProcessor) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
32
+	sInfo, ok := p.streams[evt.StreamID()]
33
+	if !ok {
34
+		return
35
+	}
36
+
37
+	sInfo.remoteIP = evt.RemoteIP
38
+	sInfo.dc = evt.DC
39
+
40
+	p.factory.metricTelegramConnections.WithLabelValues(
41
+		sInfo.GetRemoteIPType(),
42
+		sInfo.remoteIP.String(),
43
+		strconv.Itoa(sInfo.dc)).Inc()
44
+}
45
+
46
+func (p prometheusProcessor) EventTraffic(evt mtglib.EventTraffic) {
47
+	sInfo, ok := p.streams[evt.StreamID()]
48
+	if !ok {
49
+		return
50
+	}
51
+
52
+	labels := []string{
53
+		sInfo.GetRemoteIPType(),
54
+		sInfo.remoteIP.String(),
55
+		strconv.Itoa(sInfo.dc),
56
+	}
57
+
58
+	if evt.IsRead {
59
+		sInfo.bytesRecvFromTelegram += evt.Traffic
60
+
61
+		labels = append(labels, TagDirectionClient)
62
+	} else {
63
+		sInfo.bytesSentToTelegram += evt.Traffic
64
+
65
+		labels = append(labels, TagDirectionTelegram)
66
+	}
67
+
68
+	p.factory.metricTraffic.WithLabelValues(labels...).Add(float64(evt.Traffic))
28 69
 }
29 70
 
30 71
 func (p prometheusProcessor) EventFinish(evt mtglib.EventFinish) {
@@ -37,8 +78,30 @@ func (p prometheusProcessor) EventFinish(evt mtglib.EventFinish) {
37 78
 
38 79
 	duration := evt.CreatedAt.Sub(sInfo.createdAt)
39 80
 
40
-	p.factory.metricActiveConnections.WithLabelValues(sInfo.IPType()).Dec()
81
+	p.factory.metricClientConnections.WithLabelValues(sInfo.GetRemoteIPType()).Dec()
41 82
 	p.factory.metricSessionDuration.Observe(float64(duration) / float64(time.Second))
83
+
84
+	if sInfo.remoteIP == nil {
85
+		return
86
+	}
87
+
88
+	labels := []string{
89
+		sInfo.GetRemoteIPType(),
90
+		sInfo.remoteIP.String(),
91
+		strconv.Itoa(sInfo.dc),
92
+	}
93
+
94
+	p.factory.metricTelegramConnections.WithLabelValues(labels...).Dec()
95
+
96
+	labels = append(labels, TagDirectionClient)
97
+	p.factory.metricSessionTraffic.
98
+		WithLabelValues(labels...).
99
+		Observe(float64(sInfo.bytesRecvFromTelegram))
100
+
101
+	labels[3] = TagDirectionTelegram
102
+	p.factory.metricSessionTraffic.
103
+		WithLabelValues(labels...).
104
+		Observe(float64(sInfo.bytesSentToTelegram))
42 105
 }
43 106
 
44 107
 func (p prometheusProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {
@@ -60,10 +123,13 @@ func (p prometheusProcessor) Shutdown() {
60 123
 type PrometheusFactory struct {
61 124
 	httpServer *http.Server
62 125
 
63
-	metricActiveConnections  *prometheus.GaugeVec
64
-	metricIPBlocklisted      *prometheus.CounterVec
65
-	metricConcurrencyLimited prometheus.Counter
66
-	metricSessionDuration    prometheus.Histogram
126
+	metricClientConnections   *prometheus.GaugeVec
127
+	metricTelegramConnections *prometheus.GaugeVec
128
+	metricTraffic             *prometheus.CounterVec
129
+	metricIPBlocklisted       *prometheus.CounterVec
130
+	metricSessionTraffic      *prometheus.HistogramVec
131
+	metricConcurrencyLimited  prometheus.Counter
132
+	metricSessionDuration     prometheus.Histogram
67 133
 }
68 134
 
69 135
 func (p *PrometheusFactory) Make() events.Observer {
@@ -81,7 +147,7 @@ func (p *PrometheusFactory) Close() error {
81 147
 	return p.httpServer.Shutdown(context.Background())
82 148
 }
83 149
 
84
-func NewPrometheus(metricPrefix, httpPath string) *PrometheusFactory {
150
+func NewPrometheus(metricPrefix, httpPath string) *PrometheusFactory { // nolint: funlen
85 151
 	registry := prometheus.NewPedanticRegistry()
86 152
 	httpHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{
87 153
 		EnableOpenMetrics: true,
@@ -95,11 +161,16 @@ func NewPrometheus(metricPrefix, httpPath string) *PrometheusFactory {
95 161
 			Handler: mux,
96 162
 		},
97 163
 
98
-		metricActiveConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
164
+		metricClientConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
99 165
 			Namespace: metricPrefix,
100
-			Name:      MetricActiveConnection,
166
+			Name:      MetricClientConnections,
101 167
 			Help:      "A number of connections under active processing.",
102 168
 		}, []string{TagIPType}),
169
+		metricTelegramConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
170
+			Namespace: metricPrefix,
171
+			Name:      MetricTelegramConnections,
172
+			Help:      "A number of connections to Telegram servers.",
173
+		}, []string{TagIPType, TagTelegramIP, TagDC}),
103 174
 		metricSessionDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
104 175
 			Namespace: metricPrefix,
105 176
 			Name:      MetricSessionDuration,
@@ -117,6 +188,27 @@ func NewPrometheus(metricPrefix, httpPath string) *PrometheusFactory {
117 188
 				300,
118 189
 			},
119 190
 		}),
191
+		metricSessionTraffic: prometheus.NewHistogramVec(prometheus.HistogramOpts{
192
+			Namespace: metricPrefix,
193
+			Name:      MetricSessionTraffic,
194
+			Help:      "A traffic size which flew via proxy within a single session.",
195
+			Buckets: []float64{ // per 1mb
196
+				1 * 1024 * 1024,
197
+				2 * 1024 * 1024,
198
+				3 * 1024 * 1024,
199
+				4 * 1024 * 1024,
200
+				5 * 1024 * 1024,
201
+				6 * 1024 * 1024,
202
+				7 * 1024 * 1024,
203
+				8 * 1024 * 1024,
204
+				9 * 1024 * 1024,
205
+			},
206
+		}, []string{TagIPType, TagTelegramIP, TagDC, TagDirection}),
207
+		metricTraffic: prometheus.NewCounterVec(prometheus.CounterOpts{
208
+			Namespace: metricPrefix,
209
+			Name:      MetricTraffic,
210
+			Help:      "Traffic which is sent through this proxy.",
211
+		}, []string{TagIPType, TagTelegramIP, TagDC, TagDirection}),
120 212
 		metricConcurrencyLimited: prometheus.NewCounter(prometheus.CounterOpts{
121 213
 			Namespace: metricPrefix,
122 214
 			Name:      MetricConcurrencyLimited,
@@ -129,7 +221,10 @@ func NewPrometheus(metricPrefix, httpPath string) *PrometheusFactory {
129 221
 		}, []string{TagIPType}),
130 222
 	}
131 223
 
132
-	registry.MustRegister(factory.metricActiveConnections)
224
+	registry.MustRegister(factory.metricClientConnections)
225
+	registry.MustRegister(factory.metricTelegramConnections)
226
+	registry.MustRegister(factory.metricTraffic)
227
+	registry.MustRegister(factory.metricSessionTraffic)
133 228
 	registry.MustRegister(factory.metricSessionDuration)
134 229
 	registry.MustRegister(factory.metricConcurrencyLimited)
135 230
 	registry.MustRegister(factory.metricIPBlocklisted)

+ 41
- 4
stats/prometheus_test.go Ver fichero

@@ -60,23 +60,60 @@ func (suite *PrometheusTestSuite) TestEventStartFinish() {
60 60
 		ConnID:    "connID",
61 61
 		RemoteIP:  net.ParseIP("10.0.0.10"),
62 62
 	})
63
-
64 63
 	time.Sleep(100 * time.Millisecond)
65 64
 
66 65
 	data, err := suite.Get()
67 66
 	suite.NoError(err)
68
-	suite.Contains(data, `mtg_active_connections{ip_type="ipv4"} 1`)
67
+	suite.Contains(data, `mtg_client_connections{ip_type="ipv4"} 1`)
69 68
 
70
-	suite.prometheus.EventFinish(mtglib.EventFinish{
69
+	suite.prometheus.EventConnectedToDC(mtglib.EventConnectedToDC{
70
+		CreatedAt: time.Now(),
71
+		ConnID:    "connID",
72
+		RemoteIP:  net.ParseIP("10.0.0.1"),
73
+		DC:        4,
74
+	})
75
+	time.Sleep(100 * time.Millisecond)
76
+
77
+	data, err = suite.Get()
78
+	suite.NoError(err)
79
+	suite.Contains(data, `mtg_telegram_connections{dc="4",ip="10.0.0.1",ip_type="ipv4"} 1`)
80
+
81
+	suite.prometheus.EventTraffic(mtglib.EventTraffic{
82
+		CreatedAt: time.Now(),
83
+		ConnID:    "connID",
84
+		Traffic:   200,
85
+		IsRead:    true,
86
+	})
87
+	time.Sleep(100 * time.Millisecond)
88
+
89
+	data, err = suite.Get()
90
+	suite.NoError(err)
91
+	suite.Contains(data, `mtg_traffic{dc="4",direction="client",ip="10.0.0.1",ip_type="ipv4"} 200`)
92
+
93
+	suite.prometheus.EventTraffic(mtglib.EventTraffic{
71 94
 		CreatedAt: time.Now(),
72 95
 		ConnID:    "connID",
96
+		Traffic:   100,
97
+		IsRead:    false,
73 98
 	})
99
+	time.Sleep(100 * time.Millisecond)
74 100
 
101
+	data, err = suite.Get()
102
+	suite.NoError(err)
103
+	suite.Contains(data, `mtg_traffic{dc="4",direction="telegram",ip="10.0.0.1",ip_type="ipv4"} 100`)
104
+
105
+	suite.prometheus.EventFinish(mtglib.EventFinish{
106
+		CreatedAt: time.Now(),
107
+		ConnID:    "connID",
108
+	})
75 109
 	time.Sleep(100 * time.Millisecond)
76 110
 
77 111
 	data, err = suite.Get()
78 112
 	suite.NoError(err)
79
-	suite.Contains(data, `mtg_active_connections{ip_type="ipv4"} 0`)
113
+	suite.Contains(data, `mtg_client_connections{ip_type="ipv4"} 0`)
114
+	suite.Contains(data, `mtg_telegram_connections{dc="4",ip="10.0.0.1",ip_type="ipv4"} 0`)
115
+	suite.Contains(data, `mtg_traffic{dc="4",direction="client",ip="10.0.0.1",ip_type="ipv4"} 200`)
116
+	suite.Contains(data, `mtg_traffic{dc="4",direction="telegram",ip="10.0.0.1",ip_type="ipv4"} 100`)
80 117
 }
81 118
 
82 119
 func (suite *PrometheusTestSuite) TestEventConcurrencyLimited() {

+ 53
- 7
stats/statsd.go Ver fichero

@@ -22,9 +22,47 @@ func (s statsdProcessor) EventStart(evt mtglib.EventStart) {
22 22
 		clientIP:  evt.RemoteIP,
23 23
 	}
24 24
 	s.streams[evt.StreamID()] = sInfo
25
-	ipTypeTag := statsd.StringTag(TagIPType, sInfo.IPType())
26 25
 
27
-	s.client.GaugeDelta(MetricActiveConnection, 1, ipTypeTag)
26
+	s.client.GaugeDelta(MetricClientConnections,
27
+		1,
28
+		statsd.StringTag(TagIPType, sInfo.GetClientIPType()))
29
+}
30
+
31
+func (s statsdProcessor) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
32
+	sInfo, ok := s.streams[evt.StreamID()]
33
+	if !ok {
34
+		return
35
+	}
36
+
37
+	sInfo.remoteIP = evt.RemoteIP
38
+	sInfo.dc = evt.DC
39
+
40
+	s.client.GaugeDelta(MetricTelegramConnections,
41
+		1,
42
+		statsd.StringTag(TagIPType, sInfo.GetRemoteIPType()),
43
+		statsd.StringTag(TagTelegramIP, sInfo.remoteIP.String()),
44
+		statsd.IntTag(TagDC, sInfo.dc))
45
+}
46
+
47
+func (s statsdProcessor) EventTraffic(evt mtglib.EventTraffic) {
48
+	sInfo, ok := s.streams[evt.StreamID()]
49
+	if !ok {
50
+		return
51
+	}
52
+
53
+	tags := []statsd.Tag{
54
+		statsd.StringTag(TagIPType, sInfo.GetRemoteIPType()),
55
+		statsd.StringTag(TagTelegramIP, sInfo.remoteIP.String()),
56
+		statsd.IntTag(TagDC, sInfo.dc),
57
+	}
58
+
59
+	if evt.IsRead {
60
+		tags = append(tags, statsd.StringTag(TagDirection, TagDirectionClient))
61
+		s.client.Incr(MetricTraffic, int64(evt.Traffic), tags...)
62
+	} else {
63
+		tags = append(tags, statsd.StringTag(TagDirection, TagDirectionTelegram))
64
+		s.client.Incr(MetricTraffic, int64(evt.Traffic), tags...)
65
+	}
28 66
 }
29 67
 
30 68
 func (s statsdProcessor) EventFinish(evt mtglib.EventFinish) {
@@ -35,11 +73,19 @@ func (s statsdProcessor) EventFinish(evt mtglib.EventFinish) {
35 73
 
36 74
 	defer delete(s.streams, evt.StreamID())
37 75
 
38
-	duration := evt.CreatedAt.Sub(sInfo.createdAt)
39
-	ipTypeTag := statsd.StringTag(TagIPType, sInfo.IPType())
40
-
41
-	s.client.GaugeDelta(MetricActiveConnection, -1, ipTypeTag)
42
-	s.client.PrecisionTiming(MetricSessionDuration, duration)
76
+	s.client.GaugeDelta(MetricClientConnections,
77
+		-1,
78
+		statsd.StringTag(TagIPType, sInfo.GetClientIPType()))
79
+	s.client.PrecisionTiming(MetricSessionDuration,
80
+		evt.CreatedAt.Sub(sInfo.createdAt))
81
+
82
+	if sInfo.remoteIP != nil {
83
+		s.client.GaugeDelta(MetricTelegramConnections,
84
+			-1,
85
+			statsd.StringTag(TagIPType, sInfo.GetRemoteIPType()),
86
+			statsd.StringTag(TagTelegramIP, sInfo.remoteIP.String()),
87
+			statsd.IntTag(TagDC, sInfo.dc))
88
+	}
43 89
 }
44 90
 
45 91
 func (s statsdProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {

+ 34
- 2
stats/statsd_test.go Ver fichero

@@ -103,17 +103,49 @@ func (suite *StatsdTestSuite) TestEventStartFinish() {
103 103
 		ConnID:    "connID",
104 104
 		RemoteIP:  net.ParseIP("10.0.0.10"),
105 105
 	})
106
+	time.Sleep(statsdSleepTime)
107
+	suite.Equal("mtg.client_connections:+1|g|#ip_type:ipv4", suite.statsdServer.String())
106 108
 
109
+	suite.statsd.EventConnectedToDC(mtglib.EventConnectedToDC{
110
+		CreatedAt: time.Now(),
111
+		ConnID:    "connID",
112
+		RemoteIP:  net.ParseIP("10.1.0.10"),
113
+		DC:        2,
114
+	})
107 115
 	time.Sleep(statsdSleepTime)
108
-	suite.Equal("mtg.active_connections:+1|g|#ip_type:ipv4", suite.statsdServer.String())
116
+	suite.Contains(suite.statsdServer.String(),
117
+		"mtg.telegram_connections:+1|g|#ip_type:ipv4,ip:10.1.0.10,dc:2")
109 118
 
110
-	suite.statsd.EventFinish(mtglib.EventFinish{
119
+	suite.statsd.EventTraffic(mtglib.EventTraffic{
111 120
 		CreatedAt: time.Now(),
112 121
 		ConnID:    "connID",
122
+		Traffic:   30,
123
+		IsRead:    true,
113 124
 	})
125
+	time.Sleep(statsdSleepTime)
126
+	suite.Contains(suite.statsdServer.String(),
127
+		"mtg.traffic:30|c|#ip_type:ipv4,ip:10.1.0.10,dc:2,direction:client")
114 128
 
129
+	suite.statsd.EventTraffic(mtglib.EventTraffic{
130
+		CreatedAt: time.Now(),
131
+		ConnID:    "connID",
132
+		Traffic:   90,
133
+		IsRead:    false,
134
+	})
135
+	time.Sleep(statsdSleepTime)
136
+	suite.Contains(suite.statsdServer.String(),
137
+		"mtg.traffic:90|c|#ip_type:ipv4,ip:10.1.0.10,dc:2,direction:telegram")
138
+
139
+	suite.statsd.EventFinish(mtglib.EventFinish{
140
+		CreatedAt: time.Now(),
141
+		ConnID:    "connID",
142
+	})
115 143
 	time.Sleep(statsdSleepTime)
116 144
 	suite.Contains(suite.statsdServer.String(), "mtg.session_duration")
145
+	suite.Contains(suite.statsdServer.String(),
146
+		"mtg.telegram_connections:-1|g|#ip_type:ipv4,ip:10.1.0.10,dc:2")
147
+	suite.Contains(suite.statsdServer.String(),
148
+		"mtg.client_connections:-1|g|#ip_type:ipv4")
117 149
 }
118 150
 
119 151
 func (suite *StatsdTestSuite) TestEventConcurrencyLimited() {

+ 16
- 4
stats/stream_info.go Ver fichero

@@ -6,12 +6,24 @@ import (
6 6
 )
7 7
 
8 8
 type streamInfo struct {
9
-	createdAt time.Time
10
-	clientIP  net.IP
9
+	createdAt             time.Time
10
+	clientIP              net.IP
11
+	remoteIP              net.IP
12
+	dc                    int
13
+	bytesSentToTelegram   uint
14
+	bytesRecvFromTelegram uint
11 15
 }
12 16
 
13
-func (s *streamInfo) IPType() string {
14
-	if s.clientIP.To4() == nil {
17
+func (s *streamInfo) GetClientIPType() string {
18
+	return s.getIPType(s.clientIP)
19
+}
20
+
21
+func (s *streamInfo) GetRemoteIPType() string {
22
+	return s.getIPType(s.remoteIP)
23
+}
24
+
25
+func (s *streamInfo) getIPType(ip net.IP) string {
26
+	if ip.To4() == nil {
15 27
 		return TagIPTypeIPv6
16 28
 	}
17 29
 

Loading…
Cancelar
Guardar