Просмотр исходного кода

Correctly reset relay state

tags/v2.0.0-rc1
9seconds 5 лет назад
Родитель
Сommit
3cacd74e12
2 измененных файлов: 25 добавлений и 15 удалений
  1. 1
    6
      mtglib/internal/relay/pools.go
  2. 24
    9
      mtglib/internal/relay/relay.go

+ 1
- 6
mtglib/internal/relay/pools.go Просмотреть файл

@@ -36,11 +36,6 @@ func AcquireRelay(ctx context.Context, logger Logger, bufferSize int, idleTimeou
36 36
 }
37 37
 
38 38
 func ReleaseRelay(r *Relay) {
39
-	r.ctxCancel()
40
-
41
-	r.ctx = nil
42
-	r.ctxCancel = nil
43
-	r.logger = nil
44
-
39
+	r.Reset()
45 40
 	relayPool.Put(r)
46 41
 }

+ 24
- 9
mtglib/internal/relay/relay.go Просмотреть файл

@@ -11,6 +11,7 @@ type Relay struct {
11 11
 	ctx          context.Context
12 12
 	ctxCancel    context.CancelFunc
13 13
 	logger       Logger
14
+	processMutex sync.Mutex
14 15
 	eastBuffer   []byte
15 16
 	westBuffer   []byte
16 17
 	tickChannel  chan struct{}
@@ -18,7 +19,23 @@ type Relay struct {
18 19
 	tickTimeout  time.Duration
19 20
 }
20 21
 
22
+func (r *Relay) Reset() {
23
+	r.processMutex.Lock()
24
+	defer r.processMutex.Unlock()
25
+
26
+	if r.ctxCancel != nil {
27
+		r.ctxCancel()
28
+	}
29
+
30
+	r.ctx = nil
31
+	r.ctxCancel = nil
32
+	r.logger = nil
33
+}
34
+
21 35
 func (r *Relay) Process(eastConn, westConn io.ReadWriteCloser) error {
36
+	r.processMutex.Lock()
37
+	defer r.processMutex.Unlock()
38
+
22 39
 	eastConn = conn{
23 40
 		ReadWriteCloser: eastConn,
24 41
 		ctx:             r.ctx,
@@ -30,16 +47,10 @@ func (r *Relay) Process(eastConn, westConn io.ReadWriteCloser) error {
30 47
 		tickChannel:     r.tickChannel,
31 48
 	}
32 49
 
33
-	defer func() {
34
-		r.ctxCancel()
35
-		eastConn.Close()
36
-		westConn.Close()
37
-	}()
38
-
39 50
 	wg := &sync.WaitGroup{}
40 51
 	wg.Add(3) // nolint: gomnd
41 52
 
42
-	go r.runObserver(wg)
53
+	go r.runObserver(eastConn, westConn, wg)
43 54
 
44 55
 	go r.transmit(eastConn, westConn, r.westBuffer, "west", wg)
45 56
 
@@ -58,9 +69,10 @@ func (r *Relay) Process(eastConn, westConn io.ReadWriteCloser) error {
58 69
 func (r *Relay) transmit(src io.ReadCloser, dst io.WriteCloser,
59 70
 	buffer []byte, direction string, wg *sync.WaitGroup) {
60 71
 	defer func() {
61
-		wg.Done()
62 72
 		src.Close()
63 73
 		dst.Close()
74
+		wg.Done()
75
+		r.ctxCancel()
64 76
 	}()
65 77
 
66 78
 	if _, err := io.CopyBuffer(dst, src, buffer); err != nil {
@@ -79,10 +91,13 @@ func (r *Relay) transmit(src io.ReadCloser, dst io.WriteCloser,
79 91
 	}
80 92
 }
81 93
 
82
-func (r *Relay) runObserver(wg *sync.WaitGroup) {
94
+func (r *Relay) runObserver(one, another io.Closer, wg *sync.WaitGroup) {
83 95
 	ticker := time.NewTicker(time.Second)
84 96
 
85 97
 	defer func() {
98
+		one.Close()
99
+		another.Close()
100
+
86 101
 		ticker.Stop()
87 102
 
88 103
 		select {

Загрузка…
Отмена
Сохранить