9seconds 6 лет назад
Родитель
Сommit
d459efcf9c
9 измененных файлов: 38 добавлений и 12 удалений
  1. 2
    0
      cli/proxy.go
  2. 9
    2
      config/config.go
  3. 1
    1
      go.mod
  4. 1
    0
      hub/connection.go
  5. 17
    0
      hub/connection_hub.go
  6. 3
    0
      hub/hub.go
  7. 0
    7
      hub/registry.go
  8. 4
    1
      stats/stats_prometheus.go
  9. 1
    1
      stats/stats_statsd.go

+ 2
- 0
cli/proxy.go Просмотреть файл

10
 
10
 
11
 	"github.com/9seconds/mtg/antireplay"
11
 	"github.com/9seconds/mtg/antireplay"
12
 	"github.com/9seconds/mtg/config"
12
 	"github.com/9seconds/mtg/config"
13
+	"github.com/9seconds/mtg/hub"
13
 	"github.com/9seconds/mtg/ntp"
14
 	"github.com/9seconds/mtg/ntp"
14
 	"github.com/9seconds/mtg/obfuscated2"
15
 	"github.com/9seconds/mtg/obfuscated2"
15
 	"github.com/9seconds/mtg/proxy"
16
 	"github.com/9seconds/mtg/proxy"
66
 		Fatal(err)
67
 		Fatal(err)
67
 	}
68
 	}
68
 	telegram.Init()
69
 	telegram.Init()
70
+	hub.Init(ctx)
69
 
71
 
70
 	proxyListener, err := net.Listen("tcp", config.C.Bind.String())
72
 	proxyListener, err := net.Listen("tcp", config.C.Bind.String())
71
 	if err != nil {
73
 	if err != nil {

+ 9
- 2
config/config.go Просмотреть файл

9
 	"net"
9
 	"net"
10
 	"time"
10
 	"time"
11
 
11
 
12
+	"github.com/alecthomas/units"
12
 	"go.uber.org/zap"
13
 	"go.uber.org/zap"
13
 	statsd "gopkg.in/alexcesaro/statsd.v2"
14
 	statsd "gopkg.in/alexcesaro/statsd.v2"
14
 )
15
 )
104
 			C.Bind = opt.Value.(*net.TCPAddr)
105
 			C.Bind = opt.Value.(*net.TCPAddr)
105
 		case OptionTypePublicIPv4:
106
 		case OptionTypePublicIPv4:
106
 			C.PublicIPv4 = opt.Value.(*net.TCPAddr)
107
 			C.PublicIPv4 = opt.Value.(*net.TCPAddr)
108
+			if C.PublicIPv4 == nil {
109
+				C.PublicIPv4 = &net.TCPAddr{}
110
+			}
107
 		case OptionTypePublicIPv6:
111
 		case OptionTypePublicIPv6:
108
 			C.PublicIPv6 = opt.Value.(*net.TCPAddr)
112
 			C.PublicIPv6 = opt.Value.(*net.TCPAddr)
113
+			if C.PublicIPv6 == nil {
114
+				C.PublicIPv6 = &net.TCPAddr{}
115
+			}
109
 		case OptionTypeStatsBind:
116
 		case OptionTypeStatsBind:
110
 			C.StatsBind = opt.Value.(*net.TCPAddr)
117
 			C.StatsBind = opt.Value.(*net.TCPAddr)
111
 		case OptionTypeStatsNamespace:
118
 		case OptionTypeStatsNamespace:
133
 		case OptionTypeStatsdTags:
140
 		case OptionTypeStatsdTags:
134
 			C.StatsdTags = opt.Value.(map[string]string)
141
 			C.StatsdTags = opt.Value.(map[string]string)
135
 		case OptionTypeWriteBufferSize:
142
 		case OptionTypeWriteBufferSize:
136
-			C.WriteBuffer = int(opt.Value.(uint32))
143
+			C.WriteBuffer = int(opt.Value.(units.Base2Bytes))
137
 		case OptionTypeReadBufferSize:
