Преглед изворни кода

Refactored all the things!

tags/0.9
9seconds пре 7 година
родитељ
комит
f82ff1f6fe

+ 0
- 12
client/client.go Прегледај датотеку

@@ -1,12 +0,0 @@
1
-package client
2
-
3
-import (
4
-	"net"
5
-
6
-	"github.com/9seconds/mtg/config"
7
-	"github.com/9seconds/mtg/mtproto"
8
-	"github.com/9seconds/mtg/wrappers"
9
-)
10
-
11
-// Init has to initialize client connection based on given config.
12
-type Init func(net.Conn, string, *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error)

+ 12
- 10
client/direct.go Прегледај датотеку

@@ -1,6 +1,7 @@
1 1
 package client
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"net"
5 6
 	"time"
6 7
 
@@ -14,28 +15,29 @@ import (
14 15
 
15 16
 const handshakeTimeout = 10 * time.Second
16 17
 
17
-// DirectInit initializes client to access Telegram bypassing middleproxies.
18
-func DirectInit(conn net.Conn, socketID string, conf *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error) {
19
-	if err := config.SetSocketOptions(conn); err != nil {
18
+func DirectInit(ctx context.Context, cancel context.CancelFunc, socket net.Conn, connID string,
19
+	conf *config.Config) (wrappers.WrapStreamReadWriteCloser, *mtproto.ConnectionOpts, error) {
20
+	if err := config.SetSocketOptions(socket); err != nil {
20 21
 		return nil, nil, errors.Annotate(err, "Cannot set socket options")
21 22
 	}
22 23
 
23
-	conn.SetReadDeadline(time.Now().Add(handshakeTimeout)) // nolint: errcheck
24
-	frame, err := obfuscated2.ExtractFrame(conn)
25
-	conn.SetReadDeadline(time.Time{}) // nolint: errcheck
24
+	socket.SetReadDeadline(time.Now().Add(handshakeTimeout))
25
+	frame, err := obfuscated2.ExtractFrame(socket)
26 26
 	if err != nil {
27 27
 		return nil, nil, errors.Annotate(err, "Cannot extract frame")
28 28
 	}
29
+	socket.SetReadDeadline(time.Time{})
30
+	conn := wrappers.NewConn(socket, connID, wrappers.ConnPurposeClient, conf.PublicIPv4, conf.PublicIPv6)
29 31
 
30 32
 	obfs2, connOpts, err := obfuscated2.ParseObfuscated2ClientFrame(conf.Secret, frame)
31 33
 	if err != nil {
32 34
 		return nil, nil, errors.Annotate(err, "Cannot parse obfuscated frame")
33 35
 	}
34 36
 	connOpts.ConnectionProto = mtproto.ConnectionProtocolAny
35
-	connOpts.ClientAddr = conn.RemoteAddr().(*net.TCPAddr)
37
+	connOpts.ClientAddr = conn.RemoteAddr()
36 38
 
37
-	socket := wrappers.NewTimeoutRWC(conn, socketID, conf.PublicIPv4, conf.PublicIPv6)
38
-	socket = wrappers.NewStreamCipherRWC(socket, obfs2.Encryptor, obfs2.Decryptor)
39
+	conn = wrappers.NewCtx(ctx, cancel, conn)
40
+	conn = wrappers.NewStreamCipher(conn, obfs2.Encryptor, obfs2.Decryptor)
39 41
 
40
-	return socket, connOpts, nil
42
+	return conn, connOpts, nil
41 43
 }

+ 9
- 9
client/middle.go Прегледај датотеку

@@ -1,30 +1,30 @@
1 1
 package client
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"net"
5 6
 
6 7
 	"github.com/9seconds/mtg/config"
7 8
 	"github.com/9seconds/mtg/mtproto"
8
-	mtwrappers "github.com/9seconds/mtg/mtproto/wrappers"
9 9
 	"github.com/9seconds/mtg/wrappers"
10 10
 )
11 11
 
12
-func MiddleInit(conn net.Conn, socketID string, conf *config.Config) (wrappers.ReadWriteCloserWithAddr, *mtproto.ConnectionOpts, error) {
13
-	newConn, opts, err := DirectInit(conn, socketID, conf)
12
+func MiddleInit(ctx context.Context, cancel context.CancelFunc, socket net.Conn, connID string,
13
+	conf *config.Config) (wrappers.WrapPacketReadWriteCloser, *mtproto.ConnectionOpts, error) {
14
+	conn, opts, err := DirectInit(ctx, cancel, socket, connID, conf)
14 15
 	if err != nil {
15 16
 		return nil, nil, err
16 17
 	}
17 18
 
18
-	if opts.ConnectionType == mtproto.ConnectionTypeAbridged {
19
-		newConn = mtwrappers.NewAbridgedRWC(newConn, opts)
20
-	} else {
21
-		newConn = mtwrappers.NewIntermediateRWC(newConn, opts)
19
+	newConn := wrappers.NewMTProtoAbridged(conn, opts)
20
+	if opts.ConnectionType != mtproto.ConnectionTypeAbridged {
21
+		newConn = wrappers.NewMTProtoIntermediate(conn, opts)
22 22
 	}
23 23
 
24 24
 	opts.ConnectionProto = mtproto.ConnectionProtocolIPv4
25
-	if conn.LocalAddr().(*net.TCPAddr).IP.To4() == nil {
25
+	if socket.LocalAddr().(*net.TCPAddr).IP.To4() == nil {
26 26
 		opts.ConnectionProto = mtproto.ConnectionProtocolIPv6
27 27
 	}
28 28
 
29
-	return newConn, opts, nil
29
+	return newConn, opts, err
30 30
 }

+ 0
- 5
config/config.go Прегледај датотеку

@@ -5,7 +5,6 @@ import (
5 5
 	"fmt"
6 6
 	"net"
7 7
 	"strconv"
8
-	"time"
9 8
 
10 9
 	"github.com/juju/errors"
11 10
 )
@@ -14,10 +13,6 @@ import (
14 13
 const (
15 14
 	BufferWriteSize = 32 * 1024
16 15
 	BufferReadSize  = 32 * 1024
17
-	BufferSizeCopy  = 32 * 1024
18
-
19
-	TimeoutRead  = time.Minute
20
-	TimeoutWrite = time.Minute
21 16
 )
22 17
 
23 18
 // Config represents common configuration of mtg.

+ 11
- 6
main.go Прегледај датотеку

@@ -111,16 +111,21 @@ func main() {
111 111
 		zapcore.NewJSONEncoder(encoderCfg),
112 112
 		zapcore.Lock(os.Stderr),
113 113
 		atom,
114
-	)).Sugar()
114
+	))
115
+	zap.ReplaceGlobals(logger)
116
+	defer logger.Sync()
115 117
 
116
-	stat := proxy.NewStats(conf)
117
-	go stat.Serve()
118
+	var server *proxy.Proxy
119
+	if len(conf.AdTag) == 0 {
120
+		server = proxy.NewProxyDirect(conf)
121
+	} else {
122
+		server = proxy.NewProxyMiddle(conf)
123
+	}
118 124
 
119
-	srv := proxy.NewServer(conf, logger, stat)
120 125
 	printURLs(conf.GetURLs())
121 126
 
122
-	if err := srv.Serve(); err != nil {
123
-		logger.Fatal(err.Error())
127
+	if err := server.Serve(); err != nil {
128
+		zap.S().Fatalw("Server stopped", "error", err)
124 129
 	}
125 130
 }
126 131
 

+ 2
- 8
obfuscated2/frame.go Прегледај датотеку

@@ -9,6 +9,7 @@ import (
9 9
 	"github.com/juju/errors"
10 10
 
11 11
 	"github.com/9seconds/mtg/mtproto"
12
+	"github.com/9seconds/mtg/utils"
12 13
 )
13 14
 
14 15
 // [frameOffsetFirst:frameOffsetKey:frameOffsetIV:frameOffsetMagic:frameOffsetDC:frameOffsetEnd]
@@ -67,14 +68,7 @@ func (f Frame) ConnectionType() (mtproto.ConnectionType, error) {
67 68
 // Invert inverts frame for extracting encryption keys. Pkease check that link:
68 69
 // https://blog.susanka.eu/how-telegram-obfuscates-its-mtproto-traffic/
69 70
 func (f Frame) Invert() Frame {
70
-	reversed := make(Frame, FrameLen)
71
-	copy(reversed, f)
72
-
73
-	for i := 0; i < frameLenKey+frameLenIV; i++ {
74
-		reversed[frameOffsetFirst+i] = f[frameOffsetIV-1-i]
75
-	}
76
-
77
-	return reversed
71
+	return Frame(utils.ReverseBytes([]byte(f)))
78 72
 }
79 73
 
80 74
 // ExtractFrame extracts exact obfuscated2 handshake frame from given reader.

+ 66
- 0
proxy/direct.go Прегледај датотеку

@@ -0,0 +1,66 @@
1
+package proxy
2
+
3
+import (
4
+	"context"
5
+	"io"
6
+	"net"
7
+	"sync"
8
+
9
+	"github.com/juju/errors"
10
+
11
+	"github.com/9seconds/mtg/client"
12
+	"github.com/9seconds/mtg/config"
13
+	"github.com/9seconds/mtg/mtproto"
14
+	"github.com/9seconds/mtg/telegram"
15
+	"github.com/9seconds/mtg/wrappers"
16
+)
17
+
18
+func NewProxyDirect(conf *config.Config) *Proxy {
19
+	tg := telegram.NewDirectTelegram(conf)
20
+
21
+	return &Proxy{
22
+		conf: conf,
23
+		acceptCallback: func(ctx context.Context, cancel context.CancelFunc, clientSocket net.Conn,
24
+			connID string, wait *sync.WaitGroup, conf *config.Config) error {
25
+			client, opts, err := client.DirectInit(ctx, cancel, clientSocket, connID, conf)
26
+			if err != nil {
27
+				return errors.Annotate(err, "Cannot initialize client connection")
28
+			}
29
+			defer client.Close()
30
+
31
+			server, err := directTelegramStream(ctx, cancel, opts, connID, tg)
32
+			if err != nil {
33
+				return errors.Annotate(err, "Cannot initialize telegram connection")
34
+			}
35
+			defer server.Close()
36
+
37
+			wait.Add(2)
38
+
39
+			go directPipe(client, server, wait)
40
+			go directPipe(server, client, wait)
41
+
42
+			return nil
43
+		},
44
+	}
45
+}
46
+
47
+func directTelegramStream(ctx context.Context, cancel context.CancelFunc, opts *mtproto.ConnectionOpts,
48
+	connID string, tg *telegram.DirectTelegram) (wrappers.WrapStreamReadWriteCloser, error) {
49
+	streamConn, err := tg.Dial(connID, opts)
50
+	if err != nil {
51
+		return nil, errors.Annotate(err, "Cannot dial to Telegram")
52
+	}
53
+	streamConn = wrappers.NewCtx(ctx, cancel, streamConn)
54
+
55
+	packetConn, err := tg.Init(opts, streamConn)
56
+	if err != nil {
57
+		return nil, errors.Annotate(err, "Cannot handshake telegram")
58
+	}
59
+
60
+	return packetConn, nil
61
+}
62
+
63
+func directPipe(src io.Reader, dst io.Writer, wait *sync.WaitGroup) {
64
+	defer wait.Done()
65
+	io.Copy(dst, src)
66
+}

+ 77
- 0
proxy/middle.go Прегледај датотеку

@@ -0,0 +1,77 @@
1
+package proxy
2
+
3
+import (
4
+	"context"
5
+	"net"
6
+	"sync"
7
+
8
+	"github.com/juju/errors"
9
+
10
+	"github.com/9seconds/mtg/client"
11
+	"github.com/9seconds/mtg/config"
12
+	"github.com/9seconds/mtg/mtproto"
13
+	"github.com/9seconds/mtg/telegram"
14
+	"github.com/9seconds/mtg/wrappers"
15
+)
16
+
17
+func NewProxyMiddle(conf *config.Config) *Proxy {
18
+	tg := telegram.NewMiddleTelegram(conf)
19
+
20
+	return &Proxy{
21
+		conf: conf,
22
+		acceptCallback: func(ctx context.Context, cancel context.CancelFunc, clientSocket net.Conn,
23
+			connID string, wait *sync.WaitGroup, conf *config.Config) error {
24
+			client, opts, err := client.MiddleInit(ctx, cancel, clientSocket, connID, conf)
25
+			if err != nil {
26
+				return errors.Annotate(err, "Cannot initialize client connection")
27
+			}
28
+			defer client.Close()
29
+
30
+			server, err := middleTelegramStream(ctx, cancel, opts, connID, tg)
31
+			if err != nil {
32
+				return errors.Annotate(err, "Cannot initialize telegram connection")
33
+			}
34
+			defer server.Close()
35
+
36
+			wait.Add(2)
37
+
38
+			go middlePipe(client, server, wait, &opts.ReadHacks)
39
+			go middlePipe(server, client, wait, &opts.WriteHacks)
40
+
41
+			return nil
42
+		},
43
+	}
44
+}
45
+
46
+func middleTelegramStream(ctx context.Context, cancel context.CancelFunc, opts *mtproto.ConnectionOpts,
47
+	connID string, tg *telegram.MiddleTelegram) (wrappers.WrapPacketReadWriteCloser, error) {
48
+	streamConn, err := tg.Dial(connID, opts)
49
+	if err != nil {
50
+		return nil, errors.Annotate(err, "Cannot dial to Telegram")
51
+	}
52
+	streamConn = wrappers.NewCtx(ctx, cancel, streamConn)
53
+
54
+	packetConn, err := tg.Init(opts, streamConn)
55
+	if err != nil {
56
+		return nil, errors.Annotate(err, "Cannot handshake telegram")
57
+	}
58
+
59
+	return packetConn, nil
60
+}
61
+
62
+func middlePipe(src wrappers.WrapPacketReader, dst wrappers.WrapPacketWriter, wait *sync.WaitGroup, hacks *mtproto.Hacks) {
63
+	defer wait.Done()
64
+
65
+	for {
66
+		hacks.SimpleAck = false
67
+		hacks.QuickAck = false
68
+
69
+		packet, err := src.Read()
70
+		if err != nil {
71
+			return
72
+		}
73
+		if _, err = dst.Write(packet); err != nil {
74
+			return
75
+		}
76
+	}
77
+}

+ 63
- 0
proxy/proxy.go Прегледај датотеку

@@ -0,0 +1,63 @@
1
+package proxy
2
+
3
+import (
4
+	"context"
5
+	"net"
6
+	"sync"
7
+
8
+	"github.com/juju/errors"
9
+	uuid "github.com/satori/go.uuid"
10
+	"go.uber.org/zap"
11
+
12
+	"github.com/9seconds/mtg/config"
13
+)
14
+
15
+type proxyAcceptCallback func(context.Context, context.CancelFunc, net.Conn, string, *sync.WaitGroup, *config.Config) error
16
+
17
+type Proxy struct {
18
+	conf           *config.Config
19
+	acceptCallback proxyAcceptCallback
20
+}
21
+
22
+func (p *Proxy) Serve() error {
23
+	lsock, err := net.Listen("tcp", p.conf.BindAddr())
24
+	if err != nil {
25
+		return errors.Annotate(err, "Cannot create listen socket")
26
+	}
27
+
28
+	for {
29
+		if conn, err := lsock.Accept(); err != nil {
30
+			zap.S().Errorw("Cannot allocate incoming connection", "error", err)
31
+		} else {
32
+			go p.accept(conn)
33
+		}
34
+	}
35
+}
36
+
37
+func (p *Proxy) accept(conn net.Conn) {
38
+	connID := uuid.NewV4().String()
39
+	log := zap.S().With("connection_id", connID)
40
+
41
+	defer func() {
42
+		conn.Close()
43
+
44
+		if err := recover(); err != nil {
45
+			log.Errorw("Crash of accept handler", "error", err)
46
+		}
47
+	}()
48
+
49
+	log.Infow("Client connected", "addr", conn.RemoteAddr())
50
+
51
+	ctx, cancel := context.WithCancel(context.Background())
52
+	wait := &sync.WaitGroup{}
53
+
54
+	if err := p.acceptCallback(ctx, cancel, conn, connID, wait, p.conf); err != nil {
55
+		log.Errorw("Cannot initialize connection", "error", err)
56
+		cancel()
57
+	}
58
+
59
+	<-ctx.Done()
60
+	wait.Wait()
61
+
62
+	log.Infow("Client disconnected", "addr", conn.RemoteAddr())
63
+}

+ 0
- 184
proxy/server.go Прегледај датотеку

@@ -1,184 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"context"
5
-	"io"
6
-	"net"
7
-	"sync"
8
-
9
-	"github.com/juju/errors"
10
-	uuid "github.com/satori/go.uuid"
11
-	"go.uber.org/zap"
12
-
13
-	"github.com/9seconds/mtg/client"
14
-	"github.com/9seconds/mtg/config"
15
-	"github.com/9seconds/mtg/mtproto"
16
-	"github.com/9seconds/mtg/telegram"
17
-	"github.com/9seconds/mtg/utils"
18
-	"github.com/9seconds/mtg/wrappers"
19
-)
20
-
21
-// Server is an insgtance of MTPROTO proxy.
22
-type Server struct {
23
-	conf       *config.Config
24
-	logger     *zap.SugaredLogger
25
-	stats      *Stats
26
-	tg         telegram.Telegram
27
-	clientInit client.Init
28
-}
29
-
30
-// Serve does MTPROTO proxying.
31
-func (s *Server) Serve() error {
32
-	lsock, err := net.Listen("tcp", s.conf.BindAddr())
33
-	if err != nil {
34
-		return errors.Annotate(err, "Cannot create listen socket")
35
-	}
36
-
37
-	for {
38
-		if conn, err := lsock.Accept(); err != nil {
39
-			s.logger.Warn("Cannot allocate incoming connection", "error", err)
40
-		} else {
41
-			go s.accept(conn)
42
-		}
43
-	}
44
-}
45
-
46
-func (s *Server) accept(conn net.Conn) {
47
-	defer func() {
48
-		s.stats.closeConnection()
49
-		conn.Close() // nolint: errcheck
50
-
51
-		if r := recover(); r != nil {
52
-			s.logger.Errorw("Crash of accept handler", "error", r)
53
-		}
54
-	}()
55
-
56
-	s.stats.newConnection()
57
-	ctx, cancel := context.WithCancel(context.Background())
58
-	socketID := uuid.NewV4().String()
59
-
60
-	s.logger.Debugw("Client connected",
61
-		"addr", conn.RemoteAddr().String(),
62
-		"socketid", socketID,
63
-	)
64
-
65
-	connOpts, clientConn, err := s.getClientStream(ctx, cancel, conn, socketID)
66
-	if err != nil {
67
-		s.logger.Warnw("Cannot initialize client connection",
68
-			"addr", conn.RemoteAddr().String(),
69
-			"socketid", socketID,
70
-			"error", err,
71
-		)
72
-		return
73
-	}
74
-	defer clientConn.Close() // nolint: errcheck
75
-
76
-	tgConn, err := s.getTelegramStream(ctx, cancel, connOpts, socketID)
77
-	if err != nil {
78
-		s.logger.Warnw("Cannot initialize Telegram connection",
79
-			"socketid", socketID,
80
-			"error", err,
81
-		)
82
-		return
83
-	}
84
-	defer tgConn.Close() // nolint: errcheck
85
-
86
-	wait := &sync.WaitGroup{}
87
-	wait.Add(2)
88
-
89
-	go func() {
90
-		defer wait.Done()
91
-
92
-		for {
93
-			connOpts.ReadHacks.QuickAck = false
94
-			connOpts.ReadHacks.SimpleAck = false
95
-			if err := s.pump(clientConn, tgConn, socketID, "client"); err != nil {
96
-				s.logger.Infow("Client stream is aborted",
97
-					"socketid", socketID, "error", err)
98
-				return
99
-			}
100
-		}
101
-	}()
102
-	go func() {
103
-		defer wait.Done()
104
-
105
-		for {
106
-			connOpts.WriteHacks.QuickAck = false
107
-			connOpts.WriteHacks.SimpleAck = false
108
-			if err := s.pump(tgConn, clientConn, socketID, "telegram"); err != nil {
109
-				s.logger.Infow("Telegram stream is aborted",
110
-					"socketid", socketID, "error", err)
111
-				return
112
-			}
113
-		}
114
-	}()
115
-
116
-	<-ctx.Done()
117
-	wait.Wait()
118
-
119
-	s.logger.Debugw("Client disconnected",
120
-		"addr", conn.RemoteAddr().String(),
121
-		"socketid", socketID,
122
-	)
123
-}
124
-
125
-func (s *Server) getClientStream(ctx context.Context, cancel context.CancelFunc, conn net.Conn, socketID string) (*mtproto.ConnectionOpts, io.ReadWriteCloser, error) {
126
-	socket, connOpts, err := s.clientInit(conn, socketID, s.conf)
127
-	if err != nil {
128
-		return nil, nil, errors.Annotate(err, "Cannot init client connection")
129
-	}
130
-
131
-	socket = wrappers.NewTrafficRWC(socket, s.stats.addIncomingTraffic, s.stats.addOutgoingTraffic)
132
-	socket = wrappers.NewLogRWC(socket, s.logger, socketID, "client")
133
-	socket = wrappers.NewCtxRWC(ctx, cancel, socket)
134
-
135
-	return connOpts, socket, nil
136
-}
137
-
138
-func (s *Server) getTelegramStream(ctx context.Context, cancel context.CancelFunc, connOpts *mtproto.ConnectionOpts, socketID string) (io.ReadWriteCloser, error) {
139
-	conn, err := s.tg.Dial(socketID, connOpts)
140
-	if err != nil {
141
-		return nil, errors.Annotate(err, "Cannot connect to Telegram")
142
-	}
143
-
144
-	conn = wrappers.NewTrafficRWC(conn, s.stats.addIncomingTraffic, s.stats.addOutgoingTraffic)
145
-	conn, err = s.tg.Init(connOpts, conn)
146
-	if err != nil {
147
-		return nil, errors.Annotate(err, "Cannot handshake Telegram")
148
-	}
149
-
150
-	conn = wrappers.NewLogRWC(conn, s.logger, socketID, "telegram")
151
-	conn = wrappers.NewCtxRWC(ctx, cancel, conn)
152
-
153
-	return conn, nil
154
-}
155
-
156
-func (s *Server) pump(src io.Reader, dst io.Writer, socketID, name string) error {
157
-	buf, err := utils.ReadCurrentData(src)
158
-	if err != nil {
159
-		return errors.Annotate(err, "Cannot pump the socket")
160
-	}
161
-
162
-	_, err = dst.Write(buf)
163
-
164
-	return err
165
-}
166
-
167
-// NewServer creates new instance of MTPROTO proxy.
168
-func NewServer(conf *config.Config, logger *zap.SugaredLogger, stat *Stats) *Server {
169
-	clientInit := client.DirectInit
170
-	tg := telegram.NewDirectTelegram
171
-
172
-	if len(conf.AdTag) > 0 {
173
-		clientInit = client.MiddleInit
174
-		tg = telegram.NewMiddleTelegram
175
-	}
176
-
177
-	return &Server{
178
-		conf:       conf,
179
-		logger:     logger,
180
-		stats:      stat,
181
-		tg:         tg(conf, logger),
182
-		clientInit: clientInit,
183
-	}
184
-}

+ 0
- 74
proxy/stats.go Прегледај датотеку

@@ -1,74 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"encoding/json"
5
-	"net/http"
6
-	"strconv"
7
-	"sync/atomic"
8
-	"time"
9
-
10
-	"github.com/9seconds/mtg/config"
11
-)
12
-
13
-type statsUptime time.Time
14
-
15
-func (s statsUptime) MarshalJSON() ([]byte, error) {
16
-	uptime := int(time.Since(time.Time(s)).Seconds())
17
-	return []byte(strconv.Itoa(uptime)), nil
18
-}
19
-
20
-// Stats is a datastructure for statistics on work of this proxy.
21
-type Stats struct {
22
-	AllConnections    uint64 `json:"all_connections"`
23
-	ActiveConnections uint32 `json:"active_connections"`
24
-	Traffic           struct {
25
-		Incoming uint64 `json:"incoming"`
26
-		Outgoing uint64 `json:"outgoing"`
27
-	} `json:"traffic"`
28
-	URLs   config.IPURLs `json:"urls"`
29
-	Uptime statsUptime   `json:"uptime"`
30
-
31
-	conf *config.Config
32
-}
33
-
34
-func (s *Stats) newConnection() {
35
-	atomic.AddUint64(&s.AllConnections, 1)
36
-	atomic.AddUint32(&s.ActiveConnections, 1)
37
-}
38
-
39
-func (s *Stats) closeConnection() {
40
-	atomic.AddUint32(&s.ActiveConnections, ^uint32(0))
41
-}
42
-
43
-func (s *Stats) addIncomingTraffic(n int) {
44
-	atomic.AddUint64(&s.Traffic.Incoming, uint64(n))
45
-}
46
-
47
-func (s *Stats) addOutgoingTraffic(n int) {
48
-	atomic.AddUint64(&s.Traffic.Outgoing, uint64(n))
49
-}
50
-
51
-// Serve runs statistics HTTP server.
52
-func (s *Stats) Serve() {
53
-	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
54
-		w.Header().Set("Content-Type", "application/json")
55
-
56
-		encoder := json.NewEncoder(w)
57
-		encoder.SetEscapeHTML(false)
58
-		encoder.SetIndent("", "  ")
59
-		encoder.Encode(s) // nolint: errcheck, gas
60
-	})
61
-
62
-	http.ListenAndServe(s.conf.StatAddr(), nil) // nolint: errcheck, gas
63
-}
64
-
65
-// NewStats returns new instance of statistics datastructure.
66
-func NewStats(conf *config.Config) *Stats {
67
-	stat := &Stats{
68
-		Uptime: statsUptime(time.Now()),
69
-		conf:   conf,
70
-	}
71
-	stat.URLs = conf.GetURLs()
72
-
73
-	return stat
74
-}

