Przeglądaj źródła

Add new stats metric

tags/1.0^2
9seconds 6 lat temu
rodzic
commit
6c7edfb7db

+ 2
- 2
antireplay/init.go Wyświetl plik

16
 	initOnce.Do(func() {
16
 	initOnce.Do(func() {
17
 		c, err := bigcache.NewBigCache(bigcache.Config{
17
 		c, err := bigcache.NewBigCache(bigcache.Config{
18
 			Shards:           1024,
18
 			Shards:           1024,
19
-			LifeWindow:       config.C.AntiReplay.EvictionTime,
19
+			LifeWindow:       config.C.AntiReplayEvictionTime,
20
 			Hasher:           hasher{},
20
 			Hasher:           hasher{},
21
-			HardMaxCacheSize: config.C.AntiReplay.MaxSize,
21
+			HardMaxCacheSize: config.C.AntiReplayMaxSize,
22
 		})
22
 		})
23
 		if err != nil {
23
 		if err != nil {
24
 			panic(err)
24
 			panic(err)

+ 2
- 2
cli/proxy.go Wyświetl plik

43
 	if err := config.InitPublicAddress(ctx); err != nil {
43
 	if err := config.InitPublicAddress(ctx); err != nil {
44
 		Fatal(err)
44
 		Fatal(err)
45
 	}
45
 	}
46
-	zap.S().Debugw("Configuration", "config", config.C.Printable())
46
+	zap.S().Debugw("Configuration", "config", config.Printable())
47
 
47
 
48
 	if len(config.C.AdTag) > 0 {
48
 	if len(config.C.AdTag) > 0 {
49
 		zap.S().Infow("Use middle proxy connection to Telegram")
49
 		zap.S().Infow("Use middle proxy connection to Telegram")
67
 	}
67
 	}
68
 	telegram.MiddleInit()
68
 	telegram.MiddleInit()
69
 
69
 
70
-	proxyListener, err := net.Listen("tcp", config.C.ListenAddr.String())
70
+	proxyListener, err := net.Listen("tcp", config.C.Bind.String())
71
 	if err != nil {
71
 	if err != nil {
72
 		Fatal(err)
72
 		Fatal(err)
73
 	}
73
 	}

+ 76
- 133
config/config.go Wyświetl plik

7
 	"errors"
7
 	"errors"
8
 	"fmt"
8
 	"fmt"
9
 	"net"
9
 	"net"
10
-	"strconv"
11
 	"time"
10
 	"time"
12
 
11
 
13
 	"go.uber.org/zap"
12
 	"go.uber.org/zap"
40
 	OptionTypeDebug OptionType = iota
39
 	OptionTypeDebug OptionType = iota
41
 	OptionTypeVerbose
40
 	OptionTypeVerbose
42
 
41
 
43
-	OptionTypeBindIP
44
-	OptionTypeBindPort
42
+	OptionTypeBind
45
 	OptionTypePublicIPv4
43
 	OptionTypePublicIPv4
46
-	OptionTypePublicIPv4Port
47
 	OptionTypePublicIPv6
44
 	OptionTypePublicIPv6
48
-	OptionTypePublicIPv6Port
49
-	OptionTypeStatsIP
50
-	OptionTypeStatsPort
51
 
45
 
52
-	OptionTypeStatsdIP
53
-	OptionTypeStatsdPort
46
+	OptionTypeStatsBind
47
+	OptionTypeStatsNamespace
48
+	OptionTypeStatsdAddress
54
 	OptionTypeStatsdNetwork
49
 	OptionTypeStatsdNetwork
55
-	OptionTypeStatsdPrefix
56
 	OptionTypeStatsdTagsFormat
50
 	OptionTypeStatsdTagsFormat
57
 	OptionTypeStatsdTags
51
 	OptionTypeStatsdTags
58
-	OptionTypePrometheusPrefix
59
 
52
 
60
 	OptionTypeWriteBufferSize
53
 	OptionTypeWriteBufferSize
61
 	OptionTypeReadBufferSize
54
 	OptionTypeReadBufferSize
67
 	OptionTypeAdtag
60
 	OptionTypeAdtag
68
 )
61
 )
69
 
62
 
70
-type BufferSize struct {
71
-	Read  int `json:"read"`
72
-	Write int `json:"write"`
73
-}
74
-
75
-type AntiReplay struct {
76
-	MaxSize      int           `json:"max_size"`
77
-	EvictionTime time.Duration `json:"duration"`
78
-}
79
-
80
-type Stats struct {
81
-	Prefix string `json:"prefix"`
82
-}
83
-
84
-type StatsdStats struct {
85
-	Stats
86
-
87
-	Addr       Addr              `json:"addr"`
88
-	Tags       map[string]string `json:"tags"`
89
-	TagsFormat statsd.TagFormat  `json:"format"`
90
-}
91
-
92
-type PrometheusStats struct {
93
-	Stats
94
-}
95
-
96
-type Addr struct {
97
-	IP   net.IP `json:"ip"`
98
-	Port int    `json:"port"`
99
-	net  string
100
-}
101
-
102
-func (a Addr) Network() string {
103
-	if a.net == "" {
104
-		return "tcp"
105
-	}
106
-	return a.net
107
-}
108
-
109
-func (a Addr) String() string {
110
-	return net.JoinHostPort(a.IP.String(), strconv.Itoa(a.Port))
111
-}
112
-
113
-func (a Addr) MarshalJSON() ([]byte, error) {
114
-	data := map[string]string{
115
-		"network": a.Network(),
116
-		"addr":    a.String(),
117
-	}
118
-	return json.Marshal(data)
119
-}
120
-
121
 type Config struct {
63
 type Config struct {
122
-	BufferSize BufferSize `json:"buffer_size"`
123
-	AntiReplay AntiReplay `json:"anti_replay"`
124
-
125
-	ListenAddr     Addr `json:"listen_addr"`
126
-	PublicIPv4Addr Addr `json:"public_ipv4_addr"`
127
-	PublicIPv6Addr Addr `json:"public_ipv6_addr"`
128
-	StatsAddr      Addr `json:"stats_addr"`
129
-
130
-	StatsdStats     StatsdStats     `json:"stats_statsd"`
131
-	PrometheusStats PrometheusStats `json:"stats_prometheus"`
132
-
133
-	Debug      bool       `json:"debug"`
134
-	Verbose    bool       `json:"verbose"`
135
-	SecretMode SecretMode `json:"secret_mode"`
136
-	Secret     []byte     `json:"secret"`
137
-	AdTag      []byte     `json:"adtag"`
138
-}
64
+	Bind       *net.TCPAddr `json:"bind"`
65
+	PublicIPv4 *net.TCPAddr `json:"public_ipv4"`
66
+	PublicIPv6 *net.TCPAddr `json:"public_ipv6"`
67
+	StatsBind  *net.TCPAddr `json:"stats_bind"`
68
+	StatsdAddr *net.TCPAddr `json:"stats_addr"`
139
 
69
 
140
-func (c Config) Printable() interface{} {
141
-	data, err := json.Marshal(c)
142
-	if err != nil {
143
-		panic(err)
144
-	}
70
+	StatsNamespace string            `json:"stats_namespace"`
71
+	StatsdNetwork  string            `json:"statsd_network"`
72
+	StatsdTags     map[string]string `json:"statsd_tags"`
145
 
73
 
146
-	rv := map[string]interface{}{}
147
-	if err := json.Unmarshal(data, &rv); err != nil {
148
-		panic(err)
149
-	}
74
+	WriteBuffer int `json:"write_buffer"`
75
+	ReadBuffer  int `json:"read_buffer"`
150
 
76
 
151
-	return rv
152
-}
77
+	AntiReplayMaxSize      int           `json:"anti_replay_max_size"`
78
+	AntiReplayEvictionTime time.Duration `json:"anti_replay_eviction_time"`
79
+
80
+	Debug            bool             `json:"debug"`
81
+	Verbose          bool             `json:"verbose"`
82
+	StatsdTagsFormat statsd.TagFormat `json:"statsd_tags_format"`
83
+	SecretMode       SecretMode       `json:"secret_mode"`
153
 
84
 
154
-func (c Config) String() string {
155
-	data, _ := json.Marshal(c)
156
-	return string(data)
85
+	Secret []byte `json:"secret"`
86
+	AdTag  []byte `json:"adtag"`
157
 }
87
 }
158
 
88
 
159
 type Opt struct {
89
 type Opt struct {
163
 
93
 
164
 var C = Config{}
94
 var C = Config{}
165
 
95
 
166
-func Init(options ...Opt) error { // nolint: gocyclo
96
+func Init(options ...Opt) error { // nolint: gocyclo, funlen
167
 	for _, opt := range options {
97
 	for _, opt := range options {
168
 		switch opt.Option {
98
 		switch opt.Option {
169
 		case OptionTypeDebug:
99
 		case OptionTypeDebug:
170
 			C.Debug = opt.Value.(bool)
100
 			C.Debug = opt.Value.(bool)
171
 		case OptionTypeVerbose:
101
 		case OptionTypeVerbose:
172
 			C.Verbose = opt.Value.(bool)
102
 			C.Verbose = opt.Value.(bool)
173
-		case OptionTypeBindIP:
174
-			C.ListenAddr.IP = opt.Value.(net.IP)
175
-		case OptionTypeBindPort:
176
-			C.ListenAddr.Port = int(opt.Value.(uint16))
103
+		case OptionTypeBind:
104
+			C.Bind = opt.Value.(*net.TCPAddr)
177
 		case OptionTypePublicIPv4:
105
 		case OptionTypePublicIPv4:
178
-			C.PublicIPv4Addr.IP = opt.Value.(net.IP)
179
-		case OptionTypePublicIPv4Port:
180
-			C.PublicIPv4Addr.Port = int(opt.Value.(uint16))
106
+			C.PublicIPv4 = opt.Value.(*net.TCPAddr)
181
 		case OptionTypePublicIPv6:
107
 		case OptionTypePublicIPv6:
182
-			C.PublicIPv6Addr.IP = opt.Value.(net.IP)
183
-		case OptionTypePublicIPv6Port:
184
-			C.PublicIPv6Addr.Port = int(opt.Value.(uint16))
185
-		case OptionTypeStatsIP:
186
-			C.StatsAddr.IP = opt.Value.(net.IP)
187
-		case OptionTypeStatsPort:
188
-			C.StatsAddr.Port = int(opt.Value.(uint16))
189
-		case OptionTypeStatsdIP:
190
-			C.StatsdStats.Addr.IP = opt.Value.(net.IP)
191
-		case OptionTypeStatsdPort:
192
-			C.StatsdStats.Addr.Port = int(opt.Value.(uint16))
108
+			C.PublicIPv6 = opt.Value.(*net.TCPAddr)
109
+		case OptionTypeStatsBind:
110
+			C.StatsBind = opt.Value.(*net.TCPAddr)
111
+		case OptionTypeStatsNamespace:
112
+			C.StatsNamespace = opt.Value.(string)
113
+		case OptionTypeStatsdAddress:
114
+			C.StatsdAddr = opt.Value.(*net.TCPAddr)
193
 		case OptionTypeStatsdNetwork:
115
 		case OptionTypeStatsdNetwork:
194
-			C.StatsdStats.Addr.net = opt.Value.(string)
195
-		case OptionTypeStatsdPrefix:
196
-			C.StatsdStats.Prefix = opt.Value.(string)
116
+			value := opt.Value.(string)
117
+			switch value {
118
+			case "udp", "tcp":
119
+				C.StatsdNetwork = value
120
+			default:
121
+				return fmt.Errorf("unknown statsd network %v", value)
122
+			}
197
 		case OptionTypeStatsdTagsFormat:
123
 		case OptionTypeStatsdTagsFormat:
198
 			value := opt.Value.(string)
124
 			value := opt.Value.(string)
199
 			switch value {
125
 			switch value {
200
 			case "datadog":
126
 			case "datadog":
201
-				C.StatsdStats.TagsFormat = statsd.Datadog
127
+				C.StatsdTagsFormat = statsd.Datadog
202
 			case "influxdb":
128
 			case "influxdb":
203
-				C.StatsdStats.TagsFormat = statsd.InfluxDB
129
+				C.StatsdTagsFormat = statsd.InfluxDB
204
 			default:
130
 			default:
205
 				return fmt.Errorf("Incorrect statsd tag %s", value)
131
 				return fmt.Errorf("Incorrect statsd tag %s", value)
206
 			}
132
 			}
207
 		case OptionTypeStatsdTags:
133
 		case OptionTypeStatsdTags:
208
-			C.StatsdStats.Tags = opt.Value.(map[string]string)
209
-		case OptionTypePrometheusPrefix:
210
-			C.PrometheusStats.Prefix = opt.Value.(string)
134
+			C.StatsdTags = opt.Value.(map[string]string)
211
 		case OptionTypeWriteBufferSize:
135
 		case OptionTypeWriteBufferSize:
212
-			C.BufferSize.Write = int(opt.Value.(uint32))
136
+			C.WriteBuffer = int(opt.Value.(uint32))
213
 		case OptionTypeReadBufferSize:
137
 		case OptionTypeReadBufferSize:
214
-			C.BufferSize.Read = int(opt.Value.(uint32))
138
+			C.ReadBuffer = int(opt.Value.(uint32))
215
 		case OptionTypeAntiReplayMaxSize:
139
 		case OptionTypeAntiReplayMaxSize:
216
-			C.AntiReplay.MaxSize = opt.Value.(int)
140
+			C.AntiReplayMaxSize = opt.Value.(int)
217
 		case OptionTypeAntiReplayEvictionTime:
141
 		case OptionTypeAntiReplayEvictionTime:
218
-			C.AntiReplay.EvictionTime = opt.Value.(time.Duration)
142
+			C.AntiReplayEvictionTime = opt.Value.(time.Duration)
219
 		case OptionTypeSecret:
143
 		case OptionTypeSecret:
220
 			C.Secret = opt.Value.([]byte)
144
 			C.Secret = opt.Value.([]byte)
221
 		case OptionTypeAdtag:
145
 		case OptionTypeAdtag:
239
 }
163
 }
240
 
164
 
241
 func InitPublicAddress(ctx context.Context) error {
165
 func InitPublicAddress(ctx context.Context) error {
242
-	if C.PublicIPv4Addr.Port == 0 {
243
-		C.PublicIPv4Addr.Port = C.ListenAddr.Port
166
+	if C.PublicIPv4.Port == 0 {
167
+		C.PublicIPv4.Port = C.Bind.Port
244
 	}
168
 	}
245
-	if C.PublicIPv6Addr.Port == 0 {
246
-		C.PublicIPv6Addr.Port = C.ListenAddr.Port
169
+	if C.PublicIPv6.Port == 0 {
170
+		C.PublicIPv6.Port = C.Bind.Port
247
 	}
171
 	}
248
 
172
 
249
-	foundAddress := C.PublicIPv4Addr.IP != nil || C.PublicIPv6Addr.IP != nil
250
-	if C.PublicIPv4Addr.IP == nil {
173
+	foundAddress := C.PublicIPv4.IP != nil || C.PublicIPv6.IP != nil
174
+	if C.PublicIPv4.IP == nil {
251
 		ip, err := getGlobalIPv4(ctx)
175
 		ip, err := getGlobalIPv4(ctx)
252
 		if err != nil {
176
 		if err != nil {
253
 			zap.S().Warnw("Cannot resolve public address", "error", err)
177
 			zap.S().Warnw("Cannot resolve public address", "error", err)
254
 		} else {
178
 		} else {
255
-			C.PublicIPv4Addr.IP = ip
179
+			C.PublicIPv4.IP = ip
256
 			foundAddress = true
180
 			foundAddress = true
257
 		}
181
 		}
258
 	}
182
 	}
259
-	if C.PublicIPv6Addr.IP == nil {
183
+	if C.PublicIPv6.IP == nil {
260
 		ip, err := getGlobalIPv6(ctx)
184
 		ip, err := getGlobalIPv6(ctx)
261
 		if err != nil {
185
 		if err != nil {
262
 			zap.S().Warnw("Cannot resolve public address", "error", err)
186
 			zap.S().Warnw("Cannot resolve public address", "error", err)
263
 		} else {
187
 		} else {
264
-			C.PublicIPv6Addr.IP = ip
188
+			C.PublicIPv6.IP = ip
265
 			foundAddress = true
189
 			foundAddress = true
266
 		}
190
 		}
267
 	}
191
 	}
272
 
196
 
273
 	return nil
197
 	return nil
274
 }
198
 }
199
+
200
+func Printable() interface{} {
201
+	data, err := json.Marshal(C)
202
+	if err != nil {
203
+		panic(err)
204
+	}
205
+
206
+	rv := map[string]interface{}{}
207
+	if err := json.Unmarshal(data, &rv); err != nil {
208
+		panic(err)
209
+	}
210
+
211
+	rrv, err := json.Marshal(rv)
212
+	if err != nil {
213
+		panic(err)
214
+	}
215
+
216
+	return rrv
217
+}

+ 4
- 3
config/urls.go Wyświetl plik

2
 
2
 
3
 import (
3
 import (
4
 	"encoding/hex"
4
 	"encoding/hex"
5
+	"fmt"
5
 	"net/url"
6
 	"net/url"
6
 )
7
 )
7
 
8
 
27
 		secret = "dd" + hex.EncodeToString(C.Secret)
28
 		secret = "dd" + hex.EncodeToString(C.Secret)
28
 	}
29
 	}
29
 
30
 
30
-	urls.IPv4 = makeURLs(&C.PublicIPv4Addr, secret)
31
-	urls.IPv6 = makeURLs(&C.PublicIPv6Addr, secret)
31
+	urls.IPv4 = makeURLs(C.PublicIPv4, secret)
32
+	urls.IPv6 = makeURLs(C.PublicIPv6, secret)
32
 	urls.BotSecret = secret
33
 	urls.BotSecret = secret
33
 
34
 
34
 	return urls
35
 	return urls
35
 }
36
 }
36
 
37
 
37
-func makeURLs(addr *Addr, secret string) (urls URLs) {
38
+func makeURLs(addr fmt.Stringer, secret string) (urls URLs) {
38
 	values := url.Values{}
39
 	values := url.Values{}
39
 	values.Set("address", addr.String())
40
 	values.Set("address", addr.String())
40
 	values.Set("secret", secret)
41
 	values.Set("secret", secret)

+ 1
- 1
hub/connection.go Wyświetl plik

52
 func (c *connection) shutdown() {
52
 func (c *connection) shutdown() {
53
 	c.shutdownOnce.Do(func() {
53
 	c.shutdownOnce.Do(func() {
54
 		close(c.done)
54
 		close(c.done)
55
-			c.hub.channelBrokenSockets <- c.id
55
+		c.hub.channelBrokenSockets <- c.id
56
 	})
56
 	})
57
 }
57
 }
58
 
58
 

+ 0
- 1
hub/ctx_channel.go Wyświetl plik

2
 
2
 
3
 import (
3
 import (
4
 	"context"
4
 	"context"
5
-	"errors"
6
 	"time"
5
 	"time"
7
 
6
 
8
 	"github.com/9seconds/mtg/conntypes"
7
 	"github.com/9seconds/mtg/conntypes"

+ 31
- 67
main.go Wyświetl plik

36
 		Short('v').
36
 		Short('v').
37
 		Envar("MTG_VERBOSE").
37
 		Envar("MTG_VERBOSE").
38
 		Bool()
38
 		Bool()
39
-	proxyBindIP = proxyCommand.Flag("bind-ip",
40
-		"Which IP to bind to.").
39
+	proxyBind = proxyCommand.Flag("bind",
40
+		"Host:Port to bind proxy to.").
41
 		Short('b').
41
 		Short('b').
42
-		Envar("MTG_IP").
43
-		Default("127.0.0.1").
44
-		IP()
45
-	proxyBindPort = proxyCommand.Flag("bind-port",
46
-		"Which port to bind to.").
47
-		Short('p').
48
-		Envar("MTG_PORT").
49
-		Default("3128").
50
-		Uint16()
42
+		Envar("MTG_BIND").
43
+		Default("0.0.0.0:3128").
44
+		TCP()
51
 	proxyPublicIPv4 = proxyCommand.Flag("public-ipv4",
45
 	proxyPublicIPv4 = proxyCommand.Flag("public-ipv4",
52
-		"Which IPv4 address is public.").
46
+		"Which IPv4 host:port to use.").
53
 		Short('4').
47
 		Short('4').
54
 		Envar("MTG_IPV4").
48
 		Envar("MTG_IPV4").
55
-		IP()
56
-	proxyPublicIPv4Port = proxyCommand.Flag("public-ipv4-port",
57
-		"Which IPv4 port is public. Default is 'bind-port' value.").
58
-		Envar("MTG_IPV4_PORT").
59
-		Uint16()
49
+		TCP()
60
 	proxyPublicIPv6 = proxyCommand.Flag("public-ipv6",
50
 	proxyPublicIPv6 = proxyCommand.Flag("public-ipv6",
61
-		"Which IPv6 address is public.").
51
+		"Which IPv6 host:port to use.").
62
 		Short('6').
52
 		Short('6').
63
 		Envar("MTG_IPV6").
53
 		Envar("MTG_IPV6").
64
-		IP()
65
-	proxyPublicIPv6Port = proxyCommand.Flag("public-ipv6-port",
66
-		"Which IPv6 port is public. Default is 'bind-port' value.").
67
-		Envar("MTG_IPV6_PORT").
68
-		Uint16()
69
-	proxyStatsIP = proxyCommand.Flag("stats-ip",
70
-		"Which IP bind stats server to.").
54
+		TCP()
55
+	proxyStatsBind = proxyCommand.Flag("stats-bind",
56
+		"Which Host:Port to bind stats server to.").
71
 		Short('t').
57
 		Short('t').
72
-		Envar("MTG_STATS_IP").
73
-		Default("127.0.0.1").
74
-		IP()
75
-	proxyStatsPort = proxyCommand.Flag("stats-port",
76
-		"Which port bind stats to.").
77
-		Short('q').
78
-		Envar("MTG_STATS_PORT").
79
-		Default("3129").
80
-		Uint16()
81
-	proxyStatsdIP = proxyCommand.Flag("statsd-ip",
82
-		"Which IP should we use for working with statsd.").
83
-		Envar("MTG_STATSD_IP").
84
-		IP()
85
-	proxyStatsdPort = proxyCommand.Flag("statsd-port",
86
-		"Which port should we use for working with statsd.").
87
-		Envar("MTG_STATSD_PORT").
88
-		Default("8125").
89
-		Uint16()
58
+		Envar("MTG_STATS_BIND").
59
+		Default("127.0.0.1:3129").
60
+		TCP()
61
+	proxyStatsNamespace = proxyCommand.Flag("prometheus-namespace",
62
+		"Which namespace to use for Prometheus.").
63
+		Envar("MTG_STATS_NAMESPACE").
64
+		Default("mtg").
65
+		String()
66
+	proxyStatsdAddress = proxyCommand.Flag("statsd-addr",
67
+		"Host:port of statsd server").
68
+		Envar("MTG_STATSD_ADDR").
69
+		TCP()
90
 	proxyStatsdNetwork = proxyCommand.Flag("statsd-network",
70
 	proxyStatsdNetwork = proxyCommand.Flag("statsd-network",
91
 		"Which network is used to work with statsd. Only 'tcp' and 'udp' are supported.").
71
 		"Which network is used to work with statsd. Only 'tcp' and 'udp' are supported.").
92
 		Envar("MTG_STATSD_NETWORK").
72
 		Envar("MTG_STATSD_NETWORK").
93
 		Default("udp").
73
 		Default("udp").
94
 		Enum("udp", "tcp")
74
 		Enum("udp", "tcp")
95
-	proxyStatsdPrefix = proxyCommand.Flag("statsd-prefix",
96
-		"Which bucket prefix should we use for sending stats to statsd.").
97
-		Envar("MTG_STATSD_PREFIX").
98
-		Default("mtg").
99
-		String()
100
 	proxyStatsdTagsFormat = proxyCommand.Flag("statsd-tags-format",
75
 	proxyStatsdTagsFormat = proxyCommand.Flag("statsd-tags-format",
101
 		"Which tag format should we use to send stats metrics. Valid options are 'datadog' and 'influxdb'.").
76
 		"Which tag format should we use to send stats metrics. Valid options are 'datadog' and 'influxdb'.").
102
 		Envar("MTG_STATSD_TAGS_FORMAT").
77
 		Envar("MTG_STATSD_TAGS_FORMAT").
106
 		"Tags to use for working with statsd (specified as 'key=value').").
81
 		"Tags to use for working with statsd (specified as 'key=value').").
107
 		Envar("MTG_STATSD_TAGS").
82
 		Envar("MTG_STATSD_TAGS").
108
 		StringMap()
83
 		StringMap()
109
-	proxyPrometheusPrefix = proxyCommand.Flag("prometheus-prefix",
110
-		"Which namespace to use to send stats to Prometheus.").
111
-		Envar("MTG_PROMETHEUS_PREFIX").
112
-		Default("mtg").
113
-		String()
114
 	proxyWriteBufferSize = proxyCommand.Flag("write-buffer",
84
 	proxyWriteBufferSize = proxyCommand.Flag("write-buffer",
115
 		"Write buffer size in bytes. You can think about it as a buffer from client to Telegram.").
85
 		"Write buffer size in bytes. You can think about it as a buffer from client to Telegram.").
116
 		Short('w').
86
 		Short('w').
117
 		Envar("MTG_BUFFER_WRITE").
87
 		Envar("MTG_BUFFER_WRITE").
118
-		Default("65536").
119
-		Uint32()
88
+		Default("65536KB").
89
+		Bytes()
120
 	proxyReadBufferSize = proxyCommand.Flag("read-buffer",
90
 	proxyReadBufferSize = proxyCommand.Flag("read-buffer",
121
 		"Read buffer size in bytes. You can think about it as a buffer from Telegram to client.").
91
 		"Read buffer size in bytes. You can think about it as a buffer from Telegram to client.").
122
 		Short('r').
92
 		Short('r').
123
 		Envar("MTG_BUFFER_READ").
93
 		Envar("MTG_BUFFER_READ").
124
-		Default("131072").
125
-		Uint32()
94
+		Default("131072KB").
95
+		Bytes()
126
 	proxyAntiReplayMaxSize = proxyCommand.Flag("anti-replay-max-size",
96
 	proxyAntiReplayMaxSize = proxyCommand.Flag("anti-replay-max-size",
127
 		"Max size of antireplay cache in megabytes.").
97
 		"Max size of antireplay cache in megabytes.").
128
 		Envar("MTG_ANTIREPLAY_MAXSIZE").
98
 		Envar("MTG_ANTIREPLAY_MAXSIZE").
154
 		err := config.Init(
124
 		err := config.Init(
155
 			config.Opt{Option: config.OptionTypeDebug, Value: *proxyDebug},
125
 			config.Opt{Option: config.OptionTypeDebug, Value: *proxyDebug},
156
 			config.Opt{Option: config.OptionTypeVerbose, Value: *proxyVerbose},
126
 			config.Opt{Option: config.OptionTypeVerbose, Value: *proxyVerbose},
157
-			config.Opt{Option: config.OptionTypeBindIP, Value: *proxyBindIP},
158
-			config.Opt{Option: config.OptionTypeBindPort, Value: *proxyBindPort},
127
+			config.Opt{Option: config.OptionTypeBind, Value: *proxyBind},
159
 			config.Opt{Option: config.OptionTypePublicIPv4, Value: *proxyPublicIPv4},
128
 			config.Opt{Option: config.OptionTypePublicIPv4, Value: *proxyPublicIPv4},
160
-			config.Opt{Option: config.OptionTypePublicIPv4Port, Value: *proxyPublicIPv4Port},
161
 			config.Opt{Option: config.OptionTypePublicIPv6, Value: *proxyPublicIPv6},
129
 			config.Opt{Option: config.OptionTypePublicIPv6, Value: *proxyPublicIPv6},
162
-			config.Opt{Option: config.OptionTypePublicIPv6Port, Value: *proxyPublicIPv6Port},
163
-			config.Opt{Option: config.OptionTypeStatsIP, Value: *proxyStatsIP},
164
-			config.Opt{Option: config.OptionTypeStatsPort, Value: *proxyStatsPort},
165
-			config.Opt{Option: config.OptionTypeStatsdIP, Value: *proxyStatsdIP},
166
-			config.Opt{Option: config.OptionTypeStatsdPort, Value: *proxyStatsdPort},
130
+			config.Opt{Option: config.OptionTypeStatsBind, Value: *proxyStatsBind},
131
+			config.Opt{Option: config.OptionTypeStatsNamespace, Value: *proxyStatsNamespace},
132
+			config.Opt{Option: config.OptionTypeStatsdAddress, Value: *proxyStatsdAddress},
167
 			config.Opt{Option: config.OptionTypeStatsdNetwork, Value: *proxyStatsdNetwork},
133
 			config.Opt{Option: config.OptionTypeStatsdNetwork, Value: *proxyStatsdNetwork},
168
-			config.Opt{Option: config.OptionTypeStatsdPrefix, Value: *proxyStatsdPrefix},
169
 			config.Opt{Option: config.OptionTypeStatsdTagsFormat, Value: *proxyStatsdTagsFormat},
134
 			config.Opt{Option: config.OptionTypeStatsdTagsFormat, Value: *proxyStatsdTagsFormat},
170
 			config.Opt{Option: config.OptionTypeStatsdTags, Value: *proxyStatsdTags},
135
 			config.Opt{Option: config.OptionTypeStatsdTags, Value: *proxyStatsdTags},
171
-			config.Opt{Option: config.OptionTypePrometheusPrefix, Value: *proxyPrometheusPrefix},
172
 			config.Opt{Option: config.OptionTypeWriteBufferSize, Value: *proxyWriteBufferSize},
136
 			config.Opt{Option: config.OptionTypeWriteBufferSize, Value: *proxyWriteBufferSize},
173
 			config.Opt{Option: config.OptionTypeReadBufferSize, Value: *proxyReadBufferSize},
137
 			config.Opt{Option: config.OptionTypeReadBufferSize, Value: *proxyReadBufferSize},
174
 			config.Opt{Option: config.OptionTypeAntiReplayMaxSize, Value: *proxyAntiReplayMaxSize},
138
 			config.Opt{Option: config.OptionTypeAntiReplayMaxSize, Value: *proxyAntiReplayMaxSize},

+ 3
- 4
proxy/proxy.go Wyświetl plik

47
 	defer func() {
47
 	defer func() {
48
 		conn.Close()
48
 		conn.Close()
49
 		if err := recover(); err != nil {
49
 		if err := recover(); err != nil {
50
-			stats.S.Crash()
50
+			stats.Stats.Crash()
51
 			p.Logger.Errorw("Crash of accept handler", "error", err)
51
 			p.Logger.Errorw("Crash of accept handler", "error", err)
52
 		}
52
 		}
53
 	}()
53
 	}()
66
 	clientConn := wrappers.NewClientConn(conn, connID)
66
 	clientConn := wrappers.NewClientConn(conn, connID)
67
 	clientConn = wrappers.NewCtx(ctx, cancel, clientConn)
67
 	clientConn = wrappers.NewCtx(ctx, cancel, clientConn)
68
 	clientConn = wrappers.NewTimeout(clientConn)
68
 	clientConn = wrappers.NewTimeout(clientConn)
69
-	clientConn = wrappers.NewTraffic(clientConn)
70
 	defer clientConn.Close()
69
 	defer clientConn.Close()
71
 
70
 
72
 	clientProtocol := p.ClientProtocolMaker()
71
 	clientProtocol := p.ClientProtocolMaker()
76
 		return
75
 		return
77
 	}
76
 	}
78
 
77
 
79
-	stats.S.ClientConnected(clientProtocol.ConnectionType(), clientConn.RemoteAddr())
80
-	defer stats.S.ClientDisconnected(clientProtocol.ConnectionType(), clientConn.RemoteAddr())
78
+	stats.Stats.ClientConnected(clientProtocol.ConnectionType(), clientConn.RemoteAddr())
79
+	defer stats.Stats.ClientDisconnected(clientProtocol.ConnectionType(), clientConn.RemoteAddr())
81
 	logger.Infow("Client connected", "addr", conn.RemoteAddr())
80
 	logger.Infow("Client connected", "addr", conn.RemoteAddr())
82
 
81
 
83
 	req := &protocol.TelegramRequest{
82
 	req := &protocol.TelegramRequest{

+ 50
- 0
stats/interfaces.go Wyświetl plik

1
+package stats
2
+
3
+import (
4
+	"net"
5
+
6
+	"github.com/9seconds/mtg/conntypes"
7
+)
8
+
9
+type IngressTrafficInterface interface {
10
+	IngressTraffic(int)
11
+}
12
+
13
+type EgressTrafficInterface interface {
14
+	EgressTraffic(int)
15
+}
16
+
17
+type ClientConnectedInterface interface {
18
+	ClientConnected(conntypes.ConnectionType, *net.TCPAddr)
19
+}
20
+
21
+type ClientDisconnectedInterface interface {
22
+	ClientDisconnected(conntypes.ConnectionType, *net.TCPAddr)
23
+}
24
+
25
+type TelegramConnectedInterface interface {
26
+	TelegramConnected(conntypes.DC, *net.TCPAddr)
27
+}
28
+
29
+type TelegramDisconnectedInterface interface {
30
+	TelegramDisconnected(conntypes.DC, *net.TCPAddr)
31
+}
32
+
33
+type CrashInterface interface {
34
+	Crash()
35
+}
36
+
37
+type AntiReplayDetectedInterface interface {
38
+	AntiReplayDetected()
39
+}
40
+
41
+type Interface interface {
42
+	IngressTrafficInterface
43
+	EgressTrafficInterface
44
+	ClientConnectedInterface
45
+	ClientDisconnectedInterface
46
+	TelegramConnectedInterface
47
+	TelegramDisconnectedInterface
48
+	CrashInterface
49
+	AntiReplayDetectedInterface
50
+}

+ 57
- 0
stats/multi_stats.go Wyświetl plik

1
+package stats
2
+
3
+import (
4
+	"net"
5
+
6
+	"github.com/9seconds/mtg/conntypes"
7
+)
8
+
9
+type multiStats []Interface
10
+
11
+func (m multiStats) IngressTraffic(traffic int) {
12
+	for i := range m {
13
+		go m[i].IngressTraffic(traffic)
14
+	}
15
+}
16
+
17
+func (m multiStats) EgressTraffic(traffic int) {
18
+	for i := range m {
19
+		go m[i].EgressTraffic(traffic)
20
+	}
21
+}
22
+
23
+func (m multiStats) ClientConnected(connectionType conntypes.ConnectionType, addr *net.TCPAddr) {
24
+	for i := range m {
25
+		go m[i].ClientConnected(connectionType, addr)
26
+	}
27
+}
28
+
29
+func (m multiStats) ClientDisconnected(connectionType conntypes.ConnectionType, addr *net.TCPAddr) {
30
+	for i := range m {
31
+		go m[i].ClientDisconnected(connectionType, addr)
32
+	}
33
+}
34
+
35
+func (m multiStats) TelegramConnected(dc conntypes.DC, addr *net.TCPAddr) {
36
+	for i := range m {
37
+		go m[i].TelegramConnected(dc, addr)
38
+	}
39
+}
40
+
41
+func (m multiStats) TelegramDisconnected(dc conntypes.DC, addr *net.TCPAddr) {
42
+	for i := range m {
43
+		go m[i].TelegramDisconnected(dc, addr)
44
+	}
45
+}
46
+
47
+func (m multiStats) Crash() {
48
+	for i := range m {
49
+		go m[i].Crash()
50
+	}
51
+}
52
+
53
+func (m multiStats) AntiReplayDetected() {
54
+	for i := range m {
55
+		go m[i].AntiReplayDetected()
56
+	}
57
+}

+ 5
- 54
stats/stats.go Wyświetl plik

7
 	"net/http"
7
 	"net/http"
8
 
8
 
9
 	"github.com/9seconds/mtg/config"
9
 	"github.com/9seconds/mtg/config"
10
-	"github.com/9seconds/mtg/conntypes"
11
 )
10
 )
12
 
11
 
13
-type Stats interface {
14
-	IngressTraffic(int)
15
-	EgressTraffic(int)
16
-	ClientConnected(conntypes.ConnectionType, *net.TCPAddr)
17
-	ClientDisconnected(conntypes.ConnectionType, *net.TCPAddr)
18
-	Crash()
19
-	AntiReplayDetected()
20
-}
21
-
22
-type multiStats []Stats
23
-
24
-func (m multiStats) IngressTraffic(traffic int) {
25
-	for i := range m {
26
-		go m[i].IngressTraffic(traffic)
27
-	}
28
-}
29
-
30
-func (m multiStats) EgressTraffic(traffic int) {
31
-	for i := range m {
32
-		go m[i].EgressTraffic(traffic)
33
-	}
34
-}
35
-
36
-func (m multiStats) ClientConnected(connectionType conntypes.ConnectionType, addr *net.TCPAddr) {
37
-	for i := range m {
38
-		go m[i].ClientConnected(connectionType, addr)
39
-	}
40
-}
41
-
42
-func (m multiStats) ClientDisconnected(connectionType conntypes.ConnectionType, addr *net.TCPAddr) {
43
-	for i := range m {
44
-		go m[i].ClientDisconnected(connectionType, addr)
45
-	}
46
-}
47
-
48
-func (m multiStats) Crash() {
49
-	for i := range m {
50
-		go m[i].Crash()
51
-	}
52
-}
53
-
54
-func (m multiStats) AntiReplayDetected() {
55
-	for i := range m {
56
-		go m[i].AntiReplayDetected()
57
-	}
58
-}
59
-
60
-var S Stats
12
+var Stats Interface
61
 
13
 
62
 func Init(ctx context.Context) error {
14
 func Init(ctx context.Context) error {
63
 	mux := http.NewServeMux()
15
 	mux := http.NewServeMux()
64
 
16
 
65
-	instanceJSON := newStatsJSON(mux)
66
 	instancePrometheus, err := newStatsPrometheus(mux)
17
 	instancePrometheus, err := newStatsPrometheus(mux)
67
 	if err != nil {
18
 	if err != nil {
68
 		return fmt.Errorf("cannot initialize prometheus: %w", err)
19
 		return fmt.Errorf("cannot initialize prometheus: %w", err)
69
 	}
20
 	}
70
 
21
 
71
-	stats := []Stats{instanceJSON, instancePrometheus}
72
-	if config.C.StatsdStats.Addr.IP != nil {
22
+	stats := []Interface{instancePrometheus}
23
+	if config.C.StatsdAddr != nil {
73
 		instanceStatsd, err := newStatsStatsd()
24
 		instanceStatsd, err := newStatsStatsd()
74
 		if err != nil {
25
 		if err != nil {
75
 			return fmt.Errorf("cannot inialize statsd: %w", err)
26
 			return fmt.Errorf("cannot inialize statsd: %w", err)
77
 		stats = append(stats, instanceStatsd)
28
 		stats = append(stats, instanceStatsd)
78
 	}
29
 	}
79
 
30
 
80
-	listener, err := net.Listen("tcp", config.C.StatsAddr.String())
31
+	listener, err := net.Listen("tcp", config.C.StatsBind.String())
81
 	if err != nil {
32
 	if err != nil {
82
 		return fmt.Errorf("cannot initialize stats server: %w", err)
33
 		return fmt.Errorf("cannot initialize stats server: %w", err)
83
 	}
34
 	}
91
 		srv.Shutdown(context.Background()) // nolint: errcheck
42
 		srv.Shutdown(context.Background()) // nolint: errcheck
92
 	}()
43
 	}()
93
 
44
 
94
-	S = multiStats(stats)
45
+	Stats = multiStats(stats)
95
 
46
 
96
 	return nil
47
 	return nil
97
 }
48
 }

+ 0
- 131
stats/stats_json.go Wyświetl plik

1
-package stats
2
-
3
-import (
4
-	"encoding/json"
5
-	"net"
6
-	"net/http"
7
-	"strconv"
8
-	"sync/atomic"
9
-	"time"
10
-
11
-	"go.uber.org/zap"
12
-
13
-	"github.com/9seconds/mtg/conntypes"
14
-)
15
-
16
-type statsJSON struct {
17
-	Connections statsJSONConnections `json:"connections"`
18
-	Traffic     statsJSONTraffic     `json:"traffic"`
19
-	Uptime      statsJSONUptime      `json:"uptime"`
20
-	Crashes     uint32               `json:"crashes"`
21
-	AntiReplays uint32               `json:"anti_replay_detected"`
22
-}
23
-
24
-type statsBaseJSONConnections struct {
25
-	All          statsJSONConnectionType `json:"all"`
26
-	Abridged     statsJSONConnectionType `json:"abridged"`
27
-	Intermediate statsJSONConnectionType `json:"intermediate"`
28
-	Secured      statsJSONConnectionType `json:"secured"`
29
-}
30
-
31
-type statsJSONConnections struct {
32
-	statsBaseJSONConnections
33
-}
34
-
35
-type statsJSONConnectionType struct {
36
-	IPv4 uint32 `json:"ipv4"`
37
-	IPv6 uint32 `json:"ipv6"`
38
-}
39
-
40
-func (c statsJSONConnections) MarshalJSON() ([]byte, error) {
41
-	c.All.IPv4 = c.Abridged.IPv4 + c.Intermediate.IPv4 + c.Secured.IPv4
42
-	c.All.IPv6 = c.Abridged.IPv6 + c.Intermediate.IPv6 + c.Secured.IPv6
43
-
44
-	return json.Marshal(c.statsBaseJSONConnections)
45
-}
46
-
47
-type statsJSONTraffic struct {
48
-	Ingress uint64 `json:"ingress"`
49
-	Egress  uint64 `json:"egress"`
50
-}
51
-
52
-type statsJSONUptime time.Time
53
-
54
-func (s statsJSONUptime) MarshalJSON() ([]byte, error) {
55
-	seconds := strconv.Itoa(int(time.Since(time.Time(s)).Seconds()))
56
-	return []byte(seconds), nil
57
-}
58
-
59
-func (s *statsJSON) IngressTraffic(traffic int) {
60
-	atomic.AddUint64(&s.Traffic.Ingress, uint64(traffic))
61
-}
62
-
63
-func (s *statsJSON) EgressTraffic(traffic int) {
64
-	atomic.AddUint64(&s.Traffic.Egress, uint64(traffic))
65
-}
66
-
67
-func (s *statsJSON) ClientConnected(connectionType conntypes.ConnectionType, addr *net.TCPAddr) {
68
-	s.changeConnections(connectionType, addr, 1)
69
-}
70
-
71
-func (s *statsJSON) ClientDisconnected(connectionType conntypes.ConnectionType, addr *net.TCPAddr) {
72
-	s.changeConnections(connectionType, addr, ^uint32(0))
73
-}
74
-
75
-func (s *statsJSON) changeConnections(connectionType conntypes.ConnectionType, addr *net.TCPAddr, value uint32) {
76
-	var connections *statsJSONConnectionType
77
-
78
-	switch connectionType {
79
-	case conntypes.ConnectionTypeAbridged:
80
-		connections = &s.Connections.Abridged
81
-	case conntypes.ConnectionTypeSecure:
82
-		connections = &s.Connections.Secured
83
-	default:
84
-		connections = &s.Connections.Intermediate
85
-	}
86
-
87
-	if addr.IP.To4() != nil {
88
-		atomic.AddUint32(&connections.IPv4, value)
89
-	} else {
90
-		atomic.AddUint32(&connections.IPv6, value)
91
-	}
92
-}
93
-
94
-func (s *statsJSON) Crash() {
95
-	atomic.AddUint32(&s.Crashes, 1)
96
-}
97
-
98
-func (s *statsJSON) AntiReplayDetected() {
99
-	atomic.AddUint32(&s.AntiReplays, 1)
100
-}
101
-
102
-func newStatsJSON(mux *http.ServeMux) Stats {
103
-	instance := &statsJSON{
104
-		Uptime: statsJSONUptime(time.Now()),
105
-	}
106
-	logger := zap.S().Named("stats")
107
-
108
-	mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
109
-		w.Header().Set("Content-Type", "application/json")
110
-		first, err := json.Marshal(instance)
111
-		if err != nil {
112
-			logger.Errorw("Cannot encode json", "error", err)
113
-			http.Error(w, "Internal server error", http.StatusServiceUnavailable)
114
-			return
115
-		}
116
-
117
-		interim := map[string]interface{}{}
118
-		if err := json.Unmarshal(first, &interim); err != nil {
119
-			panic(err)
120
-		}
121
-
122
-		encoder := json.NewEncoder(w)
123
-		encoder.SetEscapeHTML(false)
124
-		encoder.SetIndent("", "  ")
125
-		if err := encoder.Encode(interim); err != nil {
126
-			logger.Errorw("Cannot encode json", "error", err)
127
-		}
128
-	})
129
-
130
-	return instance
131
-}

+ 46
- 17
stats/stats_prometheus.go Wyświetl plik

4
 	"fmt"
4
 	"fmt"
5
 	"net"
5
 	"net"
6
 	"net/http"
6
 	"net/http"
7
+	"strconv"
7
 
8
 
8
 	"github.com/prometheus/client_golang/prometheus"
9
 	"github.com/prometheus/client_golang/prometheus"
9
 	"github.com/prometheus/client_golang/prometheus/promhttp"
10
 	"github.com/prometheus/client_golang/prometheus/promhttp"
13
 )
14
 )
14
 
15
 
15
 type statsPrometheus struct {
16
 type statsPrometheus struct {
16
-	connections *prometheus.GaugeVec
17
-	traffic     *prometheus.GaugeVec
18
-	crashes     prometheus.Gauge
19
-	antiReplays prometheus.Gauge
17
+	connections         *prometheus.GaugeVec
18
+	telegramConnections *prometheus.GaugeVec
19
+	traffic             *prometheus.GaugeVec
20
+	crashes             prometheus.Gauge
21
+	antiReplays         prometheus.Counter
20
 }
22
 }
21
 
23
 
22
 func (s *statsPrometheus) IngressTraffic(traffic int) {
24
 func (s *statsPrometheus) IngressTraffic(traffic int) {
38
 func (s *statsPrometheus) changeConnections(connectionType conntypes.ConnectionType,
40
 func (s *statsPrometheus) changeConnections(connectionType conntypes.ConnectionType,
39
 	addr *net.TCPAddr,
41
 	addr *net.TCPAddr,
40
 	increment float64) {
42
 	increment float64) {
41
-	var labels [2]string
43
+	labels := [...]string{
44
+		"intermediate",
45
+		"ipv4",
46
+	}
42
 
47
 
43
 	switch connectionType {
48
 	switch connectionType {
44
 	case conntypes.ConnectionTypeAbridged:
49
 	case conntypes.ConnectionTypeAbridged:
45
 		labels[0] = "abridged"
50
 		labels[0] = "abridged"
46
 	case conntypes.ConnectionTypeSecure:
51
 	case conntypes.ConnectionTypeSecure:
47
 		labels[0] = "secured"
52
 		labels[0] = "secured"
48
-	default:
49
-		labels[0] = "intermediate"
50
 	}
53
 	}
51
 
54
 
52
-	labels[1] = "ipv4"
55
+	if addr.IP.To4() == nil {
56
+		labels[1] = "ipv6" // nolint: goconst
57
+	}
58
+
59
+	s.connections.WithLabelValues(labels[:]...).Add(increment)
60
+}
61
+
62
+func (s *statsPrometheus) TelegramConnected(dc conntypes.DC, addr *net.TCPAddr) {
63
+	s.changeTelegramConnections(dc, addr, 1.0)
64
+}
65
+
66
+func (s *statsPrometheus) TelegramDisconnected(dc conntypes.DC, addr *net.TCPAddr) {
67
+	s.changeTelegramConnections(dc, addr, -1.0)
68
+}
69
+
70
+func (s *statsPrometheus) changeTelegramConnections(dc conntypes.DC, addr *net.TCPAddr, increment float64) {
71
+	labels := [...]string{
72
+		strconv.Itoa(int(dc)),
73
+		"ipv4",
74
+	}
75
+
53
 	if addr.IP.To4() == nil {
76
 	if addr.IP.To4() == nil {
54
 		labels[1] = "ipv6"
77
 		labels[1] = "ipv6"
55
 	}
78
 	}
65
 	s.antiReplays.Inc()
88
 	s.antiReplays.Inc()
66
 }
89
 }
67
 
90
 
68
-func newStatsPrometheus(mux *http.ServeMux) (Stats, error) {
69
-	registry := prometheus.NewRegistry()
91
+func newStatsPrometheus(mux *http.ServeMux) (Interface, error) {
92
+	registry := prometheus.NewPedanticRegistry()
93
+
70
 	instance := &statsPrometheus{
94
 	instance := &statsPrometheus{
71
 		connections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
95
 		connections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
72
-			Namespace: config.C.PrometheusStats.Prefix,
96
+			Namespace: config.C.StatsNamespace,
73
 			Name:      "connections",
97
 			Name:      "connections",
74
-			Help:      "Current number of connections to the proxy.",
98
+			Help:      "Current number of client connections to the proxy.",
75
 		}, []string{"type", "protocol"}),
99
 		}, []string{"type", "protocol"}),
100
+		telegramConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
101
+			Namespace: config.C.StatsNamespace,
102
+			Name:      "telegram_connections",
103
+			Help:      "Current number of telegram connections established by this proxy.",
104
+		}, []string{"dc", "protocol"}),
76
 		traffic: prometheus.NewGaugeVec(prometheus.GaugeOpts{
105
 		traffic: prometheus.NewGaugeVec(prometheus.GaugeOpts{
77
-			Namespace: config.C.PrometheusStats.Prefix,
106
+			Namespace: config.C.StatsNamespace,
78
 			Name:      "traffic",
107
 			Name:      "traffic",
79
 			Help:      "Traffic passed through the proxy in bytes.",
108
 			Help:      "Traffic passed through the proxy in bytes.",
80
 		}, []string{"direction"}),
109
 		}, []string{"direction"}),
81
 		crashes: prometheus.NewGauge(prometheus.GaugeOpts{
110
 		crashes: prometheus.NewGauge(prometheus.GaugeOpts{
82
-			Namespace: config.C.PrometheusStats.Prefix,
111
+			Namespace: config.C.StatsNamespace,
83
 			Name:      "crashes",
112
 			Name:      "crashes",
84
 			Help:      "How many crashes happened.",
113
 			Help:      "How many crashes happened.",
85
 		}),
114
 		}),
86
-		antiReplays: prometheus.NewGauge(prometheus.GaugeOpts{
87
-			Namespace: config.C.PrometheusStats.Prefix,
115
+		antiReplays: prometheus.NewCounter(prometheus.CounterOpts{
116
+			Namespace: config.C.StatsNamespace,
88
 			Name:      "anti_replays",
117
 			Name:      "anti_replays",
89
 			Help:      "How many anti replay attacks were prevented.",
118
 			Help:      "How many anti replay attacks were prevented.",
90
 		}),
119
 		}),
104
 	}
133
 	}
105
 
134
 
106
 	handler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
135
 	handler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
107
-	mux.Handle("/prometheus", handler)
136
+	mux.Handle("/", handler)
108
 
137
 
109
 	return instance, nil
138
 	return instance, nil
110
 }
139
 }

