Просмотр исходного кода

Merge pull request #13 from 9seconds/block-rwc

Stats server
tags/0.9
Sergey Arkhipov 7 лет назад
Родитель
Сommit
2b8e57f7f5
Аккаунт пользователя с таким Email не найден
8 измененных файлов: 332 добавлений и 6 удалений
  1. 7
    1
      Gopkg.lock
  2. 4
    0
      Gopkg.toml
  3. 4
    1
      main.go
  4. 6
    1
      proxy/proxy.go
  5. 147
    0
      stats/channels.go
  6. 56
    0
      stats/server.go
  7. 100
    0
      stats/stats.go
  8. 8
    3
      wrappers/conn.go

+ 7
- 1
Gopkg.lock Просмотреть файл

@@ -22,6 +22,12 @@
22 22
   revision = "346938d642f2ec3594ed81d874461961cd0faa76"
23 23
   version = "v1.1.0"
24 24
 
25
+[[projects]]
26
+  branch = "master"
27
+  name = "github.com/dustin/go-humanize"
28
+  packages = ["."]
29
+  revision = "02af3965c54e8cacf948b97fef38925c4120652c"
30
+
25 31
 [[projects]]
26 32
   branch = "master"
27 33
   name = "github.com/juju/errors"
@@ -80,6 +86,6 @@
80 86
 [solve-meta]
81 87
   analyzer-name = "dep"
82 88
   analyzer-version = 1
83
-  inputs-digest = "24afdd6b64331aeba47fed75918d04032e13e404612cac107bad1d68a5038b72"
89
+  inputs-digest = "312c9fb15085cbe9660443b15a07981990e1f70ec3ddfcce1b7e6cd5902307da"
84 90
   solver-name = "gps-cdcl"
85 91
   solver-version = 1

+ 4
- 0
Gopkg.toml Просмотреть файл

@@ -44,3 +44,7 @@
44 44
 [[constraint]]
45 45
   name = "github.com/satori/go.uuid"
46 46
   version = "1.2.0"
47
+
48
+[[constraint]]
49
+  branch = "master"
50
+  name = "github.com/dustin/go-humanize"

+ 4
- 1
main.go Просмотреть файл

@@ -16,6 +16,7 @@ import (
16 16
 
17 17
 	"github.com/9seconds/mtg/config"
18 18
 	"github.com/9seconds/mtg/proxy"
19
+	"github.com/9seconds/mtg/stats"
19 20
 	"github.com/juju/errors"
20 21
 )
21 22
 
