Kaynağa Gözat

Merge pull request #426 from dolonet/fix/flaky-ci-race-and-bloom

tags/v2.2.7^2^2
Sergei Arkhipov 1 ay önce
ebeveyn
işleme
89930631cf
No account linked to committer's email address

+ 1
- 1
antireplay/stable_bloom_filter_test.go Dosyayı Görüntüle

@@ -12,7 +12,7 @@ type StableBloomFilterTestSuite struct {
12 12
 }
13 13
 
14 14
 func (suite *StableBloomFilterTestSuite) TestOp() {
15
-	filter := antireplay.NewStableBloomFilter(500, 0.001)
15
+	filter := antireplay.NewStableBloomFilter(100000, 0.001)
16 16
 
17 17
 	suite.False(filter.SeenBefore([]byte{1, 2, 3}))
18 18
 	suite.False(filter.SeenBefore([]byte{4, 5, 6}))

+ 12
- 6
mtglib/internal/doppel/scout.go Dosyayı Görüntüle

@@ -61,23 +61,29 @@ func (s Scout) learn(ctx context.Context, url string) (ScoutResult, error) {
61 61
 		client.CloseIdleConnections()
62 62
 	}
63 63
 
64
-	if err != nil || len(results.data) == 0 {
64
+	if err != nil {
65 65
 		return ScoutResult{}, err
66 66
 	}
67 67
 
68
+	data, writeIndex := results.Snapshot()
69
+
70
+	if len(data) == 0 {
71
+		return ScoutResult{}, nil
72
+	}
73
+
68 74
 	var result ScoutResult
69 75
 
70 76
 	// Compute inter-record durations (existing logic).
71 77
 	lastTimestamp := time.Time{}
72 78
 
73
-	for i, v := range results.data {
79
+	for i, v := range data {
74 80
 		if v.recordType != tls.TypeApplicationData {
75 81
 			continue
76 82
 		}
77 83
 
78 84
 		if lastTimestamp.IsZero() {
79 85
 			if i > 0 {
80
-				lastTimestamp = results.data[i-1].timestamp
86
+				lastTimestamp = data[i-1].timestamp
81 87
 			} else {
82 88
 				lastTimestamp = v.timestamp
83 89
 			}
@@ -90,12 +96,12 @@ func (s Scout) learn(ctx context.Context, url string) (ScoutResult, error) {
90 96
 	// Compute cert size: sum of ApplicationData payload between CCS and
91 97
 	// the first client Write (which marks the end of server handshake).
92 98
 	seenCCS := false
93
-	boundary := results.writeIndex
99
+	boundary := writeIndex
94 100
 	if boundary < 0 {
95
-		boundary = len(results.data)
101
+		boundary = len(data)
96 102
 	}
97 103
 
98
-	for i, v := range results.data {
104
+	for i, v := range data {
99 105
 		if i >= boundary {
100 106
 			break
101 107
 		}

+ 20
- 1
mtglib/internal/doppel/scout_conn_collected.go Dosyayı Görüntüle

@@ -1,6 +1,10 @@
1 1
 package doppel
2 2
 
3
-import "time"
3
+import (
4
+	"slices"
5
+	"sync"
6
+	"time"
7
+)
4 8
 
5 9
 const (
6 10
 	ScoutConnCollectedPreallocSize = 100
@@ -13,23 +17,38 @@ type ScoutConnResult struct {
13 17
 }
14 18
 
15 19
 type ScoutConnCollected struct {
20
+	mu         sync.Mutex
16 21
 	data       []ScoutConnResult
17 22
 	writeIndex int // index at which client first wrote post-handshake data; -1 if not set
18 23
 }
19 24
 
20 25
 func (s *ScoutConnCollected) Add(record byte, payloadLen int) {
26
+	s.mu.Lock()
21 27
 	s.data = append(s.data, ScoutConnResult{
22 28
 		timestamp:  time.Now(),
23 29
 		recordType: record,
24 30
 		payloadLen: payloadLen,
25 31
 	})
32
+	s.mu.Unlock()
26 33
 }
27 34
 
28 35
 // MarkWrite records the current data length as the handshake boundary.
29 36
 func (s *ScoutConnCollected) MarkWrite() {
37
+	s.mu.Lock()
30 38
 	if s.writeIndex < 0 {
31 39
 		s.writeIndex = len(s.data)
32 40
 	}
41
+	s.mu.Unlock()
42
+}
43
+
44
+// Snapshot returns a copy of the collected data and the write index.
45
+func (s *ScoutConnCollected) Snapshot() ([]ScoutConnResult, int) {
46
+	s.mu.Lock()
47
+	snapshot := slices.Clone(s.data)
48
+	writeIndex := s.writeIndex
49
+	s.mu.Unlock()
50
+
51
+	return snapshot, writeIndex
33 52
 }
34 53
 
35 54
 func NewScoutConnCollected() *ScoutConnCollected {

+ 48
- 4
mtglib/internal/doppel/scout_conn_collected_test.go Dosyayı Görüntüle

@@ -1,6 +1,7 @@
1 1
 package doppel
2 2
 
3 3
 import (
4
+	"sync"
4 5
 	"testing"
5 6
 	"time"
6 7
 
@@ -16,8 +17,10 @@ func (suite *ScoutConnCollectedTestSuite) TestAddSingle() {
16 17
 	collected := NewScoutConnCollected()
17 18
 	collected.Add(tls.TypeApplicationData, 100)
18 19
 
19
-	suite.Len(collected.data, 1)
20
-	suite.Equal(byte(tls.TypeApplicationData), collected.data[0].recordType)
20
+	data, _ := collected.Snapshot()
21
+
22
+	suite.Len(data, 1)
23
+	suite.Equal(byte(tls.TypeApplicationData), data[0].recordType)
21 24
 }
22 25
 
23 26
 func (suite *ScoutConnCollectedTestSuite) TestAddTimestampsAreMonotonic() {
@@ -31,11 +34,52 @@ func (suite *ScoutConnCollectedTestSuite) TestAddTimestampsAreMonotonic() {
31 34
 	time.Sleep(time.Microsecond)
32 35
 	collected.Add(tls.TypeApplicationData, 100)
33 36
 
34
-	for i := 1; i < len(collected.data); i++ {
35
-		suite.True(collected.data[i].timestamp.After(collected.data[i-1].timestamp))
37
+	data, _ := collected.Snapshot()
38
+
39
+	for i := 1; i < len(data); i++ {
40
+		suite.True(data[i].timestamp.After(data[i-1].timestamp))
36 41
 	}
37 42
 }
38 43
 
44
+func (suite *ScoutConnCollectedTestSuite) TestConcurrentAddSnapshot() {
45
+	collected := NewScoutConnCollected()
46
+
47
+	var wg sync.WaitGroup
48
+
49
+	wg.Add(3)
50
+
51
+	go func() {
52
+		defer wg.Done()
53
+
54
+		for i := 0; i < 1000; i++ {
55
+			collected.Add(tls.TypeApplicationData, i)
56
+		}
57
+	}()
58
+
59
+	go func() {
60
+		defer wg.Done()
61
+
62
+		for i := 0; i < 100; i++ {
63
+			collected.MarkWrite()
64
+		}
65
+	}()
66
+
67
+	go func() {
68
+		defer wg.Done()
69
+
70
+		for i := 0; i < 1000; i++ {
71
+			// call Snapshot concurrently to exercise the lock under -race
72
+			collected.Snapshot() //nolint:errcheck
73
+		}
74
+	}()
75
+
76
+	wg.Wait()
77
+
78
+	data, writeIndex := collected.Snapshot()
79
+	suite.Len(data, 1000)
80
+	suite.GreaterOrEqual(writeIndex, 0)
81
+}
82
+
39 83
 func TestScoutConnCollected(t *testing.T) {
40 84
 	t.Parallel()
41 85
 	suite.Run(t, &ScoutConnCollectedTestSuite{})

+ 1
- 1
mtglib/proxy_test.go Dosyayı Görüntüle

@@ -175,7 +175,7 @@ func (suite *ProxyTestSuite) TestHTTPSRequest() {
175 175
 	addr := fmt.Sprintf("https://%s/headers", suite.ProxyAddress())
176 176
 
177 177
 	resp, err := client.Get(addr) //nolint: noctx
178
-	suite.NoError(err)
178
+	suite.Require().NoError(err)
179 179
 
180 180
 	defer resp.Body.Close() //nolint: errcheck
181 181
 

Loading…
İptal
Kaydet