+ 36
- 13
stats/stats_statsd.go Wyświetl plik

3
 import (
3
 import (
4
 	"fmt"
4
 	"fmt"
5
 	"net"
5
 	"net"
6
+	"strconv"
6
 	"strings"
7
 	"strings"
7
 
8
 
8
 	"gopkg.in/alexcesaro/statsd.v2"
9
 	"gopkg.in/alexcesaro/statsd.v2"
32
 }
33
 }
33
 
34
 
34
 func (s *statsStatsd) changeConnections(connectionType conntypes.ConnectionType, addr *net.TCPAddr, value int) {
35
 func (s *statsStatsd) changeConnections(connectionType conntypes.ConnectionType, addr *net.TCPAddr, value int) {
35
-	var labels [3]string
36
+	labels := [...]string{
37
+		"connections",
38
+		"intermediate",
39
+		"ipv4",
40
+	}
36
 
41
 
37
-	labels[0] = "connections"
38
 	switch connectionType {
42
 	switch connectionType {
39
 	case conntypes.ConnectionTypeAbridged:
43
 	case conntypes.ConnectionTypeAbridged:
40
 		labels[1] = "abridged"
44
 		labels[1] = "abridged"
41
 	case conntypes.ConnectionTypeSecure:
45
 	case conntypes.ConnectionTypeSecure:
42
 		labels[1] = "secured"
46
 		labels[1] = "secured"
43
-	default:
44
-		labels[1] = "intermediate"
45
 	}
47
 	}
46
 
48
 
47
-	labels[2] = "ipv4"
49
+	if addr.IP.To4() == nil {
50
+		labels[2] = "ipv6"
51
+	}
52
+
53
+	s.client.Count(strings.Join(labels[:], "."), value)
54
+}
55
+
56
+func (s *statsStatsd) TelegramConnected(dc conntypes.DC, addr *net.TCPAddr) {
57
+	s.changeTelegramConnections(dc, addr, 1)
58
+}
59
+
60
+func (s *statsStatsd) TelegramDisconnected(dc conntypes.DC, addr *net.TCPAddr) {
61
+	s.changeTelegramConnections(dc, addr, -1)
62
+}
63
+
64
+func (s *statsStatsd) changeTelegramConnections(dc conntypes.DC, addr *net.TCPAddr, value int) {
65
+	labels := [...]string{
66
+		"telegram",
67
+		strconv.Itoa(int(dc)),
68
+		"ipv4",
69
+	}
70
+
48
 	if addr.IP.To4() == nil {
71
 	if addr.IP.To4() == nil {
49
 		labels[2] = "ipv6"
72
 		labels[2] = "ipv6"
50
 	}
73
 	}
60
 	s.client.Increment("anti_replays")
83
 	s.client.Increment("anti_replays")
61
 }
84
 }
