Просмотр исходного кода

Preliminary debug state

tags/0.9
9seconds 7 лет назад
Родитель
Сommit
a3933e6ede

+ 1
- 1
client/client.go Просмотреть файл

9
 )
9
 )
10
 
10
 
11
 // Init has to initialize client connection based on given config.
11
 // Init has to initialize client connection based on given config.
12
-type Init func(net.Conn, *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error)
12
+type Init func(net.Conn, string, *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error)

+ 2
- 2
client/direct.go Просмотреть файл

15
 const handshakeTimeout = 10 * time.Second
15
 const handshakeTimeout = 10 * time.Second
16
 
16
 
17
 // DirectInit initializes client to access Telegram bypassing middleproxies.
17
 // DirectInit initializes client to access Telegram bypassing middleproxies.
18
-func DirectInit(conn net.Conn, conf *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error) {
18
+func DirectInit(conn net.Conn, socketID string, conf *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error) {
19
 	if err := config.SetSocketOptions(conn); err != nil {
19
 	if err := config.SetSocketOptions(conn); err != nil {
20
 		return nil, nil, errors.Annotate(err, "Cannot set socket options")
20
 		return nil, nil, errors.Annotate(err, "Cannot set socket options")
21
 	}
21
 	}
34
 	connOpts.ConnectionProto = mtproto.ConnectionProtocolAny
34
 	connOpts.ConnectionProto = mtproto.ConnectionProtocolAny
35
 	connOpts.ClientAddr = conn.RemoteAddr().(*net.TCPAddr)
35
 	connOpts.ClientAddr = conn.RemoteAddr().(*net.TCPAddr)
36
 
36
 
37
-	socket := wrappers.NewTimeoutRWC(conn, conf.PublicIPv4, conf.PublicIPv6)
37
+	socket := wrappers.NewTimeoutRWC(conn, socketID, conf.PublicIPv4, conf.PublicIPv6)
38
 	socket = wrappers.NewStreamCipherRWC(socket, obfs2.Encryptor, obfs2.Decryptor)
38
 	socket = wrappers.NewStreamCipherRWC(socket, obfs2.Encryptor, obfs2.Decryptor)
39
 
39
 
40
 	return socket, connOpts, nil
40
 	return socket, connOpts, nil

+ 2
- 2
client/middle.go Просмотреть файл

9
 	"github.com/9seconds/mtg/wrappers"
9
 	"github.com/9seconds/mtg/wrappers"
10
 )
10
 )
11
 
11
 
12
-func MiddleInit(conn net.Conn, conf *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error) {
13
-	newConn, opts, err := DirectInit(conn, conf)
12
+func MiddleInit(conn net.Conn, socketID string, conf *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error) {
13
+	newConn, opts, err := DirectInit(conn, socketID, conf)
14
 	if err != nil {
14
 	if err != nil {
15
 		return nil, nil, err
15
 		return nil, nil, err
16
 	}
16
 	}

+ 12
- 4
mtproto/wrappers/abridged.go Просмотреть файл

31
 		buf := &bytes.Buffer{}
31
 		buf := &bytes.Buffer{}
32
 		buf.Grow(3)
32
 		buf.Grow(3)
33
 
33
 
34
+		q := make([]byte, 1)
35
+
34
 		if _, err := io.CopyN(buf, a.conn, 1); err != nil {
36
 		if _, err := io.CopyN(buf, a.conn, 1); err != nil {
35
 			return errors.Annotate(err, "Cannot read message length")
37
 			return errors.Annotate(err, "Cannot read message length")
36
 		}
38
 		}
37
 		msgLength := uint8(buf.Bytes()[0])
39
 		msgLength := uint8(buf.Bytes()[0])
40
+		q[0] = msgLength
38
 		buf.Reset()
41
 		buf.Reset()
39
 
42
 
40
 		if msgLength >= abridgedQuickAckLength {
43
 		if msgLength >= abridgedQuickAckLength {
41
 			a.opts.ReadHacks.QuickAck = true
44
 			a.opts.ReadHacks.QuickAck = true
42
-			msgLength -= 0x80
45
+			msgLength -= abridgedQuickAckLength
43
 		}
46
 		}
44
 
47
 
45
 		msgLength32 := uint32(msgLength)
48
 		msgLength32 := uint32(msgLength)
49
 			}
52
 			}
