Bläddra i källkod

Merge remote-tracking branch 'origin/master' into stable

tags/v2.1.4
9seconds 4 år sedan
förälder
incheckning
fee133a62f

+ 2
- 2
Makefile Visa fil

@@ -39,7 +39,7 @@ vendor: go.mod go.sum
39 39
 
40 40
 .PHONY: fmt
41 41
 fmt:
42
-	@$(GOTOOL) gofumpt -w -s -extra "$(ROOT_DIR)"
42
+	@$(GOTOOL) gofumpt -w -extra "$(ROOT_DIR)"
43 43
 
44 44
 .PHONY: test
45 45
 test:
@@ -95,4 +95,4 @@ install-tools-goreleaser: .bin
95 95
 
96 96
 .PHONY: update-deps
97 97
 update-deps:
98
-	@go get -u && go mod tidy
98
+	@go get -u && go mod tidy -go=1.17

+ 0
- 2
essentials/conns.go Visa fil

@@ -8,14 +8,12 @@ import (
8 8
 // CloseableReader is a reader interface that can close its reading end.
9 9
 type CloseableReader interface {
10 10
 	io.Reader
11
-
12 11
 	CloseRead() error
13 12
 }
14 13
 
15 14
 // CloseableWriter is a writer that can close its writing end.
16 15
 type CloseableWriter interface {
17 16
 	io.Writer
18
-
19 17
 	CloseWrite() error
20 18
 }
21 19
 

+ 2
- 2
go.mod Visa fil

@@ -14,7 +14,7 @@ require (
14 14
 	github.com/jarcoal/httpmock v1.0.8
15 15
 	github.com/kentik/patricia v0.0.0-20210909164817-21603333b70e
16 16
 	github.com/mccutchen/go-httpbin v1.1.1
17
-	github.com/panjf2000/ants/v2 v2.4.6
17
+	github.com/panjf2000/ants/v2 v2.4.7
18 18
 	github.com/pelletier/go-toml v1.9.4
19 19
 	github.com/prometheus/client_golang v1.11.0
20 20
 	github.com/prometheus/common v0.32.1 // indirect
@@ -24,7 +24,7 @@ require (
24 24
 	github.com/stretchr/objx v0.3.0 // indirect
25 25
 	github.com/stretchr/testify v1.7.0
26 26
 	github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43
27
-	golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871
27
+	golang.org/x/crypto v0.0.0-20211202192323-5770296d904e
28 28
 	golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
29 29
 	golang.org/x/sys v0.0.0-20211124211545-fe61309f8881
30 30
 	google.golang.org/protobuf v1.27.1 // indirect

+ 4
- 4
go.sum Visa fil

@@ -193,8 +193,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
193 193
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
194 194
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
195 195
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
196
-github.com/panjf2000/ants/v2 v2.4.6 h1:drmj9mcygn2gawZ155dRbo+NfXEfAssjZNU1qoIb4gQ=
197
-github.com/panjf2000/ants/v2 v2.4.6/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
196
+github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M=
197
+github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
198 198
 github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
199 199
 github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
200 200
 github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
@@ -285,8 +285,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
285 285
 golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
286 286
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
287 287
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
288
-golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 h1:/pEO3GD/ABYAjuakUS6xSEmmlyVS4kxBNkeA9tLJiTI=
289
-golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
288
+golang.org/x/crypto v0.0.0-20211202192323-5770296d904e h1:MUP6MR3rJ7Gk9LEia0LP2ytiH6MuCfs7qYz+47jGdD8=
289
+golang.org/x/crypto v0.0.0-20211202192323-5770296d904e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
290 290
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
291 291
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
292 292
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=

+ 1
- 5
mtglib/internal/relay/init.go Visa fil

@@ -1,11 +1,7 @@
1 1
 package relay
2 2
 
3
-import "time"
4
-
5 3
 const (
6
-	copyBufferSize   = 64 * 1024
7
-	writerBufferSize = 128 * 1024
8
-	readTimeout      = 10 * time.Millisecond
4
+	copyBufferSize = 64 * 1024
9 5
 )
10 6
 
11 7
 type Logger interface {

+ 9
- 21
mtglib/internal/relay/pools.go Visa fil

@@ -1,31 +1,19 @@
1 1
 package relay
2 2
 
3
-import (
4
-	"bufio"
5
-	"io"
6
-	"net"
7
-	"sync"
8
-)
3
+import "sync"
9 4
 
10
-var syncPairPool = sync.Pool{
5
+var copyBufferPool = sync.Pool{
11 6
 	New: func() interface{} {
12
-		return &syncPair{
13
-			writer:  bufio.NewWriterSize(nil, writerBufferSize),
14
-			copyBuf: make([]byte, copyBufferSize),
15
-		}
7
+		rv := make([]byte, copyBufferSize)
8
+
9
+		return &rv
16 10
 	},
17 11
 }
18 12
 
19
-func acquireSyncPair(reader net.Conn, writer io.Writer) *syncPair {
20
-	sp := syncPairPool.Get().(*syncPair) // nolint: forcetypeassert
21
-	sp.writer.Reset(writer)
22
-	sp.reader = reader
23
-
24
-	return sp
13
+func acquireCopyBuffer() *[]byte {
14
+	return copyBufferPool.Get().(*[]byte)
25 15
 }
26 16
 
27
-func releaseSyncPair(sp *syncPair) {
28
-	sp.writer.Reset(nil)
29
-	sp.reader = nil
30
-	syncPairPool.Put(sp)
17
+func releaseCopyBuffer(buf *[]byte) {
18
+	copyBufferPool.Put(buf)
31 19
 }

+ 14
- 16
mtglib/internal/relay/relay.go Visa fil

@@ -4,7 +4,6 @@ import (
4 4
 	"context"
5 5
 	"errors"
6 6
 	"io"
7
-	"sync"
8 7
 
9 8
 	"github.com/9seconds/mtg/v2/essentials"
10 9
 )
@@ -22,28 +21,27 @@ func Relay(ctx context.Context, log Logger, telegramConn, clientConn essentials.
22 21
 		clientConn.Close()
23 22
 	}()
24 23
 
25
-	wg := &sync.WaitGroup{}
26
-	wg.Add(2) // nolint: gomnd
24
+	closeChan := make(chan struct{})
27 25
 
28
-	go pump(log, telegramConn, clientConn, wg, "client -> telegram")
26
+	go func() {
27
+		defer close(closeChan)
28
+
29
+		pump(log, telegramConn, clientConn, "client -> telegram")
30
+	}()
29 31
 
30
-	pump(log, clientConn, telegramConn, wg, "telegram -> client")
32
+	pump(log, clientConn, telegramConn, "telegram -> client")
31 33
 
32
-	wg.Wait()
34
+	<-closeChan
33 35
 }
34 36
 
35
-func pump(log Logger, src, dst essentials.Conn, wg *sync.WaitGroup, direction string) {
36
-	syncer := acquireSyncPair(src, dst)
37
+func pump(log Logger, src, dst essentials.Conn, direction string) {
38
+	defer src.CloseRead()  // nolint: errcheck
39
+	defer dst.CloseWrite() // nolint: errcheck
37 40
 
38
-	defer func() {
39
-		syncer.Flush()
40
-		releaseSyncPair(syncer)
41
-		src.CloseRead()  // nolint: errcheck
42
-		dst.CloseWrite() // nolint: errcheck
43
-		wg.Done()
44
-	}()
41
+	copyBuffer := acquireCopyBuffer()
42
+	defer releaseCopyBuffer(copyBuffer)
45 43
 
46
-	n, err := syncer.Sync()
44
+	n, err := io.CopyBuffer(src, dst, *copyBuffer)
47 45
 
48 46
 	switch {
49 47
 	case err == nil:

+ 0
- 2
mtglib/internal/relay/relay_test.go Visa fil

@@ -42,14 +42,12 @@ func (suite *RelayTestSuite) TestExit() {
42 42
 	suite.telegramConnMock.On("CloseWrite").Return(nil).Once()
43 43
 	suite.telegramConnMock.On("Read", mock.Anything).Return(10, io.EOF).Once()
44 44
 	suite.telegramConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe()
45
-	suite.telegramConnMock.On("SetReadDeadline", mock.Anything).Return(nil).Maybe()
46 45
 
47 46
 	suite.clientConnMock.On("Read", mock.Anything).Return(0, io.EOF).Once()
48 47
 	suite.clientConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe()
49 48
 	suite.clientConnMock.On("Close").Return(nil)
50 49
 	suite.clientConnMock.On("CloseRead").Return(nil).Once()
51 50
 	suite.clientConnMock.On("CloseWrite").Return(nil).Once()
52
-	suite.clientConnMock.On("SetReadDeadline", mock.Anything).Return(nil).Maybe()
53 51
 
54 52
 	relay.Relay(suite.ctx, suite.loggerMock, suite.telegramConnMock, suite.clientConnMock)
55 53
 }

+ 0
- 77
mtglib/internal/relay/sync_pair.go Visa fil

@@ -1,77 +0,0 @@
1
-package relay
2
-
3
-import (
4
-	"bufio"
5
-	"errors"
6
-	"fmt"
7
-	"io"
8
-	"net"
9
-	"os"
10
-	"sync"
11
-	"time"
12
-)
13
-
14
-type syncPair struct {
15
-	writer  *bufio.Writer
16
-	copyBuf []byte
17
-
18
-	mutex  sync.Mutex
19
-	reader net.Conn
20
-}
21
-
22
-func (s *syncPair) Sync() (int64, error) {
23
-	return io.CopyBuffer(s, s, s.copyBuf) // nolint: wrapcheck
24
-}
25
-
26
-func (s *syncPair) Read(p []byte) (int, error) {
27
-	n, err := s.readBlocking(p, false)
28
-
29
-	// nothing has been delivered for readTimeout time. Let's flush.
30
-	if errors.Is(err, os.ErrDeadlineExceeded) {
31
-		if err := s.Flush(); err != nil {
32
-			return 0, fmt.Errorf("cannot flush writer hand-side: %w", err)
33
-		}
34
-
35
-		return s.readBlocking(p, true)
36
-	}
37
-
38
-	return n, err
39
-}
40
-
41
-func (s *syncPair) Write(p []byte) (int, error) {
42
-	s.mutex.Lock()
43
-	defer s.mutex.Unlock()
44
-
45
-	n, err := s.writer.Write(p)
46
-
47
-	// optimization for a case when we have a small package and want to avoid a
48
-	// delay in readTimeout. In that case, we assume that peer has finished to
49
-	// sent a data it wants to send so we can flush without waiting for anything
50
-	// else.
51
-	if err == nil && n < copyBufferSize {
52
-		err = s.writer.Flush()
53
-	}
54
-
55
-	return n, err // nolint: wrapcheck
56
-}
57
-
58
-func (s *syncPair) Flush() error {
59
-	s.mutex.Lock()
60
-	defer s.mutex.Unlock()
61
-
62
-	return s.writer.Flush() // nolint: wrapcheck
63
-}
64
-
65
-func (s *syncPair) readBlocking(p []byte, blocking bool) (int, error) {
66
-	var deadline time.Time
67
-
68
-	if !blocking {
69
-		deadline = time.Now().Add(readTimeout)
70
-	}
71
-
72
-	if err := s.reader.SetReadDeadline(deadline); err != nil {
73
-		return 0, fmt.Errorf("cannot set read deadline: %w", err)
74
-	}
75
-
76
-	return s.reader.Read(p) // nolint: wrapcheck
77
-}

Laddar…
Avbryt
Spara