Просмотр исходного кода

Remove all cleverness that broke uploads

tags/v2.1.4^2
9seconds 4 лет назад
Родитель
Сommit
93bed24a0b

+ 0
- 2
essentials/conns.go Просмотреть файл

8
 // CloseableReader is a reader interface that can close its reading end.
8
 // CloseableReader is a reader interface that can close its reading end.
9
 type CloseableReader interface {
9
 type CloseableReader interface {
10
 	io.Reader
10
 	io.Reader
11
-
12
 	CloseRead() error
11
 	CloseRead() error
13
 }
12
 }
14
 
13
 
15
 // CloseableWriter is a writer that can close its writing end.
14
 // CloseableWriter is a writer that can close its writing end.
16
 type CloseableWriter interface {
15
 type CloseableWriter interface {
17
 	io.Writer
16
 	io.Writer
18
-
19
 	CloseWrite() error
17
 	CloseWrite() error
20
 }
18
 }
21
 
19
 

+ 1
- 5
mtglib/internal/relay/init.go Просмотреть файл

1
 package relay
1
 package relay
2
 
2
 
3
-import "time"
4
-
5
 const (
3
 const (
6
-	copyBufferSize   = 64 * 1024
7
-	writerBufferSize = 128 * 1024
8
-	readTimeout      = 10 * time.Millisecond
4
+	copyBufferSize = 64 * 1024
9
 )
5
 )
10
 
6
 
11
 type Logger interface {
7
 type Logger interface {

+ 9
- 21
mtglib/internal/relay/pools.go Просмотреть файл

1
 package relay
1
 package relay
2
 
2
 
3
-import (
4
-	"bufio"
5
-	"io"
6
-	"net"
7
-	"sync"
8
-)
3
+import "sync"
9
 
4
 
10
-var syncPairPool = sync.Pool{
5
+var copyBufferPool = sync.Pool{
11
 	New: func() interface{} {
6
 	New: func() interface{} {
12
-		return &syncPair{
13
-			writer:  bufio.NewWriterSize(nil, writerBufferSize),
14
-			copyBuf: make([]byte, copyBufferSize),
15
-		}
7
+		rv := make([]byte, copyBufferSize)
8
+
9
+		return &rv
16
 	},
10
 	},
17
 }
11
 }
18
 
12
 
19
-func acquireSyncPair(reader net.Conn, writer io.Writer) *syncPair {
20
-	sp := syncPairPool.Get().(*syncPair) // nolint: forcetypeassert
21
-	sp.writer.Reset(writer)
22
-	sp.reader = reader
23
-
24
-	return sp
13
+func acquireCopyBuffer() *[]byte {
14
+	return copyBufferPool.Get().(*[]byte)
25
 }
15
 }
26
 
16
 
27
-func releaseSyncPair(sp *syncPair) {
28
-	sp.writer.Reset(nil)
29
-	sp.reader = nil
30
-	syncPairPool.Put(sp)
17
+func releaseCopyBuffer(buf *[]byte) {
18
+	copyBufferPool.Put(buf)
31
 }
19
 }

+ 14
- 16
mtglib/internal/relay/relay.go Просмотреть файл

4
 	"context"
4
 	"context"
5
 	"errors"
5
 	"errors"
6
 	"io"
6
 	"io"
7
-	"sync"
8
 
7
 
9
 	"github.com/9seconds/mtg/v2/essentials"
8
 	"github.com/9seconds/mtg/v2/essentials"
10
 )
9
 )
22
 		clientConn.Close()
21
 		clientConn.Close()
23
 	}()
22
 	}()
24
 
23
 
25
-	wg := &sync.WaitGroup{}
26
-	wg.Add(2) // nolint: gomnd
24
+	closeChan := make(chan struct{})
27
 
25
 
28
-	go pump(log, telegramConn, clientConn, wg, "client -> telegram")
26
+	go func() {
27
+		defer close(closeChan)
28
+
29
+		pump(log, telegramConn, clientConn, "client -> telegram")
30
+	}()
29
 
31
 
30
-	pump(log, clientConn, telegramConn, wg, "telegram -> client")
32
+	pump(log, clientConn, telegramConn, "telegram -> client")
31
 
33
 
32
-	wg.Wait()
34
+	<-closeChan
33
 }
35
 }
34
 
36
 
