Просмотр исходного кода

Rework stats server

tags/0.15^2
9seconds 7 лет назад
Родитель
Сommit
c74e0ee359
6 измененных файлов: 166 добавлений и 175 удалений
  1. 1
    1
      main.go
  2. 16
    86
      stats/channels.go
  3. 22
    0
      stats/init.go
  4. 5
    35
      stats/server.go
  5. 117
    46
      stats/stats.go
  6. 5
    7
      stats/statsd.go

+ 1
- 1
main.go Просмотреть файл

193
 		zap.S().Infow("Use direct connection to Telegram")
193
 		zap.S().Infow("Use direct connection to Telegram")
194
 	}
194
 	}
195
 
195
 
196
-	if err := stats.Start(conf); err != nil {
196
+	if err := stats.Init(conf); err != nil {
197
 		panic(err)
197
 		panic(err)
198
 	}
198
 	}
199
 
199
 

+ 16
- 86
stats/channels.go Просмотреть файл

2
 
2
 
3
 import (
3
 import (
4
 	"net"
4
 	"net"
5
-	"time"
6
 
5
 
7
 	"github.com/9seconds/mtg/mtproto"
6
 	"github.com/9seconds/mtg/mtproto"
8
 )
7
 )
9
 
8
 
10
 const (
9
 const (
11
-	crashesChanLength     = 1
12
-	connectionsChanLength = 20
13
-	trafficChanLength     = 5000
10
+	connectionsChanLength = 10
11
+	trafficChanLength     = 10
14
 )
12
 )
15
 
13
 
16
 var (
14
 var (
17
-	crashesChan     = make(chan struct{}, crashesChanLength)
18
-	connectionsChan = make(chan *connectionData, connectionsChanLength)
19
-	trafficChan     = make(chan *trafficData, trafficChanLength)
15
+	crashesChan     = make(chan struct{})
16
+	statsChan       = make(chan chan<- Stats)
17
+	connectionsChan = make(chan connectionData, connectionsChanLength)
18
+	trafficChan     = make(chan trafficData, trafficChanLength)
20
 )
19
 )
21
 
20
 
22
 type connectionData struct {
21
 type connectionData struct {
30
 	ingress bool
29
 	ingress bool
31
 }
30
 }
32
 
31
 
33
-func crashManager() {
34
-	for range crashesChan {
35
-		instance.mutex.RLock()
36
-
37
-		instance.Crashes++
38
-
39
-		instance.mutex.RUnlock()
40
-	}
41
-}
42
-
43
-func connectionManager() {
44
-	for event := range connectionsChan {
45
-		instance.mutex.RLock()
46
-
47
-		isIPv4 := event.addr.IP.To4() != nil
48
-		var inc uint32 = 1
49
-		if !event.connected {
50
-			inc = ^uint32(0)
51
-		}
52
-
53
-		switch event.connectionType {
54
-		case mtproto.ConnectionTypeAbridged:
55
-			if isIPv4 {
56
-				instance.Connections.Abridged.IPv4 += inc
57
-			} else {
58
-				instance.Connections.Abridged.IPv6 += inc
59
-			}
60
-		case mtproto.ConnectionTypeSecure:
61
-			if isIPv4 {
62
-				instance.Connections.Secure.IPv4 += inc
63
-			} else {
64
-				instance.Connections.Secure.IPv6 += inc
65
-			}
66
-		default:
67
-			if isIPv4 {
68
-				instance.Connections.Intermediate.IPv4 += inc
69
-			} else {
70
-				instance.Connections.Intermediate.IPv6 += inc
71
-			}
72
-		}
73
-
74
-		instance.mutex.RUnlock()
75
-	}
76
-}
77
-
78
-func trafficManager() {
79
-	speedChan := time.Tick(time.Second)
80
-
81
-	for {
82
-		select {
83
-		case event := <-trafficChan:
84
-			instance.mutex.RLock()
85
-
86
-			if event.ingress {
87
-				instance.Traffic.Ingress += trafficValue(event.traffic)
88
-				instance.speedCurrent.Ingress += trafficSpeedValue(event.traffic)
89
-			} else {
90
-				instance.Traffic.Egress += trafficValue(event.traffic)
91
-				instance.speedCurrent.Egress += trafficSpeedValue(event.traffic)
92
-			}
93
-
94
-			instance.mutex.RUnlock()
95
-		case <-speedChan:
96
-			instance.mutex.RLock()
97
-
98
-			instance.Speed.Ingress = instance.speedCurrent.Ingress
99
-			instance.Speed.Egress = instance.speedCurrent.Egress
100
-			instance.speedCurrent.Ingress = trafficSpeedValue(0)
101
-			instance.speedCurrent.Egress = trafficSpeedValue(0)
102
-
103
-			instance.mutex.RUnlock()
104
-		}
105
-	}
106
-}
107
-
108
 // NewCrash indicates new crash.
