Просмотр исходного кода

Add possibility to set read/write buffers separately

tags/0.11
9seconds 7 лет назад
Родитель
Сommit
0ce325f908
5 измененных файлов: 46 добавлений и 38 удалений
  1. 3
    7
      client/direct.go
  2. 19
    19
      config/config.go
  3. 15
    1
      main.go
  4. 6
    4
      proxy/proxy.go
  5. 3
    7
      telegram/dialer.go

+ 3
- 7
client/direct.go Просмотреть файл

12
 	"github.com/9seconds/mtg/wrappers"
12
 	"github.com/9seconds/mtg/wrappers"
13
 )
13
 )
14
 
14
 
15
-const (
16
-	handshakeTimeout = 10 * time.Second
17
-	readBufferSize   = 64 * 1024
18
-	writeBufferSize  = 64 * 1024
19
-)
15
+const handshakeTimeout = 10 * time.Second
20
 
16
 
21
 // DirectInit initializes client connection for proxy which connects to
17
 // DirectInit initializes client connection for proxy which connects to
22
 // Telegram directly.
18
 // Telegram directly.
25
 	if err := tcpSocket.SetNoDelay(false); err != nil {
21
 	if err := tcpSocket.SetNoDelay(false); err != nil {
26
 		return nil, nil, errors.Annotate(err, "Cannot disable NO_DELAY to client socket")
22
 		return nil, nil, errors.Annotate(err, "Cannot disable NO_DELAY to client socket")
27
 	}
23
 	}
28
-	if err := tcpSocket.SetReadBuffer(readBufferSize); err != nil {
24
+	if err := tcpSocket.SetReadBuffer(conf.ReadBufferSize); err != nil {
29
 		return nil, nil, errors.Annotate(err, "Cannot set read buffer size of client socket")
25
 		return nil, nil, errors.Annotate(err, "Cannot set read buffer size of client socket")
30
 	}
26
 	}
31
-	if err := tcpSocket.SetWriteBuffer(writeBufferSize); err != nil {
27
+	if err := tcpSocket.SetWriteBuffer(conf.WriteBufferSize); err != nil {
32
 		return nil, nil, errors.Annotate(err, "Cannot set write buffer size of client socket")
28
 		return nil, nil, errors.Annotate(err, "Cannot set write buffer size of client socket")
33
 	}
29
 	}
34
 
30
 

+ 19
- 19
config/config.go Просмотреть файл

11
 	statsd "gopkg.in/alexcesaro/statsd.v2"
11
 	statsd "gopkg.in/alexcesaro/statsd.v2"
12
 )
12
 )
13
 
13
 
14
-// Buffer sizes define internal socket buffer sizes.
15
-const (
16
-	BufferWriteSize = 32 * 1024
17
-	BufferReadSize  = 32 * 1024
18
-)
19
-
20
 // Config represents common configuration of mtg.
14
 // Config represents common configuration of mtg.
