Просмотр исходного кода

Merge pull request #12 from 9seconds/block-rwc

Refactoring for packet mode
tags/0.9
Sergey Arkhipov 7 лет назад
Родитель
Сommit
51a60441f3
Аккаунт пользователя с таким Email не найден
51 измененных файлов: 1443 добавлений и 1471 удалений
  1. 1
    2
      client/client.go
  2. 11
    10
      client/direct.go
  3. 12
    8
      client/middle.go
  4. 4
    5
      config/config.go
  5. 11
    6
      main.go
  6. 7
    2
      mtproto/connection_options.go
  7. 22
    0
      mtproto/rpc/handshake_request.go
  8. 7
    7
      mtproto/rpc/handshake_response.go
  9. 6
    6
      mtproto/rpc/nonce_request.go
  10. 14
    14
      mtproto/rpc/nonce_response.go
  11. 55
    0
      mtproto/rpc/proxy_flags.go
  12. 95
    0
      mtproto/rpc/proxy_request.go
  13. 16
    16
      mtproto/rpc/rpc.go
  14. 0
    21
      mtproto/rpc/rpc_handshake_request.go
  15. 0
    24
      mtproto/rpc/rpc_proxy_flags.go
  16. 0
    83
      mtproto/rpc/rpc_proxy_request.go
  17. 0
    121
      mtproto/wrappers/abridged.go
  18. 0
    45
      mtproto/wrappers/crypt_test.go
  19. 0
    130
      mtproto/wrappers/frame.go
  20. 0
    83
      mtproto/wrappers/intermediate.go
  21. 0
    112
      mtproto/wrappers/proxy_request.go
  22. 0
    18
      proxy/copy_pool.go
  23. 154
    0
      proxy/proxy.go
  24. 0
    157
      proxy/server.go
  25. 0
    74
      proxy/stats.go
  26. 3
    2
      telegram/dialer.go
  27. 8
    9
      telegram/direct.go
  28. 51
    53
      telegram/middle.go
  29. 5
    10
      telegram/middle_caller.go
  30. 4
    7
      telegram/telegram.go
  31. 20
    0
      utils/read_current_data.go
  32. 14
    0
      utils/reverse_bytes.go
  33. 11
    0
      utils/uint24.go
  34. 90
    0
      wrappers/blockcipher.go
  35. 0
    66
      wrappers/blockcipherrwc.go
  36. 0
    27
      wrappers/buffer_pool.go
  37. 0
    32
      wrappers/buffered_reader.go
  38. 105
    0
      wrappers/conn.go
  39. 0
    67
      wrappers/ctxrwc.go
  40. 0
    55
      wrappers/logrwc.go
  41. 152
    0
      wrappers/mtproto_abridged.go
  42. 8
    22
      wrappers/mtproto_cipher.go
  43. 143
    0
      wrappers/mtproto_frame.go
  44. 113
    0
      wrappers/mtproto_intermediate.go
  45. 158
    0
      wrappers/mtproto_proxy.go
  46. 0
    13
      wrappers/rwcaddr.go
  47. 58
    0
      wrappers/streamcipher.go
  48. 0
    62
      wrappers/streamcipherrwc.go
  49. 0
    55
      wrappers/timeoutrwc.go
  50. 0
    47
      wrappers/trafficrwc.go
  51. 85
    0
      wrappers/wrap.go

+ 1
- 2
client/client.go Просмотреть файл

@@ -8,5 +8,4 @@ import (
8 8
 	"github.com/9seconds/mtg/wrappers"
9 9
 )
10 10
 
11
-// Init has to initialize client connection based on given config.
12
-type Init func(net.Conn, *config.Config) (*mtproto.ConnectionOpts, wrappers.ReadWriteCloserWithAddr, error)
11
+type Init func(net.Conn, string, *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error)

+ 11
- 10
client/direct.go Просмотреть файл

