Преглед изворни кода

Correct multiplexing

tags/1.0^2
9seconds пре 6 година
родитељ
комит
22905b2a25

+ 0
- 9
faketls/client_protocol.go Прегледај датотеку

11
 	"sync"
11
 	"sync"
12
 	"time"
12
 	"time"
13
 
13
 
14
-	"github.com/9seconds/mtg/antireplay"
15
 	"github.com/9seconds/mtg/config"
14
 	"github.com/9seconds/mtg/config"
16
 	"github.com/9seconds/mtg/conntypes"
15
 	"github.com/9seconds/mtg/conntypes"
17
 	"github.com/9seconds/mtg/obfuscated2"
16
 	"github.com/9seconds/mtg/obfuscated2"
18
 	"github.com/9seconds/mtg/protocol"
17
 	"github.com/9seconds/mtg/protocol"
19
-	"github.com/9seconds/mtg/stats"
20
 	"github.com/9seconds/mtg/tlstypes"
18
 	"github.com/9seconds/mtg/tlstypes"
21
 	"github.com/9seconds/mtg/wrappers/stream"
19
 	"github.com/9seconds/mtg/wrappers/stream"
22
 )
20
 )
84
 		return errBadTime
82
 		return errBadTime
85
 	}
83
 	}
86
 
84
 
87
-	if antireplay.Cache.HasTLS(clientHello.Random[:]) {
88
-		stats.Stats.AntiReplayDetected()
89
-		return errors.New("antireplay detected")
90
-	}
91
-
92
-	antireplay.Cache.AddTLS(clientHello.Random[:])
93
-
94
 	hostCert, err := connectionServerInstance.get()
85
 	hostCert, err := connectionServerInstance.get()
95
 	if err != nil {
86
 	if err != nil {
96
 		return fmt.Errorf("cannot get host certificate: %w", err)
87
 		return fmt.Errorf("cannot get host certificate: %w", err)

+ 111
- 68
hub/connection.go Прегледај датотеку

5
 	"math/rand"
5
 	"math/rand"
6
 	"sync"
6
 	"sync"
7
 
7
 
8
+	"go.uber.org/zap"
9
+
8
 	"github.com/9seconds/mtg/conntypes"
10
 	"github.com/9seconds/mtg/conntypes"
9
 	"github.com/9seconds/mtg/mtproto"
11
 	"github.com/9seconds/mtg/mtproto"
10
 	"github.com/9seconds/mtg/mtproto/rpc"
12
 	"github.com/9seconds/mtg/mtproto/rpc"
12
 )
14
 )
13
 
15
 
14
 type connection struct {
16
 type connection struct {
15
-	conn         conntypes.PacketReadWriteCloser
16
-	mutex        sync.RWMutex
17
-	shutdownOnce sync.Once
18
-	hub          *connectionHub
19
-	id           int
20
-	pending      uint
21
-	done         chan struct{}
17
+	conn            conntypes.PacketReadWriteCloser
18
+	proxyConns      map[string]*ProxyConn
19
+	closeOnce       sync.Once
20
+	proxyConnsMutex sync.RWMutex
21
+	id              int
22
+	logger          *zap.SugaredLogger
23
+
24
+	channelDone       chan struct{}
25
+	channelWrite      chan conntypes.Packet
26
+	channelRead       chan *rpc.ProxyResponse
27
+	channelConnAttach chan *ProxyConn
28
+	channelConnDetach chan conntypes.ConnID
22
 }
29
 }
23
 
30
 
24
-func (c *connection) read() (conntypes.Packet, error) {
25
-	packet, err := c.conn.Read()
31
+func (c *connection) run() {
32
+	defer c.Close()
26
 
33
 
27
-	c.mutex.Lock()
28
-	if err != nil {
29
-		c.pending--
30
-	} else {
31
-		c.pending = 0
32
-	}
33
-	c.mutex.Unlock()
34
+	for {
35
+		select {
36
+		case <-c.channelDone:
37
+			for _, v := range c.proxyConns {
38
+				v.Close()
39
+			}
34
 
40
 
35
-	return packet, err
41
+			return
42
+		case resp := <-c.channelRead:
43
+			if channel, ok := c.proxyConns[string(resp.ConnID[:])]; ok {
44
+				if resp.Type == rpc.ProxyResponseTypeCloseExt {
45
+					channel.Close()
46
+				} else {
47
+					channel.put(resp)
48
+				}
49
+			}
50
+		case packet := <-c.channelWrite:
51
+			if err := c.conn.Write(packet); err != nil {
52
+				c.logger.Debugw("Cannot write packet", "error", err)
53
+				c.Close()
54
+			}
55
+		case conn := <-c.channelConnAttach:
56
+			c.proxyConnsMutex.Lock()
57
+			c.proxyConns[string(conn.req.ConnID[:])] = conn
58
+			c.proxyConnsMutex.Unlock()
59
+			conn.channelWrite = c.channelWrite
60
+		case connID := <-c.channelConnDetach:
61
+			if conn, ok := c.proxyConns[string(connID[:])]; ok {
62
+				c.proxyConnsMutex.Lock()
63
+				delete(c.proxyConns, string(connID[:]))
64
+				c.proxyConnsMutex.Unlock()
65
+				conn.Close()
66
+			}
67
+		}
68
+	}
36
 }
69
 }
37
 
70
 
38
-func (c *connection) write(packet conntypes.Packet) error {
39
-	err := c.conn.Write(packet)
40
-	if err != nil {
41
-		// if we tried to write into a socket and it was broken, it is
42
-		// a time to reconsider the prescence of this socket at all.
43
-		//
44
-		// probably we need to remove it completely because it seems
45
-		// that connection is broken.
46
-		c.mutex.Lock()
47
-		c.pending = 0
48
-		c.mutex.Unlock()
49
-	}
71
+func (c *connection) readLoop() {
72
+	for {
73
+		packet, err := c.conn.Read()
74
+		if err != nil {
75
+			c.logger.Debugw("Cannot read packet", "error", err)
76
+			c.Close()
50
 
77
 
51
-	return err
78
+			return
79
+		}
80
+
81
+		response, err := rpc.ParseProxyResponse(packet)
82
+		if err != nil {
83
+			c.logger.Debugw("Failed response", "error", err)
84
+			continue
85
+		}
86
+
87
+		select {
88
+		case <-c.channelDone:
89
+			return
90
+		case c.channelRead <- response:
91
+		}
92
+	}
52
 }
93
 }
53
 
94
 
54
-func (c *connection) shutdown() {
55
-	c.shutdownOnce.Do(func() {
95
+func (c *connection) Close() {
96
+	c.closeOnce.Do(func() {
97
+		c.logger.Debugw("Closing connection")
98
+
99
+		close(c.channelDone)
56
 		c.conn.Close()
100
 		c.conn.Close()
57
-		close(c.done)
58
-		c.hub.channelBrokenSockets <- c.id
59
 	})
101
 	})
60
 }
102
 }
61
 
103
 