50
 			number := uint24{}
53
 			number := uint24{}
51
 			copy(number[:], buf.Bytes())
54
 			copy(number[:], buf.Bytes())
55
+			q = append(q, buf.Bytes()...)
52
 			msgLength32 = fromUint24(number)
56
 			msgLength32 = fromUint24(number)
53
 		}
57
 		}
54
 		msgLength32 *= 4
58
 		msgLength32 *= 4
59
 		if _, err := io.CopyN(buf, a.conn, int64(msgLength32)); err != nil {
63
 		if _, err := io.CopyN(buf, a.conn, int64(msgLength32)); err != nil {
60
 			return errors.Annotate(err, "Cannot read message")
64
 			return errors.Annotate(err, "Cannot read message")
61
 		}
65
 		}
66
+		q = append(q, buf.Bytes()...)
62
 		a.Buffer.Write(buf.Bytes())
67
 		a.Buffer.Write(buf.Bytes())
63
 
68
 
64
 		return nil
69
 		return nil
87
 		buf.Write(length24[:])
92
 		buf.Write(length24[:])
88
 		buf.Write(p)
93
 		buf.Write(p)
89
 		return a.conn.Write(buf.Bytes())
94
 		return a.conn.Write(buf.Bytes())
90
-
91
-	default:
92
-		return 0, errors.Errorf("Packet is too big %d", len(p))
93
 	}
95
 	}
96
+
97
+	return 0, errors.Errorf("Packet is too big %d", len(p))
94
 }
98
 }
95
 
99
 
96
 func (a *AbridgedReadWriteCloserWithAddr) Close() error {
100
 func (a *AbridgedReadWriteCloserWithAddr) Close() error {
105
 	return a.conn.RemoteAddr()
109
 	return a.conn.RemoteAddr()
106
 }
110
 }
107
 
111
 
112
+func (a *AbridgedReadWriteCloserWithAddr) SocketID() string {
113
+	return a.conn.SocketID()
114
+}
115
+
108
 func toUint24(number uint32) uint24 {
116
 func toUint24(number uint32) uint24 {
109
 	return uint24{byte(number), byte(number >> 8), byte(number >> 16)}
117
 	return uint24{byte(number), byte(number >> 8), byte(number >> 16)}
110
 }
118
 }

+ 4
- 0
mtproto/wrappers/frame.go Просмотреть файл

120
 	return f.conn.RemoteAddr()
120
 	return f.conn.RemoteAddr()
121
 }
121
 }
122
 
122
 