144
 		case OptionTypeReadBufferSize:
138
-			C.ReadBuffer = int(opt.Value.(uint32))
145
+			C.ReadBuffer = int(opt.Value.(units.Base2Bytes))
139
 		case OptionTypeAntiReplayMaxSize:
146
 		case OptionTypeAntiReplayMaxSize:
140
 			C.AntiReplayMaxSize = opt.Value.(int)
147
 			C.AntiReplayMaxSize = opt.Value.(int)
141
 		case OptionTypeAntiReplayEvictionTime:
148
 		case OptionTypeAntiReplayEvictionTime:

+ 1
- 1
go.mod Просмотреть файл

5
 require (
5
 require (
6
 	github.com/OneOfOne/xxhash v1.2.5 // indirect
6
 	github.com/OneOfOne/xxhash v1.2.5 // indirect
7
 	github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
7
 	github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
8
-	github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 // indirect
8
+	github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4
9
 	github.com/allegro/bigcache v1.2.1
9
 	github.com/allegro/bigcache v1.2.1
10
 	github.com/beevik/ntp v0.2.0
10
 	github.com/beevik/ntp v0.2.0
11
 	github.com/cespare/xxhash v1.1.0
11
 	github.com/cespare/xxhash v1.1.0

+ 1
- 0
hub/connection.go Просмотреть файл

109
 		conn: conn,
109
 		conn: conn,
110
 		hub:  hub,
110
 		hub:  hub,
111
 		id:   rand.Int(), // nolint: gosec
111
 		id:   rand.Int(), // nolint: gosec
112
+		done: make(chan struct{}),
112
 	}
113
 	}
113
 	go rv.run()
114
 	go rv.run()
114
 
115
 

+ 17
- 0
hub/connection_hub.go Просмотреть файл

43
 }
43
 }
44
 
44
 
45
 func (c *connectionHub) runGC() {
45
 func (c *connectionHub) runGC() {
46
+	logger := c.logger.Named("gc")
47
+
46
 	for key, conn := range c.sockets {
48
 	for key, conn := range c.sockets {
47
 		switch {
49
 		switch {
48
 		case conn.closed():
50
 		case conn.closed():
51
+			logger.Debugw("Delete closed socket", "key", key)
49
 			delete(c.sockets, key)
52
 			delete(c.sockets, key)
53
+
50
 		case conn.idle():
54
 		case conn.idle():
55
+			logger.Debugw("Delete idle socket", "key", key)
51
 			conn.shutdown()
56
 			conn.shutdown()
52
 			delete(c.sockets, key)
57
 			delete(c.sockets, key)
53
 			return
58
 			return
56
 }
61
 }
57
 
62
 
58
 func (c *connectionHub) runConnectionRequest(req *connectionHubRequest) {
63
 func (c *connectionHub) runConnectionRequest(req *connectionHubRequest) {
64
+	logger := c.logger.Named("request").With("connection-id", req.request.ConnID)
65
+
59
 	for key, conn := range c.sockets {
66
 	for key, conn := range c.sockets {
60
 		delete(c.sockets, key)
67
 		delete(c.sockets, key)
61
 		if !conn.closed() {
68
 		if !conn.closed() {
69
+			logger.Debugw("Choose connection",
70
+				"id", conn.id,
71
+				"remote_addr", conn.conn.RemoteAddr())
62
 			req.response <- conn
72
 			req.response <- conn
63
 			close(req.response)
73
 			close(req.response)
64
 			return
74
 			return
66
 	}
76
 	}
67
 
77
 
68
 	if conn, err := newConnection(req.request, c); err == nil {
78
 	if conn, err := newConnection(req.request, c); err == nil {
79
+		logger.Debugw("New connection",
80
+			"id", conn.id,
81
+			"remote_addr", conn.conn.RemoteAddr())
69
 		req.response <- conn
82
 		req.response <- conn
70
 	}
83
 	}
71
 	close(req.response)
84
 	close(req.response)
72
 }
85
 }
73
 
86
 
74
 func (c *connectionHub) runBrokenSocket(id int) {
87
 func (c *connectionHub) runBrokenSocket(id int) {
88
+	c.logger.Named("broken-socket").Debugw("Delete broken socket", "id", id)
75
 	delete(c.sockets, id)
89
 	delete(c.sockets, id)
76
 }
90
 }
77
 
91
 
78
 func (c *connectionHub) runReturnConnection(conn *connection) {
92
 func (c *connectionHub) runReturnConnection(conn *connection) {
93
+	c.logger.Named("return-connection").Debugw("Return connection",
94
+		"id", conn.id,
95
+		"remote_addr", conn.conn.RemoteAddr())
79
 	c.sockets[conn.id] = conn
96
 	c.sockets[conn.id] = conn
80
 }
97
 }
81
 
98
 

+ 3
- 0
hub/hub.go Просмотреть файл

32
 	}
32
 	}
33
 
33
 
34
 	if err := conn.write(packet); err != nil {
34
 	if err := conn.write(packet); err != nil {
35
+		conn.shutdown()
35
 		return fmt.Errorf("cannot send packet: %w", err)
36
 		return fmt.Errorf("cannot send packet: %w", err)
36
 	}
37
 	}
38
+	sub.channelReturnConnections <- conn
39
+
37
 	return nil
40
 	return nil
38
 }
41
 }
39
 
42
 

+ 0
- 7
hub/registry.go Просмотреть файл

42
 	}
42
 	}
