Sfoglia il codice sorgente

Rework for simplier proxy

tags/0.9
9seconds 7 anni fa
parent
commit
a0ba89b105
12 ha cambiato i file con 153 aggiunte e 194 eliminazioni
  1. 12
    0
      client/client.go
  2. 1
    1
      client/direct.go
  3. 4
    3
      client/middle.go
  4. 4
    0
      config/config.go
  5. 4
    6
      main.go
  6. 0
    64
      proxy/direct.go
  7. 0
    76
      proxy/middle.go
  8. 90
    16
      proxy/proxy.go
  9. 2
    2
      telegram/direct.go
  10. 26
    26
      telegram/middle.go
  11. 5
    0
      telegram/telegram.go
  12. 5
    0
      wrappers/wrap.go

+ 12
- 0
client/client.go Vedi File

@@ -0,0 +1,12 @@
1
+package client
2
+
3
+import (
4
+	"context"
5
+	"net"
6
+
7
+	"github.com/9seconds/mtg/config"
8
+	"github.com/9seconds/mtg/mtproto"
9
+	"github.com/9seconds/mtg/wrappers"
10
+)
11
+
12
+type Init func(context.Context, context.CancelFunc, net.Conn, string, *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error)

+ 1
- 1
client/direct.go Vedi File

@@ -16,7 +16,7 @@ import (
16 16
 const handshakeTimeout = 10 * time.Second
17 17
 
18 18
 func DirectInit(ctx context.Context, cancel context.CancelFunc, socket net.Conn, connID string,
19
-	conf *config.Config) (wrappers.WrapStreamReadWriteCloser, *mtproto.ConnectionOpts, error) {
19
+	conf *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error) {
20 20
 	if err := config.SetSocketOptions(socket); err != nil {
21 21
 		return nil, nil, errors.Annotate(err, "Cannot set socket options")
22 22
 	}

+ 4
- 3
client/middle.go Vedi File

@@ -10,15 +10,16 @@ import (
10 10
 )
11 11
 
12 12
 func MiddleInit(ctx context.Context, cancel context.CancelFunc, socket net.Conn, connID string,
13
-	conf *config.Config) (wrappers.WrapPacketReadWriteCloser, *mtproto.ConnectionOpts, error) {
13
+	conf *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error) {
14 14
 	conn, opts, err := DirectInit(ctx, cancel, socket, connID, conf)
15 15
 	if err != nil {
16 16
 		return nil, nil, err
17 17
 	}
18
+	connStream := conn.(wrappers.WrapStreamReadWriteCloser)
18 19
 
19
-	newConn := wrappers.NewMTProtoAbridged(conn, opts)
20
+	newConn := wrappers.NewMTProtoAbridged(connStream, opts)
20 21
 	if opts.ConnectionType != mtproto.ConnectionTypeAbridged {
21
-		newConn = wrappers.NewMTProtoIntermediate(conn, opts)
22
+		newConn = wrappers.NewMTProtoIntermediate(connStream, opts)
22 23
 	}
23 24
 
24 25
 	opts.ConnectionProto = mtproto.ConnectionProtocolIPv4

+ 4
- 0
config/config.go Vedi File

@@ -58,6 +58,10 @@ func (c *Config) StatAddr() string {
58 58
 	return getAddr(c.StatsIP, c.StatsPort)
59 59
 }
60 60
 
61
+func (c *Config) UseMiddleProxy() bool {
62
+	return len(c.AdTag) > 0
63
+}
64
+
61 65
 // GetURLs returns configured IPURLs instance with links to this server.
62 66
 func (c *Config) GetURLs() IPURLs {
63 67
 	urls := IPURLs{}

+ 4
- 6
main.go Vedi File

@@ -115,17 +115,15 @@ func main() {
115 115
 	zap.ReplaceGlobals(logger)
116 116
 	defer logger.Sync()
117 117
 
118
-	var server *proxy.Proxy
119
-	if len(conf.AdTag) == 0 {
120
-		zap.S().Infow("Use direct connection to Telegram")
121
-		server = proxy.NewProxyDirect(conf)
122
-	} else {
118
+	if conf.UseMiddleProxy() {
123 119
 		zap.S().Infow("Use middle proxy connection to Telegram")
124
-		server = proxy.NewProxyMiddle(conf)
120
+	} else {
121
+		zap.S().Infow("Use direct connection to Telegram")
125 122
 	}
126 123
 
127 124
 	printURLs(conf.GetURLs())
128 125
 
126
+	server := proxy.NewProxy(conf)
129 127
 	if err := server.Serve(); err != nil {
130 128
 		zap.S().Fatalw("Server stopped", "error", err)
131 129
 	}

+ 0
- 64
proxy/direct.go Vedi File

@@ -1,64 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"context"
5
-	"io"
6
-	"net"
7
-	"sync"
8
-
9
-	"github.com/juju/errors"
10
-
11
-	"github.com/9seconds/mtg/client"
12
-	"github.com/9seconds/mtg/config"
13
-	"github.com/9seconds/mtg/mtproto"
14
-	"github.com/9seconds/mtg/telegram"
15
-	"github.com/9seconds/mtg/wrappers"
16
-)
17
-
18
-func NewProxyDirect(conf *config.Config) *Proxy {
19
-	tg := telegram.NewDirectTelegram(conf)
20
-
21
-	return &Proxy{
22
-		conf: conf,
23
-		acceptCallback: func(ctx context.Context, cancel context.CancelFunc, clientSocket net.Conn,
24
-			connID string, wait *sync.WaitGroup, conf *config.Config) (io.Closer, io.Closer, error) {
25
-			client, opts, err := client.DirectInit(ctx, cancel, clientSocket, connID, conf)
26
-			if err != nil {
27
-				return nil, nil, errors.Annotate(err, "Cannot initialize client connection")
28
-			}
29
-
30
-			server, err := directTelegramStream(ctx, cancel, opts, connID, tg)
31
-			if err != nil {
32
-				return client, nil, errors.Annotate(err, "Cannot initialize telegram connection")
33
-			}
34
-
35
-			wait.Add(2)
36
-
37
-			go directPipe(client, server, wait)
38
-			go directPipe(server, client, wait)
39
-
40
-			return client, server, nil
41
-		},
42
-	}
43
-}
44
-
45
-func directTelegramStream(ctx context.Context, cancel context.CancelFunc, opts *mtproto.ConnectionOpts,
46
-	connID string, tg *telegram.DirectTelegram) (wrappers.WrapStreamReadWriteCloser, error) {
47
-	streamConn, err := tg.Dial(connID, opts)
48
-	if err != nil {
49
-		return nil, errors.Annotate(err, "Cannot dial to Telegram")
50
-	}
51
-	streamConn = wrappers.NewCtx(ctx, cancel, streamConn)
52
-
53
-	packetConn, err := tg.Init(opts, streamConn)
54
-	if err != nil {
55
-		return nil, errors.Annotate(err, "Cannot handshake telegram")
56
-	}
57
-
58
-	return packetConn, nil
59
-}
60
-
61
-func directPipe(src io.Reader, dst io.Writer, wait *sync.WaitGroup) {
62
-	defer wait.Done()
63
-	io.Copy(dst, src)
64
-}

+ 0
- 76
proxy/middle.go Vedi File

@@ -1,76 +0,0 @@
1
-package proxy
2
-
3
-import (
4
-	"context"
5
-	"io"
6
-	"net"
7
-	"sync"
8
-
9
-	"github.com/juju/errors"
10
-
11
-	"github.com/9seconds/mtg/client"
12
-	"github.com/9seconds/mtg/config"
13
-	"github.com/9seconds/mtg/mtproto"
14
-	"github.com/9seconds/mtg/telegram"
15
-	"github.com/9seconds/mtg/wrappers"
16
-)
17
-
18
-func NewProxyMiddle(conf *config.Config) *Proxy {
19
-	tg := telegram.NewMiddleTelegram(conf)
20
-
21
-	return &Proxy{
22
-		conf: conf,
23
-		acceptCallback: func(ctx context.Context, cancel context.CancelFunc, clientSocket net.Conn,
24
-			connID string, wait *sync.WaitGroup, conf *config.Config) (io.Closer, io.Closer, error) {
25
-			client, opts, err := client.MiddleInit(ctx, cancel, clientSocket, connID, conf)
26
-			if err != nil {
27
-				return nil, nil, errors.Annotate(err, "Cannot initialize client connection")
28
-			}
29
-
30
-			server, err := middleTelegramStream(ctx, cancel, opts, connID, tg)
31
-			if err != nil {
32
-				return client, nil, errors.Annotate(err, "Cannot initialize telegram connection")
33
-			}
34
-
35
-			wait.Add(2)
36
-
37
-			go middlePipe(client, server, wait, &opts.ReadHacks)
38
-			go middlePipe(server, client, wait, &opts.WriteHacks)
39
-
40
-			return client, server, nil
41
-		},
42
-	}
43
-}
44
-
45
-func middleTelegramStream(ctx context.Context, cancel context.CancelFunc, opts *mtproto.ConnectionOpts,
46
-	connID string, tg *telegram.MiddleTelegram) (wrappers.WrapPacketReadWriteCloser, error) {
47
-	streamConn, err := tg.Dial(connID, opts)
48
-	if err != nil {
49
-		return nil, errors.Annotate(err, "Cannot dial to Telegram")
50
-	}
51
-	streamConn = wrappers.NewCtx(ctx, cancel, streamConn)
52
-
53
-	packetConn, err := tg.Init(opts, streamConn)
54
-	if err != nil {
55
-		return nil, errors.Annotate(err, "Cannot handshake telegram")
56
-	}
57
-
58
-	return packetConn, nil
59
-}
60
-
61
-func middlePipe(src wrappers.WrapPacketReader, dst wrappers.WrapPacketWriter, wait *sync.WaitGroup, hacks *mtproto.Hacks) {
62
-	defer wait.Done()
63
-
64
-	for {
65
-		hacks.SimpleAck = false
66
-		hacks.QuickAck = false
67
-
68
-		packet, err := src.Read()
69
-		if err != nil {
70
-			return
71
-		}
72
-		if _, err = dst.Write(packet); err != nil {
73
-			return
74
-		}
75
-	}
76
-}

+ 90
- 16
proxy/proxy.go Vedi File

@@ -10,14 +10,17 @@ import (
10 10
 	uuid "github.com/satori/go.uuid"
11 11
 	"go.uber.org/zap"
12 12
 
13
+	"github.com/9seconds/mtg/client"
13 14
 	"github.com/9seconds/mtg/config"
15
+	"github.com/9seconds/mtg/mtproto"
16
+	"github.com/9seconds/mtg/telegram"
17
+	"github.com/9seconds/mtg/wrappers"
14 18
 )
15 19
 
16
-type proxyAcceptCallback func(context.Context, context.CancelFunc, net.Conn, string, *sync.WaitGroup, *config.Config) (io.Closer, io.Closer, error)
17
-
18 20
 type Proxy struct {
19
-	conf           *config.Config
20
-	acceptCallback proxyAcceptCallback
21
+	clientInit client.Init
22
+	tg         telegram.Telegram
23
+	conf       *config.Config
21 24
 }
22 25
 
23 26
 func (p *Proxy) Serve() error {
@@ -50,20 +53,33 @@ func (p *Proxy) accept(conn net.Conn) {
50 53
 	log.Infow("Client connected", "addr", conn.RemoteAddr())
51 54
 
52 55
 	ctx, cancel := context.WithCancel(context.Background())
53
-	wait := &sync.WaitGroup{}
56
+	client, opts, err := p.clientInit(ctx, cancel, conn, connID, p.conf)
57
+	if err != nil {
58
+		log.Errorw("Cannot initialize client connection", "error", err)
59
+		return
60
+	}
61
+	defer client.(wrappers.WrapCloser).Close()
54 62
 
55
-	client, server, err := p.acceptCallback(ctx, cancel, conn, connID, wait, p.conf)
56
-	defer func() {
57
-		if client != nil {
58
-			client.Close()
59
-		}
60
-		if server != nil {
61
-			server.Close()
62
-		}
63
-	}()
63
+	server, err := p.getTelegramConn(ctx, cancel, opts, connID)
64 64
 	if err != nil {
65
-		log.Errorw("Cannot initialize connection", "error", err)
66
-		cancel()
65
+		log.Errorw("Cannot initialize server connection", "error", err)
66
+		return
67
+	}
68
+	defer server.(wrappers.WrapCloser).Close()
69
+
70
+	wait := &sync.WaitGroup{}
71
+	wait.Add(2)
72
+
73
+	if p.conf.UseMiddleProxy() {
74
+		clientPacket := client.(wrappers.WrapPacketReadWriteCloser)
75
+		serverPacket := server.(wrappers.WrapPacketReadWriteCloser)
76
+		go p.middlePipe(clientPacket, serverPacket, wait, &opts.ReadHacks)
77
+		go p.middlePipe(serverPacket, clientPacket, wait, &opts.WriteHacks)
78
+	} else {
79
+		clientStream := client.(wrappers.WrapStreamReadWriteCloser)
80
+		serverStream := server.(wrappers.WrapStreamReadWriteCloser)
81
+		go p.directPipe(clientStream, serverStream, wait)
82
+		go p.directPipe(serverStream, clientStream, wait)
67 83
 	}
68 84
 
69 85
 	<-ctx.Done()
@@ -71,3 +87,61 @@ func (p *Proxy) accept(conn net.Conn) {
71 87
 
72 88
 	log.Infow("Client disconnected", "addr", conn.RemoteAddr())
73 89
 }
90
+
91
+func (p *Proxy) getTelegramConn(ctx context.Context, cancel context.CancelFunc, opts *mtproto.ConnectionOpts,
92
+	connID string) (wrappers.Wrap, error) {
93
+	streamConn, err := p.tg.Dial(connID, opts)
94
+	if err != nil {
95
+		return nil, errors.Annotate(err, "Cannot dial to Telegram")
96
+	}
97
+	streamConn = wrappers.NewCtx(ctx, cancel, streamConn)
98
+
99
+	packetConn, err := p.tg.Init(opts, streamConn)
100
+	if err != nil {
101
+		return nil, errors.Annotate(err, "Cannot handshake telegram")
102
+	}
103
+
104
+	return packetConn, nil
105
+}
106
+
107
+func (p *Proxy) middlePipe(src wrappers.WrapPacketReader, dst wrappers.WrapPacketWriter, wait *sync.WaitGroup, hacks *mtproto.Hacks) {
108
+	defer wait.Done()
109
+
110
+	for {
111
+		hacks.SimpleAck = false
112
+		hacks.QuickAck = false
113
+
114
+		packet, err := src.Read()
115
+		if err != nil {
116
+			return
117
+		}
118
+		if _, err = dst.Write(packet); err != nil {
119
+			return
120
+		}
121
+	}
122
+}
123
+
124
+func (p *Proxy) directPipe(src io.Reader, dst io.Writer, wait *sync.WaitGroup) {
125
+	defer wait.Done()
126
+	io.Copy(dst, src)
127
+
128
+}
129
+
130
+func NewProxy(conf *config.Config) *Proxy {
131
+	var clientInit client.Init
132
+	var tg telegram.Telegram
133
+
134
+	if conf.UseMiddleProxy() {
135
+		clientInit = client.MiddleInit
136
+		tg = telegram.NewMiddleTelegram(conf)
137
+	} else {
138
+		clientInit = client.DirectInit
139
+		tg = telegram.NewDirectTelegram(conf)
140
+	}
141
+
142
+	return &Proxy{
143
+		conf:       conf,
144
+		clientInit: clientInit,
145
+		tg:         tg,
146
+	}
147
+}

+ 2
- 2
telegram/direct.go Vedi File

@@ -43,7 +43,7 @@ func (t *DirectTelegram) Dial(connID string, connOpts *mtproto.ConnectionOpts) (
43 43
 	return t.baseTelegram.dial(dc-1, connID, connOpts.ConnectionProto)
44 44
 }
45 45
 
46
-func (t *DirectTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.WrapStreamReadWriteCloser) (wrappers.WrapStreamReadWriteCloser, error) {
46
+func (t *DirectTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.WrapStreamReadWriteCloser) (wrappers.Wrap, error) {
47 47
 	obfs2, frame := obfuscated2.MakeTelegramObfuscated2Frame(connOpts)
48 48
 
49 49
 	if _, err := conn.Write(frame); err != nil {
@@ -55,7 +55,7 @@ func (t *DirectTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.Wr
55 55
 
56 56
 // NewDirectTelegram returns Telegram instance which connects directly
57 57
 // to Telegram bypassing middleproxies.
58
-func NewDirectTelegram(conf *config.Config) *DirectTelegram {
58
+func NewDirectTelegram(conf *config.Config) Telegram {
59 59
 	return &DirectTelegram{baseTelegram{
60 60
 		dialer: tgDialer{
61 61
 			Dialer: net.Dialer{Timeout: telegramDialTimeout},

+ 26
- 26
telegram/middle.go Vedi File

@@ -19,32 +19,7 @@ type MiddleTelegram struct {
19 19
 	conf *config.Config
20 20
 }
21 21
 
22
-func NewMiddleTelegram(conf *config.Config) *MiddleTelegram {
23
-	tg := &MiddleTelegram{
24
-		middleTelegramCaller: middleTelegramCaller{
25
-			baseTelegram: baseTelegram{
26
-				dialer: tgDialer{
27
-					Dialer: net.Dialer{Timeout: telegramDialTimeout},
28
-					conf:   conf,
29
-				},
30
-			},
31
-			httpClient: &http.Client{
32
-				Timeout: middleTelegramHTTPClientTimeout,
33
-			},
34
-			dialerMutex: &sync.RWMutex{},
35
-		},
36
-		conf: conf,
37
-	}
38
-
39
-	if err := tg.update(); err != nil {
40
-		panic(err)
41
-	}
42
-	go tg.autoUpdate()
43
-
44
-	return tg
45
-}
46
-
47
-func (t *MiddleTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.WrapStreamReadWriteCloser) (wrappers.WrapPacketReadWriteCloser, error) {
22
+func (t *MiddleTelegram) Init(connOpts *mtproto.ConnectionOpts, conn wrappers.WrapStreamReadWriteCloser) (wrappers.Wrap, error) {
48 23
 	rpcNonceConn := wrappers.NewMTProtoFrame(conn, rpc.SeqNoNonce)
49 24
 
50 25
 	rpcNonceReq, err := t.sendRPCNonceRequest(rpcNonceConn)
@@ -125,3 +100,28 @@ func (t *MiddleTelegram) receiveRPCHandshakeResponse(conn wrappers.WrapPacketRea
125 100
 
126 101
 	return rpcHandshakeResp, nil
127 102
 }
103
+
104
+func NewMiddleTelegram(conf *config.Config) Telegram {
105
+	tg := &MiddleTelegram{
106
+		middleTelegramCaller: middleTelegramCaller{
107
+			baseTelegram: baseTelegram{
108
+				dialer: tgDialer{
109
+					Dialer: net.Dialer{Timeout: telegramDialTimeout},
110
+					conf:   conf,
111
+				},
112
+			},
113
+			httpClient: &http.Client{
114
+				Timeout: middleTelegramHTTPClientTimeout,
115
+			},
116
+			dialerMutex: &sync.RWMutex{},
117
+		},
118
+		conf: conf,
119
+	}
120
+
121
+	if err := tg.update(); err != nil {
122
+		panic(err)
123
+	}
124
+	go tg.autoUpdate()
125
+
126
+	return tg
127
+}

+ 5
- 0
telegram/telegram.go Vedi File

@@ -9,6 +9,11 @@ import (
9 9
 	"github.com/9seconds/mtg/wrappers"
10 10
 )
11 11
 
12
+type Telegram interface {
13
+	Dial(string, *mtproto.ConnectionOpts) (wrappers.WrapStreamReadWriteCloser, error)
14
+	Init(*mtproto.ConnectionOpts, wrappers.WrapStreamReadWriteCloser) (wrappers.Wrap, error)
15
+}
16
+
12 17
 type baseTelegram struct {
13 18
 	dialer tgDialer
14 19
 

+ 5
- 0
wrappers/wrap.go Vedi File

@@ -20,6 +20,11 @@ type WrapWriter interface {
20 20
 	Wrap
21 21
 }
22 22
 
23
+type WrapCloser interface {
24
+	io.Closer
25
+	Wrap
26
+}
27
+
23 28
 type WrapWriteCloser interface {
24 29
 	io.Closer
25 30
 	WrapWriter

Loading…
Annulla
Salva