Просмотр исходного кода

Merge pull request #509 from msukhotin/feat/configurable-tcp-not-sent-lowat

Make TCP_NOTSENT_LOWAT configurable
master
Sergei Arkhipov 15 часов назад
Родитель
Сommit
9f6e5d277d
Аккаунт пользователя с таким Email не найден

+ 14
- 0
example.config.toml Просмотреть файл

@@ -221,6 +221,20 @@ http = "10s"
221 221
 idle = "5m"
222 222
 handshake = "10s"
223 223
 
224
+# TCP_NOTSENT_LOWAT (bytes) caps the amount of *unsent* data that the
225
+# kernel buffers per outbound socket. When the not-yet-sent backlog drops
226
+# below this threshold the socket becomes writable again, which applies
227
+# back-pressure to mtg's relay loop instead of letting kernel buffers
228
+# bloat. This reduces per-connection memory and bufferbloat, but on
229
+# high-BDP links (fast pipe × high RTT) it can cap single-flow upload
230
+# throughput at roughly value / RTT.
231
+#
232
+# Default is 128kib (good for interactive latency). If uploads through
233
+# the proxy feel slow on a fast link, raise this to 1mib–4mib. Suffixes
234
+# follow the same scheme as anti-replay max-size (kib, mib, gib).
235
+# Only applies to Linux and Darwin; ignored on other platforms.
236
+# tcp-not-sent-lowat = "128kib"
237
+
224 238
 # this defines a configuration for TCP keep alives. Default values are taken
225 239
 # from Golang default behavior.
226 240
 [network.keep-alive]

+ 1
- 0
internal/cli/doctor.go Просмотреть файл

@@ -105,6 +105,7 @@ func (d *Doctor) Run(cli *CLI, version string) error {
105 105
 			Interval: conf.Network.KeepAlive.Interval.Get(0),
106 106
 			Count:    int(conf.Network.KeepAlive.Count.Get(0)),
107 107
 		},
108
+		int(conf.Network.TCPNotSentLowat.Get(network.DefaultTCPNotSentLowat)),
108 109
 	)
109 110
 
110 111
 	fmt.Println("Validate native network connectivity")

+ 1
- 0
internal/cli/run_proxy.go Просмотреть файл

@@ -57,6 +57,7 @@ func makeNetwork(conf *config.Config, version string) (mtglib.Network, error) {
57 57
 			Interval: conf.Network.KeepAlive.Interval.Get(0),
58 58
 			Count:    int(conf.Network.KeepAlive.Count.Get(0)),
59 59
 		},
60
+		int(conf.Network.TCPNotSentLowat.Get(network.DefaultTCPNotSentLowat)),
60 61
 	)
61 62
 
62 63
 	proxyDialers := make([]mtglib.Network, len(conf.Network.Proxies))

+ 4
- 3
internal/config/config.go Просмотреть файл

@@ -71,9 +71,10 @@ type Config struct {
71 71
 			Interval TypeDuration    `json:"interval"`
72 72
 			Count    TypeConcurrency `json:"count"`
73 73
 		} `json:"keepAlive"`
74
-		DOHIP   TypeIP         `json:"dohIp"`
75
-		DNS     TypeDNSURI     `json:"dns"`
76
-		Proxies []TypeProxyURL `json:"proxies"`
74
+		DOHIP            TypeIP         `json:"dohIp"`
75
+		DNS              TypeDNSURI     `json:"dns"`
76
+		Proxies          []TypeProxyURL `json:"proxies"`
77
+		TCPNotSentLowat  TypeBytes      `json:"tcpNotSentLowat"`
77 78
 	} `json:"network"`