62
-func (c *connection) closed() bool {
104
+func (c *connection) Done() bool {
63
 	select {
105
 	select {
64
-	case <-c.done:
106
+	case <-c.channelDone:
65
 		return true
107
 		return true
66
 	default:
108
 	default:
67
-		return false
109
+		return c.Len() == 0
68
 	}
110
 	}
69
 }
111
 }
70
 
112
 
71
-func (c *connection) idle() bool {
72
-	c.mutex.RLock()
73
-	defer c.mutex.RUnlock()
113
+func (c *connection) Len() int {
114
+	c.proxyConnsMutex.RLock()
115
+	defer c.proxyConnsMutex.RUnlock()
74
 
116
 
75
-	return c.pending == 0
117
+	return len(c.proxyConns)
76
 }
118
 }
77
 
119
 
78
-func (c *connection) run() {
79
-	logger := c.hub.logger.Named("connection").With("id", c.id)
80
-
81
-	for {
82
-		packet, err := c.read()
83
-		if err != nil {
84
-			c.shutdown()
85
-			return
86
-		}
87
-
88
-		response, err := rpc.ParseProxyResponse(packet)
89
-		if err != nil {
90
-			logger.Debugw("Failed response", "error", err)
91
-			continue
92
-		}
93
-
94
-		if response.Type == rpc.ProxyResponseTypeCloseExt {
95
-			logger.Debugw("Proxy has closed connection")
96
-			return
97
-		}
120
+func (c *connection) Attach(conn *ProxyConn) error {
121
+	select {
122
+	case <-c.channelDone:
123
+		return ErrClosed
124
+	case c.channelConnAttach <- conn:
125
+		return nil
126
+	}
127
+}
98
 
128
 
99
-		if channel, ok := Registry.getChannel(response.ConnID); ok {
100
-			go channel.sendBack(response) // nolint: errcheck
101
-		}
129
+func (c *connection) Detach(connID conntypes.ConnID) {
130
+	select {
131
+	case <-c.channelDone:
132
+	case c.channelConnDetach <- connID:
102
 	}
133
 	}
103
 }
134
 }
104
 
135
 
