Преглед изворни кода

Merge pull request #31 from 9seconds/propagate-buffer-sizes

Propagate buffer sizes
tags/0.11
Sergey Arkhipov пре 7 година
родитељ
комит
2e8c9623de
No account linked to committer's email address
5 измењених фајлова са 63 додато и 61 уклоњено
  1. 3
    7
      client/direct.go
  2. 31
    38
      config/config.go
  3. 20
    5
      main.go
  4. 6
    4
      proxy/proxy.go
  5. 3
    7
      telegram/dialer.go

+ 3
- 7
client/direct.go Прегледај датотеку

@@ -12,11 +12,7 @@ import (
12 12
 	"github.com/9seconds/mtg/wrappers"
13 13
 )
14 14
 
15
-const (
16
-	handshakeTimeout = 10 * time.Second
17
-	readBufferSize   = 64 * 1024
18
-	writeBufferSize  = 64 * 1024
19
-)
15
+const handshakeTimeout = 10 * time.Second
20 16
 
21 17
 // DirectInit initializes client connection for proxy which connects to
22 18
 // Telegram directly.
@@ -25,10 +21,10 @@ func DirectInit(socket net.Conn, connID string, conf *config.Config) (wrappers.W
25 21
 	if err := tcpSocket.SetNoDelay(false); err != nil {
26 22
 		return nil, nil, errors.Annotate(err, "Cannot disable NO_DELAY to client socket")
27 23
 	}
28
-	if err := tcpSocket.SetReadBuffer(readBufferSize); err != nil {
24
+	if err := tcpSocket.SetReadBuffer(conf.ReadBufferSize); err != nil {
29 25
 		return nil, nil, errors.Annotate(err, "Cannot set read buffer size of client socket")
30 26
 	}
31
-	if err := tcpSocket.SetWriteBuffer(writeBufferSize); err != nil {
27
+	if err := tcpSocket.SetWriteBuffer(conf.WriteBufferSize); err != nil {
32 28
 		return nil, nil, errors.Annotate(err, "Cannot set write buffer size of client socket")
33 29
 	}
34 30
 

+ 31
- 38
config/config.go Прегледај датотеку

@@ -1,28 +1,25 @@
1 1
 package config
2 2
 
3 3
 import (
4
+	"bytes"
4 5
 	"encoding/hex"
5 6
 	"fmt"
6 7
 	"net"
7 8
 	"strconv"
8
-	"strings"
9 9
 
10 10
 	"github.com/juju/errors"
11 11
 	statsd "gopkg.in/alexcesaro/statsd.v2"
12 12
 )
13 13
 
14
-// Buffer sizes define internal socket buffer sizes.
15
-const (
16
-	BufferWriteSize = 32 * 1024
17
-	BufferReadSize  = 32 * 1024
18
-)
19
-
20 14
 // Config represents common configuration of mtg.
21 15
 type Config struct {
22 16
 	Debug      bool
23 17
 	Verbose    bool
24 18
 	SecureMode bool
25 19
 
20
+	ReadBufferSize  int
21
+	WriteBufferSize int
22
+
26 23
 	BindPort       uint16
27 24
 	PublicIPv4Port uint16
28 25
 	PublicIPv6Port uint16
@@ -114,30 +111,21 @@ func getAddr(host fmt.Stringer, port uint16) string {
114 111
 // fetches data from external sources. Parameters passed to this
115 112
 // function, should come from command line arguments.
116 113
 func NewConfig(debug, verbose bool, // nolint: gocyclo
114
+	writeBufferSize, readBufferSize uint32,
117 115
 	bindIP, publicIPv4, publicIPv6, statsIP net.IP,
118 116
 	bindPort, publicIPv4Port, publicIPv6Port, statsPort, statsdPort uint16,
119
-	secret, adtag, statsdIP, statsdNetwork, statsdPrefix, statsdTagsFormat string,
120
-	statsdTags map[string]string) (*Config, error) {
117
+	statsdIP, statsdNetwork, statsdPrefix, statsdTagsFormat string,
118
+	statsdTags map[string]string,
119
+	secret, adtag []byte) (*Config, error) {
121 120
 	secureMode := false
122
-	if strings.HasPrefix(secret, "dd") && len(secret) == 34 {
121
+	if bytes.HasPrefix(secret, []byte{0xdd}) && len(secret) == 17 {
123 122
 		secureMode = true
124
-		secret = strings.TrimPrefix(secret, "dd")
125
-	} else if len(secret) != 32 {
123
+		secret = bytes.TrimPrefix(secret, []byte{0xdd})
124
+	} else if len(secret) != 16 {
126 125
 		return nil, errors.New("Telegram demands secret of length 32")
127 126
 	}
128
-	secretBytes, err := hex.DecodeString(secret)
129
-	if err != nil {
130
-		return nil, errors.Annotate(err, "Cannot create config")
131
-	}
132
-
133
-	var adTagBytes []byte
134
-	if len(adtag) != 0 {
135
-		adTagBytes, err = hex.DecodeString(adtag)
136
-		if err != nil {
137
-			return nil, errors.Annotate(err, "Cannot create config")
138
-		}
139
-	}
140 127
 
128
+	var err error
141 129
 	if publicIPv4 == nil {
142 130
 		publicIPv4, err = getGlobalIPv4()
143 131
 		if err != nil {
@@ -167,19 +155,21 @@ func NewConfig(debug, verbose bool, // nolint: gocyclo
167 155
 	}
168 156
 
169 157
 	conf := &Config{
170
-		Debug:          debug,
171
-		Verbose:        verbose,
172
-		BindIP:         bindIP,
173
-		BindPort:       bindPort,
174
-		PublicIPv4:     publicIPv4,
175
-		PublicIPv4Port: publicIPv4Port,
176
-		PublicIPv6:     publicIPv6,
177
-		PublicIPv6Port: publicIPv6Port,
178
-		StatsIP:        statsIP,
179
-		StatsPort:      statsPort,
180
-		Secret:         secretBytes,
181
-		AdTag:          adTagBytes,
182
-		SecureMode:     secureMode,
158
+		Debug:           debug,
159
+		Verbose:         verbose,
160
+		BindIP:          bindIP,
161
+		BindPort:        bindPort,
162
+		PublicIPv4:      publicIPv4,
163
+		PublicIPv4Port:  publicIPv4Port,
164
+		PublicIPv6:      publicIPv6,
165
+		PublicIPv6Port:  publicIPv6Port,
166
+		StatsIP:         statsIP,
167
+		StatsPort:       statsPort,
168
+		Secret:          secret,
169
+		AdTag:           adtag,
170
+		SecureMode:      secureMode,
171
+		ReadBufferSize:  int(readBufferSize),
172
+		WriteBufferSize: int(writeBufferSize),
183 173
 	}
184 174
 
185 175
 	if statsdIP != "" {
@@ -187,7 +177,10 @@ func NewConfig(debug, verbose bool, // nolint: gocyclo
187 177
 		conf.StatsD.Prefix = statsdPrefix
188 178
 		conf.StatsD.Tags = statsdTags
189 179
 
190
-		var addr net.Addr
180
+		var (
181
+			addr net.Addr
182
+			err  error
183
+		)
191 184
 		hostPort := net.JoinHostPort(statsdIP, strconv.Itoa(int(statsdPort)))
192 185
 		switch statsdNetwork {
193 186
 		case "tcp":

+ 20
- 5
main.go Прегледај датотеку

@@ -49,6 +49,19 @@ var (
49 49
 		Default("3128").
50 50
 		Uint16()
51 51
 
52
+	writeBufferSize = app.Flag("write-buffer",
53
+		"Write buffer size in bytes. You can think about it as a buffer from client to Telegram.").
54
+		Short('w').
55
+		Envar("MTG_BUFFER_WRITE").
56
+		Default("65536").
57
+		Uint32()
58
+	readBufferSize = app.Flag("read-buffer",
59
+		"Read buffer size in bytes. You can think about it as a buffer from Telegram to client.").
60
+		Short('r').
61
+		Envar("MTG_BUFFER_READ").
62
+		Default("131072").
63
+		Uint32()
64
+
52 65
 	publicIPv4 = app.Flag("public-ipv4",
53 66
 		"Which IPv4 address is public.").
54 67
 		Short('4').
@@ -110,17 +123,17 @@ var (
110 123
 		Envar("MTG_STATSD_TAGS").
111 124
 		StringMap()
112 125
 
113
-	secret = app.Arg("secret", "Secret of this proxy.").Required().String()
114
-	adtag  = app.Arg("adtag", "ADTag of the proxy.").String()
126
+	secret = app.Arg("secret", "Secret of this proxy.").Required().HexBytes()
127
+	adtag  = app.Arg("adtag", "ADTag of the proxy.").HexBytes()
115 128
 )
116 129
 
117 130
 func init() {
118 131
 	rand.Seed(time.Now().UTC().UnixNano())
119 132
 	app.Version(version)
120
-
133
+	app.HelpFlag.Short('h')
121 134
 }
122 135
 
123
-func main() {
136
+func main() { // nolint: gocyclo
124 137
 	kingpin.MustParse(app.Parse(os.Args[1:]))
125 138
 
126 139
 	err := setRLimit()
@@ -129,10 +142,12 @@ func main() {
129 142
 	}
130 143
 
131 144
 	conf, err := config.NewConfig(*debug, *verbose,
145
+		*writeBufferSize, *readBufferSize,
132 146
 		*bindIP, *publicIPv4, *publicIPv6, *statsIP,
133 147
 		*bindPort, *publicIPv4Port, *publicIPv6Port, *statsPort, *statsdPort,
134
-		*secret, *adtag, *statsdIP, *statsdNetwork, *statsdPrefix, *statsdTagsFormat,
148
+		*statsdIP, *statsdNetwork, *statsdPrefix, *statsdTagsFormat,
135 149
 		*statsdTags,
150
+		*secret, *adtag,
136 151
 	)
137 152
 	if err != nil {
138 153
 		usage(err.Error())

+ 6
- 4
proxy/proxy.go Прегледај датотеку

@@ -83,8 +83,8 @@ func (p *Proxy) accept(conn net.Conn) {
83 83
 	} else {
84 84
 		clientStream := clientConn.(wrappers.StreamReadWriteCloser)
85 85
 		serverStream := serverConn.(wrappers.StreamReadWriteCloser)
86
-		go p.directPipe(clientStream, serverStream, wait)
87
-		go p.directPipe(serverStream, clientStream, wait)
86
+		go p.directPipe(clientStream, serverStream, wait, p.conf.ReadBufferSize)
87
+		go p.directPipe(serverStream, clientStream, wait, p.conf.WriteBufferSize)
88 88
 	}
89 89
 
90 90
 	wait.Wait()
@@ -130,14 +130,16 @@ func (p *Proxy) middlePipe(src wrappers.PacketReadCloser, dst io.WriteCloser,
130 130
 	}
131 131
 }
132 132
 
133
-func (p *Proxy) directPipe(src wrappers.StreamReadCloser, dst io.WriteCloser, wait *sync.WaitGroup) {
133
+func (p *Proxy) directPipe(src wrappers.StreamReadCloser, dst io.WriteCloser,
134
+	wait *sync.WaitGroup, bufferSize int) {
134 135
 	defer func() {
135 136
 		src.Close() // nolint: errcheck
136 137
 		dst.Close() // nolint: errcheck
137 138
 		wait.Done()
138 139
 	}()
139 140
 
140
-	if _, err := io.Copy(dst, src); err != nil {
141
+	buffer := make([]byte, bufferSize)
142
+	if _, err := io.CopyBuffer(dst, src, buffer); err != nil {
141 143
 		src.Logger().Warnw("Cannot pump sockets", "error", err)
142 144
 	}
143 145
 }

+ 3
- 7
telegram/dialer.go Прегледај датотеку

@@ -10,11 +10,7 @@ import (
10 10
 	"github.com/9seconds/mtg/wrappers"
11 11
 )
12 12
 
13
-const (
14
-	telegramDialTimeout = 10 * time.Second
15
-	readBufferSize      = 64 * 1024
16
-	writeBufferSize     = 64 * 1024
17
-)
13
+const telegramDialTimeout = 10 * time.Second
18 14
 
19 15
 type tgDialer struct {
20 16
 	net.Dialer
@@ -32,10 +28,10 @@ func (t *tgDialer) dial(addr string) (net.Conn, error) {
32 28
 	if err = tcpSocket.SetNoDelay(true); err != nil {
33 29
 		return nil, errors.Annotate(err, "Cannot set NO_DELAY to Telegram")
34 30
 	}
35
-	if err = tcpSocket.SetReadBuffer(readBufferSize); err != nil {
31
+	if err = tcpSocket.SetReadBuffer(t.conf.WriteBufferSize); err != nil {
36 32
 		return nil, errors.Annotate(err, "Cannot set read buffer size on telegram socket")
37 33
 	}
38
-	if err = tcpSocket.SetWriteBuffer(writeBufferSize); err != nil {
34
+	if err = tcpSocket.SetWriteBuffer(t.conf.ReadBufferSize); err != nil {
39 35
 		return nil, errors.Annotate(err, "Cannot set write buffer size on telegram socket")
40 36
 	}
41 37
 

Loading…
Откажи
Сачувај