78 79
 	Stats struct {
79 80
 		StatsD struct {

+ 4
- 3
internal/config/parse.go Просмотреть файл

@@ -67,9 +67,10 @@ type tomlConfig struct {
67 67
 			Interval string `toml:"interval" json:"interval,omitempty"`
68 68
 			Count    uint   `toml:"count" json:"count,omitempty"`
69 69
 		} `toml:"keep-alive" json:"keepAlive,omitempty"`
70
-		DOHIP   string   `toml:"doh-ip" json:"dohIp,omitempty"`
71
-		DNS     string   `toml:"dns" json:"dns,omitempty"`
72
-		Proxies []string `toml:"proxies" json:"proxies,omitempty"`
70
+		DOHIP           string   `toml:"doh-ip" json:"dohIp,omitempty"`
71
+		DNS             string   `toml:"dns" json:"dns,omitempty"`
72
+		Proxies         []string `toml:"proxies" json:"proxies,omitempty"`
73
+		TCPNotSentLowat string   `toml:"tcp-not-sent-lowat" json:"tcpNotSentLowat,omitempty"`
73 74
 	} `toml:"network" json:"network,omitempty"`
74 75
 	Stats struct {
75 76
 		StatsD struct {

+ 1
- 1
network/v2/base_http_test.go Просмотреть файл

@@ -25,7 +25,7 @@ func (suite *BaseHTTPTestSuite) SetupSuite() {
25 25
 }
26 26
 
27 27
 func (suite *BaseHTTPTestSuite) SetupTest() {
28
-	suite.client = network.New(nil, "mtg/1", 0, 0, 0, network.DefaultKeepAliveConfig).MakeHTTPClient(nil)
28
+	suite.client = network.New(nil, "mtg/1", 0, 0, 0, network.DefaultKeepAliveConfig, 0).MakeHTTPClient(nil)
29 29
 }
30 30
 
31 31
 func (suite *BaseHTTPTestSuite) TestGet() {

+ 1
- 1
network/v2/base_network_test.go Просмотреть файл

@@ -19,7 +19,7 @@ type BaseNetworkTestSuite struct {
19 19
 func (suite *BaseNetworkTestSuite) SetupSuite() {
20 20
 	suite.EchoServerTestSuite.SetupSuite()
21 21
 
22
-	suite.net = network.New(nil, "agent", 0, 0, 0, network.DefaultKeepAliveConfig)
22
+	suite.net = network.New(nil, "agent", 0, 0, 0, network.DefaultKeepAliveConfig, 0)
23 23
 }
24 24
 
25 25
 func (suite *BaseNetworkTestSuite) TestDialUnknownNetwork() {

+ 13
- 7
network/v2/init.go Просмотреть файл

@@ -56,15 +56,21 @@ const (
56 56
 	// unacknowledged data.
57 57
 	tcpLingerTimeout = 1
58 58
 
59
-	// tcpNotSentLowat limits the amount of unsent data queued in the
60
-	// kernel write buffer per socket. When the unsent data drops below
61
-	// this threshold, the socket becomes writable again. This reduces
62
-	// per-connection memory usage and bufferbloat by applying
63
-	// back-pressure to the relay loop instead of piling up data in
64
-	// kernel buffers.
65
-	tcpNotSentLowat = 128 * 1024
66 59
 )
67 60
 
61
+// DefaultTCPNotSentLowat is the default value applied to TCP_NOTSENT_LOWAT
62
+// on outbound sockets. It limits the amount of unsent data queued in the
63
+// kernel write buffer per socket. When the unsent data drops below this
64
+// threshold, the socket becomes writable again. This reduces per-connection
65
+// memory usage and bufferbloat by applying back-pressure to the relay loop
66
+// instead of piling up data in kernel buffers.
67
+//
68
+// The default is conservative and biased towards interactive latency. On
69
+// high-BDP links (fast pipe × high RTT) it can cap single-flow upload
70
+// throughput at roughly DefaultTCPNotSentLowat / RTT. Raise it via the
71
+// [network.tcp-not-sent-lowat] config option if uploads feel slow.
72
+const DefaultTCPNotSentLowat = 128 * 1024
73
+
68 74
 var (
69 75
 	ErrCannotDial = errors.New("cannot dial to any address")
70 76
 

+ 12
- 5
network/v2/network.go Просмотреть файл

@@ -14,10 +14,11 @@ import (
14 14
 type network struct {
15 15
 	net.Dialer
16 16
 
17
-	keepAliveConfig net.KeepAliveConfig
18
-	httpTimeout     time.Duration
19
-	idleTimeout     time.Duration
20
-	userAgent       string
17
+	keepAliveConfig  net.KeepAliveConfig
18
+	httpTimeout      time.Duration
19
+	idleTimeout      time.Duration
20
+	userAgent        string
21
+	tcpNotSentLowat  int
21 22
 }
22 23
 
23 24
 func (n *network) Dial(network, address string) (essentials.Conn, error) {
@@ -38,7 +39,7 @@ func (n *network) DialContext(ctx context.Context, network, address string) (ess
38 39
 
39 40
 	tcpConn := conn.(*net.TCPConn)
40 41
 
41
-	return tcpConn, setCommonSocketOptions(tcpConn, n.keepAliveConfig)
42
+	return tcpConn, setCommonSocketOptions(tcpConn, n.keepAliveConfig, n.tcpNotSentLowat)
42 43
 }
43 44
 
44 45
 func (n *network) MakeHTTPClient(
@@ -73,6 +74,7 @@ func New(
73 74
 	httpTimeout,
74 75
 	idleTimeout time.Duration,
75 76
 	keepAliveConfig net.KeepAliveConfig,
77
+	tcpNotSentLowat int,
76 78
 ) mtglib.Network {
77 79
 	if dnsResolver == nil {
78 80
 		dnsResolver = net.DefaultResolver
@@ -82,6 +84,10 @@ func New(
82 84
 		userAgent = UserAgent
83 85
 	}
84 86
 
87
+	if tcpNotSentLowat == 0 {
88
+		tcpNotSentLowat = DefaultTCPNotSentLowat
89
+	}
90
+
85 91
 	return &network{
86 92
 		Dialer: net.Dialer{
87 93
 			Timeout:       tcpTimeout,
@@ -92,5 +98,6 @@ func New(
92 98
 		idleTimeout:     idleTimeout,
93 99
 		httpTimeout:     httpTimeout,
94 100
 		keepAliveConfig: keepAliveConfig,
101
+		tcpNotSentLowat: tcpNotSentLowat,
95 102
 	}
96 103
 }

+ 1
- 0
network/v2/proxy_network_internal_test.go Просмотреть файл

@@ -27,6 +27,7 @@ func TestProxyServerDialerDropsCustomResolver(t *testing.T) {
27 27
 		0,
28 28
 		0,
29 29
 		DefaultKeepAliveConfig,
30
+		0,
30 31
 	)
31 32
 
32 33
 	if base.NativeDialer().Resolver != customResolver {

+ 2
- 2
network/v2/sockopts.go Просмотреть файл

@@ -5,7 +5,7 @@ import (
5 5
 	"net"
6 6
 )
7 7
 
8
-func setCommonSocketOptions(conn *net.TCPConn, keepAliveConfig net.KeepAliveConfig) error {
8
+func setCommonSocketOptions(conn *net.TCPConn, keepAliveConfig net.KeepAliveConfig, tcpNotSentLowat int) error {
9 9
 	if err := applyKeepAlive(conn, keepAliveConfig); err != nil {
10 10
 		return fmt.Errorf("cannot configure TCP keepalive: %w", err)
11 11
 	}
@@ -25,7 +25,7 @@ func setCommonSocketOptions(conn *net.TCPConn, keepAliveConfig net.KeepAliveConf
25 25
 
26 26
 	setCongestionControl(rawConn)
27 27
 	setTCPUserTimeout(rawConn, keepAliveConfig)
28
-	setNotSentLowat(rawConn)
28
+	setNotSentLowat(rawConn, tcpNotSentLowat)
29 29
 
30 30
 	return nil
31 31
 }

+ 15
- 7
network/v2/sockopts_lowat.go Просмотреть файл

@@ -8,13 +8,21 @@ import (
8 8
 	"golang.org/x/sys/unix"
9 9
 )
10 10
 
11
-// setNotSentLowat sets TCP_NOTSENT_LOWAT which limits the amount of
12
-// unsent data queued in the kernel write buffer. Once unsent data drops
13
-// below this threshold the socket becomes writable again, applying
14
-// back-pressure to the relay loop instead of piling up data in kernel
15
-// buffers. This reduces per-connection memory and bufferbloat.
16
-func setNotSentLowat(conn syscall.RawConn) {
11
+// setNotSentLowat sets TCP_NOTSENT_LOWAT to value bytes. The option
12
+// limits the amount of unsent data queued in the kernel write buffer:
13
+// once the unsent backlog drops below the threshold, the socket becomes
14
+// writable again, applying back-pressure to the relay loop instead of
15
+// piling up data in kernel buffers. This reduces per-connection memory
16
+// and bufferbloat.
17
+//
18
+// A non-positive value disables the call, leaving the kernel default in
19
+// effect (no upper bound beyond SO_SNDBUF).
20
+func setNotSentLowat(conn syscall.RawConn, value int) {
21
+	if value <= 0 {
22
+		return
23
+	}
24
+
17 25
 	conn.Control(func(fd uintptr) { //nolint: errcheck
18
-		unix.SetsockoptInt(int(fd), unix.IPPROTO_TCP, unix.TCP_NOTSENT_LOWAT, tcpNotSentLowat) //nolint: errcheck
26
+		unix.SetsockoptInt(int(fd), unix.IPPROTO_TCP, unix.TCP_NOTSENT_LOWAT, value) //nolint: errcheck
19 27
 	})
20 28
 }

+ 1
- 1
network/v2/sockopts_lowat_stub.go Просмотреть файл

@@ -4,4 +4,4 @@ package network
4 4
 
5 5
 import "syscall"
6 6
 
7
-func setNotSentLowat(conn syscall.RawConn) {}
7
+func setNotSentLowat(conn syscall.RawConn, value int) {}

+ 55
- 1
network/v2/sockopts_test.go Просмотреть файл

@@ -65,7 +65,7 @@ func TestSetCommonSocketOptionsKeepAlive(t *testing.T) {
65 65
 
66 66
 	tcpConn := accepted.(*net.TCPConn)
67 67
 
68
-	err = setCommonSocketOptions(tcpConn, DefaultKeepAliveConfig)
68
+	err = setCommonSocketOptions(tcpConn, DefaultKeepAliveConfig, DefaultTCPNotSentLowat)
69 69
 	require.NoError(t, err)
70 70
 
71 71
 	rawConn, err := tcpConn.SyscallConn()
@@ -90,3 +90,57 @@ func TestSetCommonSocketOptionsKeepAlive(t *testing.T) {
90 90
 	})
91 91
 	require.NoError(t, err)
92 92
 }
93
+
94
+func TestSetCommonSocketOptionsNotSentLowat(t *testing.T) {
95
+	t.Parallel()
96
+
97
+	cases := []struct {
98
+		name string
99
+		want int
100
+	}{
101
+		{name: "default", want: DefaultTCPNotSentLowat},
102
+		{name: "custom", want: 4 * 1024 * 1024},
103
+	}
104
+
105
+	for _, tc := range cases {
106
+		t.Run(tc.name, func(t *testing.T) {
107
+			t.Parallel()
108
+
109
+			listener, err := net.Listen("tcp", "127.0.0.1:0")
110
+			require.NoError(t, err)
111
+			defer listener.Close() //nolint: errcheck
112
+
113
+			dialDone := make(chan struct{})
114
+
115
+			go func() {
116
+				c, err := net.Dial("tcp", listener.Addr().String())
117
+				if err == nil {
118
+					defer c.Close() //nolint: errcheck
119
+				}
120
+				close(dialDone)
121
+			}()
122
+
123
+			require.NoError(t, listener.(*net.TCPListener).SetDeadline(time.Now().Add(5*time.Second)))
124
+
125
+			accepted, err := listener.Accept()
126
+			require.NoError(t, err)
127
+			defer accepted.Close() //nolint: errcheck
128
+
129
+			<-dialDone
130
+
131
+			tcpConn := accepted.(*net.TCPConn)
132
+
133
+			require.NoError(t, setCommonSocketOptions(tcpConn, DefaultKeepAliveConfig, tc.want))
134
+
135
+			rawConn, err := tcpConn.SyscallConn()
136
+			require.NoError(t, err)
137
+
138
+			err = rawConn.Control(func(fd uintptr) {
139
+				got, err := unix.GetsockoptInt(int(fd), unix.IPPROTO_TCP, unix.TCP_NOTSENT_LOWAT)
140
+				require.NoError(t, err)
141
+				require.Equal(t, tc.want, got, "TCP_NOTSENT_LOWAT should match value passed to setCommonSocketOptions")
142
+			})
143
+			require.NoError(t, err)
144
+		})
145
+	}
146
+}

+ 1
- 1
network/v2/socks_proxy_test.go Просмотреть файл

@@ -66,7 +66,7 @@ func (suite *SocksProxyTestSuite) SetupSuite() {
66 66
 	require.NoError(suite.T(), err)
67 67
 	suite.authURL = parsed
68 68
 
69
-	suite.baseNetwork = network.New(nil, "mtg", 0, 0, 0, network.DefaultKeepAliveConfig)
69
+	suite.baseNetwork = network.New(nil, "mtg", 0, 0, 0, network.DefaultKeepAliveConfig, 0)
70 70
 }
71 71
 
72 72
 func (suite *SocksProxyTestSuite) TestIncorrectSchema() {

Загрузка…
Отмена
Сохранить