62
 
85
 
63
-func newStatsStatsd() (Stats, error) {
86
+func newStatsStatsd() (Interface, error) {
64
 	options := []statsd.Option{
87
 	options := []statsd.Option{
65
-		statsd.Prefix(config.C.StatsdStats.Prefix),
66
-		statsd.Network(config.C.StatsdStats.Addr.Network()),
67
-		statsd.Address(config.C.StatsdStats.Addr.String()),
68
-		statsd.TagsFormat(config.C.StatsdStats.TagsFormat),
88
+		statsd.Prefix(config.C.StatsNamespace),
89
+		statsd.Network(config.C.StatsdNetwork),
90
+		statsd.Address(config.C.StatsBind.String()),
91
+		statsd.TagsFormat(config.C.StatsdTagsFormat),
69
 	}
92
 	}
70
 
93
 
71
-	if len(config.C.StatsdStats.Tags) > 0 {
72
-		tags := make([]string, len(config.C.StatsdStats.Tags)*2)
73
-		for k, v := range config.C.StatsdStats.Tags {
94
+	if len(config.C.StatsdTags) > 0 {
95
+		tags := make([]string, len(config.C.StatsdTags)*2)
96
+		for k, v := range config.C.StatsdTags {
74
 			tags = append(tags, k, v)
97
 			tags = append(tags, k, v)
75
 		}
98
 		}
76
 		options = append(options, statsd.Tags(tags...))
99
 		options = append(options, statsd.Tags(tags...))

+ 1
- 1
telegram/base.go Wyświetl plik

47
 		return nil, fmt.Errorf("cannot initialize tcp socket: %w", err)
47
 		return nil, fmt.Errorf("cannot initialize tcp socket: %w", err)
48
 	}
48
 	}
49
 
49
 
50
-	return wrappers.NewTelegramConn(conn), nil
50
+	return wrappers.NewTelegramConn(dc, conn), nil
51
 }
51
 }
