Просмотр исходного кода

Wire things altogether

tags/1.0^2
9seconds 6 лет назад
Родитель
Сommit
828627b0d8
3 измененных файлов: 105 добавлений и 43 удалений
  1. 43
    0
      proxy/direct.go
  2. 60
    0
      proxy/middle.go
  3. 2
    43
      proxy/proxy.go

+ 43
- 0
proxy/direct.go Просмотреть файл

@@ -0,0 +1,43 @@
1
+package proxy
2
+
3
+import (
4
+	"io"
5
+	"sync"
6
+
7
+	"go.uber.org/zap"
8
+
9
+	"github.com/9seconds/mtg/conntypes"
10
+	"github.com/9seconds/mtg/obfuscated2"
11
+	"github.com/9seconds/mtg/protocol"
12
+)
13
+
14
+const directPipeBufferSize = 1024 * 1024
15
+
16
+func directConnection(request *protocol.TelegramRequest) error {
17
+	telegramConnRaw, err := obfuscated2.TelegramProtocol(request)
18
+	if err != nil {
19
+		return err
20
+	}
21
+	telegramConn := telegramConnRaw.(conntypes.StreamReadWriteCloser)
22
+	defer telegramConn.Close()
23
+
24
+	wg := &sync.WaitGroup{}
25
+	wg.Add(2)
26
+
27
+	go directPipe(telegramConn, request.ClientConn, wg, request.Logger)
28
+	go directPipe(request.ClientConn, telegramConn, wg, request.Logger)
29
+
30
+	<-request.Ctx.Done()
31
+	wg.Wait()
32
+
33
+	return request.Ctx.Err()
34
+}
35
+
36
+func directPipe(dst io.Writer, src io.Reader, wg *sync.WaitGroup, logger *zap.SugaredLogger) {
37
+	defer wg.Done()
38
+
39
+	buf := make([]byte, directPipeBufferSize)
40
+	if _, err := io.CopyBuffer(dst, src, buf); err != nil {
41
+		logger.Debugw("Cannot pump sockets", "error", err)
42
+	}
43
+}

+ 60
- 0
proxy/middle.go Просмотреть файл

@@ -0,0 +1,60 @@
1
+package proxy
2
+
3
+import (
4
+	"sync"
5
+
6
+	"go.uber.org/zap"
7
+
8
+	"github.com/9seconds/mtg/conntypes"
9
+	"github.com/9seconds/mtg/protocol"
10
+	"github.com/9seconds/mtg/wrappers/packetack"
11
+)
12
+
13
+func middleConnection(request *protocol.TelegramRequest) error {
14
+	telegramConn := packetack.NewProxy(request)
15
+	defer telegramConn.Close()
16
+
17
+	var clientConn conntypes.PacketAckFullReadWriteCloser
18
+	switch request.ClientProtocol.ConnectionType() {
19
+	case conntypes.ConnectionTypeAbridged:
20
+		clientConn = packetack.NewClientAbridged(request.ClientConn)
21
+	case conntypes.ConnectionTypeIntermediate:
22
+		clientConn = packetack.NewClientIntermediate(request.ClientConn)
23
+	case conntypes.ConnectionTypeSecure:
24
+		clientConn = packetack.NewClientIntermediateSecure(request.ClientConn)
25
+	default:
26
+		panic("unknown connection type")
27
+	}
28
+
29
+	wg := &sync.WaitGroup{}
30
+	wg.Add(2)
31
+
32
+	go middlePipe(telegramConn, clientConn, wg, request.Logger)
33
+	go middlePipe(clientConn, telegramConn, wg, request.Logger)
34
+
35
+	<-request.Ctx.Done()
36
+	wg.Wait()
37
+
38
+	return request.Ctx.Err()
39
+}
40
+
41
+func middlePipe(dst conntypes.PacketAckWriter,
42
+	src conntypes.PacketAckReader,
43
+	wg *sync.WaitGroup,
44
+	logger *zap.SugaredLogger) {
45
+	defer wg.Done()
46
+
47
+	for {
48
+		acks := conntypes.ConnectionAcks{}
49
+		packet, err := src.Read(&acks)
50
+		if err != nil {
51
+			logger.Debugw("Cannot read packet", "error", err)
52
+			return
53
+		}
54
+
55
+		if err = dst.Write(packet, &acks); err != nil {
56
+			logger.Debugw("Cannot send packet", "error", err)
57
+			return
58
+		}
59
+	}
60
+}

+ 2
- 43
proxy/proxy.go Просмотреть файл

@@ -2,23 +2,18 @@ package proxy
2 2
 
3 3
 import (
4 4
 	"context"
5
-	"io"
6 5
 	"net"
7
-	"sync"
8 6
 
9 7
 	"go.uber.org/zap"
10 8
 
11 9
 	"github.com/9seconds/mtg/config"
12 10
 	"github.com/9seconds/mtg/conntypes"
13
-	"github.com/9seconds/mtg/obfuscated2"
14 11
 	"github.com/9seconds/mtg/protocol"
15 12
 	"github.com/9seconds/mtg/stats"
16 13
 	"github.com/9seconds/mtg/utils"
17 14
 	"github.com/9seconds/mtg/wrappers/stream"
18 15
 )
19 16
 
20
-const directPipeBufferSize = 1024 * 1024
21
-
22 17
 type Proxy struct {
23 18
 	Logger              *zap.SugaredLogger
24 19
 	Context             context.Context
@@ -89,46 +84,10 @@ func (p *Proxy) accept(conn net.Conn) {
89 84
 	}
90 85
 
91 86
 	if len(config.C.AdTag) > 0 {
92
-		err = p.acceptMiddleProxyConnection(req)
87
+		err = middleConnection(req)
93 88
 	} else {
94
-		err = p.acceptDirectConnection(req)
89
+		err = directConnection(req)
95 90
 	}
96 91
 
97 92
 	logger.Infow("Client disconnected", "error", err, "addr", conn.RemoteAddr())
98 93
 }
99
-
100
-func (p *Proxy) acceptDirectConnection(request *protocol.TelegramRequest) error {
101
-	telegramConnRaw, err := obfuscated2.TelegramProtocol(request)
102
-	if err != nil {
103
-		return err
104
-	}
105
-	telegramConn := telegramConnRaw.(conntypes.StreamReadWriteCloser)
106
-	defer telegramConn.Close()
107
-
108
-	wg := &sync.WaitGroup{}
109
-	wg.Add(2)
110
-
111
-	go p.directPipe(telegramConn, request.ClientConn, wg, request.Logger)
112
-	go p.directPipe(request.ClientConn, telegramConn, wg, request.Logger)
113
-
114
-	<-request.Ctx.Done()
115
-	wg.Wait()
116
-
117
-	return request.Ctx.Err()
118
-}
119
-
120
-func (p *Proxy) directPipe(dst io.Writer,
121
-	src io.Reader,
122
-	wg *sync.WaitGroup,
123
-	logger *zap.SugaredLogger) {
124
-	defer wg.Done()
125
-
126
-	buf := make([]byte, directPipeBufferSize)
127
-	if _, err := io.CopyBuffer(dst, src, buf); err != nil {
128
-		logger.Debugw("Cannot pump sockets", "error", err)
129
-	}
130
-}
131
-
132
-func (p *Proxy) acceptMiddleProxyConnection(request *protocol.TelegramRequest) error {
133
-	return nil
134
-}

Загрузка…
Отмена
Сохранить