105
-func newConnection(req *protocol.TelegramRequest, hub *connectionHub) (*connection, error) {
136
+func newConnection(req *protocol.TelegramRequest) (*connection, error) {
106
 	conn, err := mtproto.TelegramProtocol(req)
137
 	conn, err := mtproto.TelegramProtocol(req)
107
 	if err != nil {
138
 	if err != nil {
108
 		return nil, fmt.Errorf("cannot create a new connection: %w", err)
139
 		return nil, fmt.Errorf("cannot create a new connection: %w", err)
109
 	}
140
 	}
110
 
141
 
142
+	id := rand.Int() // nolint: gosec
111
 	rv := &connection{
143
 	rv := &connection{
112
 		conn: conn,
144
 		conn: conn,
113
-		hub:  hub,
114
-		id:   rand.Int(), // nolint: gosec
115
-		done: make(chan struct{}),
145
+		id:   id,
146
+		logger: zap.S().Named("hub-connection").With("id", id,
147
+			"dc", req.ClientProtocol.DC(),
148
+			"protocol", req.ClientProtocol.ConnectionProtocol()),
149
+		proxyConns: make(map[string]*ProxyConn),
150
+
151
+		channelRead:       make(chan *rpc.ProxyResponse, 1),
152
+		channelDone:       make(chan struct{}),
153
+		channelWrite:      make(chan conntypes.Packet),
154
+		channelConnAttach: make(chan *ProxyConn),
155
+		channelConnDetach: make(chan conntypes.ConnID),
116
 	}
156
 	}
157
+
158
+	go rv.readLoop()
159
+
117
 	go rv.run()
160
 	go rv.run()
118
 
161
 
119
 	return rv, nil
162
 	return rv, nil

+ 0
- 114
hub/connection_hub.go Прегледај датотеку

1
-package hub
2
-
3
-import (
4
-	"time"
5
-
6
-	"go.uber.org/zap"
7
-
8
-	"github.com/9seconds/mtg/protocol"
9
-)
10
-
11
-const hubGCEvery = time.Minute
12
-
13
-type connectionHubRequest struct {
14
-	request  *protocol.TelegramRequest
15
-	response chan<- *connection
16
-}
17
-
18
-type connectionHub struct {
19
-	sockets map[int]*connection
20
-	logger  *zap.SugaredLogger
21
-
22
-	channelBrokenSockets      chan int
23
-	channelConnectionRequests chan *connectionHubRequest
24
-	channelReturnConnections  chan *connection
25
-}
26
-
27
-func (c *connectionHub) run() {
28
-	ticker := time.NewTicker(hubGCEvery)
29
-	defer ticker.Stop()
30
-
31
-	for {
32
-		select {
33
-		case <-ticker.C:
34
-			c.runGC()
35
-		case request := <-c.channelConnectionRequests:
36
-			c.runConnectionRequest(request)
37
-		case id := <-c.channelBrokenSockets:
38
-			c.runBrokenSocket(id)
39
-		case conn := <-c.channelReturnConnections:
40
-			c.runReturnConnection(conn)
41
-		}
42
-	}
43
-}
44
-
45
-func (c *connectionHub) runGC() {
46
-	logger := c.logger.Named("gc")
47
-
48
-	for key, conn := range c.sockets {
49
-		switch {
50
-		case conn.closed():
51
-			logger.Debugw("Delete closed socket", "key", key)
52
-			delete(c.sockets, key)
53
-		case conn.idle():
54
-			logger.Debugw("Delete idle socket", "key", key)
55
-			conn.shutdown()
56
-			delete(c.sockets, key)
57
-
58
-			return
59
-		}
60
-	}
61
-}
62
-
63
-func (c *connectionHub) runConnectionRequest(req *connectionHubRequest) {
64
-	logger := c.logger.Named("request").With("connection-id", req.request.ConnID)
65
-
66
-	for key, conn := range c.sockets {
67
-		delete(c.sockets, key)
68
-
69
-		if !conn.closed() {
70
-			logger.Debugw("Choose connection",
71
-				"id", conn.id,
72
-				"remote_addr", conn.conn.RemoteAddr())
73
-			req.response <- conn
74
-			close(req.response)
75
-
76
-			return
77
-		}
78
-	}
79
-
80
-	if conn, err := newConnection(req.request, c); err == nil {
81
-		logger.Debugw("New connection",
82
-			"id", conn.id,
83
-			"remote_addr", conn.conn.RemoteAddr())
84
-		req.response <- conn
85
-	}
86
-
87
-	close(req.response)
88
-}
89
-
90
-func (c *connectionHub) runBrokenSocket(id int) {
91
-	c.logger.Named("broken-socket").Debugw("Delete broken socket", "id", id)
92
-	delete(c.sockets, id)
93
-}
94
-
95
-func (c *connectionHub) runReturnConnection(conn *connection) {
96
-	c.logger.Named("return-connection").Debugw("Return connection",
97
-		"id", conn.id,
98
-		"remote_addr", conn.conn.RemoteAddr())
99
-
100
-	c.sockets[conn.id] = conn
101
-}
102
-
103
-func newConnectionHub(logger *zap.SugaredLogger) *connectionHub {
104
-	rv := &connectionHub{
105
-		logger:                    logger.Named("connection-hub"),
106
-		sockets:                   map[int]*connection{},
107
-		channelBrokenSockets:      make(chan int, 1),
108
-		channelConnectionRequests: make(chan *connectionHubRequest),
109
-		channelReturnConnections:  make(chan *connection, 1),
110
-	}
111
-	go rv.run()
112
-
113
-	return rv
114
-}

+ 70
- 0
hub/connection_list.go Прегледај датотеку

1
+package hub
2
+
3
+import (
4
+	"fmt"
5
+	"sort"
6
+)
7
+
8
+const connectionListMaxClientsPerConnection = 2
9
+
10
+type connectionList struct {
11
+	connections []*connection
12
+}
13
+
14
+func (c *connectionList) Get(conn *ProxyConn) (*connection, error) {
15
+	if len(c.connections) > 0 {
16
+		c.gc()
17
+	}
18
+
19
+	if len(c.connections) > 0 && c.connections[0].Len() < connectionListMaxClientsPerConnection {
20
+		if err := c.connections[0].Attach(conn); err == nil {
21
+			return c.connections[0], nil
22
+		}
23
+	}
24
+
25
+	newConn, err := newConnection(conn.req)
26
+	if err != nil {
27
+		return nil, fmt.Errorf("cannot allocate a new connection: %w", err)
28
+	}
29
+
30
+	if err = newConn.Attach(conn); err != nil {
31
+		newConn.Close()
32
+		return nil, fmt.Errorf("cannot attach to the newly created connection: %w", err)
33
+	}
34
+
35
+	c.connections = append(c.connections, newConn)
36
+	lastIndex := len(c.connections) - 1
37
+	c.connections[0], c.connections[lastIndex] = c.connections[lastIndex], c.connections[0]
38
+
39
+	return newConn, nil
40
+}
41
+
42
+func (c *connectionList) gc() {
43
+	prevLen := len(c.connections)
44
+
45
+	for i := len(c.connections) - 1; i >= 0; i-- {
46
+		lastIndex := len(c.connections) - 1
47
+
48
+		if c.connections[i].Done() {
49
+			c.connections[i].Close()
50
+
51
+			if len(c.connections)-1 == i {
52
+				c.connections = c.connections[:lastIndex]
53
+			} else {
54
+				c.connections[i], c.connections[lastIndex] = c.connections[lastIndex], c.connections[i]
55
+			}
56
+		}
57
+	}
58
+
59
+	if prevLen != len(c.connections) {
60
+		c.sort()
61
+	}
62
+}
63
+
64
+func (c *connectionList) sort() {
65
+	if len(c.connections) > 1 {
66
+		sort.Slice(c.connections, func(i, j int) bool {
67
+			return c.connections[i].Len() < c.connections[j].Len()
68
+		})
69
+	}
70
+}

+ 0
- 61
hub/ctx_channel.go Прегледај датотеку

1
-package hub
2
-
3
-import (
4
-	"context"
5
-	"time"
6
-
7
-	"github.com/9seconds/mtg/mtproto/rpc"
8
-)
9
-
10
-const closeableChannelReadTimeout = 2 * time.Minute
11
-
12
-type ChannelReadCloser interface {
13
-	Read() (*rpc.ProxyResponse, error)
14
-	Close() error
15
-}
16
-
17
-type ctxChannel struct {
18
-	channel chan *rpc.ProxyResponse
19
-	ctx     context.Context
20
-	cancel  context.CancelFunc
21
-}
22
-
23
-func (c *ctxChannel) Read() (*rpc.ProxyResponse, error) {
24
-	timer := time.NewTimer(closeableChannelReadTimeout)
25
-	defer timer.Stop()
26
-
27
-	select {
28
-	case <-timer.C:
29
-		return nil, ErrTimeout
30
-	case <-c.ctx.Done():
31
-		return nil, ErrClosed
32
-	case packet := <-c.channel:
33
-		return packet, nil
34
-	}
35
-}
36
-
37
-func (c *ctxChannel) sendBack(response *rpc.ProxyResponse) error {
38
-	select {
39
-	case <-c.ctx.Done():
40
-		return ErrClosed
41
-	case c.channel <- response:
42
-		return nil
43
-	}
44
-}
45
-
46
-func (c *ctxChannel) Close() error {
47
-	c.cancel()
48
-	c.channel = nil
49
-
50
-	return nil
51
-}
52
-
53
-func newCtxChannel(ctx context.Context) *ctxChannel {
54
-	ctx, cancel := context.WithCancel(ctx)
55
-
56
-	return &ctxChannel{
57
-		channel: make(chan *rpc.ProxyResponse),
58
-		ctx:     ctx,
59
-		cancel:  cancel,
60
-	}
61
-}

+ 15
- 48
hub/hub.go Прегледај датотеку

1
 package hub
1
 package hub
2
 
2
 
3
 import (
3
 import (
4
-	"encoding/binary"
5
-	"fmt"
6
-	"strings"
4
+	"context"
7
 	"sync"
5
 	"sync"
8
 
6
 
9
-	"go.uber.org/zap"
10
-
11
-	"github.com/9seconds/mtg/conntypes"
12
 	"github.com/9seconds/mtg/protocol"
7
 	"github.com/9seconds/mtg/protocol"
13
 )
8
 )
14
 
9
 
15
 type hub struct {
10
 type hub struct {
16
-	logger *zap.SugaredLogger
17
-	subs   map[string]*connectionHub
18
-	mutex  sync.RWMutex
11
+	muxes map[int32]*mux
12
+	mutex sync.RWMutex
13
+	ctx   context.Context
19
 }
14
 }
20
 
15
 
21
-func (h *hub) Write(packet conntypes.Packet, req *protocol.TelegramRequest) error {
22
-	sub := h.getHub(req)
23
-	connections := make(chan *connection)
24
-	sub.channelConnectionRequests <- &connectionHubRequest{
25
-		request:  req,
26
-		response: connections,
27
-	}
28
-
29
-	conn, ok := <-connections
30
-	if !ok {
31
-		return ErrCannotCreateConnection
32
-	}
33
-
34
-	if err := conn.write(packet); err != nil {
35
-		conn.shutdown()
36
-		return fmt.Errorf("cannot send packet: %w", err)
37
-	}
38
-	sub.channelReturnConnections <- conn
39
-
40
-	return nil
16
+func (h *hub) Register(req *protocol.TelegramRequest) (*ProxyConn, error) {
17
+	return h.getMux(req).Get(req)
41
 }
18
 }
42
 
19
 
43
-func (h *hub) getHub(req *protocol.TelegramRequest) *connectionHub {
44
-	keyBuilder := strings.Builder{}
45
-	binary.Write(&keyBuilder, binary.LittleEndian, int16(req.ClientProtocol.DC())) // nolint: errcheck
46
-	keyBuilder.WriteRune('_')
47
-	binary.Write(&keyBuilder, binary.LittleEndian, uint8(req.ClientProtocol.ConnectionProtocol())) // nolint: errcheck
48
-	key := keyBuilder.String()
20
+func (h *hub) getMux(req *protocol.TelegramRequest) *mux {
21
+	var key int32 = 32767 + int32(req.ClientProtocol.DC()) + 100000*int32(req.ClientProtocol.ConnectionProtocol())
49
 
22
 
50
 	h.mutex.RLock()
23
 	h.mutex.RLock()
51
-	rv, ok := h.subs[key]
24
+	m, ok := h.muxes[key]
52
 	h.mutex.RUnlock()
25
 	h.mutex.RUnlock()
53
 
26
 
54
 	if !ok {
27
 	if !ok {
55
 		h.mutex.Lock()
28
 		h.mutex.Lock()
56
-		defer h.mutex.Unlock()
29
+		m, ok = h.muxes[key]
57
 
30
 
58
-		rv, ok = h.subs[key]
59
 		if !ok {
31
 		if !ok {
60
-			h.logger.Debugw("Create new connection hub",
61
-				"dc", req.ClientProtocol.DC(),
62
-				"protocol", req.ClientProtocol.ConnectionProtocol())
63
-
64
-			rv = newConnectionHub(h.logger.With(
65
-				"dc", req.ClientProtocol.DC(),
66
-				"protocol", req.ClientProtocol.ConnectionProtocol(),
67
-			))
68
-			h.subs[key] = rv
32
+			m = newMux(h.ctx)
33
+			h.muxes[key] = m
69
 		}
34
 		}
35
+
36
+		h.mutex.Unlock()
70
 	}
37
 	}
71
 
38
 
72
-	return rv
39
+	return m
73
 }
40
 }

+ 5
- 14
hub/init.go Прегледај датотеку

4
 	"context"
4
 	"context"
5
 	"errors"
5
 	"errors"
6
 	"sync"
6
 	"sync"
7
-
8
-	"go.uber.org/zap"
9
 )
7
 )