52
 
52
 
53
 func (b *baseTelegram) chooseAddress(addresses map[conntypes.DC][]string,
53
 func (b *baseTelegram) chooseAddress(addresses map[conntypes.DC][]string,

+ 2
- 2
utils/init_tcp.go Wyświetl plik

13
 	if err := tcpConn.SetNoDelay(true); err != nil {
13
 	if err := tcpConn.SetNoDelay(true); err != nil {
14
 		return fmt.Errorf("cannot set TCP_NO_DELAY: %w", err)
14
 		return fmt.Errorf("cannot set TCP_NO_DELAY: %w", err)
15
 	}
15
 	}
16
-	if err := tcpConn.SetReadBuffer(config.C.BufferSize.Read); err != nil {
16
+	if err := tcpConn.SetReadBuffer(config.C.ReadBuffer); err != nil {
17
 		return fmt.Errorf("cannot set read buffer size: %w", err)
17
 		return fmt.Errorf("cannot set read buffer size: %w", err)
18
 	}
18
 	}
19
-	if err := tcpConn.SetWriteBuffer(config.C.BufferSize.Write); err != nil {
19
+	if err := tcpConn.SetWriteBuffer(config.C.WriteBuffer); err != nil {
20
 		return fmt.Errorf("cannot set write buffer size: %w", err)
20
 		return fmt.Errorf("cannot set write buffer size: %w", err)
21
 	}
21
 	}
22
 
22
 

+ 21
- 0
wrappers/common.go Wyświetl plik

1
+package wrappers
2
+
3
+import (
4
+	"net"
5
+
6
+	"github.com/9seconds/mtg/conntypes"
7
+)
8
+
9
+func NewClientConn(parent net.Conn, connID conntypes.ConnID) conntypes.StreamReadWriteCloser {
10
+	conn := newConn(parent, connID, connPurposeClient)
11
+	conn = NewTrafficStats(conn)
12
+
13
+	return conn
14
+}
15
+
16
+func NewTelegramConn(dc conntypes.DC, parent net.Conn) conntypes.StreamReadWriteCloser {
17
+	conn := newConn(parent, conntypes.ConnID{}, connPurposeTelegram)
18
+	conn = NewTelegramStats(dc, conn)
19
+
20
+	return conn
21
+}

+ 4
- 13
wrappers/conn.go Wyświetl plik

91
 	localAddr := *parent.LocalAddr().(*net.TCPAddr)
91
 	localAddr := *parent.LocalAddr().(*net.TCPAddr)
92
 
92
 
93
 	if parent.RemoteAddr().(*net.TCPAddr).IP.To4() != nil {
93
 	if parent.RemoteAddr().(*net.TCPAddr).IP.To4() != nil {
94
-		if config.C.PublicIPv4Addr.IP != nil {
95
-			localAddr.IP = config.C.PublicIPv4Addr.IP
94
+		if config.C.PublicIPv4.IP != nil {
95
+			localAddr.IP = config.C.PublicIPv4.IP
96
 		}
96
 		}
97
-	} else if config.C.PublicIPv6Addr.IP != nil {
98
-		localAddr.IP = config.C.PublicIPv6Addr.IP
97
+	} else if config.C.PublicIPv6.IP != nil {
98
+		localAddr.IP = config.C.PublicIPv6.IP
99
 	}
99
 	}
100
 
100
 
101
 	logger := zap.S().With(
101
 	logger := zap.S().With(
115
 		localAddr:  &localAddr,
115
 		localAddr:  &localAddr,
116
 	}
116
 	}
117
 }
117
 }
