Quellcode durchsuchen

Merge remote-tracking branch 'origin/master' into stable

tags/0.15^0
9seconds vor 7 Jahren
Ursprung
Commit
61a1024264
14 geänderte Dateien mit 351 neuen und 247 gelöschten Zeilen
  1. 3
    4
      Dockerfile
  2. 14
    1
      README.md
  3. 5
    1
      config/config.go
  4. 12
    3
      go.mod
  5. 24
    6
      go.sum
  6. 8
    2
      main.go
  7. 6
    16
      proxy/proxy.go
  8. 17
    86
      stats/channels.go
  9. 28
    0
      stats/init.go
  10. 80
    0
      stats/prometheus.go
  11. 10
    38
      stats/server.go
  12. 119
    46
      stats/stats.go
  13. 5
    7
      stats/statsd.go
  14. 20
    37
      wrappers/conn.go

+ 3
- 4
Dockerfile Datei anzeigen

@@ -10,8 +10,7 @@ RUN set -x \
10 10
     curl \
11 11
     git \
12 12
     make \
13
-    upx \
14
-  && update-ca-certificates
13
+    upx
15 14
 
16 15
 COPY . /go/src/github.com/9seconds/mtg/
17 16
 
@@ -26,7 +25,7 @@ RUN set -x \
26 25
 
27 26
 FROM scratch
28 27
 
29
-ENTRYPOINT ["/usr/local/bin/mtg"]
28
+ENTRYPOINT ["/mtg"]
30 29
 ENV MTG_IP=0.0.0.0 \
31 30
     MTG_PORT=3128 \
32 31
     MTG_STATS_IP=0.0.0.0 \
@@ -34,4 +33,4 @@ ENV MTG_IP=0.0.0.0 \
34 33
 EXPOSE 3128 3129
35 34
 
36 35
 COPY --from=0 /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
37
-COPY --from=0 /go/src/github.com/9seconds/mtg/mtg /usr/local/bin/mtg
36
+COPY --from=0 /go/src/github.com/9seconds/mtg/mtg /mtg

+ 14
- 1
README.md Datei anzeigen

@@ -3,6 +3,7 @@
3 3
 Bullshit-free MTPROTO proxy for Telegram
4 4
 