32
 // NewCrash indicates new crash.
109
 func NewCrash() {
33
 func NewCrash() {
110
 	crashesChan <- struct{}{}
34
 	crashesChan <- struct{}{}
112
 
36
 
113
 // ClientConnected indicates that new client was connected.
37
 // ClientConnected indicates that new client was connected.
114
 func ClientConnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
38
 func ClientConnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
115
-	connectionsChan <- &connectionData{
39
+	connectionsChan <- connectionData{
116
 		connectionType: connectionType,
40
 		connectionType: connectionType,
117
 		addr:           addr,
41
 		addr:           addr,
118
 		connected:      true,
42
 		connected:      true,
121
 
45
 
122
 // ClientDisconnected indicates that client was disconnected.
46
 // ClientDisconnected indicates that client was disconnected.
123
 func ClientDisconnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
47
 func ClientDisconnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
124
-	connectionsChan <- &connectionData{
48
+	connectionsChan <- connectionData{
125
 		connectionType: connectionType,
49
 		connectionType: connectionType,
126
 		addr:           addr,
50
 		addr:           addr,
127
 		connected:      false,
51
 		connected:      false,
130
 
54
 
131
 // IngressTraffic accounts new ingress traffic.
55
 // IngressTraffic accounts new ingress traffic.
132
 func IngressTraffic(traffic int) {
56
 func IngressTraffic(traffic int) {
133
-	trafficChan <- &trafficData{
57
+	trafficChan <- trafficData{
134
 		traffic: traffic,
58
 		traffic: traffic,
135
 		ingress: true,
59
 		ingress: true,
136
 	}
60
 	}
138
 
62
 
139
 // EgressTraffic accounts new ingress traffic.
63
 // EgressTraffic accounts new ingress traffic.
140
 func EgressTraffic(traffic int) {
64
 func EgressTraffic(traffic int) {
141
-	trafficChan <- &trafficData{
65
+	trafficChan <- trafficData{
142
 		traffic: traffic,
66
 		traffic: traffic,
143
 		ingress: false,
67
 		ingress: false,
144
 	}
68
 	}
145
 }
69
 }
70
+
71
+func GetStats() Stats {
72
+	rpcChan := make(chan Stats)
73
+	statsChan <- rpcChan
74
+	return <-rpcChan
75
+}

+ 22
- 0
stats/init.go Просмотреть файл

1
+package stats
2
+
3
+import (
4
+	"github.com/juju/errors"
5
+
6
+	"github.com/9seconds/mtg/config"
7
+)
8
+
9
+func Init(conf *config.Config) error {
10
+	if conf.StatsD.Enabled {
11
+		client, err := newStatsd(conf)
12
+		if err != nil {
13
+			return errors.Annotate(err, "Cannot initialize statsd client")
14
+		}
15
+		go client.run()
16
+	}
17
+
18
+	go NewStats(conf).start()
19
+	go startServer(conf)
20
+
21
+	return nil
22
+}

+ 5
- 35
stats/server.go Просмотреть файл

3
 import (
3
 import (
4
 	"encoding/json"
4
 	"encoding/json"
5
 	"net/http"
5
 	"net/http"
6
-	"sync"
7
-	"time"
8
 
6
 
9
 	"go.uber.org/zap"
7
 	"go.uber.org/zap"
10
 
8
 
11
 	"github.com/9seconds/mtg/config"
9
 	"github.com/9seconds/mtg/config"
12
 )
10
 )
13
 
11
 
14
-var instance *stats
15
-
16
-// Start starts new statistics server.
17
-func Start(conf *config.Config) error {
12
+func startServer(conf *config.Config) {
18
 	log := zap.S().Named("stats")
13
 	log := zap.S().Named("stats")
19
 
14
 
20
-	instance = &stats{
21
-		URLs:   conf.GetURLs(),
22
-		Uptime: uptime(time.Now()),
23
-		mutex:  &sync.RWMutex{},
24
-	}
25
-
26
-	if conf.StatsD.Enabled {
27
-		client, err := newStatsd(conf)
28
-		if err != nil {
29
-			return err
30
-		}
31
-		go client.run()
32
-	}
33
-
34
-	go crashManager()
35
-	go connectionManager()
36
-	go trafficManager()
37
-
38
 	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
15
 	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
39
 		w.Header().Set("Content-Type", "application/json")
16
 		w.Header().Set("Content-Type", "application/json")
40
 
17
 
41
-		instance.mutex.Lock()
42
-		first, err := json.Marshal(instance)
43
-		instance.mutex.Unlock()
44
-
18
+		first, err := json.Marshal(GetStats())
45
 		if err != nil {
19
 		if err != nil {
46
 			log.Errorw("Cannot encode json", "error", err)
20
 			log.Errorw("Cannot encode json", "error", err)
47
 			http.Error(w, "Internal server error", 500)
21
 			http.Error(w, "Internal server error", 500)
59
 		}
33
 		}
60
 	})
34
 	})
61
 
35
 
62
-	go func() {
63
-		if err := http.ListenAndServe(conf.StatAddr(), nil); err != nil {
64
-			log.Fatalw("Stats server has been stopped", "error", err)
65
-		}
66
-	}()
67
-
68
-	return nil
36
+	if err := http.ListenAndServe(conf.StatAddr(), nil); err != nil {
37
+		log.Fatalw("Stats server has been stopped", "error", err)
38
+	}
69
 }
39
 }

+ 117
- 46
stats/stats.go Просмотреть файл

4
 	"encoding/json"
4
 	"encoding/json"
5
 	"fmt"
5
 	"fmt"
6
 	"strconv"
6
 	"strconv"
7
-	"sync"
8
 	"time"
7
 	"time"
9
 
8
 
10
 	humanize "github.com/dustin/go-humanize"
9
 	humanize "github.com/dustin/go-humanize"
11
 
10
 
12
 	"github.com/9seconds/mtg/config"
11
 	"github.com/9seconds/mtg/config"
12
+	"github.com/9seconds/mtg/mtproto"
13
 )
13
 )
14
 
14
 
15
 type uptime time.Time
15
 type uptime time.Time
24
 	return json.Marshal(value)
24
 	return json.Marshal(value)
25
 }
25
 }