118
-
119
-func NewClientConn(parent net.Conn,
120
-	connID conntypes.ConnID) conntypes.StreamReadWriteCloser {
121
-	return newConn(parent, connID, connPurposeClient)
122
-}
123
-
124
-func NewTelegramConn(parent net.Conn) conntypes.StreamReadWriteCloser {
125
-	return newConn(parent, conntypes.ConnID{}, connPurposeTelegram)
126
-}

+ 0
- 67
wrappers/stats.go Wyświetl plik

1
-package wrappers
2
-
3
-import (
4
-	"net"
5
-	"time"
6
-
7
-	"go.uber.org/zap"
8
-
9
-	"github.com/9seconds/mtg/conntypes"
10
-	"github.com/9seconds/mtg/stats"
11
-)
12
-
13
-type wrapperStats struct {
14
-	parent conntypes.StreamReadWriteCloser
15
-}
16
-
17
-func (w *wrapperStats) Write(p []byte) (int, error) {
18
-	n, err := w.parent.Write(p)
19
-	stats.S.EgressTraffic(n)
20
-
21
-	return n, err
22
-}
23
-
24
-func (w *wrapperStats) WriteTimeout(p []byte, timeout time.Duration) (int, error) {
25
-	n, err := w.parent.WriteTimeout(p, timeout)
26
-	stats.S.EgressTraffic(n)
27
-
28
-	return n, err
29
-}
30
-
31
-func (w *wrapperStats) Read(p []byte) (int, error) {
32
-	n, err := w.parent.Read(p)
33
-	stats.S.IngressTraffic(n)
34
-
35
-	return n, err
36
-}
37
-
38
-func (w *wrapperStats) ReadTimeout(p []byte, timeout time.Duration) (int, error) {
39
-	n, err := w.parent.ReadTimeout(p, timeout)
40
-	stats.S.IngressTraffic(n)
41
-
42
-	return n, err
43
-}
44
-
45
-func (w *wrapperStats) Conn() net.Conn {
46
-	return w.parent.Conn()
47
-}
48
-
49
-func (w *wrapperStats) Logger() *zap.SugaredLogger {
50
-	return w.parent.Logger().Named("traffic")
51
-}
52
-
53
-func (w *wrapperStats) LocalAddr() *net.TCPAddr {
54
-	return w.parent.LocalAddr()
55
-}
56
-
57
-func (w *wrapperStats) RemoteAddr() *net.TCPAddr {
58
-	return w.parent.RemoteAddr()
59
-}
60
-
61
-func (w *wrapperStats) Close() error {
62
-	return w.parent.Close()
63
-}
64
-
65
-func NewTraffic(parent conntypes.StreamReadWriteCloser) conntypes.StreamReadWriteCloser {
66
-	return &wrapperStats{parent}
67
-}