5 5
 [![Build Status](https://travis-ci.org/9seconds/mtg.svg?branch=master)](https://travis-ci.org/9seconds/mtg)
6
+[![Go Report Card](https://goreportcard.com/badge/github.com/9seconds/mtg)](https://goreportcard.com/report/github.com/9seconds/mtg)
6 7
 [![Docker Build Status](https://img.shields.io/docker/build/nineseconds/mtg.svg)](https://hub.docker.com/r/nineseconds/mtg/)
7 8
 
8 9
 # Rationale
@@ -32,7 +33,7 @@ mtg is an implementation in golang which is intended to be:
32 33
   software. I also believe that in case of throwout proxies, this feature
33 34
   is useless luxury.
34 35
 * **Minimum docker image size**
35
-  Official image is less than 2.5 megabytes. Literally.
36
+  Official image is less than 3 megabytes. Literally.
36 37
 * **No management WebUI**
37 38
   This is an implementation of simple lightweight proxy. I won't do that.
38 39
 
@@ -159,6 +160,7 @@ supported environment variables:
159 160
 | `MTG_STATSD_PREFIX`      | `--statsd-prefix`      | `mtg`                             | Which bucket prefix we should use. For example, if you set `mtg`, then metric `traffic.ingress` would be send as `mtg.traffic.ingress`.                                                                                                                                    |
160 161
 | `MTG_STATSD_TAGS_FORMAT` | `--statsd-tags-format` |                                   | Which tags format we should use. By default, we are using default vanilla statsd tags format but if you want to send directly to InfluxDB or Datadog, please specify it there. Possible options are `influxdb` and `datadog`.                                              |
161 162
 | `MTG_STATSD_TAGS`        | `--statsd-tags`        |                                   | Which tags should we send to statsd with our metrics. Please specify them as `key=value` pairs.                                                                                                                                                                            |
163
+| `MTG_PROMETHEUS_PREFIX`  | `--prometheus-prefix`  | `mtg`                             | Which namespace should be used for prometheus metrics.                                                                                                                                                                                                                     |
162 164
 | `MTG_BUFFER_WRITE`       | `-w`, `--write-buffer` | `65536`                           | The size of TCP write buffer in bytes. Write buffer is the buffer for messages which are going from client to Telegram.                                                                                                                                                    |
163 165
 | `MTG_BUFFER_READ`        | `-r`, `--read-buffer`  | `131072`                          | The size of TCP read buffer in bytes. Read buffer is the buffer for messages from Telegram to client.                                                                                                                                                                      |
164 166
 | `MTG_SECURE_ONLY`        | `-s`, `--secure-only`  | `false`                           | Support only clients with secure mode (i.e only clients with dd-secrets).                                                                                                                                                                                                  |
@@ -234,3 +236,14 @@ All metrics are gauges. Here is the list of metrics and their meaning:
234 236
 All metrics are prefixed with given prefix. Default prefix is `mtg`.
235 237
 With such prefix metric name `traffic.ingress`, for example, would be
236 238
 `mtg.traffic.ingress`.
239
+
240
+
241
+# Prometheus integration
242
+
243
+[Prometheus](https://prometheus.io) integration comes out of
244
+the box, you do not need to setup anything special. Prometheus
245
+scrape endpoint lives on the same IP/port where generic stats
246
+service (`http://${MTG_STATS_IP}:${MTG_STATS_PORT}`) but on
247
+`/prometheus` path. So, if you access http stats service as `curl
248
+http://localhost:3129/`, then your prometheus endpoint is `curl
249
+http://localhost:3129/prometheus/`.

+ 5
- 1
config/config.go Datei anzeigen

@@ -38,6 +38,9 @@ type Config struct {
38 38
 		TagsFormat statsd.TagFormat
39 39
 		Enabled    bool
40 40
 	}
41
+	Prometheus struct {
42
+		Prefix string
43
+	}
41 44
 
42 45
 	Secret []byte
43 46
 	AdTag  []byte
@@ -116,7 +119,7 @@ func NewConfig(debug, verbose bool, // nolint: gocyclo
116 119
 	bindIP, publicIPv4, publicIPv6, statsIP net.IP,
117 120
 	bindPort, publicIPv4Port, publicIPv6Port, statsPort, statsdPort uint16,
118 121
 	statsdIP, statsdNetwork, statsdPrefix, statsdTagsFormat string,
119
-	statsdTags map[string]string,
122
+	statsdTags map[string]string, prometheusPrefix string,
120 123
 	secureOnly bool,
121 124
 	secret, adtag []byte) (*Config, error) {
122 125
 	secureMode := secureOnly
@@ -174,6 +177,7 @@ func NewConfig(debug, verbose bool, // nolint: gocyclo
174 177
 		ReadBufferSize:  int(readBufferSize),
175 178
 		WriteBufferSize: int(writeBufferSize),
176 179
 	}
180
+	conf.Prometheus.Prefix = prometheusPrefix
177 181
 
178 182
 	if statsdIP != "" {
179 183
 		conf.StatsD.Enabled = true

+ 12
- 3
go.mod Datei anzeigen

@@ -4,20 +4,29 @@ require (
4 4
 	github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
5 5
 	github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
6 6
 	github.com/beevik/ntp v0.2.0
7
+	github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
7 8
 	github.com/davecgh/go-spew v1.1.1 // indirect
8
-	github.com/dustin/go-humanize v0.0.0-20180713052910-9f541cc9db5d
9
+	github.com/dustin/go-humanize v1.0.0
9 10
 	github.com/gofrs/uuid v3.1.0+incompatible
10
-	github.com/juju/errors v0.0.0-20180806074554-22422dad46e1
11
+	github.com/gogo/protobuf v1.1.1 // indirect
12
+	github.com/golang/protobuf v1.2.0 // indirect
13
+	github.com/juju/errors v0.0.0-20181012004132-a4583d0a56ea
11 14
 	github.com/juju/loggo v0.0.0-20180524022052-584905176618 // indirect
12 15
 	github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073 // indirect
13 16
 	github.com/kr/pretty v0.1.0 // indirect
17
+	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
14 18
 	github.com/pkg/errors v0.8.0 // indirect
15 19
 	github.com/pmezard/go-difflib v1.0.0 // indirect
20
+	github.com/prometheus/client_golang v0.9.0
21
+	github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect
22
+	github.com/prometheus/common v0.0.0-20181015124227-bcb74de08d37 // indirect
23
+	github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d // indirect
16 24
 	github.com/stretchr/testify v1.2.2
17 25
 	go.uber.org/atomic v1.3.2 // indirect
18 26
 	go.uber.org/multierr v1.1.0 // indirect
19 27
 	go.uber.org/zap v1.9.1
20
-	golang.org/x/net v0.0.0-20180921000356-2f5d2388922f // indirect
28
+	golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 // indirect
29
+	golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
21 30
 	gopkg.in/alecthomas/kingpin.v2 v2.2.6
22 31
 	gopkg.in/alexcesaro/statsd.v2 v2.0.0
23 32
 	gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect

+ 24
- 6
go.sum Datei anzeigen

@@ -4,14 +4,20 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZq
4 4
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
5 5
 github.com/beevik/ntp v0.2.0 h1:sGsd+kAXzT0bfVfzJfce04g+dSRfrs+tbQW8lweuYgw=
6 6
 github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg=
7
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
8
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
7 9
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
8 10
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
9
-github.com/dustin/go-humanize v0.0.0-20180713052910-9f541cc9db5d h1:lDrio3iIdNb0Gw9CgH7cQF+iuB5mOOjdJ9ERNJCBgb4=
10
-github.com/dustin/go-humanize v0.0.0-20180713052910-9f541cc9db5d/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
11
+github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
12
+github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
11 13
 github.com/gofrs/uuid v3.1.0+incompatible h1:q2rtkjaKT4YEr6E1kamy0Ha4RtepWlQBedyHx0uzKwA=
12 14
 github.com/gofrs/uuid v3.1.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
13
-github.com/juju/errors v0.0.0-20180806074554-22422dad46e1 h1:wnhMXidtb70kDZCeLt/EfsVtkXS5c8zLnE9y/6DIRAU=
14
-github.com/juju/errors v0.0.0-20180806074554-22422dad46e1/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q=
15
+github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
16
+github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
17
+github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
18
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
19
+github.com/juju/errors v0.0.0-20181012004132-a4583d0a56ea h1:g2k+8WR7cHch4g0tBDhfiEvAp7fXxTNBiD1oC1Oxj3E=
20
+github.com/juju/errors v0.0.0-20181012004132-a4583d0a56ea/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q=
15 21
 github.com/juju/loggo v0.0.0-20180524022052-584905176618 h1:MK144iBQF9hTSwBW/9eJm034bVoG30IshVm688T2hi8=
16 22
 github.com/juju/loggo v0.0.0-20180524022052-584905176618/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U=
17 23
 github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073 h1:WQM1NildKThwdP7qWrNAFGzp4ijNLw8RlgENkaI4MJs=
@@ -21,10 +27,20 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
21 27
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
22 28
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
23 29
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
30
+github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
31
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
24 32
 github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
25 33
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
26 34
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
27 35
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
36
+github.com/prometheus/client_golang v0.9.0 h1:tXuTFVHC03mW0D+Ua1Q2d1EAVqLTuggX50V0VLICCzY=
37
+github.com/prometheus/client_golang v0.9.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
38
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
39
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
40
+github.com/prometheus/common v0.0.0-20181015124227-bcb74de08d37 h1:Y7YdJ9Xb3MoQOzAWXnDunAJYpvhVwZdTirNfGUgPKaA=
41
+github.com/prometheus/common v0.0.0-20181015124227-bcb74de08d37/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
42
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFdaDqxJVlbOQ1DtGmZWs/Qau0hIlk+WQ=
43
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
28 44
 github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
29 45
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
30 46
 go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
@@ -33,8 +49,10 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
33 49
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
34 50
 go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
35 51
 go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
36
-golang.org/x/net v0.0.0-20180921000356-2f5d2388922f h1:QM2QVxvDoW9PFSPp/zy9FgxJLfaWTZlS61KEPtBwacM=
37
-golang.org/x/net v0.0.0-20180921000356-2f5d2388922f/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
52
+golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 h1:Y/KGZSOdz/2r0WJ9Mkmz6NJBusp0kiNx1Cn82lzJQ6w=
53
+golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
54
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
55
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
38 56
 gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
39 57
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
40 58
 gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=

+ 8
- 2
main.go Datei anzeigen

@@ -110,6 +110,12 @@ var (
110 110
 		Envar("MTG_STATSD_TAGS").
111 111
 		StringMap()
112 112
 
113
+	prometheusPrefix = app.Flag("prometheus-prefix",
114
+		"Which namespace to use to send stats to Prometheus.").
115
+		Envar("MTG_PROMETHEUS_PREFIX").
116
+		Default("mtg").
117
+		String()
118
+
113 119
 	writeBufferSize = app.Flag("write-buffer",
114 120
 		"Write buffer size in bytes. You can think about it as a buffer from client to Telegram.").
115 121
 		Short('w').
@@ -151,7 +157,7 @@ func main() { // nolint: gocyclo
151 157
 		*bindIP, *publicIPv4, *publicIPv6, *statsIP,
152 158
 		*bindPort, *publicIPv4Port, *publicIPv6Port, *statsPort, *statsdPort,
153 159
 		*statsdIP, *statsdNetwork, *statsdPrefix, *statsdTagsFormat,
154
-		*statsdTags, *secureOnly,
160
+		*statsdTags, *prometheusPrefix, *secureOnly,
155 161
 		*secret, *adtag,
156 162
 	)
157 163
 	if err != nil {
@@ -193,7 +199,7 @@ func main() { // nolint: gocyclo
193 199
 		zap.S().Infow("Use direct connection to Telegram")
194 200
 	}
195 201
 
196
-	if err := stats.Start(conf); err != nil {
202
+	if err := stats.Init(conf); err != nil {
197 203
 		panic(err)
198 204
 	}
199 205
 

+ 6
- 16
proxy/proxy.go Datei anzeigen

@@ -93,12 +93,12 @@ func (p *Proxy) accept(conn net.Conn) {
93 93
 		clientPacket := clientConn.(wrappers.PacketReadWriteCloser)
94 94
 		serverPacket := serverConn.(wrappers.PacketReadWriteCloser)
95 95
 		go p.middlePipe(clientPacket, serverPacket, wait, &opts.ReadHacks)
96
-		go p.middlePipe(serverPacket, clientPacket, wait, &opts.WriteHacks)
96
+		p.middlePipe(serverPacket, clientPacket, wait, &opts.WriteHacks)
97 97
 	} else {
98 98
 		clientStream := clientConn.(wrappers.StreamReadWriteCloser)
99 99
 		serverStream := serverConn.(wrappers.StreamReadWriteCloser)
100 100
 		go p.directPipe(clientStream, serverStream, wait, p.conf.ReadBufferSize)
101
-		go p.directPipe(serverStream, clientStream, wait, p.conf.WriteBufferSize)
101
+		p.directPipe(serverStream, clientStream, wait, p.conf.WriteBufferSize)
102 102
 	}
103 103
 
104 104
 	wait.Wait()
@@ -121,13 +121,8 @@ func (p *Proxy) getTelegramConn(ctx context.Context, cancel context.CancelFunc,
121 121
 	return packetConn, nil
122 122
 }
123 123
 
124
-func (p *Proxy) middlePipe(src wrappers.PacketReadCloser, dst io.WriteCloser,
125
-	wait *sync.WaitGroup, hacks *mtproto.Hacks) {
126
-	defer func() {
127
-		src.Close() // nolint: errcheck, gosec
128
-		dst.Close() // nolint: errcheck, gosec
129
-		wait.Done()
130
-	}()
124
+func (p *Proxy) middlePipe(src wrappers.PacketReadCloser, dst io.Writer, wait *sync.WaitGroup, hacks *mtproto.Hacks) {
125
+	defer wait.Done()
131 126
 
132 127
 	for {
133 128
 		hacks.SimpleAck = false
@@ -145,13 +140,8 @@ func (p *Proxy) middlePipe(src wrappers.PacketReadCloser, dst io.WriteCloser,
145 140
 	}
146 141
 }
147 142
 
148
-func (p *Proxy) directPipe(src wrappers.StreamReadCloser, dst io.WriteCloser,
149
-	wait *sync.WaitGroup, bufferSize int) {
150
-	defer func() {
151
-		src.Close() // nolint: errcheck, gosec
152
-		dst.Close() // nolint: errcheck, gosec
153
-		wait.Done()
154
-	}()
143
+func (p *Proxy) directPipe(src wrappers.StreamReadCloser, dst io.Writer, wait *sync.WaitGroup, bufferSize int) {
144
+	defer wait.Done()
155 145
 
156 146
 	buffer := make([]byte, bufferSize)
157 147
 	if _, err := io.CopyBuffer(dst, src, buffer); err != nil {

+ 17
- 86
stats/channels.go Datei anzeigen

@@ -2,21 +2,20 @@ package stats
2 2
 
3 3
 import (
4 4
 	"net"
5
-	"time"
6 5
 
7 6
 	"github.com/9seconds/mtg/mtproto"
8 7
 )
9 8
 
10 9
 const (
11
-	crashesChanLength     = 1
12
-	connectionsChanLength = 20
13
-	trafficChanLength     = 5000
10
+	connectionsChanLength = 10
11
+	trafficChanLength     = 10
14 12
 )
15 13
 
16 14
 var (
17
-	crashesChan     = make(chan struct{}, crashesChanLength)
18
-	connectionsChan = make(chan *connectionData, connectionsChanLength)
19
-	trafficChan     = make(chan *trafficData, trafficChanLength)
15
+	crashesChan     = make(chan struct{})
16
+	statsChan       = make(chan chan<- Stats)
17
+	connectionsChan = make(chan connectionData, connectionsChanLength)
18
+	trafficChan     = make(chan trafficData, trafficChanLength)
20 19
 )
21 20
 
22 21
 type connectionData struct {
@@ -30,81 +29,6 @@ type trafficData struct {
30 29
 	ingress bool
31 30
 }
32 31
 
33
-func crashManager() {
34
-	for range crashesChan {
35
-		instance.mutex.RLock()
36
-
37
-		instance.Crashes++
38
-
39
-		instance.mutex.RUnlock()
40
-	}
41
-}
42
-
43
-func connectionManager() {
44
-	for event := range connectionsChan {
45
-		instance.mutex.RLock()
46
-
47
-		isIPv4 := event.addr.IP.To4() != nil
48
-		var inc uint32 = 1
49
-		if !event.connected {
50
-			inc = ^uint32(0)
51
-		}
52
-
53
-		switch event.connectionType {
54
-		case mtproto.ConnectionTypeAbridged:
55
-			if isIPv4 {
56
-				instance.Connections.Abridged.IPv4 += inc
57
-			} else {
58
-				instance.Connections.Abridged.IPv6 += inc
59
-			}
60
-		case mtproto.ConnectionTypeSecure:
61
-			if isIPv4 {
62
-				instance.Connections.Secure.IPv4 += inc
63
-			} else {
64
-				instance.Connections.Secure.IPv6 += inc
65
-			}
66
-		default:
67
-			if isIPv4 {
68
-				instance.Connections.Intermediate.IPv4 += inc
69
-			} else {
70
-				instance.Connections.Intermediate.IPv6 += inc
71
-			}
72
-		}
73
-
74
-		instance.mutex.RUnlock()
75
-	}
76
-}
77
-
78
-func trafficManager() {
79
-	speedChan := time.Tick(time.Second)
80
-
81
-	for {
82
-		select {
83
-		case event := <-trafficChan:
84
-			instance.mutex.RLock()
85
-
86
-			if event.ingress {
87
-				instance.Traffic.Ingress += trafficValue(event.traffic)
88
-				instance.speedCurrent.Ingress += trafficSpeedValue(event.traffic)
89
-			} else {
90
-				instance.Traffic.Egress += trafficValue(event.traffic)
91
-				instance.speedCurrent.Egress += trafficSpeedValue(event.traffic)
92
-			}
93
-
94
-			instance.mutex.RUnlock()
95
-		case <-speedChan:
96
-			instance.mutex.RLock()
97
-
98
-			instance.Speed.Ingress = instance.speedCurrent.Ingress
99
-			instance.Speed.Egress = instance.speedCurrent.Egress
100
-			instance.speedCurrent.Ingress = trafficSpeedValue(0)
101
-			instance.speedCurrent.Egress = trafficSpeedValue(0)
102
-
103
-			instance.mutex.RUnlock()
104
-		}
105
-	}
106
-}
107
-
108 32
 // NewCrash indicates new crash.
109 33
 func NewCrash() {
110 34
 	crashesChan <- struct{}{}
@@ -112,7 +36,7 @@ func NewCrash() {
112 36
 
113 37
 // ClientConnected indicates that new client was connected.
114 38
 func ClientConnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
115
-	connectionsChan <- &connectionData{
39
+	connectionsChan <- connectionData{
116 40
 		connectionType: connectionType,
117 41
 		addr:           addr,
118 42
 		connected:      true,
@@ -121,7 +45,7 @@ func ClientConnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
121 45
 
122 46
 // ClientDisconnected indicates that client was disconnected.
123 47
 func ClientDisconnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr) {
124
-	connectionsChan <- &connectionData{
48
+	connectionsChan <- connectionData{
125 49
 		connectionType: connectionType,
126 50
 		addr:           addr,
127 51
 		connected:      false,
@@ -130,7 +54,7 @@ func ClientDisconnected(connectionType mtproto.ConnectionType, addr *net.TCPAddr
130 54
 
131 55
 // IngressTraffic accounts new ingress traffic.
132 56
 func IngressTraffic(traffic int) {
133
-	trafficChan <- &trafficData{
57
+	trafficChan <- trafficData{
134 58
 		traffic: traffic,
135 59
 		ingress: true,
136 60
 	}
@@ -138,8 +62,15 @@ func IngressTraffic(traffic int) {
138 62
 
139 63
 // EgressTraffic accounts new ingress traffic.
140 64
 func EgressTraffic(traffic int) {
141
-	trafficChan <- &trafficData{
65
+	trafficChan <- trafficData{
142 66
 		traffic: traffic,
143 67
 		ingress: false,
144 68
 	}
145 69
 }
70
+
71
+// GetStats returns a snapshot of Stats instance.
72
+func GetStats() Stats {
73
+	rpcChan := make(chan Stats)
74
+	statsChan <- rpcChan
75
+	return <-rpcChan
76
+}

+ 28
- 0
stats/init.go Datei anzeigen

@@ -0,0 +1,28 @@
1
+package stats
2
+
3
+import (
4
+	"github.com/juju/errors"
5
+
6
+	"github.com/9seconds/mtg/config"
7
+)
8
+
9
+// Init initializes stats subsystem.
10
+func Init(conf *config.Config) error {
11
+	if conf.StatsD.Enabled {
12
+		client, err := newStatsd(conf)
13
+		if err != nil {
14
+			return errors.Annotate(err, "Cannot initialize statsd client")
15
+		}
16
+		go client.run()
17
+	}
18
+	prometheus, err := newPrometheus(conf)
19
+	if err != nil {
20
+		return errors.Annotate(err, "Cannot initialize prometheus client")
21
+	}
22
+	go prometheus.run()
23
+
24
+	go NewStats(conf).start()
25
+	go startServer(conf)
26
+
27
+	return nil
28
+}

+ 80
- 0
stats/prometheus.go Datei anzeigen

@@ -0,0 +1,80 @@
1
+package stats
2
+
3
+import (
4
+	"time"
5
+
6
+	"github.com/juju/errors"
7
+	"github.com/prometheus/client_golang/prometheus"
8
+
9
+	"github.com/9seconds/mtg/config"
10
+)
11
+
12
+const prometheusPollTime = time.Second
13
+
14
+type prometheusExporter struct {
15
+	connections *prometheus.GaugeVec
16
+	traffic     *prometheus.GaugeVec
17
+	speed       *prometheus.GaugeVec
18
+	crashes     prometheus.Gauge
19
+}
20
+
21
+func (p *prometheusExporter) run() {
22
+	for range time.Tick(prometheusPollTime) {
23
+		instance := GetStats()
24
+
25
+		p.connections.WithLabelValues("abridged", "v4").Set(float64(instance.Connections.Abridged.IPv4))
26
+		p.connections.WithLabelValues("abridged", "v6").Set(float64(instance.Connections.Abridged.IPv6))
27
+		p.connections.WithLabelValues("intermediate", "v4").Set(float64(instance.Connections.Intermediate.IPv4))
28
+		p.connections.WithLabelValues("intermediate", "v6").Set(float64(instance.Connections.Intermediate.IPv6))
29
+		p.connections.WithLabelValues("secure", "v4").Set(float64(instance.Connections.Secure.IPv4))
30
+		p.connections.WithLabelValues("secure", "v6").Set(float64(instance.Connections.Secure.IPv6))
31
+		p.traffic.WithLabelValues("ingress").Set(float64(instance.Traffic.ingress))
32
+		p.traffic.WithLabelValues("egress").Set(float64(instance.Traffic.egress))
33
+		p.speed.WithLabelValues("ingress").Set(float64(instance.Speed.ingress))
34
+		p.speed.WithLabelValues("egress").Set(float64(instance.Speed.egress))
35
+		p.crashes.Set(float64(instance.Crashes))
36
+	}
37
+}
38
+
39
+func newPrometheus(conf *config.Config) (*prometheusExporter, error) {
40
+	connections := prometheus.NewGaugeVec(prometheus.GaugeOpts{
41
+		Namespace: conf.Prometheus.Prefix,
42
+		Name:      "connections",
43
+		Help:      "Current number of connections to the proxy.",
44
+	}, []string{"type", "protocol"})
45
+	traffic := prometheus.NewGaugeVec(prometheus.GaugeOpts{
46
+		Namespace: conf.Prometheus.Prefix,
47
+		Name:      "traffic",
48
+		Help:      "Traffic passed through the proxy in bytes.",
49
+	}, []string{"direction"})
50
+	speed := prometheus.NewGaugeVec(prometheus.GaugeOpts{
51
+		Namespace: conf.Prometheus.Prefix,
52
+		Name:      "speed",
53
+		Help:      "Current throughput in bytes per second.",
54
+	}, []string{"direction"})
55
+	crashes := prometheus.NewGauge(prometheus.GaugeOpts{
56
+		Namespace: conf.Prometheus.Prefix,
57
+		Name:      "crashes",
58
+		Help:      "How many crashes happened.",
59
+	})
60
+
61
+	if err := prometheus.Register(connections); err != nil {
62
+		return nil, errors.Annotate(err, "Cannot register connections collector")
63
+	}
64
+	if err := prometheus.Register(traffic); err != nil {
65
+		return nil, errors.Annotate(err, "cannot register traffic collector")
66
+	}
67
+	if err := prometheus.Register(speed); err != nil {
68
+		return nil, errors.Annotate(err, "cannot register speed collector")
69
+	}
70
+	if err := prometheus.Register(crashes); err != nil {
71
+		return nil, errors.Annotate(err, "cannot register crashes collector")
72
+	}
73
+
74
+	return &prometheusExporter{
75
+		connections: connections,
76
+		traffic:     traffic,
77
+		speed:       speed,
78
+		crashes:     crashes,
79
+	}, nil
80
+}

+ 10
- 38
stats/server.go Datei anzeigen

@@ -3,67 +3,39 @@ package stats
3 3
 import (
4 4
 	"encoding/json"
5 5
 	"net/http"
6
-	"sync"
7
-	"time"
8 6
 
7
+	"github.com/prometheus/client_golang/prometheus/promhttp"
9 8
 	"go.uber.org/zap"
10 9
 
11 10
 	"github.com/9seconds/mtg/config"
12 11
 )
13 12
 
14
-var instance *stats
15
-
16
-// Start starts new statistics server.
17
-func Start(conf *config.Config) error {
13
+func startServer(conf *config.Config) {
18 14
 	log := zap.S().Named("stats")
19 15
 
20
-	instance = &stats{
21
-		URLs:   conf.GetURLs(),
22
-		Uptime: uptime(time.Now()),
23
-		mutex:  &sync.RWMutex{},
24
-	}
25
-
26
-	if conf.StatsD.Enabled {
27
-		client, err := newStatsd(conf)
28
-		if err != nil {
29
-			return err
30
-		}
31
-		go client.run()
32
-	}
33
-
34
-	go crashManager()
35
-	go connectionManager()
36
-	go trafficManager()
37
-
38 16
 	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
39 17
 		w.Header().Set("Content-Type", "application/json")
40 18
 
41
-		instance.mutex.Lock()
42
-		first, err := json.Marshal(instance)
43
-		instance.mutex.Unlock()
44
-
19
+		first, err := json.Marshal(GetStats())
45 20
 		if err != nil {
46 21
 			log.Errorw("Cannot encode json", "error", err)
47 22
 			http.Error(w, "Internal server error", 500)
48 23
 			return
49 24
 		}
50 25
 
51
-		interm := map[string]interface{}{}
52
-		json.Unmarshal(first, &interm) // nolint: errcheck, gosec
26
+		interim := map[string]interface{}{}
27
+		json.Unmarshal(first, &interim) // nolint: errcheck, gosec
53 28
 
54 29
 		encoder := json.NewEncoder(w)
55 30
 		encoder.SetEscapeHTML(false)
56 31
 		encoder.SetIndent("", "  ")
57
-		if err = encoder.Encode(interm); err != nil {
32
+		if err = encoder.Encode(interim); err != nil {
58 33
 			log.Errorw("Cannot encode json", "error", err)
59 34
 		}
60 35
 	})
36
+	http.Handle("/prometheus/", promhttp.Handler())
61 37
 
62
-	go func() {
63
-		if err := http.ListenAndServe(conf.StatAddr(), nil); err != nil {
64
-			log.Fatalw("Stats server has been stopped", "error", err)
65
-		}
66
-	}()
67
-
68
-	return nil
38
+	if err := http.ListenAndServe(conf.StatAddr(), nil); err != nil {
39
+		log.Fatalw("Stats server has been stopped", "error", err)
40
+	}
69 41
 }

+ 119
- 46
stats/stats.go Datei anzeigen

@@ -4,12 +4,12 @@ import (
4 4
 	"encoding/json"
5 5
 	"fmt"
6 6
 	"strconv"
7
-	"sync"
8 7
 	"time"
9 8
 
10 9
 	humanize "github.com/dustin/go-humanize"
11 10
 
12 11
 	"github.com/9seconds/mtg/config"
12
+	"github.com/9seconds/mtg/mtproto"
13 13
 )
14 14
 
15 15
 type uptime time.Time
@@ -24,72 +24,73 @@ func (u uptime) MarshalJSON() ([]byte, error) {
24 24
 	return json.Marshal(value)
25 25
 }
26 26
 
27
-type trafficValue uint64
28
-
29
-func (t trafficValue) MarshalJSON() ([]byte, error) {
30
-	tv := uint64(t)
31
-	value := map[string]interface{}{
32
-		"bytes": tv,
33
-		"human": humanize.Bytes(tv),
34
-	}
35
-
36
-	return json.Marshal(value)
37
-}
38
-
39
-type trafficSpeedValue uint64
40
-
41
-func (t trafficSpeedValue) MarshalJSON() ([]byte, error) {
42
-	speed := uint64(t)
43
-	value := map[string]interface{}{
44
-		"bytes/s": speed,
45
-		"human":   fmt.Sprintf("%s/S", humanize.Bytes(speed)),
46
-	}
47
-
48
-	return json.Marshal(value)
27
+type connectionType struct {
28
+	IPv6 uint32 `json:"ipv6"`
29
+	IPv4 uint32 `json:"ipv4"`
49 30
 }
50 31
 
51
-type connections struct {
32
+type baseConnections struct {
52 33
 	All          connectionType `json:"all"`
53 34
 	Abridged     connectionType `json:"abridged"`
54 35
 	Intermediate connectionType `json:"intermediate"`
55 36
 	Secure       connectionType `json:"secure"`
56 37
 }
57 38
 
39
+type connections struct {
40
+	baseConnections
41
+}
42
+
58 43
 func (c connections) MarshalJSON() ([]byte, error) {
59 44
 	c.All.IPv4 = c.Abridged.IPv4 + c.Intermediate.IPv4 + c.Secure.IPv4
60 45
 	c.All.IPv6 = c.Abridged.IPv6 + c.Intermediate.IPv6 + c.Secure.IPv6
61 46
 
62
-	value := struct {
63
-		All          connectionType `json:"all"`
64
-		Abridged     connectionType `json:"abridged"`
65
-		Intermediate connectionType `json:"intermediate"`
66
-		Secure       connectionType `json:"secure"`
67
-	}{
68
-		All:          c.All,
69
-		Abridged:     c.Abridged,
70
-		Intermediate: c.Intermediate,
71
-		Secure:       c.Secure,
47
+	return json.Marshal(c.baseConnections)
48
+}
49
+
50
+type traffic struct {
51
+	ingress uint64
52
+	egress  uint64
53
+}
54
+
55
+func (t *traffic) dumpValue(value uint64) map[string]interface{} {
56
+	return map[string]interface{}{
57
+		"bytes": value,
58
+		"human": humanize.Bytes(value),
59
+	}
60
+}
61
+
62
+func (t traffic) MarshalJSON() ([]byte, error) {
63
+	value := map[string]map[string]interface{}{
64
+		"ingress": t.dumpValue(t.ingress),
65
+		"egress":  t.dumpValue(t.egress),
72 66
 	}
73 67
 
74 68
 	return json.Marshal(value)
75 69
 }
76 70
 
77
-type connectionType struct {
78
-	IPv6 uint32 `json:"ipv6"`
79
-	IPv4 uint32 `json:"ipv4"`
71
+type speed struct {
72
+	ingress uint64
73
+	egress  uint64
80 74
 }
81 75
 
82
-type traffic struct {
83
-	Ingress trafficValue `json:"ingress"`
84
-	Egress  trafficValue `json:"egress"`
76
+func (s *speed) dumpValue(value uint64) map[string]interface{} {
77
+	return map[string]interface{}{
78
+		"bytes/s": value,
79
+		"human":   fmt.Sprintf("%s/s", humanize.Bytes(value)),
80
+	}
85 81
 }
86 82
 
87
-type speed struct {
88
-	Ingress trafficSpeedValue `json:"ingress"`
89
-	Egress  trafficSpeedValue `json:"egress"`
83
+func (s speed) MarshalJSON() ([]byte, error) {
84
+	value := map[string]map[string]interface{}{
85
+		"ingress": s.dumpValue(s.ingress),
86
+		"egress":  s.dumpValue(s.egress),
87
+	}
88
+
89
+	return json.Marshal(value)
90 90
 }
91 91
 
92
-type stats struct {
92
+// Stats represents a statistics of the proxy.
93
+type Stats struct {
93 94
 	URLs        config.IPURLs `json:"urls"`
94 95
 	Connections connections   `json:"connections"`
95 96
 	Traffic     traffic       `json:"traffic"`
@@ -97,6 +98,78 @@ type stats struct {
97 98
 	Uptime      uptime        `json:"uptime"`
98 99
 	Crashes     uint32        `json:"crashes"`
99 100
 
100
-	speedCurrent speed
101
-	mutex        *sync.RWMutex
101
+	previousTraffic traffic
102
+}
103
+
104
+func (s *Stats) start() {
105
+	speedChan := time.Tick(time.Second)
106
+
107
+	for {
108
+		select {
109
+		case <-speedChan:
110
+			s.handleSpeed()
111
+		case event := <-trafficChan:
112
+			s.handleTraffic(event)
113
+		case event := <-connectionsChan:
114
+			s.handleConnection(event)
115
+		case getStatsChan := <-statsChan:
116
+			s.handleGetStats(getStatsChan)
117
+		case <-crashesChan:
118
+			s.handleCrash()
119
+		}
120
+	}
121
+}
122
+
123
+func (s *Stats) handleTraffic(evt trafficData) {
124
+	if evt.ingress {
125
+		s.Traffic.ingress += uint64(evt.traffic)
126
+	} else {
127
+		s.Traffic.egress += uint64(evt.traffic)
128
+	}
129
+}
130
+
131
+func (s *Stats) handleSpeed() {
132
+	s.Speed.ingress = s.Traffic.ingress - s.previousTraffic.ingress
133
+	s.Speed.egress = s.Traffic.egress - s.previousTraffic.egress
134
+	s.previousTraffic.ingress = s.Traffic.ingress
135
+	s.previousTraffic.egress = s.Traffic.egress
136
+}
137
+
138
+func (s *Stats) handleConnection(evt connectionData) {
139
+	var inc uint32 = 1
140
+	if !evt.connected {
141
+		inc = ^uint32(0)
142
+	}
143
+
144
+	var conn *connectionType
145
+	switch evt.connectionType {
146
+	case mtproto.ConnectionTypeAbridged:
147
+		conn = &s.Connections.Abridged
148
+	case mtproto.ConnectionTypeSecure:
149
+		conn = &s.Connections.Secure
150
+	default:
151
+		conn = &s.Connections.Intermediate
152
+	}
153
+
154
+	if evt.addr.IP.To4() != nil {
155
+		conn.IPv4 += inc
156
+	} else {
157
+		conn.IPv6 += inc
158
+	}
159
+}
160
+
161
+func (s *Stats) handleGetStats(getStatsChan chan<- Stats) {
162
+	getStatsChan <- *s
163
+}
164
+
165
+func (s *Stats) handleCrash() {
166
+	s.Crashes++
167
+}
168
+
169
+// NewStats creates a new instance of Stats structure.
170
+func NewStats(conf *config.Config) *Stats {
171
+	return &Stats{
172
+		URLs:   conf.GetURLs(),
173
+		Uptime: uptime(time.Now()),
174
+	}
102 175
 }

+ 5
- 7
stats/statsd.go Datei anzeigen

@@ -36,7 +36,7 @@ type statsdExporter struct {
36 36
 
37 37
 func (s *statsdExporter) run() {
38 38
 	for range time.Tick(statsdPollTime) {
39
-		instance.mutex.Lock()
39
+		instance := GetStats()
40 40
 
41 41
 		s.client.Gauge(statsdConnectionsAbridgedV4, instance.Connections.Abridged.IPv4)
42 42
 		s.client.Gauge(statsdConnectionsAbridgedV6, instance.Connections.Abridged.IPv6)
@@ -44,13 +44,11 @@ func (s *statsdExporter) run() {
44 44
 		s.client.Gauge(statsdConnectionsIntermediateV6, instance.Connections.Intermediate.IPv6)
45 45
 		s.client.Gauge(statsdConnectionsSecureV4, instance.Connections.Secure.IPv4)
46 46
 		s.client.Gauge(statsdConnectionsSecureV6, instance.Connections.Secure.IPv6)
47
-		s.client.Gauge(statsdTrafficIngress, uint64(instance.Traffic.Ingress))
48
-		s.client.Gauge(statsdTrafficEgress, uint64(instance.Traffic.Egress))
49
-		s.client.Gauge(statsdSpeedIngress, uint64(instance.Speed.Ingress))
50
-		s.client.Gauge(statsdSpeedEgress, uint64(instance.Speed.Egress))
47
+		s.client.Gauge(statsdTrafficIngress, instance.Traffic.ingress)
48
+		s.client.Gauge(statsdTrafficEgress, instance.Traffic.egress)
49
+		s.client.Gauge(statsdSpeedIngress, instance.Speed.ingress)
50
+		s.client.Gauge(statsdSpeedEgress, instance.Speed.egress)
51 51
 		s.client.Gauge(statsdCrashes, instance.Crashes)
52
-
53
-		instance.mutex.Unlock()
54 52
 	}
55 53
 }
56 54
 

+ 20
- 37
wrappers/conn.go Datei anzeigen

@@ -38,13 +38,6 @@ const (
38 38
 	connTimeoutWrite = 2 * time.Minute
39 39
 )
40 40
 
41
-type ioResult struct {
42
-	n   int
43
-	err error
44
-}
45
-
46
-type ioFunc func([]byte) (int, error)
47
-
48 41
 // Conn is a basic wrapper for net.Conn providing the most low-level
49 42
 // logic and management as possible.
50 43
 type Conn struct {
@@ -61,12 +54,20 @@ type Conn struct {
61 54
 func (c *Conn) Write(p []byte) (int, error) {
62 55
 	select {
63 56
 	case <-c.ctx.Done():
57
+		c.Close() // nolint: gosec
64 58
 		return 0, errors.Annotate(c.ctx.Err(), "Cannot write because context was closed")
65 59
 	default:
66
-		n, err := c.doIO(c.conn.Write, p, connTimeoutWrite)
60
+		if err := c.conn.SetWriteDeadline(time.Now().Add(connTimeoutWrite)); err != nil {
61
+			c.Close() // nolint: gosec
62
+			return 0, errors.Annotate(err, "Cannot set write deadline to the socket")
63
+		}
67 64
 
65
+		n, err := c.conn.Write(p)
68 66
 		c.logger.Debugw("Write to stream", "bytes", n, "error", err)
69 67
 		stats.EgressTraffic(n)
68
+		if err != nil {
69
+			c.Close() // nolint: gosec
70
+		}
70 71
 
71 72
 		return n, err
72 73
 	}
@@ -75,48 +76,30 @@ func (c *Conn) Write(p []byte) (int, error) {
75 76
 func (c *Conn) Read(p []byte) (int, error) {
76 77
 	select {
77 78
 	case <-c.ctx.Done():
79
+		c.Close() // nolint: gosec
78 80
 		return 0, errors.Annotate(c.ctx.Err(), "Cannot read because context was closed")
79 81
 	default:
80
-		n, err := c.doIO(c.conn.Read, p, connTimeoutRead)
82
+		if err := c.conn.SetReadDeadline(time.Now().Add(connTimeoutRead)); err != nil {
83
+			c.Close() // nolint: gosec
84
+			return 0, errors.Annotate(err, "Cannot set read deadline to the socket")
85
+		}
81 86
 
87
+		n, err := c.conn.Read(p)
82 88
 		c.logger.Debugw("Read from stream", "bytes", n, "error", err)
83 89
 		stats.IngressTraffic(n)
84
-
85
-		return n, err
86
-	}
87
-}
88
-
89
-func (c *Conn) doIO(callback ioFunc, p []byte, timeout time.Duration) (int, error) {
90
-	resChan := make(chan ioResult, 1)
91
-	timer := time.NewTimer(timeout)
92
-
93
-	go func() {
94
-		n, err := callback(p)
95
-		resChan <- ioResult{n: n, err: err}
96
-	}()
97
-
98
-	select {
99
-	case res := <-resChan:
100
-		timer.Stop()
101
-		if res.err != nil {
90
+		if err != nil {
102 91
 			c.Close() // nolint: gosec
103 92
 		}
104
-		return res.n, res.err
105
-	case <-c.ctx.Done():
106
-		timer.Stop()
107
-		c.Close() // nolint: gosec
108
-		return 0, errors.Annotate(c.ctx.Err(), "Cannot do IO because context is closed")
109
-	case <-timer.C:
110
-		c.Close() // nolint: gosec
111
-		return 0, errors.Annotate(c.ctx.Err(), "Timeout on IO operation")
93
+
94
+		return n, err
112 95
 	}
113 96
 }
114 97
 
115 98
 // Close closes underlying net.Conn instance.
116 99
 func (c *Conn) Close() error {
117
-	defer c.logger.Debugw("Close connection")
118
-
100
+	c.logger.Debugw("Close connection")
119 101
 	c.cancel()
102
+
120 103
 	return c.conn.Close()
121 104
 }
122 105
 

Laden…
Abbrechen
Speichern