26
 
26
 
27
-type trafficValue uint64
28
-
29
-func (t trafficValue) MarshalJSON() ([]byte, error) {
30
-	tv := uint64(t)
31
-	value := map[string]interface{}{
32
-		"bytes": tv,
33
-		"human": humanize.Bytes(tv),
34
-	}
35
-
36
-	return json.Marshal(value)
37
-}
38
-
39
-type trafficSpeedValue uint64
40
-
41
-func (t trafficSpeedValue) MarshalJSON() ([]byte, error) {
42
-	speed := uint64(t)
43
-	value := map[string]interface{}{
44
-		"bytes/s": speed,
45
-		"human":   fmt.Sprintf("%s/S", humanize.Bytes(speed)),
46
-	}
47
-
48
-	return json.Marshal(value)
27
+type connectionType struct {
28
+	IPv6 uint32 `json:"ipv6"`
29
+	IPv4 uint32 `json:"ipv4"`
49
 }
30
 }
50
 
31
 
51
-type connections struct {
32
+type baseConnections struct {
52
 	All          connectionType `json:"all"`
33
 	All          connectionType `json:"all"`
53
 	Abridged     connectionType `json:"abridged"`
34
 	Abridged     connectionType `json:"abridged"`
54
 	Intermediate connectionType `json:"intermediate"`
35
 	Intermediate connectionType `json:"intermediate"`
55
 	Secure       connectionType `json:"secure"`
36
 	Secure       connectionType `json:"secure"`
56
 }
37
 }
57
 
38
 