+ 70
- 0
wrappers/stats_telegram.go Wyświetl plik

1
+package wrappers
2
+
3
+import (
4
+	"net"
5
+	"sync"
6
+	"time"
7
+
8
+	"github.com/9seconds/mtg/conntypes"
9
+	"github.com/9seconds/mtg/stats"
10
+	"go.uber.org/zap"
11
+)
12
+
13
+type wrapperTelegramStats struct {
14
+	parent conntypes.StreamReadWriteCloser
15
+	dc     conntypes.DC
16
+	once   sync.Once
17
+}
18
+
19
+func (w *wrapperTelegramStats) Write(p []byte) (int, error) {
20
+	return w.parent.Write(p)
21
+}
22
+
23
+func (w *wrapperTelegramStats) WriteTimeout(p []byte, timeout time.Duration) (int, error) {
24
+	return w.parent.WriteTimeout(p, timeout)
25
+}
26
+
27
+func (w *wrapperTelegramStats) Read(p []byte) (int, error) {
28
+	return w.parent.Read(p)
29
+}
30
+
31
+func (w *wrapperTelegramStats) ReadTimeout(p []byte, timeout time.Duration) (int, error) {
32
+	return w.parent.ReadTimeout(p, timeout)
33
+}
34
+
35
+func (w *wrapperTelegramStats) Conn() net.Conn {
36
+	return w.parent.Conn()
37
+}
38
+
39
+func (w *wrapperTelegramStats) Logger() *zap.SugaredLogger {
40
+	return w.parent.Logger().Named("stats-telegram")
41
+}
42
+
43
+func (w *wrapperTelegramStats) LocalAddr() *net.TCPAddr {
44
+	return w.parent.LocalAddr()
45
+}
46
+
47
+func (w *wrapperTelegramStats) RemoteAddr() *net.TCPAddr {
48
+	return w.parent.RemoteAddr()
49
+}
50
+
51
+func (w *wrapperTelegramStats) Close() error {
52
+	var err error
53
+
54
+	w.once.Do(func() {
55
+		err = w.parent.Close()
56
+		stats.Stats.TelegramDisconnected(w.dc, w.RemoteAddr())
57
+	})
58
+
59
+	return err
60
+}
61
+
62
+func NewTelegramStats(dc conntypes.DC, parent conntypes.StreamReadWriteCloser) conntypes.StreamReadWriteCloser {
63
+	conn := &wrapperTelegramStats{
64
+		parent: parent,
65
+		dc:     dc,
66
+	}
67
+	stats.Stats.TelegramConnected(dc, parent.RemoteAddr())
68
+
69
+	return conn
70
+}