@@ -14,28 +14,29 @@ import (
14 14
 
15 15
 const handshakeTimeout = 10 * time.Second
16 16
 
17
-// DirectInit initializes client to access Telegram bypassing middleproxies.
18
-func DirectInit(conn net.Conn, conf *config.Config) (*mtproto.ConnectionOpts, wrappers.ReadWriteCloserWithAddr, error) {
19
-	if err := config.SetSocketOptions(conn); err != nil {
17
+func DirectInit(socket net.Conn, connID string, conf *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error) {
18
+	if err := config.SetSocketOptions(socket); err != nil {
20 19
 		return nil, nil, errors.Annotate(err, "Cannot set socket options")
21 20
 	}
22 21
 
23
-	conn.SetReadDeadline(time.Now().Add(handshakeTimeout)) // nolint: errcheck
24
-	frame, err := obfuscated2.ExtractFrame(conn)
25
-	conn.SetReadDeadline(time.Time{}) // nolint: errcheck
22
+	socket.SetReadDeadline(time.Now().Add(handshakeTimeout))
23
+	frame, err := obfuscated2.ExtractFrame(socket)
26 24
 	if err != nil {
27 25
 		return nil, nil, errors.Annotate(err, "Cannot extract frame")
28 26
 	}
27
+	socket.SetReadDeadline(time.Time{})
28
+	conn := wrappers.NewConn(socket, connID, wrappers.ConnPurposeClient, conf.PublicIPv4, conf.PublicIPv6)
29 29
 
30 30
 	obfs2, connOpts, err := obfuscated2.ParseObfuscated2ClientFrame(conf.Secret, frame)
31 31
 	if err != nil {
32 32
 		return nil, nil, errors.Annotate(err, "Cannot parse obfuscated frame")
33 33
 	}
34 34
 	connOpts.ConnectionProto = mtproto.ConnectionProtocolAny
35
-	connOpts.ClientAddr = conn.RemoteAddr().(*net.TCPAddr)
35
+	connOpts.ClientAddr = conn.RemoteAddr()
36 36
 
37
-	socket := wrappers.NewTimeoutRWC(conn, conf.PublicIPv4, conf.PublicIPv6)
38
-	socket = wrappers.NewStreamCipherRWC(socket, obfs2.Encryptor, obfs2.Decryptor)
37
+	conn = wrappers.NewStreamCipher(conn, obfs2.Encryptor, obfs2.Decryptor)
39 38
 
40
-	return connOpts, socket, nil
39
+	conn.Logger().Infow("Client connection initialized")
40
+
41
+	return conn, connOpts, nil
41 42
 }

+ 12
- 8
client/middle.go Просмотреть файл

@@ -5,21 +5,25 @@ import (
5 5
 
6 6
 	"github.com/9seconds/mtg/config"
7 7
 	"github.com/9seconds/mtg/mtproto"
8
-	mtwrappers "github.com/9seconds/mtg/mtproto/wrappers"
9 8
 	"github.com/9seconds/mtg/wrappers"
10 9
 )
11 10
 
12
-func MiddleInit(conn net.Conn, conf *config.Config) (*mtproto.ConnectionOpts, wrappers.ReadWriteCloserWithAddr, error) {
13
-	opts, newConn, err := DirectInit(conn, conf)
11
+func MiddleInit(socket net.Conn, connID string, conf *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error) {
12
+	conn, opts, err := DirectInit(socket, connID, conf)
14 13
 	if err != nil {
15 14
 		return nil, nil, err
16 15
 	}
16
+	connStream := conn.(wrappers.StreamReadWriteCloser)
17 17
 
18
-	if opts.ConnectionType == mtproto.ConnectionTypeAbridged {
19
-		newConn = mtwrappers.NewAbridgedRWC(newConn, opts)
20
-	} else {
21
-		newConn = mtwrappers.NewIntermediateRWC(newConn, opts)
18
+	newConn := wrappers.NewMTProtoAbridged(connStream, opts)
19
+	if opts.ConnectionType != mtproto.ConnectionTypeAbridged {
20
+		newConn = wrappers.NewMTProtoIntermediate(connStream, opts)
22 21
 	}
23 22
 
24
-	return opts, newConn, nil
23
+	opts.ConnectionProto = mtproto.ConnectionProtocolIPv4
24
+	if socket.LocalAddr().(*net.TCPAddr).IP.To4() == nil {
25
+		opts.ConnectionProto = mtproto.ConnectionProtocolIPv6
26
+	}
27
+
28
+	return newConn, opts, err
25 29
 }

+ 4
- 5
config/config.go Просмотреть файл

@@ -5,7 +5,6 @@ import (
5 5
 	"fmt"
6 6
 	"net"
7 7
 	"strconv"
8
-	"time"
9 8
 
10 9
 	"github.com/juju/errors"
11 10
 )
@@ -14,10 +13,6 @@ import (
14 13
 const (
15 14
 	BufferWriteSize = 32 * 1024
16 15
 	BufferReadSize  = 32 * 1024
17
-	BufferSizeCopy  = 32 * 1024
18
-
19
-	TimeoutRead  = time.Minute
20
-	TimeoutWrite = time.Minute
21 16
 )
22 17
 
23 18
 // Config represents common configuration of mtg.
@@ -63,6 +58,10 @@ func (c *Config) StatAddr() string {
63 58
 	return getAddr(c.StatsIP, c.StatsPort)
64 59
 }
65 60
 
61
+func (c *Config) UseMiddleProxy() bool {
62
+	return len(c.AdTag) > 0
63
+}
64
+
66 65
 // GetURLs returns configured IPURLs instance with links to this server.
67 66
 func (c *Config) GetURLs() IPURLs {
68 67
 	urls := IPURLs{}

+ 11
- 6
main.go Просмотреть файл

@@ -111,16 +111,21 @@ func main() {
111 111
 		zapcore.NewJSONEncoder(encoderCfg),
112 112
 		zapcore.Lock(os.Stderr),
113 113
 		atom,
114
-	)).Sugar()
114
+	))
115
+	zap.ReplaceGlobals(logger)
116
+	defer logger.Sync()
115 117
 
116
-	stat := proxy.NewStats(conf)
117
-	go stat.Serve()
118
+	if conf.UseMiddleProxy() {
119
+		zap.S().Infow("Use middle proxy connection to Telegram")
120
+	} else {
121
+		zap.S().Infow("Use direct connection to Telegram")
122
+	}
118 123
 
119
-	srv := proxy.NewServer(conf, logger, stat)
120 124
 	printURLs(conf.GetURLs())
121 125
 
122
-	if err := srv.Serve(); err != nil {
123
-		logger.Fatal(err.Error())
126
+	server := proxy.NewProxy(conf)
127
+	if err := server.Serve(); err != nil {
128
+		zap.S().Fatalw("Server stopped", "error", err)
124 129
 	}
125 130
 }
126 131
 

+ 7
- 2
mtproto/connection_options.go Просмотреть файл

@@ -13,14 +13,19 @@ type ConnectionType uint8
13 13
 
14 14
 type ConnectionProtocol uint8
15 15
 
16
+type Hacks struct {
17
+	SimpleAck bool
18
+	QuickAck  bool
19
+}
20
+
16 21
 // ConnectionOpts presents an options, metadata on connection requested
17 22
 // by the user on handshake.
18 23
 type ConnectionOpts struct {
19 24
 	DC              int16
20 25
 	ConnectionType  ConnectionType
21 26
 	ConnectionProto ConnectionProtocol
22
-	QuickAck        bool
23
-	SimpleAck       bool
27
+	ReadHacks       Hacks
28
+	WriteHacks      Hacks
24 29
 	ClientAddr      *net.TCPAddr
25 30
 }
26 31
 

+ 22
- 0
mtproto/rpc/handshake_request.go Просмотреть файл

@@ -0,0 +1,22 @@
1
+package rpc
2
+
3
+import "bytes"
4
+
5
+type HandshakeRequest struct {
6
+}
7
+
8
+func (r *HandshakeRequest) Bytes() []byte {
9
+	buf := &bytes.Buffer{}
10
+	buf.Grow(len(TagHandshake) + len(HandshakeFlags) + len(HandshakeSenderPID) + len(HandshakePeerPID))
11
+
12
+	buf.Write(TagHandshake)
13
+	buf.Write(HandshakeFlags)
14
+	buf.Write(HandshakeSenderPID)
15
+	buf.Write(HandshakePeerPID)
16
+
17
+	return buf.Bytes()
18
+}
19
+
20
+func NewHandshakeRequest() *HandshakeRequest {
21
+	return &HandshakeRequest{}
22
+}

mtproto/rpc/rpc_handshake_response.go → mtproto/rpc/handshake_response.go Просмотреть файл

@@ -6,14 +6,14 @@ import (
6 6
 	"github.com/juju/errors"
7 7
 )
8 8
 
9
-type RPCHandshakeResponse struct {
9
+type HandshakeResponse struct {
10 10
 	Type      []byte
11 11
 	Flags     []byte
12 12
 	SenderPID []byte
13 13
 	PeerPID   []byte
14 14
 }
15 15
 
16
-func (r *RPCHandshakeResponse) Bytes() []byte {
16
+func (r *HandshakeResponse) Bytes() []byte {
17 17
 	buf := &bytes.Buffer{}
18 18
 
19 19
 	buf.Write(r.Type[:])
@@ -24,23 +24,23 @@ func (r *RPCHandshakeResponse) Bytes() []byte {
24 24
 	return buf.Bytes()
25 25
 }
26 26
 
27
-func (r *RPCHandshakeResponse) Valid(req *RPCHandshakeRequest) error {
28
-	if !bytes.Equal(r.Type, RPCTagHandshake) {
27
+func (r *HandshakeResponse) Valid(req *HandshakeRequest) error {
28
+	if !bytes.Equal(r.Type, TagHandshake) {
29 29
 		return errors.New("Unexpected handshake tag")
30 30
 	}
31
-	if !bytes.Equal(r.PeerPID, RPCHandshakeSenderPID) {
31
+	if !bytes.Equal(r.PeerPID, HandshakeSenderPID) {
32 32
 		return errors.New("Incorrect sender PID")
33 33
 	}
34 34
 
35 35
 	return nil
36 36
 }
37 37
 
38
-func NewRPCHandshakeResponse(data []byte) (*RPCHandshakeResponse, error) {
38
+func NewHandshakeResponse(data []byte) (*HandshakeResponse, error) {
39 39
 	if len(data) != 32 {
40 40
 		return nil, errors.New("Incorrect handshake response length")
41 41
 	}
42 42
 
43
-	return &RPCHandshakeResponse{
43
+	return &HandshakeResponse{
44 44
 		Type:      data[:4],
45 45
 		Flags:     data[4:8],
46 46
 		SenderPID: data[8:20],

mtproto/rpc/rpc_nonce_request.go → mtproto/rpc/nonce_request.go Просмотреть файл

@@ -9,25 +9,25 @@ import (
9 9
 	"github.com/juju/errors"
10 10
 )
11 11
 
12
-type RPCNonceRequest struct {
12
+type NonceRequest struct {
13 13
 	KeySelector []byte
14 14
 	CryptoTS    []byte
15 15
 	Nonce       []byte
16 16
 }
17 17
 
18
-func (r *RPCNonceRequest) Bytes() []byte {
18
+func (r *NonceRequest) Bytes() []byte {
19 19
 	buf := &bytes.Buffer{}
20 20
 
21
-	buf.Write(RPCTagNonce)
21
+	buf.Write(TagNonce)
22 22
 	buf.Write(r.KeySelector)
23
-	buf.Write(RPCNonceCryptoAES)
23
+	buf.Write(NonceCryptoAES)
24 24
 	buf.Write(r.CryptoTS)
25 25
 	buf.Write(r.Nonce)
26 26
 
27 27
 	return buf.Bytes()
28 28
 }
29 29
 
30
-func NewRPCNonceRequest(proxySecret []byte) (*RPCNonceRequest, error) {
30
+func NewNonceRequest(proxySecret []byte) (*NonceRequest, error) {
31 31
 	nonce := make([]byte, 16)
32 32
 	keySelector := make([]byte, 4)
33 33
 	cryptoTS := make([]byte, 4)
@@ -40,7 +40,7 @@ func NewRPCNonceRequest(proxySecret []byte) (*RPCNonceRequest, error) {
40 40
 	timestamp := time.Now().Truncate(time.Second).Unix() % 4294967296 // 256 ^ 4 - do not know how to name
41 41
 	binary.LittleEndian.PutUint32(cryptoTS, uint32(timestamp))
42 42
 
43
-	return &RPCNonceRequest{
43
+	return &NonceRequest{
44 44
 		KeySelector: keySelector,
45 45
 		CryptoTS:    cryptoTS,
46 46
 		Nonce:       nonce,

mtproto/rpc/rpc_nonce_response.go → mtproto/rpc/nonce_response.go Просмотреть файл

@@ -6,17 +6,17 @@ import (
6 6
 	"github.com/juju/errors"
7 7
 )
8 8
 
9
-type RPCNonceResponse struct {
10
-	RPCNonceRequest
9
+type NonceResponse struct {
10
+	NonceRequest
11 11
 
12
-	RPCType []byte
13
-	Crypto  []byte
12
+	Type   []byte
13
+	Crypto []byte
14 14
 }
15 15
 
16
-func (r *RPCNonceResponse) Bytes() []byte {
16
+func (r *NonceResponse) Bytes() []byte {
17 17
 	buf := &bytes.Buffer{}
18 18
 
19
-	buf.Write(r.RPCType)
19
+	buf.Write(r.Type)
20 20
 	buf.Write(r.KeySelector)
21 21
 	buf.Write(r.Crypto)
22 22
 	buf.Write(r.CryptoTS)
@@ -25,11 +25,11 @@ func (r *RPCNonceResponse) Bytes() []byte {
25 25
 	return buf.Bytes()
26 26
 }
27 27
 
28
-func (r *RPCNonceResponse) Valid(req *RPCNonceRequest) error {
29
-	if !bytes.Equal(r.RPCType, RPCTagNonce) {
28
+func (r *NonceResponse) Valid(req *NonceRequest) error {
29
+	if !bytes.Equal(r.Type, TagNonce) {
30 30
 		return errors.New("Unexpected RPC type")
31 31
 	}
32
-	if !bytes.Equal(r.Crypto, RPCNonceCryptoAES) {
32
+	if !bytes.Equal(r.Crypto, NonceCryptoAES) {
33 33
 		return errors.New("Unexpected crypto type")
34 34
 	}
35 35
 	if !bytes.Equal(r.KeySelector, req.KeySelector) {
@@ -39,18 +39,18 @@ func (r *RPCNonceResponse) Valid(req *RPCNonceRequest) error {
39 39
 	return nil
40 40
 }
41 41
 
42
-func NewRPCNonceResponse(data []byte) (*RPCNonceResponse, error) {
42
+func NewNonceResponse(data []byte) (*NonceResponse, error) {
43 43
 	if len(data) != 32 {
44 44
 		return nil, errors.New("Unexpected message length")
45 45
 	}
46 46
 
47
-	return &RPCNonceResponse{
48
-		RPCNonceRequest: RPCNonceRequest{
47
+	return &NonceResponse{
48
+		NonceRequest: NonceRequest{
49 49
 			KeySelector: data[4:8],
50 50
 			CryptoTS:    data[12:16],
51 51
 			Nonce:       data[16:],
52 52
 		},
53
-		RPCType: data[:4],
54
-		Crypto:  data[8:12],
53
+		Type:   data[:4],
54
+		Crypto: data[8:12],
55 55
 	}, nil
56 56
 }

+ 55
- 0
mtproto/rpc/proxy_flags.go Просмотреть файл

@@ -0,0 +1,55 @@
1
+package rpc
2
+
3
+import (
4
+	"encoding/binary"
5
+	"strings"
6
+)
7
+
8
+type proxyRequestFlags uint32
9
+
10
+const (
11
+	proxyRequestFlagsHasAdTag     proxyRequestFlags = 0x8
12
+	proxyRequestFlagsEncrypted                      = 0x2
13
+	proxyRequestFlagsMagic                          = 0x1000
14
+	proxyRequestFlagsExtMode2                       = 0x20000
15
+	proxyRequestFlagsIntermediate                   = 0x20000000
16
+	proxyRequestFlagsAbdridged                      = 0x40000000
17
+	proxyRequestFlagsQuickAck                       = 0x80000000
18
+)
19
+
20
+var proxyRequestFlagsEncryptedPrefix [8]byte
21
+
22
+func (r proxyRequestFlags) Bytes() []byte {
23
+	converted := make([]byte, 4)
24
+	binary.LittleEndian.PutUint32(converted, uint32(r))
25
+
26
+	return converted
27
+}
28
+
29
+func (r proxyRequestFlags) String() string {
30
+	flags := make([]string, 0, 7)
31
+
32
+	if r&proxyRequestFlagsHasAdTag != 0 {
33
+		flags = append(flags, "HAS_AD_TAG")
34
+	}
35
+	if r&proxyRequestFlagsEncrypted != 0 {
36
+		flags = append(flags, "ENCRYPTED")
37
+	}
38
+	if r&proxyRequestFlagsMagic != 0 {
39
+		flags = append(flags, "MAGIC")
40
+	}
41
+	if r&proxyRequestFlagsExtMode2 != 0 {
42
+		flags = append(flags, "EXT_MODE_2")
43
+	}
44
+	if r&proxyRequestFlagsIntermediate != 0 {
45
+		flags = append(flags, "INTERMEDIATE")
46
+	}
47
+	if r&proxyRequestFlagsAbdridged != 0 {
48
+		flags = append(flags, "ABRIDGED")
49
+	}
50
+	if r&proxyRequestFlagsQuickAck != 0 {
51
+		flags = append(flags, "QUICK_ACK")
52
+	}
53
+
54
+	return strings.Join(flags, " | ")
55
+}

+ 95
- 0
mtproto/rpc/proxy_request.go Просмотреть файл

@@ -0,0 +1,95 @@
1
+package rpc
2
+
3
+import (
4
+	"bytes"
5
+	"crypto/rand"
6
+	"encoding/binary"
7
+	"fmt"
8
+	"net"
9
+
10
+	"github.com/juju/errors"
11
+
12
+	"github.com/9seconds/mtg/mtproto"
13
+)
14
+
15
+type ProxyRequest struct {
16
+	Flags        proxyRequestFlags
17
+	ConnectionID []byte
18
+	OurIPPort    []byte
19
+	ClientIPPort []byte
20
+	ADTag        []byte
21
+	Options      *mtproto.ConnectionOpts
22
+}
23
+
24
+func (r *ProxyRequest) MakeHeader(message []byte) (*bytes.Buffer, fmt.Stringer) {
25
+	bufferLength := len(TagProxyRequest) +
26
+		4 + // len(flags)
27
+		len(r.ConnectionID) +
28
+		len(r.ClientIPPort) +
29
+		len(r.OurIPPort) +
30
+		len(ProxyRequestExtraSize) +
31
+		len(ProxyRequestProxyTag) +
32
+		1 + // len(AdTag)
33
+		len(r.ADTag)
34
+	bufferLength += bufferLength % 4
35
+
36
+	buf := &bytes.Buffer{}
37
+	buf.Grow(bufferLength + len(message))
38
+
39
+	flags := r.Flags
40
+	if r.Options.ReadHacks.QuickAck {
41
+		flags |= proxyRequestFlagsQuickAck
42
+	}
43
+
44
+	if bytes.HasPrefix(message, proxyRequestFlagsEncryptedPrefix[:]) {
45
+		flags |= proxyRequestFlagsEncrypted
46
+	}
47
+
48
+	buf.Write(TagProxyRequest)
49
+	buf.Write(flags.Bytes())
50
+	buf.Write(r.ConnectionID)
51
+	buf.Write(r.ClientIPPort)
52
+	buf.Write(r.OurIPPort)
53
+	buf.Write(ProxyRequestExtraSize)
54
+	buf.Write(ProxyRequestProxyTag)
55
+	buf.WriteByte(byte(len(r.ADTag)))
56
+	buf.Write(r.ADTag)
57
+	buf.Write(make([]byte, (4-buf.Len()%4)%4))
58
+
59
+	return buf, flags
60
+}
61
+
62
+func NewProxyRequest(clientAddr, ownAddr *net.TCPAddr, opts *mtproto.ConnectionOpts, adTag []byte) (*ProxyRequest, error) {
63
+	flags := proxyRequestFlagsHasAdTag | proxyRequestFlagsMagic | proxyRequestFlagsExtMode2
64
+
65
+	switch opts.ConnectionType {
66
+	case mtproto.ConnectionTypeAbridged:
67
+		flags |= proxyRequestFlagsAbdridged
68
+	case mtproto.ConnectionTypeIntermediate:
69
+		flags |= proxyRequestFlagsIntermediate
70
+	}
71
+
72
+	request := &ProxyRequest{
73
+		Flags:        flags,
74
+		ADTag:        adTag,
75
+		Options:      opts,
76
+		ConnectionID: make([]byte, 8),
77
+		ClientIPPort: make([]byte, 16+4),
78
+		OurIPPort:    make([]byte, 16+4),
79
+	}
80
+
81
+	if _, err := rand.Read(request.ConnectionID); err != nil {
82
+		return nil, errors.Annotate(err, "Cannot generate connection ID")
83
+	}
84
+
85
+	port := [4]byte{}
86
+	copy(request.ClientIPPort[:16], clientAddr.IP.To16())
87
+	binary.LittleEndian.PutUint32(port[:], uint32(clientAddr.Port))
88
+	copy(request.ClientIPPort[16:], port[:])
89
+
90
+	copy(request.OurIPPort[:16], ownAddr.IP.To16())
91
+	binary.LittleEndian.PutUint32(port[:], uint32(ownAddr.Port))
92
+	copy(request.OurIPPort[16:], port[:])
93
+
94
+	return request, nil
95
+}

+ 16
- 16
mtproto/rpc/rpc.go Просмотреть файл

@@ -1,30 +1,30 @@
1 1
 package rpc
2 2
 
3 3
 const (
4
-	RPCNonceSeqNo     = -2
5
-	RPCHandshakeSeqNo = -1
4
+	SeqNoNonce     = -2
5
+	SeqNoHandshake = -1
6 6
 )
7 7
 
8 8
 var (
9
-	RPCTagCloseExt     = []byte{0xa2, 0x34, 0xb6, 0x5e}
10
-	RPCTagProxyAns     = []byte{0x0d, 0xda, 0x03, 0x44}
11
-	RPCTagSimpleAck    = []byte{0x9b, 0x40, 0xac, 0x3b}
12
-	RPCTagHandshake    = []byte{0xf5, 0xee, 0x82, 0x76}
13
-	RPCTagNonce        = []byte{0xaa, 0x87, 0xcb, 0x7a}
14
-	RPCTagProxyRequest = []byte{0xee, 0xf1, 0xce, 0x36}
9
+	TagCloseExt     = []byte{0xa2, 0x34, 0xb6, 0x5e}
10
+	TagProxyAns     = []byte{0x0d, 0xda, 0x03, 0x44}
11
+	TagSimpleAck    = []byte{0x9b, 0x40, 0xac, 0x3b}
12
+	TagHandshake    = []byte{0xf5, 0xee, 0x82, 0x76}
13
+	TagNonce        = []byte{0xaa, 0x87, 0xcb, 0x7a}
14
+	TagProxyRequest = []byte{0xee, 0xf1, 0xce, 0x36}
15 15
 
16
-	RPCNonceCryptoAES = []byte{0x01, 0x00, 0x00, 0x00}
16
+	NonceCryptoAES = []byte{0x01, 0x00, 0x00, 0x00}
17 17
 
18
-	RPCHandshakeFlags = []byte{0x00, 0x00, 0x00, 0x00}
18
+	HandshakeFlags = []byte{0x00, 0x00, 0x00, 0x00}
19 19
 
20
-	RPCProxyRequestExtraSize = []byte{0x18, 0x00, 0x00, 0x00}
21
-	RPCProxyRequestProxyTag  = []byte{0xae, 0x26, 0x1e, 0xdb}
20
+	ProxyRequestExtraSize = []byte{0x18, 0x00, 0x00, 0x00}
21
+	ProxyRequestProxyTag  = []byte{0xae, 0x26, 0x1e, 0xdb}
22 22
 
23
-	RPCHandshakeSenderPID = []byte{}
24
-	RPCHandshakePeerPID   = []byte{}
23
+	HandshakeSenderPID []byte
24
+	HandshakePeerPID   []byte
25 25
 )
26 26
 
27 27
 func init() {
28
-	RPCHandshakeSenderPID = []byte("IPIPPRPDTIME")
29
-	RPCHandshakePeerPID = []byte("IPIPPRPDTIME")
28
+	HandshakeSenderPID = []byte("IPIPPRPDTIME")
29
+	HandshakePeerPID = []byte("IPIPPRPDTIME")
30 30
 }

+ 0
- 21
mtproto/rpc/rpc_handshake_request.go Просмотреть файл

@@ -1,21 +0,0 @@
1
-package rpc
2
-
3
-import "bytes"
4
-
5
-type RPCHandshakeRequest struct {
6
-}
7
-
8
-func (r *RPCHandshakeRequest) Bytes() []byte {
9
-	buf := &bytes.Buffer{}
10
-
11
-	buf.Write(RPCTagHandshake)
12
-	buf.Write(RPCHandshakeFlags)
13
-	buf.Write(RPCHandshakeSenderPID)
14
-	buf.Write(RPCHandshakePeerPID)
15
-
16
-	return buf.Bytes()
17
-}
18
-
19
-func NewRPCHandshakeRequest() *RPCHandshakeRequest {
20
-	return &RPCHandshakeRequest{}
21
-}

+ 0
- 24
mtproto/rpc/rpc_proxy_flags.go Просмотреть файл

@@ -1,24 +0,0 @@
1
-package rpc
2
-
3
-import "encoding/binary"
4
-
5
-type RPCProxyRequestFlags uint32
6
-
7
-const (
8
-	RPCProxyRequestFlagsHasAdTag     RPCProxyRequestFlags = 0x8
9
-	RPCProxyRequestFlagsEncrypted                         = 0x2
10
-	RPCProxyRequestFlagsMagic                             = 0x1000
11
-	RPCProxyRequestFlagsExtMode2                          = 0x20000
12
-	RPCProxyRequestFlagsIntermediate                      = 0x20000000
13
-	RPCProxyRequestFlagsAbdridged                         = 0x40000000
14
-	RPCProxyRequestFlagsQuickAck                          = 0x80000000
15
-)
16
-
17
-var rpcProxyRequestFlagsEncryptedPrefix [8]byte
18
-
19
-func (r RPCProxyRequestFlags) Bytes() []byte {
20
-	converted := make([]byte, 4)
21
-	binary.LittleEndian.PutUint32(converted, uint32(r))
22
-
23
-	return converted
24
-}

+ 0
- 83
mtproto/rpc/rpc_proxy_request.go Просмотреть файл

@@ -1,83 +0,0 @@
1
-package rpc
2
-
3
-import (
4
-	"bytes"
5
-	"crypto/rand"
6
-	"encoding/binary"
7
-	"net"
8
-
9
-	"github.com/juju/errors"
10
-
11
-	"github.com/9seconds/mtg/mtproto"
12
-)
13
-
14
-type RPCProxyRequest struct {
15
-	Flags        RPCProxyRequestFlags
16
-	ConnectionID []byte
17
-	OurIPPort    []byte
18
-	ClientIPPort []byte
19
-	ADTag        []byte
20
-	Options      *mtproto.ConnectionOpts
21
-}
22
-
23
-func (r *RPCProxyRequest) Bytes(message []byte) []byte {
24
-	buf := &bytes.Buffer{}
25
-
26
-	flags := r.Flags
27
-	if r.Options.QuickAck {
28
-		flags |= RPCProxyRequestFlagsQuickAck
29
-	}
30
-
31
-	if bytes.HasPrefix(message, rpcProxyRequestFlagsEncryptedPrefix[:]) {
32
-		flags |= RPCProxyRequestFlagsEncrypted
33
-	}
34
-
35
-	buf.Write(RPCTagProxyRequest)
36
-	buf.Write(flags.Bytes())
37
-	buf.Write(r.ConnectionID[:])
38
-	buf.Write(r.ClientIPPort[:])
39
-	buf.Write(r.OurIPPort[:])
40
-	buf.Write(RPCProxyRequestExtraSize)
41
-	buf.Write(RPCProxyRequestProxyTag)
42
-	buf.WriteByte(byte(len(r.ADTag)))
43
-	buf.Write(r.ADTag)
44
-	buf.Write(bytes.Repeat([]byte{0x00}, buf.Len()%4))
45
-	buf.Write(message)
46
-
47
-	return buf.Bytes()
48
-}
49
-
50
-func NewRPCProxyRequest(clientAddr, ownAddr *net.TCPAddr, opts *mtproto.ConnectionOpts, adTag []byte) (*RPCProxyRequest, error) {
51
-	flags := RPCProxyRequestFlagsHasAdTag | RPCProxyRequestFlagsMagic | RPCProxyRequestFlagsExtMode2
52
-
53
-	switch opts.ConnectionType {
54
-	case mtproto.ConnectionTypeAbridged:
55
-		flags |= RPCProxyRequestFlagsAbdridged
56
-	case mtproto.ConnectionTypeIntermediate:
57
-		flags |= RPCProxyRequestFlagsIntermediate
58
-	}
59
-
60
-	request := RPCProxyRequest{
61
-		Flags:        flags,
62
-		ADTag:        adTag,
63
-		Options:      opts,
64
-		ConnectionID: make([]byte, 8),
65
-		ClientIPPort: make([]byte, 16+4),
66
-		OurIPPort:    make([]byte, 16+4),
67
-	}
68
-
69
-	if _, err := rand.Read(request.ConnectionID); err != nil {
70
-		return nil, errors.Annotate(err, "Cannot generate connection ID")
71
-	}
72
-
73
-	port := [4]byte{}
74
-	copy(request.ClientIPPort[:16], clientAddr.IP.To16())
75
-	binary.LittleEndian.PutUint32(port[:], uint32(clientAddr.Port))
76
-	copy(request.ClientIPPort[16:], port[:])
77
-
78
-	copy(request.OurIPPort[:16], ownAddr.IP.To16())
79
-	binary.LittleEndian.PutUint32(port[:], uint32(ownAddr.Port))
80
-	copy(request.OurIPPort[16:], port[:])
81
-
82
-	return &request, nil
83
-}

+ 0
- 121
mtproto/wrappers/abridged.go Просмотреть файл

@@ -1,121 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"bytes"
5
-	"encoding/binary"
6
-	"io"
7
-	"net"
8
-
9
-	"github.com/juju/errors"
10
-
11
-	"github.com/9seconds/mtg/mtproto"
12
-	"github.com/9seconds/mtg/wrappers"
13
-)
14
-
15
-type uint24 [3]byte
16
-
17
-const (
18
-	abridgedSmallPacketLength = 0x7f
19
-	abridgedQuickAckLength    = 0x80
20
-	abridgedLargePacketLength = 16777216 // 256 ^ 3
21
-)
22
-
23
-type AbridgedReadWriteCloserWithAddr struct {
24
-	wrappers.BufferedReader
25
-
26
-	conn wrappers.ReadWriteCloserWithAddr
27
-	opts *mtproto.ConnectionOpts
28
-}
29
-
30
-func (a *AbridgedReadWriteCloserWithAddr) Read(p []byte) (int, error) {
31
-	return a.BufferedRead(p, func() error {
32
-		var msgLength uint8
33
-		if err := binary.Read(a.conn, binary.LittleEndian, &msgLength); err != nil {
34
-			return errors.Annotate(err, "Cannot read message length")
35
-		}
36
-
37
-		a.opts.QuickAck = false
38
-		if msgLength >= abridgedQuickAckLength {
39
-			a.opts.QuickAck = true
40
-			msgLength -= 0x80
41
-		}
42
-
43
-		msgLength32 := uint32(msgLength)
44
-		if msgLength == abridgedSmallPacketLength {
45
-			buf := &bytes.Buffer{}
46
-			buf.Grow(3)
47
-
48
-			if _, err := io.CopyN(buf, a.conn, 3); err != nil {
49
-				return errors.Annotate(err, "Cannot read correct message length")
50
-			}
51
-			number := uint24{}
52
-			copy(number[:], buf.Bytes())
53
-			msgLength32 = fromUint24(number)
54
-		}
55
-		msgLength32 *= 4
56
-
57
-		if _, err := io.CopyN(a.Buffer, a.conn, int64(msgLength32)); err != nil {
58
-			return errors.Annotate(err, "Cannot read message")
59
-		}
60
-
61
-		return nil
62
-	})
63
-}
64
-
65
-func (a *AbridgedReadWriteCloserWithAddr) Write(p []byte) (int, error) {
66
-	if len(p)%4 != 0 {
67
-		return 0, errors.Errorf("Incorrect packet length %d", len(p))
68
-	}
69
-	if a.opts.SimpleAck {
70
-		return a.conn.Write(reverseBytes(p))
71
-	}
72
-
73
-	packetLength := len(p) / 4
74
-	switch {
75
-	case packetLength < abridgedSmallPacketLength:
76
-		newData := append([]byte{byte(packetLength)}, p...)
77
-		return a.conn.Write(newData)
78
-
79
-	case packetLength < abridgedLargePacketLength:
80
-		length24 := toUint24(uint32(packetLength))
81
-
82
-		buf := &bytes.Buffer{}
83
-		buf.Grow(1 + 3 + len(p))
84
-		buf.WriteByte(byte(abridgedSmallPacketLength))
85
-		buf.Write(length24[:])
86
-		buf.Write(p)
87
-
88
-		return a.conn.Write(buf.Bytes())
89
-
90
-	default:
91
-		return 0, errors.Errorf("Packet is too big %d", len(p))
92
-	}
93
-}
94
-
95
-func (a *AbridgedReadWriteCloserWithAddr) Close() error {
96
-	return a.conn.Close()
97
-}
98
-
99
-func (a *AbridgedReadWriteCloserWithAddr) LocalAddr() *net.TCPAddr {
100
-	return a.conn.LocalAddr()
101
-}
102
-
103
-func (a *AbridgedReadWriteCloserWithAddr) RemoteAddr() *net.TCPAddr {
104
-	return a.conn.RemoteAddr()
105
-}
106
-
107
-func toUint24(number uint32) uint24 {
108
-	return uint24{byte(number), byte(number >> 8), byte(number >> 16)}
109
-}
110
-
111
-func fromUint24(number uint24) uint32 {
112
-	return uint32(number[0]) + (uint32(number[1]) << 8) + (uint32(number[2]) << 16)
113
-}
114
-
115
-func NewAbridgedRWC(conn wrappers.ReadWriteCloserWithAddr, connOpts *mtproto.ConnectionOpts) wrappers.ReadWriteCloserWithAddr {
116
-	return &AbridgedReadWriteCloserWithAddr{
117
-		BufferedReader: wrappers.NewBufferedReader(),
118
-		conn:           conn,
119
-		opts:           connOpts,
120
-	}
121
-}

+ 0
- 45
mtproto/wrappers/crypt_test.go Просмотреть файл

@@ -1,45 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"encoding/binary"
5
-	"net"
6
-	"testing"
7
-
8
-	"github.com/stretchr/testify/assert"
9
-
10
-	"github.com/9seconds/mtg/mtproto/rpc"
11
-)
12
-
13
-var proxySecret = []byte{196, 249, 250, 202, 150, 120, 230, 187, 72, 173,
14
-	108, 126, 44, 229, 192, 210, 68, 48, 100, 93, 85, 74, 221, 235, 85, 65,
15
-	158, 3, 77, 166, 39, 33, 208, 70, 234, 171, 110, 82, 171, 20, 169, 90, 68,
16
-	62, 207, 179, 70, 62, 121, 160, 90, 102, 97, 42, 223, 156, 174, 218, 139,
17
-	233, 168, 13, 166, 152, 111, 176, 166, 255, 56, 122, 248, 77, 136, 239,
18
-	58, 100, 19, 113, 62, 92, 51, 119, 246, 225, 163, 212, 125, 153, 245, 224,
19
-	197, 110, 236, 232, 240, 92, 84, 196, 144, 176, 121, 227, 27, 239, 130,
20
-	255, 14, 232, 242, 176, 163, 39, 86, 210, 73, 197, 242, 18, 105, 129, 108,
21
-	183, 6, 27, 38, 93, 178, 18}
22
-
23
-func TestMakeKeys(t *testing.T) {
24
-	req, err := rpc.NewRPCNonceRequest(proxySecret)
25
-	assert.Nil(t, err)
26
-
27
-	copy(req.Nonce[:], []byte{24, 49, 53, 111, 198, 10, 235, 180, 230, 112, 92, 78, 1, 201, 106, 105})
28
-	binary.LittleEndian.PutUint32(req.CryptoTS[:], 1528396015)
29
-
30
-	resp := &rpc.RPCNonceResponse{}
31
-	copy(resp.Nonce[:], []byte{247, 40, 210, 56, 65, 12, 101, 170, 216, 155, 14, 253, 250, 238, 219, 226})
32
-
33
-	cltAddr := &net.TCPAddr{
34
-		IP:   net.ParseIP("80.211.29.34"),
35
-		Port: 54208,
36
-	}
37
-	srvAddr := &net.TCPAddr{
38
-		IP:   net.ParseIP("149.154.162.38"),
39
-		Port: 80,
40
-	}
41
-
42
-	key, iv := makeKeys(CipherPurposeClient, req, resp, cltAddr, srvAddr, proxySecret)
43
-	assert.Equal(t, key, []byte{165, 158, 127, 49, 41, 232, 187, 69, 38, 29, 163, 226, 183, 146, 28, 67, 225, 224, 134, 191, 207, 152, 255, 166, 152, 66, 169, 196, 54, 135, 50, 188})
44
-	assert.Equal(t, iv, []byte{33, 110, 125, 221, 183, 121, 160, 116, 130, 180, 156, 249, 52, 111, 37, 178})
45
-}

+ 0
- 130
mtproto/wrappers/frame.go Просмотреть файл

@@ -1,130 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"bytes"
5
-	"crypto/aes"
6
-	"encoding/binary"
7
-	"hash/crc32"
8
-	"io"
9
-	"io/ioutil"
10
-	"net"
11
-
12
-	"github.com/juju/errors"
13
-
14
-	"github.com/9seconds/mtg/wrappers"
15
-)
16
-
17
-// Frame: { MessageLength(4) | SequenceNumber(4) | Message(???) | CRC32(4) [| padding(4), ...] }
18
-const (
19
-	frameRWCMinMessageLength = 12
20
-	frameRWCMaxMessageLength = 16777216
21
-)
22
-
23
-var frameRWCPadding = [4]byte{0x04, 0x00, 0x00, 0x00}
24
-
25
-type FrameRWC struct {
26
-	wrappers.BufferedReader
27
-
28
-	conn       wrappers.ReadWriteCloserWithAddr
29
-	readSeqNo  int32
30
-	writeSeqNo int32
31
-}
32
-
33
-func (f *FrameRWC) Write(buf []byte) (int, error) {
34
-	writeBuf := &bytes.Buffer{}
35
-
36
-	// 4 - len bytes
37
-	// 4 - seq bytes
38
-	// . - message
39
-	// 4 - crc32
40
-	messageLength := 4 + 4 + len(buf) + 4
41
-	paddingLength := (aes.BlockSize - messageLength%aes.BlockSize) % aes.BlockSize
42
-	writeBuf.Grow(messageLength + paddingLength)
43
-
44
-	binary.Write(writeBuf, binary.LittleEndian, uint32(messageLength))
45
-	binary.Write(writeBuf, binary.LittleEndian, f.writeSeqNo)
46
-	writeBuf.Write(buf)
47
-	f.writeSeqNo++
48
-
49
-	checksum := crc32.ChecksumIEEE(writeBuf.Bytes())
50
-	binary.Write(writeBuf, binary.LittleEndian, checksum)
51
-	writeBuf.Write(bytes.Repeat(frameRWCPadding[:], paddingLength/4))
52
-
53
-	_, err := f.conn.Write(writeBuf.Bytes())
54
-	return len(buf), err
55
-}
56
-
57
-func (f *FrameRWC) Read(p []byte) (int, error) {
58
-	return f.BufferedRead(p, func() error {
59
-		buf := &bytes.Buffer{}
60
-		sum := crc32.NewIEEE()
61
-		writer := io.MultiWriter(buf, sum)
62
-
63
-		for {
64
-			buf.Reset()
65
-			sum.Reset()
66
-			if _, err := io.CopyN(writer, f.conn, 4); err != nil {
67
-				return errors.Annotate(err, "Cannot read frame padding")
68
-			}
69
-			if !bytes.Equal(buf.Bytes(), frameRWCPadding[:]) {
70
-				break
71
-			}
72
-		}
73
-
74
-		messageLength := binary.LittleEndian.Uint32(buf.Bytes())
75
-		if messageLength%4 != 0 || messageLength < frameRWCMinMessageLength || messageLength > frameRWCMaxMessageLength {
76
-			return errors.Errorf("Incorrect frame message length %d", messageLength)
77
-		}
78
-
79
-		buf.Reset()
80
-		buf.Grow(int(messageLength) - 4 - 4)
81
-		if _, err := io.CopyN(writer, f.conn, int64(messageLength)-4-4); err != nil {
82
-			return errors.Annotate(err, "Cannot read the message frame")
83
-		}
84
-
85
-		var seqNo int32
86
-		binary.Read(buf, binary.LittleEndian, &seqNo)
87
-		if seqNo != f.readSeqNo {
88
-			return errors.Errorf("Unexpected sequence number %d (wait for %d)", seqNo, f.readSeqNo)
89
-		}
90
-		f.readSeqNo++
91
-
92
-		data, _ := ioutil.ReadAll(buf)
93
-		buf.Reset()
94
-		// write to buf, not to writer. This is because we are going to fetch
95
-		// crc32 checksum.
96
-		if _, err := io.CopyN(buf, f.conn, 4); err != nil {
97
-			return errors.Annotate(err, "Cannot read checksum")
98
-		}
99
-		checksum := binary.LittleEndian.Uint32(buf.Bytes())
100
-
101
-		if checksum != sum.Sum32() {
102
-			return errors.Errorf("CRC32 checksum mismatch. Wait for %d, got %d", sum.Sum32(), checksum)
103
-
104
-		}
105
-		f.Buffer.Write(data)
106
-
107
-		return nil
108
-	})
109
-}
110
-
111
-func (f *FrameRWC) Close() error {
112
-	return f.conn.Close()
113
-}
114
-
115
-func (f *FrameRWC) LocalAddr() *net.TCPAddr {
116
-	return f.conn.LocalAddr()
117
-}
118
-
119
-func (f *FrameRWC) RemoteAddr() *net.TCPAddr {
120
-	return f.conn.RemoteAddr()
121
-}
122
-
123
-func NewFrameRWC(conn wrappers.ReadWriteCloserWithAddr, seqNo int32) wrappers.ReadWriteCloserWithAddr {
124
-	return &FrameRWC{
125
-		BufferedReader: wrappers.NewBufferedReader(),
126
-		conn:           conn,
127
-		readSeqNo:      seqNo,
128
-		writeSeqNo:     seqNo,
129
-	}
130
-}

+ 0
- 83
mtproto/wrappers/intermediate.go Просмотреть файл

@@ -1,83 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"bytes"
5
-	"encoding/binary"
6
-	"io"
7
-	"net"
8
-
9
-	"github.com/juju/errors"
10
-
11
-	"github.com/9seconds/mtg/mtproto"
12
-	"github.com/9seconds/mtg/wrappers"
13
-)
14
-
15
-const intermediateQuickAckLength = 0x80000000
16
-
17
-type IntermediateReadWriteCloserWithAddr struct {
18
-	wrappers.BufferedReader
19
-
20
-	conn wrappers.ReadWriteCloserWithAddr
21
-	opts *mtproto.ConnectionOpts
22
-}
23
-
24
-func (i *IntermediateReadWriteCloserWithAddr) Read(p []byte) (int, error) {
25
-	return i.BufferedRead(p, func() error {
26
-		var length uint32
27
-		if err := binary.Read(i.conn, binary.LittleEndian, &length); err != nil {
28
-			return errors.Annotate(err, "Cannot read message length")
29
-		}
30
-
31
-		if length > intermediateQuickAckLength {
32
-			i.opts.QuickAck = true
33
-			length -= intermediateQuickAckLength
34
-		}
35
-
36
-		buf := &bytes.Buffer{}
37
-		buf.Grow(int(length))
38
-		if _, err := io.CopyN(buf, i.conn, int64(length)); err != nil {
39
-			return errors.Annotate(err, "Cannot read the message")
40
-		}
41
-
42
-		if length%4 != 0 {
43
-			length -= length % 4
44
-			i.Buffer.Write(buf.Bytes()[:length])
45
-			return nil
46
-		}
47
-
48
-		i.Buffer.Write(buf.Bytes())
49
-
50
-		return nil
51
-	})
52
-}
53
-
54
-func (i *IntermediateReadWriteCloserWithAddr) Write(p []byte) (int, error) {
55
-	if i.opts.SimpleAck {
56
-		return i.conn.Write(p)
57
-	}
58
-
59
-	var length [4]byte
60
-	binary.LittleEndian.PutUint32(length[:], uint32(len(p)))
61
-
62
-	return i.conn.Write(append(length[:], p...))
63
-}
64
-
65
-func (i *IntermediateReadWriteCloserWithAddr) Close() error {
66
-	return i.conn.Close()
67
-}
68
-
69
-func (i *IntermediateReadWriteCloserWithAddr) LocalAddr() *net.TCPAddr {
70
-	return i.conn.LocalAddr()
71
-}
72
-
73
-func (i *IntermediateReadWriteCloserWithAddr) RemoteAddr() *net.TCPAddr {
74
-	return i.conn.RemoteAddr()
75
-}
76
-
77
-func NewIntermediateRWC(conn wrappers.ReadWriteCloserWithAddr, connOpts *mtproto.ConnectionOpts) wrappers.ReadWriteCloserWithAddr {
78
-	return &IntermediateReadWriteCloserWithAddr{
79
-		BufferedReader: wrappers.NewBufferedReader(),
80
-		conn:           conn,
81
-		opts:           connOpts,
82
-	}
83
-}

+ 0
- 112
mtproto/wrappers/proxy_request.go Просмотреть файл

@@ -1,112 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"bytes"
5
-	"io"
6
-	"io/ioutil"
7
-	"net"
8
-
9
-	"github.com/juju/errors"
10
-
11
-	"github.com/9seconds/mtg/mtproto"
12
-	"github.com/9seconds/mtg/mtproto/rpc"
13
-	"github.com/9seconds/mtg/wrappers"
14
-)
15
-
16
-type ProxyRequestReadWriteCloserWithAddr struct {
17
-	wrappers.BufferedReader
18
-
19
-	conn wrappers.ReadWriteCloserWithAddr
20
-	req  *rpc.RPCProxyRequest
21
-}
22
-
23
-func (p *ProxyRequestReadWriteCloserWithAddr) Read(buf []byte) (int, error) {
24
-	return p.BufferedRead(buf, func() error {
25
-		ansBuf := &bytes.Buffer{}
26
-		ansBuf.Grow(4)
27
-
28
-		if _, err := io.CopyN(ansBuf, p.conn, 4); err != nil {
29
-			return errors.Annotate(err, "Cannot read RPC tag")
30
-		}
31
-
32
-		if bytes.Equal(ansBuf.Bytes(), rpc.RPCTagCloseExt) {
33
-			return p.readCloseExt()
34
-		} else if bytes.Equal(ansBuf.Bytes(), rpc.RPCTagProxyAns) {
35
-			return p.readProxyAns(buf)
36
-		} else if bytes.Equal(ansBuf.Bytes(), rpc.RPCTagSimpleAck) {
37
-			return p.readSimpleAck()
38
-		}
39
-
40
-		return nil
41
-	})
42
-}
43
-
44
-func (p *ProxyRequestReadWriteCloserWithAddr) readCloseExt() error {
45
-	return errors.New("Connection has been closed remotely")
46
-}
47
-
48
-func (p *ProxyRequestReadWriteCloserWithAddr) readProxyAns(buf []byte) error {
49
-	if _, err := io.CopyN(ioutil.Discard, p.conn, 8+4); err != nil {
50
-		return errors.Annotate(err, "Cannot skip flags and connid")
51
-	}
52
-
53
-	for {
54
-		n, err := p.conn.Read(buf)
55
-		if err != nil {
56
-			return errors.Annotate(err, "Cannot read proxy answer")
57
-		}
58
-		if n == 0 {
59
-			break
60
-		}
61
-		p.Buffer.Write(buf[:n])
62
-	}
63
-
64
-	return nil
65
-}
66
-
67
-func (p *ProxyRequestReadWriteCloserWithAddr) readSimpleAck() error {
68
-	if _, err := io.CopyN(ioutil.Discard, p.conn, 8); err != nil {
69
-		return errors.Annotate(err, "Cannot skip connid")
70
-	}
71
-	if _, err := io.CopyN(p.Buffer, p.conn, 4); err != nil {
72
-		return errors.Annotate(err, "Cannot read simple ack")
73
-	}
74
-	p.req.Options.SimpleAck = true
75
-
76
-	return nil
77
-}
78
-
79
-func (p *ProxyRequestReadWriteCloserWithAddr) Write(raw []byte) (int, error) {
80
-	if _, err := p.conn.Write(p.req.Bytes(raw)); err != nil {
81
-		return 0, err
82
-	}
83
-	p.req.Options.SimpleAck = false
84
-	p.req.Options.QuickAck = false
85
-
86
-	return len(raw), nil
87
-}
88
-
89
-func (p *ProxyRequestReadWriteCloserWithAddr) Close() error {
90
-	return p.conn.Close()
91
-}
92
-
93
-func (p *ProxyRequestReadWriteCloserWithAddr) LocalAddr() *net.TCPAddr {
94
-	return p.conn.LocalAddr()
95
-}
96
-
97
-func (p *ProxyRequestReadWriteCloserWithAddr) RemoteAddr() *net.TCPAddr {
98
-	return p.conn.RemoteAddr()
99
-}
100
-
101
-func NewProxyRequestRWC(conn wrappers.ReadWriteCloserWithAddr, connOpts *mtproto.ConnectionOpts, adTag []byte) (wrappers.ReadWriteCloserWithAddr, error) {
102
-	req, err := rpc.NewRPCProxyRequest(connOpts.ClientAddr, conn.LocalAddr(), connOpts, adTag)
103
-	if err != nil {
104
-		return nil, errors.Annotate(err, "Cannot create new RPC proxy request")
105
-	}
106
-
107
-	return &ProxyRequestReadWriteCloserWithAddr{
108
-		BufferedReader: wrappers.NewBufferedReader(),
109
-		conn:           conn,
110
-		req:            req,
111
-	}, nil
112
-}

+ 0
- 18
proxy/copy_pool.go Просмотреть файл

@@ -1,18 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"sync"
5
-
6
-	"github.com/9seconds/mtg/config"
7
-)
8
-
9
-var copyPool sync.Pool
10
-
11
-func init() {
12
-	copyPool = sync.Pool{
13
-		New: func() interface{} {
14
-			data := make([]byte, config.BufferSizeCopy)
15
-			return &data
16
-		},
17
-	}
18
-}

+ 154
- 0
proxy/proxy.go Просмотреть файл

@@ -0,0 +1,154 @@
1
+package proxy
2
+
3
+import (
4
+	"io"
5
+	"net"
6
+	"sync"
7
+
8
+	"github.com/juju/errors"
9
+	uuid "github.com/satori/go.uuid"
10
+	"go.uber.org/zap"
11
+
12
+	"github.com/9seconds/mtg/client"
13
+	"github.com/9seconds/mtg/config"
14
+	"github.com/9seconds/mtg/mtproto"
15
+	"github.com/9seconds/mtg/telegram"
16
+	"github.com/9seconds/mtg/wrappers"
17
+)
18
+
19
+type Proxy struct {
20
+	clientInit client.Init
21
+	tg         telegram.Telegram
22
+	conf       *config.Config
23
+}
24
+
25
+func (p *Proxy) Serve() error {
26
+	lsock, err := net.Listen("tcp", p.conf.BindAddr())
27
+	if err != nil {
28
+		return errors.Annotate(err, "Cannot create listen socket")
29
+	}
30
+
31
+	for {
32
+		if conn, err := lsock.Accept(); err != nil {
33
+			zap.S().Errorw("Cannot allocate incoming connection", "error", err)
34
+		} else {
35
+			go p.accept(conn)
36
+		}
37
+	}
38
+}
39
+
40
+func (p *Proxy) accept(conn net.Conn) {
41
+	connID := uuid.NewV4().String()
42
+	log := zap.S().With("connection_id", connID)
43
+
44
+	defer func() {
45
+		conn.Close()
46
+
47
+		if err := recover(); err != nil {
48
+			log.Errorw("Crash of accept handler", "error", err)
49
+		}
50
+	}()
51
+
52
+	log.Infow("Client connected", "addr", conn.RemoteAddr())
53
+
54
+	client, opts, err := p.clientInit(conn, connID, p.conf)
55
+	if err != nil {
56
+		log.Errorw("Cannot initialize client connection", "error", err)
57
+		return
58
+	}
59
+	defer client.(io.Closer).Close()
60
+
61
+	server, err := p.getTelegramConn(opts, connID)
62
+	if err != nil {
63
+		log.Errorw("Cannot initialize server connection", "error", err)
64
+		return
65
+	}
66
+	defer server.(io.Closer).Close()
67
+
68
+	wait := &sync.WaitGroup{}
69
+	wait.Add(2)
70
+
71
+	if p.conf.UseMiddleProxy() {
72
+		clientPacket := client.(wrappers.PacketReadWriteCloser)
73
+		serverPacket := server.(wrappers.PacketReadWriteCloser)
74
+		go p.middlePipe(clientPacket, serverPacket, wait, &opts.ReadHacks)
75
+		go p.middlePipe(serverPacket, clientPacket, wait, &opts.WriteHacks)
76
+	} else {
77
+		clientStream := client.(wrappers.StreamReadWriteCloser)
78
+		serverStream := server.(wrappers.StreamReadWriteCloser)
79
+		go p.directPipe(clientStream, serverStream, wait)
80
+		go p.directPipe(serverStream, clientStream, wait)
81
+	}
82
+
83
+	wait.Wait()
84
+
85
+	log.Infow("Client disconnected", "addr", conn.RemoteAddr())
86
+}
87
+
88
+func (p *Proxy) getTelegramConn(opts *mtproto.ConnectionOpts, connID string) (wrappers.Wrap, error) {
89
+	streamConn, err := p.tg.Dial(connID, opts)
90
+	if err != nil {
91
+		return nil, errors.Annotate(err, "Cannot dial to Telegram")
92
+	}
93
+
94
+	packetConn, err := p.tg.Init(opts, streamConn)
95
+	if err != nil {
96
+		return nil, errors.Annotate(err, "Cannot handshake telegram")
97
+	}
98
+
99
+	return packetConn, nil
100
+}
101
+
102
+func (p *Proxy) middlePipe(src wrappers.PacketReadCloser, dst wrappers.PacketWriteCloser, wait *sync.WaitGroup, hacks *mtproto.Hacks) {
103
+	defer func() {
104
+		src.Close()
105
+		dst.Close()
106
+		wait.Done()
107
+	}()
108
+
109
+	for {
110
+		hacks.SimpleAck = false
111
+		hacks.QuickAck = false
112
+
113
+		packet, err := src.Read()
114
+		if err != nil {
115
+			src.Logger().Warnw("Cannot read packet", "error", err)
116
+			return
117
+		}
118
+		if _, err = dst.Write(packet); err != nil {
119
+			src.Logger().Warnw("Cannot write packet", "error", err)
120
+			return
121
+		}
122
+	}
123
+}
124
+
125
+func (p *Proxy) directPipe(src wrappers.StreamReadCloser, dst wrappers.StreamWriteCloser, wait *sync.WaitGroup) {
126
+	defer func() {
127
+		src.Close()
128
+		dst.Close()
129
+		wait.Done()
130
+	}()
131
+
132
+	if _, err := io.Copy(dst, src); err != nil {
133
+		src.Logger().Warnw("Cannot pump sockets", "error", err)
134
+	}
135
+}
136
+
137
+func NewProxy(conf *config.Config) *Proxy {
138
+	var clientInit client.Init
139
+	var tg telegram.Telegram
140
+
141
+	if conf.UseMiddleProxy() {
142
+		clientInit = client.MiddleInit
143
+		tg = telegram.NewMiddleTelegram(conf)
144
+	} else {
145
+		clientInit = client.DirectInit
146
+		tg = telegram.NewDirectTelegram(conf)
147
+	}
148
+
149
+	return &Proxy{
150
+		conf:       conf,
151
+		clientInit: clientInit,
152
+		tg:         tg,
153
+	}
154
+}

+ 0
- 157
proxy/server.go Просмотреть файл

@@ -1,157 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"context"
5
-	"io"
6
-	"net"
7
-	"sync"
8
-
9
-	"github.com/juju/errors"
10
-	uuid "github.com/satori/go.uuid"
11
-	"go.uber.org/zap"
12
-
13
-	"github.com/9seconds/mtg/client"
14
-	"github.com/9seconds/mtg/config"
15
-	"github.com/9seconds/mtg/mtproto"
16
-	"github.com/9seconds/mtg/telegram"
17
-	"github.com/9seconds/mtg/wrappers"
18
-)
19
-
20
-// Server is an insgtance of MTPROTO proxy.
21
-type Server struct {
22
-	conf       *config.Config
23
-	logger     *zap.SugaredLogger
24
-	stats      *Stats
25
-	tg         telegram.Telegram
26
-	clientInit client.Init
27
-}
28
-
29
-// Serve does MTPROTO proxying.
30
-func (s *Server) Serve() error {
31
-	lsock, err := net.Listen("tcp", s.conf.BindAddr())
32
-	if err != nil {
33
-		return errors.Annotate(err, "Cannot create listen socket")
34
-	}
35
-
36
-	for {
37
-		if conn, err := lsock.Accept(); err != nil {
38
-			s.logger.Warn("Cannot allocate incoming connection", "error", err)
39
-		} else {
40
-			go s.accept(conn)
41
-		}
42
-	}
43
-}
44
-
45
-func (s *Server) accept(conn net.Conn) {
46
-	defer func() {
47
-		s.stats.closeConnection()
48
-		conn.Close() // nolint: errcheck
49
-
50
-		if r := recover(); r != nil {
51
-			s.logger.Errorw("Crash of accept handler", "error", r)
52
-		}
53
-	}()
54
-
55
-	s.stats.newConnection()
56
-	ctx, cancel := context.WithCancel(context.Background())
57
-	socketID := uuid.NewV4().String()
58
-
59
-	s.logger.Debugw("Client connected",
60
-		"addr", conn.RemoteAddr().String(),
61
-		"socketid", socketID,
62
-	)
63
-
64
-	connOpts, clientConn, err := s.getClientStream(ctx, cancel, conn, socketID)
65
-	if err != nil {
66
-		s.logger.Warnw("Cannot initialize client connection",
67
-			"addr", conn.RemoteAddr().String(),
68
-			"socketid", socketID,
69
-			"error", err,
70
-		)
71
-		return
72
-	}
73
-	defer clientConn.Close() // nolint: errcheck
74
-
75
-	tgConn, err := s.getTelegramStream(ctx, cancel, connOpts, socketID)
76
-	if err != nil {
77
-		s.logger.Warnw("Cannot initialize Telegram connection",
78
-			"socketid", socketID,
79
-			"error", err,
80
-		)
81
-		return
82
-	}
83
-	defer tgConn.Close() // nolint: errcheck
84
-
85
-	wait := &sync.WaitGroup{}
86
-	wait.Add(2)
87
-
88
-	go s.pipe(clientConn, tgConn, wait)
89
-	go s.pipe(tgConn, clientConn, wait)
90
-
91
-	<-ctx.Done()
92
-	wait.Wait()
93
-
94
-	s.logger.Debugw("Client disconnected",
95
-		"addr", conn.RemoteAddr().String(),
96
-		"socketid", socketID,
97
-	)
98
-}
99
-
100
-func (s *Server) getClientStream(ctx context.Context, cancel context.CancelFunc, conn net.Conn, socketID string) (*mtproto.ConnectionOpts, io.ReadWriteCloser, error) {
101
-	connOpts, socket, err := s.clientInit(conn, s.conf)
102
-	if err != nil {
103
-		return nil, nil, errors.Annotate(err, "Cannot init client connection")
104
-	}
105
-
106
-	socket = wrappers.NewTrafficRWC(socket, s.stats.addIncomingTraffic, s.stats.addOutgoingTraffic)
107
-	socket = wrappers.NewLogRWC(socket, s.logger, socketID, "client")
108
-	socket = wrappers.NewCtxRWC(ctx, cancel, socket)
109
-
110
-	return connOpts, socket, nil
111
-}
112
-
113
-func (s *Server) getTelegramStream(ctx context.Context, cancel context.CancelFunc, connOpts *mtproto.ConnectionOpts, socketID string) (io.ReadWriteCloser, error) {
114
-	conn, err := s.tg.Dial(connOpts)
115
-	if err != nil {
116
-		return nil, errors.Annotate(err, "Cannot connect to Telegram")
117
-	}
118
-
119
-	conn = wrappers.NewTrafficRWC(conn, s.stats.addIncomingTraffic, s.stats.addOutgoingTraffic)
120
-	conn, err = s.tg.Init(connOpts, conn)
121
-	if err != nil {
122
-		return nil, errors.Annotate(err, "Cannot handshake Telegram")
123
-	}
124
-
125
-	conn = wrappers.NewLogRWC(conn, s.logger, socketID, "telegram")
126
-	conn = wrappers.NewCtxRWC(ctx, cancel, conn)
127
-
128
-	return conn, nil
129
-}
130
-
131
-func (s *Server) pipe(dst io.Writer, src io.Reader, wait *sync.WaitGroup) {
132
-	defer wait.Done()
133
-
134
-	buf := copyPool.Get().(*[]byte)
135
-	defer copyPool.Put(buf)
136
-
137
-	io.CopyBuffer(dst, src, *buf) // nolint: errcheck
138
-}
139
-
140
-// NewServer creates new instance of MTPROTO proxy.
141
-func NewServer(conf *config.Config, logger *zap.SugaredLogger, stat *Stats) *Server {
142
-	clientInit := client.DirectInit
143
-	tg := telegram.NewDirectTelegram
144
-
145
-	if len(conf.AdTag) > 0 {
146
-		clientInit = client.MiddleInit
147
-		tg = telegram.NewMiddleTelegram
148
-	}
149
-
150
-	return &Server{
151
-		conf:       conf,
152
-		logger:     logger,
153
-		stats:      stat,
154
-		tg:         tg(conf, logger),
155
-		clientInit: clientInit,
156
-	}
157
-}

+ 0
- 74
proxy/stats.go Просмотреть файл

@@ -1,74 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"encoding/json"
5
-	"net/http"
6
-	"strconv"
7
-	"sync/atomic"
8
-	"time"
9
-
10
-	"github.com/9seconds/mtg/config"
11
-)
12
-
13
-type statsUptime time.Time
14
-
15
-func (s statsUptime) MarshalJSON() ([]byte, error) {
16
-	uptime := int(time.Since(time.Time(s)).Seconds())
17
-	return []byte(strconv.Itoa(uptime)), nil
18
-}
19
-
20
-// Stats is a datastructure for statistics on work of this proxy.
21
-type Stats struct {
22
-	AllConnections    uint64 `json:"all_connections"`
23
-	ActiveConnections uint32 `json:"active_connections"`
24
-	Traffic           struct {
25
-		Incoming uint64 `json:"incoming"`
26
-		Outgoing uint64 `json:"outgoing"`
27
-	} `json:"traffic"`
28
-	URLs   config.IPURLs `json:"urls"`
29
-	Uptime statsUptime   `json:"uptime"`
30
-
31
-	conf *config.Config
32
-}
33
-
34
-func (s *Stats) newConnection() {
35
-	atomic.AddUint64(&s.AllConnections, 1)
36
-	atomic.AddUint32(&s.ActiveConnections, 1)
37
-}
38
-
39
-func (s *Stats) closeConnection() {
40
-	atomic.AddUint32(&s.ActiveConnections, ^uint32(0))
41
-}
42
-
43
-func (s *Stats) addIncomingTraffic(n int) {
44
-	atomic.AddUint64(&s.Traffic.Incoming, uint64(n))
45
-}
46
-
47
-func (s *Stats) addOutgoingTraffic(n int) {
48
-	atomic.AddUint64(&s.Traffic.Outgoing, uint64(n))
49
-}
50
-
51
-// Serve runs statistics HTTP server.
52
-func (s *Stats) Serve() {
53
-	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
54
-		w.Header().Set("Content-Type", "application/json")
55
-
56
-		encoder := json.NewEncoder(w)
57
-		encoder.SetEscapeHTML(false)
58
-		encoder.SetIndent("", "  ")
59
-		encoder.Encode(s) // nolint: errcheck, gas
60
-	})
61
-
62
-	http.ListenAndServe(s.conf.StatAddr(), nil) // nolint: errcheck, gas
63
-}
64
-
65
-// NewStats returns new instance of statistics datastructure.
66
-func NewStats(conf *config.Config) *Stats {
67
-	stat := &Stats{
68
-		Uptime: statsUptime(time.Now()),
69
-		conf:   conf,
70
-	}
71
-	stat.URLs = conf.GetURLs()
72
-
73
-	return stat
74
-}

+ 3
- 2
telegram/dialer.go Просмотреть файл

@@ -30,11 +30,12 @@ func (t *tgDialer) dial(addr string) (net.Conn, error) {
30 30
 	return conn, nil
31 31
 }
32 32
 
33
-func (t *tgDialer) dialRWC(addr string) (wrappers.ReadWriteCloserWithAddr, error) {
33
+func (t *tgDialer) dialRWC(addr, connID string) (wrappers.StreamReadWriteCloser, error) {
34 34
 	conn, err := t.dial(addr)
35 35
 	if err != nil {
36 36
 		return nil, err
37 37
 	}
38
+	tgConn := wrappers.NewConn(conn, connID, wrappers.ConnPurposeTelegram, t.conf.PublicIPv4, t.conf.PublicIPv6)
38 39
 
39
-	return wrappers.NewTimeoutRWC(conn, t.conf.PublicIPv4, t.conf.PublicIPv6), nil
40
+	return tgConn, nil
40 41
 }

+ 8
- 9
telegram/direct.go Просмотреть файл

@@ -4,7 +4,6 @@ import (
4 4
 	"net"
5 5
 
6 6
 	"github.com/juju/errors"
7
-	"go.uber.org/zap"
8 7
 
9 8
 	"github.com/9seconds/mtg/config"
10 9
 	"github.com/9seconds/mtg/mtproto"
@@ -29,11 +28,11 @@ var (
29 28
 	}
30 29
 )
31 30
 
32
-type directTelegram struct {
31
+type DirectTelegram struct {
33 32
 	baseTelegram
34 33
 }
35 34
 
36
-func (t *directTelegram) Dial(connOpts *mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error) {
35
+func (t *DirectTelegram) Dial(connID string, connOpts *mtproto.ConnectionOpts) (wrappers.StreamReadWriteCloser, error) {
37 36
 	dc := connOpts.DC
38 37
 	if dc < 0 {
39 38
 		dc = -dc
@@ -41,23 +40,23 @@ func (t *directTelegram) Dial(connOpts *mtproto.ConnectionOpts) (wrappers.ReadWr
41 40
 		dc = 1
42 41
 	}
43 42
 
44
-	return t.baseTelegram.dial(dc-1, connOpts.ConnectionProto)
43
+	return t.baseTelegram.dial(dc-1, connID, connOpts.ConnectionProto)
45 44
 }
46 45
 
47
-func (t *directTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.ReadWriteCloserWithAddr) (wrappers.ReadWriteCloserWithAddr, error) {
46
+func (t *DirectTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.StreamReadWriteCloser) (wrappers.Wrap, error) {
48 47
 	obfs2, frame := obfuscated2.MakeTelegramObfuscated2Frame(connOpts)
49 48
 
50
-	if n, err := conn.Write(frame); err != nil || n != obfuscated2.FrameLen {
49
+	if _, err := conn.Write(frame); err != nil {
51 50
 		return nil, errors.Annotate(err, "Cannot write hadnshake frame")
52 51
 	}
53 52
 
54
-	return wrappers.NewStreamCipherRWC(conn, obfs2.Encryptor, obfs2.Decryptor), nil
53
+	return wrappers.NewStreamCipher(conn, obfs2.Encryptor, obfs2.Decryptor), nil
55 54
 }
56 55
 
57 56
 // NewDirectTelegram returns Telegram instance which connects directly
58 57
 // to Telegram bypassing middleproxies.
59
-func NewDirectTelegram(conf *config.Config, _ *zap.SugaredLogger) Telegram {
60
-	return &directTelegram{baseTelegram{
58
+func NewDirectTelegram(conf *config.Config) Telegram {
59
+	return &DirectTelegram{baseTelegram{
61 60
 		dialer: tgDialer{
62 61
 			Dialer: net.Dialer{Timeout: telegramDialTimeout},
63 62
 			conf:   conf,

+ 51
- 53
telegram/middle.go Просмотреть файл

@@ -1,56 +1,26 @@
1 1
 package telegram
2 2
 
3 3
 import (
4
-	"fmt"
5
-	"io"
6 4
 	"net"
7 5
 	"net/http"
8 6
 	"sync"
9 7
 
10 8
 	"github.com/juju/errors"
11
-	"go.uber.org/zap"
12 9
 
13 10
 	"github.com/9seconds/mtg/config"
14 11
 	"github.com/9seconds/mtg/mtproto"
15 12
 	"github.com/9seconds/mtg/mtproto/rpc"
16
-	mtwrappers "github.com/9seconds/mtg/mtproto/wrappers"
17 13
 	"github.com/9seconds/mtg/wrappers"
18 14
 )
19 15
 
20
-type middleTelegram struct {
16
+type MiddleTelegram struct {
21 17
 	middleTelegramCaller
22 18
 
23 19
 	conf *config.Config
24 20
 }
25 21
 
26
-func NewMiddleTelegram(conf *config.Config, logger *zap.SugaredLogger) Telegram {
27
-	tg := &middleTelegram{
28
-		middleTelegramCaller: middleTelegramCaller{
29
-			baseTelegram: baseTelegram{
30
-				dialer: tgDialer{
31
-					Dialer: net.Dialer{Timeout: telegramDialTimeout},
32
-					conf:   conf,
33
-				},
34
-			},
35
-			logger: logger,
36
-			httpClient: &http.Client{
37
-				Timeout: middleTelegramHTTPClientTimeout,
38
-			},
39
-			dialerMutex: &sync.RWMutex{},
40
-		},
41
-		conf: conf,
42
-	}
43
-
44
-	if err := tg.update(); err != nil {
45
-		panic(err)
46
-	}
47
-	go tg.autoUpdate()
48
-
49
-	return tg
50
-}
51
-
52
-func (t *middleTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.ReadWriteCloserWithAddr) (wrappers.ReadWriteCloserWithAddr, error) {
53
-	rpcNonceConn := mtwrappers.NewFrameRWC(conn, rpc.RPCNonceSeqNo)
22
+func (t *MiddleTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.StreamReadWriteCloser) (wrappers.Wrap, error) {
23
+	rpcNonceConn := wrappers.NewMTProtoFrame(conn, rpc.SeqNoNonce)
54 24
 
55 25
 	rpcNonceReq, err := t.sendRPCNonceRequest(rpcNonceConn)
56 26
 	if err != nil {
@@ -61,23 +31,29 @@ func (t *middleTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.Re
61 31
 		return nil, err
62 32
 	}
63 33
 
64
-	secureConn := mtwrappers.NewMiddleProxyCipherRWC(conn, rpcNonceReq, rpcNonceResp, t.proxySecret)
65
-	secureConn = mtwrappers.NewFrameRWC(secureConn, rpc.RPCHandshakeSeqNo)
34
+	secureConn := wrappers.NewMiddleProxyCipher(conn, rpcNonceReq, rpcNonceResp, t.proxySecret)
35
+	frameConn := wrappers.NewMTProtoFrame(secureConn, rpc.SeqNoHandshake)
66 36
 
67
-	rpcHandshakeReq, err := t.sendRPCHandshakeRequest(secureConn)
37
+	rpcHandshakeReq, err := t.sendRPCHandshakeRequest(frameConn)
38
+	if err != nil {
39
+		return nil, err
40
+	}
41
+	_, err = t.receiveRPCHandshakeResponse(frameConn, rpcHandshakeReq)
68 42
 	if err != nil {
69 43
 		return nil, err
70 44
 	}
71
-	_, err = t.receiveRPCHandshakeResponse(secureConn, rpcHandshakeReq)
45
+
46
+	proxyConn, err := wrappers.NewMTProtoProxy(frameConn, connOpts, t.conf.AdTag)
72 47
 	if err != nil {
73 48
 		return nil, err
74 49
 	}
50
+	proxyConn.Logger().Infow("Telegram connection initialized")
75 51
 
76
-	return mtwrappers.NewProxyRequestRWC(secureConn, connOpts, t.conf.AdTag)
52
+	return proxyConn, nil
77 53
 }
78 54
 
79
-func (t *middleTelegram) sendRPCNonceRequest(conn io.Writer) (*rpc.RPCNonceRequest, error) {
80
-	rpcNonceReq, err := rpc.NewRPCNonceRequest(t.proxySecret)
55
+func (t *MiddleTelegram) sendRPCNonceRequest(conn wrappers.PacketWriter) (*rpc.NonceRequest, error) {
56
+	rpcNonceReq, err := rpc.NewNonceRequest(t.proxySecret)
81 57
 	if err != nil {
82 58
 		return nil, errors.Annotate(err, "Cannot create RPC nonce request")
83 59
 	}
@@ -88,14 +64,13 @@ func (t *middleTelegram) sendRPCNonceRequest(conn io.Writer) (*rpc.RPCNonceReque
88 64
 	return rpcNonceReq, nil
89 65
 }
90 66
 
91
-func (t *middleTelegram) receiveRPCNonceResponse(conn io.Reader, req *rpc.RPCNonceRequest) (*rpc.RPCNonceResponse, error) {
92
-	var ans [128]byte
93
-
94
-	n, err := conn.Read(ans[:])
67
+func (t *MiddleTelegram) receiveRPCNonceResponse(conn wrappers.PacketReader, req *rpc.NonceRequest) (*rpc.NonceResponse, error) {
68
+	packet, err := conn.Read()
95 69
 	if err != nil {
96 70
 		return nil, errors.Annotate(err, "Cannot read RPC nonce response")
97 71
 	}
98
-	rpcNonceResp, err := rpc.NewRPCNonceResponse(ans[:n])
72
+
73
+	rpcNonceResp, err := rpc.NewNonceResponse(packet)
99 74
 	if err != nil {
100 75
 		return nil, errors.Annotate(err, "Cannot initialize RPC nonce response")
101 76
 	}
@@ -106,8 +81,8 @@ func (t *middleTelegram) receiveRPCNonceResponse(conn io.Reader, req *rpc.RPCNon
106 81
 	return rpcNonceResp, nil
107 82
 }
108 83
 
109
-func (t *middleTelegram) sendRPCHandshakeRequest(conn io.Writer) (*rpc.RPCHandshakeRequest, error) {
110
-	req := rpc.NewRPCHandshakeRequest()
84
+func (t *MiddleTelegram) sendRPCHandshakeRequest(conn wrappers.PacketWriter) (*rpc.HandshakeRequest, error) {
85
+	req := rpc.NewHandshakeRequest()
111 86
 	if _, err := conn.Write(req.Bytes()); err != nil {
112 87
 		return nil, errors.Annotate(err, "Cannot send RPC handshake request")
113 88
 	}
@@ -115,21 +90,44 @@ func (t *middleTelegram) sendRPCHandshakeRequest(conn io.Writer) (*rpc.RPCHandsh
115 90
 	return req, nil
116 91
 }
117 92
 
118
-func (t *middleTelegram) receiveRPCHandshakeResponse(conn io.Reader, req *rpc.RPCHandshakeRequest) (*rpc.RPCHandshakeResponse, error) {
119
-	var ans [128]byte
120
-
121
-	n, err := conn.Read(ans[:])
93
+func (t *MiddleTelegram) receiveRPCHandshakeResponse(conn wrappers.PacketReader, req *rpc.HandshakeRequest) (*rpc.HandshakeResponse, error) {
94
+	packet, err := conn.Read()
122 95
 	if err != nil {
123 96
 		return nil, errors.Annotate(err, "Cannot read RPC handshake response")
124 97
 	}
125
-	rpcHandshakeResp, err := rpc.NewRPCHandshakeResponse(ans[:n])
98
+
99
+	rpcHandshakeResp, err := rpc.NewHandshakeResponse(packet)
126 100
 	if err != nil {
127 101
 		return nil, errors.Annotate(err, "Cannot initialize RPC handshake response")
128 102
 	}
129 103
 	if err = rpcHandshakeResp.Valid(req); err != nil {
130 104
 		return nil, errors.Annotate(err, "Invalid RPC handshake response")
131 105
 	}
132
-	fmt.Println("VICTORY")
133 106
 
134 107
 	return rpcHandshakeResp, nil
135 108
 }
109
+
110
+func NewMiddleTelegram(conf *config.Config) Telegram {
111
+	tg := &MiddleTelegram{
112
+		middleTelegramCaller: middleTelegramCaller{
113
+			baseTelegram: baseTelegram{
114
+				dialer: tgDialer{
115
+					Dialer: net.Dialer{Timeout: telegramDialTimeout},
116
+					conf:   conf,
117
+				},
118
+			},
119
+			httpClient: &http.Client{
120
+				Timeout: middleTelegramHTTPClientTimeout,
121
+			},
122
+			dialerMutex: &sync.RWMutex{},
123
+		},
124
+		conf: conf,
125
+	}
126
+
127
+	if err := tg.update(); err != nil {
128
+		panic(err)
129
+	}
130
+	go tg.autoUpdate()
131
+
132
+	return tg
133
+}

+ 5
- 10
telegram/middle_caller.go Просмотреть файл

@@ -28,18 +28,17 @@ const (
28 28
 	tgUserAgent       = "mtg"
29 29
 )
30 30
 
31
-var middleTelegramProxyConfigSplitter *regexp.Regexp
31
+var middleTelegramProxyConfigSplitter = regexp.MustCompile(`\s+`)
32 32
 
33 33
 type middleTelegramCaller struct {
34 34
 	baseTelegram
35 35
 
36 36
 	proxySecret []byte
37 37
 	dialerMutex *sync.RWMutex
38
-	logger      *zap.SugaredLogger
39 38
 	httpClient  *http.Client
40 39
 }
41 40
 
42
-func (t *middleTelegramCaller) Dial(connOpts *mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error) {
41
+func (t *middleTelegramCaller) Dial(connID string, connOpts *mtproto.ConnectionOpts) (wrappers.StreamReadWriteCloser, error) {
43 42
 	dc := connOpts.DC
44 43
 	if dc == 0 {
45 44
 		dc = 1
@@ -47,13 +46,13 @@ func (t *middleTelegramCaller) Dial(connOpts *mtproto.ConnectionOpts) (wrappers.
47 46
 	t.dialerMutex.RLock()
48 47
 	defer t.dialerMutex.RUnlock()
49 48
 
50
-	return t.baseTelegram.dial(dc, connOpts.ConnectionProto)
49
+	return t.baseTelegram.dial(dc, connID, connOpts.ConnectionProto)
51 50
 }
52 51
 
53 52
 func (t *middleTelegramCaller) autoUpdate() {
54 53
 	for range time.Tick(middleTelegramAutoUpdateInterval) {
55 54
 		if err := t.update(); err != nil {
56
-			t.logger.Warnw("Cannot update from Telegram", "error", err)
55
+			zap.S().Warnw("Cannot update from Telegram", "error", err)
57 56
 		}
58 57
 	}
59 58
 }
@@ -80,7 +79,7 @@ func (t *middleTelegramCaller) update() error {
80 79
 	t.v6Addresses = v6Addresses
81 80
 	t.dialerMutex.Unlock()
82 81
 
83
-	t.logger.Infow("Telegram middle proxy data has been updated")
82
+	zap.S().Infow("Telegram middle proxy data has been updated")
84 83
 
85 84
 	return nil
86 85
 }
@@ -151,7 +150,3 @@ func (t *middleTelegramCaller) call(url string) (*http.Response, error) {
151 150
 
152 151
 	return t.httpClient.Do(req)
153 152
 }
154
-
155
-func init() {
156
-	middleTelegramProxyConfigSplitter = regexp.MustCompile(`\s+`)
157
-}

+ 4
- 7
telegram/telegram.go Просмотреть файл

@@ -9,12 +9,9 @@ import (
9 9
 	"github.com/9seconds/mtg/wrappers"
10 10
 )
11 11
 
12
-// Telegram defines an interface to connect to Telegram. This
13
-// encapsulates logic of working with middleproxies or direct
14
-// connections.
15 12
 type Telegram interface {
16
-	Dial(*mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error)
17
-	Init(*mtproto.ConnectionOpts, wrappers.ReadWriteCloserWithAddr) (wrappers.ReadWriteCloserWithAddr, error)
13
+	Dial(string, *mtproto.ConnectionOpts) (wrappers.StreamReadWriteCloser, error)
14
+	Init(*mtproto.ConnectionOpts, wrappers.StreamReadWriteCloser) (wrappers.Wrap, error)
18 15
 }
19 16
 
20 17
 type baseTelegram struct {
@@ -24,7 +21,7 @@ type baseTelegram struct {
24 21
 	v6Addresses map[int16][]string
25 22
 }
26 23
 
27
-func (b *baseTelegram) dial(dcIdx int16, proto mtproto.ConnectionProtocol) (wrappers.ReadWriteCloserWithAddr, error) {
24
+func (b *baseTelegram) dial(dcIdx int16, connID string, proto mtproto.ConnectionProtocol) (wrappers.StreamReadWriteCloser, error) {
28 25
 	addrs := make([]string, 2)
29 26
 
30 27
 	if proto&mtproto.ConnectionProtocolIPv6 != 0 {
@@ -39,7 +36,7 @@ func (b *baseTelegram) dial(dcIdx int16, proto mtproto.ConnectionProtocol) (wrap
39 36
 	}
40 37
 
41 38
 	for _, addr := range addrs {
42
-		if conn, err := b.dialer.dialRWC(addr); err == nil {
39
+		if conn, err := b.dialer.dialRWC(addr, connID); err == nil {
43 40
 			return conn, err
44 41
 		}
45 42
 	}

+ 20
- 0
utils/read_current_data.go Просмотреть файл

@@ -0,0 +1,20 @@
1
+package utils
2
+
3
+import "io"
4
+
5
+const readCurrentDataBufferSize = 1024 + 1 // + 1 because telegram operates with blocks mod 4
6
+
7
+func ReadCurrentData(src io.Reader) (rv []byte, err error) {
8
+	buf := make([]byte, readCurrentDataBufferSize)
9
+	n := readCurrentDataBufferSize
10
+
11
+	for n == len(buf) {
12
+		n, err = src.Read(buf)
13
+		if err != nil {
14
+			return nil, err
15
+		}
16
+		rv = append(rv, buf[:n]...)
17
+	}
18
+
19
+	return rv, nil
20
+}

+ 14
- 0
utils/reverse_bytes.go Просмотреть файл

@@ -0,0 +1,14 @@
1
+package utils
2
+
3
+func ReverseBytes(data []byte) []byte {
4
+	dataLen := len(data)
5
+	rv := make([]byte, dataLen)
6
+
7
+	rv[dataLen/2] = data[dataLen/2]
8
+	for i := dataLen/2 - 1; i >= 0; i-- {
9
+		opp := dataLen - i - 1
10
+		rv[i], rv[opp] = data[opp], data[i]
11
+	}
12
+
13
+	return rv
14
+}

+ 11
- 0
utils/uint24.go Просмотреть файл

@@ -0,0 +1,11 @@
1
+package utils
2
+
3
+type Uint24 [3]byte
4
+
5
+func ToUint24(number uint32) Uint24 {
6
+	return Uint24{byte(number), byte(number >> 8), byte(number >> 16)}
7
+}
8
+
9
+func FromUint24(number Uint24) uint32 {
10
+	return uint32(number[0]) + (uint32(number[1]) << 8) + (uint32(number[2]) << 16)
11
+}

+ 90
- 0
wrappers/blockcipher.go Просмотреть файл

@@ -0,0 +1,90 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"crypto/aes"
6
+	"crypto/cipher"
7
+	"net"
8
+
9
+	"go.uber.org/zap"
10
+
11
+	"github.com/9seconds/mtg/utils"
12
+	"github.com/juju/errors"
13
+)
14
+
15
+type BlockCipher struct {
16
+	buf *bytes.Buffer
17
+
18
+	logger    *zap.SugaredLogger
19
+	conn      StreamReadWriteCloser
20
+	encryptor cipher.BlockMode
21
+	decryptor cipher.BlockMode
22
+}
23
+
24
+func (b *BlockCipher) Read(p []byte) (int, error) {
25
+	if b.buf.Len() > 0 {
26
+		return b.flush(p)
27
+	}
28
+
29
+	buf := []byte{}
30
+	for len(buf) == 0 || len(buf)%aes.BlockSize != 0 {
31
+		rv, err := utils.ReadCurrentData(b.conn)
32
+		if err != nil {
33
+			return 0, errors.Annotate(err, "Cannot read from socket")
34
+		}
35
+		buf = append(buf, rv...)
36
+	}
37
+
38
+	b.decryptor.CryptBlocks(buf, buf)
39
+	b.buf.Write(buf)
40
+
41
+	return b.flush(p)
42
+}
43
+
44
+func (b *BlockCipher) flush(p []byte) (int, error) {
45
+	if b.buf.Len() <= len(p) {
46
+		sizeToReturn := b.buf.Len()
47
+		copy(p, b.buf.Bytes())
48
+		b.buf.Reset()
49
+		return sizeToReturn, nil
50
+	}
51
+
52
+	return b.buf.Read(p)
53
+}
54
+
55
+func (b *BlockCipher) Write(p []byte) (int, error) {
56
+	if len(p)%aes.BlockSize > 0 {
57
+		return 0, errors.Errorf("Incorrect block size %d", len(p))
58
+	}
59
+
60
+	encrypted := make([]byte, len(p))
61
+	b.encryptor.CryptBlocks(encrypted, p)
62
+
63
+	return b.conn.Write(encrypted)
64
+}
65
+
66
+func (b *BlockCipher) Logger() *zap.SugaredLogger {
67
+	return b.logger
68
+}
69
+
70
+func (b *BlockCipher) LocalAddr() *net.TCPAddr {
71
+	return b.conn.LocalAddr()
72
+}
73
+
74
+func (b *BlockCipher) RemoteAddr() *net.TCPAddr {
75
+	return b.conn.RemoteAddr()
76
+}
77
+
78
+func (b *BlockCipher) Close() error {
79
+	return b.conn.Close()
80
+}
81
+
82
+func NewBlockCipher(conn StreamReadWriteCloser, encryptor, decryptor cipher.BlockMode) StreamReadWriteCloser {
83
+	return &BlockCipher{
84
+		buf:       &bytes.Buffer{},
85
+		conn:      conn,
86
+		logger:    conn.Logger().Named("block-cipher"),
87
+		encryptor: encryptor,
88
+		decryptor: decryptor,
89
+	}
90
+}

+ 0
- 66
wrappers/blockcipherrwc.go Просмотреть файл

@@ -1,66 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"crypto/aes"
5
-	"crypto/cipher"
6
-	"net"
7
-
8
-	"github.com/juju/errors"
9
-)
10
-
11
-type BlockCipherReadWriteCloserWithAddr struct {
12
-	BufferedReader
13
-
14
-	conn      ReadWriteCloserWithAddr
15
-	encryptor cipher.BlockMode
16
-	decryptor cipher.BlockMode
17
-}
18
-
19
-func (c *BlockCipherReadWriteCloserWithAddr) Read(p []byte) (int, error) {
20
-	return c.BufferedRead(p, func() error {
21
-		bufferLength := c.Buffer.Len()
22
-		for bufferLength%aes.BlockSize != 0 || bufferLength == 0 {
23
-			n, err := c.conn.Read(p)
24
-			if err != nil {
25
-				return errors.Annotate(err, "Cannot read from socket")
26
-			}
27
-			c.Buffer.Write(p[:n])
28
-			bufferLength = c.Buffer.Len()
29
-		}
30
-		c.decryptor.CryptBlocks(c.Buffer.Bytes(), c.Buffer.Bytes())
31
-
32
-		return nil
33
-	})
34
-}
35
-
36
-func (c *BlockCipherReadWriteCloserWithAddr) Write(p []byte) (int, error) {
37
-	if len(p)%aes.BlockSize > 0 {
38
-		return 0, errors.Errorf("Incorrect block size %d", len(p))
39
-	}
40
-
41
-	encrypted := make([]byte, len(p))
42
-	c.encryptor.CryptBlocks(encrypted, p)
43
-
44
-	return c.conn.Write(encrypted)
45
-}
46
-
47
-func (c *BlockCipherReadWriteCloserWithAddr) Close() error {
48
-	return c.conn.Close()
49
-}
50
-
51
-func (c *BlockCipherReadWriteCloserWithAddr) LocalAddr() *net.TCPAddr {
52
-	return c.conn.LocalAddr()
53
-}
54
-
55
-func (c *BlockCipherReadWriteCloserWithAddr) RemoteAddr() *net.TCPAddr {
56
-	return c.conn.RemoteAddr()
57
-}
58
-
59
-func NewBlockCipherRWC(conn ReadWriteCloserWithAddr, encryptor, decryptor cipher.BlockMode) ReadWriteCloserWithAddr {
60
-	return &BlockCipherReadWriteCloserWithAddr{
61
-		BufferedReader: NewBufferedReader(),
62
-		conn:           conn,
63
-		encryptor:      encryptor,
64
-		decryptor:      decryptor,
65
-	}
66
-}

+ 0
- 27
wrappers/buffer_pool.go Просмотреть файл

@@ -1,27 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"bytes"
5
-	"sync"
6
-)
7
-
8
-var bufPool sync.Pool
9
-
10
-func getBuffer() *bytes.Buffer {
11
-	buf := bufPool.Get().(*bytes.Buffer)
12
-	buf.Reset()
13
-
14
-	return buf
15
-}
16
-
17
-func putBuffer(buf *bytes.Buffer) {
18
-	bufPool.Put(buf)
19
-}
20
-
21
-func init() {
22
-	bufPool = sync.Pool{
23
-		New: func() interface{} {
24
-			return &bytes.Buffer{}
25
-		},
26
-	}
27
-}

+ 0
- 32
wrappers/buffered_reader.go Просмотреть файл

@@ -1,32 +0,0 @@
1
-package wrappers
2
-
3
-import "bytes"
4
-
5
-type BufferedReader struct {
6
-	Buffer *bytes.Buffer
7
-}
8
-
9
-func (b *BufferedReader) BufferedRead(p []byte, callback func() error) (int, error) {
10
-	if b.Buffer.Len() > 0 {
11
-		return b.flush(p)
12
-	}
13
-	if err := callback(); err != nil {
14
-		return 0, err
15
-	}
16
-	return b.flush(p)
17
-}
18
-
19
-func (b *BufferedReader) flush(p []byte) (int, error) {
20
-	if b.Buffer.Len() < len(p) {
21
-		sizeToReturn := b.Buffer.Len()
22
-		copy(p, b.Buffer.Bytes())
23
-		b.Buffer.Reset()
24
-		return sizeToReturn, nil
25
-	}
26
-
27
-	return b.Buffer.Read(p)
28
-}
29
-
30
-func NewBufferedReader() BufferedReader {
31
-	return BufferedReader{Buffer: &bytes.Buffer{}}
32
-}

+ 105
- 0
wrappers/conn.go Просмотреть файл

@@ -0,0 +1,105 @@
1
+package wrappers
2
+
3
+import (
4
+	"net"
5
+	"time"
6
+
7
+	"go.uber.org/zap"
8
+)
9
+
10
+type ConnPurpose uint8
11
+
12
+func (c ConnPurpose) String() string {
13
+	switch c {
14
+	case ConnPurposeClient:
15
+		return "client"
16
+	case ConnPurposeTelegram:
17
+		return "telegram"
18
+	}
19
+
20
+	return ""
21
+}
22
+
23
+const (
24
+	ConnPurposeClient = iota
25
+	ConnPurposeTelegram
26
+)
27
+
28
+const (
29
+	connTimeoutRead  = 5 * time.Minute
30
+	connTimeoutWrite = 5 * time.Minute
31
+)
32
+
33
+type Conn struct {
34
+	connID     string
35
+	conn       net.Conn
36
+	logger     *zap.SugaredLogger
37
+	publicIPv4 net.IP
38
+	publicIPv6 net.IP
39
+}
40
+
41
+func (c *Conn) Write(p []byte) (int, error) {
42
+	c.conn.SetWriteDeadline(time.Now().Add(connTimeoutWrite))
43
+	n, err := c.conn.Write(p)
44
+
45
+	c.logger.Debugw("Write to stream", "bytes", n, "error", err)
46
+
47
+	return n, err
48
+}
49
+
50
+func (c *Conn) Read(p []byte) (int, error) {
51
+	c.conn.SetReadDeadline(time.Now().Add(connTimeoutRead))
52
+	n, err := c.conn.Read(p)
53
+
54
+	c.logger.Debugw("Read from stream", "bytes", n, "error", err)
55
+
56
+	return n, err
57
+}
58
+
59
+func (c *Conn) Close() error {
60
+	defer c.logger.Debugw("Closed connection")
61
+	return c.conn.Close()
62
+}
63
+
64
+func (c *Conn) LocalAddr() *net.TCPAddr {
65
+	addr := c.conn.LocalAddr().(*net.TCPAddr)
66
+	newAddr := *addr
67
+
68
+	if c.RemoteAddr().IP.To4() != nil {
69
+		if c.publicIPv4 != nil {
70
+			newAddr.IP = c.publicIPv4
71
+		}
72
+	} else if c.publicIPv6 != nil {
73
+		newAddr.IP = c.publicIPv6
74
+	}
75
+
76
+	return &newAddr
77
+}
78
+
79
+func (c *Conn) RemoteAddr() *net.TCPAddr {
80
+	return c.conn.RemoteAddr().(*net.TCPAddr)
81
+}
82
+
83
+func (c *Conn) Logger() *zap.SugaredLogger {
84
+	return c.logger
85
+}
86
+
87
+func NewConn(conn net.Conn, connID string, purpose ConnPurpose, publicIPv4, publicIPv6 net.IP) StreamReadWriteCloser {
88
+	logger := zap.S().With(
89
+		"connection_id", connID,
90
+		"local_address", conn.LocalAddr(),
91
+		"remote_address", conn.RemoteAddr(),
92
+		"purpose", purpose,
93
+	).Named("conn")
94
+
95
+	wrapper := Conn{
96
+		logger:     logger,
97
+		connID:     connID,
98
+		conn:       conn,
99
+		publicIPv4: publicIPv4,
100
+		publicIPv6: publicIPv6,
101
+	}
102
+	wrapper.logger = logger.With("faked_local_addr", wrapper.LocalAddr())
103
+
104
+	return &wrapper
105
+}

+ 0
- 67
wrappers/ctxrwc.go Просмотреть файл

@@ -1,67 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"context"
5
-	"net"
6
-
7
-	"github.com/juju/errors"
8
-)
9
-
10
-// CtxReadWriteCloser wraps underlying connection and does management of the
11
-// context and its cancel function.
12
-type CtxReadWriteCloserWithAddr struct {
13
-	ctx    context.Context
14
-	conn   ReadWriteCloserWithAddr
15
-	cancel context.CancelFunc
16
-}
17
-
18
-// Read reads from connection
19
-func (c *CtxReadWriteCloserWithAddr) Read(p []byte) (int, error) {
20
-	select {
21
-	case <-c.ctx.Done():
22
-		return 0, errors.Annotate(c.ctx.Err(), "Read is failed because of closed context")
23
-	default:
24
-		n, err := c.conn.Read(p)
25
-		if err != nil {
26
-			c.cancel()
27
-		}
28
-		return n, err
29
-	}
30
-}
31
-
32
-// Write writes into connection.
33
-func (c *CtxReadWriteCloserWithAddr) Write(p []byte) (int, error) {
34
-	select {
35
-	case <-c.ctx.Done():
36
-		return 0, errors.Annotate(c.ctx.Err(), "Write is failed because of closed context")
37
-	default:
38
-		n, err := c.conn.Write(p)
39
-		if err != nil {
40
-			c.cancel()
41
-		}
42
-		return n, err
43
-	}
44
-}
45
-
46
-// Close closes underlying connection.
47
-func (c *CtxReadWriteCloserWithAddr) Close() error {
48
-	return c.conn.Close()
49
-}
50
-
51
-func (c *CtxReadWriteCloserWithAddr) LocalAddr() *net.TCPAddr {
52
-	return c.conn.LocalAddr()
53
-}
54
-
55
-func (c *CtxReadWriteCloserWithAddr) RemoteAddr() *net.TCPAddr {
56
-	return c.conn.RemoteAddr()
57
-}
58
-
59
-// NewCtxRWC returns ReadWriteCloser which respects given context,
60
-// cancellation etc.
61
-func NewCtxRWC(ctx context.Context, cancel context.CancelFunc, conn ReadWriteCloserWithAddr) ReadWriteCloserWithAddr {
62
-	return &CtxReadWriteCloserWithAddr{
63
-		conn:   conn,
64
-		ctx:    ctx,
65
-		cancel: cancel,
66
-	}
67
-}

+ 0
- 55
wrappers/logrwc.go Просмотреть файл

@@ -1,55 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"net"
5
-
6
-	"go.uber.org/zap"
7
-)
8
-
9
-// LogReadWriteCloser adds additional logging for reading/writing. All
10
-// logging is performed for debug mode only.
11
-type LogReadWriteCloserWithAddr struct {
12
-	conn   ReadWriteCloserWithAddr
13
-	logger *zap.SugaredLogger
14
-	sockid string
15
-	name   string
16
-}
17
-
18
-// Read reads from connection
19
-func (l *LogReadWriteCloserWithAddr) Read(p []byte) (n int, err error) {
20
-	n, err = l.conn.Read(p)
21
-	l.logger.Debugw("Finish reading", "name", l.name, "socketid", l.sockid, "nbytes", n, "error", err)
22
-	return
23
-}
24
-
25
-// Write writes into connection.
26
-func (l *LogReadWriteCloserWithAddr) Write(p []byte) (n int, err error) {
27
-	n, err = l.conn.Write(p)
28
-	l.logger.Debugw("Finish writing", "name", l.name, "socketid", l.sockid, "nbytes", n, "error", err)
29
-	return
30
-}
31
-
32
-// Close closes underlying connection.
33
-func (l *LogReadWriteCloserWithAddr) Close() error {
34
-	err := l.conn.Close()
35
-	l.logger.Debugw("Finish closing socket", "name", l.name, "socketid", l.sockid, "error", err)
36
-	return err
37
-}
38
-
39
-func (l *LogReadWriteCloserWithAddr) LocalAddr() *net.TCPAddr {
40
-	return l.conn.LocalAddr()
41
-}
42
-
43
-func (l *LogReadWriteCloserWithAddr) RemoteAddr() *net.TCPAddr {
44
-	return l.conn.RemoteAddr()
45
-}
46
-
47
-// NewLogRWC wraps ReadWriteCloser with logger calls.
48
-func NewLogRWC(conn ReadWriteCloserWithAddr, logger *zap.SugaredLogger, sockid string, name string) ReadWriteCloserWithAddr {
49
-	return &LogReadWriteCloserWithAddr{
50
-		conn:   conn,
51
-		logger: logger,
52
-		sockid: sockid,
53
-		name:   name,
54
-	}
55
-}

+ 152
- 0
wrappers/mtproto_abridged.go Просмотреть файл

@@ -0,0 +1,152 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"io"
6
+	"net"
7
+
8
+	"github.com/juju/errors"
9
+	"go.uber.org/zap"
10
+
11
+	"github.com/9seconds/mtg/mtproto"
12
+	"github.com/9seconds/mtg/utils"
13
+)
14
+
15
+const (
16
+	mtprotoAbridgedSmallPacketLength = 0x7f
17
+	mtprotoAbridgedQuickAckLength    = 0x80
18
+	mtprotoAbridgedLargePacketLength = 16777216 // 256 ^ 3
19
+)
20
+
21
+type MTProtoAbridged struct {
22
+	conn   StreamReadWriteCloser
23
+	opts   *mtproto.ConnectionOpts
24
+	logger *zap.SugaredLogger
25
+
26
+	readCounter  uint32
27
+	writeCounter uint32
28
+}
29
+
30
+func (m *MTProtoAbridged) Read() ([]byte, error) {
31
+	defer func() {
32
+		m.readCounter++
33
+	}()
34
+
35
+	m.logger.Debugw("Read packet",
36
+		"simple_ack", m.opts.ReadHacks.SimpleAck,
37
+		"quick_ack", m.opts.ReadHacks.QuickAck,
38
+		"counter", m.readCounter,
39
+	)
40
+
41
+	buf := &bytes.Buffer{}
42
+	buf.Grow(3)
43
+
44
+	if _, err := io.CopyN(buf, m.conn, 1); err != nil {
45
+		return nil, errors.Annotate(err, "Cannot read message length")
46
+	}
47
+	msgLength := uint32(buf.Bytes()[0])
48
+	buf.Reset()
49
+
50
+	m.logger.Debugw("Packet first byte",
51
+		"byte", msgLength,
52
+		"counter", m.readCounter,
53
+		"simple_ack", m.opts.ReadHacks.SimpleAck,
54
+		"quick_ack", m.opts.ReadHacks.QuickAck,
55
+	)
56
+
57
+	if msgLength >= mtprotoAbridgedQuickAckLength {
58
+		m.opts.ReadHacks.QuickAck = true
59
+		msgLength -= mtprotoAbridgedQuickAckLength
60
+	}
61
+
62
+	if msgLength == mtprotoAbridgedSmallPacketLength {
63
+		if _, err := io.CopyN(buf, m.conn, 3); err != nil {
64
+			return nil, errors.Annotate(err, "Cannot read the correct message length")
65
+		}
66
+		number := utils.Uint24{}
67
+		copy(number[:], buf.Bytes())
68
+		msgLength = utils.FromUint24(number)
69
+	}
70
+	msgLength *= 4
71
+
72
+	m.logger.Debugw("Packet length",
73
+		"length", msgLength,
74
+		"simple_ack", m.opts.ReadHacks.SimpleAck,
75
+		"quick_ack", m.opts.ReadHacks.QuickAck,
76
+		"counter", m.readCounter,
77
+	)
78
+
79
+	buf.Reset()
80
+	buf.Grow(int(msgLength))
81
+	if _, err := io.CopyN(buf, m.conn, int64(msgLength)); err != nil {
82
+		return nil, errors.Annotate(err, "Cannot read message")
83
+	}
84
+
85
+	return buf.Bytes(), nil
86
+}
87
+
88
+func (m *MTProtoAbridged) Write(p []byte) (int, error) {
89
+	defer func() {
90
+		m.writeCounter++
91
+	}()
92
+
93
+	m.logger.Debugw("Write packet",
94
+		"length", len(p),
95
+		"simple_ack", m.opts.WriteHacks.SimpleAck,
96
+		"quick_ack", m.opts.WriteHacks.QuickAck,
97
+		"counter", m.writeCounter,
98
+	)
99
+
100
+	if len(p)%4 != 0 {
101
+		return 0, errors.Errorf("Incorrect packet length %d", len(p))
102
+	}
103
+
104
+	if m.opts.WriteHacks.SimpleAck {
105
+		return m.conn.Write(utils.ReverseBytes(p))
106
+	}
107
+
108
+	packetLength := len(p) / 4
109
+	switch {
110
+	case packetLength < mtprotoAbridgedSmallPacketLength:
111
+		newData := append([]byte{byte(packetLength)}, p...)
112
+		return m.conn.Write(newData)
113
+
114
+	case packetLength < mtprotoAbridgedLargePacketLength:
115
+		length24 := utils.ToUint24(uint32(packetLength))
116
+
117
+		buf := &bytes.Buffer{}
118
+		buf.Grow(1 + 3 + len(p))
119
+
120
+		buf.WriteByte(byte(mtprotoAbridgedSmallPacketLength))
121
+		buf.Write(length24[:])
122
+		buf.Write(p)
123
+
124
+		return m.conn.Write(buf.Bytes())
125
+	}
126
+
127
+	return 0, errors.Errorf("Packet is too big %d", len(p))
128
+}
129
+
130
+func (m *MTProtoAbridged) Logger() *zap.SugaredLogger {
131
+	return m.logger
132
+}
133
+
134
+func (m *MTProtoAbridged) LocalAddr() *net.TCPAddr {
135
+	return m.conn.LocalAddr()
136
+}
137
+
138
+func (m *MTProtoAbridged) RemoteAddr() *net.TCPAddr {
139
+	return m.conn.RemoteAddr()
140
+}
141
+
142
+func (m *MTProtoAbridged) Close() error {
143
+	return m.conn.Close()
144
+}
145
+
146
+func NewMTProtoAbridged(conn StreamReadWriteCloser, opts *mtproto.ConnectionOpts) PacketReadWriteCloser {
147
+	return &MTProtoAbridged{
148
+		conn:   conn,
149
+		opts:   opts,
150
+		logger: conn.Logger().Named("mtproto-abridged"),
151
+	}
152
+}

mtproto/wrappers/crypt.go → wrappers/mtproto_cipher.go Просмотреть файл

@@ -10,7 +10,7 @@ import (
10 10
 	"net"
11 11
 
12 12
 	"github.com/9seconds/mtg/mtproto/rpc"
13
-	"github.com/9seconds/mtg/wrappers"
13
+	"github.com/9seconds/mtg/utils"
14 14
 )
15 15
 
16 16
 type CipherPurpose uint8
@@ -22,21 +22,20 @@ const (
22 22
 
23 23
 var emptyIP = [4]byte{0x00, 0x00, 0x00, 0x00}
24 24
 
25
-func NewMiddleProxyCipherRWC(conn wrappers.ReadWriteCloserWithAddr, req *rpc.RPCNonceRequest, resp *rpc.RPCNonceResponse, secret []byte) wrappers.ReadWriteCloserWithAddr {
25
+func NewMiddleProxyCipher(conn StreamReadWriteCloser, req *rpc.NonceRequest, resp *rpc.NonceResponse, secret []byte) StreamReadWriteCloser {
26 26
 	localAddr := conn.LocalAddr()
27 27
 	remoteAddr := conn.RemoteAddr()
28 28
 
29
-	encKey, encIV := makeKeys(CipherPurposeClient, req, resp, localAddr, remoteAddr, secret)
30
-	decKey, decIV := makeKeys(CipherPurposeServer, req, resp, localAddr, remoteAddr, secret)
29
+	encKey, encIV := deriveKeys(CipherPurposeClient, req, resp, localAddr, remoteAddr, secret)
30
+	decKey, decIV := deriveKeys(CipherPurposeServer, req, resp, localAddr, remoteAddr, secret)
31 31
 
32 32
 	enc, _ := makeEncrypterDecrypter(encKey, encIV)
33 33
 	_, dec := makeEncrypterDecrypter(decKey, decIV)
34 34
 
35
-	return wrappers.NewBlockCipherRWC(conn, enc, dec)
35
+	return NewBlockCipher(conn, enc, dec)
36 36
 }
37 37
 
38
-func makeKeys(purpose CipherPurpose, req *rpc.RPCNonceRequest, resp *rpc.RPCNonceResponse,
39
-	client *net.TCPAddr, remote *net.TCPAddr, secret []byte) ([]byte, []byte) {
38
+func deriveKeys(purpose CipherPurpose, req *rpc.NonceRequest, resp *rpc.NonceResponse, client *net.TCPAddr, remote *net.TCPAddr, secret []byte) ([]byte, []byte) {
40 39
 	message := bytes.Buffer{}
41 40
 	message.Write(resp.Nonce[:])
42 41
 	message.Write(req.Nonce[:])
@@ -45,8 +44,8 @@ func makeKeys(purpose CipherPurpose, req *rpc.RPCNonceRequest, resp *rpc.RPCNonc
45 44
 	clientIPv4 := emptyIP[:]
46 45
 	serverIPv4 := emptyIP[:]
47 46
 	if client.IP.To4() != nil {
48
-		clientIPv4 = reverseBytes(client.IP.To4())
49
-		serverIPv4 = reverseBytes(remote.IP.To4())
47
+		clientIPv4 = utils.ReverseBytes(client.IP.To4())
48
+		serverIPv4 = utils.ReverseBytes(remote.IP.To4())
50 49
 	}
51 50
 	message.Write(serverIPv4)
52 51
 
@@ -93,16 +92,3 @@ func makeEncrypterDecrypter(key, iv []byte) (cipher.BlockMode, cipher.BlockMode)
93 92
 
94 93
 	return cipher.NewCBCEncrypter(block, iv), cipher.NewCBCDecrypter(block, iv)
95 94
 }
96
-
97
-func reverseBytes(data []byte) []byte {
98
-	dataLen := len(data)
99
-	rv := make([]byte, dataLen)
100
-
101
-	rv[dataLen/2] = data[dataLen/2]
102
-	for i := dataLen/2 - 1; i >= 0; i-- {
103
-		opp := dataLen - i - 1
104
-		rv[i], rv[opp] = data[opp], data[i]
105
-	}
106
-
107
-	return rv
108
-}

+ 143
- 0
wrappers/mtproto_frame.go Просмотреть файл

@@ -0,0 +1,143 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"crypto/aes"
6
+	"encoding/binary"
7
+	"hash/crc32"
8
+	"io"
9
+	"io/ioutil"
10
+	"net"
11
+
12
+	"github.com/juju/errors"
13
+	"go.uber.org/zap"
14
+)
15
+
16
+const (
17
+	mtprotoFrameMinMessageLength = 12
18
+	mtprotoFrameMaxMessageLength = 16777216
19
+)
20
+
21
+var mtprotoFramePadding = []byte{0x04, 0x00, 0x00, 0x00}
22
+
23
+type MTProtoFrame struct {
24
+	conn   StreamReadWriteCloser
25
+	logger *zap.SugaredLogger
26
+
27
+	readSeqNo  int32
28
+	writeSeqNo int32
29
+}
30
+
31
+func (m *MTProtoFrame) Read() ([]byte, error) {
32
+	buf := &bytes.Buffer{}
33
+	sum := crc32.NewIEEE()
34
+	writer := io.MultiWriter(buf, sum)
35
+
36
+	for {
37
+		buf.Reset()
38
+		sum.Reset()
39
+		if _, err := io.CopyN(writer, m.conn, 4); err != nil {
40
+			return nil, errors.Annotate(err, "Cannot read frame padding")
41
+		}
42
+		if !bytes.Equal(buf.Bytes(), mtprotoFramePadding) {
43
+			break
44
+		}
45
+	}
46
+
47
+	messageLength := binary.LittleEndian.Uint32(buf.Bytes())
48
+	m.logger.Debugw("Read MTProto frame",
49
+		"messageLength", messageLength,
50
+		"sequence_number", m.readSeqNo,
51
+	)
52
+	if messageLength%4 != 0 || messageLength < mtprotoFrameMinMessageLength || messageLength > mtprotoFrameMaxMessageLength {
53
+		return nil, errors.Errorf("Incorrect frame message length %d", messageLength)
54
+	}
55
+
56
+	buf.Reset()
57
+	buf.Grow(int(messageLength) - 4 - 4)
58
+	if _, err := io.CopyN(writer, m.conn, int64(messageLength)-4-4); err != nil {
59
+		return nil, errors.Annotate(err, "Cannot read the message frame")
60
+	}
61
+
62
+	var seqNo int32
63
+	binary.Read(buf, binary.LittleEndian, &seqNo)
64
+	if seqNo != m.readSeqNo {
65
+		return nil, errors.Errorf("Unexpected sequence number %d (wait for %d)", seqNo, m.readSeqNo)
66
+	}
67
+
68
+	data, _ := ioutil.ReadAll(buf)
69
+	buf.Reset()
70
+	// write to buf, not to writer. This is because we are going to fetch
71
+	// crc32 checksum.
72
+	if _, err := io.CopyN(buf, m.conn, 4); err != nil {
73
+		return nil, errors.Annotate(err, "Cannot read checksum")
74
+	}
75
+
76
+	checksum := binary.LittleEndian.Uint32(buf.Bytes())
77
+	if checksum != sum.Sum32() {
78
+		return nil, errors.Errorf("CRC32 checksum mismatch. Wait for %d, got %d", sum.Sum32(), checksum)
79
+	}
80
+
81
+	m.logger.Debugw("Read MTProto frame",
82
+		"messageLength", messageLength,
83
+		"sequence_number", m.readSeqNo,
84
+		"dataLength", len(data),
85
+		"checksum", checksum,
86
+	)
87
+	m.readSeqNo++
88
+
89
+	return data, nil
90
+}
91
+
92
+func (m *MTProtoFrame) Write(p []byte) (int, error) {
93
+	messageLength := 4 + 4 + len(p) + 4
94
+	paddingLength := (aes.BlockSize - messageLength%aes.BlockSize) % aes.BlockSize
95
+
96
+	buf := &bytes.Buffer{}
97
+	buf.Grow(messageLength + paddingLength)
98
+
99
+	binary.Write(buf, binary.LittleEndian, uint32(messageLength))
100
+	binary.Write(buf, binary.LittleEndian, m.writeSeqNo)
101
+	buf.Write(p)
102
+
103
+	checksum := crc32.ChecksumIEEE(buf.Bytes())
104
+	binary.Write(buf, binary.LittleEndian, checksum)
105
+	buf.Write(bytes.Repeat(mtprotoFramePadding, paddingLength/4))
106
+
107
+	m.logger.Debugw("Write MTProto frame",
108
+		"length", len(p),
109
+		"sequence_number", m.writeSeqNo,
110
+		"crc32", checksum,
111
+		"frame_length", buf.Len(),
112
+	)
113
+	m.writeSeqNo++
114
+
115
+	_, err := m.conn.Write(buf.Bytes())
116
+
117
+	return len(p), err
118
+}
119
+
120
+func (m *MTProtoFrame) Logger() *zap.SugaredLogger {
121
+	return m.logger
122
+}
123
+
124
+func (m *MTProtoFrame) LocalAddr() *net.TCPAddr {
125
+	return m.conn.LocalAddr()
126
+}
127
+
128
+func (m *MTProtoFrame) RemoteAddr() *net.TCPAddr {
129
+	return m.conn.RemoteAddr()
130
+}
131
+
132
+func (m *MTProtoFrame) Close() error {
133
+	return m.conn.Close()
134
+}
135
+
136
+func NewMTProtoFrame(conn StreamReadWriteCloser, seqNo int32) PacketReadWriteCloser {
137
+	return &MTProtoFrame{
138
+		conn:       conn,
139
+		logger:     conn.Logger().Named("mtproto-frame"),
140
+		readSeqNo:  seqNo,
141
+		writeSeqNo: seqNo,
142
+	}
143
+}

+ 113
- 0
wrappers/mtproto_intermediate.go Просмотреть файл

@@ -0,0 +1,113 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"encoding/binary"
6
+	"io"
7
+	"net"
8
+
9
+	"github.com/juju/errors"
10
+	"go.uber.org/zap"
11
+
12
+	"github.com/9seconds/mtg/mtproto"
13
+)
14
+
15
+const mtprotoIntermediateQuickAckLength = 0x80000000
16
+
17
+type MTProtoIntermediate struct {
18
+	conn   StreamReadWriteCloser
19
+	opts   *mtproto.ConnectionOpts
20
+	logger *zap.SugaredLogger
21
+
22
+	readCounter  uint32
23
+	writeCounter uint32
24
+}
25
+
26
+func (m *MTProtoIntermediate) Read() ([]byte, error) {
27
+	defer func() {
28
+		m.readCounter++
29
+	}()
30
+
31
+	m.logger.Debugw("Read packet",
32
+		"simple_ack", m.opts.ReadHacks.SimpleAck,
33
+		"quick_ack", m.opts.ReadHacks.QuickAck,
34
+		"counter", m.readCounter,
35
+	)
36
+
37
+	buf := &bytes.Buffer{}
38
+	buf.Grow(4)
39
+
40
+	if _, err := io.CopyN(buf, m.conn, 4); err != nil {
41
+		return nil, errors.Annotate(err, "Cannot read message length")
42
+	}
43
+	length := binary.LittleEndian.Uint32(buf.Bytes())
44
+
45
+	m.logger.Debugw("Packet message length",
46
+		"simple_ack", m.opts.ReadHacks.SimpleAck,
47
+		"quick_ack", m.opts.ReadHacks.QuickAck,
48
+		"counter", m.readCounter,
49
+		"length", length,
50
+	)
51
+
52
+	if length > mtprotoIntermediateQuickAckLength {
53
+		m.opts.ReadHacks.QuickAck = true
54
+		length -= mtprotoIntermediateQuickAckLength
55
+	}
56
+
57
+	buf.Reset()
58
+	buf.Grow(int(length))
59
+	if _, err := io.CopyN(buf, m.conn, int64(length)); err != nil {
60
+		return nil, errors.Annotate(err, "Cannot read the message")
61
+	}
62
+
63
+	if length%4 != 0 {
64
+		length -= length % 4
65
+	}
66
+
67
+	return buf.Bytes()[:length], nil
68
+}
69
+
70
+func (m *MTProtoIntermediate) Write(p []byte) (int, error) {
71
+	defer func() {
72
+		m.writeCounter++
73
+	}()
74
+
75
+	m.logger.Debugw("Write packet",
76
+		"simple_ack", m.opts.WriteHacks.SimpleAck,
77
+		"quick_ack", m.opts.WriteHacks.QuickAck,
78
+		"counter", m.writeCounter,
79
+	)
80
+
81
+	if m.opts.ReadHacks.SimpleAck {
82
+		return m.conn.Write(p)
83
+	}
84
+
85
+	var length [4]byte
86
+	binary.LittleEndian.PutUint32(length[:], uint32(len(p)))
87
+
88
+	return m.conn.Write(append(length[:], p...))
89
+}
90
+
91
+func (m *MTProtoIntermediate) Logger() *zap.SugaredLogger {
92
+	return m.logger
93
+}
94
+
95
+func (m *MTProtoIntermediate) LocalAddr() *net.TCPAddr {
96
+	return m.conn.LocalAddr()
97
+}
98
+
99
+func (m *MTProtoIntermediate) RemoteAddr() *net.TCPAddr {
100
+	return m.conn.RemoteAddr()
101
+}
102
+
103
+func (m *MTProtoIntermediate) Close() error {
104
+	return m.conn.Close()
105
+}
106
+
107
+func NewMTProtoIntermediate(conn StreamReadWriteCloser, opts *mtproto.ConnectionOpts) PacketReadWriteCloser {
108
+	return &MTProtoIntermediate{
109
+		conn:   conn,
110
+		logger: conn.Logger().Named("mtproto-intermediate"),
111
+		opts:   opts,
112
+	}
113
+}

+ 158
- 0
wrappers/mtproto_proxy.go Просмотреть файл

@@ -0,0 +1,158 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"fmt"
6
+	"net"
7
+
8
+	"github.com/juju/errors"
9
+	"go.uber.org/zap"
10
+
11
+	"github.com/9seconds/mtg/mtproto"
12
+	"github.com/9seconds/mtg/mtproto/rpc"
13
+)
14
+
15
+type MTProtoProxy struct {
16
+	conn   PacketReadWriteCloser
17
+	req    *rpc.ProxyRequest
18
+	logger *zap.SugaredLogger
19
+
20
+	readCounter  uint32
21
+	writeCounter uint32
22
+}
23
+
24
+func (m *MTProtoProxy) Read() ([]byte, error) {
25
+	defer func() {
26
+		m.readCounter++
27
+	}()
28
+
29
+	m.logger.Debugw("Read packet",
30
+		"counter", m.readCounter,
31
+		"simple_ack", m.req.Options.WriteHacks.SimpleAck,
32
+		"quick_ack", m.req.Options.WriteHacks.QuickAck,
33
+	)
34
+
35
+	packet, err := m.conn.Read()
36
+	if err != nil {
37
+		return nil, errors.Annotate(err, "Cannot read packet")
38
+	}
39
+
40
+	m.logger.Debugw("Read packet length",
41
+		"counter", m.readCounter,
42
+		"simple_ack", m.req.Options.WriteHacks.SimpleAck,
43
+		"quick_ack", m.req.Options.WriteHacks.QuickAck,
44
+		"length", len(packet),
45
+	)
46
+
47
+	if len(packet) < 4 {
48
+		return nil, errors.Annotate(err, "Incorrect packet length")
49
+	}
50
+
51
+	tag, packet := packet[:4], packet[4:]
52
+	switch {
53
+	case bytes.Equal(tag, rpc.TagProxyAns):
54
+		return m.readProxyAns(packet)
55
+	case bytes.Equal(tag, rpc.TagSimpleAck):
56
+		return m.readSimpleAck(packet)
57
+	case bytes.Equal(tag, rpc.TagCloseExt):
58
+		return m.readCloseExt(packet)
59
+	}
60
+
61
+	return nil, errors.Errorf("Unknown RPC answer %v", tag)
62
+}
63
+
64
+func (m *MTProtoProxy) readProxyAns(data []byte) ([]byte, error) {
65
+	if len(data) < 12 {
66
+		return nil, errors.Errorf("Incorrect data of proxy answer: %d", len(data))
67
+	}
68
+	data = data[12:]
69
+
70
+	m.logger.Debugw("Read RPC_PROXY_ANS",
71
+		"counter", m.readCounter,
72
+		"length", len(data),
73
+	)
74
+
75
+	return data, nil
76
+}
77
+
78
+func (m *MTProtoProxy) readSimpleAck(data []byte) ([]byte, error) {
79
+	if len(data) != 12 {
80
+		return nil, errors.Errorf("Incorrect data of simple ack: %d", len(data))
81
+	}
82
+	data = data[8:12]
83
+	m.req.Options.WriteHacks.SimpleAck = true
84
+
85
+	m.logger.Debugw("Read RPC_SIMPLE_ACK",
86
+		"counter", m.readCounter,
87
+		"length", len(data),
88
+	)
89
+
90
+	return data, nil
91
+}
92
+
93
+func (m *MTProtoProxy) readCloseExt(data []byte) ([]byte, error) {
94
+	m.logger.Debugw("Read RPC_CLOSE_EXT", "counter", m.readCounter)
95
+
96
+	return nil, errors.New("Connection has been closed remotely by RPC call")
97
+}
98
+
99
+func (m *MTProtoProxy) Write(p []byte) (int, error) {
100
+	defer func() {
101
+		m.writeCounter++
102
+	}()
103
+
104
+	m.logger.Debugw("Write packet",
105
+		"length", len(p),
106
+		"counter", m.writeCounter,
107
+		"simple_ack", m.req.Options.ReadHacks.SimpleAck,
108
+		"quick_ack", m.req.Options.ReadHacks.QuickAck,
109
+	)
110
+
111
+	header, flags := m.req.MakeHeader(p)
112
+	if ce := m.logger.Desugar().Check(zap.DebugLevel, "RPC_PROXY_REQ header"); ce != nil {
113
+		ce.Write(
114
+			zap.Int("length", len(p)),
115
+			zap.Uint32("counter", m.writeCounter),
116
+			zap.Bool("simple_ack", m.req.Options.ReadHacks.QuickAck),
117
+			zap.Bool("quick_ack", m.req.Options.ReadHacks.SimpleAck),
118
+			zap.String("header", fmt.Sprintf("%v", header.Bytes())),
119
+			zap.Stringer("flags", flags),
120
+		)
121
+	}
122
+	header.Write(p)
123
+
124
+	if _, err := m.conn.Write(header.Bytes()); err != nil {
125
+		return 0, err
126
+	}
127
+
128
+	return len(p), nil
129
+}
130
+
131
+func (m *MTProtoProxy) Logger() *zap.SugaredLogger {
132
+	return m.logger
133
+}
134
+
135
+func (m *MTProtoProxy) LocalAddr() *net.TCPAddr {
136
+	return m.conn.LocalAddr()
137
+}
138
+
139
+func (m *MTProtoProxy) RemoteAddr() *net.TCPAddr {
140
+	return m.conn.RemoteAddr()
141
+}
142
+
143
+func (m *MTProtoProxy) Close() error {
144
+	return m.conn.Close()
145
+}
146
+
147
+func NewMTProtoProxy(conn PacketReadWriteCloser, connOpts *mtproto.ConnectionOpts, adTag []byte) (PacketReadWriteCloser, error) {
148
+	req, err := rpc.NewProxyRequest(connOpts.ClientAddr, conn.LocalAddr(), connOpts, adTag)
149
+	if err != nil {
150
+		return nil, errors.Annotate(err, "Cannot create new RPC proxy request")
151
+	}
152
+
153
+	return &MTProtoProxy{
154
+		conn:   conn,
155
+		logger: conn.Logger().Named("mtproto-proxy"),
156
+		req:    req,
157
+	}, nil
158
+}

+ 0
- 13
wrappers/rwcaddr.go Просмотреть файл

@@ -1,13 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"io"
5
-	"net"
6
-)
7
-
8
-type ReadWriteCloserWithAddr interface {
9
-	io.ReadWriteCloser
10
-
11
-	LocalAddr() *net.TCPAddr
12
-	RemoteAddr() *net.TCPAddr
13
-}

+ 58
- 0
wrappers/streamcipher.go Просмотреть файл

@@ -0,0 +1,58 @@
1
+package wrappers
2
+
3
+import (
4
+	"crypto/cipher"
5
+	"net"
6
+
7
+	"github.com/juju/errors"
8
+	"go.uber.org/zap"
9
+)
10
+
11
+type StreamCipher struct {
12
+	encryptor cipher.Stream
13
+	decryptor cipher.Stream
14
+	conn      StreamReadWriteCloser
15
+	logger    *zap.SugaredLogger
16
+}
17
+
18
+func (s *StreamCipher) Read(p []byte) (int, error) {
19
+	n, err := s.conn.Read(p)
20
+	if err != nil {
21
+		return 0, errors.Annotate(err, "Cannot read stream ciphered data")
22
+	}
23
+	s.decryptor.XORKeyStream(p, p[:n])
24
+
25
+	return n, nil
26
+}
27
+
28
+func (s *StreamCipher) Write(p []byte) (int, error) {
29
+	encrypted := make([]byte, len(p))
30
+	s.encryptor.XORKeyStream(encrypted, p)
31
+
32
+	return s.conn.Write(encrypted)
33
+}
34
+
35
+func (s *StreamCipher) Logger() *zap.SugaredLogger {
36
+	return s.logger
37
+}
38
+
39
+func (s *StreamCipher) LocalAddr() *net.TCPAddr {
40
+	return s.conn.LocalAddr()
41
+}
42
+
43
+func (s *StreamCipher) RemoteAddr() *net.TCPAddr {
44
+	return s.conn.RemoteAddr()
45
+}
46
+
47
+func (s *StreamCipher) Close() error {
48
+	return s.conn.Close()
49
+}
50
+
51
+func NewStreamCipher(conn StreamReadWriteCloser, encryptor, decryptor cipher.Stream) StreamReadWriteCloser {
52
+	return &StreamCipher{
53
+		conn:      conn,
54
+		logger:    conn.Logger().Named("stream-cipher"),
55
+		encryptor: encryptor,
56
+		decryptor: decryptor,
57
+	}
58
+}

+ 0
- 62
wrappers/streamcipherrwc.go Просмотреть файл

@@ -1,62 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"crypto/cipher"
5
-	"net"
6
-)
7
-
8
-// StreamCipherReadWriteCloser is a ReadWriteCloser which ciphers
9
-// incoming and outgoing data with givem cipher.Stream instances.
10
-type StreamCipherReadWriteCloserWithAddr struct {
11
-	encryptor cipher.Stream
12
-	decryptor cipher.Stream
13
-	conn      ReadWriteCloserWithAddr
14
-}
15
-
16
-// Read reads from connection
17
-func (c *StreamCipherReadWriteCloserWithAddr) Read(p []byte) (n int, err error) {
18
-	n, err = c.conn.Read(p)
19
-	c.decryptor.XORKeyStream(p, p[:n])
20
-	return
21
-}
22
-
23
-// Write writes into connection.
24
-func (c *StreamCipherReadWriteCloserWithAddr) Write(p []byte) (int, error) {
25
-	// This is to decrease an amount of allocations. Unfortunately, escape
26
-	// analysis in (at least Golang 1.10) is absolutely not perfect. For
27
-	// example, it understands that we want to have a slice locally, right?
28
-	// But since slice is effectively 2 ints + uintptr to [number]byte, the
29
-	// most heavyweight part is placed in heap.
30
-	buf := getBuffer()
31
-	defer putBuffer(buf)
32
-	buf.Grow(len(p))
33
-	buf.Write(p)
34
-
35
-	encrypted := buf.Bytes()
36
-	c.encryptor.XORKeyStream(encrypted, p)
37
-
38
-	return c.conn.Write(encrypted)
39
-}
40
-
41
-// Close closes underlying connection.
42
-func (c *StreamCipherReadWriteCloserWithAddr) Close() error {
43
-	return c.conn.Close()
44
-}
45
-
46
-func (c *StreamCipherReadWriteCloserWithAddr) LocalAddr() *net.TCPAddr {
47
-	return c.conn.LocalAddr()
48
-}
49
-
50
-func (c *StreamCipherReadWriteCloserWithAddr) RemoteAddr() *net.TCPAddr {
51
-	return c.conn.RemoteAddr()
52
-}
53
-
54
-// NewStreamCipherRWC returns wrapper which transparently
55
-// encrypts/decrypts traffic with obfuscated2 protocol.
56
-func NewStreamCipherRWC(conn ReadWriteCloserWithAddr, encryptor, decryptor cipher.Stream) ReadWriteCloserWithAddr {
57
-	return &StreamCipherReadWriteCloserWithAddr{
58
-		conn:      conn,
59
-		encryptor: encryptor,
60
-		decryptor: decryptor,
61
-	}
62
-}

+ 0
- 55
wrappers/timeoutrwc.go Просмотреть файл

@@ -1,55 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"net"
5
-	"time"
6
-
7
-	"github.com/9seconds/mtg/config"
8
-)
9
-
10
-type TimeoutReadWriteCloserWithAddr struct {
11
-	conn       net.Conn
12
-	publicIPv4 net.IP
13
-	publicIPv6 net.IP
14
-}
15
-
16
-func (t *TimeoutReadWriteCloserWithAddr) Read(p []byte) (int, error) {
17
-	t.conn.SetReadDeadline(time.Now().Add(config.TimeoutRead))
18
-	return t.conn.Read(p)
19
-}
20
-
21
-func (t *TimeoutReadWriteCloserWithAddr) Write(p []byte) (int, error) {
22
-	t.conn.SetWriteDeadline(time.Now().Add(config.TimeoutWrite))
23
-	return t.conn.Write(p)
24
-}
25
-
26
-func (t *TimeoutReadWriteCloserWithAddr) Close() error {
27
-	return t.conn.Close()
28
-}
29
-
30
-func (t *TimeoutReadWriteCloserWithAddr) RemoteAddr() *net.TCPAddr {
31
-	return t.conn.RemoteAddr().(*net.TCPAddr)
32
-}
33
-
34
-func (t *TimeoutReadWriteCloserWithAddr) LocalAddr() *net.TCPAddr {
35
-	addr := t.conn.LocalAddr().(*net.TCPAddr)
36
-	newAddr := *addr
37
-
38
-	if t.RemoteAddr().IP.To4() != nil {
39
-		if t.publicIPv4 != nil {
40
-			newAddr.IP = t.publicIPv4
41
-		}
42
-	} else if t.publicIPv6 != nil {
43
-		newAddr.IP = t.publicIPv6
44
-	}
45
-
46
-	return &newAddr
47
-}
48
-
49
-func NewTimeoutRWC(conn net.Conn, ipv4, ipv6 net.IP) ReadWriteCloserWithAddr {
50
-	return &TimeoutReadWriteCloserWithAddr{
51
-		conn:       conn,
52
-		publicIPv4: ipv4,
53
-		publicIPv6: ipv6,
54
-	}
55
-}

+ 0
- 47
wrappers/trafficrwc.go Просмотреть файл

@@ -1,47 +0,0 @@
1
-package wrappers
2
-
3
-import "net"
4
-
5
-// TrafficReadWriteCloser counts an amount of ingress/egress traffic by
6
-// calling given callbacks.
7
-type TrafficReadWriteCloserWithAddr struct {
8
-	conn          ReadWriteCloserWithAddr
9
-	readCallback  func(int)
10
-	writeCallback func(int)
11
-}
12
-
13
-// Read reads from connection
14
-func (t *TrafficReadWriteCloserWithAddr) Read(p []byte) (n int, err error) {
15
-	n, err = t.conn.Read(p)
16
-	t.readCallback(n)
17
-	return
18
-}
19
-
20
-// Write writes into connection.
21
-func (t *TrafficReadWriteCloserWithAddr) Write(p []byte) (n int, err error) {
22
-	n, err = t.conn.Write(p)
23
-	t.writeCallback(n)
24
-	return
25
-}
26
-
27
-// Close closes underlying connection.
28
-func (t *TrafficReadWriteCloserWithAddr) Close() error {
29
-	return t.conn.Close()
30
-}
31
-
32
-func (t *TrafficReadWriteCloserWithAddr) LocalAddr() *net.TCPAddr {
33
-	return t.conn.LocalAddr()
34
-}
35
-
36
-func (t *TrafficReadWriteCloserWithAddr) RemoteAddr() *net.TCPAddr {
37
-	return t.conn.RemoteAddr()
38
-}
39
-
40
-// NewTrafficRWC wraps ReadWriteCloser to have read/write callbacks.
41
-func NewTrafficRWC(conn ReadWriteCloserWithAddr, readCallback, writeCallback func(int)) ReadWriteCloserWithAddr {
42
-	return &TrafficReadWriteCloserWithAddr{
43
-		conn:          conn,
44
-		readCallback:  readCallback,
45
-		writeCallback: writeCallback,
46
-	}
47
-}

+ 85
- 0
wrappers/wrap.go Просмотреть файл

@@ -0,0 +1,85 @@
1
+package wrappers
2
+
3
+import (
4
+	"io"
5
+	"net"
6
+
7
+	"go.uber.org/zap"
8
+)
9
+
10
+type Wrap interface {
11
+	Logger() *zap.SugaredLogger
12
+	LocalAddr() *net.TCPAddr
13
+	RemoteAddr() *net.TCPAddr
14
+}
15
+
16
+type Writer interface {
17
+	io.Writer
18
+	Wrap
19
+}
20
+
21
+type Closer interface {
22
+	io.Closer
23
+	Wrap
24
+}
25
+
26
+type WriteCloser interface {
27
+	io.Closer
28
+	Writer
29
+}
30
+
31
+type StreamReader interface {
32
+	io.Reader
33
+	Wrap
34
+}
35
+
36
+type StreamReadCloser interface {
37
+	io.Closer
38
+	StreamReader
39
+}
40
+
41
+type StreamReadWriter interface {
42
+	io.Writer
43
+	StreamReader
44
+}
45
+
46
+type StreamWriteCloser interface {
47
+	io.WriteCloser
48
+	Wrap
49
+}
50
+
51
+type StreamReadWriteCloser interface {
52
+	io.Closer
53
+	StreamReadWriter
54
+}
55
+
56
+type PacketReader interface {
57
+	Read() ([]byte, error)
58
+	Wrap
59
+}
60
+
61
+type PacketWriter interface {
62
+	io.Writer
63
+	Wrap
64
+}
65
+
66
+type PacketReadWriter interface {
67
+	io.Writer
68
+	PacketReader
69
+}
70
+
71
+type PacketReadCloser interface {
72
+	io.Closer
73
+	PacketReader
74
+}
75
+
76
+type PacketWriteCloser interface {
77
+	io.Writer
78
+	io.Closer
79
+	Wrap
80
+}
81
+
82
+type PacketReadWriteCloser interface {
83
+	io.Closer
84
+	PacketReadWriter
85
+}

Загрузка…
Отмена
Сохранить