123
+func (f *FrameRWC) SocketID() string {
124
+	return f.conn.SocketID()
125
+}
126
+
123
 func NewFrameRWC(conn wrappers.ReadWriteCloserWithAddr, seqNo int32) wrappers.ReadWriteCloserWithAddr {
127
 func NewFrameRWC(conn wrappers.ReadWriteCloserWithAddr, seqNo int32) wrappers.ReadWriteCloserWithAddr {
124
 	return &FrameRWC{
128
 	return &FrameRWC{
125
 		BufferedReader: wrappers.NewBufferedReader(),
129
 		BufferedReader: wrappers.NewBufferedReader(),

+ 4
- 0
mtproto/wrappers/intermediate.go Просмотреть файл

77
 	return i.conn.RemoteAddr()
77
 	return i.conn.RemoteAddr()
78
 }
78
 }
79
 
79
 
80
+func (i *IntermediateReadWriteCloserWithAddr) SocketID() string {
81
+	return i.conn.SocketID()
82
+}
83
+
80
 func NewIntermediateRWC(conn wrappers.ReadWriteCloserWithAddr, connOpts *mtproto.ConnectionOpts) wrappers.ReadWriteCloserWithAddr {
84
 func NewIntermediateRWC(conn wrappers.ReadWriteCloserWithAddr, connOpts *mtproto.ConnectionOpts) wrappers.ReadWriteCloserWithAddr {
81
 	return &IntermediateReadWriteCloserWithAddr{
85
 	return &IntermediateReadWriteCloserWithAddr{
82
 		BufferedReader: wrappers.NewBufferedReader(),
86
 		BufferedReader: wrappers.NewBufferedReader(),

+ 11
- 11
mtproto/wrappers/proxy_request.go Просмотреть файл

10
 
10
 
11
 	"github.com/9seconds/mtg/mtproto"
11
 	"github.com/9seconds/mtg/mtproto"
12
 	"github.com/9seconds/mtg/mtproto/rpc"
12
 	"github.com/9seconds/mtg/mtproto/rpc"
13
+	"github.com/9seconds/mtg/utils"
13
 	"github.com/9seconds/mtg/wrappers"
14
 	"github.com/9seconds/mtg/wrappers"
14
 )
15
 )
15
 
16
 
29
 
30
 
30
 		switch {
31
 		switch {
31
 		case bytes.Equal(ans, rpc.TagProxyAns):
32
 		case bytes.Equal(ans, rpc.TagProxyAns):
32
-			return p.readProxyAns(buf)
33
+			return p.readProxyAns()
33
 		case bytes.Equal(ans, rpc.TagSimpleAck):
34
 		case bytes.Equal(ans, rpc.TagSimpleAck):
34
 			return p.readSimpleAck()
35
 			return p.readSimpleAck()
35
 		case bytes.Equal(ans, rpc.TagCloseExt):
36
 		case bytes.Equal(ans, rpc.TagCloseExt):
44
 	return errors.New("Connection has been closed remotely")
45
 	return errors.New("Connection has been closed remotely")
45
 }
46
 }
46
 
47
 
47
-func (p *ProxyRequestReadWriteCloserWithAddr) readProxyAns(buf []byte) (err error) {
48
+func (p *ProxyRequestReadWriteCloserWithAddr) readProxyAns() (err error) {
48
 	if _, err = io.CopyN(ioutil.Discard, p.conn, 8+4); err != nil {
49
 	if _, err = io.CopyN(ioutil.Discard, p.conn, 8+4); err != nil {
49
 		return errors.Annotate(err, "Cannot skip flags and connid")
50
 		return errors.Annotate(err, "Cannot skip flags and connid")
50
 	}
51
 	}
51
 
52
 
52
-	n := len(buf)
53
-	preBuffer := &bytes.Buffer{}
54
-	for n == len(buf) {
55
-		n, err = p.conn.Read(buf)
56
-		if err != nil {
57
-			return errors.Annotate(err, "Cannot read proxy answer")
58
-		}
59
-		preBuffer.Write(buf[:n])
53
+	buf, err := utils.ReadCurrentData(p.conn)
54
+	if err != nil {
55
+		return errors.Annotate(err, "Cannot read proxy answer")
60
 	}
56
 	}
61
-	p.Buffer.Write(preBuffer.Bytes())
57
+	p.Buffer.Write(buf)
62
 
58
 
63
 	return nil
59
 	return nil
64
 }
60
 }
97
 	return p.conn.RemoteAddr()
93
 	return p.conn.RemoteAddr()
98
 }
94
 }
99
 
95
 
96
+func (p *ProxyRequestReadWriteCloserWithAddr) SocketID() string {
97
+	return p.conn.SocketID()
98
+}
99
+
100
 func NewProxyRequestRWC(conn wrappers.ReadWriteCloserWithAddr, connOpts *mtproto.ConnectionOpts, adTag []byte) (wrappers.ReadWriteCloserWithAddr, error) {
100
 func NewProxyRequestRWC(conn wrappers.ReadWriteCloserWithAddr, connOpts *mtproto.ConnectionOpts, adTag []byte) (wrappers.ReadWriteCloserWithAddr, error) {
101
 	req, err := rpc.NewProxyRequest(connOpts.ClientAddr, conn.LocalAddr(), connOpts, adTag)
101
 	req, err := rpc.NewProxyRequest(connOpts.ClientAddr, conn.LocalAddr(), connOpts, adTag)
102
 	if err != nil {
102
 	if err != nil {

+ 10
- 16
proxy/server.go Просмотреть файл

14
 	"github.com/9seconds/mtg/config"
14
 	"github.com/9seconds/mtg/config"
15
 	"github.com/9seconds/mtg/mtproto"
15
 	"github.com/9seconds/mtg/mtproto"
16
 	"github.com/9seconds/mtg/telegram"
16
 	"github.com/9seconds/mtg/telegram"
17
+	"github.com/9seconds/mtg/utils"
17
 	"github.com/9seconds/mtg/wrappers"
18
 	"github.com/9seconds/mtg/wrappers"
18
 )
19
 )
19
 
20
 
122
 }
123
 }
123
 
124
 
124
 func (s *Server) getClientStream(ctx context.Context, cancel context.CancelFunc, conn net.Conn, socketID string) (*mtproto.ConnectionOpts, io.ReadWriteCloser, error) {
125
 func (s *Server) getClientStream(ctx context.Context, cancel context.CancelFunc, conn net.Conn, socketID string) (*mtproto.ConnectionOpts, io.ReadWriteCloser, error) {
125
-	socket, connOpts, err := s.clientInit(conn, s.conf)
126
+	socket, connOpts, err := s.clientInit(conn, socketID, s.conf)
126
 	if err != nil {
127
 	if err != nil {
127
 		return nil, nil, errors.Annotate(err, "Cannot init client connection")
128
 		return nil, nil, errors.Annotate(err, "Cannot init client connection")
128
 	}
129
 	}
135
 }
136
 }
136
 
137
 
137
 func (s *Server) getTelegramStream(ctx context.Context, cancel context.CancelFunc, connOpts *mtproto.ConnectionOpts, socketID string) (io.ReadWriteCloser, error) {
138
 func (s *Server) getTelegramStream(ctx context.Context, cancel context.CancelFunc, connOpts *mtproto.ConnectionOpts, socketID string) (io.ReadWriteCloser, error) {
138
-	conn, err := s.tg.Dial(connOpts)
139
+	conn, err := s.tg.Dial(socketID, connOpts)
139
 	if err != nil {
140
 	if err != nil {
140
 		return nil, errors.Annotate(err, "Cannot connect to Telegram")
141
 		return nil, errors.Annotate(err, "Cannot connect to Telegram")
141
 	}
142
 	}
152
 	return conn, nil
153
 	return conn, nil
153
 }
154
 }
154
 
155
 
155
-func (s *Server) pump(src io.Reader, dst io.Writer, socketID, name string) (err error) {
156
-	copyBuf := make([]byte, 1024*1024*2)
157
-
158
-	n := config.BufferSizeCopy
159
-	for n == config.BufferSizeCopy {
160
-		n, err = src.Read(copyBuf)
161
-		if err != nil {
162
-			break
163
-		}
164
-		_, err = dst.Write(copyBuf[:n])
165
-		if err != nil {
166
-			break
167
-		}
156
+func (s *Server) pump(src io.Reader, dst io.Writer, socketID, name string) error {
157
+	buf, err := utils.ReadCurrentData(src)
158
+	if err != nil {
159
+		return errors.Annotate(err, "Cannot pump the socket")
168
 	}
160
 	}
169
 
161
 
170
-	return
162
+	_, err = dst.Write(buf)
163
+
164
+	return err
171
 }
165
 }
172
 
166
 
173
 // NewServer creates new instance of MTPROTO proxy.
167
 // NewServer creates new instance of MTPROTO proxy.

+ 2
- 2
telegram/dialer.go Просмотреть файл

30
 	return conn, nil
30
 	return conn, nil
31
 }
31
 }
