Kaynağa Gözat

Merge pull request #27 from 9seconds/statsd

Integration with statsd
tags/0.10
Sergey Arkhipov 7 yıl önce
ebeveyn
işleme
c814e9405d
No account linked to committer's email address
7 değiştirilmiş dosya ile 207 ekleme ve 8 silme
  1. 7
    1
      Gopkg.lock
  2. 4
    0
      Gopkg.toml
  3. 30
    0
      README.md
  4. 43
    1
      config/config.go
  5. 28
    2
      main.go
  6. 16
    4
      stats/server.go
  7. 79
    0
      stats/statsd.go

+ 7
- 1
Gopkg.lock Dosyayı Görüntüle

100
   revision = "947dcec5ba9c011838740e680966fd7087a71d0d"
100
   revision = "947dcec5ba9c011838740e680966fd7087a71d0d"
101
   version = "v2.2.6"
101
   version = "v2.2.6"
102
 
102
 
103
+[[projects]]
104
+  name = "gopkg.in/alexcesaro/statsd.v2"
105
+  packages = ["."]
106
+  revision = "7fea3f0d2fab1ad973e641e51dba45443a311a90"
107
+  version = "v2.0.0"
108
+
103
 [solve-meta]
109
 [solve-meta]
104
   analyzer-name = "dep"
110
   analyzer-name = "dep"
105
   analyzer-version = 1
111
   analyzer-version = 1
106
-  inputs-digest = "f828340a30ea13c563829f9a37d0ff62974d4578411c9be02e61125dbdf98692"
112
+  inputs-digest = "7fad0f62feb7737b064d85cc4333a1a3e9298faec2afd864b4404f515fc7f17c"
107
   solver-name = "gps-cdcl"
113
   solver-name = "gps-cdcl"
108
   solver-version = 1
114
   solver-version = 1

+ 4
- 0
Gopkg.toml Dosyayı Görüntüle

52
 [[constraint]]
52
 [[constraint]]
53
   name = "github.com/beevik/ntp"
53
   name = "github.com/beevik/ntp"
54
   version = "0.2.0"
54
   version = "0.2.0"
55
+
56
+[[constraint]]
57
+  name = "gopkg.in/alexcesaro/statsd.v2"
58
+  version = "2.0.0"

+ 30
- 0
README.md Dosyayı Görüntüle

145
 port 3129 will show you some statistics if you are interested in.
145
 port 3129 will show you some statistics if you are interested in.
146
 
146
 
147
 Also, you can use [run-mtg.sh](https://github.com/9seconds/mtg/blob/master/run-mtg.sh) script
147
 Also, you can use [run-mtg.sh](https://github.com/9seconds/mtg/blob/master/run-mtg.sh) script
148
+
149
+
150
+# statsd integration
151
+
152
+mtg provides an integration with statsd, you can enable it with command
153
+line interface. To enable it, you have to provide IP address of statsd
154
+service.
155
+
156
+Out of the box, mtg supports 2 additional dialects: [InfluxDB](https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/)
157
+and [Datadog](https://docs.datadoghq.com/developers/dogstatsd/).
158
+
159
+All metrics are gauges. Here is the list of metrics and their meaning:
160
+
161
+| Metric name                     | Unit    | Description                                               |
162
+|---------------------------------|---------|-----------------------------------------------------------|
163
+| `connections.abridged.ipv4`     | number  | The number of active abridged IPv4 connections            |
164
+| `connections.abridged.ipv6`     | number  | The number of active abridged IPv6 connections            |
165
+| `connections.intermediate.ipv4` | number  | The number of active intermediate IPv4 connections        |
166
+| `connections.intermediate.ipv6` | number  | The number of active intermediate IPv6 connections        |
167
+| `connections.secure.ipv4`       | number  | The number of active secure intermediate IPv4 connections |
168
+| `connections.secure.ipv6`       | number  | The number of active secure intermediate IPv6 connections |
169
+| `crashes`                       | number  | An amount of crashes in client handlers                   |
170
+| `traffic.ingress`               | bytes   | Ingress traffic from the start of application (incoming)  |
171
+| `traffic.egress`                | bytes   | Egress traffic from the start of application (outgoing)   |
172
+| `speed.ingress`                 | bytes/s | Ingress bandwidth of the latest second (incoming traffic) |
173
+| `speed.egress`                  | bytes/s | Egress bandwidth of the latest second (outgoing traffic)  |
174
+
175
+All metrics are prefixed with given prefix. Default prefix is `mtg`.
176
+With such prefix metric name `traffic.ingress`, for example, would be
177
+`mtg.traffic.ingress`.

+ 43
- 1
config/config.go Dosyayı Görüntüle

8
 	"strings"
8
 	"strings"
9
 
9
 
10
 	"github.com/juju/errors"
10
 	"github.com/juju/errors"
11
+	statsd "gopkg.in/alexcesaro/statsd.v2"
11
 )
12
 )
12
 
13
 
13
 // Buffer sizes define internal socket buffer sizes.
14
 // Buffer sizes define internal socket buffer sizes.
32
 	PublicIPv6 net.IP
33
 	PublicIPv6 net.IP
33
 	StatsIP    net.IP
34
 	StatsIP    net.IP
34
 
35
 
36
+	StatsD struct {
37
+		Addr       net.Addr
38
+		Prefix     string
39
+		Tags       map[string]string
40
+		TagsFormat statsd.TagFormat
41
+		Enabled    bool
42
+	}
43
+
35
 	Secret []byte
44
 	Secret []byte
36
 	AdTag  []byte
45
 	AdTag  []byte
37
 }