10
 
8
 
11
 var (
9
 var (
12
-	Registry *registry
13
-	Hub      *hub
14
-
15
-	ErrTimeout                = errors.New("timeout")
16
-	ErrClosed                 = errors.New("channel was closed")
17
-	ErrCannotCreateConnection = errors.New("cannot create connection")
10
+	ErrTimeout = errors.New("timeout")
11
+	ErrClosed  = errors.New("context is closed")
18
 
12
 
13
+	Hub      Interface
19
 	initOnce sync.Once
14
 	initOnce sync.Once
20
 )
15
 )
21
 
16
 
22
 func Init(ctx context.Context) {
17
 func Init(ctx context.Context) {
23
 	initOnce.Do(func() {
18
 	initOnce.Do(func() {
24
-		Registry = &registry{
25
-			conns: map[string]*ctxChannel{},
26
-			ctx:   ctx,
27
-		}
28
 		Hub = &hub{
19
 		Hub = &hub{
29
-			subs:   map[string]*connectionHub{},
30
-			logger: zap.S().Named("hub"),
20
+			muxes: make(map[int32]*mux),
21
+			ctx:   ctx,
31
 		}
22
 		}
32
 	})
23
 	})
33
 }
24
 }

+ 7
- 0
hub/interface.go Прегледај датотеку

1
+package hub
2
+
3
+import "github.com/9seconds/mtg/protocol"
4
+
5
+type Interface interface {
6
+	Register(*protocol.TelegramRequest) (*ProxyConn, error)
7
+}

+ 80
- 0
hub/mux.go Прегледај датотеку

1
+package hub
2
+
3
+import (
4
+	"context"
5
+
6
+	"github.com/9seconds/mtg/conntypes"
7
+	"github.com/9seconds/mtg/protocol"
8
+)
9
+
10
+type muxNewRequest struct {
11
+	req  *protocol.TelegramRequest
12
+	resp chan<- muxNewResponse
13
+}
14
+
15
+type muxNewResponse struct {
16
+	conn *ProxyConn
17
+	err  error
18
+}
19
+
20
+type mux struct {
21
+	connections   connectionList
22
+	clients       map[string]*connection
23
+	ctx           context.Context
24
+	channelClosed chan conntypes.ConnID
25
+	channelNew    chan muxNewRequest
26
+}
27
+
28
+func (m *mux) run() {
29
+	for {
30
+		select {
31
+		case <-m.ctx.Done():
32
+			for _, v := range m.clients {
33
+				v.Close()
34
+			}
35
+
36
+			return
37
+		case req := <-m.channelNew:
38
+			proxyConn := newProxyConn(req.req, m.channelClosed)
39
+			conn, err := m.connections.Get(proxyConn)
40
+
41
+			if err == nil {
42
+				m.clients[string(req.req.ConnID[:])] = conn
43
+			}
44
+
45
+			req.resp <- muxNewResponse{
46
+				conn: proxyConn,
47
+				err:  err,
48
+			}
49
+			close(req.resp)
50
+		case connID := <-m.channelClosed:
51
+			if conn, ok := m.clients[string(connID[:])]; ok {
52
+				conn.Detach(connID)
53
+				delete(m.clients, string(connID[:]))
54
+			}
55
+		}
56
+	}
57
+}
58
+
59
+func (m *mux) Get(req *protocol.TelegramRequest) (*ProxyConn, error) {
60
+	resp := make(chan muxNewResponse)
61
+	m.channelNew <- muxNewRequest{
62
+		req:  req,
63
+		resp: resp,
64
+	}
65
+	rv := <-resp
66
+
67
+	return rv.conn, rv.err
68
+}
69
+
70
+func newMux(ctx context.Context) *mux {
71
+	m := &mux{
72
+		ctx:           ctx,
73
+		clients:       make(map[string]*connection),
74
+		channelClosed: make(chan conntypes.ConnID, 1),
75
+		channelNew:    make(chan muxNewRequest),
76
+	}
77
+	go m.run()
78
+
79
+	return m
80
+}

+ 77
- 0
hub/proxy_conn.go Прегледај датотеку