21
 type Config struct {
15
 type Config struct {
22
 	Debug      bool
16
 	Debug      bool
23
 	Verbose    bool
17
 	Verbose    bool
24
 	SecureMode bool
18
 	SecureMode bool
25
 
19
 
20
+	ReadBufferSize  int
21
+	WriteBufferSize int
22
+
26
 	BindPort       uint16
23
 	BindPort       uint16
27
 	PublicIPv4Port uint16
24
 	PublicIPv4Port uint16
28
 	PublicIPv6Port uint16
25
 	PublicIPv6Port uint16
114
 // fetches data from external sources. Parameters passed to this
111
 // fetches data from external sources. Parameters passed to this
115
 // function, should come from command line arguments.
112
 // function, should come from command line arguments.
116
 func NewConfig(debug, verbose bool, // nolint: gocyclo
113
 func NewConfig(debug, verbose bool, // nolint: gocyclo
114
+	writeBufferSize, readBufferSize uint32,
117
 	bindIP, publicIPv4, publicIPv6, statsIP net.IP,
115
 	bindIP, publicIPv4, publicIPv6, statsIP net.IP,
118
 	bindPort, publicIPv4Port, publicIPv6Port, statsPort, statsdPort uint16,
116
 	bindPort, publicIPv4Port, publicIPv6Port, statsPort, statsdPort uint16,
119
 	statsdIP, statsdNetwork, statsdPrefix, statsdTagsFormat string,
117
 	statsdIP, statsdNetwork, statsdPrefix, statsdTagsFormat string,
157
 	}
155
 	}
158
 
156
 
159
 	conf := &Config{
157
 	conf := &Config{
160
-		Debug:          debug,
161
-		Verbose:        verbose,
162
-		BindIP:         bindIP,
163
-		BindPort:       bindPort,
164
-		PublicIPv4:     publicIPv4,
165
-		PublicIPv4Port: publicIPv4Port,
166
-		PublicIPv6:     publicIPv6,
167
-		PublicIPv6Port: publicIPv6Port,
168
-		StatsIP:        statsIP,
169
-		StatsPort:      statsPort,
170
-		Secret:         secret,
171
-		AdTag:          adtag,
172
-		SecureMode:     secureMode,
158
+		Debug:           debug,
159
+		Verbose:         verbose,
160
+		BindIP:          bindIP,
161
+		BindPort:        bindPort,
162
+		PublicIPv4:      publicIPv4,
163
+		PublicIPv4Port:  publicIPv4Port,
164
+		PublicIPv6:      publicIPv6,
165
+		PublicIPv6Port:  publicIPv6Port,
166
+		StatsIP:         statsIP,
167
+		StatsPort:       statsPort,
168
+		Secret:          secret,
169
+		AdTag:           adtag,
170
+		SecureMode:      secureMode,
171
+		ReadBufferSize:  int(readBufferSize),
172
+		WriteBufferSize: int(writeBufferSize),
173
 	}
173
 	}
174
 
174
 
175
 	if statsdIP != "" {
175
 	if statsdIP != "" {

+ 15
- 1
main.go Просмотреть файл

49
 		Default("3128").
49
 		Default("3128").
50
 		Uint16()
50
 		Uint16()
51
 
51
 
52
+	writeBufferSize = app.Flag("write-buffer",
53
+		"Write buffer size in bytes. You can think about it as a buffer from client to Telegram.").
54
+		Short('w').
55
+		Envar("MTG_BUFFER_WRITE").
56
+		Default("65536").
57
+		Uint32()
58
+	readBufferSize = app.Flag("read-buffer",
59
+		"Read buffer size in bytes. You can think about it as a buffer from Telegram to client.").
60
+		Short('r').
61
+		Envar("MTG_BUFFER_READ").
62
+		Default("131072").
63
+		Uint32()
64
+
52
 	publicIPv4 = app.Flag("public-ipv4",
65
 	publicIPv4 = app.Flag("public-ipv4",
53
 		"Which IPv4 address is public.").
66
 		"Which IPv4 address is public.").
54
 		Short('4').
67
 		Short('4').
117
 func init() {
130
 func init() {
118
 	rand.Seed(time.Now().UTC().UnixNano())
131
 	rand.Seed(time.Now().UTC().UnixNano())
119
 	app.Version(version)
132
 	app.Version(version)
120
-
133
+	app.HelpFlag.Short('h')
121
 }
134
 }
122
 
135
 
123
 func main() { // nolint: gocyclo
136
 func main() { // nolint: gocyclo
129
 	}
142
 	}
130
 
143
 
131
 	conf, err := config.NewConfig(*debug, *verbose,
144
 	conf, err := config.NewConfig(*debug, *verbose,
145
+		*writeBufferSize, *readBufferSize,
132
 		*bindIP, *publicIPv4, *publicIPv6, *statsIP,
146
 		*bindIP, *publicIPv4, *publicIPv6, *statsIP,
133
 		*bindPort, *publicIPv4Port, *publicIPv6Port, *statsPort, *statsdPort,
147
 		*bindPort, *publicIPv4Port, *publicIPv6Port, *statsPort, *statsdPort,
134
 		*statsdIP, *statsdNetwork, *statsdPrefix, *statsdTagsFormat,
148
 		*statsdIP, *statsdNetwork, *statsdPrefix, *statsdTagsFormat,

+ 6
- 4
proxy/proxy.go Просмотреть файл

83
 	} else {
83
 	} else {
84
 		clientStream := clientConn.(wrappers.StreamReadWriteCloser)
84
 		clientStream := clientConn.(wrappers.StreamReadWriteCloser)
85
 		serverStream := serverConn.(wrappers.StreamReadWriteCloser)
85
 		serverStream := serverConn.(wrappers.StreamReadWriteCloser)
86
-		go p.directPipe(clientStream, serverStream, wait)
87
-		go p.directPipe(serverStream, clientStream, wait)
86
+		go p.directPipe(clientStream, serverStream, wait, p.conf.ReadBufferSize)
87
+		go p.directPipe(serverStream, clientStream, wait, p.conf.WriteBufferSize)
88
 	}
88
 	}
89
 
89
 
90
 	wait.Wait()
90
 	wait.Wait()
130
 	}
130
 	}
131
 }
131
 }
