浏览代码

Add statistics server

tags/0.9
9seconds 8 年前
父节点
当前提交
e6dc3f169c
共有 4 个文件被更改,包括 118 次插入9 次删除
  1. 17
    3
      main.go
  2. 10
    6
      proxy/server.go
  3. 58
    0
      proxy/stats.go
  4. 33
    0
      proxy/trafficrwc.go

+ 17
- 3
main.go 查看文件

@@ -38,12 +38,22 @@ var (
38 38
 			Envar("MTG_PORT").
39 39
 			Default("3128").
40 40
 			Uint16()
41
-	readTimeout = app.Flag("read-timeout", "Socket read timeout").
41
+	statsIP = app.Flag("stats-ip", "Which IP bind stats server to").
42
+		Short('t').
43
+		Envar("MTG_STATS_IP").
44
+		Default("127.0.0.1").
45
+		IP()
46
+	statsPort = app.Flag("stats-port", "Which port bind stats to.").
47
+			Short('q').
48
+			Envar("MTG_STATS_PORT").
49
+			Default("3129").
50
+			Uint16()
51
+	readTimeout = app.Flag("read-timeout", "Socket read timeout.").
42 52
 			Short('r').
43 53
 			Envar("MTG_READ_TIMEOUT").
44 54
 			Default("30s").
45 55
 			Duration()
46
-	writeTimeout = app.Flag("write-timeout", "Socket write timeout").
56
+	writeTimeout = app.Flag("write-timeout", "Socket write timeout.").
47 57
 			Short('w').
48 58
 			Envar("MTG_WRITE_TIMEOUT").
49 59
 			Default("30s").
@@ -95,8 +105,12 @@ func main() {
95 105
 	)).Sugar()
96 106
 
97 107
 	printURLs()
108
+
109
+	stat := proxy.NewStats()
110
+	go stat.Serve(*statsIP, *statsPort)
111
+
98 112
 	srv := proxy.NewServer(*bindIP, int(*bindPort), secretBytes, logger,
99
-		*readTimeout, *writeTimeout)
113
+		*readTimeout, *writeTimeout, stat)
100 114
 	if err := srv.Serve(); err != nil {
101 115
 		logger.Fatal(err.Error())
102 116
 	}

+ 10
- 6
proxy/server.go 查看文件

@@ -25,6 +25,7 @@ type Server struct {
25 25
 	ctx          context.Context
26 26
 	readTimeout  time.Duration
27 27
 	writeTimeout time.Duration
28
+	stats        *Stats
28 29
 }
29 30
 