1
+package hub
2
+
3
+import (
4
+	"sync"
5
+	"time"
6
+
7
+	"github.com/9seconds/mtg/conntypes"
8
+	"github.com/9seconds/mtg/mtproto/rpc"
9
+	"github.com/9seconds/mtg/protocol"
10
+)
11
+
12
+const (
13
+	proxyConnWriteTimeout = 2 * time.Minute
14
+	proxyConnReadTimeout  = 2 * time.Minute
15
+)
16
+
17
+type ProxyConn struct {
18
+	closeOnce       sync.Once
19
+	req             *protocol.TelegramRequest
20
+	channelResponse chan *rpc.ProxyResponse
21
+	channelClosed   chan<- conntypes.ConnID
22
+	channelWrite    chan<- conntypes.Packet
23
+	channelDone     chan struct{}
24
+}
25
+
26
+func (p *ProxyConn) Read() (*rpc.ProxyResponse, error) {
27
+	timer := time.NewTimer(proxyConnReadTimeout)
28
+	defer timer.Stop()
29
+
30
+	select {
31
+	case <-timer.C:
32
+		return nil, ErrTimeout
33
+	case <-p.channelDone:
34
+		return nil, ErrClosed
35
+	case packet := <-p.channelResponse:
36
+		return packet, nil
37
+	}
38
+}
39
+
40
+func (p *ProxyConn) Write(packet conntypes.Packet) error {
41
+	timer := time.NewTimer(proxyConnWriteTimeout)
42
+	defer timer.Stop()
43
+
44
+	select {
45
+	case <-timer.C:
46
+		return ErrTimeout
47
+	case <-p.channelDone:
48
+		return ErrClosed
49
+	case p.channelWrite <- packet:
50
+		return nil
51
+	}
52
+}
53
+
54
+func (p *ProxyConn) put(response *rpc.ProxyResponse) {
55
+	select {
56
+	case <-p.channelDone:
57
+	case p.channelResponse <- response:
58
+	}
59
+}
60
+
61
+func (p *ProxyConn) Close() {
62
+	p.closeOnce.Do(func() {
63
+		close(p.channelDone)
64
+		go func() {
65
+			p.channelClosed <- p.req.ConnID
66
+		}()
67
+	})
68
+}
69
+
70
+func newProxyConn(req *protocol.TelegramRequest, channelClosed chan<- conntypes.ConnID) *ProxyConn {
71
+	return &ProxyConn{
72
+		channelResponse: make(chan *rpc.ProxyResponse),
73
+		channelDone:     make(chan struct{}),
74
+		channelClosed:   channelClosed,
75
+		req:             req,
76
+	}
77
+}

+ 0
- 45
hub/registry.go Прегледај датотеку

1
-package hub
2
-
3
-import (
4
-	"context"
5
-	"sync"
6
-
7
-	"github.com/9seconds/mtg/conntypes"
8
-)
9
-
10
-type registry struct {
11
-	conns map[string]*ctxChannel
12
-	ctx   context.Context
13
-	mutex sync.RWMutex
14
-}
15
-
16
-func (r *registry) Register(id conntypes.ConnID) ChannelReadCloser {
17
-	channel := newCtxChannel(r.ctx)
18
-
19
-	r.mutex.Lock()
20
-	r.conns[string(id[:])] = channel
21
-	r.mutex.Unlock()
22
-
23
-	return channel
24
-}
25
-
26
-func (r *registry) Unregister(id conntypes.ConnID) {
27
-	r.mutex.Lock()
28
-	defer r.mutex.Unlock()
29
-
30
-	if channel, ok := r.conns[string(id[:])]; ok {
31
-		channel.Close()
32
-		delete(r.conns, string(id[:]))
33
-	}
34
-}
35
-
36
-func (r *registry) getChannel(id conntypes.ConnID) (*ctxChannel, bool) {
37
-	r.mutex.RLock()
38
-	defer r.mutex.RUnlock()
39
-
40
-	if value, ok := r.conns[string(id[:])]; ok {
41
-		return value, true
42
-	}
43
-
44
-	return nil, false
45
-}

+ 38
- 38
main.go Прегледај датотеку

29
 		Required().
29
 		Required().
30
 		Enum("simple", "secured", "tls")
30
 		Enum("simple", "secured", "tls")
31
 
31
 
32
-	proxyCommand = app.Command("proxy",
32
+	runCommand = app.Command("run",
33
 		"Run new proxy instance")
33
 		"Run new proxy instance")
34
-	proxyDebug = proxyCommand.Flag("debug",
34
+	runDebug = runCommand.Flag("debug",
35
 		"Run in debug mode.").
35
 		"Run in debug mode.").
36
 		Short('d').
36
 		Short('d').
37
 		Envar("MTG_DEBUG").
37
 		Envar("MTG_DEBUG").
38
 		Bool()
38
 		Bool()
39
-	proxyVerbose = proxyCommand.Flag("verbose",
39
+	runVerbose = runCommand.Flag("verbose",
40
 		"Run in verbose mode.").
40
 		"Run in verbose mode.").
41
 		Short('v').
41
 		Short('v').
42
 		Envar("MTG_VERBOSE").
42
 		Envar("MTG_VERBOSE").
43
 		Bool()
43
 		Bool()
44
-	proxyBind = proxyCommand.Flag("bind",
44
+	runBind = runCommand.Flag("bind",
45
 		"Host:Port to bind proxy to.").
45
 		"Host:Port to bind proxy to.").
46
 		Short('b').
46
 		Short('b').
47
 		Envar("MTG_BIND").
47
 		Envar("MTG_BIND").
48
 		Default("0.0.0.0:3128").
48
 		Default("0.0.0.0:3128").
49
 		TCP()
49
 		TCP()
50
-	proxyPublicIPv4 = proxyCommand.Flag("public-ipv4",
50
+	runPublicIPv4 = runCommand.Flag("public-ipv4",
51
 		"Which IPv4 host:port to use.").
51
 		"Which IPv4 host:port to use.").
52
 		Short('4').
52
 		Short('4').
53
 		Envar("MTG_IPV4").
53
 		Envar("MTG_IPV4").
54
 		TCP()
54
 		TCP()
55
-	proxyPublicIPv6 = proxyCommand.Flag("public-ipv6",
55
+	runPublicIPv6 = runCommand.Flag("public-ipv6",
56
 		"Which IPv6 host:port to use.").
56
 		"Which IPv6 host:port to use.").
57
 		Short('6').
57
 		Short('6').
58
 		Envar("MTG_IPV6").
58
 		Envar("MTG_IPV6").
59
 		TCP()
59
 		TCP()
60
-	proxyStatsBind = proxyCommand.Flag("stats-bind",
60
+	runStatsBind = runCommand.Flag("stats-bind",
61
 		"Which Host:Port to bind stats server to.").
61
 		"Which Host:Port to bind stats server to.").
62
 		Short('t').
62
 		Short('t').
63
 		Envar("MTG_STATS_BIND").
63
 		Envar("MTG_STATS_BIND").
64
 		Default("127.0.0.1:3129").
64
 		Default("127.0.0.1:3129").
65
 		TCP()
65
 		TCP()
66
-	proxyStatsNamespace = proxyCommand.Flag("stats-namespace",
66
+	runStatsNamespace = runCommand.Flag("stats-namespace",
67
 		"Which namespace to use for Prometheus.").
67
 		"Which namespace to use for Prometheus.").
68
 		Envar("MTG_STATS_NAMESPACE").
68
 		Envar("MTG_STATS_NAMESPACE").
69
 		Default("mtg").
69
 		Default("mtg").
70
 		String()
70
 		String()