43
 	return nil, false
43
 	return nil, false
44
 }
44
 }
45
-
46
-func InitRegistry(ctx context.Context) {
47
-	Registry = &registry{
48
-		ctx:   ctx,
49
-		conns: map[string]*ctxChannel{},
50
-	}
51
-}

+ 4
- 1
stats/stats_prometheus.go Просмотреть файл

77
 		labels[1] = "ipv6"
77
 		labels[1] = "ipv6"
78
 	}
78
 	}
79
 
79
 
80
-	s.connections.WithLabelValues(labels[:]...).Add(increment)
80
+	s.telegramConnections.WithLabelValues(labels[:]...).Add(increment)
81
 }
81
 }
82
 
82
 
83
 func (s *statsPrometheus) Crash() {
83
 func (s *statsPrometheus) Crash() {
122
 	if err := registry.Register(instance.connections); err != nil {
122
 	if err := registry.Register(instance.connections); err != nil {
123
 		return nil, fmt.Errorf("cannot register metrics for connections: %w", err)
123
 		return nil, fmt.Errorf("cannot register metrics for connections: %w", err)
124
 	}
124
 	}
125
+	if err := registry.Register(instance.telegramConnections); err != nil {
126
+		return nil, fmt.Errorf("cannot register metrics for telegram connections: %w", err)
127
+	}
125
 	if err := registry.Register(instance.traffic); err != nil {
128
 	if err := registry.Register(instance.traffic); err != nil {
126
 		return nil, fmt.Errorf("cannot register metrics for traffic: %w", err)
129
 		return nil, fmt.Errorf("cannot register metrics for traffic: %w", err)
127
 	}
130
 	}

+ 1
- 1
stats/stats_statsd.go Просмотреть файл

63
 
63
 
64
 func (s *statsStatsd) changeTelegramConnections(dc conntypes.DC, addr *net.TCPAddr, value int) {
64
 func (s *statsStatsd) changeTelegramConnections(dc conntypes.DC, addr *net.TCPAddr, value int) {
65
 	labels := [...]string{
65
 	labels := [...]string{
66
-		"telegram",
66
+		"telegram_connections",
67
 		strconv.Itoa(int(dc)),
67
 		strconv.Itoa(int(dc)),
68
 		"ipv4",
68
 		"ipv4",
69
 	}
69
 	}

Загрузка…
Отмена
Сохранить