Sfoglia il codice sorgente

Stats management utilities

tags/0.9
9seconds 7 anni fa
parent
commit
c66e300425
4 ha cambiato i file con 182 aggiunte e 9 eliminazioni
  1. 1
    1
      proxy/proxy.go
  2. 147
    0
      stats/channels.go
  3. 7
    4
      stats/server.go
  4. 27
    4
      stats/stats.go

+ 1
- 1
proxy/proxy.go Vedi File

@@ -39,7 +39,7 @@ func (p *Proxy) Serve() error {
39 39
 
40 40
 func (p *Proxy) accept(conn net.Conn) {
41 41
 	connID := uuid.NewV4().String()
42
-	log := zap.S().With("connection_id", connID)
42
+	log := zap.S().With("connection_id", connID).Named("main")
43 43
 
44 44
 	defer func() {
45 45
 		conn.Close()

+ 147
- 0
stats/channels.go Vedi File

@@ -0,0 +1,147 @@
1
+package stats
2
+
3
+import (
4
+	"net"
5
+	"sync/atomic"
6
+	"time"
7
+
8
+	"github.com/9seconds/mtg/mtproto"
9
+)
10
+
11
+const (
12
+	crashesChanLength     = 1
13
+	connectionsChanLength = 20
14
+	trafficChanLength     = 5000
15
+)
16
+
17
+var (
18
+	CrashesChan     = make(chan struct{}, crashesChanLength)
19
+	ConnectionsChan = make(chan *connectionData, connectionsChanLength)
20
+	TrafficChan     = make(chan *trafficData, trafficChanLength)
21
+)
22
+
23
+type connectionData struct {
24
+	connectionType mtproto.ConnectionType
25
+	addr           *net.TCPAddr
26
+	connected      bool
27
+}
28
+
29
+type trafficData struct {
30
+	traffic int
31
+	ingress bool
32
+}
33
+
34
+func crashManager() {
35
+	for range CrashesChan {
36
+		instance.mutex.RLock()
37
+
38
+		instance.Crashes++
39
+
40
+		instance.mutex.RUnlock()
41
+	}
42
+}
43
+
44
+func connectionManager() {
45
+	for event := range ConnectionsChan {
46
+		instance.mutex.RLock()
47
+
48
+		isIPv4 := event.addr.IP.To4() == nil
49
+		var inc uint32 = 1
50
+		if !event.connected {
51
+			inc = ^uint32(0)
52
+		}
53
+
54
+		switch event.connectionType {
55
+		case mtproto.ConnectionTypeAbridged:
56
+			if isIPv4 {
57
+				atomic.AddUint32(&instance.ActiveConnections.Abridged.IPv4, inc)
58
+				if event.connected {
59
+					atomic.AddUint32(&instance.AllConnections.Abridged.IPv4, inc)
60
+				}
61
+			} else {
62
+				atomic.AddUint32(&instance.ActiveConnections.Abridged.IPv6, inc)
63
+				if event.connected {
64
+					atomic.AddUint32(&instance.AllConnections.Abridged.IPv6, inc)
65
+				}
66
+			}
67
+		default:
68
+			if isIPv4 {
69
+				atomic.AddUint32(&instance.ActiveConnections.Intermediate.IPv4, inc)
70
+				if event.connected {
71
+					atomic.AddUint32(&instance.AllConnections.Intermediate.IPv4, inc)
72
+				}
73
+			} else {
74
+				atomic.AddUint32(&instance.ActiveConnections.Intermediate.IPv6, inc)
75
+				if event.connected {
76
+					atomic.AddUint32(&instance.AllConnections.Intermediate.IPv6, inc)
77
+				}
78
+			}
79
+		}
80
+
81
+		instance.mutex.RUnlock()
82
+	}
83
+}
84
+
85
+func trafficManager() {
86
+	speedChan := time.Tick(time.Second)
87
+
88
+	for {
89
+		select {
90
+		case event := <-TrafficChan:
91
+			instance.mutex.RLock()
92
+
93
+			if event.ingress {
94
+				instance.Traffic.Ingress += trafficValue(event.traffic)
95
+				instance.speedCurrent.Ingress += trafficSpeedValue(event.traffic)
96
+			} else {
97
+				instance.Traffic.Egress += trafficValue(event.traffic)
98
+				instance.speedCurrent.Egress += trafficSpeedValue(event.traffic)
99
+			}
100
+
101
+			instance.mutex.RUnlock()
102
+		case <-speedChan:
103
+			instance.mutex.RLock()
104
+
105
+			instance.Speed.Ingress = instance.speedCurrent.Ingress
106
+			instance.Speed.Egress = instance.speedCurrent.Egress
107
+			instance.speedCurrent.Ingress = trafficSpeedValue(0)
108
+			instance.speedCurrent.Egress = trafficSpeedValue(0)
109
+
110
+			instance.mutex.RUnlock()
111
+		}
112
+	}
113
+}
114
+
115
+func NewCrash() {
116
+	CrashesChan <- struct{}{}
117
+}
118
+
119
+func ClientConnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
120
+	ConnectionsChan <- &connectionData{
121
+		connectionType: connectionType,
122
+		addr:           addr,
123
+		connected:      true,
124
+	}
125
+}
126
+
127
+func ClientDisconnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
128
+	ConnectionsChan <- &connectionData{
129
+		connectionType: connectionType,
130
+		addr:           addr,
131
+		connected:      false,
132
+	}
133
+}
134
+
135
+func IngressTraffic(traffic int) {
136
+	TrafficChan <- &trafficData{
137
+		traffic: traffic,
138
+		ingress: true,
139
+	}
140
+}
141
+
142
+func EgressTraffic(traffic int) {
143
+	TrafficChan <- &trafficData{
144
+		traffic: traffic,
145
+		ingress: false,
146
+	}
147
+}

+ 7
- 4
stats/server.go Vedi File

@@ -13,12 +13,15 @@ var instance *stats
13 13
 
14 14
 func Start(conf *config.Config) {
15 15
 	instance = &stats{
16
-		URLs:         conf.GetURLs(),
17
-		Uptime:       uptime(time.Now()),
18
-		speedCurrent: &speed{},
19
-		mutex:        &sync.RWMutex{},
16
+		URLs:   conf.GetURLs(),
17
+		Uptime: uptime(time.Now()),
18
+		mutex:  &sync.RWMutex{},
20 19
 	}
21 20
 
21
+	go crashManager()
22
+	go connectionManager()
23
+	go trafficManager()
24
+
22 25
 	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
23 26
 		w.Header().Set("Content-Type", "application/json")
24 27
 

+ 27
- 4
stats/stats.go Vedi File

@@ -49,9 +49,31 @@ func (t trafficSpeedValue) MarshalJSON() ([]byte, error) {
49 49
 }
50 50
 
51 51
 type connections struct {
52
-	All          uint32 `json:"all"`
53
-	Abridged     uint32 `json:"abridged"`
54
-	Intermediate uint32 `json:"intermediate"`
52
+	All          connectionType `json:"all"`
53
+	Abridged     connectionType `json:"abridged"`
54
+	Intermediate connectionType `json:"intermediate"`
55
+}
56
+
57
+func (c connections) MarshalJSON() ([]byte, error) {
58
+	c.All.IPv4 = c.Abridged.IPv4 + c.Intermediate.IPv4
59
+	c.All.IPv6 = c.Abridged.IPv6 + c.Intermediate.IPv6
60
+
61
+	value := struct {
62
+		All          connectionType `json:"all"`
63
+		Abridged     connectionType `json:"abridged"`
64
+		Intermediate connectionType `json:"intermediate"`
65
+	}{
66
+		All:          c.All,
67
+		Abridged:     c.Abridged,
68
+		Intermediate: c.Intermediate,
69
+	}
70
+
71
+	return json.Marshal(value)
72
+}
73
+
74
+type connectionType struct {
75
+	IPv6 uint32 `json:"ipv6"`
76
+	IPv4 uint32 `json:"ipv4"`
55 77
 }
56 78
 
57 79
 type traffic struct {
@@ -71,7 +93,8 @@ type stats struct {
71 93
 	Traffic           traffic       `json:"traffic"`
72 94
 	Speed             speed         `json:"speed"`
73 95
 	Uptime            uptime        `json:"uptime"`
96
+	Crashes           uint32        `json:"crashes"`
74 97
 
75
-	speedCurrent *speed
98
+	speedCurrent speed
76 99
 	mutex        *sync.RWMutex
77 100
 }

Loading…
Annulla
Salva