71
-	proxyStatsdAddress = proxyCommand.Flag("statsd-addr",
71
+	runStatsdAddress = runCommand.Flag("statsd-addr",
72
 		"Host:port of statsd server").
72
 		"Host:port of statsd server").
73
 		Envar("MTG_STATSD_ADDR").
73
 		Envar("MTG_STATSD_ADDR").
74
 		TCP()
74
 		TCP()
75
-	proxyStatsdNetwork = proxyCommand.Flag("statsd-network",
75
+	runStatsdNetwork = runCommand.Flag("statsd-network",
76
 		"Which network is used to work with statsd. Only 'tcp' and 'udp' are supported.").
76
 		"Which network is used to work with statsd. Only 'tcp' and 'udp' are supported.").
77
 		Envar("MTG_STATSD_NETWORK").
77
 		Envar("MTG_STATSD_NETWORK").
78
 		Default("udp").
78
 		Default("udp").
79
 		Enum("udp", "tcp")
79
 		Enum("udp", "tcp")
80
-	proxyStatsdTagsFormat = proxyCommand.Flag("statsd-tags-format",
80
+	runStatsdTagsFormat = runCommand.Flag("statsd-tags-format",
81
 		"Which tag format should we use to send stats metrics. Valid options are 'datadog' and 'influxdb'.").
81
 		"Which tag format should we use to send stats metrics. Valid options are 'datadog' and 'influxdb'.").
82
 		Envar("MTG_STATSD_TAGS_FORMAT").
82
 		Envar("MTG_STATSD_TAGS_FORMAT").
83
 		Default("influxdb").
83
 		Default("influxdb").
84
 		Enum("datadog", "influxdb")
84
 		Enum("datadog", "influxdb")
85
-	proxyStatsdTags = proxyCommand.Flag("statsd-tags",
85
+	runStatsdTags = runCommand.Flag("statsd-tags",
86
 		"Tags to use for working with statsd (specified as 'key=value').").
86
 		"Tags to use for working with statsd (specified as 'key=value').").
87
 		Envar("MTG_STATSD_TAGS").
87
 		Envar("MTG_STATSD_TAGS").
88
 		StringMap()
88
 		StringMap()
89
-	proxyWriteBufferSize = proxyCommand.Flag("write-buffer",
89
+	runWriteBufferSize = runCommand.Flag("write-buffer",
90
 		"Write buffer size in bytes. You can think about it as a buffer from client to Telegram.").
90
 		"Write buffer size in bytes. You can think about it as a buffer from client to Telegram.").
91
 		Short('w').
91
 		Short('w').
92
 		Envar("MTG_BUFFER_WRITE").
92
 		Envar("MTG_BUFFER_WRITE").
93
 		Default("65536KB").
93
 		Default("65536KB").
94
 		Bytes()
94
 		Bytes()
95
-	proxyReadBufferSize = proxyCommand.Flag("read-buffer",
95
+	runReadBufferSize = runCommand.Flag("read-buffer",
96
 		"Read buffer size in bytes. You can think about it as a buffer from Telegram to client.").
96
 		"Read buffer size in bytes. You can think about it as a buffer from Telegram to client.").
97
 		Short('r').
97
 		Short('r').
98
 		Envar("MTG_BUFFER_READ").
98
 		Envar("MTG_BUFFER_READ").
99
 		Default("131072KB").
99
 		Default("131072KB").
100
 		Bytes()
100
 		Bytes()
101
-	proxyTLSCloakPort = proxyCommand.Flag("cloak-port",
101
+	runTLSCloakPort = runCommand.Flag("cloak-port",
102
 		"Port which should be used for host cloaking.").
102
 		"Port which should be used for host cloaking.").
103
 		Envar("MTG_CLOAK_PORT").
103
 		Envar("MTG_CLOAK_PORT").
104
 		Default("443").
104
 		Default("443").
105
 		Uint16()
105
 		Uint16()
106
-	proxyAntiReplayMaxSize = proxyCommand.Flag("anti-replay-max-size",
106
+	runAntiReplayMaxSize = runCommand.Flag("anti-replay-max-size",
107
 		"Max size of antireplay cache in megabytes.").
107
 		"Max size of antireplay cache in megabytes.").
108
 		Envar("MTG_ANTIREPLAY_MAXSIZE").
108
 		Envar("MTG_ANTIREPLAY_MAXSIZE").
109
 		Default("128").
109
 		Default("128").
110
 		Int()
110
 		Int()
111
-	proxyAntiReplayEvictionTime = proxyCommand.Flag("anti-replay-eviction-time",
111
+	runAntiReplayEvictionTime = runCommand.Flag("anti-replay-eviction-time",
112
 		"Eviction time period for obfuscated2 handshakes").
112
 		"Eviction time period for obfuscated2 handshakes").
113
 		Envar("MTG_ANTIREPLAY_EVICTIONTIME").
113
 		Envar("MTG_ANTIREPLAY_EVICTIONTIME").
114
 		Default("168h").
114
 		Default("168h").
115
 		Duration()
115
 		Duration()
116
-	proxySecret = proxyCommand.Arg("secret", "Secret of this proxy.").Required().HexBytes()
117
-	proxyAdtag  = proxyCommand.Arg("adtag", "ADTag of the proxy.").HexBytes()
116
+	runSecret = runCommand.Arg("secret", "Secret of this proxy.").Required().HexBytes()
117
+	runAdtag  = runCommand.Arg("adtag", "ADTag of the proxy.").HexBytes()
118
 )
118
 )
119
 
119
 
