Explorar el Código

Merge pull request #419 from 9seconds/timers

Remove clock goroutine
tags/v2.2.5^2^2
Sergei Arkhipov hace 1 mes
padre
commit
139db15e83
No account linked to committer's email address

+ 0
- 35
mtglib/internal/doppel/clock.go Ver fichero

1
-package doppel
2
-
3
-import (
4
-	"context"
5
-	"time"
6
-)
7
-
8
-type Clock struct {
9
-	stats *Stats
10
-	tick  chan struct{}
11
-}
12
-
13
-func (c Clock) Start(ctx context.Context) {
14
-	tickTock := time.NewTimer(c.stats.Delay())
15
-	defer func() {
16
-		tickTock.Stop()
17
-		select {
18
-		case <-tickTock.C:
19
-		default:
20
-		}
21
-	}()
22
-
23
-	for {
24
-		select {
25
-		case <-ctx.Done():
26
-			return
27
-		case <-tickTock.C:
28
-			select {
29
-			case <-ctx.Done():
30
-			case c.tick <- struct{}{}:
31
-			}
32
-			tickTock.Reset(c.stats.Delay())
33
-		}
34
-	}
35
-}

+ 0
- 80
mtglib/internal/doppel/clock_test.go Ver fichero

1
-package doppel
2
-
3
-import (
4
-	"context"
5
-	"sync"
6
-	"testing"
7
-	"time"
8
-
9
-	"github.com/stretchr/testify/suite"
10
-)
11
-
12
-type ClockTestSuite struct {
13
-	suite.Suite
14
-
15
-	clock     Clock
16
-	wg        sync.WaitGroup
17
-	ctx       context.Context
18
-	ctxCancel context.CancelFunc
19
-}
20
-
21
-func (suite *ClockTestSuite) SetupTest() {
22
-	ctx, cancel := context.WithCancel(context.Background())
23
-
24
-	suite.ctx = ctx
25
-	suite.ctxCancel = cancel
26
-	suite.clock = Clock{
27
-		stats: &Stats{
28
-			k:      StatsDefaultK,
29
-			lambda: StatsDefaultLambda,
30
-		},
31
-		tick: make(chan struct{}),
32
-	}
33
-
34
-	suite.wg.Go(func() {
35
-		suite.clock.Start(suite.ctx)
36
-	})
37
-}
38
-
39
-func (suite *ClockTestSuite) TearDownTest() {
40
-	suite.ctxCancel()
41
-	suite.wg.Wait()
42
-}
43
-
44
-func (suite *ClockTestSuite) TestTicks() {
45
-	received := 0
46
-
47
-	for range 3 {
48
-		select {
49
-		case <-suite.clock.tick:
50
-			received++
51
-		case <-time.After(2 * time.Second):
52
-			suite.Fail("timed out waiting for tick")
53
-		}
54
-	}
55
-
56
-	suite.Equal(3, received)
57
-}
58
-
59
-func (suite *ClockTestSuite) TestStopsOnCancel() {
60
-	select {
61
-	case <-suite.clock.tick:
62
-	case <-time.After(2 * time.Second):
63
-		suite.Fail("timed out waiting for first tick")
64
-	}
65
-
66
-	suite.ctxCancel()
67
-
68
-	time.Sleep(50 * time.Millisecond)
69
-
70
-	select {
71
-	case <-suite.clock.tick:
72
-		suite.Fail("received tick after cancel")
73
-	default:
74
-	}
75
-}
76
-
77
-func TestClock(t *testing.T) {
78
-	t.Parallel()
79
-	suite.Run(t, &ClockTestSuite{})
80
-}

+ 10
- 17
mtglib/internal/doppel/conn.go Ver fichero

4
 	"bytes"
4
 	"bytes"
5
 	"context"
5
 	"context"
6
 	"sync"
6
 	"sync"
7
+	"time"
7
 
8
 
8
 	"github.com/9seconds/mtg/v2/essentials"
