Ver código fonte

Wait in doppel.Conn if there is anything to write

tags/v2.2.2^2^2
9seconds 1 mês atrás
pai
commit
724904f50d
2 arquivos alterados com 63 adições e 20 exclusões
  1. 32
    20
      mtglib/internal/doppel/conn.go
  2. 31
    0
      mtglib/internal/doppel/conn_test.go

+ 32
- 20
mtglib/internal/doppel/conn.go Ver arquivo

@@ -16,22 +16,25 @@ type Conn struct {
16 16
 }
17 17
 
18 18
 type connPayload struct {
19
-	ctx           context.Context
20
-	ctxCancel     context.CancelCauseFunc
21
-	clock         Clock
22
-	wg            sync.WaitGroup
23
-	syncWriteLock sync.RWMutex
24
-	writeStream   bytes.Buffer
25
-	writeCond     *sync.Cond
19
+	ctx         context.Context
20
+	ctxCancel   context.CancelCauseFunc
21
+	clock       Clock
22
+	wg          sync.WaitGroup
23
+	writeStream bytes.Buffer
24
+	writtenCond sync.Cond
25
+	done        bool
26 26
 }
27 27
 
28 28
 func (c Conn) Write(p []byte) (int, error) {
29
-	c.p.syncWriteLock.RLock()
30
-	defer c.p.syncWriteLock.RUnlock()
29
+	if len(p) == 0 {
30
+		return 0, context.Cause(c.p.ctx)
31
+	}
31 32
 
32
-	c.p.writeCond.L.Lock()
33
+	c.p.writtenCond.L.Lock()
33 34
 	c.p.writeStream.Write(p)
34
-	c.p.writeCond.L.Unlock()
35
+	c.p.writtenCond.L.Unlock()
36
+
37
+	c.p.writtenCond.Signal()
35 38
 
36 39
 	return len(p), context.Cause(c.p.ctx)
37 40
 }
@@ -43,8 +46,6 @@ func (c Conn) Start() {
43 46
 }
44 47
 
45 48
 func (c Conn) start() {
46
-	defer c.p.writeCond.Broadcast()
47
-
48 49
 	buf := [tls.MaxRecordSize]byte{}
49 50
 
50 51
 	for {
@@ -54,11 +55,16 @@ func (c Conn) start() {
54 55
 		case <-c.p.clock.tick:
55 56
 		}
56 57
 
57
-		c.p.writeCond.L.Lock()
58
-		n, err := c.p.writeStream.Read(buf[:c.p.clock.stats.Size()])
59
-		c.p.writeCond.L.Unlock()
58
+		size := c.p.clock.stats.Size()
59
+
60
+		c.p.writtenCond.L.Lock()
61
+		for c.p.writeStream.Len() == 0 && !c.p.done {
62
+			c.p.writtenCond.Wait()
63
+		}
64
+		n, _ := c.p.writeStream.Read(buf[:size])
65
+		c.p.writtenCond.L.Unlock()
60 66
 
61
-		if n == 0 || err != nil {
67
+		if n == 0 {
62 68
 			continue
63 69
 		}
64 70
 
@@ -66,13 +72,17 @@ func (c Conn) start() {
66 72
 			c.p.ctxCancel(err)
67 73
 			return
68 74
 		}
69
-
70
-		c.p.writeCond.Signal()
71 75
 	}
72 76
 }
73 77
 
74 78
 func (c Conn) Stop() {
75 79
 	c.p.ctxCancel(nil)
80
+
81
+	c.p.writtenCond.L.Lock()
82
+	c.p.done = true
83
+	c.p.writtenCond.L.Unlock()
84
+	c.p.writtenCond.Broadcast()
85
+
76 86
 	c.p.wg.Wait()
77 87
 }
78 88
 
@@ -83,7 +93,9 @@ func NewConn(ctx context.Context, conn essentials.Conn, stats *Stats) Conn {
83 93
 		p: &connPayload{
84 94
 			ctx:       ctx,
85 95
 			ctxCancel: cancel,
86
-			writeCond: sync.NewCond(&sync.Mutex{}),
96
+			writtenCond: sync.Cond{
97
+				L: &sync.Mutex{},
98
+			},
87 99
 			clock: Clock{
88 100
 				stats: stats,
89 101
 				tick:  make(chan struct{}),

+ 31
- 0
mtglib/internal/doppel/conn_test.go Ver arquivo

@@ -141,6 +141,37 @@ func (suite *ConnTestSuite) TestWriteReturnsErrorAfterStop() {
141 141
 	suite.Error(err)
142 142
 }
143 143
 
144
+func (suite *ConnTestSuite) TestStopDoesNotDeadlockWhenStartIsWaiting() {
145
+	suite.connMock.
146
+		On("Write", mock.AnythingOfType("[]uint8")).
147
+		Return(0, nil).
148
+		Maybe()
149
+
150
+	for range 100 {
151
+		func() {
152
+			ctx, cancel := context.WithCancel(suite.ctx)
153
+			defer cancel()
154
+
155
+			c := NewConn(ctx, suite.connMock, &Stats{
156
+				k:      2.0,
157
+				lambda: 0.01,
158
+			})
159
+
160
+			done := make(chan struct{})
161
+			go func() {
162
+				defer close(done)
163
+				c.Stop()
164
+			}()
165
+
166
+			select {
167
+			case <-done:
168
+			case <-time.After(2 * time.Second):
169
+				suite.Fail("Stop() deadlocked: start() likely stuck in writtenCond.Wait()")
170
+			}
171
+		}()
172
+	}
173
+}
174
+
144 175
 func (suite *ConnTestSuite) TestStopOnUnderlyingWriteError() {
145 176
 	suite.connMock.
146 177
 		On("Write", mock.AnythingOfType("[]uint8")).

Carregando…
Cancelar
Salvar