120
 func main() {
120
 func main() {
129
 	switch kingpin.MustParse(app.Parse(os.Args[1:])) {
129
 	switch kingpin.MustParse(app.Parse(os.Args[1:])) {
130
 	case generateSecretCommand.FullCommand():
130
 	case generateSecretCommand.FullCommand():
131
 		cli.Generate(*generateSecretType, *generateCloakHost)
131
 		cli.Generate(*generateSecretType, *generateCloakHost)
132
-	case proxyCommand.FullCommand():
132
+	case runCommand.FullCommand():
133
 		err := config.Init(
133
 		err := config.Init(
134
-			config.Opt{Option: config.OptionTypeDebug, Value: *proxyDebug},
135
-			config.Opt{Option: config.OptionTypeVerbose, Value: *proxyVerbose},
136
-			config.Opt{Option: config.OptionTypeBind, Value: *proxyBind},
137
-			config.Opt{Option: config.OptionTypePublicIPv4, Value: *proxyPublicIPv4},
138
-			config.Opt{Option: config.OptionTypePublicIPv6, Value: *proxyPublicIPv6},
139
-			config.Opt{Option: config.OptionTypeStatsBind, Value: *proxyStatsBind},
140
-			config.Opt{Option: config.OptionTypeStatsNamespace, Value: *proxyStatsNamespace},
141
-			config.Opt{Option: config.OptionTypeStatsdAddress, Value: *proxyStatsdAddress},
142
-			config.Opt{Option: config.OptionTypeStatsdNetwork, Value: *proxyStatsdNetwork},
143
-			config.Opt{Option: config.OptionTypeStatsdTagsFormat, Value: *proxyStatsdTagsFormat},
144
-			config.Opt{Option: config.OptionTypeStatsdTags, Value: *proxyStatsdTags},
145
-			config.Opt{Option: config.OptionTypeWriteBufferSize, Value: *proxyWriteBufferSize},
146
-			config.Opt{Option: config.OptionTypeReadBufferSize, Value: *proxyReadBufferSize},
147
-			config.Opt{Option: config.OptionTypeCloakPort, Value: *proxyTLSCloakPort},
148
-			config.Opt{Option: config.OptionTypeAntiReplayMaxSize, Value: *proxyAntiReplayMaxSize},
149
-			config.Opt{Option: config.OptionTypeAntiReplayEvictionTime, Value: *proxyAntiReplayEvictionTime},
150
-			config.Opt{Option: config.OptionTypeSecret, Value: *proxySecret},
151
-			config.Opt{Option: config.OptionTypeAdtag, Value: *proxyAdtag},
134
+			config.Opt{Option: config.OptionTypeDebug, Value: *runDebug},
135
+			config.Opt{Option: config.OptionTypeVerbose, Value: *runVerbose},
136
+			config.Opt{Option: config.OptionTypeBind, Value: *runBind},
137
+			config.Opt{Option: config.OptionTypePublicIPv4, Value: *runPublicIPv4},
138
+			config.Opt{Option: config.OptionTypePublicIPv6, Value: *runPublicIPv6},
139
+			config.Opt{Option: config.OptionTypeStatsBind, Value: *runStatsBind},
140
+			config.Opt{Option: config.OptionTypeStatsNamespace, Value: *runStatsNamespace},
141
+			config.Opt{Option: config.OptionTypeStatsdAddress, Value: *runStatsdAddress},
142
+			config.Opt{Option: config.OptionTypeStatsdNetwork, Value: *runStatsdNetwork},
143
+			config.Opt{Option: config.OptionTypeStatsdTagsFormat, Value: *runStatsdTagsFormat},
144
+			config.Opt{Option: config.OptionTypeStatsdTags, Value: *runStatsdTags},
145
+			config.Opt{Option: config.OptionTypeWriteBufferSize, Value: *runWriteBufferSize},
146
+			config.Opt{Option: config.OptionTypeReadBufferSize, Value: *runReadBufferSize},
147
+			config.Opt{Option: config.OptionTypeCloakPort, Value: *runTLSCloakPort},
148
+			config.Opt{Option: config.OptionTypeAntiReplayMaxSize, Value: *runAntiReplayMaxSize},
149
+			config.Opt{Option: config.OptionTypeAntiReplayEvictionTime, Value: *runAntiReplayEvictionTime},
150
+			config.Opt{Option: config.OptionTypeSecret, Value: *runSecret},
151
+			config.Opt{Option: config.OptionTypeAdtag, Value: *runAdtag},
152
 		)
152
 		)
153
 		if err != nil {
153
 		if err != nil {
154
 			cli.Fatal(err)
154
 			cli.Fatal(err)

+ 4
- 4
obfuscated2/client_protocol.go Прегледај датотеку

81
 		c.dc = conntypes.DCDefaultIdx
81
 		c.dc = conntypes.DCDefaultIdx
82
 	}
82
 	}
83
 
83
 
84
-	antiReplayKey := decryptedFrame.Unique()
85
-	if antireplay.Cache.HasObfuscated2(antiReplayKey) {
86
-		stats.Stats.AntiReplayDetected()
84
+	replayKeys := decryptedFrame.Unique()
85
+	if antireplay.Cache.HasObfuscated2(replayKeys) {
86
+		stats.Stats.ReplayDetected()
87
 		return nil, errors.New("replay attack is detected")
87
 		return nil, errors.New("replay attack is detected")
88
 	}
88
 	}
89
 
89
 
90
-	antireplay.Cache.AddObfuscated2(antiReplayKey)
90
+	antireplay.Cache.AddObfuscated2(replayKeys)
91
 
91
 
92
 	return stream.NewObfuscated2(socket, encryptor, decryptor), nil
92
 	return stream.NewObfuscated2(socket, encryptor, decryptor), nil
93
 }
93
 }

+ 5
- 1
proxy/middle.go Прегледај датотеку

11
 )
11
 )
12
 
12
 
13
 func middleConnection(request *protocol.TelegramRequest) {
13
 func middleConnection(request *protocol.TelegramRequest) {
14
-	telegramConn := packetack.NewProxy(request)
14
+	telegramConn, err := packetack.NewProxy(request)
15
+	if err != nil {
16
+		request.Logger.Debugw("Cannot dial to Telegram", "error", err)
17
+		return
18
+	}
15
 	defer telegramConn.Close()
19
 	defer telegramConn.Close()
16
 
20
 
17
 	var clientConn conntypes.PacketAckFullReadWriteCloser
21
 	var clientConn conntypes.PacketAckFullReadWriteCloser

+ 3
- 3
stats/interfaces.go Прегледај датотеку

34
 	Crash()
34
 	Crash()
35
 }
35
 }
36
 
36
 
37
-type AntiReplayDetectedInterface interface {
38
-	AntiReplayDetected()
37
+type ReplayDetectedInterface interface {
38
+	ReplayDetected()
39
 }
39
 }
40
 
40
 
41
 type Interface interface {
41
 type Interface interface {
46
 	TelegramConnectedInterface
46
 	TelegramConnectedInterface
47
 	TelegramDisconnectedInterface
47
 	TelegramDisconnectedInterface
48
 	CrashInterface
48
 	CrashInterface
49
-	AntiReplayDetectedInterface
49
+	ReplayDetectedInterface
50
 }
50
 }

+ 2
- 2
stats/multi_stats.go Прегледај датотеку

50
 	}
50
 	}
51
 }
51
 }
52
 
52
 
53
-func (m multiStats) AntiReplayDetected() {
53
+func (m multiStats) ReplayDetected() {
54
 	for i := range m {
54
 	for i := range m {
55
-		go m[i].AntiReplayDetected()
55
+		go m[i].ReplayDetected()
56
 	}
56
 	}
57
 }
57
 }

+ 8
- 8
stats/stats_prometheus.go Прегледај датотеку

18
 	telegramConnections *prometheus.GaugeVec
18
 	telegramConnections *prometheus.GaugeVec
19
 	traffic             *prometheus.GaugeVec
19
 	traffic             *prometheus.GaugeVec
20
 	crashes             prometheus.Gauge
20
 	crashes             prometheus.Gauge
21
-	antiReplays         prometheus.Counter
21
+	replayAttacks       prometheus.Counter
22
 }
22
 }
23
 
23
 
24
 func (s *statsPrometheus) IngressTraffic(traffic int) {
24
 func (s *statsPrometheus) IngressTraffic(traffic int) {
84
 	s.crashes.Inc()
84
 	s.crashes.Inc()
85
 }
85
 }
86
 
86
 