30 31
 func (s *Server) Serve() error {
@@ -50,6 +51,8 @@ func (s *Server) Addr() string {
50 51
 
51 52
 func (s *Server) accept(conn net.Conn) {
52 53
 	defer conn.Close()
54
+	defer s.stats.closeConnection()
55
+	s.stats.newConnection()
53 56
 
54 57
 	ctx, cancel := context.WithCancel(context.Background())
55 58
 	socketID := s.makeSocketID()
@@ -102,6 +105,7 @@ func (s *Server) makeSocketID() string {
102 105
 
103 106
 func (s *Server) getClientStream(conn net.Conn, ctx context.Context, cancel context.CancelFunc, socketID string) (io.ReadWriteCloser, int16, error) {
104 107
 	wConn := newTimeoutReadWriteCloser(conn, s.readTimeout, s.writeTimeout)
108
+	wConn = newTrafficReadWriteCloser(wConn, s.stats.addIncomingTraffic, s.stats.addOutgoingTraffic)
105 109
 	frame, err := obfuscated2.ExtractFrame(wConn)
106 110
 	if err != nil {
107 111
 		return nil, 0, errors.Annotate(err, "Cannot create client stream")
@@ -113,7 +117,7 @@ func (s *Server) getClientStream(conn net.Conn, ctx context.Context, cancel cont
113 117
 	}
114 118
 
115 119
 	wConn = newLogReadWriteCloser(wConn, s.logger, socketID, "client")
116
-	wConn = newCipherReadWriteCloser(conn, obfs2)
120
+	wConn = newCipherReadWriteCloser(wConn, obfs2)
117 121
 	wConn = newCtxReadWriteCloser(wConn, ctx, cancel)
118 122
 
119 123
 	return wConn, dc, nil
@@ -125,13 +129,14 @@ func (s *Server) getTelegramStream(dc int16, ctx context.Context, cancel context
125 129
 		return nil, errors.Annotate(err, "Cannot dial")
126 130
 	}
127 131
 	wConn := newTimeoutReadWriteCloser(socket, s.readTimeout, s.writeTimeout)
132
+	wConn = newTrafficReadWriteCloser(wConn, s.stats.addIncomingTraffic, s.stats.addOutgoingTraffic)
128 133
 
129 134
 	obfs2, frame := obfuscated2.MakeTelegramObfuscated2Frame()
130 135
 	if n, err := socket.Write(frame); err != nil || n != len(frame) {
131 136
 		return nil, errors.Annotate(err, "Cannot write hadnshake frame")
132 137
 	}
133 138
 
134
-	wConn = newLogReadWriteCloser(socket, s.logger, socketID, "telegram")
139
+	wConn = newLogReadWriteCloser(wConn, s.logger, socketID, "telegram")
135 140
 	wConn = newCipherReadWriteCloser(wConn, obfs2)
136 141
 	wConn = newCtxReadWriteCloser(wConn, ctx, cancel)
137 142
 
@@ -140,13 +145,11 @@ func (s *Server) getTelegramStream(dc int16, ctx context.Context, cancel context
140 145
 
141 146
 func (s *Server) pipe(wait *sync.WaitGroup, reader io.Reader, writer io.Writer) {
142 147
 	defer wait.Done()
143
-
144
-	buf := make([]byte, bufferSize)
145
-	io.CopyBuffer(writer, reader, buf)
148
+	io.Copy(writer, reader)
146 149
 }
147 150
 
148 151
 func NewServer(ip net.IP, port int, secret []byte, logger *zap.SugaredLogger,
149
-	readTimeout, writeTimeout time.Duration) *Server {
152
+	readTimeout, writeTimeout time.Duration, stat *Stats) *Server {
150 153
 	return &Server{
151 154
 		ip:           ip,
152 155
 		port:         port,
@@ -155,5 +158,6 @@ func NewServer(ip net.IP, port int, secret []byte, logger *zap.SugaredLogger,
155 158
 		logger:       logger,
156 159
 		readTimeout:  readTimeout,
157 160
 		writeTimeout: writeTimeout,
161
+		stats:        stat,
158 162
 	}
159 163
 }

+ 58
- 0
proxy/stats.go 查看文件

@@ -0,0 +1,58 @@
1
+package proxy
2
+
3
+import (
4
+	"encoding/json"
5
+	"net"
6
+	"net/http"
7
+	"strconv"
8
+	"sync/atomic"
9
+	"time"
10
+)
11
+
12
+type statsUptime time.Time
13
+
14
+func (s statsUptime) MarshalJSON() ([]byte, error) {
15
+	uptime := int(time.Since(time.Time(s)).Seconds())
16
+	return []byte(strconv.Itoa(uptime)), nil
17
+}
18
+
19
+type Stats struct {
20
+	AllConnections    uint64 `json:"all_connections"`
21
+	ActiveConnections uint32 `json:"active_connections"`
22
+	Traffic           struct {
23
+		Incoming uint64 `json:"incoming"`
24
+		Outgoing uint64 `json:"outgoing"`
25
+	} `json:"traffic"`
26
+	Uptime statsUptime `json:"uptime"`
27
+}
28
+
29
+func (s *Stats) newConnection() {
30
+	atomic.AddUint64(&s.AllConnections, 1)
31
+	atomic.AddUint32(&s.ActiveConnections, 1)
32
+}
33
+
34
+func (s *Stats) closeConnection() {
35
+	atomic.AddUint32(&s.ActiveConnections, ^uint32(0))
36
+}
37
+
38
+func (s *Stats) addIncomingTraffic(n int) {
39
+	atomic.AddUint64(&s.Traffic.Incoming, uint64(n))
40
+}
41
+
42
+func (s *Stats) addOutgoingTraffic(n int) {
43
+	atomic.AddUint64(&s.Traffic.Outgoing, uint64(n))
44
+}
45
+
46
+func (s *Stats) Serve(host net.IP, port uint16) {
47
+	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
48
+		w.Header().Set("Content-Type", "application/json")
49
+		json.NewEncoder(w).Encode(s)
50
+	})
51
+
52
+	addr := net.JoinHostPort(host.String(), strconv.Itoa(int(port)))
53
+	http.ListenAndServe(addr, nil)
54
+}
55
+
56
+func NewStats() *Stats {
57
+	return &Stats{Uptime: statsUptime(time.Now())}
58
+}

+ 33
- 0
proxy/trafficrwc.go 查看文件

@@ -0,0 +1,33 @@
1
+package proxy
2
+
3
+import "io"
4
+
5
+type TrafficReadWriteCloser struct {
6
+	conn          io.ReadWriteCloser
7
+	readCallback  func(int)
8
+	writeCallback func(int)
9
+}
10
+
11
+func (t *TrafficReadWriteCloser) Read(p []byte) (n int, err error) {
12
+	n, err = t.conn.Read(p)
13
+	t.readCallback(n)
14
+	return
15
+}
16
+
17
+func (t *TrafficReadWriteCloser) Write(p []byte) (n int, err error) {
18
+	n, err = t.conn.Write(p)
19
+	t.writeCallback(n)
20
+	return
21
+}
22
+
23
+func (t *TrafficReadWriteCloser) Close() error {
24
+	return t.conn.Close()
25
+}
26
+
27
+func newTrafficReadWriteCloser(conn io.ReadWriteCloser, readCallback, writeCallback func(int)) io.ReadWriteCloser {
28
+	return &TrafficReadWriteCloser{
29
+		conn:          conn,
30
+		readCallback:  readCallback,
31
+		writeCallback: writeCallback,
32
+	}
33
+}

正在加载...
取消
保存