ソースを参照

Merge pull request #9 from 9seconds/buffers

Tune socket options
tags/0.9
Sergey Arkhipov 7年前
コミット
ccdbd5ca39
コミッターのメールアドレスに関連付けられたアカウントが存在しません
7個のファイルの変更59行の追加87行の削除
  1. 13
    3
      client/direct.go
  2. 32
    6
      config/config.go
  3. 0
    12
      main.go
  4. 5
    3
      proxy/copy_pool.go
  5. 7
    20
      telegram/dialer.go
  6. 2
    1
      telegram/direct.go
  7. 0
    42
      wrappers/timeoutrwc.go

+ 13
- 3
client/direct.go ファイルの表示

@@ -3,6 +3,7 @@ package client
3 3
 import (
4 4
 	"io"
5 5
 	"net"
6
+	"time"
6 7
 
7 8
 	"github.com/juju/errors"
8 9
 
@@ -12,10 +13,19 @@ import (
12 13
 	"github.com/9seconds/mtg/wrappers"
13 14
 )
14 15
 
16
+const (
17
+	handshakeTimeout = 10 * time.Second
18
+)
19
+
15 20
 // DirectInit initializes client to access Telegram bypassing middleproxies.
16 21
 func DirectInit(conn net.Conn, conf *config.Config) (*mtproto.ConnectionOpts, io.ReadWriteCloser, error) {
17
-	socket := wrappers.NewTimeoutRWC(conn, conf.TimeoutRead, conf.TimeoutWrite)
18
-	frame, err := obfuscated2.ExtractFrame(socket)
22
+	if err := config.SetSocketOptions(conn); err != nil {
23
+		return nil, nil, errors.Annotate(err, "Cannot set socket options")
24
+	}
25
+
26
+	conn.SetReadDeadline(time.Now().Add(handshakeTimeout)) // nolint: errcheck
27
+	frame, err := obfuscated2.ExtractFrame(conn)
28
+	conn.SetReadDeadline(time.Time{}) // nolint: errcheck
19 29
 	if err != nil {
20 30
 		return nil, nil, errors.Annotate(err, "Cannot extract frame")
21 31
 	}
@@ -26,7 +36,7 @@ func DirectInit(conn net.Conn, conf *config.Config) (*mtproto.ConnectionOpts, io
26 36
 		return nil, nil, errors.Annotate(err, "Cannot parse obfuscated frame")
27 37
 	}
28 38
 
29
-	socket = wrappers.NewStreamCipherRWC(socket, obfs2.Encryptor, obfs2.Decryptor)
39
+	socket := wrappers.NewStreamCipherRWC(conn, obfs2.Encryptor, obfs2.Decryptor)
30 40
 
31 41
 	return connOpts, socket, nil
32 42
 }

+ 32
- 6
config/config.go ファイルの表示

@@ -10,6 +10,15 @@ import (
10 10
 	"github.com/juju/errors"
11 11
 )
12 12
 
13
+// Buffer sizes define internal socket buffer sizes.
14
+const (
15
+	BufferWriteSize = 32 * 1024
16
+	BufferReadSize  = 32 * 1024
17
+	BufferSizeCopy  = 32 * 1024
18
+
19
+	keepAlivePeriod = 20 * time.Second
20
+)
21
+
13 22
 // Config represents common configuration of mtg.
14 23
 type Config struct {
15 24
 	Debug   bool
@@ -20,9 +29,6 @@ type Config struct {
20 29
 	PublicIPv6Port uint16
21 30
 	StatsPort      uint16
22 31
 
23
-	TimeoutRead  time.Duration
24
-	TimeoutWrite time.Duration
25
-
26 32
 	BindIP     net.IP
27 33
 	PublicIPv4 net.IP
28 34
 	PublicIPv6 net.IP
@@ -85,7 +91,6 @@ func NewConfig(debug, verbose bool, // nolint: gocyclo
85 91
 	publicIPv4 net.IP, PublicIPv4Port uint16,
86 92
 	publicIPv6 net.IP, publicIPv6Port uint16,
87 93
 	statsIP net.IP, statsPort uint16,
88
-	timeoutRead, timeoutWrite time.Duration,
89 94
 	secret string) (*Config, error) {
90 95
 	if len(secret) != 32 {
91 96
 		return nil, errors.New("Telegram demands secret of length 32")
@@ -136,10 +141,31 @@ func NewConfig(debug, verbose bool, // nolint: gocyclo
136 141
 		PublicIPv6Port: publicIPv6Port,
137 142
 		StatsIP:        statsIP,
138 143
 		StatsPort:      statsPort,
139
-		TimeoutRead:    timeoutRead,
140
-		TimeoutWrite:   timeoutWrite,
141 144
 		Secret:         secretBytes,
142 145
 	}
143 146
 
144 147
 	return conf, nil
145 148
 }
149
+
150
+// SetSocketOptions makes socket keepalive, sets buffer sizes
151
+func SetSocketOptions(conn net.Conn) error {
152
+	socket := conn.(*net.TCPConn)
153
+
154
+	if err := socket.SetReadBuffer(BufferReadSize); err != nil {
155
+		return errors.Annotate(err, "Cannot set read buffer size")
156
+	}
157
+	if err := socket.SetWriteBuffer(BufferWriteSize); err != nil {
158
+		return errors.Annotate(err, "Cannot set write buffer size")
159
+	}
160
+	if err := socket.SetKeepAlive(true); err != nil {
161
+		return errors.Annotate(err, "Cannot make socket keepalive")
162
+	}
163
+	if err := socket.SetKeepAlivePeriod(keepAlivePeriod); err != nil {
164
+		return errors.Annotate(err, "Cannot set keepalive period")
165
+	}
166
+	if err := socket.SetNoDelay(true); err != nil {
167
+		return errors.Annotate(err, "Cannot activate nodelay for the socket")
168
+	}
169
+
170
+	return nil
171
+}

+ 0
- 12
main.go ファイルの表示

@@ -69,17 +69,6 @@ var (
69 69
 			Default("3129").
70 70
 			Uint16()
71 71
 
72
-	readTimeout = app.Flag("read-timeout", "Socket read timeout.").
73
-			Short('r').
74
-			Envar("MTG_READ_TIMEOUT").
75
-			Default("30s").
76
-			Duration()
77
-	writeTimeout = app.Flag("write-timeout", "Socket write timeout.").
78
-			Short('w').
79
-			Envar("MTG_WRITE_TIMEOUT").
80
-			Default("30s").
81
-			Duration()
82
-
83 72
 	secret = app.Arg("secret", "Secret of this proxy.").Required().String()
84 73
 )
85 74
 
@@ -102,7 +91,6 @@ func main() {
102 91
 		*publicIPv4, *publicIPv4Port,
103 92
 		*publicIPv6, *publicIPv6Port,
104 93
 		*statsIP, *statsPort,
105
-		*readTimeout, *writeTimeout,
106 94
 		*secret,
107 95
 	)
108 96
 	if err != nil {

+ 5
- 3
proxy/copy_pool.go ファイルの表示

@@ -1,15 +1,17 @@
1 1
 package proxy
2 2
 
3
-import "sync"
3
+import (
4
+	"sync"
4 5
 
5
-const copyBufferSize = 30 * 1024
6
+	"github.com/9seconds/mtg/config"
7
+)
6 8
 
7 9
 var copyPool sync.Pool
8 10
 
9 11
 func init() {
10 12
 	copyPool = sync.Pool{
11 13
 		New: func() interface{} {
12
-			data := make([]byte, copyBufferSize)
14
+			data := make([]byte, config.BufferSizeCopy)
13 15
 			return &data
14 16
 		},
15 17
 	}

+ 7
- 20
telegram/dialer.go ファイルの表示

@@ -8,29 +8,23 @@ import (
8 8
 	"github.com/juju/errors"
9 9
 
10 10
 	"github.com/9seconds/mtg/config"
11
-	"github.com/9seconds/mtg/wrappers"
12 11
 )
13 12
 
14
-const telegramKeepAlive = 30 * time.Second
13
+const (
14
+	telegramDialTimeout = 10 * time.Second
15
+)
15 16
 
16 17
 type tgDialer struct {
17 18
 	net.Dialer
18
-
19
-	conf *config.Config
20 19
 }
21 20
 
22 21
 func (t *tgDialer) dial(addr string) (net.Conn, error) {
23
-	connRaw, err := t.Dialer.Dial("tcp", addr)
22
+	conn, err := t.Dialer.Dial("tcp", addr)
24 23
 	if err != nil {
25 24
 		return nil, errors.Annotate(err, "Cannot connect to Telegram")
26 25
 	}
27
-	conn := connRaw.(*net.TCPConn)
28
-
29
-	if err = conn.SetKeepAlive(true); err != nil {
30
-		return nil, errors.Annotate(err, "Cannot establish keepalive connection")
31
-	}
32
-	if err = conn.SetKeepAlivePeriod(telegramKeepAlive); err != nil {
33
-		return nil, errors.Annotate(err, "Cannot set keepalive timeout")
26
+	if err = config.SetSocketOptions(conn); err != nil {
27
+		return nil, errors.Annotate(err, "Cannot set socket options")
34 28
 	}
35 29
 
36 30
 	return conn, nil
@@ -42,12 +36,5 @@ func (t *tgDialer) dialRWC(addr string) (io.ReadWriteCloser, error) {
42 36
 		return nil, err
43 37
 	}
44 38
 
45
-	return wrappers.NewTimeoutRWC(conn, t.conf.TimeoutRead, t.conf.TimeoutWrite), nil
46
-}
47
-
48
-func newDialer(conf *config.Config) tgDialer {
49
-	return tgDialer{
50
-		Dialer: net.Dialer{Timeout: conf.TimeoutRead},
51
-		conf:   conf,
52
-	}
39
+	return conn, nil
53 40
 }

+ 2
- 1
telegram/direct.go ファイルの表示

@@ -2,6 +2,7 @@ package telegram
2 2
 
3 3
 import (
4 4
 	"io"
5
+	"net"
5 6
 
6 7
 	"github.com/juju/errors"
7 8
 
@@ -58,7 +59,7 @@ func (t *directTelegram) Init(connOpts *mtproto.ConnectionOpts, conn io.ReadWrit
58 59
 // to Telegram bypassing middleproxies.
59 60
 func NewDirectTelegram(conf *config.Config) Telegram {
60 61
 	return &directTelegram{baseTelegram{
61
-		dialer:      newDialer(conf),
62
+		dialer:      tgDialer{net.Dialer{Timeout: telegramDialTimeout}},
62 63
 		v4Addresses: directV4Addresses,
63 64
 		v6Addresses: directV6Addresses,
64 65
 	}}

+ 0
- 42
wrappers/timeoutrwc.go ファイルの表示

@@ -1,42 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"io"
5
-	"net"
6
-	"time"
7
-)
8
-
9
-// TimeoutReadWriteCloser sets timeouts for read/write into underlying
10
-// network connection.
11
-type TimeoutReadWriteCloser struct {
12
-	conn         net.Conn
13
-	readTimeout  time.Duration
14
-	writeTimeout time.Duration
15
-}
16
-
17
-// Read reads from connection
18
-func (t *TimeoutReadWriteCloser) Read(p []byte) (int, error) {
19
-	t.conn.SetReadDeadline(time.Now().Add(t.readTimeout)) // nolint: errcheck, gas
20
-	return t.conn.Read(p)
21
-}
22
-
23
-// Write writes into connection.
24
-func (t *TimeoutReadWriteCloser) Write(p []byte) (int, error) {
25
-	t.conn.SetWriteDeadline(time.Now().Add(t.writeTimeout)) // nolint: errcheck, gas
26
-	return t.conn.Write(p)
27
-}
28
-
29
-// Close closes underlying connection.
30
-func (t *TimeoutReadWriteCloser) Close() error {
31
-	return t.conn.Close()
32
-}
33
-
34
-// NewTimeoutRWC returns wrapper over net.Conn which sets deadlines for
35
-// every wrapped Read/Write.
36
-func NewTimeoutRWC(conn net.Conn, readTimeout, writeTimeout time.Duration) io.ReadWriteCloser {
37
-	return &TimeoutReadWriteCloser{
38
-		conn:         conn,
39
-		readTimeout:  readTimeout,
40
-		writeTimeout: writeTimeout,
41
-	}
42
-}

読み込み中…
キャンセル
保存