@@ -115,13 +116,15 @@ func main() {
115 116
 	zap.ReplaceGlobals(logger)
116 117
 	defer logger.Sync()
117 118
 
119
+	printURLs(conf.GetURLs())
120
+
118 121
 	if conf.UseMiddleProxy() {
119 122
 		zap.S().Infow("Use middle proxy connection to Telegram")
120 123
 	} else {
121 124
 		zap.S().Infow("Use direct connection to Telegram")
122 125
 	}
123 126
 
124
-	printURLs(conf.GetURLs())
127
+	go stats.Start(conf)
125 128
 
126 129
 	server := proxy.NewProxy(conf)
127 130
 	if err := server.Serve(); err != nil {

+ 6
- 1
proxy/proxy.go Просмотреть файл

@@ -12,6 +12,7 @@ import (
12 12
 	"github.com/9seconds/mtg/client"
13 13
 	"github.com/9seconds/mtg/config"
14 14
 	"github.com/9seconds/mtg/mtproto"
15
+	"github.com/9seconds/mtg/stats"
15 16
 	"github.com/9seconds/mtg/telegram"
16 17
 	"github.com/9seconds/mtg/wrappers"
17 18
 )
@@ -39,12 +40,13 @@ func (p *Proxy) Serve() error {
39 40
 
40 41
 func (p *Proxy) accept(conn net.Conn) {
41 42
 	connID := uuid.NewV4().String()
42
-	log := zap.S().With("connection_id", connID)
43
+	log := zap.S().With("connection_id", connID).Named("main")
43 44
 
44 45
 	defer func() {
45 46
 		conn.Close()
46 47
 
47 48
 		if err := recover(); err != nil {
49
+			stats.NewCrash()
48 50
 			log.Errorw("Crash of accept handler", "error", err)
49 51
 		}
50 52
 	}()
@@ -58,6 +60,9 @@ func (p *Proxy) accept(conn net.Conn) {
58 60
 	}
59 61
 	defer client.(io.Closer).Close()
60 62
 
63
+	stats.ClientConnected(opts.ConnectionType, client.RemoteAddr())
64
+	defer stats.ClientDisconnected(opts.ConnectionType, client.RemoteAddr())
65
+
61 66
 	server, err := p.getTelegramConn(opts, connID)
62 67
 	if err != nil {
63 68
 		log.Errorw("Cannot initialize server connection", "error", err)

+ 147
- 0
stats/channels.go Просмотреть файл

@@ -0,0 +1,147 @@
1
+package stats
2
+
3
+import (
4
+	"net"
5
+	"sync/atomic"
6
+	"time"
7
+
8
+	"github.com/9seconds/mtg/mtproto"
9
+)
10
+
11
+const (
12
+	crashesChanLength     = 1
13
+	connectionsChanLength = 20
14
+	trafficChanLength     = 5000
15
+)
16
+
17
+var (
18
+	CrashesChan     = make(chan struct{}, crashesChanLength)
19
+	ConnectionsChan = make(chan *connectionData, connectionsChanLength)
20
+	TrafficChan     = make(chan *trafficData, trafficChanLength)
21
+)
22
+
23
+type connectionData struct {
24
+	connectionType mtproto.ConnectionType
25
+	addr           *net.TCPAddr
26
+	connected      bool
27
+}
28
+
29
+type trafficData struct {
30
+	traffic int
31
+	ingress bool
32
+}
33
+
34
+func crashManager() {
35
+	for range CrashesChan {
36
+		instance.mutex.RLock()
37
+
38
+		instance.Crashes++
39
+
40
+		instance.mutex.RUnlock()
41
+	}
42
+}
43
+
44
+func connectionManager() {
45
+	for event := range ConnectionsChan {
46
+		instance.mutex.RLock()
47
+
48
+		isIPv4 := event.addr.IP.To4() != nil
49
+		var inc uint32 = 1
50
+		if !event.connected {
51
+			inc = ^uint32(0)
52
+		}
53
+
54
+		switch event.connectionType {
55
+		case mtproto.ConnectionTypeAbridged:
56
+			if isIPv4 {
57
+				atomic.AddUint32(&instance.ActiveConnections.Abridged.IPv4, inc)
58
+				if event.connected {
59
+					atomic.AddUint32(&instance.AllConnections.Abridged.IPv4, inc)
60
+				}
61
+			} else {
62
+				atomic.AddUint32(&instance.ActiveConnections.Abridged.IPv6, inc)
63
+				if event.connected {
64
+					atomic.AddUint32(&instance.AllConnections.Abridged.IPv6, inc)
65
+				}
66
+			}
67
+		default:
68
+			if isIPv4 {
69
+				atomic.AddUint32(&instance.ActiveConnections.Intermediate.IPv4, inc)
70
+				if event.connected {
71
+					atomic.AddUint32(&instance.AllConnections.Intermediate.IPv4, inc)
72
+				}
73
+			} else {
74
+				atomic.AddUint32(&instance.ActiveConnections.Intermediate.IPv6, inc)
75
+				if event.connected {
76
+					atomic.AddUint32(&instance.AllConnections.Intermediate.IPv6, inc)
77
+				}
78
+			}
79
+		}
80
+
81
+		instance.mutex.RUnlock()
82
+	}
83
+}
84
+
85
+func trafficManager() {
86
+	speedChan := time.Tick(time.Second)
87
+
88
+	for {
89
+		select {
90
+		case event := <-TrafficChan:
91
+			instance.mutex.RLock()
92
+
93
+			if event.ingress {
94
+				instance.Traffic.Ingress += trafficValue(event.traffic)
95
+				instance.speedCurrent.Ingress += trafficSpeedValue(event.traffic)
96
+			} else {
97
+				instance.Traffic.Egress += trafficValue(event.traffic)
98
+				instance.speedCurrent.Egress += trafficSpeedValue(event.traffic)
99
+			}
100
+
101
+			instance.mutex.RUnlock()
102
+		case <-speedChan:
103
+			instance.mutex.RLock()
104
+
105
+			instance.Speed.Ingress = instance.speedCurrent.Ingress
106
+			instance.Speed.Egress = instance.speedCurrent.Egress
107
+			instance.speedCurrent.Ingress = trafficSpeedValue(0)
108
+			instance.speedCurrent.Egress = trafficSpeedValue(0)
109
+
110
+			instance.mutex.RUnlock()
111
+		}
112
+	}
113
+}
114
+
115
+func NewCrash() {
116
+	CrashesChan <- struct{}{}
117
+}
118
+
119
+func ClientConnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
120
+	ConnectionsChan <- &connectionData{
121
+		connectionType: connectionType,
122
+		addr:           addr,
123
+		connected:      true,
124
+	}
125
+}
126
+
127
+func ClientDisconnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
128
+	ConnectionsChan <- &connectionData{
129
+		connectionType: connectionType,
130
+		addr:           addr,
131
+		connected:      false,
132
+	}
133
+}
134
+
135
+func IngressTraffic(traffic int) {
136
+	TrafficChan <- &trafficData{
137
+		traffic: traffic,
138
+		ingress: true,
139
+	}
140
+}
141
+
142
+func EgressTraffic(traffic int) {
143
+	TrafficChan <- &trafficData{
144
+		traffic: traffic,
145
+		ingress: false,
146
+	}
147
+}

+ 56
- 0
stats/server.go Просмотреть файл

@@ -0,0 +1,56 @@
1
+package stats
2
+
3
+import (
4
+	"encoding/json"
5
+	"net/http"
6
+	"sync"
7
+	"time"
8
+
9
+	"go.uber.org/zap"
10
+
11
+	"github.com/9seconds/mtg/config"
12
+)
13
+
14
+var instance *stats
15
+
16
+func Start(conf *config.Config) {
17
+	log := zap.S().Named("stats")
18
+
19
+	instance = &stats{
20
+		URLs:   conf.GetURLs(),
21
+		Uptime: uptime(time.Now()),
22
+		mutex:  &sync.RWMutex{},
23
+	}
24
+
25
+	go crashManager()
26
+	go connectionManager()
27
+	go trafficManager()
28
+
29
+	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
30
+		w.Header().Set("Content-Type", "application/json")
31
+
32
+		instance.mutex.Lock()
33
+		first, err := json.Marshal(instance)
34
+		instance.mutex.Unlock()
35
+
36
+		if err != nil {
37
+			log.Errorw("Cannot encode json", "error", err)
38
+			http.Error(w, "Internal server error", 500)
39
+			return
40
+		}
41
+
42
+		interm := map[string]interface{}{}
43
+		json.Unmarshal(first, &interm)
44
+
45
+		encoder := json.NewEncoder(w)
46
+		encoder.SetEscapeHTML(false)
47
+		encoder.SetIndent("", "  ")
48
+		if err = encoder.Encode(interm); err != nil {
49
+			log.Errorw("Cannot encode json", "error", err)
50
+		}
51
+	})
52
+
53
+	if err := http.ListenAndServe(conf.StatAddr(), nil); err != nil {
54
+		log.Fatalw("Stats server has been stopped", "error", err)
55
+	}
56
+}

+ 100
- 0
stats/stats.go Просмотреть файл

@@ -0,0 +1,100 @@
1
+package stats
2
+
3
+import (
4
+	"encoding/json"
5
+	"fmt"
6
+	"strconv"
7
+	"sync"
8
+	"time"
9
+
10
+	humanize "github.com/dustin/go-humanize"
11
+
12
+	"github.com/9seconds/mtg/config"
13
+)
14
+
15
+type uptime time.Time
16
+
17
+func (u uptime) MarshalJSON() ([]byte, error) {
18
+	duration := time.Since(time.Time(u))
19
+	value := map[string]string{
20
+		"seconds": strconv.Itoa(int(duration.Seconds())),
21
+		"human":   humanize.Time(time.Time(u)),
22
+	}
23
+
24
+	return json.Marshal(value)
25
+}
26
+
27
+type trafficValue uint64
28
+
29
+func (t trafficValue) MarshalJSON() ([]byte, error) {
30
+	tv := uint64(t)
31
+	value := map[string]interface{}{
32
+		"bytes": tv,
33
+		"human": humanize.Bytes(tv),
34
+	}
35
+
36
+	return json.Marshal(value)
37
+}
38
+
39
+type trafficSpeedValue uint64
40
+
41
+func (t trafficSpeedValue) MarshalJSON() ([]byte, error) {
42
+	speed := uint64(t)
43
+	value := map[string]interface{}{
44
+		"bytes/s": speed,
45
+		"human":   fmt.Sprintf("%s/S", humanize.Bytes(speed)),
46
+	}
47
+
48
+	return json.Marshal(value)
49
+}
50
+
51
+type connections struct {
52
+	All          connectionType `json:"all"`
53
+	Abridged     connectionType `json:"abridged"`
54
+	Intermediate connectionType `json:"intermediate"`
55
+}
56
+
57
+func (c connections) MarshalJSON() ([]byte, error) {
58
+	c.All.IPv4 = c.Abridged.IPv4 + c.Intermediate.IPv4
59
+	c.All.IPv6 = c.Abridged.IPv6 + c.Intermediate.IPv6
60
+
61
+	value := struct {
62
+		All          connectionType `json:"all"`
63
+		Abridged     connectionType `json:"abridged"`
64
+		Intermediate connectionType `json:"intermediate"`
65
+	}{
66
+		All:          c.All,
67
+		Abridged:     c.Abridged,
68
+		Intermediate: c.Intermediate,
69
+	}
70
+
71
+	return json.Marshal(value)
72
+}
73
+
74
+type connectionType struct {
75
+	IPv6 uint32 `json:"ipv6"`
76
+	IPv4 uint32 `json:"ipv4"`
77
+}
78
+
79
+type traffic struct {
80
+	Ingress trafficValue `json:"ingress"`
81
+	Egress  trafficValue `json:"egress"`
82
+}
83
+
84
+type speed struct {
85
+	Ingress trafficSpeedValue `json:"ingress"`
86
+	Egress  trafficSpeedValue `json:"egress"`
87
+}
88
+
89
+type stats struct {
90
+	URLs              config.IPURLs `json:"urls"`
91
+	ActiveConnections connections   `json:"active_connections"`
92
+	AllConnections    connections   `json:"all_connections"`
93
+	Traffic           traffic       `json:"traffic"`
94
+	Speed             speed         `json:"speed"`
95
+	Uptime            uptime        `json:"uptime"`
96
+	Crashes           uint32        `json:"crashes"`
97
+
98
+	speedCurrent speed
99
+	mutex        *sync.RWMutex
100
+}

+ 8
- 3
wrappers/conn.go Просмотреть файл

@@ -5,6 +5,8 @@ import (
5 5
 	"time"
6 6
 
7 7
 	"go.uber.org/zap"
8
+
9
+	"github.com/9seconds/mtg/stats"
8 10
 )
9 11
 
10 12
 type ConnPurpose uint8
@@ -31,9 +33,10 @@ const (
31 33
 )
32 34
 
33 35
 type Conn struct {
34
-	connID     string
35
-	conn       net.Conn
36
-	logger     *zap.SugaredLogger
36
+	connID string
37
+	conn   net.Conn
38
+	logger *zap.SugaredLogger
39
+
37 40
 	publicIPv4 net.IP
38 41
 	publicIPv6 net.IP
39 42
 }
@@ -43,6 +46,7 @@ func (c *Conn) Write(p []byte) (int, error) {
43 46
 	n, err := c.conn.Write(p)
44 47
 
45 48
 	c.logger.Debugw("Write to stream", "bytes", n, "error", err)
49
+	stats.EgressTraffic(n)
46 50
 
47 51
 	return n, err
48 52
 }
@@ -52,6 +56,7 @@ func (c *Conn) Read(p []byte) (int, error) {
52 56
 	n, err := c.conn.Read(p)
53 57
 
54 58
 	c.logger.Debugw("Read from stream", "bytes", n, "error", err)
59
+	stats.IngressTraffic(n)
55 60
 
56 61
 	return n, err
57 62
 }

Загрузка…
Отмена
Сохранить