Преглед изворни кода

Proxy is working but unstable

tags/0.9
9seconds пре 7 година
родитељ
комит
88faeb7195

+ 1
- 1
client/client.go Прегледај датотеку

@@ -9,4 +9,4 @@ import (
9 9
 )
10 10
 
11 11
 // Init has to initialize client connection based on given config.
12
-type Init func(net.Conn, *config.Config) (*mtproto.ConnectionOpts, wrappers.ReadWriteCloserWithAddr, error)
12
+type Init func(net.Conn, *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error)

+ 2
- 2
client/direct.go Прегледај датотеку

@@ -15,7 +15,7 @@ import (
15 15
 const handshakeTimeout = 10 * time.Second
16 16
 
17 17
 // DirectInit initializes client to access Telegram bypassing middleproxies.
18
-func DirectInit(conn net.Conn, conf *config.Config) (*mtproto.ConnectionOpts, wrappers.ReadWriteCloserWithAddr, error) {
18
+func DirectInit(conn net.Conn, conf *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error) {
19 19
 	if err := config.SetSocketOptions(conn); err != nil {
20 20
 		return nil, nil, errors.Annotate(err, "Cannot set socket options")
21 21
 	}
@@ -37,5 +37,5 @@ func DirectInit(conn net.Conn, conf *config.Config) (*mtproto.ConnectionOpts, wr
37 37
 	socket := wrappers.NewTimeoutRWC(conn, conf.PublicIPv4, conf.PublicIPv6)
38 38
 	socket = wrappers.NewStreamCipherRWC(socket, obfs2.Encryptor, obfs2.Decryptor)
39 39
 
40
-	return connOpts, socket, nil
40
+	return socket, connOpts, nil
41 41
 }

+ 3
- 3
client/middle.go Прегледај датотеку

@@ -9,8 +9,8 @@ import (
9 9
 	"github.com/9seconds/mtg/wrappers"
10 10
 )
11 11
 
12
-func MiddleInit(conn net.Conn, conf *config.Config) (*mtproto.ConnectionOpts, wrappers.ReadWriteCloserWithAddr, error) {
13
-	opts, newConn, err := DirectInit(conn, conf)
12
+func MiddleInit(conn net.Conn, conf *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error) {
13
+	newConn, opts, err := DirectInit(conn, conf)
14 14
 	if err != nil {
15 15
 		return nil, nil, err
16 16
 	}
@@ -26,5 +26,5 @@ func MiddleInit(conn net.Conn, conf *config.Config) (*mtproto.ConnectionOpts, wr
26 26
 		opts.ConnectionProto = mtproto.ConnectionProtocolIPv6
27 27
 	}
28 28
 
29
-	return opts, newConn, nil
29
+	return newConn, opts, nil
30 30
 }

+ 7
- 2
mtproto/connection_options.go Прегледај датотеку

@@ -13,14 +13,19 @@ type ConnectionType uint8
13 13
 
14 14
 type ConnectionProtocol uint8
15 15
 
16
+type Hacks struct {
17
+	SimpleAck bool
18
+	QuickAck  bool
19
+}
20
+
16 21
 // ConnectionOpts presents an options, metadata on connection requested
17 22
 // by the user on handshake.
18 23
 type ConnectionOpts struct {
19 24
 	DC              int16
20 25
 	ConnectionType  ConnectionType
21 26
 	ConnectionProto ConnectionProtocol
22
-	QuickAck        bool
23
-	SimpleAck       bool
27
+	ReadHacks       Hacks
28
+	WriteHacks      Hacks
24 29
 	ClientAddr      *net.TCPAddr
25 30
 }
26 31
 

+ 1
- 1
mtproto/rpc/proxy_request.go Прегледај датотеку

@@ -24,7 +24,7 @@ func (r *ProxyRequest) Bytes(message []byte) []byte {
24 24
 	buf := &bytes.Buffer{}
25 25
 
26 26
 	flags := r.Flags
27
-	if r.Options.QuickAck {
27
+	if r.Options.ReadHacks.QuickAck {
28 28
 		flags |= proxyRequestFlagsQuickAck
29 29
 	}
30 30
 

+ 7
- 6
mtproto/wrappers/abridged.go Прегледај датотеку

@@ -28,9 +28,6 @@ type AbridgedReadWriteCloserWithAddr struct {
28 28
 
29 29
 func (a *AbridgedReadWriteCloserWithAddr) Read(p []byte) (int, error) {
30 30
 	return a.BufferedRead(p, func() error {
31
-		a.opts.QuickAck = false
32
-		a.opts.SimpleAck = false
33
-
34 31
 		buf := &bytes.Buffer{}
35 32
 		buf.Grow(3)
36 33
 
@@ -41,7 +38,7 @@ func (a *AbridgedReadWriteCloserWithAddr) Read(p []byte) (int, error) {
41 38
 		buf.Reset()
42 39
 
43 40
 		if msgLength >= abridgedQuickAckLength {
44
-			a.opts.QuickAck = true
41
+			a.opts.ReadHacks.QuickAck = true
45 42
 			msgLength -= 0x80
46 43
 		}
47 44
 
@@ -56,9 +53,13 @@ func (a *AbridgedReadWriteCloserWithAddr) Read(p []byte) (int, error) {
56 53
 		}
57 54
 		msgLength32 *= 4
58 55
 
59
-		if _, err := io.CopyN(a.Buffer, a.conn, int64(msgLength32)); err != nil {
56
+		buf.Reset()
57
+		buf.Grow(int(msgLength32))
58
+
59
+		if _, err := io.CopyN(buf, a.conn, int64(msgLength32)); err != nil {
60 60
 			return errors.Annotate(err, "Cannot read message")
61 61
 		}
62
+		a.Buffer.Write(buf.Bytes())
62 63
 
63 64
 		return nil
64 65
 	})
@@ -68,7 +69,7 @@ func (a *AbridgedReadWriteCloserWithAddr) Write(p []byte) (int, error) {
68 69
 	if len(p)%4 != 0 {
69 70
 		return 0, errors.Errorf("Incorrect packet length %d", len(p))
70 71
 	}
71
-	if a.opts.SimpleAck {
72
+	if a.opts.WriteHacks.SimpleAck {
72 73
 		return a.conn.Write(reverseBytes(p))
73 74
 	}
74 75
 

+ 2
- 5
mtproto/wrappers/intermediate.go Прегледај датотеку

@@ -23,9 +23,6 @@ type IntermediateReadWriteCloserWithAddr struct {
23 23
 
24 24
 func (i *IntermediateReadWriteCloserWithAddr) Read(p []byte) (int, error) {
25 25
 	return i.BufferedRead(p, func() error {
26
-		i.opts.QuickAck = false
27
-		i.opts.SimpleAck = false
28
-
29 26
 		buf := &bytes.Buffer{}
30 27
 		buf.Grow(4)
31 28
 
@@ -37,7 +34,7 @@ func (i *IntermediateReadWriteCloserWithAddr) Read(p []byte) (int, error) {
37 34
 		buf.Grow(int(length))
38 35
 
39 36
 		if length > intermediateQuickAckLength {
40
-			i.opts.QuickAck = true
37
+			i.opts.ReadHacks.QuickAck = true
41 38
 			length -= intermediateQuickAckLength
42 39
 		}
43 40
 
@@ -58,7 +55,7 @@ func (i *IntermediateReadWriteCloserWithAddr) Read(p []byte) (int, error) {
58 55
 }
59 56
 
60 57
 func (i *IntermediateReadWriteCloserWithAddr) Write(p []byte) (int, error) {
61
-	if i.opts.SimpleAck {
58
+	if i.opts.WriteHacks.SimpleAck {
62 59
 		return i.conn.Write(p)
63 60
 	}
64 61
 

+ 19
- 19
mtproto/wrappers/proxy_request.go Прегледај датотеку

@@ -22,23 +22,21 @@ type ProxyRequestReadWriteCloserWithAddr struct {
22 22
 
23 23
 func (p *ProxyRequestReadWriteCloserWithAddr) Read(buf []byte) (int, error) {
24 24
 	return p.BufferedRead(buf, func() error {
25
-		ansBuf := &bytes.Buffer{}
26
-		ansBuf.Grow(4)
27
-
28
-		if _, err := io.CopyN(ansBuf, p.conn, 4); err != nil {
25
+		ans := make([]byte, 4)
26
+		if _, err := io.ReadFull(p.conn, ans); err != nil {
29 27
 			return errors.Annotate(err, "Cannot read RPC tag")
30 28
 		}
31 29
 
32 30
 		switch {
33
-		case bytes.Equal(ansBuf.Bytes(), rpc.TagCloseExt):
34
-			return p.readCloseExt()
35
-		case bytes.Equal(ansBuf.Bytes(), rpc.TagProxyAns):
31
+		case bytes.Equal(ans, rpc.TagProxyAns):
36 32
 			return p.readProxyAns(buf)
37
-		case bytes.Equal(ansBuf.Bytes(), rpc.TagSimpleAck):
33
+		case bytes.Equal(ans, rpc.TagSimpleAck):
38 34
 			return p.readSimpleAck()
35
+		case bytes.Equal(ans, rpc.TagCloseExt):
36
+			return p.readCloseExt()
39 37
 		}
40 38
 
41
-		return errors.Errorf("Unknown RPC answer %s", ansBuf.Bytes())
39
+		return errors.Errorf("Unknown RPC answer %v", ans)
42 40
 	})
43 41
 }
44 42
 
@@ -46,21 +44,21 @@ func (p *ProxyRequestReadWriteCloserWithAddr) readCloseExt() error {
46 44
 	return errors.New("Connection has been closed remotely")
47 45
 }
48 46
 
49
-func (p *ProxyRequestReadWriteCloserWithAddr) readProxyAns(buf []byte) error {
50
-	if _, err := io.CopyN(ioutil.Discard, p.conn, 8+4); err != nil {
47
+func (p *ProxyRequestReadWriteCloserWithAddr) readProxyAns(buf []byte) (err error) {
48
+	if _, err = io.CopyN(ioutil.Discard, p.conn, 8+4); err != nil {
51 49
 		return errors.Annotate(err, "Cannot skip flags and connid")
52 50
 	}
53 51
 
54
-	for {
55
-		n, err := p.conn.Read(buf)
52
+	n := len(buf)
53
+	preBuffer := &bytes.Buffer{}
54
+	for n == len(buf) {
55
+		n, err = p.conn.Read(buf)
56 56
 		if err != nil {
57 57
 			return errors.Annotate(err, "Cannot read proxy answer")
58 58
 		}
59
-		if n == 0 {
60
-			break
61
-		}
62
-		p.Buffer.Write(buf[:n])
59
+		preBuffer.Write(buf[:n])
63 60
 	}
61
+	p.Buffer.Write(preBuffer.Bytes())
64 62
 
65 63
 	return nil
66 64
 }
@@ -69,10 +67,12 @@ func (p *ProxyRequestReadWriteCloserWithAddr) readSimpleAck() error {
69 67
 	if _, err := io.CopyN(ioutil.Discard, p.conn, 8); err != nil {
70 68
 		return errors.Annotate(err, "Cannot skip connid")
71 69
 	}
72
-	if _, err := io.CopyN(p.Buffer, p.conn, 4); err != nil {
70
+
71
+	ackData := make([]byte, 4)
72
+	if _, err := io.ReadFull(p.conn, ackData); err != nil {
73 73
 		return errors.Annotate(err, "Cannot read simple ack")
74 74
 	}
75
-	p.req.Options.SimpleAck = true
75
+	p.Buffer.Write(ackData)
76 76
 
77 77
 	return nil
78 78
 }

+ 0
- 18
proxy/copy_pool.go Прегледај датотеку

@@ -1,18 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"sync"
5
-
6
-	"github.com/9seconds/mtg/config"
7
-)
8
-
9
-var copyPool sync.Pool
10
-
11
-func init() {
12
-	copyPool = sync.Pool{
13
-		New: func() interface{} {
14
-			data := make([]byte, config.BufferSizeCopy)
15
-			return &data
16
-		},
17
-	}
18
-}

+ 41
- 8
proxy/server.go Прегледај датотеку

@@ -85,8 +85,32 @@ func (s *Server) accept(conn net.Conn) {
85 85
 	wait := &sync.WaitGroup{}
86 86
 	wait.Add(2)
87 87
 
88
-	go s.pipe(clientConn, tgConn, wait)
89
-	go s.pipe(tgConn, clientConn, wait)
88
+	go func() {
89
+		defer wait.Done()
90
+
91
+		for {
92
+			connOpts.ReadHacks.QuickAck = false
93
+			connOpts.ReadHacks.SimpleAck = false
94
+			if err := s.pump(clientConn, tgConn, socketID, "client"); err != nil {
95
+				s.logger.Infow("Client stream is aborted",
96
+					"socketid", socketID, "error", err)
97
+				return
98
+			}
99
+		}
100
+	}()
101
+	go func() {
102
+		defer wait.Done()
103
+
104
+		for {
105
+			connOpts.WriteHacks.QuickAck = false
106
+			connOpts.WriteHacks.SimpleAck = false
107
+			if err := s.pump(tgConn, clientConn, socketID, "telegram"); err != nil {
108
+				s.logger.Infow("Telegram stream is aborted",
109
+					"socketid", socketID, "error", err)
110
+				return
111
+			}
112
+		}
113
+	}()
90 114
 
91 115
 	<-ctx.Done()
92 116
 	wait.Wait()
@@ -98,7 +122,7 @@ func (s *Server) accept(conn net.Conn) {
98 122
 }
99 123
 
100 124
 func (s *Server) getClientStream(ctx context.Context, cancel context.CancelFunc, conn net.Conn, socketID string) (*mtproto.ConnectionOpts, io.ReadWriteCloser, error) {
101
-	connOpts, socket, err := s.clientInit(conn, s.conf)
125
+	socket, connOpts, err := s.clientInit(conn, s.conf)
102 126
 	if err != nil {
103 127
 		return nil, nil, errors.Annotate(err, "Cannot init client connection")
104 128
 	}
@@ -128,13 +152,22 @@ func (s *Server) getTelegramStream(ctx context.Context, cancel context.CancelFun
128 152
 	return conn, nil
129 153
 }
130 154
 
131
-func (s *Server) pipe(dst io.Writer, src io.Reader, wait *sync.WaitGroup) {
132
-	defer wait.Done()
155
+func (s *Server) pump(src io.Reader, dst io.Writer, socketID, name string) (err error) {
156
+	copyBuf := make([]byte, 1024*1024*2)
133 157
 
134
-	buf := copyPool.Get().(*[]byte)
135
-	defer copyPool.Put(buf)
158
+	n := config.BufferSizeCopy
159
+	for n == config.BufferSizeCopy {
160
+		n, err = src.Read(copyBuf)
161
+		if err != nil {
162
+			break
163
+		}
164
+		_, err = dst.Write(copyBuf[:n])
165
+		if err != nil {
166
+			break
167
+		}
168
+	}
136 169
 
137
-	io.CopyBuffer(dst, src, *buf) // nolint: errcheck
170
+	return
138 171
 }
139 172
 
140 173
 // NewServer creates new instance of MTPROTO proxy.

Loading…
Откажи
Сачувај