35
-func pump(log Logger, src, dst essentials.Conn, wg *sync.WaitGroup, direction string) {
36
-	syncer := acquireSyncPair(src, dst)
37
+func pump(log Logger, src, dst essentials.Conn, direction string) {
38
+	defer src.CloseRead()  // nolint: errcheck
39
+	defer dst.CloseWrite() // nolint: errcheck
37
 
40
 
38
-	defer func() {
39
-		syncer.Flush()
40
-		releaseSyncPair(syncer)
41
-		src.CloseRead()  // nolint: errcheck
42
-		dst.CloseWrite() // nolint: errcheck
43
-		wg.Done()
44
-	}()
41
+	copyBuffer := acquireCopyBuffer()
42
+	defer releaseCopyBuffer(copyBuffer)
45
 
43
 
46
-	n, err := syncer.Sync()
44
+	n, err := io.CopyBuffer(src, dst, *copyBuffer)
47
 
45
 
48
 	switch {
46
 	switch {
49
 	case err == nil:
47
 	case err == nil:

+ 0
- 2
mtglib/internal/relay/relay_test.go Просмотреть файл

42
 	suite.telegramConnMock.On("CloseWrite").Return(nil).Once()
42
 	suite.telegramConnMock.On("CloseWrite").Return(nil).Once()
43
 	suite.telegramConnMock.On("Read", mock.Anything).Return(10, io.EOF).Once()
43
 	suite.telegramConnMock.On("Read", mock.Anything).Return(10, io.EOF).Once()
44
 	suite.telegramConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe()
44
 	suite.telegramConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe()
45
-	suite.telegramConnMock.On("SetReadDeadline", mock.Anything).Return(nil).Maybe()
46
 
45
 
47
 	suite.clientConnMock.On("Read", mock.Anything).Return(0, io.EOF).Once()
46
 	suite.clientConnMock.On("Read", mock.Anything).Return(0, io.EOF).Once()
48
 	suite.clientConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe()
47
 	suite.clientConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe()
49
 	suite.clientConnMock.On("Close").Return(nil)
48
 	suite.clientConnMock.On("Close").Return(nil)
50
 	suite.clientConnMock.On("CloseRead").Return(nil).Once()
49
 	suite.clientConnMock.On("CloseRead").Return(nil).Once()
51
 	suite.clientConnMock.On("CloseWrite").Return(nil).Once()
50
 	suite.clientConnMock.On("CloseWrite").Return(nil).Once()
52
-	suite.clientConnMock.On("SetReadDeadline", mock.Anything).Return(nil).Maybe()
53
 
51
 
54
 	relay.Relay(suite.ctx, suite.loggerMock, suite.telegramConnMock, suite.clientConnMock)
52
 	relay.Relay(suite.ctx, suite.loggerMock, suite.telegramConnMock, suite.clientConnMock)
55
 }
53
 }

+ 0
- 77
mtglib/internal/relay/sync_pair.go Просмотреть файл

1
-package relay
2
-
3
-import (
4
-	"bufio"
5
-	"errors"
6
-	"fmt"
7
-	"io"
8
-	"net"
9
-	"os"
10
-	"sync"
11
-	"time"
12
-)
13
-
14
-type syncPair struct {
15
-	writer  *bufio.Writer
16
-	copyBuf []byte
17
-
18
-	mutex  sync.Mutex
19
-	reader net.Conn
20
-}
21
-
22
-func (s *syncPair) Sync() (int64, error) {
23
-	return io.CopyBuffer(s, s, s.copyBuf) // nolint: wrapcheck
24
-}
25
-
26
-func (s *syncPair) Read(p []byte) (int, error) {
27
-	n, err := s.readBlocking(p, false)
28
-
29
-	// nothing has been delivered for readTimeout time. Let's flush.
30
-	if errors.Is(err, os.ErrDeadlineExceeded) {
31
-		if err := s.Flush(); err != nil {
32
-			return 0, fmt.Errorf("cannot flush writer hand-side: %w", err)
33
-		}
34
-
35
-		return s.readBlocking(p, true)
36
-	}
37
-
38
-	return n, err
39
-}
40
-
41
-func (s *syncPair) Write(p []byte) (int, error) {
42
-	s.mutex.Lock()
43
-	defer s.mutex.Unlock()
44
-
45
-	n, err := s.writer.Write(p)
46
-
47
-	// optimization for a case when we have a small package and want to avoid a
48
-	// delay in readTimeout. In that case, we assume that peer has finished to
49
-	// sent a data it wants to send so we can flush without waiting for anything
50
-	// else.
51
-	if err == nil && n < copyBufferSize {
52
-		err = s.writer.Flush()
53
-	}
54
-
55
-	return n, err // nolint: wrapcheck
56
-}
57
-
58
-func (s *syncPair) Flush() error {
59
-	s.mutex.Lock()
60
-	defer s.mutex.Unlock()
61
-
62
-	return s.writer.Flush() // nolint: wrapcheck
63
-}
64
-
65
-func (s *syncPair) readBlocking(p []byte, blocking bool) (int, error) {
66
-	var deadline time.Time
67
-
68
-	if !blocking {
69
-		deadline = time.Now().Add(readTimeout)
70
-	}
71
-
72
-	if err := s.reader.SetReadDeadline(deadline); err != nil {
73
-		return 0, fmt.Errorf("cannot set read deadline: %w", err)
74
-	}
75
-
76
-	return s.reader.Read(p) // nolint: wrapcheck
77
-}

Загрузка…
Отмена
Сохранить