87
-func (s *statsPrometheus) AntiReplayDetected() {
88
-	s.antiReplays.Inc()
87
+func (s *statsPrometheus) ReplayDetected() {
88
+	s.replayAttacks.Inc()
89
 }
89
 }
90
 
90
 
91
 func newStatsPrometheus(mux *http.ServeMux) (Interface, error) {
91
 func newStatsPrometheus(mux *http.ServeMux) (Interface, error) {
112
 			Name:      "crashes",
112
 			Name:      "crashes",
113
 			Help:      "How many crashes happened.",
113
 			Help:      "How many crashes happened.",
114
 		}),
114
 		}),
115
-		antiReplays: prometheus.NewCounter(prometheus.CounterOpts{
115
+		replayAttacks: prometheus.NewCounter(prometheus.CounterOpts{
116
 			Namespace: config.C.StatsNamespace,
116
 			Namespace: config.C.StatsNamespace,
117
-			Name:      "anti_replays",
118
-			Help:      "How many anti replay attacks were prevented.",
117
+			Name:      "replay_attacks",
118
+			Help:      "How many replay attacks were prevented.",
119
 		}),
119
 		}),
120
 	}
120
 	}
121
 
121
 
135
 		return nil, fmt.Errorf("cannot register metrics for crashes: %w", err)
135
 		return nil, fmt.Errorf("cannot register metrics for crashes: %w", err)
136
 	}
136
 	}
137
 
137
 
138
-	if err := registry.Register(instance.antiReplays); err != nil {
139
-		return nil, fmt.Errorf("cannot register metrics for anti replays: %w", err)
138
+	if err := registry.Register(instance.replayAttacks); err != nil {
139
+		return nil, fmt.Errorf("cannot register metrics for replays: %w", err)
140
 	}
140
 	}
141
 
141
 
142
 	handler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
142
 	handler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})

+ 2
- 2
stats/stats_statsd.go Прегледај датотеку

79
 	s.client.Increment("crashes")
79
 	s.client.Increment("crashes")
80
 }
80
 }
81
 
81
 
82
-func (s *statsStatsd) AntiReplayDetected() {
83
-	s.client.Increment("anti_replays")
82
+func (s *statsStatsd) ReplayDetected() {
83
+	s.client.Increment("replay_attacks")
84
 }
84
 }
85
 
85
 
86
 func newStatsStatsd() (Interface, error) {
86
 func newStatsStatsd() (Interface, error) {

+ 12
- 13
wrappers/packetack/proxy.go Прегледај датотеку

5
 	"encoding/binary"
5
 	"encoding/binary"
6
 	"fmt"
6
 	"fmt"
7
 	"net"
7
 	"net"
8
-	"sync"
9
 
8
 
10
 	"github.com/9seconds/mtg/config"
9
 	"github.com/9seconds/mtg/config"
11
 	"github.com/9seconds/mtg/conntypes"
10
 	"github.com/9seconds/mtg/conntypes"
16
 
15
 
17
 type wrapperProxy struct {
16
 type wrapperProxy struct {
18
 	request      *protocol.TelegramRequest
17
 	request      *protocol.TelegramRequest
18
+	proxy        *hub.ProxyConn
19
 	clientIPPort []byte
19
 	clientIPPort []byte
20
 	ourIPPort    []byte
20
 	ourIPPort    []byte
21
-	channelRead  hub.ChannelReadCloser
22
-	closeOnce    sync.Once
23
 	flags        rpc.ProxyRequestFlags
21
 	flags        rpc.ProxyRequestFlags
24
 }
22
 }
25
 
23
 
47
 	buf.Write(make([]byte, (4-buf.Len()%4)%4))
45
 	buf.Write(make([]byte, (4-buf.Len()%4)%4))
48
 	buf.Write(packet)
46
 	buf.Write(packet)
49
 
47
 
50
-	return hub.Hub.Write(buf.Bytes(), w.request)
48
+	return w.proxy.Write(buf.Bytes())
51
 }
49
 }
52
 
50
 
53
 func (w *wrapperProxy) Read(acks *conntypes.ConnectionAcks) (conntypes.Packet, error) {
51
 func (w *wrapperProxy) Read(acks *conntypes.ConnectionAcks) (conntypes.Packet, error) {
54
-	resp, err := w.channelRead.Read()
52
+	resp, err := w.proxy.Read()
55
 	if err != nil {
53
 	if err != nil {
56
 		return nil, fmt.Errorf("cannot read a response: %w", err)
54
 		return nil, fmt.Errorf("cannot read a response: %w", err)
57
 	}
55
 	}
64
 }
62
 }
65
 
63
 
66
 func (w *wrapperProxy) Close() error {
64
 func (w *wrapperProxy) Close() error {
67
-	w.closeOnce.Do(func() {
68
-		w.channelRead.Close()
69
-		hub.Registry.Unregister(w.request.ConnID)
70
-	})
71
-
65
+	w.proxy.Close()
72
 	return nil
66
 	return nil
73
 }
67
 }
74
 
68
 
75
-func NewProxy(request *protocol.TelegramRequest) conntypes.PacketAckReadWriteCloser {
69
+func NewProxy(request *protocol.TelegramRequest) (conntypes.PacketAckReadWriteCloser, error) {
76
 	flags := rpc.ProxyRequestFlagsHasAdTag | rpc.ProxyRequestFlagsMagic | rpc.ProxyRequestFlagsExtMode2
70
 	flags := rpc.ProxyRequestFlagsHasAdTag | rpc.ProxyRequestFlagsMagic | rpc.ProxyRequestFlagsExtMode2
77
 
71
 
78
 	switch request.ClientProtocol.ConnectionType() {
72
 	switch request.ClientProtocol.ConnectionType() {
86
 		panic("unknown connection type")
80
 		panic("unknown connection type")
87
 	}
81
 	}
88
 
82
 
83
+	proxy, err := hub.Hub.Register(request)
84
+	if err != nil {
85
+		return nil, fmt.Errorf("cannot make a new proxy wrapper: %w", err)
86
+	}
87
+
89
 	return &wrapperProxy{
88
 	return &wrapperProxy{
90
 		flags:        flags,
89
 		flags:        flags,
91
 		request:      request,
90
 		request:      request,
92
-		channelRead:  hub.Registry.Register(request.ConnID),
91
+		proxy:        proxy,
93
 		clientIPPort: proxyGetIPPort(request.ClientConn.RemoteAddr()),
92
 		clientIPPort: proxyGetIPPort(request.ClientConn.RemoteAddr()),
94
 		ourIPPort:    proxyGetIPPort(request.ClientConn.LocalAddr()),
93
 		ourIPPort:    proxyGetIPPort(request.ClientConn.LocalAddr()),
95
-	}
94
+	}, nil
96
 }
95
 }
97
 
96
 
98
 func proxyGetIPPort(addr *net.TCPAddr) []byte {
97
 func proxyGetIPPort(addr *net.TCPAddr) []byte {

Loading…
Откажи
Сачувај