Kaynağa Gözat

Merge pull request #230 from 9seconds/simplify-sockopts

Simplify sockopts
tags/v2.1.3^2
Sergey Arkhipov 4 yıl önce
ebeveyn
işleme
3f8f96b91f
No account linked to committer's email address
49 değiştirilmiş dosya ile 478 ekleme ve 290 silme
  1. 28
    0
      essentials/conns.go
  2. 6
    0
      essentials/doc.go
  3. 3
    1
      example.config.toml
  4. 6
    1
      go.mod
  5. 8
    0
      go.sum
  6. 2
    1
      internal/cli/access.go
  7. 2
    4
      internal/cli/run_proxy.go
  8. 1
    5
      internal/cli/simple_run.go
  9. 0
    1
      internal/config/config.go
  10. 0
    1
      internal/config/parse.go
  11. 6
    6
      internal/testlib/mtglib_network_mock.go
  12. 17
    9
      internal/testlib/net_conn_mock.go
  13. 2
    5
      internal/utils/net_listener.go
  14. 5
    4
      mtglib/conns.go
  15. 3
    3
      mtglib/conns_internal_test.go
  16. 7
    3
      mtglib/init.go
  17. 2
    2
      mtglib/internal/faketls/conn.go
  18. 1
    1
      mtglib/internal/faketls/conn_test.go
  19. 1
    1
      mtglib/internal/obfuscated2/client_handshake_test.go
  20. 3
    2
      mtglib/internal/obfuscated2/conn.go
  21. 2
    2
      mtglib/internal/obfuscated2/server_handshake_test.go
  22. 0
    19
      mtglib/internal/relay/conn.go
  23. 3
    4
      mtglib/internal/relay/init.go
  24. 21
    22
      mtglib/internal/relay/pools.go
  25. 26
    20
      mtglib/internal/relay/relay.go
  26. 11
    7
      mtglib/internal/relay/relay_test.go
  27. 77
    0
      mtglib/internal/relay/sync_pair.go
  28. 0
    22
      mtglib/internal/relay/timeouts.go
  29. 0
    37
      mtglib/internal/relay/timeouts_internal_test.go
  30. 3
    2
      mtglib/internal/telegram/init.go
  31. 4
    3
      mtglib/internal/telegram/telegram.go
  32. 3
    6
      mtglib/proxy.go
  33. 2
    8
      mtglib/proxy_opts.go
  34. 5
    3
      mtglib/stream_context.go
  35. 3
    3
      mtglib/stream_context_internal_test.go
  36. 7
    5
      network/circuit_breaker.go
  37. 2
    2
      network/circuit_breaker_internal_test.go
  38. 10
    16
      network/default.go
  39. 0
    5
      network/default_test.go
  40. 10
    3
      network/init.go
  41. 5
    5
      network/init_internal_test.go
  42. 8
    5
      network/init_test.go
  43. 4
    3
      network/load_balanced_socks5.go
  44. 10
    6
      network/network.go
  45. 10
    24
      network/sockopts.go
  46. 1
    1
      network/sockopts_unix.go
  47. 1
    1
      network/sockopts_windows.go
  48. 146
    5
      network/socks5.go
  49. 1
    1
      network/socks5_test.go

+ 28
- 0
essentials/conns.go Dosyayı Görüntüle

@@ -0,0 +1,28 @@
1
+package essentials
2
+
3
+import (
4
+	"io"
5
+	"net"
6
+)
7
+
8
+// CloseableReader is a reader interface that can close its reading end.
9
+type CloseableReader interface {
10
+	io.Reader
11
+
12
+	CloseRead() error
13
+}
14
+
15
+// CloseableWriter is a writer that can close its writing end.
16
+type CloseableWriter interface {
17
+	io.Writer
18
+
19
+	CloseWrite() error
20
+}
21
+
22
+// Conn is an extension of net.Conn that can close its ends. This mostly
23
+// implies TCP connections.
24
+type Conn interface {
25
+	net.Conn
26
+	CloseableReader
27
+	CloseableWriter
28
+}

+ 6
- 0
essentials/doc.go Dosyayı Görüntüle

@@ -0,0 +1,6 @@
1
+// This is a minimal package that contains _essentials_ of mtglib and its
2
+// complimentary packages. This is mostly required to comply some interfaces
3
+// between mtglib and its internals to avoid circular dependencies.
4
+//
5
+// This package should contain only bare minimum and mostly technical.
6
+package essentials

+ 3
- 1
example.config.toml Dosyayı Görüntüle

@@ -30,7 +30,9 @@ concurrency = 8192
30 30
 # A size of user-space buffer for TCP to use. Since we do 2 connections,
31 31
 # then we have tcp-buffer * (4 + 2) per each connection: read/write for
32 32
 # each connection + 2 copy buffers to pump the data between sockets.
33
-tcp-buffer = "4kb"
33
+#
34
+# Deprecated: this setting is no longer makes any effect.
35
+# tcp-buffer = "4kb"
34 36
 
35 37
 # Sometimes you want to enforce mtg to use some types of
36 38
 # IP connectivity to Telegram. We have 4 modes:

+ 6
- 1
go.mod Dosyayı Görüntüle

@@ -25,11 +25,13 @@ require (
25 25
 	github.com/stretchr/testify v1.7.0
26 26
 	github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43
27 27
 	golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
28
-	golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b
28
+	golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b // indirect
29 29
 	golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef
30 30
 	google.golang.org/protobuf v1.27.1 // indirect
31 31
 )
32 32
 
