Przeglądaj źródła

Merge pull request #10 from 9seconds/channels

Promoted channels
tags/0.9
Sergey Arkhipov 7 lat temu
rodzic
commit
e5612bfa91
No account linked to committer's email address
51 zmienionych plików z 2516 dodań i 627 usunięć
  1. 7
    1
      Gopkg.lock
  2. 4
    0
      Gopkg.toml
  3. 12
    0
      README.md
  4. 3
    3
      client/client.go
  5. 24
    11
      client/direct.go
  6. 31
    0
      client/middle.go
  7. 31
    49
      config/config.go
  8. 17
    8
      main.go
  9. 27
    2
      mtproto/connection_options.go
  10. 26
    0
      mtproto/rpc/handshake_request.go
  11. 55
    0
      mtproto/rpc/handshake_response.go
  12. 52
    0
      mtproto/rpc/nonce_request.go
  13. 60
    0
      mtproto/rpc/nonce_response.go
  14. 55
    0
      mtproto/rpc/proxy_flags.go
  15. 100
    0
      mtproto/rpc/proxy_request.go
  16. 33
    0
      mtproto/rpc/rpc.go
  17. 15
    17
      obfuscated2/frame.go
  18. 0
    24
      obfuscated2/frame_pool.go
  19. 4
    4
      obfuscated2/frame_test.go
  20. 8
    10
      obfuscated2/obfuscated2.go
  21. 5
    5
      obfuscated2/obfuscated2_test.go
  22. 0
    18
      proxy/copy_pool.go
  23. 162
    0
      proxy/proxy.go
  24. 0
    149
      proxy/server.go
  25. 0
    74
      proxy/stats.go
  26. 0
    1
      run-mtg.sh
  27. 151
    0
      stats/channels.go
  28. 57
    0
      stats/server.go
  29. 100
    0
      stats/stats.go
  30. 18
    5
      telegram/dialer.go
  31. 9
    8
      telegram/direct.go
  32. 136
    0
      telegram/middle.go
  33. 152
    0
      telegram/middle_caller.go
  34. 15
    12
      telegram/telegram.go
  35. 21
    0
      utils/read_current_data.go
  36. 15
    0
      utils/reverse_bytes.go
  37. 15
    0
      utils/uint24.go
  38. 99
    0
      wrappers/blockcipher.go
  39. 0
    27
      wrappers/buffer_pool.go
  40. 121
    0
      wrappers/conn.go
  41. 0
    59
      wrappers/ctxrwc.go
  42. 0
    47
      wrappers/logrwc.go
  43. 159
    0
      wrappers/mtproto_abridged.go
  44. 96
    0
      wrappers/mtproto_cipher.go
  45. 160
    0
      wrappers/mtproto_frame.go
  46. 121
    0
      wrappers/mtproto_intermediate.go
  47. 164
    0
      wrappers/mtproto_proxy.go
  48. 65
    0
      wrappers/streamcipher.go
  49. 0
    54
      wrappers/streamcipherrwc.go
  50. 0
    39
      wrappers/trafficrwc.go
  51. 111
    0
      wrappers/wrap.go

+ 7
- 1
Gopkg.lock Wyświetl plik

@@ -22,6 +22,12 @@
22 22
   revision = "346938d642f2ec3594ed81d874461961cd0faa76"
23 23
   version = "v1.1.0"
24 24
 
25
+[[projects]]
26
+  branch = "master"
27
+  name = "github.com/dustin/go-humanize"
28
+  packages = ["."]
29
+  revision = "02af3965c54e8cacf948b97fef38925c4120652c"
30
+
25 31
 [[projects]]
26 32
   branch = "master"
27 33
   name = "github.com/juju/errors"
@@ -80,6 +86,6 @@
80 86
 [solve-meta]
81 87
   analyzer-name = "dep"
82 88
   analyzer-version = 1
83
-  inputs-digest = "24afdd6b64331aeba47fed75918d04032e13e404612cac107bad1d68a5038b72"
89
+  inputs-digest = "c4fdd3664f683342ad0c2509f4a8bcfe5b267a6e8cdaf36f70d39536bbf89834"
84 90
   solver-name = "gps-cdcl"
85 91
   solver-version = 1

+ 4
- 0
Gopkg.toml Wyświetl plik

@@ -44,3 +44,7 @@
44 44
 [[constraint]]
45 45
   name = "github.com/satori/go.uuid"
46 46
   version = "1.2.0"
47
+
48
+[[constraint]]
49
+  branch = "master"
50
+  name = "github.com/dustin/go-humanize"

+ 12
- 0
README.md Wyświetl plik

@@ -38,6 +38,18 @@ mtg is an implementation in golang which is intended to be:
38 38
 * **No management WebUI**
39 39
   This is an implementation of simple lightweight proxy. I won't do that.
40 40
 