9
 	"github.com/9seconds/mtg/v2/essentials"
9
 	"github.com/9seconds/mtg/v2/mtglib/internal/tls"
10
 	"github.com/9seconds/mtg/v2/mtglib/internal/tls"
25
 type connPayload struct {
26
 type connPayload struct {
26
 	ctx         context.Context
27
 	ctx         context.Context
27
 	ctxCancel   context.CancelCauseFunc
28
 	ctxCancel   context.CancelCauseFunc
28
-	clock       Clock
29
+	stats       Stats
29
 	wg          sync.WaitGroup
30
 	wg          sync.WaitGroup
30
 	writeStream bytes.Buffer
31
 	writeStream bytes.Buffer
31
 	writtenCond sync.Cond
32
 	writtenCond sync.Cond
46
 	return len(p), context.Cause(c.p.ctx)
47
 	return len(p), context.Cause(c.p.ctx)
47
 }
48
 }
48
 
49
 
49
-func (c Conn) Start() {
50
-	c.p.wg.Go(func() {
51
-		c.start()
52
-	})
53
-}
54
-
55
 func (c Conn) start() {
50
 func (c Conn) start() {
56
 	bp := doppelBufPool.Get().(*[]byte)
51
 	bp := doppelBufPool.Get().(*[]byte)
57
 	buf := *bp
52
 	buf := *bp
58
 	defer doppelBufPool.Put(bp)
53
 	defer doppelBufPool.Put(bp)
59
 
54
 
55
+	timer := time.NewTimer(c.p.stats.Delay())
56
+	defer timer.Stop()
57
+
60
 	for {
58
 	for {
61
 		select {
59
 		select {
62
 		case <-c.p.ctx.Done():
60
 		case <-c.p.ctx.Done():
63
 			return
61
 			return
64
-		case <-c.p.clock.tick:
62
+		case <-timer.C:
63
+			timer.Reset(c.p.stats.Delay())
65
 		}
64
 		}
66
 
65
 
67
-		size := c.p.clock.stats.Size()
66
+		size := c.p.stats.Size()
68
 
67
 
69
 		c.p.writtenCond.L.Lock()
68
 		c.p.writtenCond.L.Lock()
70
 		for c.p.writeStream.Len() == 0 && !c.p.done {
69
 		for c.p.writeStream.Len() == 0 && !c.p.done {
95
 	c.p.wg.Wait()
94
 	c.p.wg.Wait()
96
 }
95
 }
97
 
96
 
98
-func NewConn(ctx context.Context, conn essentials.Conn, stats *Stats) Conn {
97
+func NewConn(ctx context.Context, conn essentials.Conn, stats Stats) Conn {
99
 	ctx, cancel := context.WithCancelCause(ctx)
98
 	ctx, cancel := context.WithCancelCause(ctx)
100
 	rv := Conn{
99
 	rv := Conn{
101
 		Conn: conn,
100
 		Conn: conn,
102
 		p: &connPayload{
101
 		p: &connPayload{
103
 			ctx:       ctx,
102
 			ctx:       ctx,
104
 			ctxCancel: cancel,
103
 			ctxCancel: cancel,
104
+			stats:     stats,
105
 			writtenCond: sync.Cond{
105
 			writtenCond: sync.Cond{
106
 				L: &sync.Mutex{},
106
 				L: &sync.Mutex{},
107
 			},
107
 			},
108
-			clock: Clock{
109
-				stats: stats,
110
-				tick:  make(chan struct{}),
111
-			},
112
 		},
108
 		},
113
 	}
109
 	}
114
 
110
 
115
 	rv.p.writeStream.Grow(tls.DefaultBufferSize)
111
 	rv.p.writeStream.Grow(tls.DefaultBufferSize)
116
 
112
 
117
-	rv.p.wg.Go(func() {
118
-		rv.p.clock.Start(ctx)
119
-	})
120
 	rv.p.wg.Go(func() {
113
 	rv.p.wg.Go(func() {
121
 		rv.start()
114
 		rv.start()
122
 	})
115
 	})

+ 2
- 2
mtglib/internal/doppel/conn_test.go Ver fichero

63
 }
63
 }
64
 
64
 
65
 func (suite *ConnTestSuite) makeConn() Conn {
65
 func (suite *ConnTestSuite) makeConn() Conn {
66
-	return NewConn(suite.ctx, suite.connMock, &Stats{
66
+	return NewConn(suite.ctx, suite.connMock, Stats{
67
 		k:      2.0,
67
 		k:      2.0,
68
 		lambda: 0.01,
68
 		lambda: 0.01,
69
 	})
69
 	})
152
 			ctx, cancel := context.WithCancel(suite.ctx)
152
 			ctx, cancel := context.WithCancel(suite.ctx)
153
 			defer cancel()
153
 			defer cancel()
154
 
154
 
155
-			c := NewConn(ctx, suite.connMock, &Stats{
155
+			c := NewConn(ctx, suite.connMock, Stats{
156
 				k:      2.0,
156
 				k:      2.0,
157
 				lambda: 0.01,
157
 				lambda: 0.01,
158
 			})
158
 			})

+ 3
- 3
mtglib/internal/doppel/ganger.go Ver fichero

47
 
47
 
48
 	drs bool
48
 	drs bool
49
 
49
 
50
-	stats     *Stats
50
+	stats     Stats
51
 	durations []time.Duration
51
 	durations []time.Duration
52
 	certSizes []int
52
 	certSizes []int
53
 
53
 
113
 	scoutCollectedChan := make(chan scoutRaidResult)
113
 	scoutCollectedChan := make(chan scoutRaidResult)
114
 	currentScoutCollectedChan := scoutCollectedChan
114
 	currentScoutCollectedChan := scoutCollectedChan
115
 
115
 
116
-	updatedStatsChan := make(chan *Stats)
116
+	updatedStatsChan := make(chan Stats)
117
 
117
 
118
 	g.wg.Go(func() {
118
 	g.wg.Go(func() {
119
 		g.runScoutRaid(scoutCollectedChan)
119
 		g.runScoutRaid(scoutCollectedChan)
256
 		scoutRaidEach:    scoutEach,
256
 		scoutRaidEach:    scoutEach,
257
 		scoutRaidRepeats: scoutRepeats,
257
 		scoutRaidRepeats: scoutRepeats,
258
 		drs:              drs,
258
 		drs:              drs,
259
-		stats: &Stats{
259
+		stats: Stats{
260
 			k:      StatsDefaultK,
260
 			k:      StatsDefaultK,
261
 			lambda: StatsDefaultLambda,
261
 			lambda: StatsDefaultLambda,
262
 			drs:    drs,
262
 			drs:    drs,

+ 2
- 2
mtglib/internal/doppel/stats.go Ver fichero

112
 	return TLSRecordSizeMax
112
 	return TLSRecordSizeMax
113
 }
113
 }
114
 
114
 
115
-func NewStats(durations []time.Duration, drs bool) *Stats {
115
+func NewStats(durations []time.Duration, drs bool) Stats {
116
 	n := float64(len(durations))
116
 	n := float64(len(durations))
117
 
117
 
118
 	// in milliseconds
118
 	// in milliseconds
162
 	// λ = (Σxᵢᵏ / n)^(1/k)
162
 	// λ = (Σxᵢᵏ / n)^(1/k)
163
 	lambda := math.Pow(sumXK/n, 1.0/k)
163
 	lambda := math.Pow(sumXK/n, 1.0/k)
164
 
164
 
165
-	return &Stats{
165
+	return Stats{
166
 		k:      k,
166
 		k:      k,
167
 		lambda: lambda,
167
 		lambda: lambda,
168
 		drs:    drs,
168
 		drs:    drs,

Loading…
Cancelar
Guardar