132
 
132
 
133
-func (p *Proxy) directPipe(src wrappers.StreamReadCloser, dst io.WriteCloser, wait *sync.WaitGroup) {
133
+func (p *Proxy) directPipe(src wrappers.StreamReadCloser, dst io.WriteCloser,
134
+	wait *sync.WaitGroup, bufferSize int) {
134
 	defer func() {
135
 	defer func() {
135
 		src.Close() // nolint: errcheck
136
 		src.Close() // nolint: errcheck
136
 		dst.Close() // nolint: errcheck
137
 		dst.Close() // nolint: errcheck
137
 		wait.Done()
138
 		wait.Done()
138
 	}()
139
 	}()
139
 
140
 
140
-	if _, err := io.Copy(dst, src); err != nil {
141
+	buffer := make([]byte, bufferSize)
142
+	if _, err := io.CopyBuffer(dst, src, buffer); err != nil {
141
 		src.Logger().Warnw("Cannot pump sockets", "error", err)
143
 		src.Logger().Warnw("Cannot pump sockets", "error", err)
142
 	}
144
 	}
143
 }
145
 }

+ 3
- 7
telegram/dialer.go Просмотреть файл

10
 	"github.com/9seconds/mtg/wrappers"
10
 	"github.com/9seconds/mtg/wrappers"
11
 )
11
 )
12
 
12
 
13
-const (
14
-	telegramDialTimeout = 10 * time.Second
15
-	readBufferSize      = 64 * 1024
16
-	writeBufferSize     = 64 * 1024
17
-)
13
+const telegramDialTimeout = 10 * time.Second
18
 
14
 
19
 type tgDialer struct {
15
 type tgDialer struct {
20
 	net.Dialer
16
 	net.Dialer
32
 	if err = tcpSocket.SetNoDelay(true); err != nil {
28
 	if err = tcpSocket.SetNoDelay(true); err != nil {
33
 		return nil, errors.Annotate(err, "Cannot set NO_DELAY to Telegram")
29
 		return nil, errors.Annotate(err, "Cannot set NO_DELAY to Telegram")
34
 	}
30
 	}
35
-	if err = tcpSocket.SetReadBuffer(readBufferSize); err != nil {
31
+	if err = tcpSocket.SetReadBuffer(t.conf.WriteBufferSize); err != nil {
36
 		return nil, errors.Annotate(err, "Cannot set read buffer size on telegram socket")
32
 		return nil, errors.Annotate(err, "Cannot set read buffer size on telegram socket")
37
 	}
33
 	}
38
-	if err = tcpSocket.SetWriteBuffer(writeBufferSize); err != nil {
34
+	if err = tcpSocket.SetWriteBuffer(t.conf.ReadBufferSize); err != nil {
39
 		return nil, errors.Annotate(err, "Cannot set write buffer size on telegram socket")
35
 		return nil, errors.Annotate(err, "Cannot set write buffer size on telegram socket")
40
 	}
36
 	}
41
 
37
 

Загрузка…
Отмена
Сохранить