46
 }
109
 	publicIPv4 net.IP, PublicIPv4Port uint16,
118
 	publicIPv4 net.IP, PublicIPv4Port uint16,
110
 	publicIPv6 net.IP, publicIPv6Port uint16,
119
 	publicIPv6 net.IP, publicIPv6Port uint16,
111
 	statsIP net.IP, statsPort uint16,
120
 	statsIP net.IP, statsPort uint16,
112
-	secret, adtag string) (*Config, error) {
121
+	secret, adtag string,
122
+	statsdIP string, statsdPort uint16, statsdNetwork string, statsdPrefix string,
123
+	statsdTagsFormat string, statsdTags map[string]string) (*Config, error) {
113
 	secureMode := false
124
 	secureMode := false
114
 	if strings.HasPrefix(secret, "dd") && len(secret) == 34 {
125
 	if strings.HasPrefix(secret, "dd") && len(secret) == 34 {
115
 		secureMode = true
126
 		secureMode = true
174
 		SecureMode:     secureMode,
185
 		SecureMode:     secureMode,
175
 	}
186
 	}
176
 
187
 
188
+	if statsdIP != "" {
189
+		conf.StatsD.Enabled = true
190
+		conf.StatsD.Prefix = statsdPrefix
191
+		conf.StatsD.Tags = statsdTags
192
+
193
+		var addr net.Addr
194
+		hostPort := net.JoinHostPort(statsdIP, strconv.Itoa(int(statsdPort)))
195
+		switch statsdNetwork {
196
+		case "tcp":
197
+			addr, err = net.ResolveTCPAddr("tcp", hostPort)
198
+		case "udp":
199
+			addr, err = net.ResolveUDPAddr("udp", hostPort)
200
+		default:
201
+			err = errors.Errorf("Unknown network %s", statsdNetwork)
202
+		}
203
+		if err != nil {
204
+			return nil, errors.Annotate(err, "Cannot resolve statsd address")
205
+		}
206
+		conf.StatsD.Addr = addr
207
+
208
+		switch statsdTagsFormat {
209
+		case "datadog":
210
+			conf.StatsD.TagsFormat = statsd.Datadog
211
+		case "influxdb":
212
+			conf.StatsD.TagsFormat = statsd.InfluxDB
213
+		case "":
214
+		default:
215
+			return nil, errors.Errorf("Unknown tags format %s", statsdTagsFormat)
216
+		}
217
+	}
218
+
177
 	return conf, nil
219
 	return conf, nil
178
 }
220
 }

+ 28
- 2
main.go Dosyayı Görüntüle

61
 			Envar("MTG_IPV6_PORT").
61
 			Envar("MTG_IPV6_PORT").
62
 			Uint16()
62
 			Uint16()
63
 
63
 
64
-	statsIP = app.Flag("stats-ip", "Which IP bind stats server to").
64
+	statsIP = app.Flag("stats-ip", "Which IP bind stats server to.").
65
 		Short('t').