32
 
32
 
33
-func (t *tgDialer) dialRWC(addr string) (wrappers.ReadWriteCloserWithAddr, error) {
33
+func (t *tgDialer) dialRWC(addr, sock string) (wrappers.ReadWriteCloserWithAddr, error) {
34
 	conn, err := t.dial(addr)
34
 	conn, err := t.dial(addr)
35
 	if err != nil {
35
 	if err != nil {
36
 		return nil, err
36
 		return nil, err
37
 	}
37
 	}
38
 
38
 
39
-	return wrappers.NewTimeoutRWC(conn, t.conf.PublicIPv4, t.conf.PublicIPv6), nil
39
+	return wrappers.NewTimeoutRWC(conn, sock, t.conf.PublicIPv4, t.conf.PublicIPv6), nil
40
 }
40
 }

+ 2
- 2
telegram/direct.go Просмотреть файл

33
 	baseTelegram
33
 	baseTelegram
34
 }
34
 }
35
 
35
 
36
-func (t *directTelegram) Dial(connOpts *mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error) {
36
+func (t *directTelegram) Dial(sock string, connOpts *mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error) {
37
 	dc := connOpts.DC
37
 	dc := connOpts.DC
38
 	if dc < 0 {
38
 	if dc < 0 {
39
 		dc = -dc
39
 		dc = -dc
41
 		dc = 1
41
 		dc = 1
42
 	}
42
 	}
43
 
43
 
44
-	return t.baseTelegram.dial(dc-1, connOpts.ConnectionProto)
44
+	return t.baseTelegram.dial(dc-1, sock, connOpts.ConnectionProto)
45
 }
45
 }