33
+require github.com/txthinking/socks5 v0.0.0-20211121111206-e03c1217a50b
34
+
33 35
 require (
34 36
 	github.com/beorn7/perks v1.0.1 // indirect
35 37
 	github.com/cenkalti/backoff/v4 v4.1.0 // indirect
@@ -38,9 +40,12 @@ require (
38 40
 	github.com/gotd/ige v0.1.5 // indirect
39 41
 	github.com/gotd/xor v0.1.1 // indirect
40 42
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
43
+	github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
41 44
 	github.com/pkg/errors v0.9.1 // indirect
42 45
 	github.com/pmezard/go-difflib v1.0.0 // indirect
43 46
 	github.com/prometheus/client_model v0.2.0 // indirect
47
+	github.com/txthinking/runnergroup v0.0.0-20210608031112-152c7c4432bf // indirect
48
+	github.com/txthinking/x v0.0.0-20210326105829-476fab902fbe // indirect
44 49
 	go.uber.org/atomic v1.7.0 // indirect
45 50
 	go.uber.org/multierr v1.6.0 // indirect
46 51
 	go.uber.org/zap v1.16.0 // indirect

+ 8
- 0
go.sum Dosyayı Görüntüle

@@ -195,6 +195,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
195 195
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
196 196
 github.com/panjf2000/ants/v2 v2.4.6 h1:drmj9mcygn2gawZ155dRbo+NfXEfAssjZNU1qoIb4gQ=
197 197
 github.com/panjf2000/ants/v2 v2.4.6/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
198
+github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
199
+github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
198 200
 github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
199 201
 github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
200 202
 github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
@@ -250,6 +252,12 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
250 252
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
251 253
 github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
252 254
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
255
+github.com/txthinking/runnergroup v0.0.0-20210608031112-152c7c4432bf h1:7PflaKRtU4np/epFxRXlFhlzLXZzKFrH5/I4so5Ove0=
256
+github.com/txthinking/runnergroup v0.0.0-20210608031112-152c7c4432bf/go.mod h1:CLUSJbazqETbaR+i0YAhXBICV9TrKH93pziccMhmhpM=
257
+github.com/txthinking/socks5 v0.0.0-20211121111206-e03c1217a50b h1:6J/38A0Xmdnjacfie0Udams7OP/GdoExyTipKwuQWjY=
258
+github.com/txthinking/socks5 v0.0.0-20211121111206-e03c1217a50b/go.mod h1:7NloQcrxaZYKURWph5HLxVDlIwMHJXCPkeWPtpftsIg=
259
+github.com/txthinking/x v0.0.0-20210326105829-476fab902fbe h1:gMWxZxBFRAXqoGkwkYlPX2zvyyKNWJpxOxCrjqJkm5A=
260
+github.com/txthinking/x v0.0.0-20210326105829-476fab902fbe/go.mod h1:WgqbSEmUYSjEV3B1qmee/PpP2NYEz4bL9/+mF1ma+s4=
253 261
 github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43 h1:QEePdg0ty2r0t1+qwfZmQ4OOl/MB2UXIeJSpIZv56lg=
254 262
 github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43/go.mod h1:OYRfF6eb5wY9VRFkXJH8FFBi3plw2v+giaIu7P054pM=
255 263
 github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

+ 2
- 1
internal/cli/access.go Dosyayı Görüntüle

@@ -13,6 +13,7 @@ import (
13 13
 	"strings"
14 14
 	"sync"
15 15
 
16
+	"github.com/9seconds/mtg/v2/essentials"
16 17
 	"github.com/9seconds/mtg/v2/internal/config"
17 18
 	"github.com/9seconds/mtg/v2/internal/utils"
18 19
 	"github.com/9seconds/mtg/v2/mtglib"
@@ -106,7 +107,7 @@ func (a *Access) Run(cli *CLI, version string) error {
106 107
 }
107 108
 
108 109
 func (a *Access) getIP(ntw mtglib.Network, protocol string) net.IP {
109
-	client := ntw.MakeHTTPClient(func(ctx context.Context, network, address string) (net.Conn, error) {
110
+	client := ntw.MakeHTTPClient(func(ctx context.Context, network, address string) (essentials.Conn, error) {
110 111
 		return ntw.DialContext(ctx, protocol, address) // nolint: wrapcheck
111 112
 	})
112 113
 

+ 2
- 4
internal/cli/run_proxy.go Dosyayı Görüntüle

@@ -38,10 +38,9 @@ func makeNetwork(conf *config.Config, version string) (mtglib.Network, error) {
38 38
 	tcpTimeout := conf.Network.Timeout.TCP.Get(network.DefaultTimeout)
39 39
 	httpTimeout := conf.Network.Timeout.HTTP.Get(network.DefaultHTTPTimeout)
40 40
 	dohIP := conf.Network.DOHIP.Get(net.ParseIP(network.DefaultDOHHostname)).String()
41
-	bufferSize := conf.TCPBuffer.Get(network.DefaultBufferSize)
42 41
 	userAgent := "mtg/" + version
43 42
 
44
-	baseDialer, err := network.NewDefaultDialer(tcpTimeout, int(bufferSize))
43
+	baseDialer, err := network.NewDefaultDialer(tcpTimeout, 0)
45 44
 	if err != nil {
46 45
 		return nil, fmt.Errorf("cannot build a default dialer: %w", err)
47 46
 	}
@@ -193,7 +192,6 @@ func runProxy(conf *config.Config, version string) error { // nolint: funlen
193 192
 		EventStream:     eventStream,
194 193
 
195 194
 		Secret:             conf.Secret,
196
-		BufferSize:         conf.TCPBuffer.Get(mtglib.DefaultBufferSize),
197 195
 		DomainFrontingPort: conf.DomainFrontingPort.Get(mtglib.DefaultDomainFrontingPort),
198 196
 		PreferIP:           conf.PreferIP.Get(mtglib.DefaultPreferIP),
199 197
 
@@ -206,7 +204,7 @@ func runProxy(conf *config.Config, version string) error { // nolint: funlen
206 204
 		return fmt.Errorf("cannot create a proxy: %w", err)
207 205
 	}
208 206
 
209
-	listener, err := utils.NewListener(conf.BindTo.Get(""), int(opts.BufferSize))
207
+	listener, err := utils.NewListener(conf.BindTo.Get(""), 0)
210 208
 	if err != nil {
211 209
 		return fmt.Errorf("cannot start proxy: %w", err)
212 210
 	}

+ 1
- 5
internal/cli/simple_run.go Dosyayı Görüntüle

@@ -15,7 +15,7 @@ type SimpleRun struct {
15 15
 
16 16
 	Debug               bool          `kong:"name='debug',short='d',help='Run in debug mode.'"`                                                                        // nolint: lll
17 17
 	Concurrency         uint64        `kong:"name='concurrency',short='c',default='8192',help='Max number of concurrent connection to proxy.'"`                        // nolint: lll
18
-	TCPBuffer           string        `kong:"name='tcp-buffer',short='b',default='4KB',help='Size of TCP buffer to use.'"`                                             // nolint: lll
18
+	TCPBuffer           string        `kong:"name='tcp-buffer',short='b',default='4KB',help='Deprecated and ignored'"`                                                 // nolint: lll
19 19
 	PreferIP            string        `kong:"name='prefer-ip',short='i',default='prefer-ipv6',help='IP preference. By default we prefer IPv6 with fallback to IPv4.'"` // nolint: lll
20 20
 	DomainFrontingPort  uint64        `kong:"name='domain-fronting-port',short='p',default='443',help='A port to access for domain fronting.'"`                        // nolint: lll
21 21
 	DOHIP               net.IP        `kong:"name='doh-ip',short='n',default='9.9.9.9',help='IP address of DNS-over-HTTP to use.'"`                                    // nolint: lll
@@ -38,10 +38,6 @@ func (s *SimpleRun) Run(cli *CLI, version string) error { // nolint: cyclop
38 38
 		return fmt.Errorf("incorrect concurrency: %w", err)
39 39
 	}
40 40
 
41
-	if err := conf.TCPBuffer.Set(s.TCPBuffer); err != nil {
42
-		return fmt.Errorf("incorrect tcp-buffer: %w", err)
43
-	}
44
-
45 41
 	if err := conf.PreferIP.Set(s.PreferIP); err != nil {
46 42
 		return fmt.Errorf("incorrect prefer-ip: %w", err)
47 43
 	}

+ 0
- 1
internal/config/config.go Dosyayı Görüntüle

@@ -25,7 +25,6 @@ type Config struct {
25 25
 	AllowFallbackOnUnknownDC TypeBool        `json:"allowFallbackOnUnknownDc"`
26 26
 	Secret                   mtglib.Secret   `json:"secret"`
27 27
 	BindTo                   TypeHostPort    `json:"bindTo"`
28
-	TCPBuffer                TypeBytes       `json:"tcpBuffer"`
29 28
 	PreferIP                 TypePreferIP    `json:"preferIp"`
30 29
 	DomainFrontingPort       TypePort        `json:"domainFrontingPort"`
31 30
 	TolerateTimeSkewness     TypeDuration    `json:"tolerateTimeSkewness"`

+ 0
- 1
internal/config/parse.go Dosyayı Görüntüle

@@ -13,7 +13,6 @@ type tomlConfig struct {
13 13
 	AllowFallbackOnUnknownDC bool   `toml:"allow-fallback-on-unknown-dc" json:"allowFallbackOnUnknownDc,omitempty"`
14 14
 	Secret                   string `toml:"secret" json:"secret"`
15 15
 	BindTo                   string `toml:"bind-to" json:"bindTo"`
16
-	TCPBuffer                string `toml:"tcp-buffer" json:"tcpBuffer,omitempty"`
17 16
 	PreferIP                 string `toml:"prefer-ip" json:"preferIp,omitempty"`
18 17
 	DomainFrontingPort       uint   `toml:"domain-fronting-port" json:"domainFrontingPort,omitempty"`
19 18
 	TolerateTimeSkewness     string `toml:"tolerate-time-skewness" json:"tolerateTimeSkewness,omitempty"`

+ 6
- 6
internal/testlib/mtglib_network_mock.go Dosyayı Görüntüle

@@ -2,9 +2,9 @@ package testlib
2 2
 
3 3
 import (
4 4
 	"context"
5
-	"net"
6 5
 	"net/http"
7 6
 
7
+	"github.com/9seconds/mtg/v2/essentials"
8 8
 	"github.com/stretchr/testify/mock"
9 9
 )
10 10
 
@@ -12,19 +12,19 @@ type MtglibNetworkMock struct {
12 12
 	mock.Mock
13 13
 }
14 14
 
15
-func (m *MtglibNetworkMock) Dial(network, address string) (net.Conn, error) {
15
+func (m *MtglibNetworkMock) Dial(network, address string) (essentials.Conn, error) {
16 16
 	args := m.Called(network, address)
17 17
 
18
-	return args.Get(0).(net.Conn), args.Error(1) // nolint: wrapcheck
18
+	return args.Get(0).(essentials.Conn), args.Error(1) // nolint: wrapcheck
19 19
 }
20 20
 
21
-func (m *MtglibNetworkMock) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
21
+func (m *MtglibNetworkMock) DialContext(ctx context.Context, network, address string) (essentials.Conn, error) {
22 22
 	args := m.Called(ctx, network, address)
23 23
 
24
-	return args.Get(0).(net.Conn), args.Error(1) // nolint: wrapcheck
24
+	return args.Get(0).(essentials.Conn), args.Error(1) // nolint: wrapcheck
25 25
 }
26 26
 
27 27
 func (m *MtglibNetworkMock) MakeHTTPClient(dialFunc func(ctx context.Context,
28
-	network, address string) (net.Conn, error)) *http.Client {
28
+	network, address string) (essentials.Conn, error)) *http.Client {
29 29
 	return m.Called(dialFunc).Get(0).(*http.Client)
30 30
 }

+ 17
- 9
internal/testlib/net_conn_mock.go Dosyayı Görüntüle

@@ -7,42 +7,50 @@ import (
7 7
 	"github.com/stretchr/testify/mock"
8 8
 )
9 9
 
10
-type NetConnMock struct {
10
+type EssentialsConnMock struct {
11 11
 	mock.Mock
12 12
 }
13 13
 
14
-func (n *NetConnMock) Read(b []byte) (int, error) {
14
+func (n *EssentialsConnMock) Read(b []byte) (int, error) {
15 15
 	args := n.Called(b)
16 16
 
17 17
 	return args.Int(0), args.Error(1)
18 18
 }
19 19
 
20
-func (n *NetConnMock) Write(b []byte) (int, error) {
20
+func (n *EssentialsConnMock) Write(b []byte) (int, error) {
21 21
 	args := n.Called(b)
22 22
 
23 23
 	return args.Int(0), args.Error(1)
24 24
 }
25 25
 
26
-func (n *NetConnMock) Close() error {
26
+func (n *EssentialsConnMock) Close() error {
27 27
 	return n.Called().Error(0) // nolint: wrapcheck
28 28
 }
29 29
 
30
-func (n *NetConnMock) LocalAddr() net.Addr {
30
+func (n *EssentialsConnMock) CloseRead() error {
31
+	return n.Called().Error(0) // nolint: wrapcheck
32
+}
33
+
34
+func (n *EssentialsConnMock) CloseWrite() error {
35
+	return n.Called().Error(0) // nolint: wrapcheck
36
+}
37
+
38
+func (n *EssentialsConnMock) LocalAddr() net.Addr {
31 39
 	return n.Called().Get(0).(net.Addr)
32 40
 }
33 41
 
34
-func (n *NetConnMock) RemoteAddr() net.Addr {
42
+func (n *EssentialsConnMock) RemoteAddr() net.Addr {
35 43
 	return n.Called().Get(0).(net.Addr)
36 44
 }
37 45
 
38
-func (n *NetConnMock) SetDeadline(t time.Time) error {
46
+func (n *EssentialsConnMock) SetDeadline(t time.Time) error {
39 47
 	return n.Called(t).Error(0) // nolint: wrapcheck
40 48
 }
41 49
 
42
-func (n *NetConnMock) SetReadDeadline(t time.Time) error {
50
+func (n *EssentialsConnMock) SetReadDeadline(t time.Time) error {
43 51
 	return n.Called(t).Error(0) // nolint: wrapcheck
44 52
 }
45 53
 
46
-func (n *NetConnMock) SetWriteDeadline(t time.Time) error {
54
+func (n *EssentialsConnMock) SetWriteDeadline(t time.Time) error {
47 55
 	return n.Called(t).Error(0) // nolint: wrapcheck
48 56
 }

+ 2
- 5
internal/utils/net_listener.go Dosyayı Görüntüle

@@ -9,8 +9,6 @@ import (
9 9
 
10 10
 type Listener struct {
11 11
 	net.Listener
12
-
13
-	bufferSize int
14 12
 }
15 13
 
16 14
 func (l Listener) Accept() (net.Conn, error) {
@@ -19,7 +17,7 @@ func (l Listener) Accept() (net.Conn, error) {
19 17
 		return nil, err // nolint: wrapcheck
20 18
 	}
21 19
 
22
-	if err := network.SetClientSocketOptions(conn, l.bufferSize); err != nil {
20
+	if err := network.SetClientSocketOptions(conn, 0); err != nil {
23 21
 		conn.Close()
24 22
 
25 23
 		return nil, fmt.Errorf("cannot set TCP options: %w", err)
@@ -35,7 +33,6 @@ func NewListener(bindTo string, bufferSize int) (net.Listener, error) {
35 33
 	}
36 34
 
37 35
 	return Listener{
38
-		Listener:   base,
39
-		bufferSize: bufferSize,
36
+		Listener: base,
40 37
 	}, nil
41 38
 }

+ 5
- 4
mtglib/conns.go Dosyayı Görüntüle

@@ -4,12 +4,13 @@ import (
4 4
 	"bytes"
5 5
 	"context"
6 6
 	"io"
7
-	"net"
8 7
 	"sync"
8
+
9
+	"github.com/9seconds/mtg/v2/essentials"
9 10
 )
10 11
 
11 12
 type connTraffic struct {
12
-	net.Conn
13
+	essentials.Conn
13 14
 
14 15
 	streamID string
15 16
 	stream   EventStream
@@ -37,7 +38,7 @@ func (c connTraffic) Write(b []byte) (int, error) {
37 38
 }
38 39
 
39 40
 type connRewind struct {
40
-	net.Conn
41
+	essentials.Conn
41 42
 
42 43
 	active io.Reader
43 44
 	buf    bytes.Buffer
@@ -58,7 +59,7 @@ func (c *connRewind) Rewind() {
58 59
 	c.active = io.MultiReader(&c.buf, c.Conn)
59 60
 }
60 61
 
61
-func newConnRewind(conn net.Conn) *connRewind {
62
+func newConnRewind(conn essentials.Conn) *connRewind {
62 63
 	rv := &connRewind{
63 64
 		Conn: conn,
64 65
 	}

+ 3
- 3
mtglib/conns_internal_test.go Dosyayı Görüntüle

@@ -14,7 +14,7 @@ import (
14 14
 )
15 15
 
16 16
 type ConnRewindBaseConn struct {
17
-	testlib.NetConnMock
17
+	testlib.EssentialsConnMock
18 18
 
19 19
 	readBuffer bytes.Buffer
20 20
 }
@@ -29,13 +29,13 @@ type ConnTrafficTestSuite struct {
29 29
 	suite.Suite
30 30
 
31 31
 	eventStreamMock *EventStreamMock
32
-	connMock        *testlib.NetConnMock
32
+	connMock        *testlib.EssentialsConnMock
33 33
 	conn            io.ReadWriter
34 34
 }
35 35
 
36 36
 func (suite *ConnTrafficTestSuite) SetupTest() {
37 37
 	suite.eventStreamMock = &EventStreamMock{}
38
-	suite.connMock = &testlib.NetConnMock{}
38
+	suite.connMock = &testlib.EssentialsConnMock{}
39 39
 	suite.conn = connTraffic{
40 40
 		Conn:     suite.connMock,
41 41
 		streamID: "CONNID",

+ 7
- 3
mtglib/init.go Dosyayı Görüntüle

@@ -23,6 +23,8 @@ import (
23 23
 	"net"
24 24
 	"net/http"
25 25
 	"time"
26
+
27
+	"github.com/9seconds/mtg/v2/essentials"
26 28
 )
27 29
 
28 30
 var (
@@ -61,6 +63,8 @@ const (
61 63
 	DefaultConcurrency = 4096
62 64
 
63 65
 	// DefaultBufferSize is a default size of a copy buffer.
66
+	//
67
+	// Deprecated: this setting no longer makes any effect.
64 68
 	DefaultBufferSize = 16 * 1024 // 16 kib
65 69
 
66 70
 	// DefaultDomainFrontingPort is a default port (HTTPS) to connect to in
@@ -114,16 +118,16 @@ const (
114 118
 // 3. Doing HTTP requests (for example, for FireHOL ipblocklist).
115 119
 type Network interface {
116 120
 	// Dial establishes context-free TCP connections.
117
-	Dial(network, address string) (net.Conn, error)
121
+	Dial(network, address string) (essentials.Conn, error)
118 122
 
119 123
 	// DialContext dials using a context. This is a preferrable
120 124
 	// way of establishing TCP connections.
121
-	DialContext(ctx context.Context, network, address string) (net.Conn, error)
125
+	DialContext(ctx context.Context, network, address string) (essentials.Conn, error)
122 126
 
123 127
 	// MakeHTTPClient build an HTTP client with given dial function. If
124 128
 	// nothing is provided, then DialContext of this interface is going
125 129
 	// to be used.
126
-	MakeHTTPClient(func(ctx context.Context, network, address string) (net.Conn, error)) *http.Client
130
+	MakeHTTPClient(func(ctx context.Context, network, address string) (essentials.Conn, error)) *http.Client
127 131
 }
128 132
 
129 133
 // AntiReplayCache is an interface that is used to detect replay attacks

+ 2
- 2
mtglib/internal/faketls/conn.go Dosyayı Görüntüle

@@ -4,13 +4,13 @@ import (
4 4
 	"bytes"
5 5
 	"fmt"
6 6
 	"math/rand"
7
-	"net"
8 7
 
8
+	"github.com/9seconds/mtg/v2/essentials"
9 9
 	"github.com/9seconds/mtg/v2/mtglib/internal/faketls/record"
10 10
 )
11 11
 
12 12
 type Conn struct {
13
-	net.Conn
13
+	essentials.Conn
14 14
 
15 15
 	readBuffer bytes.Buffer
16 16
 }

+ 1
- 1
mtglib/internal/faketls/conn_test.go Dosyayı Görüntüle

@@ -15,7 +15,7 @@ import (
15 15
 )
16 16
 
17 17
 type ConnMock struct {
18
-	testlib.NetConnMock
18
+	testlib.EssentialsConnMock
19 19
 
20 20
 	readBuffer  bytes.Buffer
21 21
 	writeBuffer bytes.Buffer

+ 1
- 1
mtglib/internal/obfuscated2/client_handshake_test.go Dosyayı Görüntüle

@@ -42,7 +42,7 @@ func (suite *ClientHandshakeTestSuite) TestOk() {
42 42
 			writeData := make([]byte, len(snapshot.Encrypted.Text.data))
43 43
 			readData := make([]byte, len(snapshot.Decrypted.Text.data))
44 44
 
45
-			connMock := &testlib.NetConnMock{}
45
+			connMock := &testlib.EssentialsConnMock{}
46 46
 			connMock.On("Read", mock.Anything).
47 47
 				Once().
48 48
 				Return(len(snapshot.Decrypted.Text.data), nil).

+ 3
- 2
mtglib/internal/obfuscated2/conn.go Dosyayı Görüntüle

@@ -2,11 +2,12 @@ package obfuscated2
2 2
 
3 3
 import (
4 4
 	"crypto/cipher"
5
-	"net"
5
+
6
+	"github.com/9seconds/mtg/v2/essentials"
6 7
 )
7 8
 
8 9
 type Conn struct {
9
-	net.Conn
10
+	essentials.Conn
10 11
 
11 12
 	Encryptor cipher.Stream
12 13
 	Decryptor cipher.Stream

+ 2
- 2
mtglib/internal/obfuscated2/server_handshake_test.go Dosyayı Görüntüle

@@ -16,7 +16,7 @@ import (
16 16
 type ServerHandshakeTestSuite struct {
17 17
 	suite.Suite
18 18
 
19
-	connMock  *testlib.NetConnMock
19
+	connMock  *testlib.EssentialsConnMock
20 20
 	proxyConn obfuscated2.Conn
21 21
 	encryptor cipher.Stream
22 22
 	decryptor cipher.Stream
@@ -24,7 +24,7 @@ type ServerHandshakeTestSuite struct {
24 24
 
25 25
 func (suite *ServerHandshakeTestSuite) SetupTest() {
26 26
 	buf := &bytes.Buffer{}
27
-	suite.connMock = &testlib.NetConnMock{}
27
+	suite.connMock = &testlib.EssentialsConnMock{}
28 28
 
29 29
 	encryptor, decryptor, err := obfuscated2.ServerHandshake(buf)
30 30
 	suite.NoError(err)

+ 0
- 19
mtglib/internal/relay/conn.go Dosyayı Görüntüle

@@ -1,19 +0,0 @@
1
-package relay
2
-
3
-import (
4
-	"fmt"
5
-	"net"
6
-	"time"
7
-)
8
-
9
-type conn struct {
10
-	net.Conn
11
-}
12
-
13
-func (c conn) Read(p []byte) (int, error) {
14
-	if err := c.SetReadDeadline(time.Now().Add(getTimeout())); err != nil {
15
-		return 0, fmt.Errorf("cannot set read deadline: %w", err)
16
-	}
17
-
18
-	return c.Conn.Read(p) // nolint: wrapcheck
19
-}

+ 3
- 4
mtglib/internal/relay/init.go Dosyayı Görüntüle

@@ -3,10 +3,9 @@ package relay
3 3
 import "time"
4 4
 
5 5
 const (
6
-	ConnectionTimeToLiveMin = 2 * time.Minute
7
-	ConnectionTimeToLiveMax = 10 * time.Minute
8
-	TimeoutMin              = 20 * time.Second
9
-	TimeoutMax              = time.Minute
6
+	copyBufferSize   = 64 * 1024
7
+	writerBufferSize = 128 * 1024
8
+	readTimeout      = 10 * time.Millisecond
10 9
 )
11 10
 
12 11
 type Logger interface {

+ 21
- 22
mtglib/internal/relay/pools.go Dosyayı Görüntüle

@@ -1,32 +1,31 @@
1 1
 package relay
2 2
 
3
-import "sync"
4
-
5
-type eastWest struct {
6
-	east []byte
7
-	west []byte
8
-}
9
-
10
-var eastWestPool = sync.Pool{
3
+import (
4
+	"bufio"
5
+	"io"
6
+	"net"
7
+	"sync"
8
+)
9
+
10
+var syncPairPool = sync.Pool{
11 11
 	New: func() interface{} {
12
-		return &eastWest{}
12
+		return &syncPair{
13
+			writer:  bufio.NewWriterSize(nil, writerBufferSize),
14
+			copyBuf: make([]byte, copyBufferSize),
15
+		}
13 16
 	},
14 17
 }
15 18
 
16
-func acquireEastWest(bufferSize int) *eastWest {
17
-	wanted := eastWestPool.Get().(*eastWest) // nolint: forcetypeassert
18
-
19
-	if len(wanted.east) != bufferSize {
20
-		wanted.east = make([]byte, bufferSize)
21
-	}
22
-
23
-	if len(wanted.west) != bufferSize {
24
-		wanted.west = make([]byte, bufferSize)
25
-	}
19
+func acquireSyncPair(reader net.Conn, writer io.Writer) *syncPair {
20
+	sp := syncPairPool.Get().(*syncPair) // nolint: forcetypeassert
21
+	sp.writer.Reset(writer)
22
+	sp.reader = reader
26 23
 
27
-	return wanted
24
+	return sp
28 25
 }
29 26
 
30
-func releaseEastWest(ew *eastWest) {
31
-	eastWestPool.Put(ew)
27
+func releaseSyncPair(sp *syncPair) {
28
+	sp.writer.Reset(nil)
29
+	sp.reader = nil
30
+	syncPairPool.Put(sp)
32 31
 }

+ 26
- 20
mtglib/internal/relay/relay.go Dosyayı Görüntüle

@@ -2,17 +2,18 @@ package relay
2 2
 
3 3
 import (
4 4
 	"context"
5
+	"errors"
5 6
 	"io"
6
-	"net"
7 7
 	"sync"
8
+
9
+	"github.com/9seconds/mtg/v2/essentials"
8 10
 )
9 11
 
10
-func Relay(ctx context.Context, log Logger, bufferSize int,
11
-	telegramConn net.Conn, clientConn io.ReadWriteCloser) {
12
+func Relay(ctx context.Context, log Logger, telegramConn, clientConn essentials.Conn) {
12 13
 	defer telegramConn.Close()
13 14
 	defer clientConn.Close()
14 15
 
15
-	ctx, cancel := context.WithTimeout(ctx, getConnectionTimeToLive())
16
+	ctx, cancel := context.WithCancel(ctx)
16 17
 	defer cancel()
17 18
 
18 19
 	go func() {
@@ -21,30 +22,35 @@ func Relay(ctx context.Context, log Logger, bufferSize int,
21 22
 		clientConn.Close()
22 23
 	}()
23 24
 
24
-	buffers := acquireEastWest(bufferSize)
25
-	defer releaseEastWest(buffers)
26
-
27
-	telegramConn = conn{
28
-		Conn: telegramConn,
29
-	}
30
-
31 25
 	wg := &sync.WaitGroup{}
32 26
 	wg.Add(2) // nolint: gomnd
33 27
 
34
-	go pump(log, telegramConn, clientConn, wg, buffers.east, "east -> west")
28
+	go pump(log, telegramConn, clientConn, wg, "client -> telegram")
35 29
 
36
-	pump(log, clientConn, telegramConn, wg, buffers.west, "west -> east")
30
+	pump(log, clientConn, telegramConn, wg, "telegram -> client")
37 31
 
38 32
 	wg.Wait()
39 33
 }
40 34
 
41
-func pump(log Logger, src io.ReadCloser, dst io.WriteCloser, wg *sync.WaitGroup,
42
-	buf []byte, direction string) {
43
-	defer wg.Done()
44
-	defer src.Close()
45
-	defer dst.Close()
35
+func pump(log Logger, src, dst essentials.Conn, wg *sync.WaitGroup, direction string) {
36
+	syncer := acquireSyncPair(src, dst)
37
+
38
+	defer func() {
39
+		syncer.Flush()
40
+		releaseSyncPair(syncer)
41
+		src.CloseRead()  // nolint: errcheck
42
+		dst.CloseWrite() // nolint: errcheck
43
+		wg.Done()
44
+	}()
45
+
46
+	n, err := syncer.Sync()
46 47
 
47
-	if n, err := io.CopyBuffer(dst, src, buf); err != nil {
48
-		log.Printf("cannot pump %s (written %d bytes): %w", direction, n, err)
48
+	switch {
49
+	case err == nil:
50
+		log.Printf("%s has been finished", direction)
51
+	case errors.Is(err, io.EOF):
52
+		log.Printf("%s has been finished because of EOF. Written %d bytes", direction, n)
53
+	default:
54
+		log.Printf("%s has been finished (written %d bytes): %v", direction, n, err)
49 55
 	}
50 56
 }

+ 11
- 7
mtglib/internal/relay/relay_test.go Dosyayı Görüntüle

@@ -17,8 +17,8 @@ type RelayTestSuite struct {
17 17
 	loggerMock       relay.Logger
18 18
 	ctx              context.Context
19 19
 	ctxCancel        context.CancelFunc
20
-	telegramConnMock *testlib.NetConnMock
21
-	clientConnMock   *testlib.NetConnMock
20
+	telegramConnMock *testlib.EssentialsConnMock
21
+	clientConnMock   *testlib.EssentialsConnMock
22 22
 }
23 23
 
24 24
 func (suite *RelayTestSuite) SetupTest() {
@@ -26,8 +26,8 @@ func (suite *RelayTestSuite) SetupTest() {
26 26
 	suite.ctx = ctx
27 27
 	suite.ctxCancel = cancel
28 28
 	suite.loggerMock = &loggerMock{}
29
-	suite.telegramConnMock = &testlib.NetConnMock{}
30
-	suite.clientConnMock = &testlib.NetConnMock{}
29
+	suite.telegramConnMock = &testlib.EssentialsConnMock{}
30
+	suite.clientConnMock = &testlib.EssentialsConnMock{}
31 31
 }
32 32
 
33 33
 func (suite *RelayTestSuite) TearDownTest() {
@@ -37,17 +37,21 @@ func (suite *RelayTestSuite) TearDownTest() {
37 37
 }
38 38
 
39 39
 func (suite *RelayTestSuite) TestExit() {
40
-	suite.telegramConnMock.On("SetReadDeadline", mock.Anything).Return(nil)
41 40
 	suite.telegramConnMock.On("Close").Return(nil)
41
+	suite.telegramConnMock.On("CloseRead").Return(nil).Once()
42
+	suite.telegramConnMock.On("CloseWrite").Return(nil).Once()
42 43
 	suite.telegramConnMock.On("Read", mock.Anything).Return(10, io.EOF).Once()
43 44
 	suite.telegramConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe()
45
+	suite.telegramConnMock.On("SetReadDeadline", mock.Anything).Return(nil).Maybe()
44 46
 
45 47
 	suite.clientConnMock.On("Read", mock.Anything).Return(0, io.EOF).Once()
46 48
 	suite.clientConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe()
47 49
 	suite.clientConnMock.On("Close").Return(nil)
50
+	suite.clientConnMock.On("CloseRead").Return(nil).Once()
51
+	suite.clientConnMock.On("CloseWrite").Return(nil).Once()
52
+	suite.clientConnMock.On("SetReadDeadline", mock.Anything).Return(nil).Maybe()
48 53
 
49
-	relay.Relay(suite.ctx, suite.loggerMock, 1024,
50
-		suite.telegramConnMock, suite.clientConnMock)
54
+	relay.Relay(suite.ctx, suite.loggerMock, suite.telegramConnMock, suite.clientConnMock)
51 55
 }
52 56
 
53 57
 func TestRelay(t *testing.T) {

+ 77
- 0
mtglib/internal/relay/sync_pair.go Dosyayı Görüntüle

@@ -0,0 +1,77 @@
1
+package relay
2
+
3
+import (
4
+	"bufio"
5
+	"errors"
6
+	"fmt"
7
+	"io"
8
+	"net"
9
+	"os"
10
+	"sync"
11
+	"time"
12
+)
13
+
14
+type syncPair struct {
15
+	writer  *bufio.Writer
16
+	copyBuf []byte
17
+
18
+	mutex  sync.Mutex
19
+	reader net.Conn
20
+}
21
+
22
+func (s *syncPair) Sync() (int64, error) {
23
+	return io.CopyBuffer(s, s, s.copyBuf) // nolint: wrapcheck
24
+}
25
+
26
+func (s *syncPair) Read(p []byte) (int, error) {
27
+	n, err := s.readBlocking(p, false)
28
+
29
+	// nothing has been delivered for readTimeout time. Let's flush.
30
+	if errors.Is(err, os.ErrDeadlineExceeded) {
31
+		if err := s.Flush(); err != nil {
32
+			return 0, fmt.Errorf("cannot flush writer hand-side: %w", err)
33
+		}
34
+
35
+		return s.readBlocking(p, true)
36
+	}
37
+
38
+	return n, err
39
+}
40
+
41
+func (s *syncPair) Write(p []byte) (int, error) {
42
+	s.mutex.Lock()
43
+	defer s.mutex.Unlock()
44
+
45
+	n, err := s.writer.Write(p)
46
+
47
+	// optimization for a case when we have a small package and want to avoid a
48
+	// delay in readTimeout. In that case, we assume that peer has finished to
49
+	// sent a data it wants to send so we can flush without waiting for anything
50
+	// else.
51
+	if err == nil && n < copyBufferSize {
52
+		err = s.writer.Flush()
53
+	}
54
+
55
+	return n, err // nolint: wrapcheck
56
+}
57
+
58
+func (s *syncPair) Flush() error {
59
+	s.mutex.Lock()
60
+	defer s.mutex.Unlock()
61
+
62
+	return s.writer.Flush() // nolint: wrapcheck
63
+}
64
+
65
+func (s *syncPair) readBlocking(p []byte, blocking bool) (int, error) {
66
+	var deadline time.Time
67
+
68
+	if !blocking {
69
+		deadline = time.Now().Add(readTimeout)
70
+	}
71
+
72
+	if err := s.reader.SetReadDeadline(deadline); err != nil {
73
+		return 0, fmt.Errorf("cannot set read deadline: %w", err)
74
+	}
75
+
76
+	return s.reader.Read(p) // nolint: wrapcheck
77
+}

+ 0
- 22
mtglib/internal/relay/timeouts.go Dosyayı Görüntüle

@@ -1,22 +0,0 @@
1
-package relay
2
-
3
-import (
4
-	"math/rand"
5
-	"time"
6
-)
7
-
8
-func getConnectionTimeToLive() time.Duration {
9
-	return getTime(ConnectionTimeToLiveMin, ConnectionTimeToLiveMax)
10
-}
11
-
12
-func getTimeout() time.Duration {
13
-	return getTime(TimeoutMin, TimeoutMax)
14
-}
15
-
16
-func getTime(minDuration, maxDuration time.Duration) time.Duration {
17
-	minDurationInSeconds := int(minDuration.Seconds())
18
-	maxDurationInSeconds := int(maxDuration.Seconds())
19
-	number := minDurationInSeconds + rand.Intn(maxDurationInSeconds-minDurationInSeconds)
20
-
21
-	return time.Duration(number) * time.Second
22
-}

+ 0
- 37
mtglib/internal/relay/timeouts_internal_test.go Dosyayı Görüntüle

@@ -1,37 +0,0 @@
1
-package relay
2
-
3
-import (
4
-	"fmt"
5
-	"testing"
6
-
7
-	"github.com/stretchr/testify/suite"
8
-)
9
-
10
-type TimeoutsTestSuite struct {
11
-	suite.Suite
12
-}
13
-
14
-func (suite *TimeoutsTestSuite) TestGetConnectionTimeToLive() {
15
-	for i := 0; i < 100; i++ {
16
-		value := getConnectionTimeToLive()
17
-		message := fmt.Sprintf("generated value is %v", value)
18
-
19
-		suite.GreaterOrEqual(value, ConnectionTimeToLiveMin, message)
20
-		suite.LessOrEqual(value, ConnectionTimeToLiveMax, message)
21
-	}
22
-}
23
-
24
-func (suite *TimeoutsTestSuite) TestGetTimeout() {
25
-	for i := 0; i < 100; i++ {
26
-		value := getTimeout()
27
-		message := fmt.Sprintf("generated value is %v", value)
28
-
29
-		suite.GreaterOrEqual(value, TimeoutMin, message)
30
-		suite.LessOrEqual(value, TimeoutMax, message)
31
-	}
32
-}
33
-
34
-func TestTimeouts(t *testing.T) {
35
-	t.Parallel()
36
-	suite.Run(t, &TimeoutsTestSuite{})
37
-}

+ 3
- 2
mtglib/internal/telegram/init.go Dosyayı Görüntüle

@@ -2,7 +2,8 @@ package telegram
2 2
 
3 3
 import (
4 4
 	"context"
5
-	"net"
5
+
6
+	"github.com/9seconds/mtg/v2/essentials"
6 7
 )
7 8
 
8 9
 type preferIP uint8
@@ -82,5 +83,5 @@ var (
82 83
 )
83 84
 
84 85
 type Dialer interface {
85
-	DialContext(ctx context.Context, network, address string) (net.Conn, error)
86
+	DialContext(ctx context.Context, network, address string) (essentials.Conn, error)
86 87
 }

+ 4
- 3
mtglib/internal/telegram/telegram.go Dosyayı Görüntüle

@@ -3,8 +3,9 @@ package telegram
3 3
 import (
4 4
 	"context"
5 5
 	"fmt"
6
-	"net"
7 6
 	"strings"
7
+
8
+	"github.com/9seconds/mtg/v2/essentials"
8 9
 )
9 10
 
10 11
 type Telegram struct {
@@ -13,7 +14,7 @@ type Telegram struct {
13 14
 	pool     addressPool
14 15
 }
15 16
 
16
-func (t Telegram) Dial(ctx context.Context, dc int) (net.Conn, error) {
17
+func (t Telegram) Dial(ctx context.Context, dc int) (essentials.Conn, error) {
17 18
 	var addresses []tgAddr
18 19
 
19 20
 	switch t.preferIP {
@@ -28,7 +29,7 @@ func (t Telegram) Dial(ctx context.Context, dc int) (net.Conn, error) {
28 29
 	}
29 30
 
30 31
 	var (
31
-		conn net.Conn
32
+		conn essentials.Conn
32 33
 		err  error
33 34
 	)
34 35
 

+ 3
- 6
mtglib/proxy.go Dosyayı Görüntüle

@@ -9,6 +9,7 @@ import (
9 9
 	"sync"
10 10
 	"time"
11 11
 
12
+	"github.com/9seconds/mtg/v2/essentials"
12 13
 	"github.com/9seconds/mtg/v2/mtglib/internal/faketls"
13 14
 	"github.com/9seconds/mtg/v2/mtglib/internal/faketls/record"
14 15
 	"github.com/9seconds/mtg/v2/mtglib/internal/obfuscated2"
@@ -25,7 +26,6 @@ type Proxy struct {
25 26
 
26 27
 	allowFallbackOnUnknownDC bool
27 28
 	tolerateTimeSkewness     time.Duration
28
-	bufferSize               int
29 29
 	domainFrontingPort       int
30 30
 	workerPool               *ants.PoolWithFunc
31 31
 	telegram                 *telegram.Telegram
@@ -46,7 +46,7 @@ func (p *Proxy) DomainFrontingAddress() string {
46 46
 
47 47
 // ServeConn serves a connection. We do not check IP blocklist and
48 48
 // concurrency limit here.
49
-func (p *Proxy) ServeConn(conn net.Conn) {
49
+func (p *Proxy) ServeConn(conn essentials.Conn) {
50 50
 	p.streamWaitGroup.Add(1)
51 51
 	defer p.streamWaitGroup.Done()
52 52
 
@@ -85,7 +85,6 @@ func (p *Proxy) ServeConn(conn net.Conn) {
85 85
 	relay.Relay(
86 86
 		ctx,
87 87
 		ctx.logger.Named("relay"),
88
-		p.bufferSize,
89 88
 		ctx.telegramConn,
90 89
 		ctx.clientConn,
91 90
 	)
@@ -276,7 +275,6 @@ func (p *Proxy) doDomainFronting(ctx *streamContext, conn *connRewind) {
276 275
 	relay.Relay(
277 276
 		ctx,
278 277
 		ctx.logger.Named("domain-fronting"),
279
-		p.bufferSize,
280 278
 		frontConn,
281 279
 		conn,
282 280
 	)
@@ -306,14 +304,13 @@ func NewProxy(opts ProxyOpts) (*Proxy, error) {
306 304
 		logger:                   opts.getLogger("proxy"),
307 305
 		domainFrontingPort:       opts.getDomainFrontingPort(),
308 306
 		tolerateTimeSkewness:     opts.getTolerateTimeSkewness(),
309
-		bufferSize:               opts.getBufferSize(),
310 307
 		allowFallbackOnUnknownDC: opts.AllowFallbackOnUnknownDC,
311 308
 		telegram:                 tg,
312 309
 	}
313 310
 
314 311
 	pool, err := ants.NewPoolWithFunc(opts.getConcurrency(),
315 312
 		func(arg interface{}) {
316
-			proxy.ServeConn(arg.(net.Conn))
313
+			proxy.ServeConn(arg.(essentials.Conn))
317 314
 		},
318 315
 		ants.WithLogger(opts.getLogger("ants")),
319 316
 		ants.WithNonblocking(true))

+ 2
- 8
mtglib/proxy_opts.go Dosyayı Görüntüle

@@ -50,6 +50,8 @@ type ProxyOpts struct {
50 50
 	// buffers: to and from.
51 51
 	//
52 52
 	// This is an optional setting.
53
+	//
54
+	// Deprecated: this setting is no longer makes any effect.
53 55
 	BufferSize uint
54 56
 
55 57
 	// Concurrency is a size of the worker pool for connection management.
@@ -134,14 +136,6 @@ func (p ProxyOpts) valid() error {
134 136
 	return nil
135 137
 }
136 138
 
137
-func (p ProxyOpts) getBufferSize() int {
138
-	if p.BufferSize < 1 {
139
-		return DefaultBufferSize
140
-	}
141
-
142
-	return int(p.BufferSize)
143
-}
144
-
145 139
 func (p ProxyOpts) getConcurrency() int {
146 140
 	if p.Concurrency == 0 {
147 141
 		return DefaultConcurrency

+ 5
- 3
mtglib/stream_context.go Dosyayı Görüntüle

@@ -6,13 +6,15 @@ import (
6 6
 	"encoding/base64"
7 7
 	"net"
8 8
 	"time"
9
+
10
+	"github.com/9seconds/mtg/v2/essentials"
9 11
 )
10 12
 
11 13
 type streamContext struct {
12 14
 	ctx          context.Context
13 15
 	ctxCancel    context.CancelFunc
14
-	clientConn   net.Conn
15
-	telegramConn net.Conn
16
+	clientConn   essentials.Conn
17
+	telegramConn essentials.Conn
16 18
 	streamID     string
17 19
 	dc           int
18 20
 	logger       Logger
@@ -50,7 +52,7 @@ func (s *streamContext) ClientIP() net.IP {
50 52
 	return s.clientConn.RemoteAddr().(*net.TCPAddr).IP
51 53
 }
52 54
 
53
-func newStreamContext(ctx context.Context, logger Logger, clientConn net.Conn) *streamContext {
55
+func newStreamContext(ctx context.Context, logger Logger, clientConn essentials.Conn) *streamContext {
54 56
 	connIDBytes := make([]byte, ConnectionIDBytesLength)
55 57
 
56 58
 	if _, err := rand.Read(connIDBytes); err != nil {

+ 3
- 3
mtglib/stream_context_internal_test.go Dosyayı Görüntüle

@@ -12,7 +12,7 @@ import (
12 12
 type StreamContextTestSuite struct {
13 13
 	suite.Suite
14 14
 
15
-	connMock  *testlib.NetConnMock
15
+	connMock  *testlib.EssentialsConnMock
16 16
 	logger    NoopLogger
17 17
 	ctx       *streamContext
18 18
 	ctxCancel context.CancelFunc
@@ -27,7 +27,7 @@ func (suite *StreamContextTestSuite) SetupTest() {
27 27
 	ctx = context.WithValue(ctx, "key", "value") // nolint: golint, revive, staticcheck
28 28
 
29 29
 	suite.ctxCancel = cancel
30
-	suite.connMock = &testlib.NetConnMock{}
30
+	suite.connMock = &testlib.EssentialsConnMock{}
31 31
 
32 32
 	addr := &net.TCPAddr{
33 33
 		IP:   net.ParseIP("10.0.0.10"),
@@ -73,7 +73,7 @@ func (suite *StreamContextTestSuite) TestClientIP() {
73 73
 func (suite *StreamContextTestSuite) TestClose() {
74 74
 	suite.connMock.On("Close").Once().Return(nil)
75 75
 
76
-	tgConnMock := &testlib.NetConnMock{}
76
+	tgConnMock := &testlib.EssentialsConnMock{}
77 77
 	tgConnMock.On("Close").Once().Return(nil)
78 78
 
79 79
 	suite.ctx.telegramConn = tgConnMock

+ 7
- 5
network/circuit_breaker.go Dosyayı Görüntüle

@@ -2,9 +2,10 @@ package network
2 2
 
3 3
 import (
4 4
 	"context"
5
-	"net"
6 5
 	"sync/atomic"
7 6
 	"time"
7
+
8
+	"github.com/9seconds/mtg/v2/essentials"
8 9
 )
9 10
 
10 11
 const (
@@ -30,12 +31,12 @@ type circuitBreakerDialer struct {
30 31
 	resetFailuresTimeout time.Duration
31 32
 }
32 33
 
33
-func (c *circuitBreakerDialer) Dial(network, address string) (net.Conn, error) {
34
+func (c *circuitBreakerDialer) Dial(network, address string) (essentials.Conn, error) {
34 35
 	return c.DialContext(context.Background(), network, address)
35 36
 }
36 37
 
37 38
 func (c *circuitBreakerDialer) DialContext(ctx context.Context,
38
-	network, address string) (net.Conn, error) {
39
+	network, address string) (essentials.Conn, error) {
39 40
 	switch atomic.LoadUint32(&c.state) {
40 41
 	case circuitBreakerStateClosed:
41 42
 		return c.doClosed(ctx, network, address)
@@ -47,7 +48,7 @@ func (c *circuitBreakerDialer) DialContext(ctx context.Context,
47 48
 }
48 49
 
49 50
 func (c *circuitBreakerDialer) doClosed(ctx context.Context,
50
-	network, address string) (net.Conn, error) {
51
+	network, address string) (essentials.Conn, error) {
51 52
 	conn, err := c.Dialer.DialContext(ctx, network, address)
52 53
 
53 54
 	select {
@@ -78,7 +79,8 @@ func (c *circuitBreakerDialer) doClosed(ctx context.Context,
78 79
 	return conn, err // nolint: wrapcheck
79 80
 }
80 81
 
81
-func (c *circuitBreakerDialer) doHalfOpened(ctx context.Context, network, address string) (net.Conn, error) {
82
+func (c *circuitBreakerDialer) doHalfOpened(ctx context.Context,
83
+	network, address string) (essentials.Conn, error) {
82 84
 	if !atomic.CompareAndSwapUint32(&c.halfOpenAttempts, 0, 1) {
83 85
 		return nil, ErrCircuitBreakerOpened
84 86
 	}

+ 2
- 2
network/circuit_breaker_internal_test.go Dosyayı Görüntüle

@@ -21,7 +21,7 @@ type CircuitBreakerTestSuite struct {
21 21
 	mutex          sync.Mutex
22 22
 	ctx            context.Context
23 23
 	ctxCancel      context.CancelFunc
24
-	connMock       *testlib.NetConnMock
24
+	connMock       *testlib.EssentialsConnMock
25 25
 	baseDialerMock *DialerMock
26 26
 }
27 27
 
@@ -29,7 +29,7 @@ func (suite *CircuitBreakerTestSuite) SetupTest() {
29 29
 	suite.mutex = sync.Mutex{}
30 30
 	suite.ctx, suite.ctxCancel = context.WithCancel(context.Background())
31 31
 	suite.baseDialerMock = &DialerMock{}
32
-	suite.connMock = &testlib.NetConnMock{}
32
+	suite.connMock = &testlib.EssentialsConnMock{}
33 33
 	suite.d = newCircuitBreakerDialer(suite.baseDialerMock,
34 34
 		3, 100*time.Millisecond, 50*time.Millisecond)
35 35
 }

+ 10
- 16
network/default.go Dosyayı Görüntüle

@@ -5,19 +5,19 @@ import (
5 5
 	"fmt"
6 6
 	"net"
7 7
 	"time"
8
+
9
+	"github.com/9seconds/mtg/v2/essentials"
8 10
 )
9 11
 
10 12
 type defaultDialer struct {
11 13
 	net.Dialer
12
-
13
-	bufferSize int
14 14
 }
15 15
 
16
-func (d *defaultDialer) Dial(network, address string) (net.Conn, error) {
16
+func (d *defaultDialer) Dial(network, address string) (essentials.Conn, error) {
17 17
 	return d.DialContext(context.Background(), network, address)
18 18
 }
19 19
 
20
-func (d *defaultDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
20
+func (d *defaultDialer) DialContext(ctx context.Context, network, address string) (essentials.Conn, error) {
21 21
 	switch network {
22 22
 	case "tcp", "tcp4", "tcp6": // nolint: goconst
23 23
 	default:
@@ -30,13 +30,13 @@ func (d *defaultDialer) DialContext(ctx context.Context, network, address string
30 30
 	}
31 31
 
32 32
 	// we do not need to call to end user. End users call us.
33
-	if err := SetServerSocketOptions(conn, d.bufferSize); err != nil {
33
+	if err := SetServerSocketOptions(conn, 0); err != nil {
34 34
 		conn.Close()
35 35
 
36 36
 		return nil, fmt.Errorf("cannot set socket options: %w", err)
37 37
 	}
38 38
 
39
-	return conn, nil
39
+	return conn.(essentials.Conn), nil
40 40
 }
41 41
 
42 42
 // NewDefaultDialer build a new dialer which dials bypassing proxies
@@ -44,26 +44,20 @@ func (d *defaultDialer) DialContext(ctx context.Context, network, address string
44 44
 //
45 45
 // The most default one you can imagine. But it has tunes TCP
46 46
 // connections and setups SO_REUSEPORT.
47
+//
48
+// bufferSize is deprecated and ignored. It is kept here for backward
49
+// compatibility.
47 50
 func NewDefaultDialer(timeout time.Duration, bufferSize int) (Dialer, error) {
48 51
 	switch {
49 52
 	case timeout < 0:
50 53
 		return nil, fmt.Errorf("timeout %v should be positive number", timeout)
51
-	case bufferSize < 0:
52
-		return nil, fmt.Errorf("buffer size %d should be positive number", bufferSize)
53
-	}
54
-
55
-	if timeout == 0 {
54
+	case timeout == 0:
56 55
 		timeout = DefaultTimeout
57 56
 	}
58 57
 
59
-	if bufferSize == 0 {
60
-		bufferSize = DefaultBufferSize
61
-	}
62
-
63 58
 	return &defaultDialer{
64 59
 		Dialer: net.Dialer{
65 60
 			Timeout: timeout,
66 61
 		},
67
-		bufferSize: bufferSize,
68 62
 	}, nil
69 63
 }

+ 0
- 5
network/default_test.go Dosyayı Görüntüle

@@ -30,11 +30,6 @@ func (suite *DefaultDialerTestSuite) TestNegativeTimeout() {
30 30
 	suite.Error(err)
31 31
 }
32 32
 
33
-func (suite *DefaultDialerTestSuite) TestNegativeBufferSize() {
34
-	_, err := network.NewDefaultDialer(0, -1)
35
-	suite.Error(err)
36
-}
37
-
38 33
 func (suite *DefaultDialerTestSuite) TestUnsupportedProtocol() {
39 34
 	_, err := suite.d.DialContext(context.Background(),
40 35
 		"udp",

+ 10
- 3
network/init.go Dosyayı Görüntüle

@@ -20,8 +20,9 @@ package network
20 20
 import (
21 21
 	"context"
22 22
 	"errors"
23
-	"net"
24 23
 	"time"
24
+
25
+	"github.com/9seconds/mtg/v2/essentials"
25 26
 )
26 27
 
27 28
 const (
@@ -33,10 +34,16 @@ const (
33 34
 	// request.
34 35
 	DefaultHTTPTimeout = 10 * time.Second
35 36
 
37
+	// Deprecated:
38
+	//
36 39
 	// DefaultBufferSize defines a TCP buffer size. Both read and write, so
37 40
 	// for real size, please multiply this number by 2.
38 41
 	DefaultBufferSize = 16 * 1024 // 16 kib
39 42
 
43
+	// DefaultTCPKeepAlivePeriod defines a time period between 2
44
+	// consequitive probes.
45
+	DefaultTCPKeepAlivePeriod = 10 * time.Second
46
+
40 47
 	// ProxyDialerOpenThreshold is used for load balancing SOCKS5 dialer
41 48
 	// only.
42 49
 	//
@@ -89,6 +96,6 @@ var (
89 96
 // Dialer defines an interface which is required to bootstrap a network
90 97
 // instance from.
91 98
 type Dialer interface {
92
-	Dial(network, address string) (net.Conn, error)
93
-	DialContext(ctx context.Context, network, address string) (net.Conn, error)
99
+	Dial(network, address string) (essentials.Conn, error)
100
+	DialContext(ctx context.Context, network, address string) (essentials.Conn, error)
94 101
 }

+ 5
- 5
network/init_internal_test.go Dosyayı Görüntüle

@@ -2,8 +2,8 @@ package network
2 2
 
3 3
 import (
4 4
 	"context"
5
-	"net"
6 5
 
6
+	"github.com/9seconds/mtg/v2/essentials"
7 7
 	"github.com/stretchr/testify/mock"
8 8
 )
9 9
 
@@ -11,14 +11,14 @@ type DialerMock struct {
11 11
 	mock.Mock
12 12
 }
13 13
 
14
-func (d *DialerMock) Dial(network, address string) (net.Conn, error) {
14
+func (d *DialerMock) Dial(network, address string) (essentials.Conn, error) {
15 15
 	args := d.Called(network, address)
16 16
 
17
-	return args.Get(0).(net.Conn), args.Error(1) // nolint: wrapcheck
17
+	return args.Get(0).(essentials.Conn), args.Error(1) // nolint: wrapcheck
18 18
 }
19 19
 
20
-func (d *DialerMock) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
20
+func (d *DialerMock) DialContext(ctx context.Context, network, address string) (essentials.Conn, error) {
21 21
 	args := d.Called(ctx, network, address)
22 22
 
23
-	return args.Get(0).(net.Conn), args.Error(1) // nolint: wrapcheck
23
+	return args.Get(0).(essentials.Conn), args.Error(1) // nolint: wrapcheck
24 24
 }

+ 8
- 5
network/init_test.go Dosyayı Görüntüle

@@ -8,6 +8,7 @@ import (
8 8
 	"net/url"
9 9
 	"strings"
10 10
 
11
+	"github.com/9seconds/mtg/v2/essentials"
11 12
 	"github.com/9seconds/mtg/v2/network"
12 13
 	socks5 "github.com/armon/go-socks5"
13 14
 	"github.com/mccutchen/go-httpbin/httpbin"
@@ -18,16 +19,16 @@ type DialerMock struct {
18 19
 	mock.Mock
19 20
 }
20 21
 
21
-func (d *DialerMock) Dial(network, address string) (net.Conn, error) {
22
+func (d *DialerMock) Dial(network, address string) (essentials.Conn, error) {
22 23
 	args := d.Called(network, address)
23 24
 
24
-	return args.Get(0).(net.Conn), args.Error(1) // nolint: wrapcheck
25
+	return args.Get(0).(essentials.Conn), args.Error(1) // nolint: wrapcheck
25 26
 }
26 27
 
27
-func (d *DialerMock) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
28
+func (d *DialerMock) DialContext(ctx context.Context, network, address string) (essentials.Conn, error) {
28 29
 	args := d.Called(ctx, network, address)
29 30
 
30
-	return args.Get(0).(net.Conn), args.Error(1) // nolint: wrapcheck
31
+	return args.Get(0).(essentials.Conn), args.Error(1) // nolint: wrapcheck
31 32
 }
32 33
 
33 34
 type HTTPServerTestSuite struct {
@@ -53,7 +54,9 @@ func (suite *HTTPServerTestSuite) MakeURL(path string) string {
53 54
 func (suite *HTTPServerTestSuite) MakeHTTPClient(dialer network.Dialer) *http.Client {
54 55
 	return &http.Client{
55 56
 		Transport: &http.Transport{
56
-			DialContext: dialer.DialContext,
57
+			DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
58
+				return dialer.DialContext(ctx, network, address) // nolint: wrapcheck
59
+			},
57 60
 		},
58 61
 	}
59 62
 }

+ 4
- 3
network/load_balanced_socks5.go Dosyayı Görüntüle

@@ -4,19 +4,20 @@ import (
4 4
 	"context"
5 5
 	"fmt"
6 6
 	"math/rand"
7
-	"net"
8 7
 	"net/url"
8
+
9
+	"github.com/9seconds/mtg/v2/essentials"
9 10
 )
10 11
 
11 12
 type loadBalancedSocks5Dialer struct {
12 13
 	dialers []Dialer
13 14
 }
14 15
 
15
-func (l loadBalancedSocks5Dialer) Dial(network, address string) (net.Conn, error) {
16
+func (l loadBalancedSocks5Dialer) Dial(network, address string) (essentials.Conn, error) {
16 17
 	return l.DialContext(context.Background(), network, address)
17 18
 }
18 19
 
19
-func (l loadBalancedSocks5Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
20
+func (l loadBalancedSocks5Dialer) DialContext(ctx context.Context, network, address string) (essentials.Conn, error) {
20 21
 	length := len(l.dialers)
21 22
 	start := rand.Intn(length)
22 23
 	moved := false

+ 10
- 6
network/network.go Dosyayı Görüntüle

@@ -9,6 +9,7 @@ import (
9 9
 	"sync"
10 10
 	"time"
11 11
 
12
+	"github.com/9seconds/mtg/v2/essentials"
12 13
 	"github.com/9seconds/mtg/v2/mtglib"
13 14
 )
14 15
 
@@ -30,11 +31,11 @@ type network struct {
30 31
 	dns         *dnsResolver
31 32
 }
32 33
 
33
-func (n *network) Dial(protocol, address string) (net.Conn, error) {
34
+func (n *network) Dial(protocol, address string) (essentials.Conn, error) {
34 35
 	return n.DialContext(context.Background(), protocol, address)
35 36
 }
36 37
 
37
-func (n *network) DialContext(ctx context.Context, protocol, address string) (net.Conn, error) {
38
+func (n *network) DialContext(ctx context.Context, protocol, address string) (essentials.Conn, error) {
38 39
 	host, port, _ := net.SplitHostPort(address)
39 40
 
40 41
 	ips, err := n.dnsResolve(protocol, host)
@@ -46,7 +47,8 @@ func (n *network) DialContext(ctx context.Context, protocol, address string) (ne
46 47
 		ips[i], ips[j] = ips[j], ips[i]
47 48
 	})
48 49
 
49
-	var conn net.Conn
50
+	var conn essentials.Conn
51
+
50 52
 	for _, v := range ips {
51 53
 		conn, err = n.dialer.DialContext(ctx, protocol, net.JoinHostPort(v, port))
52 54
 
@@ -59,7 +61,7 @@ func (n *network) DialContext(ctx context.Context, protocol, address string) (ne
59 61
 }
60 62
 
61 63
 func (n *network) MakeHTTPClient(dialFunc func(ctx context.Context,
62
-	network, address string) (net.Conn, error)) *http.Client {
64
+	network, address string) (essentials.Conn, error)) *http.Client {
63 65
 	if dialFunc == nil {
64 66
 		dialFunc = n.DialContext
65 67
 	}
@@ -144,13 +146,15 @@ func NewNetwork(dialer Dialer,
144 146
 
145 147
 func makeHTTPClient(userAgent string,
146 148
 	timeout time.Duration,
147
-	dialFunc func(ctx context.Context, network, address string) (net.Conn, error)) *http.Client {
149
+	dialFunc func(ctx context.Context, network, address string) (essentials.Conn, error)) *http.Client {
148 150
 	return &http.Client{
149 151
 		Timeout: timeout,
150 152
 		Transport: networkHTTPTransport{
151 153
 			userAgent: userAgent,
152 154
 			next: &http.Transport{
153
-				DialContext: dialFunc,
155
+				DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
156
+					return dialFunc(ctx, network, address)
157
+				},
154 158
 			},
155 159
 		},
156 160
 	}

+ 10
- 24
network/sockopts.go Dosyayı Görüntüle

@@ -7,39 +7,25 @@ import (
7 7
 
8 8
 // SetClientSocketOptions tunes a TCP socket that represents a connection to
9 9
 // end user (not Telegram service or fronting domain).
10
+//
11
+// bufferSize setting is deprecated and ignored.
10 12
 func SetClientSocketOptions(conn net.Conn, bufferSize int) error {
11
-	tcpConn := conn.(*net.TCPConn) // nolint: forcetypeassert
12
-
13
-	if err := tcpConn.SetNoDelay(false); err != nil {
14
-		return fmt.Errorf("cannot disable TCP_NO_DELAY: %w", err)
15
-	}
16
-
17
-	return setCommonSocketOptions(tcpConn, bufferSize)
13
+	return setCommonSocketOptions(conn.(*net.TCPConn))
18 14
 }
19 15
 
20 16
 // SetServerSocketOptions tunes a TCP socket that represents a connection to
21 17
 // remote server like Telegram or fronting domain (but not end user).
22 18
 func SetServerSocketOptions(conn net.Conn, bufferSize int) error {
23
-	tcpConn := conn.(*net.TCPConn) // nolint: forcetypeassert
24
-
25
-	if err := tcpConn.SetNoDelay(true); err != nil {
26
-		return fmt.Errorf("cannot enable TCP_NO_DELAY: %w", err)
27
-	}
28
-
29
-	return setCommonSocketOptions(tcpConn, bufferSize)
19
+	return setCommonSocketOptions(conn.(*net.TCPConn))
30 20
 }
31 21
 
32
-func setCommonSocketOptions(conn *net.TCPConn, bufferSize int) error {
33
-	if err := conn.SetReadBuffer(bufferSize); err != nil {
34
-		return fmt.Errorf("cannot set read buffer size: %w", err)
35
-	}
36
-
37
-	if err := conn.SetWriteBuffer(bufferSize); err != nil {
38
-		return fmt.Errorf("cannot set write buffer size: %w", err)
22
+func setCommonSocketOptions(conn *net.TCPConn) error {
23
+	if err := conn.SetKeepAlive(true); err != nil {
24
+		return fmt.Errorf("cannot disable TCP keepalive probes: %w", err)
39 25
 	}
40 26
 
41
-	if err := conn.SetKeepAlive(false); err != nil {
42
-		return fmt.Errorf("cannot disable TCP keepalive probes: %w", err)
27
+	if err := conn.SetKeepAlivePeriod(DefaultTCPKeepAlivePeriod); err != nil {
28
+		return fmt.Errorf("cannot set time period of TCP keepalive probes: %w", err)
43 29
 	}
44 30
 
45 31
 	if err := conn.SetLinger(tcpLingerTimeout); err != nil {
@@ -51,7 +37,7 @@ func setCommonSocketOptions(conn *net.TCPConn, bufferSize int) error {
51 37
 		return fmt.Errorf("cannot get underlying raw connection: %w", err)
52 38
 	}
53 39
 
54
-	if err := setSocketReuseAddrPort(rawConn, bufferSize); err != nil {
40
+	if err := setSocketReuseAddrPort(rawConn); err != nil {
55 41
 		return fmt.Errorf("cannot setup SO_REUSEADDR/PORT: %w", err)
56 42
 	}
57 43
 

+ 1
- 1
network/sockopts_unix.go Dosyayı Görüntüle

@@ -10,7 +10,7 @@ import (
10 10
 	"golang.org/x/sys/unix"
11 11
 )
12 12
 
13
-func setSocketReuseAddrPort(conn syscall.RawConn, bufferSize int) error {
13
+func setSocketReuseAddrPort(conn syscall.RawConn) error {
14 14
 	var err error
15 15
 
16 16
 	conn.Control(func(fd uintptr) { // nolint: errcheck

+ 1
- 1
network/sockopts_windows.go Dosyayı Görüntüle

@@ -5,6 +5,6 @@ package network
5 5
 
6 6
 import "syscall"
7 7
 
8
-func setSocketReuseAddrPort(conn syscall.RawConn, bufferSize int) error {
8
+func setSocketReuseAddrPort(conn syscall.RawConn) error {
9 9
 	return nil
10 10
 }

+ 146
- 5
network/socks5.go Dosyayı Görüntüle

@@ -1,21 +1,162 @@
1 1
 package network
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"fmt"
6
+	"io"
7
+	"net"
5 8
 	"net/url"
6 9
 
7
-	"golang.org/x/net/proxy"
10
+	"github.com/9seconds/mtg/v2/essentials"
11
+	"github.com/txthinking/socks5"
8 12
 )
9 13
 
14
+type socks5Dialer struct {
15
+	Dialer
16
+
17
+	username     []byte
18
+	password     []byte
19
+	proxyAddress string
20
+}
21
+
22
+func (s socks5Dialer) Dial(network, address string) (essentials.Conn, error) {
23
+	return s.DialContext(context.Background(), network, address)
24
+}
25
+
26
+func (s socks5Dialer) DialContext(ctx context.Context, network, address string) (essentials.Conn, error) {
27
+	switch network {
28
+	case "tcp", "tcp4", "tcp6":
29
+	default:
30
+		return nil, fmt.Errorf("%s network type is not supported", network)
31
+	}
32
+
33
+	conn, err := s.Dialer.DialContext(ctx, network, s.proxyAddress)
34
+	if err != nil {
35
+		return nil, fmt.Errorf("cannot dial to the proxy: %w", err)
36
+	}
37
+
38
+	if err := s.handshake(conn); err != nil {
39
+		conn.Close()
40
+
41
+		return nil, fmt.Errorf("cannot perform a handshake: %w", err)
42
+	}
43
+
44
+	if err := s.connect(conn, address); err != nil {
45
+		conn.Close()
46
+
47
+		return nil, fmt.Errorf("cannot connect to a destination host %s: %w", address, err)
48
+	}
49
+
50
+	return conn, nil
51
+}
52
+
53
+func (s socks5Dialer) handshake(conn io.ReadWriter) error {
54
+	authMethod := socks5.MethodUsernamePassword
55
+	if len(s.username)+len(s.password) == 0 {
56
+		authMethod = socks5.MethodNone
57
+	}
58
+
59
+	if err := s.handshakeNegotiation(conn, authMethod); err != nil {
60
+		return fmt.Errorf("cannot perform negotiation: %w", err)
61
+	}
62
+
63
+	if authMethod == socks5.MethodNone {
64
+		return nil
65
+	}
66
+
67
+	if err := s.handshakeAuth(conn); err != nil {
68
+		return fmt.Errorf("cannot authenticate: %w", err)
69
+	}
70
+
71
+	return nil
72
+}
73
+
74
+func (s socks5Dialer) handshakeNegotiation(conn io.ReadWriter, authMethod byte) error {
75
+	request := socks5.NewNegotiationRequest([]byte{authMethod})
76
+	if _, err := request.WriteTo(conn); err != nil {
77
+		return fmt.Errorf("cannot send request: %w", err)
78
+	}
79
+
80
+	response, err := socks5.NewNegotiationReplyFrom(conn)
81
+	if err != nil {
82
+		return fmt.Errorf("cannot read response: %w", err)
83
+	}
84
+
85
+	if response.Method != authMethod {
86
+		return fmt.Errorf("%v is unsupported auth method", authMethod)
87
+	}
88
+
89
+	return nil
90
+}
91
+
92
+func (s socks5Dialer) handshakeAuth(conn io.ReadWriter) error {
93
+	request := socks5.NewUserPassNegotiationRequest(s.username, s.password)
94
+
95
+	if _, err := request.WriteTo(conn); err != nil {
96
+		return fmt.Errorf("cannot send a request: %w", err)
97
+	}
98
+
99
+	response, err := socks5.NewUserPassNegotiationReplyFrom(conn)
100
+	if err != nil {
101
+		return fmt.Errorf("cannot read a response: %w", err)
102
+	}
103
+
104
+	if response.Status != socks5.UserPassStatusSuccess {
105
+		return fmt.Errorf("authenticate has failed: %v", response.Status)
106
+	}
107
+
108
+	return nil
109
+}
110
+
111
+func (s socks5Dialer) connect(conn io.ReadWriter, address string) error {
112
+	addrType, host, port, err := socks5.ParseAddress(address)
113
+	if err != nil {
114
+		return fmt.Errorf("cannot parse address: %w", err)
115
+	}
116
+
117
+	if addrType == socks5.ATYPDomain {
118
+		host = host[1:]
119
+	}
120
+
121
+	request := socks5.NewRequest(socks5.CmdConnect, addrType, host, port)
122
+
123
+	if _, err := request.WriteTo(conn); err != nil {
124
+		return fmt.Errorf("cannot send a request: %w", err)
125
+	}
126
+
127
+	response, err := socks5.NewReplyFrom(conn)
128
+	if err != nil {
129
+		return fmt.Errorf("cannot read a response: %w", err)
130
+	}
131
+
132
+	if response.Rep != socks5.RepSuccess {
133
+		return fmt.Errorf("unsuccessful request: %v", response.Rep)
134
+	}
135
+
136
+	return nil
137
+}
138
+
10 139
 // NewSocks5Dialer build a new dialer from a given one (so, in theory
11 140
 // you can chain here). Proxy parameters are passed with URI in a form of:
12 141
 //
13 142
 //     socks5://[user:[password]]@host:port
14 143
 func NewSocks5Dialer(baseDialer Dialer, proxyURL *url.URL) (Dialer, error) {
15
-	rv, err := proxy.FromURL(proxyURL, baseDialer)
16
-	if err != nil {
17
-		return nil, fmt.Errorf("cannot initialize socks5 proxy dialer: %w", err)
144
+	if _, _, err := net.SplitHostPort(proxyURL.Host); err != nil {
145
+		return nil, fmt.Errorf("incorrect url %s", proxyURL.Redacted())
146
+	}
147
+
148
+	dialer := socks5Dialer{
149
+		Dialer:       baseDialer,
150
+		proxyAddress: proxyURL.Host,
151
+	}
152
+
153
+	if proxyURL.User != nil {
154
+		password, isSet := proxyURL.User.Password()
155
+		if isSet {
156
+			dialer.username = []byte(proxyURL.User.Username())
157
+			dialer.password = []byte(password)
158
+		}
18 159
 	}
19 160
 
20
-	return rv.(Dialer), nil
161
+	return dialer, nil
21 162
 }

+ 1
- 1
network/socks5_test.go Dosyayı Görüntüle

@@ -55,7 +55,7 @@ func (suite *Socks5TestSuite) TestRequestOk() {
55 55
 	suite.Equal(http.StatusOK, resp.StatusCode)
56 56
 }
57 57
 
58
-func TestSocks5TestSuite(t *testing.T) {
58
+func TestSocks5(t *testing.T) {
59 59
 	t.Parallel()
60 60
 	suite.Run(t, &Socks5TestSuite{})
61 61
 }

Loading…
İptal
Kaydet