+ 5
- 2
telegram/dialer.go Прегледај датотеку

@@ -30,11 +30,14 @@ func (t *tgDialer) dial(addr string) (net.Conn, error) {
30 30
 	return conn, nil
31 31
 }
32 32
 
33
-func (t *tgDialer) dialRWC(addr, sock string) (wrappers.ReadWriteCloserWithAddr, error) {
33
+func (t *tgDialer) dialRWC(addr, connID string) (wrappers.WrapStreamReadWriteCloser, error) {
34 34
 	conn, err := t.dial(addr)
35 35
 	if err != nil {
36 36
 		return nil, err
37 37
 	}
38 38
 
39
-	return wrappers.NewTimeoutRWC(conn, sock, t.conf.PublicIPv4, t.conf.PublicIPv6), nil
39
+	tgConn := wrappers.NewConn(conn, connID, wrappers.ConnPurposeTelegram,
40
+		t.conf.PublicIPv4, t.conf.PublicIPv6)
41
+
42
+	return tgConn, nil
40 43
 }

+ 8
- 9
telegram/direct.go Прегледај датотеку

@@ -4,7 +4,6 @@ import (
4 4
 	"net"
5 5
 
6 6
 	"github.com/juju/errors"
7
-	"go.uber.org/zap"
8 7
 
9 8
 	"github.com/9seconds/mtg/config"
10 9
 	"github.com/9seconds/mtg/mtproto"
@@ -29,11 +28,11 @@ var (
29 28
 	}
30 29
 )
31 30
 
32
-type directTelegram struct {
31
+type DirectTelegram struct {
33 32
 	baseTelegram
34 33
 }
35 34
 
36
-func (t *directTelegram) Dial(sock string, connOpts *mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error) {
35
+func (t *DirectTelegram) Dial(connID string, connOpts *mtproto.ConnectionOpts) (wrappers.WrapStreamReadWriteCloser, error) {
37 36
 	dc := connOpts.DC
38 37
 	if dc < 0 {
39 38
 		dc = -dc
@@ -41,23 +40,23 @@ func (t *directTelegram) Dial(sock string, connOpts *mtproto.ConnectionOpts) (wr
41 40
 		dc = 1
42 41
 	}
43 42
 
44
-	return t.baseTelegram.dial(dc-1, sock, connOpts.ConnectionProto)
43
+	return t.baseTelegram.dial(dc-1, connID, connOpts.ConnectionProto)
45 44
 }
46 45
 
47
-func (t *directTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.ReadWriteCloserWithAddr) (wrappers.ReadWriteCloserWithAddr, error) {
46
+func (t *DirectTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.WrapStreamReadWriteCloser) (wrappers.WrapStreamReadWriteCloser, error) {
48 47
 	obfs2, frame := obfuscated2.MakeTelegramObfuscated2Frame(connOpts)
49 48
 
50
-	if n, err := conn.Write(frame); err != nil || n != obfuscated2.FrameLen {
49
+	if _, err := conn.Write(frame); err != nil {
51 50
 		return nil, errors.Annotate(err, "Cannot write hadnshake frame")
52 51
 	}
53 52
 
54
-	return wrappers.NewStreamCipherRWC(conn, obfs2.Encryptor, obfs2.Decryptor), nil
53
+	return wrappers.NewStreamCipher(conn, obfs2.Encryptor, obfs2.Decryptor), nil
55 54
 }
56 55
 
57 56
 // NewDirectTelegram returns Telegram instance which connects directly
58 57
 // to Telegram bypassing middleproxies.
59
-func NewDirectTelegram(conf *config.Config, _ *zap.SugaredLogger) Telegram {
60
-	return &directTelegram{baseTelegram{
58
+func NewDirectTelegram(conf *config.Config) *DirectTelegram {
59
+	return &DirectTelegram{baseTelegram{
61 60
 		dialer: tgDialer{
62 61
 			Dialer: net.Dialer{Timeout: telegramDialTimeout},
63 62
 			conf:   conf,

+ 18
- 26
telegram/middle.go Прегледај датотеку

@@ -1,29 +1,26 @@
1 1
 package telegram
2 2
 
3 3
 import (
4
-	"io"
5 4
 	"net"
6 5
 	"net/http"
7 6
 	"sync"
8 7
 
9 8
 	"github.com/juju/errors"
10
-	"go.uber.org/zap"
11 9
 
12 10
 	"github.com/9seconds/mtg/config"
13 11
 	"github.com/9seconds/mtg/mtproto"
14 12
 	"github.com/9seconds/mtg/mtproto/rpc"
15
-	mtwrappers "github.com/9seconds/mtg/mtproto/wrappers"
16 13
 	"github.com/9seconds/mtg/wrappers"
17 14
 )
18 15
 
19
-type middleTelegram struct {
16
+type MiddleTelegram struct {
20 17
 	middleTelegramCaller
21 18
 
22 19
 	conf *config.Config
23 20
 }
24 21
 
25
-func NewMiddleTelegram(conf *config.Config, logger *zap.SugaredLogger) Telegram {
26
-	tg := &middleTelegram{
22
+func NewMiddleTelegram(conf *config.Config) *MiddleTelegram {
23
+	tg := &MiddleTelegram{
27 24
 		middleTelegramCaller: middleTelegramCaller{
28 25
 			baseTelegram: baseTelegram{
29 26
 				dialer: tgDialer{
@@ -31,7 +28,6 @@ func NewMiddleTelegram(conf *config.Config, logger *zap.SugaredLogger) Telegram
31 28
 					conf:   conf,
32 29
 				},
33 30
 			},
34
-			logger: logger,
35 31
 			httpClient: &http.Client{
36 32
 				Timeout: middleTelegramHTTPClientTimeout,
37 33
 			},
@@ -48,8 +44,8 @@ func NewMiddleTelegram(conf *config.Config, logger *zap.SugaredLogger) Telegram
48 44
 	return tg
49 45
 }
50 46
 
51
-func (t *middleTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.ReadWriteCloserWithAddr) (wrappers.ReadWriteCloserWithAddr, error) {
52
-	rpcNonceConn := mtwrappers.NewFrameRWC(conn, rpc.SeqNoNonce)
47
+func (t *MiddleTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.WrapStreamReadWriteCloser) (wrappers.WrapPacketReadWriteCloser, error) {
48
+	rpcNonceConn := wrappers.NewMTProtoFrame(conn, rpc.SeqNoNonce)
53 49
 
54 50
 	rpcNonceReq, err := t.sendRPCNonceRequest(rpcNonceConn)
55 51
 	if err != nil {
@@ -60,22 +56,22 @@ func (t *middleTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.Re
60 56
 		return nil, err
61 57
 	}
62 58
 
63
-	secureConn := mtwrappers.NewMiddleProxyCipherRWC(conn, rpcNonceReq, rpcNonceResp, t.proxySecret)
64
-	secureConn = mtwrappers.NewFrameRWC(secureConn, rpc.SeqNoHandshake)
59
+	secureConn := wrappers.NewMiddleProxyCipher(conn, rpcNonceReq, rpcNonceResp, t.proxySecret)
60
+	frameConn := wrappers.NewMTProtoFrame(secureConn, rpc.SeqNoHandshake)
65 61
 
66
-	rpcHandshakeReq, err := t.sendRPCHandshakeRequest(secureConn)
62
+	rpcHandshakeReq, err := t.sendRPCHandshakeRequest(frameConn)
67 63
 	if err != nil {
68 64
 		return nil, err
69 65
 	}
70
-	_, err = t.receiveRPCHandshakeResponse(secureConn, rpcHandshakeReq)
66
+	_, err = t.receiveRPCHandshakeResponse(frameConn, rpcHandshakeReq)
71 67
 	if err != nil {
72 68
 		return nil, err
73 69
 	}
74 70
 
75
-	return mtwrappers.NewProxyRequestRWC(secureConn, connOpts, t.conf.AdTag)
71
+	return wrappers.NewMTProtoProxy(frameConn, connOpts, t.conf.AdTag)
76 72
 }
77 73
 
78
-func (t *middleTelegram) sendRPCNonceRequest(conn io.Writer) (*rpc.NonceRequest, error) {
74
+func (t *MiddleTelegram) sendRPCNonceRequest(conn wrappers.WrapPacketWriter) (*rpc.NonceRequest, error) {
79 75
 	rpcNonceReq, err := rpc.NewNonceRequest(t.proxySecret)
80 76
 	if err != nil {
81 77
 		return nil, errors.Annotate(err, "Cannot create RPC nonce request")
@@ -87,15 +83,13 @@ func (t *middleTelegram) sendRPCNonceRequest(conn io.Writer) (*rpc.NonceRequest,
87 83
 	return rpcNonceReq, nil
88 84
 }
89 85
 
90
-func (t *middleTelegram) receiveRPCNonceResponse(conn io.Reader, req *rpc.NonceRequest) (*rpc.NonceResponse, error) {
91
-	var ans [128]byte
92
-
93
-	n, err := conn.Read(ans[:])
86
+func (t *MiddleTelegram) receiveRPCNonceResponse(conn wrappers.WrapPacketReader, req *rpc.NonceRequest) (*rpc.NonceResponse, error) {
87
+	packet, err := conn.Read()
94 88
 	if err != nil {
95 89
 		return nil, errors.Annotate(err, "Cannot read RPC nonce response")
96 90
 	}
97 91
 
98
-	rpcNonceResp, err := rpc.NewNonceResponse(ans[:n])
92
+	rpcNonceResp, err := rpc.NewNonceResponse(packet)
99 93
 	if err != nil {
100 94
 		return nil, errors.Annotate(err, "Cannot initialize RPC nonce response")
101 95
 	}
@@ -106,7 +100,7 @@ func (t *middleTelegram) receiveRPCNonceResponse(conn io.Reader, req *rpc.NonceR
106 100
 	return rpcNonceResp, nil
107 101
 }
108 102
 
109
-func (t *middleTelegram) sendRPCHandshakeRequest(conn io.Writer) (*rpc.HandshakeRequest, error) {
103
+func (t *MiddleTelegram) sendRPCHandshakeRequest(conn wrappers.WrapPacketWriter) (*rpc.HandshakeRequest, error) {
110 104
 	req := rpc.NewHandshakeRequest()
111 105
 	if _, err := conn.Write(req.Bytes()); err != nil {
112 106
 		return nil, errors.Annotate(err, "Cannot send RPC handshake request")
@@ -115,15 +109,13 @@ func (t *middleTelegram) sendRPCHandshakeRequest(conn io.Writer) (*rpc.Handshake
115 109
 	return req, nil
116 110
 }
117 111
 
118
-func (t *middleTelegram) receiveRPCHandshakeResponse(conn io.Reader, req *rpc.HandshakeRequest) (*rpc.HandshakeResponse, error) {
119
-	var ans [128]byte
120
-
121
-	n, err := conn.Read(ans[:])
112
+func (t *MiddleTelegram) receiveRPCHandshakeResponse(conn wrappers.WrapPacketReader, req *rpc.HandshakeRequest) (*rpc.HandshakeResponse, error) {
113
+	packet, err := conn.Read()
122 114
 	if err != nil {
123 115
 		return nil, errors.Annotate(err, "Cannot read RPC handshake response")
124 116
 	}
125 117
 
126
-	rpcHandshakeResp, err := rpc.NewHandshakeResponse(ans[:n])
118
+	rpcHandshakeResp, err := rpc.NewHandshakeResponse(packet)
127 119
 	if err != nil {
128 120
 		return nil, errors.Annotate(err, "Cannot initialize RPC handshake response")
129 121
 	}

+ 4
- 5
telegram/middle_caller.go Прегледај датотеку

@@ -35,11 +35,10 @@ type middleTelegramCaller struct {
35 35
 
36 36
 	proxySecret []byte
37 37
 	dialerMutex *sync.RWMutex
38
-	logger      *zap.SugaredLogger
39 38
 	httpClient  *http.Client
40 39
 }
41 40
 
42
-func (t *middleTelegramCaller) Dial(sock string, connOpts *mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error) {
41
+func (t *middleTelegramCaller) Dial(connID string, connOpts *mtproto.ConnectionOpts) (wrappers.WrapStreamReadWriteCloser, error) {
43 42
 	dc := connOpts.DC
44 43
 	if dc == 0 {
45 44
 		dc = 1
@@ -47,13 +46,13 @@ func (t *middleTelegramCaller) Dial(sock string, connOpts *mtproto.ConnectionOpt
47 46
 	t.dialerMutex.RLock()
48 47
 	defer t.dialerMutex.RUnlock()
49 48
 
50
-	return t.baseTelegram.dial(dc, sock, connOpts.ConnectionProto)
49
+	return t.baseTelegram.dial(dc, connID, connOpts.ConnectionProto)
51 50
 }
52 51
 
53 52
 func (t *middleTelegramCaller) autoUpdate() {
54 53
 	for range time.Tick(middleTelegramAutoUpdateInterval) {
55 54
 		if err := t.update(); err != nil {
56
-			t.logger.Warnw("Cannot update from Telegram", "error", err)
55
+			zap.S().Warnw("Cannot update from Telegram", "error", err)
57 56
 		}
58 57
 	}
59 58
 }
@@ -80,7 +79,7 @@ func (t *middleTelegramCaller) update() error {
80 79
 	t.v6Addresses = v6Addresses
81 80
 	t.dialerMutex.Unlock()
82 81
 
83
-	t.logger.Infow("Telegram middle proxy data has been updated")
82
+	zap.S().Infow("Telegram middle proxy data has been updated")
84 83
 
85 84
 	return nil
86 85
 }

+ 2
- 10
telegram/telegram.go Прегледај датотеку

@@ -9,14 +9,6 @@ import (
9 9
 	"github.com/9seconds/mtg/wrappers"
10 10
 )
11 11
 
12
-// Telegram defines an interface to connect to Telegram. This
13
-// encapsulates logic of working with middleproxies or direct
14
-// connections.
15
-type Telegram interface {
16
-	Dial(string, *mtproto.ConnectionOpts) (wrappers.ReadWriteCloserWithAddr, error)
17
-	Init(*mtproto.ConnectionOpts, wrappers.ReadWriteCloserWithAddr) (wrappers.ReadWriteCloserWithAddr, error)
18
-}
19
-
20 12
 type baseTelegram struct {
21 13
 	dialer tgDialer
22 14
 
@@ -24,7 +16,7 @@ type baseTelegram struct {
24 16
 	v6Addresses map[int16][]string
25 17
 }
26 18
 
27
-func (b *baseTelegram) dial(dcIdx int16, sock string, proto mtproto.ConnectionProtocol) (wrappers.ReadWriteCloserWithAddr, error) {
19
+func (b *baseTelegram) dial(dcIdx int16, connID string, proto mtproto.ConnectionProtocol) (wrappers.WrapStreamReadWriteCloser, error) {
28 20
 	addrs := make([]string, 2)
29 21
 
30 22
 	if proto&mtproto.ConnectionProtocolIPv6 != 0 {
@@ -39,7 +31,7 @@ func (b *baseTelegram) dial(dcIdx int16, sock string, proto mtproto.ConnectionPr
39 31
 	}
40 32
 
41 33
 	for _, addr := range addrs {
42
-		if conn, err := b.dialer.dialRWC(addr, sock); err == nil {
34
+		if conn, err := b.dialer.dialRWC(addr, connID); err == nil {
43 35
 			return conn, err
44 36
 		}
45 37
 	}

+ 51
- 38
wrappers/blockcipher.go Прегледај датотеку

@@ -1,6 +1,7 @@
1 1
 package wrappers
2 2
 
3 3
 import (
4
+	"bytes"
4 5
 	"crypto/aes"
5 6
 	"crypto/cipher"
6 7
 	"net"
@@ -9,77 +10,89 @@ import (
9 10
 	"github.com/juju/errors"
10 11
 )
11 12
 
12
-type WrapBlockCipher struct {
13
-	BufferedReader
13
+type BlockCipher struct {
14
+	buf *bytes.Buffer
14 15
 
15 16
 	conn      WrapStreamReadWriteCloser
16 17
 	encryptor cipher.BlockMode
17 18
 	decryptor cipher.BlockMode
18 19
 }
19 20
 
20
-func (w *WrapBlockCipher) Read(p []byte) (int, error) {
21
-	return w.BufferedRead(p, func() error {
22
-		var buf []byte
21
+func (b *BlockCipher) Read(p []byte) (int, error) {
22
+	if b.buf.Len() > 0 {
23
+		return b.flush(p)
24
+	}
23 25
 
24
-		for len(buf) == 0 || len(buf)%aes.BlockSize != 0 {
25
-			rv, err := utils.ReadCurrentData(w.conn)
26
-			if err != nil {
27
-				return errors.Annotate(err, "Cannot read from socket")
28
-			}
29
-			buf = append(buf, rv...)
26
+	buf := []byte{}
27
+	for len(buf) == 0 || len(buf)%aes.BlockSize != 0 {
28
+		rv, err := utils.ReadCurrentData(b.conn)
29
+		if err != nil {
30
+			return 0, errors.Annotate(err, "Cannot read from socket")
30 31
 		}
32
+		buf = append(buf, rv...)
33
+	}
34
+
35
+	b.decryptor.CryptBlocks(buf, buf)
36
+	b.buf.Write(buf)
31 37
 
32
-		w.decryptor.CryptBlocks(buf, buf)
33
-		w.Buffer.Write(buf)
38
+	return b.flush(p)
39
+}
40
+
41
+func (b *BlockCipher) flush(p []byte) (int, error) {
42
+	if b.buf.Len() <= len(p) {
43
+		sizeToReturn := b.buf.Len()
44
+		copy(p, b.buf.Bytes())
45
+		b.buf.Reset()
46
+		return sizeToReturn, nil
47
+	}
34 48
 
35
-		return nil
36
-	})
49
+	return b.buf.Read(p)
37 50
 }
38 51
 
39
-func (w *WrapBlockCipher) Write(p []byte) (int, error) {
52
+func (b *BlockCipher) Write(p []byte) (int, error) {
40 53
 	if len(p)%aes.BlockSize > 0 {
41 54
 		return 0, errors.Errorf("Incorrect block size %d", len(p))
42 55
 	}
43 56
 
44 57
 	encrypted := make([]byte, len(p))
45
-	w.encryptor.CryptBlocks(encrypted, p)
58
+	b.encryptor.CryptBlocks(encrypted, p)
46 59
 
47
-	return w.conn.Write(encrypted)
60
+	return b.conn.Write(encrypted)
48 61
 }
49 62
 
50
-func (w *WrapBlockCipher) LogDebug(msg string, data ...interface{}) {
51
-	w.conn.LogDebug(msg, data...)
63
+func (b *BlockCipher) LogDebug(msg string, data ...interface{}) {
64
+	b.conn.LogDebug(msg, data...)
52 65
 }
53 66
 
54
-func (w *WrapBlockCipher) LogInfo(msg string, data ...interface{}) {
55
-	w.conn.LogInfo(msg, data...)
67
+func (b *BlockCipher) LogInfo(msg string, data ...interface{}) {
68
+	b.conn.LogInfo(msg, data...)
56 69
 }
57 70
 
58
-func (w *WrapBlockCipher) LogWarn(msg string, data ...interface{}) {
59
-	w.conn.LogWarn(msg, data...)
71
+func (b *BlockCipher) LogWarn(msg string, data ...interface{}) {
72
+	b.conn.LogWarn(msg, data...)
60 73
 }
61 74
 
62
-func (w *WrapBlockCipher) LogError(msg string, data ...interface{}) {
63
-	w.conn.LogError(msg, data...)
75
+func (b *BlockCipher) LogError(msg string, data ...interface{}) {
76
+	b.conn.LogError(msg, data...)
64 77
 }
65 78
 
66
-func (w *WrapBlockCipher) LocalAddr() *net.TCPAddr {
67
-	return w.conn.LocalAddr()
79
+func (b *BlockCipher) LocalAddr() *net.TCPAddr {
80
+	return b.conn.LocalAddr()
68 81
 }
69 82
 
70
-func (w *WrapBlockCipher) RemoteAddr() *net.TCPAddr {
71
-	return w.conn.RemoteAddr()
83
+func (b *BlockCipher) RemoteAddr() *net.TCPAddr {
84
+	return b.conn.RemoteAddr()
72 85
 }
73 86
 
74
-func (w *WrapBlockCipher) Close() error {
75
-	return w.conn.Close()
87
+func (b *BlockCipher) Close() error {
88
+	return b.conn.Close()
76 89
 }
77 90
 
78
-func NewWrapBlockCipher(conn WrapStreamReadWriteCloser, encryptor, decryptor cipher.BlockMode) WrapStreamReadWriteCloser {
79
-	return &WrapBlockCipher{
80
-		BufferedReader: NewBufferedReader(),
81
-		conn:           conn,
82
-		encryptor:      encryptor,
83
-		decryptor:      decryptor,
91
+func NewBlockCipher(conn WrapStreamReadWriteCloser, encryptor, decryptor cipher.BlockMode) WrapStreamReadWriteCloser {
92
+	return &BlockCipher{
93
+		buf:       &bytes.Buffer{},
94
+		conn:      conn,
95
+		encryptor: encryptor,
96
+		decryptor: decryptor,
84 97
 	}
85 98
 }

+ 0
- 32
wrappers/buffered_reader.go Прегледај датотеку

@@ -1,32 +0,0 @@
1
-package wrappers
2
-
3
-import "bytes"
4
-
5
-type BufferedReader struct {
6
-	Buffer *bytes.Buffer
7
-}
8
-
9
-func (b *BufferedReader) BufferedRead(p []byte, callback func() error) (int, error) {
10
-	if b.Buffer.Len() > 0 {
11
-		return b.flush(p)
12
-	}
13
-	if err := callback(); err != nil {
14
-		return 0, err
15
-	}
16
-	return b.flush(p)
17
-}
18
-
19
-func (b *BufferedReader) flush(p []byte) (int, error) {
20
-	if b.Buffer.Len() <= len(p) {
21
-		sizeToReturn := b.Buffer.Len()
22
-		copy(p, b.Buffer.Bytes())
23
-		b.Buffer.Reset()
24
-		return sizeToReturn, nil
25
-	}
26
-
27
-	return b.Buffer.Read(p)
28
-}
29
-
30
-func NewBufferedReader() BufferedReader {
31
-	return BufferedReader{Buffer: &bytes.Buffer{}}
32
-}

+ 35
- 33
wrappers/conn.go Прегледај датотеку

@@ -30,8 +30,7 @@ const (
30 30
 	connTimeoutWrite = 5 * time.Minute
31 31
 )
32 32
 
33
-type WrapConn struct {
34
-	purpose    ConnPurpose
33
+type Conn struct {
35 34
 	connID     string
36 35
 	conn       net.Conn
37 36
 	logger     *zap.SugaredLogger
@@ -39,77 +38,80 @@ type WrapConn struct {
39 38
 	publicIPv6 net.IP
40 39
 }
41 40
 
42
-func (w *WrapConn) Write(p []byte) (int, error) {
43
-	w.conn.SetWriteDeadline(time.Now().Add(connTimeoutWrite))
44
-	n, err := w.conn.Write(p)
41
+func (c *Conn) Write(p []byte) (int, error) {
42
+	c.conn.SetWriteDeadline(time.Now().Add(connTimeoutWrite))
43
+	n, err := c.conn.Write(p)
45 44
 
46
-	w.logger.Debugw("Write to stream", "bytes", n, "error", err)
45
+	c.logger.Debugw("Write to stream", "bytes", n, "error", err)
47 46
 
48 47
 	return n, err
49 48
 }
50 49
 
51
-func (w *WrapConn) Read(p []byte) (int, error) {
52
-	w.conn.SetReadDeadline(time.Now().Add(connTimeoutRead))
53
-	n, err := w.conn.Read(p)
50
+func (c *Conn) Read(p []byte) (int, error) {
51
+	c.conn.SetReadDeadline(time.Now().Add(connTimeoutRead))
52
+	n, err := c.conn.Read(p)
54 53
 
55
-	w.logger.Debugw("Read from stream", "bytes", n, "error", err)
54
+	c.logger.Debugw("Read from stream", "bytes", n, "error", err)
56 55
 
57 56
 	return n, err
58 57
 }
59 58
 
60
-func (w *WrapConn) Close() error {
61
-	defer w.LogDebug("Closed connection")
62
-	return w.conn.Close()
59
+func (c *Conn) Close() error {
60
+	defer c.LogDebug("Closed connection")
61
+	return c.conn.Close()
63 62
 }
64 63
 
65
-func (w *WrapConn) LocalAddr() *net.TCPAddr {
66
-	addr := w.conn.LocalAddr().(*net.TCPAddr)
64
+func (c *Conn) LocalAddr() *net.TCPAddr {
65
+	addr := c.conn.LocalAddr().(*net.TCPAddr)
67 66
 	newAddr := *addr
68 67
 
69
-	if w.RemoteAddr().IP.To4() != nil {
70
-		if w.publicIPv4 != nil {
71
-			newAddr.IP = w.publicIPv4
68
+	if c.RemoteAddr().IP.To4() != nil {
69
+		if c.publicIPv4 != nil {
70
+			newAddr.IP = c.publicIPv4
72 71
 		}
73
-	} else if w.publicIPv6 != nil {
74
-		newAddr.IP = w.publicIPv6
72
+	} else if c.publicIPv6 != nil {
73
+		newAddr.IP = c.publicIPv6
75 74
 	}
76 75
 
77 76
 	return &newAddr
78 77
 }
79 78
 
80
-func (w *WrapConn) RemoteAddr() *net.TCPAddr {
81
-	return w.conn.RemoteAddr().(*net.TCPAddr)
79
+func (c *Conn) RemoteAddr() *net.TCPAddr {
80
+	return c.conn.RemoteAddr().(*net.TCPAddr)
82 81
 }
83 82
 
84
-func (w *WrapConn) LogDebug(msg string, data ...interface{}) {
85
-	w.logger.Debugw(msg, data...)
83
+func (c *Conn) LogDebug(msg string, data ...interface{}) {
84
+	c.logger.Debugw(msg, data...)
86 85
 }
87 86
 
88
-func (w *WrapConn) LogInfo(msg string, data ...interface{}) {
89
-	w.logger.Infow(msg, data...)
87
+func (c *Conn) LogInfo(msg string, data ...interface{}) {
88
+	c.logger.Infow(msg, data...)
90 89
 }
91 90
 
92
-func (w *WrapConn) LogWarn(msg string, data ...interface{}) {
93
-	w.logger.Warnw(msg, data...)
91
+func (c *Conn) LogWarn(msg string, data ...interface{}) {
92
+	c.logger.Warnw(msg, data...)
94 93
 }
95 94
 
96
-func (w *WrapConn) LogError(msg string, data ...interface{}) {
97
-	w.logger.Errorw(msg, data...)
95
+func (c *Conn) LogError(msg string, data ...interface{}) {
96
+	c.logger.Errorw(msg, data...)
98 97
 }
99 98
 
100
-func NewConn(connID string, purpose ConnPurpose, conn net.Conn, publicIPv4, publicIPv6 net.IP) WrapStreamReadWriteCloser {
99
+func NewConn(conn net.Conn, connID string, purpose ConnPurpose, publicIPv4, publicIPv6 net.IP) WrapStreamReadWriteCloser {
101 100
 	logger := zap.S().With(
102 101
 		"connection_id", connID,
103 102
 		"local_address", conn.LocalAddr(),
104 103
 		"remote_address", conn.RemoteAddr(),
104
+		"purpose", purpose,
105 105
 	)
106 106
 
107
-	return &WrapConn{
107
+	wrapper := Conn{
108 108
 		logger:     logger,
109
-		purpose:    purpose,
110 109
 		connID:     connID,
111 110
 		conn:       conn,
112 111
 		publicIPv4: publicIPv4,
113 112
 		publicIPv6: publicIPv6,
114 113
 	}
114
+	wrapper.logger = logger.With("faked_local_addr", wrapper.LocalAddr())
115
+
116
+	return &wrapper
115 117
 }

+ 26
- 26
wrappers/ctx.go Прегледај датотеку

@@ -7,68 +7,68 @@ import (
7 7
 	"github.com/juju/errors"
8 8
 )
9 9
 
10
-type WrapCtx struct {
10
+type Ctx struct {
11 11
 	cancel context.CancelFunc
12 12
 	conn   WrapStreamReadWriteCloser
13 13
 	ctx    context.Context
14 14
 }
15 15
 
16
-func (w *WrapCtx) Read(p []byte) (int, error) {
16
+func (c *Ctx) Read(p []byte) (int, error) {
17 17
 	select {
18
-	case <-w.ctx.Done():
19
-		return 0, errors.Annotate(w.ctx.Err(), "Read is failed because of closed context")
18
+	case <-c.ctx.Done():
19
+		return 0, errors.Annotate(c.ctx.Err(), "Read is failed because of closed context")
20 20
 	default:
21
-		n, err := w.conn.Read(p)
21
+		n, err := c.conn.Read(p)
22 22
 		if err != nil {
23
-			w.cancel()
23
+			c.cancel()
24 24
 		}
25 25
 		return n, err
26 26
 	}
27 27
 }
28 28
 
29
-func (w *WrapCtx) Write(p []byte) (int, error) {
29
+func (c *Ctx) Write(p []byte) (int, error) {
30 30
 	select {
31
-	case <-w.ctx.Done():
32
-		return 0, errors.Annotate(w.ctx.Err(), "Write is failed because of closed context")
31
+	case <-c.ctx.Done():
32
+		return 0, errors.Annotate(c.ctx.Err(), "Write is failed because of closed context")
33 33
 	default:
34
-		n, err := w.conn.Write(p)
34
+		n, err := c.conn.Write(p)
35 35
 		if err != nil {
36
-			w.cancel()
36
+			c.cancel()
37 37
 		}
38 38
 		return n, err
39 39
 	}
40 40
 }
41 41
 
42
-func (w *WrapCtx) LogDebug(msg string, data ...interface{}) {
43
-	w.conn.LogDebug(msg, data...)
42
+func (c *Ctx) LogDebug(msg string, data ...interface{}) {
43
+	c.conn.LogDebug(msg, data...)
44 44
 }
45 45
 
46
-func (w *WrapCtx) LogInfo(msg string, data ...interface{}) {
47
-	w.conn.LogInfo(msg, data...)
46
+func (c *Ctx) LogInfo(msg string, data ...interface{}) {
47
+	c.conn.LogInfo(msg, data...)
48 48
 }
49 49
 
50
-func (w *WrapCtx) LogWarn(msg string, data ...interface{}) {
51
-	w.conn.LogWarn(msg, data...)
50
+func (c *Ctx) LogWarn(msg string, data ...interface{}) {
51
+	c.conn.LogWarn(msg, data...)
52 52
 }
53 53
 
54
-func (w *WrapCtx) LogError(msg string, data ...interface{}) {
55
-	w.conn.LogError(msg, data...)
54
+func (c *Ctx) LogError(msg string, data ...interface{}) {
55
+	c.conn.LogError(msg, data...)
56 56
 }
57 57
 
58
-func (w *WrapCtx) LocalAddr() *net.TCPAddr {
59
-	return w.conn.LocalAddr()
58
+func (c *Ctx) LocalAddr() *net.TCPAddr {
59
+	return c.conn.LocalAddr()
60 60
 }
61 61
 
62
-func (w *WrapCtx) RemoteAddr() *net.TCPAddr {
63
-	return w.conn.RemoteAddr()
62
+func (c *Ctx) RemoteAddr() *net.TCPAddr {
63
+	return c.conn.RemoteAddr()
64 64
 }
65 65
 
66
-func (w *WrapCtx) Close() error {
67
-	return w.conn.Close()
66
+func (c *Ctx) Close() error {
67
+	return c.conn.Close()
68 68
 }
69 69
 
70 70
 func NewCtx(ctx context.Context, cancel context.CancelFunc, conn WrapStreamReadWriteCloser) WrapStreamReadWriteCloser {
71
-	return &WrapCtx{
71
+	return &Ctx{
72 72
 		ctx:    ctx,
73 73
 		cancel: cancel,
74 74
 		conn:   conn,

+ 1
- 1
wrappers/mtproto_cipher.go Прегледај датотеку

@@ -32,7 +32,7 @@ func NewMiddleProxyCipher(conn WrapStreamReadWriteCloser, req *rpc.NonceRequest,
32 32
 	enc, _ := makeEncrypterDecrypter(encKey, encIV)
33 33
 	_, dec := makeEncrypterDecrypter(decKey, decIV)
34 34
 
35
-	return NewWrapBlockCipher(conn, enc, dec)
35
+	return NewBlockCipher(conn, enc, dec)
36 36
 }
37 37
 
38 38
 func deriveKeys(purpose CipherPurpose, req *rpc.NonceRequest, resp *rpc.NonceResponse, client *net.TCPAddr, remote *net.TCPAddr, secret []byte) ([]byte, []byte) {

+ 10
- 3
wrappers/mtproto_proxy.go Прегледај датотеку

@@ -4,8 +4,10 @@ import (
4 4
 	"bytes"
5 5
 	"net"
6 6
 
7
-	"github.com/9seconds/mtg/mtproto/rpc"
8 7
 	"github.com/juju/errors"
8
+
9
+	"github.com/9seconds/mtg/mtproto"
10
+	"github.com/9seconds/mtg/mtproto/rpc"
9 11
 )
10 12
 
11 13
 type MTProtoProxy struct {
@@ -127,9 +129,14 @@ func (m *MTProtoProxy) Close() error {
127 129
 	return m.conn.Close()
128 130
 }
129 131
 
130
-func NewMTProtoProxy(conn WrapPacketReadWriteCloser, req *rpc.ProxyRequest) WrapPacketReadWriteCloser {
132
+func NewMTProtoProxy(conn WrapPacketReadWriteCloser, connOpts *mtproto.ConnectionOpts, adTag []byte) (WrapPacketReadWriteCloser, error) {
133
+	req, err := rpc.NewProxyRequest(connOpts.ClientAddr, conn.LocalAddr(), connOpts, adTag)
134
+	if err != nil {
135
+		return nil, errors.Annotate(err, "Cannot create new RPC proxy request")
136
+	}
137
+
131 138
 	return &MTProtoProxy{
132 139
 		conn: conn,
133 140
 		req:  req,
134
-	}
141
+	}, nil
135 142
 }

+ 22
- 22
wrappers/streamcipher.go Прегледај датотеку

@@ -7,59 +7,59 @@ import (
7 7
 	"github.com/juju/errors"
8 8
 )
9 9
 
10
-type WrapStreamCipher struct {
10
+type StreamCipher struct {
11 11
 	encryptor cipher.Stream
12 12
 	decryptor cipher.Stream
13 13
 	conn      WrapStreamReadWriteCloser
14 14
 }
15 15
 
16
-func (w *WrapStreamCipher) Read(p []byte) (int, error) {
17
-	n, err := w.conn.Read(p)
16
+func (s *StreamCipher) Read(p []byte) (int, error) {
17
+	n, err := s.conn.Read(p)
18 18
 	if err != nil {
19 19
 		return 0, errors.Annotate(err, "Cannot read stream ciphered data")
20 20
 	}
21
-	w.decryptor.XORKeyStream(p, p[:n])
21
+	s.decryptor.XORKeyStream(p, p[:n])
22 22
 
23 23
 	return n, nil
24 24
 }
25 25
 
26
-func (w *WrapStreamCipher) Write(p []byte) (int, error) {
26
+func (s *StreamCipher) Write(p []byte) (int, error) {
27 27
 	encrypted := make([]byte, len(p))
28
-	w.encryptor.XORKeyStream(encrypted, p)
28
+	s.encryptor.XORKeyStream(encrypted, p)
29 29
 
30
-	return w.conn.Write(encrypted)
30
+	return s.conn.Write(encrypted)
31 31
 }
32 32
 
33
-func (w *WrapStreamCipher) LogDebug(msg string, data ...interface{}) {
34
-	w.conn.LogDebug(msg, data...)
33
+func (s *StreamCipher) LogDebug(msg string, data ...interface{}) {
34
+	s.conn.LogDebug(msg, data...)
35 35
 }
36 36
 
37
-func (w *WrapStreamCipher) LogInfo(msg string, data ...interface{}) {
38
-	w.conn.LogInfo(msg, data...)
37
+func (s *StreamCipher) LogInfo(msg string, data ...interface{}) {
38
+	s.conn.LogInfo(msg, data...)
39 39
 }
40 40
 
41
-func (w *WrapStreamCipher) LogWarn(msg string, data ...interface{}) {
42
-	w.conn.LogWarn(msg, data...)
41
+func (s *StreamCipher) LogWarn(msg string, data ...interface{}) {
42
+	s.conn.LogWarn(msg, data...)
43 43
 }
44 44
 
45
-func (w *WrapStreamCipher) LogError(msg string, data ...interface{}) {
46
-	w.conn.LogError(msg, data...)
45
+func (s *StreamCipher) LogError(msg string, data ...interface{}) {
46
+	s.conn.LogError(msg, data...)
47 47
 }
48 48
 
49
-func (w *WrapStreamCipher) LocalAddr() *net.TCPAddr {
50
-	return w.conn.LocalAddr()
49
+func (s *StreamCipher) LocalAddr() *net.TCPAddr {
50
+	return s.conn.LocalAddr()
51 51
 }
52 52
 
53
-func (w *WrapStreamCipher) RemoteAddr() *net.TCPAddr {
54
-	return w.conn.RemoteAddr()
53
+func (s *StreamCipher) RemoteAddr() *net.TCPAddr {
54
+	return s.conn.RemoteAddr()
55 55
 }
56 56
 
57
-func (w *WrapStreamCipher) Close() error {
58
-	return w.conn.Close()
57
+func (s *StreamCipher) Close() error {
58
+	return s.conn.Close()
59 59
 }
60 60
 
61 61
 func NewStreamCipher(conn WrapStreamReadWriteCloser, encryptor, decryptor cipher.Stream) WrapStreamReadWriteCloser {
62
-	return &WrapStreamCipher{
62
+	return &StreamCipher{
63 63
 		conn:      conn,
64 64
 		encryptor: encryptor,
65 65
 		decryptor: decryptor,

+ 5
- 0
wrappers/wrap.go Прегледај датотеку

@@ -56,6 +56,11 @@ type WrapPacketReader interface {
56 56
 	Wrap
57 57
 }
58 58
 
59
+type WrapPacketWriter interface {
60
+	io.Writer
61
+	Wrap
62
+}
63
+
59 64
 type WrapPacketReadWriter interface {
60 65
 	io.Writer
61 66
 	WrapPacketReader

Loading…
Откажи
Сачувај