65
 		Short('t').
66
 		Envar("MTG_STATS_IP").
66
 		Envar("MTG_STATS_IP").
67
 		Default("127.0.0.1").
67
 		Default("127.0.0.1").
72
 			Default("3129").
72
 			Default("3129").
73
 			Uint16()
73
 			Uint16()
74
 
74
 
75
+	statsdIP = app.Flag("statsd-ip", "Which IP should we use for working with statsd.").
76
+			Envar("MTG_STATSD_IP").
77
+			String()
78
+	statsdPort = app.Flag("statsd-port", "Which port should we use for working with statsd.").
79
+			Envar("MTG_STATSD_PORT").
80
+			Default("8125").
81
+			Uint16()
82
+	statsdNetwork = app.Flag("statsd-network", "Which network is used to work with statsd. Only 'tcp' and 'udp' are supported.").
83
+			Envar("MTG_STATSD_NETWORK").
84
+			Default("udp").
85
+			String()
86
+	statsdPrefix = app.Flag("statsd-prefix", "Which bucket prefix should we use for sending stats to statsd.").
87
+			Envar("MTG_STATSD_PREFIX").
88
+			Default("mtg").
89
+			String()
90
+	statsdTagsFormat = app.Flag("statsd-tags-format", "Which tag format should we use to send stats metrics. Valid options are 'datadog' and 'influxdb'.").
91
+				Envar("MTG_STATSD_TAGS_FORMAT").
92
+				String()
93
+	statsdTags = app.Flag("statsd-tags", "Tags to use for working with statsd (specified as 'key=value').").
94
+			Envar("MTG_STATSD_TAGS").
95
+			StringMap()
96
+
75
 	secret = app.Arg("secret", "Secret of this proxy.").Required().String()
97
 	secret = app.Arg("secret", "Secret of this proxy.").Required().String()
76
 	adtag  = app.Arg("adtag", "ADTag of the proxy.").String()
98
 	adtag  = app.Arg("adtag", "ADTag of the proxy.").String()
77
 )
99
 )
96
 		*publicIPv6, *publicIPv6Port,
118
 		*publicIPv6, *publicIPv6Port,
97
 		*statsIP, *statsPort,
119
 		*statsIP, *statsPort,
98
 		*secret, *adtag,
120
 		*secret, *adtag,
121
+		*statsdIP, *statsdPort, *statsdNetwork, *statsdPrefix,
122
+		*statsdTagsFormat, *statsdTags,
99
 	)
123
 	)
100
 	if err != nil {
124
 	if err != nil {
101
 		usage(err.Error())
125
 		usage(err.Error())
134
 		zap.S().Infow("Use direct connection to Telegram")
158
 		zap.S().Infow("Use direct connection to Telegram")
135
 	}
159
 	}
136
 
160
 
137
-	go stats.Start(conf)
161
+	if err := stats.Start(conf); err != nil {
162
+		panic(err)
163
+	}
138
 
164
 
139
 	server := proxy.NewProxy(conf)
165
 	server := proxy.NewProxy(conf)
140
 	if err := server.Serve(); err != nil {
166
 	if err := server.Serve(); err != nil {

+ 16
- 4
stats/server.go Dosyayı Görüntüle

14
 var instance *stats
14
 var instance *stats
15
 
15
 
16
 // Start starts new statisitcs server.
16
 // Start starts new statisitcs server.
17
-func Start(conf *config.Config) {
17
+func Start(conf *config.Config) error {
18
 	log := zap.S().Named("stats")
18
 	log := zap.S().Named("stats")
19
 
19
 
20
 	instance = &stats{
20
 	instance = &stats{
23
 		mutex:  &sync.RWMutex{},
23
 		mutex:  &sync.RWMutex{},
24
 	}
24
 	}
25
 
25
 
26
+	if conf.StatsD.Enabled {
27
+		client, err := newStatsd(conf)
28
+		if err != nil {
29
+			return err
30
+		}
31
+		go client.run()
32
+	}
33
+
26
 	go crashManager()
34
 	go crashManager()
27
 	go connectionManager()
35
 	go connectionManager()
28
 	go trafficManager()
36
 	go trafficManager()
51
 		}
59
 		}
52
 	})
60
 	})
53
 
61
 
