9seconds пре 4 година
родитељ
комит
7b1f86b75d
2 измењених фајлова са 18 додато и 4 уклоњено
  1. 5
    3
      mtglib/internal/relay/relay.go
  2. 13
    1
      mtglib/internal/relay/sync_pair.go

+ 5
- 3
mtglib/internal/relay/relay.go Прегледај датотеку

7
 )
7
 )
8
 
8
 
9
 func Relay(ctx context.Context, log Logger, telegramConn, clientConn net.Conn) {
9
 func Relay(ctx context.Context, log Logger, telegramConn, clientConn net.Conn) {
10
+	defer telegramConn.Close()
11
+	defer clientConn.Close()
12
+
10
 	ctx, cancel := context.WithCancel(ctx)
13
 	ctx, cancel := context.WithCancel(ctx)
11
 	defer cancel()
14
 	defer cancel()
12
 
15
 
28
 
31
 
29
 func pump(log Logger, src, dst net.Conn, wg *sync.WaitGroup, direction string) {
32
 func pump(log Logger, src, dst net.Conn, wg *sync.WaitGroup, direction string) {
30
 	defer wg.Done()
33
 	defer wg.Done()
31
-	defer src.Close()
32
-	defer dst.Close()
33
 
34
 
34
 	syncer := acquireSyncPair(src, dst)
35
 	syncer := acquireSyncPair(src, dst)
35
 	defer releaseSyncPair(syncer)
36
 	defer releaseSyncPair(syncer)
37
+	defer syncer.Flush()
36
 
38
 
37
 	if n, err := syncer.Sync(); err != nil {
39
 	if n, err := syncer.Sync(); err != nil {
38
-		log.Printf("cannot pump %s (written %d bytes): %w", direction, n, err)
40
+		log.Printf("cannot pump %s (written %d bytes): %v", direction, n, err)
39
 	}
41
 	}
40
 }
42
 }

+ 13
- 1
mtglib/internal/relay/sync_pair.go Прегледај датотеку

7
 	"io"
7
 	"io"
8
 	"net"
8
 	"net"
9
 	"os"
9
 	"os"
10
+	"sync"
10
 	"time"
11
 	"time"
11
 )
12
 )
12
 
13
 
14
 	writer  *bufio.Writer
15
 	writer  *bufio.Writer
15
 	copyBuf []byte
16
 	copyBuf []byte
16
 
17
 
18
+	mutex  sync.Mutex
17
 	reader net.Conn
19
 	reader net.Conn
18
 }
20
 }
19
 
21
 
25
 	n, err := s.readBlocking(p, false)
27
 	n, err := s.readBlocking(p, false)
26
 
28
 
27
 	if errors.Is(err, os.ErrDeadlineExceeded) {
29
 	if errors.Is(err, os.ErrDeadlineExceeded) {
28
-		if err := s.writer.Flush(); err != nil {
30
+		if err := s.Flush(); err != nil {
29
 			return 0, fmt.Errorf("cannot flush writer hand-side: %w", err)
31
 			return 0, fmt.Errorf("cannot flush writer hand-side: %w", err)
30
 		}
32
 		}
31
 
33
 
36
 }
38
 }
37
 
39
 
38
 func (s *syncPair) Write(p []byte) (int, error) {
40
 func (s *syncPair) Write(p []byte) (int, error) {
41
+	s.mutex.Lock()
42
+	defer s.mutex.Unlock()
43
+
39
 	return s.writer.Write(p) // nolint: wrapcheck
44
 	return s.writer.Write(p) // nolint: wrapcheck
40
 }
45
 }
41
 
46
 
47
+func (s *syncPair) Flush() error {
48
+	s.mutex.Lock()
49
+	defer s.mutex.Unlock()
50
+
51
+	return s.writer.Flush()
52
+}
53
+
42
 func (s *syncPair) readBlocking(p []byte, blocking bool) (int, error) {
54
 func (s *syncPair) readBlocking(p []byte, blocking bool) (int, error) {
43
 	var deadline time.Time
55
 	var deadline time.Time
44
 
56
 

Loading…
Откажи
Сачувај