39
+type connections struct {
40
+	baseConnections
41
+}
42
+
58
 func (c connections) MarshalJSON() ([]byte, error) {
43
 func (c connections) MarshalJSON() ([]byte, error) {
59
 	c.All.IPv4 = c.Abridged.IPv4 + c.Intermediate.IPv4 + c.Secure.IPv4
44
 	c.All.IPv4 = c.Abridged.IPv4 + c.Intermediate.IPv4 + c.Secure.IPv4
60
 	c.All.IPv6 = c.Abridged.IPv6 + c.Intermediate.IPv6 + c.Secure.IPv6
45
 	c.All.IPv6 = c.Abridged.IPv6 + c.Intermediate.IPv6 + c.Secure.IPv6
61
 
46
 
62
-	value := struct {
63
-		All          connectionType `json:"all"`
64
-		Abridged     connectionType `json:"abridged"`
65
-		Intermediate connectionType `json:"intermediate"`
66
-		Secure       connectionType `json:"secure"`
67
-	}{
68
-		All:          c.All,
69
-		Abridged:     c.Abridged,
70
-		Intermediate: c.Intermediate,
71
-		Secure:       c.Secure,
47
+	return json.Marshal(c.baseConnections)
48
+}
49
+
50
+type traffic struct {
51
+	ingress uint64
52
+	egress  uint64
53
+}
54
+
55
+func (t *traffic) dumpValue(value uint64) map[string]interface{} {
56
+	return map[string]interface{}{
57
+		"bytes": value,
58
+		"human": humanize.Bytes(value),
59
+	}
60
+}
61
+
62
+func (t traffic) MarshalJSON() ([]byte, error) {
63
+	value := map[string]map[string]interface{}{
64
+		"ingress": t.dumpValue(t.ingress),
65
+		"egress":  t.dumpValue(t.egress),
72
 	}
66
 	}
73
 
67
 
74
 	return json.Marshal(value)
68
 	return json.Marshal(value)
75
 }
69
 }
76
 
70
 
77
-type connectionType struct {
78
-	IPv6 uint32 `json:"ipv6"`
79
-	IPv4 uint32 `json:"ipv4"`
71
+type speed struct {
72
+	ingress uint64
73
+	egress  uint64
80
 }
74
 }
81
 
75
 
82
-type traffic struct {
83
-	Ingress trafficValue `json:"ingress"`
84
-	Egress  trafficValue `json:"egress"`
76
+func (s *speed) dumpValue(value uint64) map[string]interface{} {
77
+	return map[string]interface{}{
78
+		"bytes/s": value,
79
+		"human":   fmt.Sprintf("%s/s", humanize.Bytes(value)),
80
+	}
85
 }
81
 }
86
 
82
 
87
-type speed struct {
88
-	Ingress trafficSpeedValue `json:"ingress"`
89
-	Egress  trafficSpeedValue `json:"egress"`
83
+func (s speed) MarshalJSON() ([]byte, error) {
84
+	value := map[string]map[string]interface{}{
85
+		"ingress": s.dumpValue(s.ingress),
86
+		"egress":  s.dumpValue(s.egress),
87
+	}
88
+
89
+	return json.Marshal(value)
90
 }
90
 }
91
 
91
 