41
+This proxy supports 2 modes of work: direct connection to Telegram and
42
+promoted channel mode. If you do not need promoted channels, I would
43
+recommend you to go with direct mode: this is way more robust.
44
+
45
+To run proxy in direct mode, all you need to do is just provide a
46
+secret. If you do not provide ADTag as a second parameter, promoted
47
+channels mode won't be activated.
48
+
49
+To get promoted channel, please contact
50
+[@MTProxybot|https://t.me/MTProxybot] and provide generated adtag as a
51
+second parameter.
52
+
41 53
 
42 54
 # How to build
43 55
 

+ 3
- 3
client/client.go Wyświetl plik

@@ -1,12 +1,12 @@
1 1
 package client
2 2
 
3 3
 import (
4
-	"io"
5 4
 	"net"
6 5
 
7 6
 	"github.com/9seconds/mtg/config"
8 7
 	"github.com/9seconds/mtg/mtproto"
8
+	"github.com/9seconds/mtg/wrappers"
9 9
 )
10 10
 
11
-// Init has to initialize client connection based on given config.
12
-type Init func(net.Conn, *config.Config) (*mtproto.ConnectionOpts, io.ReadWriteCloser, error)
11
+// Init defines common method for initializing client connections.
12
+type Init func(net.Conn, string, *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error)

+ 24
- 11
client/direct.go Wyświetl plik

@@ -1,7 +1,6 @@
1 1
 package client
2 2
 
3 3
 import (
4
-	"io"
5 4
 	"net"
6 5
 	"time"
7 6
 
@@ -15,28 +14,42 @@ import (
15 14
 
16 15
 const (
17 16
 	handshakeTimeout = 10 * time.Second
17
+	readBufferSize   = 64 * 1024
18
+	writeBufferSize  = 64 * 1024
18 19
 )
19 20
 
20
-// DirectInit initializes client to access Telegram bypassing middleproxies.
21
-func DirectInit(conn net.Conn, conf *config.Config) (*mtproto.ConnectionOpts, io.ReadWriteCloser, error) {
22
-	if err := config.SetSocketOptions(conn); err != nil {
23
-		return nil, nil, errors.Annotate(err, "Cannot set socket options")
21
+// DirectInit initializes client connection for proxy which connects to
22
+// Telegram directly.
23
+func DirectInit(socket net.Conn, connID string, conf *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error) {
24
+	tcpSocket := socket.(*net.TCPConn)
25
+	if err := tcpSocket.SetNoDelay(false); err != nil {
26
+		return nil, nil, errors.Annotate(err, "Cannot disable NO_DELAY to client socket")
27
+	}
28
+	if err := tcpSocket.SetReadBuffer(readBufferSize); err != nil {
29
+		return nil, nil, errors.Annotate(err, "Cannot set read buffer size of client socket")
30
+	}
31
+	if err := tcpSocket.SetWriteBuffer(writeBufferSize); err != nil {
32
+		return nil, nil, errors.Annotate(err, "Cannot set write buffer size of client socket")
24 33
 	}
25 34
 
26
-	conn.SetReadDeadline(time.Now().Add(handshakeTimeout)) // nolint: errcheck
27
-	frame, err := obfuscated2.ExtractFrame(conn)
28
-	conn.SetReadDeadline(time.Time{}) // nolint: errcheck
35
+	socket.SetReadDeadline(time.Now().Add(handshakeTimeout)) // nolint: errcheck
36
+	frame, err := obfuscated2.ExtractFrame(socket)
29 37
 	if err != nil {
30 38
 		return nil, nil, errors.Annotate(err, "Cannot extract frame")
31 39
 	}
32
-	defer obfuscated2.ReturnFrame(frame)
40
+	socket.SetReadDeadline(time.Time{}) // nolint: errcheck
33 41
 
42
+	conn := wrappers.NewConn(socket, connID, wrappers.ConnPurposeClient, conf.PublicIPv4, conf.PublicIPv6)
34 43
 	obfs2, connOpts, err := obfuscated2.ParseObfuscated2ClientFrame(conf.Secret, frame)
35 44
 	if err != nil {
36 45
 		return nil, nil, errors.Annotate(err, "Cannot parse obfuscated frame")
37 46
 	}
47
+	connOpts.ConnectionProto = mtproto.ConnectionProtocolAny
48
+	connOpts.ClientAddr = conn.RemoteAddr()
49
+
50
+	conn = wrappers.NewStreamCipher(conn, obfs2.Encryptor, obfs2.Decryptor)
38 51
 
39
-	socket := wrappers.NewStreamCipherRWC(conn, obfs2.Encryptor, obfs2.Decryptor)
52
+	conn.Logger().Infow("Client connection initialized")
40 53
 
41
-	return connOpts, socket, nil
54
+	return conn, connOpts, nil
42 55
 }

+ 31
- 0
client/middle.go Wyświetl plik

@@ -0,0 +1,31 @@
1
+package client
2
+
3
+import (
4
+	"net"
5
+
6
+	"github.com/9seconds/mtg/config"
7
+	"github.com/9seconds/mtg/mtproto"
8
+	"github.com/9seconds/mtg/wrappers"
9
+)
10
+
11
+// MiddleInit initializes client connection for proxy which has to
12
+// support promoted channels, connect to Telegram middle proxies etc.
13
+func MiddleInit(socket net.Conn, connID string, conf *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error) {
14
+	conn, opts, err := DirectInit(socket, connID, conf)
15
+	if err != nil {
16
+		return nil, nil, err
17
+	}
18
+	connStream := conn.(wrappers.StreamReadWriteCloser)
19
+
20
+	newConn := wrappers.NewMTProtoAbridged(connStream, opts)
21
+	if opts.ConnectionType != mtproto.ConnectionTypeAbridged {
22
+		newConn = wrappers.NewMTProtoIntermediate(connStream, opts)
23
+	}
24
+
25
+	opts.ConnectionProto = mtproto.ConnectionProtocolIPv4
26
+	if socket.LocalAddr().(*net.TCPAddr).IP.To4() == nil {
27
+		opts.ConnectionProto = mtproto.ConnectionProtocolIPv6
28
+	}
29
+
30
+	return newConn, opts, err
31
+}

+ 31
- 49
config/config.go Wyświetl plik

@@ -5,7 +5,6 @@ import (
5 5
 	"fmt"
6 6
 	"net"
7 7
 	"strconv"
8
-	"time"
9 8
 
10 9
 	"github.com/juju/errors"
11 10
 )
@@ -14,9 +13,6 @@ import (
14 13
 const (
15 14
 	BufferWriteSize = 32 * 1024
16 15
 	BufferReadSize  = 32 * 1024
17
-	BufferSizeCopy  = 32 * 1024
18
-
19
-	keepAlivePeriod = 20 * time.Second
20 16
 )
21 17
 
22 18
 // Config represents common configuration of mtg.
@@ -35,6 +31,7 @@ type Config struct {
35 31
 	StatsIP    net.IP
36 32
 
37 33
 	Secret []byte
34
+	AdTag  []byte
38 35
 }
39 36
 
40 37
 // URLs contains links to the proxy (tg://, t.me) and their QR codes.
@@ -56,27 +53,28 @@ func (c *Config) BindAddr() string {
56 53
 	return getAddr(c.BindIP, c.BindPort)
57 54
 }
58 55
 
59
-// IPv4Addr returns connection string to ipv6 for mtproto proxy.
60
-func (c *Config) IPv4Addr() string {
61
-	return getAddr(c.PublicIPv4, c.PublicIPv4Port)
62
-}
63
-
64
-// IPv6Addr returns connection string to ipv6 for mtproto proxy.
65
-func (c *Config) IPv6Addr() string {
66
-	return getAddr(c.PublicIPv6, c.PublicIPv6Port)
67
-}
68
-
69 56
 // StatAddr returns connection string to the stats API.
70 57
 func (c *Config) StatAddr() string {
71 58
 	return getAddr(c.StatsIP, c.StatsPort)
72 59
 }
73 60
 
61
+// UseMiddleProxy defines if this proxy has to connect middle proxies
62
+// which supports promoted channels or directly access Telegram.
63
+func (c *Config) UseMiddleProxy() bool {
64
+	return len(c.AdTag) > 0
65
+}
66
+
74 67
 // GetURLs returns configured IPURLs instance with links to this server.
75 68
 func (c *Config) GetURLs() IPURLs {
76
-	return IPURLs{
77
-		IPv4: getURLs(c.PublicIPv4, c.PublicIPv4Port, c.Secret),
78
-		IPv6: getURLs(c.PublicIPv6, c.PublicIPv6Port, c.Secret),
69
+	urls := IPURLs{}
70
+	if c.PublicIPv4 != nil {
71
+		urls.IPv4 = getURLs(c.PublicIPv4, c.PublicIPv4Port, c.Secret)
72
+	}
73
+	if c.PublicIPv6 != nil {
74
+		urls.IPv6 = getURLs(c.PublicIPv6, c.PublicIPv6Port, c.Secret)
79 75
 	}
76
+
77
+	return urls
80 78
 }
81 79
 
82 80
 func getAddr(host fmt.Stringer, port uint16) string {
@@ -91,7 +89,7 @@ func NewConfig(debug, verbose bool, // nolint: gocyclo
91 89
 	publicIPv4 net.IP, PublicIPv4Port uint16,
92 90
 	publicIPv6 net.IP, publicIPv6Port uint16,
93 91
 	statsIP net.IP, statsPort uint16,
94
-	secret string) (*Config, error) {
92
+	secret, adtag string) (*Config, error) {
95 93
 	if len(secret) != 32 {
96 94
 		return nil, errors.New("Telegram demands secret of length 32")
97 95
 	}
@@ -100,15 +98,22 @@ func NewConfig(debug, verbose bool, // nolint: gocyclo
100 98
 		return nil, errors.Annotate(err, "Cannot create config")
101 99
 	}
102 100
 
101
+	var adTagBytes []byte
102
+	if len(adtag) != 0 {
103
+		adTagBytes, err = hex.DecodeString(adtag)
104
+		if err != nil {
105
+			return nil, errors.Annotate(err, "Cannot create config")
106
+		}
107
+	}
108
+
103 109
 	if publicIPv4 == nil {
104 110
 		publicIPv4, err = getGlobalIPv4()
105 111
 		if err != nil {
106
-			return nil, errors.Errorf("Cannot get public IP")
112
+			publicIPv4 = nil
113
+		} else if publicIPv4.To4() == nil {
114
+			return nil, errors.Errorf("IP %s is not IPv4", publicIPv4.String())
107 115
 		}
108 116
 	}
109
-	if publicIPv4.To4() == nil {
110
-		return nil, errors.Errorf("IP %s is not IPv4", publicIPv4.String())
111
-	}
112 117
 	if PublicIPv4Port == 0 {
113 118
 		PublicIPv4Port = bindPort
114 119
 	}
@@ -116,12 +121,11 @@ func NewConfig(debug, verbose bool, // nolint: gocyclo
116 121
 	if publicIPv6 == nil {
117 122
 		publicIPv6, err = getGlobalIPv6()
118 123
 		if err != nil {
119
-			publicIPv6 = publicIPv4
124
+			publicIPv6 = nil
125
+		} else if publicIPv6.To4() != nil {
126
+			return nil, errors.Errorf("IP %s is not IPv6", publicIPv6.String())
120 127
 		}
121 128
 	}
122
-	if publicIPv6.To16() == nil {
123
-		return nil, errors.Errorf("IP %s is not IPv6", publicIPv6.String())
124
-	}
125 129
 	if publicIPv6Port == 0 {
126 130
 		publicIPv6Port = bindPort
127 131
 	}
@@ -142,30 +146,8 @@ func NewConfig(debug, verbose bool, // nolint: gocyclo
142 146
 		StatsIP:        statsIP,
143 147
 		StatsPort:      statsPort,
144 148
 		Secret:         secretBytes,
149
+		AdTag:          adTagBytes,
145 150
 	}
146 151
 
147 152
 	return conf, nil
148 153
 }
149
-
150
-// SetSocketOptions makes socket keepalive, sets buffer sizes
151
-func SetSocketOptions(conn net.Conn) error {
152
-	socket := conn.(*net.TCPConn)
153
-
154
-	if err := socket.SetReadBuffer(BufferReadSize); err != nil {
155
-		return errors.Annotate(err, "Cannot set read buffer size")
156
-	}
157
-	if err := socket.SetWriteBuffer(BufferWriteSize); err != nil {
158
-		return errors.Annotate(err, "Cannot set write buffer size")
159
-	}
160
-	if err := socket.SetKeepAlive(true); err != nil {
161
-		return errors.Annotate(err, "Cannot make socket keepalive")
162
-	}
163
-	if err := socket.SetKeepAlivePeriod(keepAlivePeriod); err != nil {
164
-		return errors.Annotate(err, "Cannot set keepalive period")
165
-	}
166
-	if err := socket.SetNoDelay(true); err != nil {
167
-		return errors.Annotate(err, "Cannot activate nodelay for the socket")
168
-	}
169
-
170
-	return nil
171
-}

+ 17
- 8
main.go Wyświetl plik

@@ -16,6 +16,7 @@ import (
16 16
 
17 17
 	"github.com/9seconds/mtg/config"
18 18
 	"github.com/9seconds/mtg/proxy"
19
+	"github.com/9seconds/mtg/stats"
19 20
 	"github.com/juju/errors"
20 21
 )
21 22
 
@@ -70,6 +71,7 @@ var (
70 71
 			Uint16()
71 72
 
72 73
 	secret = app.Arg("secret", "Secret of this proxy.").Required().String()
74
+	adtag  = app.Arg("adtag", "ADTag of the proxy.").String()
73 75
 )
74 76
 
75 77
 func init() {
@@ -91,7 +93,7 @@ func main() {
91 93
 		*publicIPv4, *publicIPv4Port,
92 94
 		*publicIPv6, *publicIPv6Port,
93 95
 		*statsIP, *statsPort,
94
-		*secret,
96
+		*secret, *adtag,
95 97
 	)
96 98
 	if err != nil {
97 99
 		usage(err.Error())
@@ -110,16 +112,23 @@ func main() {
110 112
 		zapcore.NewJSONEncoder(encoderCfg),
111 113
 		zapcore.Lock(os.Stderr),
112 114
 		atom,
113
-	)).Sugar()
115
+	))
116
+	zap.ReplaceGlobals(logger)
117
+	defer logger.Sync() // nolint: errcheck
114 118
 
115
-	stat := proxy.NewStats(conf)
116
-	go stat.Serve()
117
-
118
-	srv := proxy.NewServer(conf, logger, stat)
119 119
 	printURLs(conf.GetURLs())
120 120
 
121
-	if err := srv.Serve(); err != nil {
122
-		logger.Fatal(err.Error())
121
+	if conf.UseMiddleProxy() {
122
+		zap.S().Infow("Use middle proxy connection to Telegram")
123
+	} else {
124
+		zap.S().Infow("Use direct connection to Telegram")
125
+	}
126
+
127
+	go stats.Start(conf)
128
+
129
+	server := proxy.NewProxy(conf)
130
+	if err := server.Serve(); err != nil {
131
+		zap.S().Fatalw("Server stopped", "error", err)
123 132
 	}
124 133
 }
125 134
 

+ 27
- 2
mtproto/connection_options.go Wyświetl plik

@@ -2,6 +2,7 @@ package mtproto
2 2
 
3 3
 import (
4 4
 	"bytes"
5
+	"net"
5 6
 
6 7
 	"github.com/juju/errors"
7 8
 )
@@ -10,11 +11,27 @@ import (
10 11
 // by the user.
11 12
 type ConnectionType uint8
12 13
 
14
+// ConnectionProtocol is a type of IP protocol to use.
15
+type ConnectionProtocol uint8
16
+
17
+// Hacks is a simple structure to store flags for packet transmission.
18
+type Hacks struct {
19
+	SimpleAck bool
20
+	QuickAck  bool
21
+}
22
+
13 23
 // ConnectionOpts presents an options, metadata on connection requested
14 24
 // by the user on handshake.
15 25
 type ConnectionOpts struct {
16
-	DC             int16
17
-	ConnectionType ConnectionType
26
+	DC              int16
27
+	ConnectionType  ConnectionType
28
+	ConnectionProto ConnectionProtocol
29
+	// Read and Write means direction related to the client.
30
+	// ReadHacks are meant to be flushed on client read
31
+	// WriteHacks are meant to be flushed on client write.
32
+	ReadHacks  Hacks
33
+	WriteHacks Hacks
34
+	ClientAddr *net.TCPAddr
18 35
 }
19 36
 
20 37
 // Different connection types which user requests from Telegram.
@@ -24,6 +41,14 @@ const (
24 41
 	ConnectionTypeIntermediate
25 42
 )
26 43
 
44
+// ConnectionProtocol* define which connection protocols to use.
45
+// ConnectionProtocolAny means that any is suitable.
46
+const (
47
+	ConnectionProtocolIPv4 ConnectionProtocol = 1
48
+	ConnectionProtocolIPv6                    = ConnectionProtocolIPv4 << 1
49
+	ConnectionProtocolAny                     = ConnectionProtocolIPv4 | ConnectionProtocolIPv6
50
+)
51
+
27 52
 // Connection tags for mtproto handshakes.
28 53
 var (
29 54
 	ConnectionTagAbridged     = []byte{0xef, 0xef, 0xef, 0xef}

+ 26
- 0
mtproto/rpc/handshake_request.go Wyświetl plik

@@ -0,0 +1,26 @@
1
+package rpc
2
+
3
+import "bytes"
4
+
5
+// HandshakeRequest is the data type which is responsible for
6
+// constructing of correct handshake request.
7
+type HandshakeRequest struct {
8
+}
9
+
10
+// Bytes returns serialized handshake request.
11
+func (r *HandshakeRequest) Bytes() []byte {
12
+	buf := &bytes.Buffer{}
13
+	buf.Grow(len(TagHandshake) + len(HandshakeFlags) + len(HandshakeSenderPID) + len(HandshakePeerPID))
14
+
15
+	buf.Write(TagHandshake)
16
+	buf.Write(HandshakeFlags)
17
+	buf.Write(HandshakeSenderPID)
18
+	buf.Write(HandshakePeerPID)
19
+
20
+	return buf.Bytes()
21
+}
22
+
23
+// NewHandshakeRequest creates new HandshakeRequest instance.
24
+func NewHandshakeRequest() *HandshakeRequest {
25
+	return &HandshakeRequest{}
26
+}

+ 55
- 0
mtproto/rpc/handshake_response.go Wyświetl plik

@@ -0,0 +1,55 @@
1
+package rpc
2
+
3
+import (
4
+	"bytes"
5
+
6
+	"github.com/juju/errors"
7
+)
8
+
9
+// HandshakeResponse defines data structure which is used for storage of
10
+// handshake response.
11
+type HandshakeResponse struct {
12
+	Type      []byte
13
+	Flags     []byte
14
+	SenderPID []byte
15
+	PeerPID   []byte
16
+}
17
+
18
+// Bytes returns a serialized handshake response.
19
+func (r *HandshakeResponse) Bytes() []byte {
20
+	buf := &bytes.Buffer{}
21
+
22
+	buf.Write(r.Type[:])
23
+	buf.Write(r.Flags[:])
24
+	buf.Write(r.SenderPID[:])
25
+	buf.Write(r.PeerPID[:])
26
+
27
+	return buf.Bytes()
28
+}
29
+
30
+// Valid checks that handshake response compliments request.
31
+func (r *HandshakeResponse) Valid(req *HandshakeRequest) error {
32
+	if !bytes.Equal(r.Type, TagHandshake) {
33
+		return errors.New("Unexpected handshake tag")
34
+	}
35
+	if !bytes.Equal(r.PeerPID, HandshakeSenderPID) {
36
+		return errors.New("Incorrect sender PID")
37
+	}
38
+
39
+	return nil
40
+}
41
+
42
+// NewHandshakeResponse constructs new handshake response from the given
43
+// data.
44
+func NewHandshakeResponse(data []byte) (*HandshakeResponse, error) {
45
+	if len(data) != 32 {
46
+		return nil, errors.New("Incorrect handshake response length")
47
+	}
48
+
49
+	return &HandshakeResponse{
50
+		Type:      data[:4],
51
+		Flags:     data[4:8],
52
+		SenderPID: data[8:20],
53
+		PeerPID:   data[20:],
54
+	}, nil
55
+}

+ 52
- 0
mtproto/rpc/nonce_request.go Wyświetl plik

@@ -0,0 +1,52 @@
1
+package rpc
2
+
3
+import (
4
+	"bytes"
5
+	"crypto/rand"
6
+	"encoding/binary"
7
+	"time"
8
+
9
+	"github.com/juju/errors"
10
+)
11
+
12
+// NonceRequest is the data type which contains all the data for correct
13
+// nonce request.
14
+type NonceRequest struct {
15
+	KeySelector []byte
16
+	CryptoTS    []byte
17
+	Nonce       []byte
18
+}
19
+
20
+// Bytes returns serialized nonce request.
21
+func (r *NonceRequest) Bytes() []byte {
22
+	buf := &bytes.Buffer{}
23
+
24
+	buf.Write(TagNonce)
25
+	buf.Write(r.KeySelector)
26
+	buf.Write(NonceCryptoAES)
27
+	buf.Write(r.CryptoTS)
28
+	buf.Write(r.Nonce)
29
+
30
+	return buf.Bytes()
31
+}
32
+
33
+// NewNonceRequest builds new none request based on proxy secret.
34
+func NewNonceRequest(proxySecret []byte) (*NonceRequest, error) {
35
+	nonce := make([]byte, 16)
36
+	keySelector := make([]byte, 4)
37
+	cryptoTS := make([]byte, 4)
38
+
39
+	if _, err := rand.Read(nonce); err != nil {
40
+		return nil, errors.Annotate(err, "Cannot generate nonce")
41
+	}
42
+	copy(keySelector, proxySecret)
43
+
44
+	timestamp := time.Now().Truncate(time.Second).Unix() % 4294967296 // 256 ^ 4 - do not know how to name
45
+	binary.LittleEndian.PutUint32(cryptoTS, uint32(timestamp))
46
+
47
+	return &NonceRequest{
48
+		KeySelector: keySelector,
49
+		CryptoTS:    cryptoTS,
50
+		Nonce:       nonce,
51
+	}, nil
52
+}

+ 60
- 0
mtproto/rpc/nonce_response.go Wyświetl plik

@@ -0,0 +1,60 @@
1
+package rpc
2
+
3
+import (
4
+	"bytes"
5
+
6
+	"github.com/juju/errors"
7
+)
8
+
9
+// NonceResponse is the data type which contains data of nonce response.
10
+type NonceResponse struct {
11
+	NonceRequest
12
+
13
+	Type   []byte
14
+	Crypto []byte
15
+}
16
+
17
+// Bytes returns serialized form of the nonce response.
18
+func (r *NonceResponse) Bytes() []byte {
19
+	buf := &bytes.Buffer{}
20
+
21
+	buf.Write(r.Type)
22
+	buf.Write(r.KeySelector)
23
+	buf.Write(r.Crypto)
24
+	buf.Write(r.CryptoTS)
25
+	buf.Write(r.Nonce)
26
+
27
+	return buf.Bytes()
28
+}
29
+
30
+// Valid checks that nonce response compliments nonce request.
31
+func (r *NonceResponse) Valid(req *NonceRequest) error {
32
+	if !bytes.Equal(r.Type, TagNonce) {
33
+		return errors.New("Unexpected RPC type")
34
+	}
35
+	if !bytes.Equal(r.Crypto, NonceCryptoAES) {
36
+		return errors.New("Unexpected crypto type")
37
+	}
38
+	if !bytes.Equal(r.KeySelector, req.KeySelector) {
39
+		return errors.New("Unexpected key selector")
40
+	}
41
+
42
+	return nil
43
+}
44
+
45
+// NewNonceResponse build new nonce response based on the given data.
46
+func NewNonceResponse(data []byte) (*NonceResponse, error) {
47
+	if len(data) != 32 {
48
+		return nil, errors.New("Unexpected message length")
49
+	}
50
+
51
+	return &NonceResponse{
52
+		NonceRequest: NonceRequest{
53
+			KeySelector: data[4:8],
54
+			CryptoTS:    data[12:16],
55
+			Nonce:       data[16:],
56
+		},
57
+		Type:   data[:4],
58
+		Crypto: data[8:12],
59
+	}, nil
60
+}

+ 55
- 0
mtproto/rpc/proxy_flags.go Wyświetl plik

@@ -0,0 +1,55 @@
1
+package rpc
2
+
3
+import (
4
+	"encoding/binary"
5
+	"strings"
6
+)
7
+
8
+type proxyRequestFlags uint32
9
+
10
+const (
11
+	proxyRequestFlagsHasAdTag     proxyRequestFlags = 0x8
12
+	proxyRequestFlagsEncrypted                      = 0x2
13
+	proxyRequestFlagsMagic                          = 0x1000
14
+	proxyRequestFlagsExtMode2                       = 0x20000
15
+	proxyRequestFlagsIntermediate                   = 0x20000000
16
+	proxyRequestFlagsAbdridged                      = 0x40000000
17
+	proxyRequestFlagsQuickAck                       = 0x80000000
18
+)
19
+
20
+var proxyRequestFlagsEncryptedPrefix [8]byte
21
+
22
+func (r proxyRequestFlags) Bytes() []byte {
23
+	converted := make([]byte, 4)
24
+	binary.LittleEndian.PutUint32(converted, uint32(r))
25
+
26
+	return converted
27
+}
28
+
29
+func (r proxyRequestFlags) String() string {
30
+	flags := make([]string, 0, 7)
31
+
32
+	if r&proxyRequestFlagsHasAdTag != 0 {
33
+		flags = append(flags, "HAS_AD_TAG")
34
+	}
35
+	if r&proxyRequestFlagsEncrypted != 0 {
36
+		flags = append(flags, "ENCRYPTED")
37
+	}
38
+	if r&proxyRequestFlagsMagic != 0 {
39
+		flags = append(flags, "MAGIC")
40
+	}
41
+	if r&proxyRequestFlagsExtMode2 != 0 {
42
+		flags = append(flags, "EXT_MODE_2")
43
+	}
44
+	if r&proxyRequestFlagsIntermediate != 0 {
45
+		flags = append(flags, "INTERMEDIATE")
46
+	}
47
+	if r&proxyRequestFlagsAbdridged != 0 {
48
+		flags = append(flags, "ABRIDGED")
49
+	}
50
+	if r&proxyRequestFlagsQuickAck != 0 {
51
+		flags = append(flags, "QUICK_ACK")
52
+	}
53
+
54
+	return strings.Join(flags, " | ")
55
+}

+ 100
- 0
mtproto/rpc/proxy_request.go Wyświetl plik

@@ -0,0 +1,100 @@
1
+package rpc
2
+
3
+import (
4
+	"bytes"
5
+	"crypto/rand"
6
+	"encoding/binary"
7
+	"fmt"
8
+	"net"
9
+
10
+	"github.com/juju/errors"
11
+
12
+	"github.com/9seconds/mtg/mtproto"
13
+)
14
+
15
+// ProxyRequest is the data type for storing data required to compose
16
+// RPC_PROXY_REQ request.
17
+type ProxyRequest struct {
18
+	Flags        proxyRequestFlags
19
+	ConnectionID []byte
20
+	OurIPPort    []byte
21
+	ClientIPPort []byte
22
+	ADTag        []byte
23
+	Options      *mtproto.ConnectionOpts
24
+}
25
+
26
+// MakeHeader makes RPC_PROXY_REQ header. We need only to append the
27
+// data for it.
28
+func (r *ProxyRequest) MakeHeader(message []byte) (*bytes.Buffer, fmt.Stringer) {
29
+	bufferLength := len(TagProxyRequest) +
30
+		4 + // len(flags)
31
+		len(r.ConnectionID) +
32
+		len(r.ClientIPPort) +
33
+		len(r.OurIPPort) +
34
+		len(ProxyRequestExtraSize) +
35
+		len(ProxyRequestProxyTag) +
36
+		1 + // len(AdTag)
37
+		len(r.ADTag)
38
+	bufferLength += bufferLength % 4
39
+
40
+	buf := &bytes.Buffer{}
41
+	buf.Grow(bufferLength + len(message))
42
+
43
+	flags := r.Flags
44
+	if r.Options.ReadHacks.QuickAck {
45
+		flags |= proxyRequestFlagsQuickAck
46
+	}
47
+
48
+	if bytes.HasPrefix(message, proxyRequestFlagsEncryptedPrefix[:]) {
49
+		flags |= proxyRequestFlagsEncrypted
50
+	}
51
+
52
+	buf.Write(TagProxyRequest)
53
+	buf.Write(flags.Bytes())
54
+	buf.Write(r.ConnectionID)
55
+	buf.Write(r.ClientIPPort)
56
+	buf.Write(r.OurIPPort)
57
+	buf.Write(ProxyRequestExtraSize)
58
+	buf.Write(ProxyRequestProxyTag)
59
+	buf.WriteByte(byte(len(r.ADTag)))
60
+	buf.Write(r.ADTag)
61
+	buf.Write(make([]byte, (4-buf.Len()%4)%4))
62
+
63
+	return buf, flags
64
+}
65
+
66
+// NewProxyRequest build new ProxyRequest data structure.
67
+func NewProxyRequest(clientAddr, ownAddr *net.TCPAddr, opts *mtproto.ConnectionOpts, adTag []byte) (*ProxyRequest, error) {
68
+	flags := proxyRequestFlagsHasAdTag | proxyRequestFlagsMagic | proxyRequestFlagsExtMode2
69
+
70
+	switch opts.ConnectionType {
71
+	case mtproto.ConnectionTypeAbridged:
72
+		flags |= proxyRequestFlagsAbdridged
73
+	case mtproto.ConnectionTypeIntermediate:
74
+		flags |= proxyRequestFlagsIntermediate
75
+	}
76
+
77
+	request := &ProxyRequest{
78
+		Flags:        flags,
79
+		ADTag:        adTag,
80
+		Options:      opts,
81
+		ConnectionID: make([]byte, 8),
82
+		ClientIPPort: make([]byte, 16+4),
83
+		OurIPPort:    make([]byte, 16+4),
84
+	}
85
+
86
+	if _, err := rand.Read(request.ConnectionID); err != nil {
87
+		return nil, errors.Annotate(err, "Cannot generate connection ID")
88
+	}
89
+
90
+	port := [4]byte{}
91
+	copy(request.ClientIPPort[:16], clientAddr.IP.To16())
92
+	binary.LittleEndian.PutUint32(port[:], uint32(clientAddr.Port))
93
+	copy(request.ClientIPPort[16:], port[:])
94
+
95
+	copy(request.OurIPPort[:16], ownAddr.IP.To16())
96
+	binary.LittleEndian.PutUint32(port[:], uint32(ownAddr.Port))
97
+	copy(request.OurIPPort[16:], port[:])
98
+
99
+	return request, nil
100
+}

+ 33
- 0
mtproto/rpc/rpc.go Wyświetl plik

@@ -0,0 +1,33 @@
1
+package rpc
2
+
3
+// SeqNo* is the number of the sequence which have special meaning for
4
+// the Telegram.
5
+const (
6
+	SeqNoNonce     = -2
7
+	SeqNoHandshake = -1
8
+)
9
+
10
+// Different constants for RPC protocol
11
+var (
12
+	TagCloseExt     = []byte{0xa2, 0x34, 0xb6, 0x5e}
13
+	TagProxyAns     = []byte{0x0d, 0xda, 0x03, 0x44}
14
+	TagSimpleAck    = []byte{0x9b, 0x40, 0xac, 0x3b}
15
+	TagHandshake    = []byte{0xf5, 0xee, 0x82, 0x76}
16
+	TagNonce        = []byte{0xaa, 0x87, 0xcb, 0x7a}
17
+	TagProxyRequest = []byte{0xee, 0xf1, 0xce, 0x36}
18
+
19
+	NonceCryptoAES = []byte{0x01, 0x00, 0x00, 0x00}
20
+
21
+	HandshakeFlags = []byte{0x00, 0x00, 0x00, 0x00}
22
+
23
+	ProxyRequestExtraSize = []byte{0x18, 0x00, 0x00, 0x00}
24
+	ProxyRequestProxyTag  = []byte{0xae, 0x26, 0x1e, 0xdb}
25
+
26
+	HandshakeSenderPID []byte
27
+	HandshakePeerPID   []byte
28
+)
29
+
30
+func init() {
31
+	HandshakeSenderPID = []byte("IPIPPRPDTIME")
32
+	HandshakePeerPID = []byte("IPIPPRPDTIME")
33
+}

+ 15
- 17
obfuscated2/frame.go Wyświetl plik

@@ -66,57 +66,55 @@ func (f Frame) ConnectionType() (mtproto.ConnectionType, error) {
66 66
 
67 67
 // Invert inverts frame for extracting encryption keys. Pkease check that link:
68 68
 // https://blog.susanka.eu/how-telegram-obfuscates-its-mtproto-traffic/
69
-func (f Frame) Invert() *Frame {
70
-	reversed := MakeFrame()
71
-	copy(*reversed, f)
69
+func (f Frame) Invert() Frame {
70
+	reversed := make(Frame, FrameLen)
71
+	copy(reversed, f)
72 72
 
73 73
 	for i := 0; i < frameLenKey+frameLenIV; i++ {
74
-		(*reversed)[frameOffsetFirst+i] = f[frameOffsetIV-1-i]
74
+		reversed[frameOffsetFirst+i] = f[frameOffsetIV-1-i]
75 75
 	}
76 76
 
77 77
 	return reversed
78 78
 }
79 79
 
80 80
 // ExtractFrame extracts exact obfuscated2 handshake frame from given reader.
81
-func ExtractFrame(conn io.Reader) (*Frame, error) {
82
-	frame := MakeFrame()
83
-	buf := bytes.NewBuffer(*frame)
81
+func ExtractFrame(conn io.Reader) (Frame, error) {
82
+	frame := make(Frame, FrameLen)
83
+	buf := bytes.NewBuffer(frame)
84 84
 	buf.Reset()
85 85
 
86 86
 	if _, err := io.CopyN(buf, conn, FrameLen); err != nil {
87
-		ReturnFrame(frame)
88 87
 		return nil, errors.Annotate(err, "Cannot extract obfuscated header")
89 88
 	}
90
-	copy(*frame, buf.Bytes())
89
+	copy(frame, buf.Bytes())
91 90
 
92 91
 	return frame, nil
93 92
 }
94 93
 
95
-func generateFrame(connectionType mtproto.ConnectionType) *Frame {
96
-	frame := MakeFrame()
97
-	data := *frame
94
+func generateFrame(connectionType mtproto.ConnectionType) Frame {
95
+	frame := make(Frame, FrameLen)
98 96
 
99 97
 	for {
100
-		if _, err := rand.Read(data); err != nil {
98
+		if _, err := rand.Read(frame); err != nil {
101 99
 			continue
102 100
 		}
103
-		if data[0] == 0xef {
101
+		if frame[0] == 0xef {
104 102
 			continue
105 103
 		}
106 104
 
107
-		val := (uint32(data[3]) << 24) | (uint32(data[2]) << 16) | (uint32(data[1]) << 8) | uint32(data[0])
105
+		val := (uint32(frame[3]) << 24) | (uint32(frame[2]) << 16) | (uint32(frame[1]) << 8) | uint32(frame[0])
108 106
 		if val == 0x44414548 || val == 0x54534f50 || val == 0x20544547 || val == 0x4954504f || val == 0xeeeeeeee {
109 107
 			continue
110 108
 		}
111 109
 
112
-		val = (uint32(data[7]) << 24) | (uint32(data[6]) << 16) | (uint32(data[5]) << 8) | uint32(data[4])
110
+		val = (uint32(frame[7]) << 24) | (uint32(frame[6]) << 16) | (uint32(frame[5]) << 8) | uint32(frame[4])
113 111
 		if val == 0x00000000 {
114 112
 			continue
115 113
 		}
116 114
 
117 115
 		// error has to be checked before calling this function
118 116
 		tag, _ := connectionType.Tag() // nolint: errcheck
119
-		copy(data.Magic(), tag)
117
+		copy(frame.Magic(), tag)
120 118
 
121 119
 		return frame
122 120
 	}

+ 0
- 24
obfuscated2/frame_pool.go Wyświetl plik

@@ -1,24 +0,0 @@
1
-package obfuscated2
2
-
3
-import "sync"
4
-
5
-var framePool sync.Pool
6
-
7
-// MakeFrame returns new pointer to the handshake frame.
8
-func MakeFrame() *Frame {
9
-	return framePool.Get().(*Frame)
10
-}
11
-
12
-// ReturnFrame returns pointer to the handshake frame back to the pool.
13
-func ReturnFrame(f *Frame) {
14
-	framePool.Put(f)
15
-}
16
-
17
-func init() {
18
-	framePool = sync.Pool{
19
-		New: func() interface{} {
20
-			data := make(Frame, FrameLen)
21
-			return &data
22
-		},
23
-	}
24
-}

+ 4
- 4
obfuscated2/frame_test.go Wyświetl plik

@@ -54,21 +54,21 @@ func TestFrameValid(t *testing.T) {
54 54
 
55 55
 func TestFrameDoubleInvert(t *testing.T) {
56 56
 	frame := makeFrame()
57
-	assert.True(t, bytes.Equal(frame, *frame.Invert().Invert()))
57
+	assert.True(t, bytes.Equal(frame, frame.Invert().Invert()))
58 58
 }
59 59
 
60 60
 func TestFrameInvert(t *testing.T) {
61 61
 	frame := makeFrame()
62 62
 	reversed := frame.Invert()
63 63
 
64
-	assert.Exactly(t, frame[:8], (*reversed)[:8])
65
-	assert.Exactly(t, frame[56:], (*reversed)[56:])
64
+	assert.Exactly(t, frame[:8], reversed[:8])
65
+	assert.Exactly(t, frame[56:], reversed[56:])
66 66
 
67 67
 	toCompare := make([]byte, 48)
68 68
 	for i := 0; i < 48; i++ {
69 69
 		toCompare[i] = frame[55-i]
70 70
 	}
71
-	assert.Equal(t, []byte((*reversed)[8:56]), toCompare)
71
+	assert.Equal(t, []byte(reversed[8:56]), toCompare)
72 72
 }
73 73
 
74 74
 func TestFrameGenerateValid(t *testing.T) {

+ 8
- 10
obfuscated2/obfuscated2.go Wyświetl plik

@@ -21,7 +21,7 @@ type Obfuscated2 struct {
21 21
 // details: http://telegra.ph/telegram-blocks-wtf-05-26
22 22
 //
23 23
 // Beware, link above is in russian.
24
-func ParseObfuscated2ClientFrame(secret []byte, frame *Frame) (*Obfuscated2, *mtproto.ConnectionOpts, error) {
24
+func ParseObfuscated2ClientFrame(secret []byte, frame Frame) (*Obfuscated2, *mtproto.ConnectionOpts, error) {
25 25
 	decHasher := sha256.New()
26 26
 	decHasher.Write(frame.Key()) // nolint: errcheck
27 27
 	decHasher.Write(secret)      // nolint: errcheck
@@ -33,9 +33,8 @@ func ParseObfuscated2ClientFrame(secret []byte, frame *Frame) (*Obfuscated2, *mt
33 33
 	encHasher.Write(secret)              // nolint: errcheck
34 34
 	encryptor := makeStreamCipher(encHasher.Sum(nil), invertedFrame.IV())
35 35
 
36
-	decryptedFrame := MakeFrame()
37
-	defer ReturnFrame(decryptedFrame)
38
-	decryptor.XORKeyStream(*decryptedFrame, *frame)
36
+	decryptedFrame := make(Frame, FrameLen)
37
+	decryptor.XORKeyStream(decryptedFrame, frame)
39 38
 	connType, err := decryptedFrame.ConnectionType()
40 39
 	if err != nil {
41 40
 		return nil, nil, errors.Annotate(err, "Unknown protocol")
@@ -56,18 +55,17 @@ func ParseObfuscated2ClientFrame(secret []byte, frame *Frame) (*Obfuscated2, *mt
56 55
 // MakeTelegramObfuscated2Frame creates new handshake frame to send to
57 56
 // Telegram.
58 57
 // https://blog.susanka.eu/how-telegram-obfuscates-its-mtproto-traffic/
59
-func MakeTelegramObfuscated2Frame(opts *mtproto.ConnectionOpts) (*Obfuscated2, *Frame) {
58
+func MakeTelegramObfuscated2Frame(opts *mtproto.ConnectionOpts) (*Obfuscated2, Frame) {
60 59
 	frame := generateFrame(opts.ConnectionType)
61 60
 
62 61
 	encryptor := makeStreamCipher(frame.Key(), frame.IV())
63 62
 	decryptorFrame := frame.Invert()
64 63
 	decryptor := makeStreamCipher(decryptorFrame.Key(), decryptorFrame.IV())
65 64
 
66
-	copyFrame := MakeFrame()
67
-	defer ReturnFrame(copyFrame)
68
-	copy((*copyFrame)[:frameOffsetIV], (*frame)[:frameOffsetIV])
69
-	encryptor.XORKeyStream(*frame, *frame)
70
-	copy((*frame)[:frameOffsetIV], (*copyFrame)[:frameOffsetIV])
65
+	copyFrame := make(Frame, FrameLen)
66
+	copy(copyFrame[:frameOffsetIV], frame[:frameOffsetIV])
67
+	encryptor.XORKeyStream(frame, frame)
68
+	copy(frame[:frameOffsetIV], copyFrame[:frameOffsetIV])
71 69
 
72 70
 	obfs := &Obfuscated2{
73 71
 		Decryptor: decryptor,

+ 5
- 5
obfuscated2/obfuscated2_test.go Wyświetl plik

@@ -18,7 +18,7 @@ func TestObfs2TelegramFrameDecrypt(t *testing.T) {
18 18
 	decryptor := makeStreamCipher(frame.Key(), frame.IV())
19 19
 
20 20
 	decrypted := make(Frame, FrameLen)
21
-	decryptor.XORKeyStream(decrypted, *frame)
21
+	decryptor.XORKeyStream(decrypted, frame)
22 22
 
23 23
 	_, err := decrypted.ConnectionType()
24 24
 	assert.Nil(t, err)
@@ -53,8 +53,8 @@ func TestObfs2Full(t *testing.T) {
53 53
 
54 54
 	encryptor := makeStreamCipher(clientKey, clientFrame.IV())
55 55
 	encrypted := make(Frame, FrameLen)
56
-	encryptor.XORKeyStream(encrypted, *clientFrame)
57
-	copy(encrypted[:56], (*clientFrame)[:56])
56
+	encryptor.XORKeyStream(encrypted, clientFrame)
57
+	copy(encrypted[:56], clientFrame[:56])
58 58
 
59 59
 	invertedClientFrame := clientFrame.Invert()
60 60
 	clientHasher = sha256.New()
@@ -63,7 +63,7 @@ func TestObfs2Full(t *testing.T) {
63 63
 	invertedClientKey := clientHasher.Sum(nil)
64 64
 	clientDecryptor := makeStreamCipher(invertedClientKey, invertedClientFrame.IV())
65 65
 
66
-	clientObfs, _, err := ParseObfuscated2ClientFrame(secret, &encrypted)
66
+	clientObfs, _, err := ParseObfuscated2ClientFrame(secret, encrypted)
67 67
 	assert.Nil(t, err)
68 68
 
69 69
 	connOpts := &mtproto.ConnectionOpts{
@@ -73,7 +73,7 @@ func TestObfs2Full(t *testing.T) {
73 73
 	tgObfs, tgFrame := MakeTelegramObfuscated2Frame(connOpts)
74 74
 	tgDecryptor := makeStreamCipher(tgFrame.Key(), tgFrame.IV())
75 75
 	decrypted := make(Frame, FrameLen)
76
-	tgDecryptor.XORKeyStream(decrypted, *tgFrame)
76
+	tgDecryptor.XORKeyStream(decrypted, tgFrame)
77 77
 	_, err = decrypted.ConnectionType()
78 78
 	assert.Nil(t, err)
79 79
 

+ 0
- 18
proxy/copy_pool.go Wyświetl plik

@@ -1,18 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"sync"
5
-
6
-	"github.com/9seconds/mtg/config"
7
-)
8
-
9
-var copyPool sync.Pool
10
-
11
-func init() {
12
-	copyPool = sync.Pool{
13
-		New: func() interface{} {
14
-			data := make([]byte, config.BufferSizeCopy)
15
-			return &data
16
-		},
17
-	}
18
-}

+ 162
- 0
proxy/proxy.go Wyświetl plik

@@ -0,0 +1,162 @@
1
+package proxy
2
+
3
+import (
4
+	"io"
5
+	"net"
6
+	"sync"
7
+
8
+	"github.com/juju/errors"
9
+	uuid "github.com/satori/go.uuid"
10
+	"go.uber.org/zap"
11
+
12
+	"github.com/9seconds/mtg/client"
13
+	"github.com/9seconds/mtg/config"
14
+	"github.com/9seconds/mtg/mtproto"
15
+	"github.com/9seconds/mtg/stats"
16
+	"github.com/9seconds/mtg/telegram"
17
+	"github.com/9seconds/mtg/wrappers"
18
+)
19
+
20
+// Proxy is a core of this program.
21
+type Proxy struct {
22
+	clientInit client.Init
23
+	tg         telegram.Telegram
24
+	conf       *config.Config
25
+}
26
+
27
+// Serve runs TCP proxy server.
28
+func (p *Proxy) Serve() error {
29
+	lsock, err := net.Listen("tcp", p.conf.BindAddr())
30
+	if err != nil {
31
+		return errors.Annotate(err, "Cannot create listen socket")
32
+	}
33
+
34
+	for {
35
+		if conn, err := lsock.Accept(); err != nil {
36
+			zap.S().Errorw("Cannot allocate incoming connection", "error", err)
37
+		} else {
38
+			go p.accept(conn)
39
+		}
40
+	}
41
+}
42
+
43
+func (p *Proxy) accept(conn net.Conn) {
44
+	connID := uuid.NewV4().String()
45
+	log := zap.S().With("connection_id", connID).Named("main")
46
+
47
+	defer func() {
48
+		conn.Close() // nolint: errcheck
49
+
50
+		if err := recover(); err != nil {
51
+			stats.NewCrash()
52
+			log.Errorw("Crash of accept handler", "error", err)
53
+		}
54
+	}()
55
+
56
+	log.Infow("Client connected", "addr", conn.RemoteAddr())
57
+
58
+	clientConn, opts, err := p.clientInit(conn, connID, p.conf)
59
+	if err != nil {
60
+		log.Errorw("Cannot initialize client connection", "error", err)
61
+		return
62
+	}
63
+	defer clientConn.(io.Closer).Close() // nolint: errcheck
64
+
65
+	stats.ClientConnected(opts.ConnectionType, clientConn.RemoteAddr())
66
+	defer stats.ClientDisconnected(opts.ConnectionType, clientConn.RemoteAddr())
67
+
68
+	serverConn, err := p.getTelegramConn(opts, connID)
69
+	if err != nil {
70
+		log.Errorw("Cannot initialize server connection", "error", err)
71
+		return
72
+	}
73
+	defer serverConn.(io.Closer).Close() // nolint: errcheck
74
+
75
+	wait := &sync.WaitGroup{}
76
+	wait.Add(2)
77
+
78
+	if p.conf.UseMiddleProxy() {
79
+		clientPacket := clientConn.(wrappers.PacketReadWriteCloser)
80
+		serverPacket := serverConn.(wrappers.PacketReadWriteCloser)
81
+		go p.middlePipe(clientPacket, serverPacket, wait, &opts.ReadHacks)
82
+		go p.middlePipe(serverPacket, clientPacket, wait, &opts.WriteHacks)
83
+	} else {
84
+		clientStream := clientConn.(wrappers.StreamReadWriteCloser)
85
+		serverStream := serverConn.(wrappers.StreamReadWriteCloser)
86
+		go p.directPipe(clientStream, serverStream, wait)
87
+		go p.directPipe(serverStream, clientStream, wait)
88
+	}
89
+
90
+	wait.Wait()
91
+
92
+	log.Infow("Client disconnected", "addr", conn.RemoteAddr())
93
+}
94
+
95
+func (p *Proxy) getTelegramConn(opts *mtproto.ConnectionOpts, connID string) (wrappers.Wrap, error) {
96
+	streamConn, err := p.tg.Dial(connID, opts)
97
+	if err != nil {
98
+		return nil, errors.Annotate(err, "Cannot dial to Telegram")
99
+	}
100
+
101
+	packetConn, err := p.tg.Init(opts, streamConn)
102
+	if err != nil {
103
+		return nil, errors.Annotate(err, "Cannot handshake telegram")
104
+	}
105
+
106
+	return packetConn, nil
107
+}
108
+
109
+func (p *Proxy) middlePipe(src wrappers.PacketReadCloser, dst io.WriteCloser, wait *sync.WaitGroup, hacks *mtproto.Hacks) {
110
+	defer func() {
111
+		src.Close() // nolint: errcheck
112
+		dst.Close() // nolint: errcheck
113
+		wait.Done()
114
+	}()
115
+
116
+	for {
117
+		hacks.SimpleAck = false
118
+		hacks.QuickAck = false
119
+
120
+		packet, err := src.Read()
121
+		if err != nil {
122
+			src.Logger().Warnw("Cannot read packet", "error", err)
123
+			return
124
+		}
125
+		if _, err = dst.Write(packet); err != nil {
126
+			src.Logger().Warnw("Cannot write packet", "error", err)
127
+			return
128
+		}
129
+	}
130
+}
131
+
132
+func (p *Proxy) directPipe(src wrappers.StreamReadCloser, dst io.WriteCloser, wait *sync.WaitGroup) {
133
+	defer func() {
134
+		src.Close() // nolint: errcheck
135
+		dst.Close() // nolint: errcheck
136
+		wait.Done()
137
+	}()
138
+
139
+	if _, err := io.Copy(dst, src); err != nil {
140
+		src.Logger().Warnw("Cannot pump sockets", "error", err)
141
+	}
142
+}
143
+
144
+// NewProxy returns new proxy instance.
145
+func NewProxy(conf *config.Config) *Proxy {
146
+	var clientInit client.Init
147
+	var tg telegram.Telegram
148
+
149
+	if conf.UseMiddleProxy() {
150
+		clientInit = client.MiddleInit
151
+		tg = telegram.NewMiddleTelegram(conf)
152
+	} else {
153
+		clientInit = client.DirectInit
154
+		tg = telegram.NewDirectTelegram(conf)
155
+	}
156
+
157
+	return &Proxy{
158
+		conf:       conf,
159
+		clientInit: clientInit,
160
+		tg:         tg,
161
+	}
162
+}

+ 0
- 149
proxy/server.go Wyświetl plik

@@ -1,149 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"context"
5
-	"io"
6
-	"net"
7
-	"sync"
8
-
9
-	"github.com/juju/errors"
10
-	uuid "github.com/satori/go.uuid"
11
-	"go.uber.org/zap"
12
-
13
-	"github.com/9seconds/mtg/client"
14
-	"github.com/9seconds/mtg/config"
15
-	"github.com/9seconds/mtg/mtproto"
16
-	"github.com/9seconds/mtg/telegram"
17
-	"github.com/9seconds/mtg/wrappers"
18
-)
19
-
20
-// Server is an insgtance of MTPROTO proxy.
21
-type Server struct {
22
-	conf       *config.Config
23
-	logger     *zap.SugaredLogger
24
-	stats      *Stats
25
-	tg         telegram.Telegram
26
-	clientInit client.Init
27
-}
28
-
29
-// Serve does MTPROTO proxying.
30
-func (s *Server) Serve() error {
31
-	lsock, err := net.Listen("tcp", s.conf.BindAddr())
32
-	if err != nil {
33
-		return errors.Annotate(err, "Cannot create listen socket")
34
-	}
35
-
36
-	for {
37
-		if conn, err := lsock.Accept(); err != nil {
38
-			s.logger.Warn("Cannot allocate incoming connection", "error", err)
39
-		} else {
40
-			go s.accept(conn)
41
-		}
42
-	}
43
-}
44
-
45
-func (s *Server) accept(conn net.Conn) {
46
-	defer func() {
47
-		s.stats.closeConnection()
48
-		conn.Close() // nolint: errcheck
49
-
50
-		if r := recover(); r != nil {
51
-			s.logger.Errorw("Crash of accept handler", "error", r)
52
-		}
53
-	}()
54
-
55
-	s.stats.newConnection()
56
-	ctx, cancel := context.WithCancel(context.Background())
57
-	socketID := uuid.NewV4().String()
58
-
59
-	s.logger.Debugw("Client connected",
60
-		"addr", conn.RemoteAddr().String(),
61
-		"socketid", socketID,
62
-	)
63
-
64
-	connOpts, clientConn, err := s.getClientStream(ctx, cancel, conn, socketID)
65
-	if err != nil {
66
-		s.logger.Warnw("Cannot initialize client connection",
67
-			"addr", conn.RemoteAddr().String(),
68
-			"socketid", socketID,
69
-			"error", err,
70
-		)
71
-		return
72
-	}
73
-	defer clientConn.Close() // nolint: errcheck
74
-
75
-	tgConn, err := s.getTelegramStream(ctx, cancel, connOpts, socketID)
76
-	if err != nil {
77
-		s.logger.Warnw("Cannot initialize Telegram connection",
78
-			"socketid", socketID,
79
-			"error", err,
80
-		)
81
-		return
82
-	}
83
-	defer tgConn.Close() // nolint: errcheck
84
-
85
-	wait := &sync.WaitGroup{}
86
-	wait.Add(2)
87
-
88
-	go s.pipe(clientConn, tgConn, wait)
89
-	go s.pipe(tgConn, clientConn, wait)
90
-
91
-	<-ctx.Done()
92
-	wait.Wait()
93
-
94
-	s.logger.Debugw("Client disconnected",
95
-		"addr", conn.RemoteAddr().String(),
96
-		"socketid", socketID,
97
-	)
98
-}
99
-
100
-func (s *Server) getClientStream(ctx context.Context, cancel context.CancelFunc, conn net.Conn, socketID string) (*mtproto.ConnectionOpts, io.ReadWriteCloser, error) {
101
-	connOpts, socket, err := s.clientInit(conn, s.conf)
102
-	if err != nil {
103
-		return nil, nil, errors.Annotate(err, "Cannot init client connection")
104
-	}
105
-
106
-	socket = wrappers.NewTrafficRWC(socket, s.stats.addIncomingTraffic, s.stats.addOutgoingTraffic)
107
-	socket = wrappers.NewLogRWC(socket, s.logger, socketID, "client")
108
-	socket = wrappers.NewCtxRWC(ctx, cancel, socket)
109
-
110
-	return connOpts, socket, nil
111
-}
112
-
113
-func (s *Server) getTelegramStream(ctx context.Context, cancel context.CancelFunc, connOpts *mtproto.ConnectionOpts, socketID string) (io.ReadWriteCloser, error) {
114
-	conn, err := s.tg.Dial(connOpts)
115
-	if err != nil {
116
-		return nil, errors.Annotate(err, "Cannot connect to Telegram")
117
-	}
118
-
119
-	conn = wrappers.NewTrafficRWC(conn, s.stats.addIncomingTraffic, s.stats.addOutgoingTraffic)
120
-	conn, err = s.tg.Init(connOpts, conn)
121
-	if err != nil {
122
-		return nil, errors.Annotate(err, "Cannot handshake Telegram")
123
-	}
124
-
125
-	conn = wrappers.NewLogRWC(conn, s.logger, socketID, "telegram")
126
-	conn = wrappers.NewCtxRWC(ctx, cancel, conn)
127
-
128
-	return conn, nil
129
-}
130
-
131
-func (s *Server) pipe(dst io.Writer, src io.Reader, wait *sync.WaitGroup) {
132
-	defer wait.Done()
133
-
134
-	buf := copyPool.Get().(*[]byte)
135
-	defer copyPool.Put(buf)
136
-
137
-	io.CopyBuffer(dst, src, *buf) // nolint: errcheck
138
-}
139
-
140
-// NewServer creates new instance of MTPROTO proxy.
141
-func NewServer(conf *config.Config, logger *zap.SugaredLogger, stat *Stats) *Server {
142
-	return &Server{
143
-		conf:       conf,
144
-		logger:     logger,
145
-		stats:      stat,
146
-		tg:         telegram.NewDirectTelegram(conf),
147
-		clientInit: client.DirectInit,
148
-	}
149
-}

+ 0
- 74
proxy/stats.go Wyświetl plik

@@ -1,74 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"encoding/json"
5
-	"net/http"
6
-	"strconv"
7
-	"sync/atomic"
8
-	"time"
9
-
10
-	"github.com/9seconds/mtg/config"
11
-)
12
-
13
-type statsUptime time.Time
14
-
15
-func (s statsUptime) MarshalJSON() ([]byte, error) {
16
-	uptime := int(time.Since(time.Time(s)).Seconds())
17
-	return []byte(strconv.Itoa(uptime)), nil
18
-}
19
-
20
-// Stats is a datastructure for statistics on work of this proxy.
21
-type Stats struct {
22
-	AllConnections    uint64 `json:"all_connections"`
23
-	ActiveConnections uint32 `json:"active_connections"`
24
-	Traffic           struct {
25
-		Incoming uint64 `json:"incoming"`
26
-		Outgoing uint64 `json:"outgoing"`
27
-	} `json:"traffic"`
28
-	URLs   config.IPURLs `json:"urls"`
29
-	Uptime statsUptime   `json:"uptime"`
30
-
31
-	conf *config.Config
32
-}
33
-
34
-func (s *Stats) newConnection() {
35
-	atomic.AddUint64(&s.AllConnections, 1)
36
-	atomic.AddUint32(&s.ActiveConnections, 1)
37
-}
38
-
39
-func (s *Stats) closeConnection() {
40
-	atomic.AddUint32(&s.ActiveConnections, ^uint32(0))
41
-}
42
-
43
-func (s *Stats) addIncomingTraffic(n int) {
44
-	atomic.AddUint64(&s.Traffic.Incoming, uint64(n))
45
-}
46
-
47
-func (s *Stats) addOutgoingTraffic(n int) {
48
-	atomic.AddUint64(&s.Traffic.Outgoing, uint64(n))
49
-}
50
-
51
-// Serve runs statistics HTTP server.
52
-func (s *Stats) Serve() {
53
-	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
54
-		w.Header().Set("Content-Type", "application/json")
55
-
56
-		encoder := json.NewEncoder(w)
57
-		encoder.SetEscapeHTML(false)
58
-		encoder.SetIndent("", "  ")
59
-		encoder.Encode(s) // nolint: errcheck, gas
60
-	})
61
-
62
-	http.ListenAndServe(s.conf.StatAddr(), nil) // nolint: errcheck, gas
63
-}
64
-
65
-// NewStats returns new instance of statistics datastructure.
66
-func NewStats(conf *config.Config) *Stats {
67
-	stat := &Stats{
68
-		Uptime: statsUptime(time.Now()),
69
-		conf:   conf,
70
-	}
71
-	stat.URLs = conf.GetURLs()
72
-
73
-	return stat
74
-}

+ 0
- 1
run-mtg.sh Wyświetl plik

@@ -21,7 +21,6 @@ docker run \
21 21
     --sysctl net.ipv4.tcp_congestion_control=bbr \
22 22
     --sysctl net.ipv4.tcp_fastopen=3 \
23 23
     --sysctl net.ipv4.tcp_fin_timeout=30 \
24
-    --sysctl net.ipv4.tcp_keepalive_time=1200 \
25 24
     --sysctl net.ipv4.tcp_max_syn_backlog=4096 \
26 25
     --sysctl net.ipv4.tcp_max_tw_buckets=5000 \
27 26
     --sysctl net.ipv4.tcp_mtu_probing=1 \

+ 151
- 0
stats/channels.go Wyświetl plik

@@ -0,0 +1,151 @@
1
+package stats
2
+
3
+import (
4
+	"net"
5
+	"time"
6
+
7
+	"github.com/9seconds/mtg/mtproto"
8
+)
9
+
10
+const (
11
+	crashesChanLength     = 1
12
+	connectionsChanLength = 20
13
+	trafficChanLength     = 5000
14
+)
15
+
16
+var (
17
+	crashesChan     = make(chan struct{}, crashesChanLength)
18
+	connectionsChan = make(chan *connectionData, connectionsChanLength)
19
+	trafficChan     = make(chan *trafficData, trafficChanLength)
20
+)
21
+
22
+type connectionData struct {
23
+	connectionType mtproto.ConnectionType
24
+	connected      bool
25
+	addr           *net.TCPAddr
26
+}
27
+
28
+type trafficData struct {
29
+	traffic int
30
+	ingress bool
31
+}
32
+
33
+func crashManager() {
34
+	for range crashesChan {
35
+		instance.mutex.RLock()
36
+
37
+		instance.Crashes++
38
+
39
+		instance.mutex.RUnlock()
40
+	}
41
+}
42
+
43
+func connectionManager() { // nolint: gocyclo
44
+	for event := range connectionsChan {
45
+		instance.mutex.RLock()
46
+
47
+		isIPv4 := event.addr.IP.To4() != nil
48
+		var inc uint32 = 1
49
+		if !event.connected {
50
+			inc = ^uint32(0)
51
+		}
52
+
53
+		switch event.connectionType {
54
+		case mtproto.ConnectionTypeAbridged:
55
+			if isIPv4 {
56
+				instance.ActiveConnections.Abridged.IPv4 += inc
57
+				if event.connected {
58
+					instance.AllConnections.Abridged.IPv4 += inc
59
+				}
60
+			} else {
61
+				instance.ActiveConnections.Abridged.IPv6 += inc
62
+				if event.connected {
63
+					instance.AllConnections.Abridged.IPv6 += inc
64
+				}
65
+			}
66
+		default:
67
+			if isIPv4 {
68
+				instance.ActiveConnections.Intermediate.IPv4 += inc
69
+				if event.connected {
70
+					instance.AllConnections.Intermediate.IPv4 += inc
71
+				}
72
+			} else {
73
+				instance.ActiveConnections.Intermediate.IPv6 += inc
74
+				if event.connected {
75
+					instance.AllConnections.Intermediate.IPv6 += inc
76
+				}
77
+			}
78
+		}
79
+
80
+		instance.mutex.RUnlock()
81
+	}
82
+}
83
+
84
+func trafficManager() {
85
+	speedChan := time.Tick(time.Second)
86
+
87
+	for {
88
+		select {
89
+		case event := <-trafficChan:
90
+			instance.mutex.RLock()
91
+
92
+			if event.ingress {
93
+				instance.Traffic.Ingress += trafficValue(event.traffic)
94
+				instance.speedCurrent.Ingress += trafficSpeedValue(event.traffic)
95
+			} else {
96
+				instance.Traffic.Egress += trafficValue(event.traffic)
97
+				instance.speedCurrent.Egress += trafficSpeedValue(event.traffic)
98
+			}
99
+
100
+			instance.mutex.RUnlock()
101
+		case <-speedChan:
102
+			instance.mutex.RLock()
103
+
104
+			instance.Speed.Ingress = instance.speedCurrent.Ingress
105
+			instance.Speed.Egress = instance.speedCurrent.Egress
106
+			instance.speedCurrent.Ingress = trafficSpeedValue(0)
107
+			instance.speedCurrent.Egress = trafficSpeedValue(0)
108
+
109
+			instance.mutex.RUnlock()
110
+		}
111
+	}
112
+}
113
+
114
+// NewCrash indicates new crash.
115
+func NewCrash() {
116
+	crashesChan <- struct{}{}
117
+}
118
+
119
+// ClientConnected indicates that new client was connected.
120
+func ClientConnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
121
+	connectionsChan <- &connectionData{
122
+		connectionType: connectionType,
123
+		addr:           addr,
124
+		connected:      true,
125
+	}
126
+}
127
+
128
+// ClientDisconnected indicates that client was disconnected.
129
+func ClientDisconnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
130
+	connectionsChan <- &connectionData{
131
+		connectionType: connectionType,
132
+		addr:           addr,
133
+		connected:      false,
134
+	}
135
+}
136
+
137
+// IngressTraffic accounts new ingress traffic.
138
+func IngressTraffic(traffic int) {
139
+	trafficChan <- &trafficData{
140
+		traffic: traffic,
141
+		ingress: true,
142
+	}
143
+}
144
+
145
+// EgressTraffic accounts new ingress traffic.
146
+func EgressTraffic(traffic int) {
147
+	trafficChan <- &trafficData{
148
+		traffic: traffic,
149
+		ingress: false,
150
+	}
151
+}

+ 57
- 0
stats/server.go Wyświetl plik

@@ -0,0 +1,57 @@
1
+package stats
2
+
3
+import (
4
+	"encoding/json"
5
+	"net/http"
6
+	"sync"
7
+	"time"
8
+
9
+	"go.uber.org/zap"
10
+
11
+	"github.com/9seconds/mtg/config"
12
+)
13
+
14
+var instance *stats
15
+
16
+// Start starts new statisitcs server.
17
+func Start(conf *config.Config) {
18
+	log := zap.S().Named("stats")
19
+
20
+	instance = &stats{
21
+		URLs:   conf.GetURLs(),
22
+		Uptime: uptime(time.Now()),
23
+		mutex:  &sync.RWMutex{},
24
+	}
25
+
26
+	go crashManager()
27
+	go connectionManager()
28
+	go trafficManager()
29
+
30
+	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
31
+		w.Header().Set("Content-Type", "application/json")
32
+
33
+		instance.mutex.Lock()
34
+		first, err := json.Marshal(instance)
35
+		instance.mutex.Unlock()
36
+
37
+		if err != nil {
38
+			log.Errorw("Cannot encode json", "error", err)
39
+			http.Error(w, "Internal server error", 500)
40
+			return
41
+		}
42
+
43
+		interm := map[string]interface{}{}
44
+		json.Unmarshal(first, &interm) // nolint: errcheck
45
+
46
+		encoder := json.NewEncoder(w)
47
+		encoder.SetEscapeHTML(false)
48
+		encoder.SetIndent("", "  ")
49
+		if err = encoder.Encode(interm); err != nil {
50
+			log.Errorw("Cannot encode json", "error", err)
51
+		}
52
+	})
53
+
54
+	if err := http.ListenAndServe(conf.StatAddr(), nil); err != nil {
55
+		log.Fatalw("Stats server has been stopped", "error", err)
56
+	}
57
+}

+ 100
- 0
stats/stats.go Wyświetl plik

@@ -0,0 +1,100 @@
1
+package stats
2
+
3
+import (
4
+	"encoding/json"
5
+	"fmt"
6
+	"strconv"
7
+	"sync"
8
+	"time"
9
+
10
+	humanize "github.com/dustin/go-humanize"
11
+
12
+	"github.com/9seconds/mtg/config"
13
+)
14
+
15
+type uptime time.Time
16
+
17
+func (u uptime) MarshalJSON() ([]byte, error) {
18
+	duration := time.Since(time.Time(u))
19
+	value := map[string]string{
20
+		"seconds": strconv.Itoa(int(duration.Seconds())),
21
+		"human":   humanize.Time(time.Time(u)),
22
+	}
23
+
24
+	return json.Marshal(value)
25
+}
26
+
27
+type trafficValue uint64
28
+
29
+func (t trafficValue) MarshalJSON() ([]byte, error) {
30
+	tv := uint64(t)
31
+	value := map[string]interface{}{
32
+		"bytes": tv,
33
+		"human": humanize.Bytes(tv),
34
+	}
35
+
36
+	return json.Marshal(value)
37
+}
38
+
39
+type trafficSpeedValue uint64
40
+
41
+func (t trafficSpeedValue) MarshalJSON() ([]byte, error) {
42
+	speed := uint64(t)
43
+	value := map[string]interface{}{
44
+		"bytes/s": speed,
45
+		"human":   fmt.Sprintf("%s/S", humanize.Bytes(speed)),
46
+	}
47
+
48
+	return json.Marshal(value)
49
+}
50
+
51
+type connections struct {
52
+	All          connectionType `json:"all"`
53
+	Abridged     connectionType `json:"abridged"`
54
+	Intermediate connectionType `json:"intermediate"`
55
+}
56
+
57
+func (c connections) MarshalJSON() ([]byte, error) {
58
+	c.All.IPv4 = c.Abridged.IPv4 + c.Intermediate.IPv4
59
+	c.All.IPv6 = c.Abridged.IPv6 + c.Intermediate.IPv6
60
+
61
+	value := struct {
62
+		All          connectionType `json:"all"`
63
+		Abridged     connectionType `json:"abridged"`
64
+		Intermediate connectionType `json:"intermediate"`
65
+	}{
66
+		All:          c.All,
67
+		Abridged:     c.Abridged,
68
+		Intermediate: c.Intermediate,
69
+	}
70
+
71
+	return json.Marshal(value)
72
+}
73
+
74
+type connectionType struct {
75
+	IPv6 uint32 `json:"ipv6"`
76
+	IPv4 uint32 `json:"ipv4"`
77
+}
78
+
79
+type traffic struct {
80
+	Ingress trafficValue `json:"ingress"`
81
+	Egress  trafficValue `json:"egress"`
82
+}
83
+
84
+type speed struct {
85
+	Ingress trafficSpeedValue `json:"ingress"`
86
+	Egress  trafficSpeedValue `json:"egress"`
87
+}
88
+
89
+type stats struct {
90
+	URLs              config.IPURLs `json:"urls"`
91
+	ActiveConnections connections   `json:"active_connections"`
92
+	AllConnections    connections   `json:"all_connections"`
93
+	Traffic           traffic       `json:"traffic"`
94
+	Speed             speed         `json:"speed"`
95
+	Uptime            uptime        `json:"uptime"`
96
+	Crashes           uint32        `json:"crashes"`
97
+
98
+	speedCurrent speed
99
+	mutex        *sync.RWMutex
100
+}

+ 18
- 5
telegram/dialer.go Wyświetl plik

@@ -1,21 +1,25 @@
1 1
 package telegram
2 2
 
3 3
 import (
4
-	"io"
5 4
 	"net"
6 5
 	"time"
7 6
 
8 7
 	"github.com/juju/errors"
9 8
 
10 9
 	"github.com/9seconds/mtg/config"
10
+	"github.com/9seconds/mtg/wrappers"
11 11
 )
12 12
 
13 13
 const (
14 14
 	telegramDialTimeout = 10 * time.Second
15
+	readBufferSize      = 64 * 1024
16
+	writeBufferSize     = 64 * 1024
15 17
 )
16 18
 
17 19
 type tgDialer struct {
18 20
 	net.Dialer
21
+
22
+	conf *config.Config
19 23
 }
20 24
 
21 25
 func (t *tgDialer) dial(addr string) (net.Conn, error) {
@@ -23,18 +27,27 @@ func (t *tgDialer) dial(addr string) (net.Conn, error) {
23 27
 	if err != nil {
24 28
 		return nil, errors.Annotate(err, "Cannot connect to Telegram")
25 29
 	}
26
-	if err = config.SetSocketOptions(conn); err != nil {
27
-		return nil, errors.Annotate(err, "Cannot set socket options")
30
+
31
+	tcpSocket := conn.(*net.TCPConn)
32
+	if err = tcpSocket.SetNoDelay(true); err != nil {
33
+		return nil, errors.Annotate(err, "Cannot set NO_DELAY to Telegram")
34
+	}
35
+	if err = tcpSocket.SetReadBuffer(readBufferSize); err != nil {
36
+		return nil, errors.Annotate(err, "Cannot set read buffer size on telegram socket")
37
+	}
38
+	if err = tcpSocket.SetWriteBuffer(writeBufferSize); err != nil {
39
+		return nil, errors.Annotate(err, "Cannot set write buffer size on telegram socket")
28 40
 	}
29 41
 
30 42
 	return conn, nil
31 43
 }
32 44
 
33
-func (t *tgDialer) dialRWC(addr string) (io.ReadWriteCloser, error) {
45
+func (t *tgDialer) dialRWC(addr, connID string) (wrappers.StreamReadWriteCloser, error) {
34 46
 	conn, err := t.dial(addr)
35 47
 	if err != nil {
36 48
 		return nil, err
37 49
 	}
50
+	tgConn := wrappers.NewConn(conn, connID, wrappers.ConnPurposeTelegram, t.conf.PublicIPv4, t.conf.PublicIPv6)
38 51
 
39
-	return conn, nil
52
+	return tgConn, nil
40 53
 }

+ 9
- 8
telegram/direct.go Wyświetl plik

@@ -1,7 +1,6 @@
1 1
 package telegram
2 2
 
3 3
 import (
4
-	"io"
5 4
 	"net"
6 5
 
7 6
 	"github.com/juju/errors"
@@ -33,7 +32,7 @@ type directTelegram struct {
33 32
 	baseTelegram
34 33
 }
35 34
 
36
-func (t *directTelegram) Dial(connOpts *mtproto.ConnectionOpts) (io.ReadWriteCloser, error) {
35
+func (t *directTelegram) Dial(connID string, connOpts *mtproto.ConnectionOpts) (wrappers.StreamReadWriteCloser, error) {
37 36
 	dc := connOpts.DC
38 37
 	if dc < 0 {
39 38
 		dc = -dc
@@ -41,25 +40,27 @@ func (t *directTelegram) Dial(connOpts *mtproto.ConnectionOpts) (io.ReadWriteClo
41 40
 		dc = 1
42 41
 	}
43 42
 
44
-	return t.baseTelegram.dial(dc - 1)
43
+	return t.baseTelegram.dial(dc-1, connID, connOpts.ConnectionProto)
45 44
 }
46 45
 
47
-func (t *directTelegram) Init(connOpts *mtproto.ConnectionOpts, conn io.ReadWriteCloser) (io.ReadWriteCloser, error) {
46
+func (t *directTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.StreamReadWriteCloser) (wrappers.Wrap, error) {
48 47
 	obfs2, frame := obfuscated2.MakeTelegramObfuscated2Frame(connOpts)
49
-	defer obfuscated2.ReturnFrame(frame)
50 48
 
51
-	if n, err := conn.Write(*frame); err != nil || n != len(*frame) {
49
+	if _, err := conn.Write(frame); err != nil {
52 50
 		return nil, errors.Annotate(err, "Cannot write hadnshake frame")
53 51
 	}
54 52
 
55
-	return wrappers.NewStreamCipherRWC(conn, obfs2.Encryptor, obfs2.Decryptor), nil
53
+	return wrappers.NewStreamCipher(conn, obfs2.Encryptor, obfs2.Decryptor), nil
56 54
 }
57 55
 
58 56
 // NewDirectTelegram returns Telegram instance which connects directly
59 57
 // to Telegram bypassing middleproxies.
60 58
 func NewDirectTelegram(conf *config.Config) Telegram {
61 59
 	return &directTelegram{baseTelegram{
62
-		dialer:      tgDialer{net.Dialer{Timeout: telegramDialTimeout}},
60
+		dialer: tgDialer{
61
+			Dialer: net.Dialer{Timeout: telegramDialTimeout},
62
+			conf:   conf,
63
+		},
63 64
 		v4Addresses: directV4Addresses,
64 65
 		v6Addresses: directV6Addresses,
65 66
 	}}

+ 136
- 0
telegram/middle.go Wyświetl plik

@@ -0,0 +1,136 @@
1
+package telegram
2
+
3
+import (
4
+	"io"
5
+	"net"
6
+	"net/http"
7
+	"sync"
8
+
9
+	"github.com/juju/errors"
10
+
11
+	"github.com/9seconds/mtg/config"
12
+	"github.com/9seconds/mtg/mtproto"
13
+	"github.com/9seconds/mtg/mtproto/rpc"
14
+	"github.com/9seconds/mtg/wrappers"
15
+)
16
+
17
+type middleTelegram struct {
18
+	middleTelegramCaller
19
+
20
+	conf *config.Config
21
+}
22
+
23
+func (t *middleTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.StreamReadWriteCloser) (wrappers.Wrap, error) {
24
+	rpcNonceConn := wrappers.NewMTProtoFrame(conn, rpc.SeqNoNonce)
25
+
26
+	rpcNonceReq, err := t.sendRPCNonceRequest(rpcNonceConn)
27
+	if err != nil {
28
+		return nil, err
29
+	}
30
+	rpcNonceResp, err := t.receiveRPCNonceResponse(rpcNonceConn, rpcNonceReq)
31
+	if err != nil {
32
+		return nil, err
33
+	}
34
+
35
+	secureConn := wrappers.NewMiddleProxyCipher(conn, rpcNonceReq, rpcNonceResp, t.proxySecret)
36
+	frameConn := wrappers.NewMTProtoFrame(secureConn, rpc.SeqNoHandshake)
37
+
38
+	rpcHandshakeReq, err := t.sendRPCHandshakeRequest(frameConn)
39
+	if err != nil {
40
+		return nil, err
41
+	}
42
+	_, err = t.receiveRPCHandshakeResponse(frameConn, rpcHandshakeReq)
43
+	if err != nil {
44
+		return nil, err
45
+	}
46
+
47
+	proxyConn, err := wrappers.NewMTProtoProxy(frameConn, connOpts, t.conf.AdTag)
48
+	if err != nil {
49
+		return nil, err
50
+	}
51
+	proxyConn.Logger().Infow("Telegram connection initialized")
52
+
53
+	return proxyConn, nil
54
+}
55
+
56
+func (t *middleTelegram) sendRPCNonceRequest(conn io.Writer) (*rpc.NonceRequest, error) {
57
+	rpcNonceReq, err := rpc.NewNonceRequest(t.proxySecret)
58
+	if err != nil {
59
+		return nil, errors.Annotate(err, "Cannot create RPC nonce request")
60
+	}
61
+	if _, err = conn.Write(rpcNonceReq.Bytes()); err != nil {
62
+		return nil, errors.Annotate(err, "Cannot send RPC nonce request")
63
+	}
64
+
65
+	return rpcNonceReq, nil
66
+}
67
+
68
+func (t *middleTelegram) receiveRPCNonceResponse(conn wrappers.PacketReader, req *rpc.NonceRequest) (*rpc.NonceResponse, error) {
69
+	packet, err := conn.Read()
70
+	if err != nil {
71
+		return nil, errors.Annotate(err, "Cannot read RPC nonce response")
72
+	}
73
+
74
+	rpcNonceResp, err := rpc.NewNonceResponse(packet)
75
+	if err != nil {
76
+		return nil, errors.Annotate(err, "Cannot initialize RPC nonce response")
77
+	}
78
+	if err = rpcNonceResp.Valid(req); err != nil {
79
+		return nil, errors.Annotate(err, "Invalid RPC nonce response")
80
+	}
81
+
82
+	return rpcNonceResp, nil
83
+}
84
+
85
+func (t *middleTelegram) sendRPCHandshakeRequest(conn io.Writer) (*rpc.HandshakeRequest, error) {
86
+	req := rpc.NewHandshakeRequest()
87
+	if _, err := conn.Write(req.Bytes()); err != nil {
88
+		return nil, errors.Annotate(err, "Cannot send RPC handshake request")
89
+	}
90
+
91
+	return req, nil
92
+}
93
+
94
+func (t *middleTelegram) receiveRPCHandshakeResponse(conn wrappers.PacketReader, req *rpc.HandshakeRequest) (*rpc.HandshakeResponse, error) {
95
+	packet, err := conn.Read()
96
+	if err != nil {
97
+		return nil, errors.Annotate(err, "Cannot read RPC handshake response")
98
+	}
99
+
100
+	rpcHandshakeResp, err := rpc.NewHandshakeResponse(packet)
101
+	if err != nil {
102
+		return nil, errors.Annotate(err, "Cannot initialize RPC handshake response")
103
+	}
104
+	if err = rpcHandshakeResp.Valid(req); err != nil {
105
+		return nil, errors.Annotate(err, "Invalid RPC handshake response")
106
+	}
107
+
108
+	return rpcHandshakeResp, nil
109
+}
110
+
111
+// NewMiddleTelegram creates new instance of Telegram which works with
112
+// middle proxies.
113
+func NewMiddleTelegram(conf *config.Config) Telegram {
114
+	tg := &middleTelegram{
115
+		middleTelegramCaller: middleTelegramCaller{
116
+			baseTelegram: baseTelegram{
117
+				dialer: tgDialer{
118
+					Dialer: net.Dialer{Timeout: telegramDialTimeout},
119
+					conf:   conf,
120
+				},
121
+			},
122
+			httpClient: &http.Client{
123
+				Timeout: middleTelegramHTTPClientTimeout,
124
+			},
125
+			dialerMutex: &sync.RWMutex{},
126
+		},
127
+		conf: conf,
128
+	}
129
+
130
+	if err := tg.update(); err != nil {
131
+		panic(err)
132
+	}
133
+	go tg.autoUpdate()
134
+
135
+	return tg
136
+}

+ 152
- 0
telegram/middle_caller.go Wyświetl plik

@@ -0,0 +1,152 @@
1
+package telegram
2
+
3
+import (
4
+	"bufio"
5
+	"io/ioutil"
6
+	"net"
7
+	"net/http"
8
+	"regexp"
9
+	"strconv"
10
+	"strings"
11
+	"sync"
12
+	"time"
13
+
14
+	"github.com/juju/errors"
15
+	"go.uber.org/zap"
16
+
17
+	"github.com/9seconds/mtg/mtproto"
18
+	"github.com/9seconds/mtg/wrappers"
19
+)
20
+
21
+const (
22
+	middleTelegramAutoUpdateInterval = 6 * time.Hour
23
+	middleTelegramHTTPClientTimeout  = 30 * time.Second
24
+
25
+	tgAddrProxySecret = "https://core.telegram.org/getProxySecret"   // nolint: gas
26
+	tgAddrProxyV4     = "https://core.telegram.org/getProxyConfig"   // nolint: gas
27
+	tgAddrProxyV6     = "https://core.telegram.org/getProxyConfigV6" // nolint: gas
28
+	tgUserAgent       = "mtg"
29
+)
30
+
31
+var middleTelegramProxyConfigSplitter = regexp.MustCompile(`\s+`)
32
+
33
+type middleTelegramCaller struct {
34
+	baseTelegram
35
+
36
+	proxySecret []byte
37
+	dialerMutex *sync.RWMutex
38
+	httpClient  *http.Client
39
+}
40
+
41
+func (t *middleTelegramCaller) Dial(connID string, connOpts *mtproto.ConnectionOpts) (wrappers.StreamReadWriteCloser, error) {
42
+	dc := connOpts.DC
43
+	if dc == 0 {
44
+		dc = 1
45
+	}
46
+	t.dialerMutex.RLock()
47
+	defer t.dialerMutex.RUnlock()
48
+
49
+	return t.baseTelegram.dial(dc, connID, connOpts.ConnectionProto)
50
+}
51
+
52
+func (t *middleTelegramCaller) autoUpdate() {
53
+	for range time.Tick(middleTelegramAutoUpdateInterval) {
54
+		if err := t.update(); err != nil {
55
+			zap.S().Warnw("Cannot update from Telegram", "error", err)
56
+		}
57
+	}
58
+}
59
+
60
+func (t *middleTelegramCaller) update() error {
61
+	secret, err := t.getTelegramProxySecret()
62
+	if err != nil {
63
+		return errors.Annotate(err, "Cannot get proxy secret")
64
+	}
65
+
66
+	v4Addresses, err := t.getTelegramAddresses(tgAddrProxyV4)
67
+	if err != nil {
68
+		return errors.Annotate(err, "Cannot get ipv4 addresses")
69
+	}
70
+
71
+	v6Addresses, err := t.getTelegramAddresses(tgAddrProxyV6)
72
+	if err != nil {
73
+		return errors.Annotate(err, "Cannot get ipv6 addresses")
74
+	}
75
+
76
+	t.dialerMutex.Lock()
77
+	t.proxySecret = secret
78
+	t.v4Addresses = v4Addresses
79
+	t.v6Addresses = v6Addresses
80
+	t.dialerMutex.Unlock()
81
+
82
+	zap.S().Infow("Telegram middle proxy data has been updated")
83
+
84
+	return nil
85
+}
86
+
87
+func (t *middleTelegramCaller) getTelegramProxySecret() ([]byte, error) {
88
+	resp, err := t.call(tgAddrProxySecret)
89
+	if err != nil {
90
+		return nil, errors.Annotate(err, "Cannot access telegram server")
91
+	}
92
+	defer resp.Body.Close() // nolint: errcheck
93
+
94
+	secret, err := ioutil.ReadAll(resp.Body)
95
+	if err != nil {
96
+		return nil, errors.Annotate(err, "Cannot read response")
97
+	}
98
+
99
+	return secret, nil
100
+}
101
+
102
+func (t *middleTelegramCaller) getTelegramAddresses(url string) (map[int16][]string, error) {
103
+	resp, err := t.call(url)
104
+	if err != nil {
105
+		return nil, errors.Annotate(err, "Cannot access telegram server")
106
+	}
107
+	defer resp.Body.Close() // nolint: errcheck
108
+
109
+	scanner := bufio.NewScanner(resp.Body)
110
+	data := map[int16][]string{}
111
+	for scanner.Scan() {
112
+		text := strings.TrimSpace(scanner.Text())
113
+		if strings.HasPrefix(text, "#") {
114
+			continue
115
+		}
116
+
117
+		chunks := middleTelegramProxyConfigSplitter.Split(text, 3)
118
+		if len(chunks) != 3 || chunks[0] != "proxy_for" {
119
+			return nil, errors.Errorf("Incorrect config '%s'", text)
120
+		}
121
+		dcIdx64, err2 := strconv.ParseInt(chunks[1], 10, 16)
122
+		if err2 != nil {
123
+			return nil, errors.Errorf("Incorrect config '%s'", text)
124
+		}
125
+		dcIdx := int16(dcIdx64)
126
+
127
+		addr := strings.TrimRight(chunks[2], ";")
128
+		if _, _, err2 = net.SplitHostPort(addr); err != nil {
129
+			return nil, errors.Annotatef(err2, "Incorrect config '%s'", text)
130
+		}
131
+
132
+		if addresses, ok := data[dcIdx]; ok {
133
+			data[dcIdx] = append(addresses, addr)
134
+		} else {
135
+			data[dcIdx] = []string{addr}
136
+		}
137
+	}
138
+	err = scanner.Err()
139
+	if err != nil {
140
+		return nil, errors.Annotate(err, "Cannot read response from the telegram")
141
+	}
142
+
143
+	return data, nil
144
+}
145
+
146
+func (t *middleTelegramCaller) call(url string) (*http.Response, error) {
147
+	req, _ := http.NewRequest("GET", url, nil)
148
+	req.Header.Set("Accept", "text/plain")
149
+	req.Header.Set("User-Agent", tgUserAgent)
150
+
151
+	return t.httpClient.Do(req)
152
+}

+ 15
- 12
telegram/telegram.go Wyświetl plik

@@ -1,20 +1,18 @@
1 1
 package telegram
2 2
 
3 3
 import (
4
-	"io"
5 4
 	"math/rand"
6 5
 
7 6
 	"github.com/juju/errors"
8 7
 
9 8
 	"github.com/9seconds/mtg/mtproto"
9
+	"github.com/9seconds/mtg/wrappers"
10 10
 )
11 11
 
12
-// Telegram defines an interface to connect to Telegram. This
13
-// encapsulates logic of working with middleproxies or direct
14
-// connections.
12
+// Telegram is an interface for different Telegram work modes.
15 13
 type Telegram interface {
16
-	Dial(*mtproto.ConnectionOpts) (io.ReadWriteCloser, error)
17
-	Init(*mtproto.ConnectionOpts, io.ReadWriteCloser) (io.ReadWriteCloser, error)
14
+	Dial(string, *mtproto.ConnectionOpts) (wrappers.StreamReadWriteCloser, error)
15
+	Init(*mtproto.ConnectionOpts, wrappers.StreamReadWriteCloser) (wrappers.Wrap, error)
18 16
 }
19 17
 
20 18
 type baseTelegram struct {
@@ -24,17 +22,22 @@ type baseTelegram struct {
24 22
 	v6Addresses map[int16][]string
25 23
 }
26 24
 
27
-func (b *baseTelegram) dial(dcIdx int16) (io.ReadWriteCloser, error) {
25
+func (b *baseTelegram) dial(dcIdx int16, connID string, proto mtproto.ConnectionProtocol) (wrappers.StreamReadWriteCloser, error) {
28 26
 	addrs := make([]string, 2)
29
-	if addr, ok := b.v6Addresses[dcIdx]; ok && len(addr) > 0 {
30
-		addrs = append(addrs, addr[rand.Intn(len(addr))])
27
+
28
+	if proto&mtproto.ConnectionProtocolIPv6 != 0 {
29
+		if addr, ok := b.v6Addresses[dcIdx]; ok && len(addr) > 0 {
30
+			addrs = append(addrs, addr[rand.Intn(len(addr))])
31
+		}
31 32
 	}
32
-	if addr, ok := b.v4Addresses[dcIdx]; ok && len(addr) > 0 {
33
-		addrs = append(addrs, addr[rand.Intn(len(addr))])
33
+	if proto&mtproto.ConnectionProtocolIPv4 != 0 {
34
+		if addr, ok := b.v4Addresses[dcIdx]; ok && len(addr) > 0 {
35
+			addrs = append(addrs, addr[rand.Intn(len(addr))])
36
+		}
34 37
 	}
35 38
 
36 39
 	for _, addr := range addrs {
37
-		if conn, err := b.dialer.dialRWC(addr); err == nil {
40
+		if conn, err := b.dialer.dialRWC(addr, connID); err == nil {
38 41
 			return conn, err
39 42
 		}
40 43
 	}

+ 21
- 0
utils/read_current_data.go Wyświetl plik

@@ -0,0 +1,21 @@
1
+package utils
2
+
3
+import "io"
4
+
5
+const readCurrentDataBufferSize = 1024 + 1 // + 1 because telegram operates with blocks mod 4
6
+
7
+// ReadCurrentData reads all data from io.Reader which is ready to be read.
8
+func ReadCurrentData(src io.Reader) (rv []byte, err error) {
9
+	buf := make([]byte, readCurrentDataBufferSize)
10
+	n := readCurrentDataBufferSize
11
+
12
+	for n == len(buf) {
13
+		n, err = src.Read(buf)
14
+		if err != nil {
15
+			return nil, err
16
+		}
17
+		rv = append(rv, buf[:n]...)
18
+	}
19
+
20
+	return rv, nil
21
+}

+ 15
- 0
utils/reverse_bytes.go Wyświetl plik

@@ -0,0 +1,15 @@
1
+package utils
2
+
3
+// ReverseBytes is a common slice reverser.
4
+func ReverseBytes(data []byte) []byte {
5
+	dataLen := len(data)
6
+	rv := make([]byte, dataLen)
7
+
8
+	rv[dataLen/2] = data[dataLen/2]
9
+	for i := dataLen/2 - 1; i >= 0; i-- {
10
+		opp := dataLen - i - 1
11
+		rv[i], rv[opp] = data[opp], data[i]
12
+	}
13
+
14
+	return rv
15
+}

+ 15
- 0
utils/uint24.go Wyświetl plik

@@ -0,0 +1,15 @@
1
+package utils
2
+
3
+// Uint24 is a replacement for the absent Go uint24 data type.
4
+// This data type is little endian.
5
+type Uint24 [3]byte
6
+
7
+// ToUint24 converts number to Uint24.
8
+func ToUint24(number uint32) Uint24 {
9
+	return Uint24{byte(number), byte(number >> 8), byte(number >> 16)}
10
+}
11
+
12
+// FromUint24 converts Uint24 to number.
13
+func FromUint24(number Uint24) uint32 {
14
+	return uint32(number[0]) + (uint32(number[1]) << 8) + (uint32(number[2]) << 16)
15
+}

+ 99
- 0
wrappers/blockcipher.go Wyświetl plik

@@ -0,0 +1,99 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"crypto/aes"
6
+	"crypto/cipher"
7
+	"net"
8
+
9
+	"go.uber.org/zap"
10
+
11
+	"github.com/9seconds/mtg/utils"
12
+	"github.com/juju/errors"
13
+)
14
+
15
+// BlockCipher is a stream writer which encrypts/decrypts blocks of data
16
+// with AES CBC. This also is buffered reader. It means, that block
17
+// reading is transparent for it, you can assume you are working with
18
+// good old io.Reader.
19
+type BlockCipher struct {
20
+	buf *bytes.Buffer
21
+
22
+	logger    *zap.SugaredLogger
23
+	conn      StreamReadWriteCloser
24
+	encryptor cipher.BlockMode
25
+	decryptor cipher.BlockMode
26
+}
27
+
28
+func (b *BlockCipher) Read(p []byte) (int, error) {
29
+	if b.buf.Len() > 0 {
30
+		return b.flush(p)
31
+	}
32
+
33
+	buf := []byte{}
34
+	for len(buf) == 0 || len(buf)%aes.BlockSize != 0 {
35
+		rv, err := utils.ReadCurrentData(b.conn)
36
+		if err != nil {
37
+			return 0, errors.Annotate(err, "Cannot read from socket")
38
+		}
39
+		buf = append(buf, rv...)
40
+	}
41
+
42
+	b.decryptor.CryptBlocks(buf, buf)
43
+	b.buf.Write(buf)
44
+
45
+	return b.flush(p)
46
+}
47
+
48
+func (b *BlockCipher) flush(p []byte) (int, error) {
49
+	if b.buf.Len() <= len(p) {
50
+		sizeToReturn := b.buf.Len()
51
+		copy(p, b.buf.Bytes())
52
+		b.buf.Reset()
53
+		return sizeToReturn, nil
54
+	}
55
+
56
+	return b.buf.Read(p)
57
+}
58
+
59
+func (b *BlockCipher) Write(p []byte) (int, error) {
60
+	if len(p)%aes.BlockSize > 0 {
61
+		return 0, errors.Errorf("Incorrect block size %d", len(p))
62
+	}
63
+
64
+	encrypted := make([]byte, len(p))
65
+	b.encryptor.CryptBlocks(encrypted, p)
66
+
67
+	return b.conn.Write(encrypted)
68
+}
69
+
70
+// Logger returns an instance of the logger for this wrapper.
71
+func (b *BlockCipher) Logger() *zap.SugaredLogger {
72
+	return b.logger
73
+}
74
+
75
+// LocalAddr returns local address of the underlying net.Conn.
76
+func (b *BlockCipher) LocalAddr() *net.TCPAddr {
77
+	return b.conn.LocalAddr()
78
+}
79
+
80
+// RemoteAddr returns remote address of the underlying net.Conn.
81
+func (b *BlockCipher) RemoteAddr() *net.TCPAddr {
82
+	return b.conn.RemoteAddr()
83
+}
84
+
85
+// Close closes underlying net.Conn.
86
+func (b *BlockCipher) Close() error {
87
+	return b.conn.Close()
88
+}
89
+
90
+// NewBlockCipher creates new instance of BlockCipher based on given data.
91
+func NewBlockCipher(conn StreamReadWriteCloser, encryptor, decryptor cipher.BlockMode) StreamReadWriteCloser {
92
+	return &BlockCipher{
93
+		buf:       &bytes.Buffer{},
94
+		conn:      conn,
95
+		logger:    conn.Logger().Named("block-cipher"),
96
+		encryptor: encryptor,
97
+		decryptor: decryptor,
98
+	}
99
+}

+ 0
- 27
wrappers/buffer_pool.go Wyświetl plik

@@ -1,27 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"bytes"
5
-	"sync"
6
-)
7
-
8
-var bufPool sync.Pool
9
-
10
-func getBuffer() *bytes.Buffer {
11
-	buf := bufPool.Get().(*bytes.Buffer)
12
-	buf.Reset()
13
-
14
-	return buf
15
-}
16
-
17
-func putBuffer(buf *bytes.Buffer) {
18
-	bufPool.Put(buf)
19
-}
20
-
21
-func init() {
22
-	bufPool = sync.Pool{
23
-		New: func() interface{} {
24
-			return &bytes.Buffer{}
25
-		},
26
-	}
27
-}

+ 121
- 0
wrappers/conn.go Wyświetl plik

@@ -0,0 +1,121 @@
1
+package wrappers
2
+
3
+import (
4
+	"net"
5
+	"time"
6
+
7
+	"go.uber.org/zap"
8
+
9
+	"github.com/9seconds/mtg/stats"
10
+)
11
+
12
+// ConnPurpose is intented to be identifier of connection purpose. We
13
+// sometimes want to treat client/telegram connection differently (for
14
+// logging for example).
15
+type ConnPurpose uint8
16
+
17
+func (c ConnPurpose) String() string {
18
+	switch c {
19
+	case ConnPurposeClient:
20
+		return "client"
21
+	case ConnPurposeTelegram:
22
+		return "telegram"
23
+	}
24
+
25
+	return ""
26
+}
27
+
28
+// ConnPurpose* define different connection types.
29
+const (
30
+	ConnPurposeClient = iota
31
+	ConnPurposeTelegram
32
+)
33
+
34
+const (
35
+	connTimeoutRead  = 2 * time.Minute
36
+	connTimeoutWrite = 2 * time.Minute
37
+)
38
+
39
+// Conn is a basic wrapper for net.Conn providing the most low-level
40
+// logic and management as possible.
41
+type Conn struct {
42
+	connID string
43
+	conn   net.Conn
44
+	logger *zap.SugaredLogger
45
+
46
+	publicIPv4 net.IP
47
+	publicIPv6 net.IP
48
+}
49
+
50
+func (c *Conn) Write(p []byte) (int, error) {
51
+	c.conn.SetWriteDeadline(time.Now().Add(connTimeoutWrite)) // nolint: errcheck
52
+	n, err := c.conn.Write(p)
53
+
54
+	c.logger.Debugw("Write to stream", "bytes", n, "error", err)
55
+	stats.EgressTraffic(n)
56
+
57
+	return n, err
58
+}
59
+
60
+func (c *Conn) Read(p []byte) (int, error) {
61
+	c.conn.SetReadDeadline(time.Now().Add(connTimeoutRead)) // nolint: errcheck
62
+	n, err := c.conn.Read(p)
63
+
64
+	c.logger.Debugw("Read from stream", "bytes", n, "error", err)
65
+	stats.IngressTraffic(n)
66
+
67
+	return n, err
68
+}
69
+
70
+// Close closes underlying net.Conn instance.
71
+func (c *Conn) Close() error {
72
+	defer c.logger.Debugw("Close connection")
73
+	return c.conn.Close()
74
+}
75
+
76
+// Logger returns an instance of the logger for this wrapper.
77
+func (c *Conn) Logger() *zap.SugaredLogger {
78
+	return c.logger
79
+}
80
+
81
+// LocalAddr returns local address of the underlying net.Conn.
82
+func (c *Conn) LocalAddr() *net.TCPAddr {
83
+	addr := c.conn.LocalAddr().(*net.TCPAddr)
84
+	newAddr := *addr
85
+
86
+	if c.RemoteAddr().IP.To4() != nil {
87
+		if c.publicIPv4 != nil {
88
+			newAddr.IP = c.publicIPv4
89
+		}
90
+	} else if c.publicIPv6 != nil {
91
+		newAddr.IP = c.publicIPv6
92
+	}
93
+
94
+	return &newAddr
95
+}
96
+
97
+// RemoteAddr returns remote address of the underlying net.Conn.
98
+func (c *Conn) RemoteAddr() *net.TCPAddr {
99
+	return c.conn.RemoteAddr().(*net.TCPAddr)
100
+}
101
+
102
+// NewConn initializes Conn wrapper for net.Conn.
103
+func NewConn(conn net.Conn, connID string, purpose ConnPurpose, publicIPv4, publicIPv6 net.IP) StreamReadWriteCloser {
104
+	logger := zap.S().With(
105
+		"connection_id", connID,
106
+		"local_address", conn.LocalAddr(),
107
+		"remote_address", conn.RemoteAddr(),
108
+		"purpose", purpose,
109
+	).Named("conn")
110
+
111
+	wrapper := Conn{
112
+		logger:     logger,
113
+		connID:     connID,
114
+		conn:       conn,
115
+		publicIPv4: publicIPv4,
116
+		publicIPv6: publicIPv6,
117
+	}
118
+	wrapper.logger = logger.With("faked_local_addr", wrapper.LocalAddr())
119
+
120
+	return &wrapper
121
+}

+ 0
- 59
wrappers/ctxrwc.go Wyświetl plik

@@ -1,59 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"context"
5
-	"io"
6
-
7
-	"github.com/juju/errors"
8
-)
9
-
10
-// CtxReadWriteCloser wraps underlying connection and does management of the
11
-// context and its cancel function.
12
-type CtxReadWriteCloser struct {
13
-	ctx    context.Context
14
-	conn   io.ReadWriteCloser
15
-	cancel context.CancelFunc
16
-}
17
-
18
-// Read reads from connection
19
-func (c *CtxReadWriteCloser) Read(p []byte) (int, error) {
20
-	select {
21
-	case <-c.ctx.Done():
22
-		return 0, errors.Annotate(c.ctx.Err(), "Read is failed because of closed context")
23
-	default:
24
-		n, err := c.conn.Read(p)
25
-		if err != nil {
26
-			c.cancel()
27
-		}
28
-		return n, err
29
-	}
30
-}
31
-
32
-// Write writes into connection.
33
-func (c *CtxReadWriteCloser) Write(p []byte) (int, error) {
34
-	select {
35
-	case <-c.ctx.Done():
36
-		return 0, errors.Annotate(c.ctx.Err(), "Write is failed because of closed context")
37
-	default:
38
-		n, err := c.conn.Write(p)
39
-		if err != nil {
40
-			c.cancel()
41
-		}
42
-		return n, err
43
-	}
44
-}
45
-
46
-// Close closes underlying connection.
47
-func (c *CtxReadWriteCloser) Close() error {
48
-	return c.conn.Close()
49
-}
50
-
51
-// NewCtxRWC returns ReadWriteCloser which respects given context,
52
-// cancellation etc.
53
-func NewCtxRWC(ctx context.Context, cancel context.CancelFunc, conn io.ReadWriteCloser) io.ReadWriteCloser {
54
-	return &CtxReadWriteCloser{
55
-		conn:   conn,
56
-		ctx:    ctx,
57
-		cancel: cancel,
58
-	}
59
-}

+ 0
- 47
wrappers/logrwc.go Wyświetl plik

@@ -1,47 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"io"
5
-
6
-	"go.uber.org/zap"
7
-)
8
-
9
-// LogReadWriteCloser adds additional logging for reading/writing. All
10
-// logging is performed for debug mode only.
11
-type LogReadWriteCloser struct {
12
-	conn   io.ReadWriteCloser
13
-	logger *zap.SugaredLogger
14
-	sockid string
15
-	name   string
16
-}
17
-
18
-// Read reads from connection
19
-func (l *LogReadWriteCloser) Read(p []byte) (n int, err error) {
20
-	n, err = l.conn.Read(p)
21
-	l.logger.Debugw("Finish reading", "name", l.name, "socketid", l.sockid, "nbytes", n, "error", err)
22
-	return
23
-}
24
-
25
-// Write writes into connection.
26
-func (l *LogReadWriteCloser) Write(p []byte) (n int, err error) {
27
-	n, err = l.conn.Write(p)
28
-	l.logger.Debugw("Finish writing", "name", l.name, "socketid", l.sockid, "nbytes", n, "error", err)
29
-	return
30
-}
31
-
32
-// Close closes underlying connection.
33
-func (l *LogReadWriteCloser) Close() error {
34
-	err := l.conn.Close()
35
-	l.logger.Debugw("Finish closing socket", "name", l.name, "socketid", l.sockid, "error", err)
36
-	return err
37
-}
38
-
39
-// NewLogRWC wraps ReadWriteCloser with logger calls.
40
-func NewLogRWC(conn io.ReadWriteCloser, logger *zap.SugaredLogger, sockid string, name string) io.ReadWriteCloser {
41
-	return &LogReadWriteCloser{
42
-		conn:   conn,
43
-		logger: logger,
44
-		sockid: sockid,
45
-		name:   name,
46
-	}
47
-}

+ 159
- 0
wrappers/mtproto_abridged.go Wyświetl plik

@@ -0,0 +1,159 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"io"
6
+	"net"
7
+
8
+	"github.com/juju/errors"
9
+	"go.uber.org/zap"
10
+
11
+	"github.com/9seconds/mtg/mtproto"
12
+	"github.com/9seconds/mtg/utils"
13
+)
14
+
15
+const (
16
+	mtprotoAbridgedSmallPacketLength = 0x7f
17
+	mtprotoAbridgedQuickAckLength    = 0x80
18
+	mtprotoAbridgedLargePacketLength = 16777216 // 256 ^ 3
19
+)
20
+
21
+// MTProtoAbridged presents abridged connection between client and
22
+// middle proxy.
23
+type MTProtoAbridged struct {
24
+	conn   StreamReadWriteCloser
25
+	opts   *mtproto.ConnectionOpts
26
+	logger *zap.SugaredLogger
27
+
28
+	readCounter  uint32
29
+	writeCounter uint32
30
+}
31
+
32
+func (m *MTProtoAbridged) Read() ([]byte, error) {
33
+	defer func() {
34
+		m.readCounter++
35
+	}()
36
+
37
+	m.logger.Debugw("Read packet",
38
+		"simple_ack", m.opts.ReadHacks.SimpleAck,
39
+		"quick_ack", m.opts.ReadHacks.QuickAck,
40
+		"counter", m.readCounter,
41
+	)
42
+
43
+	buf := &bytes.Buffer{}
44
+	buf.Grow(3)
45
+
46
+	if _, err := io.CopyN(buf, m.conn, 1); err != nil {
47
+		return nil, errors.Annotate(err, "Cannot read message length")
48
+	}
49
+	msgLength := uint32(buf.Bytes()[0])
50
+	buf.Reset()
51
+
52
+	m.logger.Debugw("Packet first byte",
53
+		"byte", msgLength,
54
+		"counter", m.readCounter,
55
+		"simple_ack", m.opts.ReadHacks.SimpleAck,
56
+		"quick_ack", m.opts.ReadHacks.QuickAck,
57
+	)
58
+
59
+	if msgLength >= mtprotoAbridgedQuickAckLength {
60
+		m.opts.ReadHacks.QuickAck = true
61
+		msgLength -= mtprotoAbridgedQuickAckLength
62
+	}
63
+
64
+	if msgLength == mtprotoAbridgedSmallPacketLength {
65
+		if _, err := io.CopyN(buf, m.conn, 3); err != nil {
66
+			return nil, errors.Annotate(err, "Cannot read the correct message length")
67
+		}
68
+		number := utils.Uint24{}
69
+		copy(number[:], buf.Bytes())
70
+		msgLength = utils.FromUint24(number)
71
+	}
72
+	msgLength *= 4
73
+
74
+	m.logger.Debugw("Packet length",
75
+		"length", msgLength,
76
+		"simple_ack", m.opts.ReadHacks.SimpleAck,
77
+		"quick_ack", m.opts.ReadHacks.QuickAck,
78
+		"counter", m.readCounter,
79
+	)
80
+
81
+	buf.Reset()
82
+	buf.Grow(int(msgLength))
83
+	if _, err := io.CopyN(buf, m.conn, int64(msgLength)); err != nil {
84
+		return nil, errors.Annotate(err, "Cannot read message")
85
+	}
86
+
87
+	return buf.Bytes(), nil
88
+}
89
+
90
+func (m *MTProtoAbridged) Write(p []byte) (int, error) {
91
+	defer func() {
92
+		m.writeCounter++
93
+	}()
94
+
95
+	m.logger.Debugw("Write packet",
96
+		"length", len(p),
97
+		"simple_ack", m.opts.WriteHacks.SimpleAck,
98
+		"quick_ack", m.opts.WriteHacks.QuickAck,
99
+		"counter", m.writeCounter,
100
+	)
101
+
102
+	if len(p)%4 != 0 {
103
+		return 0, errors.Errorf("Incorrect packet length %d", len(p))
104
+	}
105
+
106
+	if m.opts.WriteHacks.SimpleAck {
107
+		return m.conn.Write(utils.ReverseBytes(p))
108
+	}
109
+
110
+	packetLength := len(p) / 4
111
+	switch {
112
+	case packetLength < mtprotoAbridgedSmallPacketLength:
113
+		newData := append([]byte{byte(packetLength)}, p...)
114
+		return m.conn.Write(newData)
115
+
116
+	case packetLength < mtprotoAbridgedLargePacketLength:
117
+		length24 := utils.ToUint24(uint32(packetLength))
118
+
119
+		buf := &bytes.Buffer{}
120
+		buf.Grow(1 + 3 + len(p))
121
+
122
+		buf.WriteByte(byte(mtprotoAbridgedSmallPacketLength))
123
+		buf.Write(length24[:])
124
+		buf.Write(p)
125
+
126
+		return m.conn.Write(buf.Bytes())
127
+	}
128
+
129
+	return 0, errors.Errorf("Packet is too big %d", len(p))
130
+}
131
+
132
+// Logger returns an instance of the logger for this wrapper.
133
+func (m *MTProtoAbridged) Logger() *zap.SugaredLogger {
134
+	return m.logger
135
+}
136
+
137
+// LocalAddr returns local address of the underlying net.Conn.
138
+func (m *MTProtoAbridged) LocalAddr() *net.TCPAddr {
139
+	return m.conn.LocalAddr()
140
+}
141
+
142
+// RemoteAddr returns remote address of the underlying net.Conn.
143
+func (m *MTProtoAbridged) RemoteAddr() *net.TCPAddr {
144
+	return m.conn.RemoteAddr()
145
+}
146
+
147
+// Close closes underlying net.Conn instance.
148
+func (m *MTProtoAbridged) Close() error {
149
+	return m.conn.Close()
150
+}
151
+
152
+// NewMTProtoAbridged creates new wrapper for abridged client connection.
153
+func NewMTProtoAbridged(conn StreamReadWriteCloser, opts *mtproto.ConnectionOpts) PacketReadWriteCloser {
154
+	return &MTProtoAbridged{
155
+		conn:   conn,
156
+		opts:   opts,
157
+		logger: conn.Logger().Named("mtproto-abridged"),
158
+	}
159
+}

+ 96
- 0
wrappers/mtproto_cipher.go Wyświetl plik

@@ -0,0 +1,96 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"crypto/aes"
6
+	"crypto/cipher"
7
+	"crypto/md5" // nolint: gas
8
+	"crypto/sha1"
9
+	"encoding/binary"
10
+	"net"
11
+
12
+	"github.com/9seconds/mtg/mtproto/rpc"
13
+	"github.com/9seconds/mtg/utils"
14
+)
15
+
16
+type cipherPurpose uint8
17
+
18
+const (
19
+	cipherPurposeClient cipherPurpose = iota
20
+	cipherPurposeServer
21
+)
22
+
23
+var emptyIP = [4]byte{0x00, 0x00, 0x00, 0x00}
24
+
25
+// NewMiddleProxyCipher creates new block cipher to proxy<->telegram
26
+// connection.
27
+func NewMiddleProxyCipher(conn StreamReadWriteCloser, req *rpc.NonceRequest, resp *rpc.NonceResponse, secret []byte) StreamReadWriteCloser {
28
+	localAddr := conn.LocalAddr()
29
+	remoteAddr := conn.RemoteAddr()
30
+
31
+	encKey, encIV := deriveKeys(cipherPurposeClient, req, resp, localAddr, remoteAddr, secret)
32
+	decKey, decIV := deriveKeys(cipherPurposeServer, req, resp, localAddr, remoteAddr, secret)
33
+
34
+	enc, _ := makeEncrypterDecrypter(encKey, encIV)
35
+	_, dec := makeEncrypterDecrypter(decKey, decIV)
36
+
37
+	return NewBlockCipher(conn, enc, dec)
38
+}
39
+
40
+func deriveKeys(purpose cipherPurpose, req *rpc.NonceRequest, resp *rpc.NonceResponse, client *net.TCPAddr, remote *net.TCPAddr, secret []byte) ([]byte, []byte) {
41
+	message := bytes.Buffer{}
42
+	message.Write(resp.Nonce[:])
43
+	message.Write(req.Nonce[:])
44
+	message.Write(req.CryptoTS[:])
45
+
46
+	clientIPv4 := emptyIP[:]
47
+	serverIPv4 := emptyIP[:]
48
+	if client.IP.To4() != nil {
49
+		clientIPv4 = utils.ReverseBytes(client.IP.To4())
50
+		serverIPv4 = utils.ReverseBytes(remote.IP.To4())
51
+	}
52
+	message.Write(serverIPv4)
53
+
54
+	var port [2]byte
55
+	binary.LittleEndian.PutUint16(port[:], uint16(client.Port))
56
+	message.Write(port[:])
57
+
58
+	switch purpose {
59
+	case cipherPurposeClient:
60
+		message.WriteString("CLIENT")
61
+	case cipherPurposeServer:
62
+		message.WriteString("SERVER")
63
+	default:
64
+		panic("Unexpected cipher purpose")
65
+	}
66
+
67
+	message.Write(clientIPv4)
68
+	binary.LittleEndian.PutUint16(port[:], uint16(remote.Port))
69
+	message.Write(port[:])
70
+	message.Write(secret)
71
+	message.Write(resp.Nonce[:])
72
+
73
+	if client.IP.To4() == nil {
74
+		message.Write(client.IP.To16())
75
+		message.Write(remote.IP.To16())
76
+	}
77
+	message.Write(req.Nonce[:])
78
+
79
+	data := message.Bytes()
80
+	md5sum := md5.Sum(data[1:]) // nolint: gas
81
+	sha1sum := sha1.Sum(data)
82
+
83
+	key := append(md5sum[:12], sha1sum[:]...)
84
+	iv := md5.Sum(data[2:]) // nolint: gas
85
+
86
+	return key, iv[:]
87
+}
88
+
89
+func makeEncrypterDecrypter(key, iv []byte) (cipher.BlockMode, cipher.BlockMode) {
90
+	block, err := aes.NewCipher(key)
91
+	if err != nil {
92
+		panic(err)
93
+	}
94
+
95
+	return cipher.NewCBCEncrypter(block, iv), cipher.NewCBCDecrypter(block, iv)
96
+}

+ 160
- 0
wrappers/mtproto_frame.go Wyświetl plik

@@ -0,0 +1,160 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"crypto/aes"
6
+	"encoding/binary"
7
+	"hash/crc32"
8
+	"io"
9
+	"io/ioutil"
10
+	"net"
11
+
12
+	"github.com/juju/errors"
13
+	"go.uber.org/zap"
14
+)
15
+
16
+const (
17
+	mtprotoFrameMinMessageLength = 12
18
+	mtprotoFrameMaxMessageLength = 16777216
19
+)
20
+
21
+var mtprotoFramePadding = []byte{0x04, 0x00, 0x00, 0x00}
22
+
23
+// MTProtoFrame is a wrapper which converts written data to the MTProtoFrame.
24
+// The format of the frame:
25
+//
26
+// [ MSGLEN(4) | SEQNO(4) | MSG(...) | CRC32(4) | PADDING(4*x) ]
27
+//
28
+// MSGLEN is the length of the message + len of seqno and msglen.
29
+// SEQNO is the number of frame in the receive/send sequence. If client
30
+//   sends a message with SeqNo 18, it has to receive message with SeqNo 18.
31
+// MSG is the data which has to be written
32
+// CRC32 is the CRC32 checksum of MSGLEN + SEQNO + MSG
33
+// PADDING is custom padding schema to complete frame length to such that
34
+//    len(frame) % 16 == 0
35
+type MTProtoFrame struct {
36
+	conn   StreamReadWriteCloser
37
+	logger *zap.SugaredLogger
38
+
39
+	readSeqNo  int32
40
+	writeSeqNo int32
41
+}
42
+
43
+func (m *MTProtoFrame) Read() ([]byte, error) { // nolint: gocyclo
44
+	buf := &bytes.Buffer{}
45
+	sum := crc32.NewIEEE()
46
+	writer := io.MultiWriter(buf, sum)
47
+
48
+	for {
49
+		buf.Reset()
50
+		sum.Reset()
51
+		if _, err := io.CopyN(writer, m.conn, 4); err != nil {
52
+			return nil, errors.Annotate(err, "Cannot read frame padding")
53
+		}
54
+		if !bytes.Equal(buf.Bytes(), mtprotoFramePadding) {
55
+			break
56
+		}
57
+	}
58
+
59
+	messageLength := binary.LittleEndian.Uint32(buf.Bytes())
60
+	m.logger.Debugw("Read MTProto frame",
61
+		"messageLength", messageLength,
62
+		"sequence_number", m.readSeqNo,
63
+	)
64
+	if messageLength%4 != 0 || messageLength < mtprotoFrameMinMessageLength || messageLength > mtprotoFrameMaxMessageLength {
65
+		return nil, errors.Errorf("Incorrect frame message length %d", messageLength)
66
+	}
67
+
68
+	buf.Reset()
69
+	buf.Grow(int(messageLength) - 4 - 4)
70
+	if _, err := io.CopyN(writer, m.conn, int64(messageLength)-4-4); err != nil {
71
+		return nil, errors.Annotate(err, "Cannot read the message frame")
72
+	}
73
+
74
+	var seqNo int32
75
+	binary.Read(buf, binary.LittleEndian, &seqNo) // nolint: errcheck
76
+	if seqNo != m.readSeqNo {
77
+		return nil, errors.Errorf("Unexpected sequence number %d (wait for %d)", seqNo, m.readSeqNo)
78
+	}
79
+
80
+	data, _ := ioutil.ReadAll(buf)
81
+	buf.Reset()
82
+	// write to buf, not to writer. This is because we are going to fetch
83
+	// crc32 checksum.
84
+	if _, err := io.CopyN(buf, m.conn, 4); err != nil {
85
+		return nil, errors.Annotate(err, "Cannot read checksum")
86
+	}
87
+
88
+	checksum := binary.LittleEndian.Uint32(buf.Bytes())
89
+	if checksum != sum.Sum32() {
90
+		return nil, errors.Errorf("CRC32 checksum mismatch. Wait for %d, got %d", sum.Sum32(), checksum)
91
+	}
92
+
93
+	m.logger.Debugw("Read MTProto frame",
94
+		"messageLength", messageLength,
95
+		"sequence_number", m.readSeqNo,
96
+		"dataLength", len(data),
97
+		"checksum", checksum,
98
+	)
99
+	m.readSeqNo++
100
+
101
+	return data, nil
102
+}
103
+
104
+func (m *MTProtoFrame) Write(p []byte) (int, error) {
105
+	messageLength := 4 + 4 + len(p) + 4
106
+	paddingLength := (aes.BlockSize - messageLength%aes.BlockSize) % aes.BlockSize
107
+
108
+	buf := &bytes.Buffer{}
109
+	buf.Grow(messageLength + paddingLength)
110
+
111
+	binary.Write(buf, binary.LittleEndian, uint32(messageLength)) // nolint: errcheck
112
+	binary.Write(buf, binary.LittleEndian, m.writeSeqNo)          // nolint: errcheck
113
+	buf.Write(p)
114
+
115
+	checksum := crc32.ChecksumIEEE(buf.Bytes())
116
+	binary.Write(buf, binary.LittleEndian, checksum) // nolint: errcheck
117
+	buf.Write(bytes.Repeat(mtprotoFramePadding, paddingLength/4))
118
+
119
+	m.logger.Debugw("Write MTProto frame",
120
+		"length", len(p),
121
+		"sequence_number", m.writeSeqNo,
122
+		"crc32", checksum,
123
+		"frame_length", buf.Len(),
124
+	)
125
+	m.writeSeqNo++
126
+
127
+	_, err := m.conn.Write(buf.Bytes())
128
+
129
+	return len(p), err
130
+}
131
+
132
+// Logger returns an instance of the logger for this wrapper.
133
+func (m *MTProtoFrame) Logger() *zap.SugaredLogger {
134
+	return m.logger
135
+}
136
+
137
+// LocalAddr returns local address of the underlying net.Conn.
138
+func (m *MTProtoFrame) LocalAddr() *net.TCPAddr {
139
+	return m.conn.LocalAddr()
140
+}
141
+
142
+// RemoteAddr returns remote address of the underlying net.Conn.
143
+func (m *MTProtoFrame) RemoteAddr() *net.TCPAddr {
144
+	return m.conn.RemoteAddr()
145
+}
146
+
147
+// Close closes underlying net.Conn instance.
148
+func (m *MTProtoFrame) Close() error {
149
+	return m.conn.Close()
150
+}
151
+
152
+// NewMTProtoFrame creates new PacketWrapper for underlying connection.
153
+func NewMTProtoFrame(conn StreamReadWriteCloser, seqNo int32) PacketReadWriteCloser {
154
+	return &MTProtoFrame{
155
+		conn:       conn,
156
+		logger:     conn.Logger().Named("mtproto-frame"),
157
+		readSeqNo:  seqNo,
158
+		writeSeqNo: seqNo,
159
+	}
160
+}

+ 121
- 0
wrappers/mtproto_intermediate.go Wyświetl plik

@@ -0,0 +1,121 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"encoding/binary"
6
+	"io"
7
+	"net"
8
+
9
+	"github.com/juju/errors"
10
+	"go.uber.org/zap"
11
+
12
+	"github.com/9seconds/mtg/mtproto"
13
+)
14
+
15
+const mtprotoIntermediateQuickAckLength = 0x80000000
16
+
17
+// MTProtoIntermediate presents intermediate connection between client
18
+// and Telegram.
19
+type MTProtoIntermediate struct {
20
+	conn   StreamReadWriteCloser
21
+	opts   *mtproto.ConnectionOpts
22
+	logger *zap.SugaredLogger
23
+
24
+	readCounter  uint32
25
+	writeCounter uint32
26
+}
27
+
28
+func (m *MTProtoIntermediate) Read() ([]byte, error) {
29
+	defer func() {
30
+		m.readCounter++
31
+	}()
32
+
33
+	m.logger.Debugw("Read packet",
34
+		"simple_ack", m.opts.ReadHacks.SimpleAck,
35
+		"quick_ack", m.opts.ReadHacks.QuickAck,
36
+		"counter", m.readCounter,
37
+	)
38
+
39
+	buf := &bytes.Buffer{}
40
+	buf.Grow(4)
41
+
42
+	if _, err := io.CopyN(buf, m.conn, 4); err != nil {
43
+		return nil, errors.Annotate(err, "Cannot read message length")
44
+	}
45
+	length := binary.LittleEndian.Uint32(buf.Bytes())
46
+
47
+	m.logger.Debugw("Packet message length",
48
+		"simple_ack", m.opts.ReadHacks.SimpleAck,
49
+		"quick_ack", m.opts.ReadHacks.QuickAck,
50
+		"counter", m.readCounter,
51
+		"length", length,
52
+	)
53
+
54
+	if length > mtprotoIntermediateQuickAckLength {
55
+		m.opts.ReadHacks.QuickAck = true
56
+		length -= mtprotoIntermediateQuickAckLength
57
+	}
58
+
59
+	buf.Reset()
60
+	buf.Grow(int(length))
61
+	if _, err := io.CopyN(buf, m.conn, int64(length)); err != nil {
62
+		return nil, errors.Annotate(err, "Cannot read the message")
63
+	}
64
+
65
+	if length%4 != 0 {
66
+		length -= length % 4
67
+	}
68
+
69
+	return buf.Bytes()[:length], nil
70
+}
71
+
72
+func (m *MTProtoIntermediate) Write(p []byte) (int, error) {
73
+	defer func() {
74
+		m.writeCounter++
75
+	}()
76
+
77
+	m.logger.Debugw("Write packet",
78
+		"simple_ack", m.opts.WriteHacks.SimpleAck,
79
+		"quick_ack", m.opts.WriteHacks.QuickAck,
80
+		"counter", m.writeCounter,
81
+	)
82
+
83
+	if m.opts.ReadHacks.SimpleAck {
84
+		return m.conn.Write(p)
85
+	}
86
+
87
+	var length [4]byte
88
+	binary.LittleEndian.PutUint32(length[:], uint32(len(p)))
89
+
90
+	return m.conn.Write(append(length[:], p...))
91
+}
92
+
93
+// Logger returns an instance of the logger for this wrapper.
94
+func (m *MTProtoIntermediate) Logger() *zap.SugaredLogger {
95
+	return m.logger
96
+}
97
+
98
+// LocalAddr returns local address of the underlying net.Conn.
99
+func (m *MTProtoIntermediate) LocalAddr() *net.TCPAddr {
100
+	return m.conn.LocalAddr()
101
+}
102
+
103
+// RemoteAddr returns remote address of the underlying net.Conn.
104
+func (m *MTProtoIntermediate) RemoteAddr() *net.TCPAddr {
105
+	return m.conn.RemoteAddr()
106
+}
107
+
108
+// Close closes underlying net.Conn instance.
109
+func (m *MTProtoIntermediate) Close() error {
110
+	return m.conn.Close()
111
+}
112
+
113
+// NewMTProtoIntermediate creates new PacketWrapper for intermediate
114
+// client connection.
115
+func NewMTProtoIntermediate(conn StreamReadWriteCloser, opts *mtproto.ConnectionOpts) PacketReadWriteCloser {
116
+	return &MTProtoIntermediate{
117
+		conn:   conn,
118
+		logger: conn.Logger().Named("mtproto-intermediate"),
119
+		opts:   opts,
120
+	}
121
+}

+ 164
- 0
wrappers/mtproto_proxy.go Wyświetl plik

@@ -0,0 +1,164 @@
1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"fmt"
6
+	"net"
7
+
8
+	"github.com/juju/errors"
9
+	"go.uber.org/zap"
10
+
11
+	"github.com/9seconds/mtg/mtproto"
12
+	"github.com/9seconds/mtg/mtproto/rpc"
13
+)
14
+
15
+// MTProtoProxy is a wrapper which creates/reads RPC responses from Telegram.
16
+type MTProtoProxy struct {
17
+	conn   PacketReadWriteCloser
18
+	req    *rpc.ProxyRequest
19
+	logger *zap.SugaredLogger
20
+
21
+	readCounter  uint32
22
+	writeCounter uint32
23
+}
24
+
25
+func (m *MTProtoProxy) Read() ([]byte, error) {
26
+	defer func() {
27
+		m.readCounter++
28
+	}()
29
+
30
+	m.logger.Debugw("Read packet",
31
+		"counter", m.readCounter,
32
+		"simple_ack", m.req.Options.WriteHacks.SimpleAck,
33
+		"quick_ack", m.req.Options.WriteHacks.QuickAck,
34
+	)
35
+
36
+	packet, err := m.conn.Read()
37
+	if err != nil {
38
+		return nil, errors.Annotate(err, "Cannot read packet")
39
+	}
40
+
41
+	m.logger.Debugw("Read packet length",
42
+		"counter", m.readCounter,
43
+		"simple_ack", m.req.Options.WriteHacks.SimpleAck,
44
+		"quick_ack", m.req.Options.WriteHacks.QuickAck,
45
+		"length", len(packet),
46
+	)
47
+
48
+	if len(packet) < 4 {
49
+		return nil, errors.Annotate(err, "Incorrect packet length")
50
+	}
51
+
52
+	tag, packet := packet[:4], packet[4:]
53
+	switch {
54
+	case bytes.Equal(tag, rpc.TagProxyAns):
55
+		return m.readProxyAns(packet)
56
+	case bytes.Equal(tag, rpc.TagSimpleAck):
57
+		return m.readSimpleAck(packet)
58
+	case bytes.Equal(tag, rpc.TagCloseExt):
59
+		return m.readCloseExt(packet)
60
+	}
61
+
62
+	return nil, errors.Errorf("Unknown RPC answer %v", tag)
63
+}
64
+
65
+func (m *MTProtoProxy) readProxyAns(data []byte) ([]byte, error) {
66
+	if len(data) < 12 {
67
+		return nil, errors.Errorf("Incorrect data of proxy answer: %d", len(data))
68
+	}
69
+	data = data[12:]
70
+
71
+	m.logger.Debugw("Read RPC_PROXY_ANS",
72
+		"counter", m.readCounter,
73
+		"length", len(data),
74
+	)
75
+
76
+	return data, nil
77
+}
78
+
79
+func (m *MTProtoProxy) readSimpleAck(data []byte) ([]byte, error) {
80
+	if len(data) != 12 {
81
+		return nil, errors.Errorf("Incorrect data of simple ack: %d", len(data))
82
+	}
83
+	data = data[8:12]
84
+	m.req.Options.WriteHacks.SimpleAck = true
85
+
86
+	m.logger.Debugw("Read RPC_SIMPLE_ACK",
87
+		"counter", m.readCounter,
88
+		"length", len(data),
89
+	)
90
+
91
+	return data, nil
92
+}
93
+
94
+func (m *MTProtoProxy) readCloseExt(data []byte) ([]byte, error) {
95
+	m.logger.Debugw("Read RPC_CLOSE_EXT", "counter", m.readCounter)
96
+
97
+	return nil, errors.New("Connection has been closed remotely by RPC call")
98
+}
99
+
100
+func (m *MTProtoProxy) Write(p []byte) (int, error) {
101
+	defer func() {
102
+		m.writeCounter++
103
+	}()
104
+
105
+	m.logger.Debugw("Write packet",
106
+		"length", len(p),
107
+		"counter", m.writeCounter,
108
+		"simple_ack", m.req.Options.ReadHacks.SimpleAck,
109
+		"quick_ack", m.req.Options.ReadHacks.QuickAck,
110
+	)
111
+
112
+	header, flags := m.req.MakeHeader(p)
113
+	if ce := m.logger.Desugar().Check(zap.DebugLevel, "RPC_PROXY_REQ header"); ce != nil {
114
+		ce.Write(
115
+			zap.Int("length", len(p)),
116
+			zap.Uint32("counter", m.writeCounter),
117
+			zap.Bool("simple_ack", m.req.Options.ReadHacks.QuickAck),
118
+			zap.Bool("quick_ack", m.req.Options.ReadHacks.SimpleAck),
119
+			zap.String("header", fmt.Sprintf("%v", header.Bytes())),
120
+			zap.Stringer("flags", flags),
121
+		)
122
+	}
123
+	header.Write(p)
124
+
125
+	if _, err := m.conn.Write(header.Bytes()); err != nil {
126
+		return 0, err
127
+	}
128
+
129
+	return len(p), nil
130
+}
131
+
132
+// Logger returns an instance of the logger for this wrapper.
133
+func (m *MTProtoProxy) Logger() *zap.SugaredLogger {
134
+	return m.logger
135
+}
136
+
137
+// LocalAddr returns local address of the underlying net.Conn.
138
+func (m *MTProtoProxy) LocalAddr() *net.TCPAddr {
139
+	return m.conn.LocalAddr()
140
+}
141
+
142
+// RemoteAddr returns remote address of the underlying net.Conn.
143
+func (m *MTProtoProxy) RemoteAddr() *net.TCPAddr {
144
+	return m.conn.RemoteAddr()
145
+}
146
+
147
+// Close closes underlying net.Conn instance.
148
+func (m *MTProtoProxy) Close() error {
149
+	return m.conn.Close()
150
+}
151
+
152
+// NewMTProtoProxy creates new RPC wrapper.
153
+func NewMTProtoProxy(conn PacketReadWriteCloser, connOpts *mtproto.ConnectionOpts, adTag []byte) (PacketReadWriteCloser, error) {
154
+	req, err := rpc.NewProxyRequest(connOpts.ClientAddr, conn.LocalAddr(), connOpts, adTag)
155
+	if err != nil {
156
+		return nil, errors.Annotate(err, "Cannot create new RPC proxy request")
157
+	}
158
+
159
+	return &MTProtoProxy{
160
+		conn:   conn,
161
+		logger: conn.Logger().Named("mtproto-proxy"),
162
+		req:    req,
163
+	}, nil
164
+}

+ 65
- 0
wrappers/streamcipher.go Wyświetl plik

@@ -0,0 +1,65 @@
1
+package wrappers
2
+
3
+import (
4
+	"crypto/cipher"
5
+	"net"
6
+
7
+	"github.com/juju/errors"
8
+	"go.uber.org/zap"
9
+)
10
+
11
+// StreamCipher is a wrapper which encrypts/decrypts stream with AES-CTR
12
+// (as a part of obfuscated2 protocol).
13
+type StreamCipher struct {
14
+	encryptor cipher.Stream
15
+	decryptor cipher.Stream
16
+	conn      StreamReadWriteCloser
17
+	logger    *zap.SugaredLogger
18
+}
19
+
20
+func (s *StreamCipher) Read(p []byte) (int, error) {
21
+	n, err := s.conn.Read(p)
22
+	if err != nil {
23
+		return 0, errors.Annotate(err, "Cannot read stream ciphered data")
24
+	}
25
+	s.decryptor.XORKeyStream(p, p[:n])
26
+
27
+	return n, nil
28
+}
29
+
30
+func (s *StreamCipher) Write(p []byte) (int, error) {
31
+	encrypted := make([]byte, len(p))
32
+	s.encryptor.XORKeyStream(encrypted, p)
33
+
34
+	return s.conn.Write(encrypted)
35
+}
36
+
37
+// Logger returns an instance of the logger for this wrapper.
38
+func (s *StreamCipher) Logger() *zap.SugaredLogger {
39
+	return s.logger
40
+}
41
+
42
+// LocalAddr returns local address of the underlying net.Conn.
43
+func (s *StreamCipher) LocalAddr() *net.TCPAddr {
44
+	return s.conn.LocalAddr()
45
+}
46
+
47
+// RemoteAddr returns remote address of the underlying net.Conn.
48
+func (s *StreamCipher) RemoteAddr() *net.TCPAddr {
49
+	return s.conn.RemoteAddr()
50
+}
51
+
52
+// Close closes underlying net.Conn instance.
53
+func (s *StreamCipher) Close() error {
54
+	return s.conn.Close()
55
+}
56
+
57
+// NewStreamCipher creates new stream cipher wrapper.
58
+func NewStreamCipher(conn StreamReadWriteCloser, encryptor, decryptor cipher.Stream) StreamReadWriteCloser {
59
+	return &StreamCipher{
60
+		conn:      conn,
61
+		logger:    conn.Logger().Named("stream-cipher"),
62
+		encryptor: encryptor,
63
+		decryptor: decryptor,
64
+	}
65
+}

+ 0
- 54
wrappers/streamcipherrwc.go Wyświetl plik

@@ -1,54 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"crypto/cipher"
5
-	"io"
6
-)
7
-
8
-// StreamCipherReadWriteCloser is a ReadWriteCloser which ciphers
9
-// incoming and outgoing data with givem cipher.Stream instances.
10
-type StreamCipherReadWriteCloser struct {
11
-	encryptor cipher.Stream
12
-	decryptor cipher.Stream
13
-	conn      io.ReadWriteCloser
14
-}
15
-
16
-// Read reads from connection
17
-func (c *StreamCipherReadWriteCloser) Read(p []byte) (n int, err error) {
18
-	n, err = c.conn.Read(p)
19
-	c.decryptor.XORKeyStream(p, p[:n])
20
-	return
21
-}
22
-
23
-// Write writes into connection.
24
-func (c *StreamCipherReadWriteCloser) Write(p []byte) (int, error) {
25
-	// This is to decrease an amount of allocations. Unfortunately, escape
26
-	// analysis in (at least Golang 1.10) is absolutely not perfect. For
27
-	// example, it understands that we want to have a slice locally, right?
28
-	// But since slice is effectively 2 ints + uintptr to [number]byte, the
29
-	// most heavyweight part is placed in heap.
30
-	buf := getBuffer()
31
-	defer putBuffer(buf)
32
-	buf.Grow(len(p))
33
-	buf.Write(p)
34
-
35
-	encrypted := buf.Bytes()
36
-	c.encryptor.XORKeyStream(encrypted, p)
37
-
38
-	return c.conn.Write(encrypted)
39
-}
40
-
41
-// Close closes underlying connection.
42
-func (c *StreamCipherReadWriteCloser) Close() error {
43
-	return c.conn.Close()
44
-}
45
-
46
-// NewStreamCipherRWC returns wrapper which transparently
47
-// encrypts/decrypts traffic with obfuscated2 protocol.
48
-func NewStreamCipherRWC(conn io.ReadWriteCloser, encryptor, decryptor cipher.Stream) io.ReadWriteCloser {
49
-	return &StreamCipherReadWriteCloser{
50
-		conn:      conn,
51
-		encryptor: encryptor,
52
-		decryptor: decryptor,
53
-	}
54
-}

+ 0
- 39
wrappers/trafficrwc.go Wyświetl plik

@@ -1,39 +0,0 @@
1
-package wrappers
2
-
3
-import "io"
4
-
5
-// TrafficReadWriteCloser counts an amount of ingress/egress traffic by
6
-// calling given callbacks.
7
-type TrafficReadWriteCloser struct {
8
-	conn          io.ReadWriteCloser
9
-	readCallback  func(int)
10
-	writeCallback func(int)
11
-}
12
-
13
-// Read reads from connection
14
-func (t *TrafficReadWriteCloser) Read(p []byte) (n int, err error) {
15
-	n, err = t.conn.Read(p)
16
-	t.readCallback(n)
17
-	return
18
-}
19
-
20
-// Write writes into connection.
21
-func (t *TrafficReadWriteCloser) Write(p []byte) (n int, err error) {
22
-	n, err = t.conn.Write(p)
23
-	t.writeCallback(n)
24
-	return
25
-}
26
-
27
-// Close closes underlying connection.
28
-func (t *TrafficReadWriteCloser) Close() error {
29
-	return t.conn.Close()
30
-}
31
-
32
-// NewTrafficRWC wraps ReadWriteCloser to have read/write callbacks.
33
-func NewTrafficRWC(conn io.ReadWriteCloser, readCallback, writeCallback func(int)) io.ReadWriteCloser {
34
-	return &TrafficReadWriteCloser{
35
-		conn:          conn,
36
-		readCallback:  readCallback,
37
-		writeCallback: writeCallback,
38
-	}
39
-}

+ 111
- 0
wrappers/wrap.go Wyświetl plik

@@ -0,0 +1,111 @@
1
+package wrappers
2
+
3
+import (
4
+	"io"
5
+	"net"
6
+
7
+	"go.uber.org/zap"
8
+)
9
+
10
+// Wrap is a base interface for all wrappers in this package.
11
+type Wrap interface {
12
+	Logger() *zap.SugaredLogger
13
+	LocalAddr() *net.TCPAddr
14
+	RemoteAddr() *net.TCPAddr
15
+}
16
+
17
+// Writer is a base interface for writers of this package.
18
+type Writer interface {
19
+	io.Writer
20
+	Wrap
21
+}
22
+
23
+// Closer is a base interface for wrappers of this package which can
24
+// close connections.
25
+type Closer interface {
26
+	io.Closer
27
+	Wrap
28
+}
29
+
30
+// WriteCloser is a base interface for wrappers of this package which
31
+// can write to and close connections.
32
+type WriteCloser interface {
33
+	io.Closer
34
+	Writer
35
+}
36
+
37
+// StreamReader is a base interface for wrappers which can read from the
38
+// stream.
39
+type StreamReader interface {
40
+	io.Reader
41
+	Wrap
42
+}
43
+
44
+// StreamReadCloser is a base interface for wrappers which can read from
45
+// and close the connections.
46
+type StreamReadCloser interface {
47
+	io.Closer
48
+	StreamReader
49
+}
50
+
51
+// StreamReadWriter is a base interface for wrappers which can read from
52
+// and write to the connections.
53
+type StreamReadWriter interface {
54
+	io.Writer
55
+	StreamReader
56
+}
57
+
58
+// StreamWriteCloser is a base interface for wrappers which can write to
59
+// and close the connections.
60
+type StreamWriteCloser interface {
61
+	io.WriteCloser
62
+	Wrap
63
+}
64
+
65
+// StreamReadWriteCloser is a base interface for stream processors.
66
+type StreamReadWriteCloser interface {
67
+	io.Closer
68
+	StreamReadWriter
69
+}
70
+
71
+// PacketReader is a base interface for wrappers which reads 'packets'.
72
+// packets are atoms so you either get a packet or you get an error You
73
+// cannot resume reading from packet.
74
+type PacketReader interface {
75
+	Read() ([]byte, error)
76
+	Wrap
77
+}
78
+
79
+// PacketWriter is a base interface for wrappers which can write packets.
80
+type PacketWriter interface {
81
+	io.Writer
82
+	Wrap
83
+}
84
+
85
+// PacketReadWriter is a base interface for wrappers which can read from
86
+// and write packets.
87
+type PacketReadWriter interface {
88
+	io.Writer
89
+	PacketReader
90
+}
91
+
92
+// PacketReadCloser is a base interface for wrappers which can read
93
+// packets and close the connection.
94
+type PacketReadCloser interface {
95
+	io.Closer
96
+	PacketReader
97
+}
98
+
99
+// PacketWriteCloser is a base interface for wrappers which can write
100
+// packets and close the connection.
101
+type PacketWriteCloser interface {
102
+	io.Writer
103
+	io.Closer
104
+	Wrap
105
+}
106
+
107
+// PacketReadWriteCloser is a base interface for packet processors.
108
+type PacketReadWriteCloser interface {
109
+	io.Closer
110
+	PacketReadWriter
111
+}

Ładowanie…
Anuluj
Zapisz