46
 
46
 
47
 func (t *directTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.ReadWriteCloserWithAddr) (wrappers.ReadWriteCloserWithAddr, error) {
47
 func (t *directTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.ReadWriteCloserWithAddr) (wrappers.ReadWriteCloserWithAddr, error) {

+ 2
- 2
telegram/middle_caller.go Просмотреть файл

39
 	httpClient  *http.Client
39
 	httpClient  *http.Client
40
 }
40
 }
41
 
41
 
42
-func (t *middleTelegramCaller) Dial(connOpts *mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error) {
42
+func (t *middleTelegramCaller) Dial(sock string, connOpts *mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error) {
43
 	dc := connOpts.DC
43
 	dc := connOpts.DC
44
 	if dc == 0 {
44
 	if dc == 0 {
45
 		dc = 1
45
 		dc = 1
47
 	t.dialerMutex.RLock()
47
 	t.dialerMutex.RLock()
48
 	defer t.dialerMutex.RUnlock()
48
 	defer t.dialerMutex.RUnlock()
49
 
49
 
50
-	return t.baseTelegram.dial(dc, connOpts.ConnectionProto)
50
+	return t.baseTelegram.dial(dc, sock, connOpts.ConnectionProto)
51
 }
51
 }
52
 
52
 
53
 func (t *middleTelegramCaller) autoUpdate() {
53
 func (t *middleTelegramCaller) autoUpdate() {

+ 3
- 3
telegram/telegram.go Просмотреть файл

13
 // encapsulates logic of working with middleproxies or direct
13
 // encapsulates logic of working with middleproxies or direct
14
 // connections.
14
 // connections.
15
 type Telegram interface {
15
 type Telegram interface {
16
-	Dial(*mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error)
16
+	Dial(string, *mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error)
17
 	Init(*mtproto.ConnectionOpts, wrappers.ReadWriteCloserWithAddr) (wrappers.ReadWriteCloserWithAddr, error)
17
 	Init(*mtproto.ConnectionOpts, wrappers.ReadWriteCloserWithAddr) (wrappers.ReadWriteCloserWithAddr, error)
18
 }
18
 }
19
 
19
 
24
 	v6Addresses map[int16][]string
24
 	v6Addresses map[int16][]string
25
 }
25
 }
26
 
26
 
27
-func (b *baseTelegram) dial(dcIdx int16, proto mtproto.ConnectionProtocol) (wrappers.ReadWriteCloserWithAddr, error) {
27
+func (b *baseTelegram) dial(dcIdx int16, sock string, proto mtproto.ConnectionProtocol) (wrappers.ReadWriteCloserWithAddr, error) {
28
 	addrs := make([]string, 2)
28
 	addrs := make([]string, 2)
29
 
29
 
30
 	if proto&mtproto.ConnectionProtocolIPv6 != 0 {
30
 	if proto&mtproto.ConnectionProtocolIPv6 != 0 {
39
 	}
39
 	}
40
 
40
 
41
 	for _, addr := range addrs {
41
 	for _, addr := range addrs {
42
-		if conn, err := b.dialer.dialRWC(addr); err == nil {
42
+		if conn, err := b.dialer.dialRWC(addr, sock); err == nil {
43
 			return conn, err
43
 			return conn, err
44
 		}
44
 		}
45
 	}
45
 	}

+ 20
- 0
utils/read_current_data.go Просмотреть файл

1
+package utils
2
+
3
+import "io"
4
+
5
+const readCurrentDataBufferSize = 1024 + 1
6
+
7
+func ReadCurrentData(src io.Reader) (rv []byte, err error) {
8
+	buf := make([]byte, readCurrentDataBufferSize)
9
+	n := readCurrentDataBufferSize
10
+
11
+	for n == len(buf) {
12
+		n, err = src.Read(buf)
13
+		if err != nil {
14
+			return nil, err
15
+		}
16
+		rv = append(rv, buf[:n]...)
17
+	}
18
+
19
+	return rv, nil
20
+}

+ 16
- 7
wrappers/blockcipherrwc.go Просмотреть файл

1
 package wrappers
1
 package wrappers
2
 
2
 
3
 import (
3
 import (
4
-	"bytes"
5
 	"crypto/aes"
4
 	"crypto/aes"
6
 	"crypto/cipher"
5
 	"crypto/cipher"
6
+	"fmt"
7
 	"net"
7
 	"net"
8
 
8
 
9
 	"github.com/juju/errors"
9
 	"github.com/juju/errors"
10
+
11
+	"github.com/9seconds/mtg/utils"
10
 )
12
 )
11
 
13
 
12
 type BlockCipherReadWriteCloserWithAddr struct {
14
 type BlockCipherReadWriteCloserWithAddr struct {
19
 
21
 
20
 func (c *BlockCipherReadWriteCloserWithAddr) Read(p []byte) (int, error) {
22
 func (c *BlockCipherReadWriteCloserWithAddr) Read(p []byte) (int, error) {
21
 	return c.BufferedRead(p, func() error {
23
 	return c.BufferedRead(p, func() error {
22
-		buf := &bytes.Buffer{}
23
-		for buf.Len()%aes.BlockSize != 0 || buf.Len() == 0 {
24
-			n, err := c.conn.Read(p)
24
+		var buf []byte
25
+
26
+		for len(buf) == 0 || len(buf)%aes.BlockSize != 0 {
27
+			rv, err := utils.ReadCurrentData(c.conn)
25
 			if err != nil {
28
 			if err != nil {
26
 				return errors.Annotate(err, "Cannot read from socket")
29
 				return errors.Annotate(err, "Cannot read from socket")
27
 			}
30
 			}
28
-			buf.Write(p[:n])
31
+			buf = append(buf, rv...)
29
 		}
32
 		}
30
-		c.decryptor.CryptBlocks(buf.Bytes(), buf.Bytes())
31
-		c.Buffer.Write(buf.Bytes())
33
+
34
+		c.decryptor.CryptBlocks(buf, buf)
35
+		c.Buffer.Write(buf)
32
 
36
 
33
 		return nil
37
 		return nil
34
 	})
38
 	})
46
 }
50
 }
47
 
51
 
48
 func (c *BlockCipherReadWriteCloserWithAddr) Close() error {
52
 func (c *BlockCipherReadWriteCloserWithAddr) Close() error {
53
+	fmt.Println("BlockCipherReadWriteCloserWithAddr closes", "sockid", c.SocketID(), "bufsize", c.Buffer.Len())
49
 	return c.conn.Close()
54
 	return c.conn.Close()
50
 }
55
 }
51
 
56
 
57
 	return c.conn.RemoteAddr()
62
 	return c.conn.RemoteAddr()
58
 }
63
 }
59
 
64
 
65
+func (c *BlockCipherReadWriteCloserWithAddr) SocketID() string {
66
+	return c.conn.SocketID()
67
+}
68
+
60
 func NewBlockCipherRWC(conn ReadWriteCloserWithAddr, encryptor, decryptor cipher.BlockMode) ReadWriteCloserWithAddr {
69
 func NewBlockCipherRWC(conn ReadWriteCloserWithAddr, encryptor, decryptor cipher.BlockMode) ReadWriteCloserWithAddr {
61
 	return &BlockCipherReadWriteCloserWithAddr{
70
 	return &BlockCipherReadWriteCloserWithAddr{
62
 		BufferedReader: NewBufferedReader(),
71
 		BufferedReader: NewBufferedReader(),

+ 10
- 2
wrappers/buffered_reader.go Просмотреть файл

1
 package wrappers
1
 package wrappers
2
 
2
 
3
-import "bytes"
3
+import (
4
+	"bytes"
5
+
6
+	"github.com/juju/errors"
7
+)
4
 
8
 
5
 type BufferedReader struct {
9
 type BufferedReader struct {
6
 	Buffer *bytes.Buffer
10
 	Buffer *bytes.Buffer
7
 }
11
 }
8
 
12
 
13
+var (
14
+	BufferedReaderContinue = errors.New("Please continue reading")
15
+)
16
+
9
 func (b *BufferedReader) BufferedRead(p []byte, callback func() error) (int, error) {
17
 func (b *BufferedReader) BufferedRead(p []byte, callback func() error) (int, error) {
10
 	if b.Buffer.Len() > 0 {
18
 	if b.Buffer.Len() > 0 {
11
 		return b.flush(p)
19
 		return b.flush(p)
17
 }
25
 }
18
 
26
 
19
 func (b *BufferedReader) flush(p []byte) (int, error) {
27
 func (b *BufferedReader) flush(p []byte) (int, error) {
20
-	if b.Buffer.Len() < len(p) {
28
+	if b.Buffer.Len() <= len(p) {
21
 		sizeToReturn := b.Buffer.Len()
29
 		sizeToReturn := b.Buffer.Len()
22
 		copy(p, b.Buffer.Bytes())
30
 		copy(p, b.Buffer.Bytes())
23
 		b.Buffer.Reset()
31
 		b.Buffer.Reset()

+ 4
- 0
wrappers/ctxrwc.go Просмотреть файл

56
 	return c.conn.RemoteAddr()
56
 	return c.conn.RemoteAddr()
57
 }
57
 }
58
 
58
 
59
+func (c *CtxReadWriteCloserWithAddr) SocketID() string {
60
+	return c.conn.SocketID()
61
+}
62
+
59
 // NewCtxRWC returns ReadWriteCloser which respects given context,
63
 // NewCtxRWC returns ReadWriteCloser which respects given context,
60
 // cancellation etc.
64
 // cancellation etc.
61
 func NewCtxRWC(ctx context.Context, cancel context.CancelFunc, conn ReadWriteCloserWithAddr) ReadWriteCloserWithAddr {
65
 func NewCtxRWC(ctx context.Context, cancel context.CancelFunc, conn ReadWriteCloserWithAddr) ReadWriteCloserWithAddr {

+ 6
- 2
wrappers/logrwc.go Просмотреть файл

18
 // Read reads from connection
18
 // Read reads from connection
19
 func (l *LogReadWriteCloserWithAddr) Read(p []byte) (n int, err error) {
19
 func (l *LogReadWriteCloserWithAddr) Read(p []byte) (n int, err error) {
20
 	n, err = l.conn.Read(p)
20
 	n, err = l.conn.Read(p)
21
-	l.logger.Debugw("Finish reading", "name", l.name, "socketid", l.sockid, "nbytes", n, "error", err)
21
+	l.logger.Debugw("Finish reading", "name", l.name, "socketid", l.sockid, "nbytes", n, "error", err, "localAddr", l.LocalAddr())
22
 	return
22
 	return
23
 }
23
 }
24
 
24
 
25
 // Write writes into connection.
25
 // Write writes into connection.
26
 func (l *LogReadWriteCloserWithAddr) Write(p []byte) (n int, err error) {
26
 func (l *LogReadWriteCloserWithAddr) Write(p []byte) (n int, err error) {
27
 	n, err = l.conn.Write(p)
27
 	n, err = l.conn.Write(p)
28
-	l.logger.Debugw("Finish writing", "name", l.name, "socketid", l.sockid, "nbytes", n, "error", err)
28
+	l.logger.Debugw("Finish writing", "name", l.name, "socketid", l.sockid, "nbytes", n, "error", err, "localAddr", l.LocalAddr())
29
 	return
29
 	return
30
 }
30
 }
31
 
31
 
44
 	return l.conn.RemoteAddr()
44
 	return l.conn.RemoteAddr()
45
 }
45
 }
46
 
46
 
47
+func (l *LogReadWriteCloserWithAddr) SocketID() string {
48
+	return l.sockid
49
+}
50
+
47
 // NewLogRWC wraps ReadWriteCloser with logger calls.
51
 // NewLogRWC wraps ReadWriteCloser with logger calls.
48
 func NewLogRWC(conn ReadWriteCloserWithAddr, logger *zap.SugaredLogger, sockid string, name string) ReadWriteCloserWithAddr {
52
 func NewLogRWC(conn ReadWriteCloserWithAddr, logger *zap.SugaredLogger, sockid string, name string) ReadWriteCloserWithAddr {
49
 	return &LogReadWriteCloserWithAddr{
53
 	return &LogReadWriteCloserWithAddr{

+ 1
- 0
wrappers/rwcaddr.go Просмотреть файл

10
 
10
 
11
 	LocalAddr() *net.TCPAddr
11
 	LocalAddr() *net.TCPAddr
12
 	RemoteAddr() *net.TCPAddr
12
 	RemoteAddr() *net.TCPAddr
13
+	SocketID() string
13
 }
14
 }

+ 4
- 0
wrappers/streamcipherrwc.go Просмотреть файл

51
 	return c.conn.RemoteAddr()
51
 	return c.conn.RemoteAddr()
52
 }
52
 }
53
 
53
 
54
+func (c *StreamCipherReadWriteCloserWithAddr) SocketID() string {
55
+	return c.conn.SocketID()
56
+}
57
+
54
 // NewStreamCipherRWC returns wrapper which transparently
58
 // NewStreamCipherRWC returns wrapper which transparently
55
 // encrypts/decrypts traffic with obfuscated2 protocol.
59
 // encrypts/decrypts traffic with obfuscated2 protocol.
56
 func NewStreamCipherRWC(conn ReadWriteCloserWithAddr, encryptor, decryptor cipher.Stream) ReadWriteCloserWithAddr {
60
 func NewStreamCipherRWC(conn ReadWriteCloserWithAddr, encryptor, decryptor cipher.Stream) ReadWriteCloserWithAddr {

+ 7
- 1
wrappers/timeoutrwc.go Просмотреть файл

9
 
9
 
10
 type TimeoutReadWriteCloserWithAddr struct {
10
 type TimeoutReadWriteCloserWithAddr struct {
11
 	conn       net.Conn
11
 	conn       net.Conn
12
+	sock       string
12
 	publicIPv4 net.IP
13
 	publicIPv4 net.IP
13
 	publicIPv6 net.IP
14
 	publicIPv6 net.IP
14
 }
15
 }
46
 	return &newAddr
47
 	return &newAddr
47
 }
48
 }
48
 
49
 
49
-func NewTimeoutRWC(conn net.Conn, ipv4, ipv6 net.IP) ReadWriteCloserWithAddr {
50
+func (t *TimeoutReadWriteCloserWithAddr) SocketID() string {
51
+	return t.sock
52
+}
53
+
54
+func NewTimeoutRWC(conn net.Conn, sock string, ipv4, ipv6 net.IP) ReadWriteCloserWithAddr {
50
 	return &TimeoutReadWriteCloserWithAddr{
55
 	return &TimeoutReadWriteCloserWithAddr{
51
 		conn:       conn,
56
 		conn:       conn,
52
 		publicIPv4: ipv4,
57
 		publicIPv4: ipv4,
53
 		publicIPv6: ipv6,
58
 		publicIPv6: ipv6,
59
+		sock:       sock,
54
 	}
60
 	}
55
 }
61
 }

+ 4
- 0
wrappers/trafficrwc.go Просмотреть файл

37
 	return t.conn.RemoteAddr()
37
 	return t.conn.RemoteAddr()
38
 }
38
 }
39
 
39
 
40
+func (t *TrafficReadWriteCloserWithAddr) SocketID() string {
41
+	return t.conn.SocketID()
42
+}
43
+
40
 // NewTrafficRWC wraps ReadWriteCloser to have read/write callbacks.
44
 // NewTrafficRWC wraps ReadWriteCloser to have read/write callbacks.
41
 func NewTrafficRWC(conn ReadWriteCloserWithAddr, readCallback, writeCallback func(int)) ReadWriteCloserWithAddr {
45
 func NewTrafficRWC(conn ReadWriteCloserWithAddr, readCallback, writeCallback func(int)) ReadWriteCloserWithAddr {
42
 	return &TrafficReadWriteCloserWithAddr{
46
 	return &TrafficReadWriteCloserWithAddr{

Загрузка…
Отмена
Сохранить