92
-type stats struct {
92
+type Stats struct {
93
 	URLs        config.IPURLs `json:"urls"`
93
 	URLs        config.IPURLs `json:"urls"`
94
 	Connections connections   `json:"connections"`
94
 	Connections connections   `json:"connections"`
95
 	Traffic     traffic       `json:"traffic"`
95
 	Traffic     traffic       `json:"traffic"`
97
 	Uptime      uptime        `json:"uptime"`
97
 	Uptime      uptime        `json:"uptime"`
98
 	Crashes     uint32        `json:"crashes"`
98
 	Crashes     uint32        `json:"crashes"`
99
 
99
 
100
-	speedCurrent speed
101
-	mutex        *sync.RWMutex
100
+	previousTraffic traffic
101
+}
102
+
103
+func (s *Stats) start() {
104
+	speedChan := time.Tick(time.Second)
105
+
106
+	for {
107
+		select {
108
+		case <-speedChan:
109
+			s.handleSpeed()
110
+		case event := <-trafficChan:
111
+			s.handleTraffic(event)
112
+		case event := <-connectionsChan:
113
+			s.handleConnection(event)
114
+		case getStatsChan := <-statsChan:
115
+			s.handleGetStats(getStatsChan)
116
+		case <-crashesChan:
117
+			s.handleCrash()
118
+		}
119
+	}
120
+}
121
+
122
+func (s *Stats) handleTraffic(evt trafficData) {
123
+	if evt.ingress {
124
+		s.Traffic.ingress += uint64(evt.traffic)
125
+	} else {
126
+		s.Traffic.egress += uint64(evt.traffic)
127
+	}
128
+}
129
+
130
+func (s *Stats) handleSpeed() {
131
+	s.Speed.ingress = s.Traffic.ingress - s.previousTraffic.ingress
132
+	s.Speed.egress = s.Traffic.egress - s.previousTraffic.egress
133
+	s.previousTraffic.ingress = s.Traffic.ingress
134
+	s.previousTraffic.egress = s.Traffic.egress
135
+}
136
+
137
+func (s *Stats) handleConnection(evt connectionData) {
138
+	var inc uint32 = 1
139
+	if !evt.connected {
140
+		inc = ^uint32(0)
141
+	}
142
+
143
+	var conn *connectionType
144
+	switch evt.connectionType {
145
+	case mtproto.ConnectionTypeAbridged:
146
+		conn = &s.Connections.Abridged
147
+	case mtproto.ConnectionTypeSecure:
148
+		conn = &s.Connections.Secure
149
+	default:
150
+		conn = &s.Connections.Intermediate
151
+	}
152
+
153
+	if evt.addr.IP.To4() != nil {
154
+		conn.IPv4 += inc
155
+	} else {
156
+		conn.IPv6 += inc
157
+	}
158
+}
159
+
160
+func (s *Stats) handleGetStats(getStatsChan chan<- Stats) {
161
+	getStatsChan <- *s
162
+}
163
+
164
+func (s *Stats) handleCrash() {
165
+	s.Crashes++
166
+}
167
+
168
+func NewStats(conf *config.Config) *Stats {
169
+	return &Stats{
170
+		URLs:   conf.GetURLs(),
171
+		Uptime: uptime(time.Now()),
172
+	}
102
 }
173
 }

+ 5
- 7
stats/statsd.go Просмотреть файл

36
 
36
 
37
 func (s *statsdExporter) run() {
37
 func (s *statsdExporter) run() {
38
 	for range time.Tick(statsdPollTime) {
38
 	for range time.Tick(statsdPollTime) {
39
-		instance.mutex.Lock()
39
+		instance := GetStats()
40
 
40
 
41
 		s.client.Gauge(statsdConnectionsAbridgedV4, instance.Connections.Abridged.IPv4)
41
 		s.client.Gauge(statsdConnectionsAbridgedV4, instance.Connections.Abridged.IPv4)
42
 		s.client.Gauge(statsdConnectionsAbridgedV6, instance.Connections.Abridged.IPv6)
42
 		s.client.Gauge(statsdConnectionsAbridgedV6, instance.Connections.Abridged.IPv6)
44
 		s.client.Gauge(statsdConnectionsIntermediateV6, instance.Connections.Intermediate.IPv6)
44
 		s.client.Gauge(statsdConnectionsIntermediateV6, instance.Connections.Intermediate.IPv6)
45
 		s.client.Gauge(statsdConnectionsSecureV4, instance.Connections.Secure.IPv4)
45
 		s.client.Gauge(statsdConnectionsSecureV4, instance.Connections.Secure.IPv4)
46
 		s.client.Gauge(statsdConnectionsSecureV6, instance.Connections.Secure.IPv6)
46
 		s.client.Gauge(statsdConnectionsSecureV6, instance.Connections.Secure.IPv6)
47
-		s.client.Gauge(statsdTrafficIngress, uint64(instance.Traffic.Ingress))
48
-		s.client.Gauge(statsdTrafficEgress, uint64(instance.Traffic.Egress))
49
-		s.client.Gauge(statsdSpeedIngress, uint64(instance.Speed.Ingress))
50
-		s.client.Gauge(statsdSpeedEgress, uint64(instance.Speed.Egress))
47
+		s.client.Gauge(statsdTrafficIngress, instance.Traffic.ingress)
48
+		s.client.Gauge(statsdTrafficEgress, instance.Traffic.egress)
49
+		s.client.Gauge(statsdSpeedIngress, instance.Speed.ingress)
50
+		s.client.Gauge(statsdSpeedEgress, instance.Speed.egress)
51
 		s.client.Gauge(statsdCrashes, instance.Crashes)
51
 		s.client.Gauge(statsdCrashes, instance.Crashes)
52
-
53
-		instance.mutex.Unlock()
54
 	}
52
 	}
55
 }
53
 }
56
 
54
 

Загрузка…
Отмена
Сохранить