+ 67
- 0
wrappers/stats_traffic.go Wyświetl plik

1
+package wrappers
2
+
3
+import (
4
+	"net"
5
+	"time"
6
+
7
+	"go.uber.org/zap"
8
+
9
+	"github.com/9seconds/mtg/conntypes"
10
+	"github.com/9seconds/mtg/stats"
11
+)
12
+
13
+type wrapperTrafficStats struct {
14
+	parent conntypes.StreamReadWriteCloser
15
+}
16
+
17
+func (w *wrapperTrafficStats) Write(p []byte) (int, error) {
18
+	n, err := w.parent.Write(p)
19
+	stats.Stats.EgressTraffic(n)
20
+
21
+	return n, err
22
+}
23
+
24
+func (w *wrapperTrafficStats) WriteTimeout(p []byte, timeout time.Duration) (int, error) {
25
+	n, err := w.parent.WriteTimeout(p, timeout)
26
+	stats.Stats.EgressTraffic(n)
27
+
28
+	return n, err
29
+}
30
+
31
+func (w *wrapperTrafficStats) Read(p []byte) (int, error) {
32
+	n, err := w.parent.Read(p)
33
+	stats.Stats.IngressTraffic(n)
34
+
35
+	return n, err
36
+}
37
+
38
+func (w *wrapperTrafficStats) ReadTimeout(p []byte, timeout time.Duration) (int, error) {
39
+	n, err := w.parent.ReadTimeout(p, timeout)
40
+	stats.Stats.IngressTraffic(n)
41
+
42
+	return n, err
43
+}
44
+
45
+func (w *wrapperTrafficStats) Conn() net.Conn {
46
+	return w.parent.Conn()
47
+}
48
+
49
+func (w *wrapperTrafficStats) Logger() *zap.SugaredLogger {
50
+	return w.parent.Logger().Named("stats-traffic")
51
+}
52
+
53
+func (w *wrapperTrafficStats) LocalAddr() *net.TCPAddr {
54
+	return w.parent.LocalAddr()
55
+}
56
+
57
+func (w *wrapperTrafficStats) RemoteAddr() *net.TCPAddr {
58
+	return w.parent.RemoteAddr()
59
+}
60
+
61
+func (w *wrapperTrafficStats) Close() error {
62
+	return w.parent.Close()
63
+}
64
+
65
+func NewTrafficStats(parent conntypes.StreamReadWriteCloser) conntypes.StreamReadWriteCloser {
66
+	return &wrapperTrafficStats{parent}
67
+}

Ładowanie…
Anuluj
Zapisz