54
-	if err := http.ListenAndServe(conf.StatAddr(), nil); err != nil {
55
-		log.Fatalw("Stats server has been stopped", "error", err)
56
-	}
62
+	go func() {
63
+		if err := http.ListenAndServe(conf.StatAddr(), nil); err != nil {
64
+			log.Fatalw("Stats server has been stopped", "error", err)
65
+		}
66
+	}()
67
+
68
+	return nil
57
 }
69
 }

+ 79
- 0
stats/statsd.go Dosyayı Görüntüle

1
+package stats
2
+
3
+import (
4
+	"time"
5
+
6
+	"github.com/juju/errors"
7
+	statsd "gopkg.in/alexcesaro/statsd.v2"
8
+
9
+	"github.com/9seconds/mtg/config"
10
+)
11
+
12
+const (
13
+	statsdConnectionsAbridgedV4 = "connections.abridged.ipv4"
14
+	statsdConnectionsAbridgedV6 = "connections.abridged.ipv6"
15
+
16
+	statsdConnectionsIntermediateV4 = "connections.intermediate.ipv4"
17
+	statsdConnectionsIntermediateV6 = "connections.intermediate.ipv6"
18
+
19
+	statsdConnectionsSecureV4 = "connections.secure.ipv4"
20
+	statsdConnectionsSecureV6 = "connections.secure.ipv6"
21
+
22
+	statsdTrafficIngress = "traffic.ingress"
23
+	statsdTrafficEgress  = "traffic.egress"
24
+
25
+	statsdSpeedIngress = "speed.ingress"
26
+	statsdSpeedEgress  = "speed.egress"
27
+
28
+	statsdCrashes = "crashes"
29
+)
30
+
31
+const statsdPollTime = time.Second
32
+
33
+type statsdExporter struct {
34
+	client *statsd.Client
35
+}
36
+
37
+func (s *statsdExporter) run() {
38
+	for range time.Tick(statsdPollTime) {
39
+		instance.mutex.Lock()
40
+
41
+		s.client.Gauge(statsdConnectionsAbridgedV4, instance.Connections.Abridged.IPv4)
42
+		s.client.Gauge(statsdConnectionsAbridgedV6, instance.Connections.Abridged.IPv6)
43
+		s.client.Gauge(statsdConnectionsIntermediateV4, instance.Connections.Intermediate.IPv4)
44
+		s.client.Gauge(statsdConnectionsIntermediateV6, instance.Connections.Intermediate.IPv6)
45
+		s.client.Gauge(statsdConnectionsSecureV4, instance.Connections.Secure.IPv4)
46
+		s.client.Gauge(statsdConnectionsSecureV6, instance.Connections.Secure.IPv6)
47
+		s.client.Gauge(statsdTrafficIngress, uint64(instance.Traffic.Ingress))
48
+		s.client.Gauge(statsdTrafficEgress, uint64(instance.Traffic.Egress))
49
+		s.client.Gauge(statsdSpeedIngress, uint64(instance.Speed.Ingress))
50
+		s.client.Gauge(statsdSpeedEgress, uint64(instance.Speed.Egress))
51
+		s.client.Gauge(statsdCrashes, instance.Crashes)
52
+
53
+		instance.mutex.Unlock()
54
+	}
55
+}
56
+
57
+func newStatsd(conf *config.Config) (*statsdExporter, error) {
58
+	options := []statsd.Option{
59
+		statsd.Network(conf.StatsD.Addr.Network()),
60
+		statsd.Address(conf.StatsD.Addr.String()),
61
+		statsd.Prefix(conf.StatsD.Prefix),
62
+	}
63
+
64
+	if conf.StatsD.TagsFormat > 0 {
65
+		options = append(options, statsd.TagsFormat(conf.StatsD.TagsFormat))
66
+		tags := make([]string, len(conf.StatsD.Tags)*2)
67
+		for k, v := range conf.StatsD.Tags {
68
+			tags = append(tags, k, v)
69
+		}
70
+		options = append(options, statsd.Tags(tags...))
71
+	}
72
+
73
+	client, err := statsd.New(options...)
74
+	if err != nil {
75
+		return nil, errors.Annotate(err, "Cannot create statsd client")
76
+	}
77
+
78
+	return &statsdExporter{client: client}, nil
79
+}

Loading…
İptal
Kaydet