瀏覽代碼

Merge remote-tracking branch 'upstream/master'

# Conflicts:
#	mtglib/proxy.go
pull/434/head
Alexey Dolotov 1 月之前
父節點
當前提交
659c478339

+ 0
- 35
mtglib/internal/doppel/clock.go 查看文件

@@ -1,35 +0,0 @@
1
-package doppel
2
-
3
-import (
4
-	"context"
5
-	"time"
6
-)
7
-
8
-type Clock struct {
9
-	stats *Stats
10
-	tick  chan struct{}
11
-}
12
-
13
-func (c Clock) Start(ctx context.Context) {
14
-	tickTock := time.NewTimer(c.stats.Delay())
15
-	defer func() {
16
-		tickTock.Stop()
17
-		select {
18
-		case <-tickTock.C:
19
-		default:
20
-		}
21
-	}()
22
-
23
-	for {
24
-		select {
25
-		case <-ctx.Done():
26
-			return
27
-		case <-tickTock.C:
28
-			select {
29
-			case <-ctx.Done():
30
-			case c.tick <- struct{}{}:
31
-			}
32
-			tickTock.Reset(c.stats.Delay())
33
-		}
34
-	}
35
-}

+ 0
- 80
mtglib/internal/doppel/clock_test.go 查看文件

@@ -1,80 +0,0 @@
1
-package doppel
2
-
3
-import (
4
-	"context"
5
-	"sync"
6
-	"testing"
7
-	"time"
8
-
9
-	"github.com/stretchr/testify/suite"
10
-)
11
-
12
-type ClockTestSuite struct {
13
-	suite.Suite
14
-
15
-	clock     Clock
16
-	wg        sync.WaitGroup
17
-	ctx       context.Context
18
-	ctxCancel context.CancelFunc
19
-}
20
-
21
-func (suite *ClockTestSuite) SetupTest() {
22
-	ctx, cancel := context.WithCancel(context.Background())
23
-
24
-	suite.ctx = ctx
25
-	suite.ctxCancel = cancel
26
-	suite.clock = Clock{
27
-		stats: &Stats{
28
-			k:      StatsDefaultK,
29
-			lambda: StatsDefaultLambda,
30
-		},
31
-		tick: make(chan struct{}),
32
-	}
33
-
34
-	suite.wg.Go(func() {
35
-		suite.clock.Start(suite.ctx)
36
-	})
37
-}
38
-
39
-func (suite *ClockTestSuite) TearDownTest() {
40
-	suite.ctxCancel()
41
-	suite.wg.Wait()
42
-}
43
-
44
-func (suite *ClockTestSuite) TestTicks() {
45
-	received := 0
46
-
47
-	for range 3 {
48
-		select {
49
-		case <-suite.clock.tick:
50
-			received++
51
-		case <-time.After(2 * time.Second):
52
-			suite.Fail("timed out waiting for tick")
53
-		}
54
-	}
55
-
56
-	suite.Equal(3, received)
57
-}
58
-
59
-func (suite *ClockTestSuite) TestStopsOnCancel() {
60
-	select {
61
-	case <-suite.clock.tick:
62
-	case <-time.After(2 * time.Second):
63
-		suite.Fail("timed out waiting for first tick")
64
-	}
65
-
66
-	suite.ctxCancel()
67
-
68
-	time.Sleep(50 * time.Millisecond)
69
-
70
-	select {
71
-	case <-suite.clock.tick:
72
-		suite.Fail("received tick after cancel")
73
-	default:
74
-	}
75
-}
76
-
77
-func TestClock(t *testing.T) {
78
-	t.Parallel()
79
-	suite.Run(t, &ClockTestSuite{})
80
-}

+ 10
- 17
mtglib/internal/doppel/conn.go 查看文件

@@ -4,6 +4,7 @@ import (
4 4
 	"bytes"
5 5
 	"context"
6 6
 	"sync"
7
+	"time"
7 8
 
8 9
 	"github.com/9seconds/mtg/v2/essentials"
9 10
 	"github.com/9seconds/mtg/v2/mtglib/internal/tls"
@@ -25,7 +26,7 @@ type Conn struct {
25 26
 type connPayload struct {
26 27
 	ctx         context.Context
27 28
 	ctxCancel   context.CancelCauseFunc
28
-	clock       Clock
29
+	stats       Stats
29 30
 	wg          sync.WaitGroup
30 31
 	writeStream bytes.Buffer
31 32
 	writtenCond sync.Cond
@@ -46,25 +47,23 @@ func (c Conn) Write(p []byte) (int, error) {
46 47
 	return len(p), context.Cause(c.p.ctx)
47 48
 }
48 49
 
49
-func (c Conn) Start() {
50
-	c.p.wg.Go(func() {
51
-		c.start()
52
-	})
53
-}
54
-
55 50
 func (c Conn) start() {
56 51
 	bp := doppelBufPool.Get().(*[]byte)
57 52
 	buf := *bp
58 53
 	defer doppelBufPool.Put(bp)
59 54
 
55
+	timer := time.NewTimer(c.p.stats.Delay())
56
+	defer timer.Stop()
57
+
60 58
 	for {
61 59
 		select {
62 60
 		case <-c.p.ctx.Done():
63 61
 			return
64
-		case <-c.p.clock.tick:
62
+		case <-timer.C:
63
+			timer.Reset(c.p.stats.Delay())
65 64
 		}
66 65
 
67
-		size := c.p.clock.stats.Size()
66
+		size := c.p.stats.Size()
68 67
 
69 68
 		c.p.writtenCond.L.Lock()
70 69
 		for c.p.writeStream.Len() == 0 && !c.p.done {
@@ -95,28 +94,22 @@ func (c Conn) Stop() {
95 94
 	c.p.wg.Wait()
96 95
 }
97 96
 
98
-func NewConn(ctx context.Context, conn essentials.Conn, stats *Stats) Conn {
97
+func NewConn(ctx context.Context, conn essentials.Conn, stats Stats) Conn {
99 98
 	ctx, cancel := context.WithCancelCause(ctx)
100 99
 	rv := Conn{
101 100
 		Conn: conn,
102 101
 		p: &connPayload{
103 102
 			ctx:       ctx,
104 103
 			ctxCancel: cancel,
104
+			stats:     stats,
105 105
 			writtenCond: sync.Cond{
106 106
 				L: &sync.Mutex{},
107 107
 			},
108
-			clock: Clock{
109
-				stats: stats,
110
-				tick:  make(chan struct{}),
111
-			},
112 108
 		},
113 109
 	}
114 110
 
115 111
 	rv.p.writeStream.Grow(tls.DefaultBufferSize)
116 112
 
117
-	rv.p.wg.Go(func() {
118
-		rv.p.clock.Start(ctx)
119
-	})
120 113
 	rv.p.wg.Go(func() {
121 114
 		rv.start()
122 115
 	})

+ 2
- 2
mtglib/internal/doppel/conn_test.go 查看文件

@@ -63,7 +63,7 @@ func (suite *ConnTestSuite) TearDownTest() {
63 63
 }
64 64
 
65 65
 func (suite *ConnTestSuite) makeConn() Conn {
66
-	return NewConn(suite.ctx, suite.connMock, &Stats{
66
+	return NewConn(suite.ctx, suite.connMock, Stats{
67 67
 		k:      2.0,
68 68
 		lambda: 0.01,
69 69
 	})
@@ -152,7 +152,7 @@ func (suite *ConnTestSuite) TestStopDoesNotDeadlockWhenStartIsWaiting() {
152 152
 			ctx, cancel := context.WithCancel(suite.ctx)
153 153
 			defer cancel()
154 154
 
155
-			c := NewConn(ctx, suite.connMock, &Stats{
155
+			c := NewConn(ctx, suite.connMock, Stats{
156 156
 				k:      2.0,
157 157
 				lambda: 0.01,
158 158
 			})

+ 3
- 3
mtglib/internal/doppel/ganger.go 查看文件

@@ -47,7 +47,7 @@ type Ganger struct {
47 47
 
48 48
 	drs bool
49 49
 
50
-	stats     *Stats
50
+	stats     Stats
51 51
 	durations []time.Duration
52 52
 	certSizes []int
53 53
 
@@ -113,7 +113,7 @@ func (g *Ganger) run() {
113 113
 	scoutCollectedChan := make(chan scoutRaidResult)
114 114
 	currentScoutCollectedChan := scoutCollectedChan
115 115
 
116
-	updatedStatsChan := make(chan *Stats)
116
+	updatedStatsChan := make(chan Stats)
117 117
 
118 118
 	g.wg.Go(func() {
119 119
 		g.runScoutRaid(scoutCollectedChan)
@@ -256,7 +256,7 @@ func NewGanger(
256 256
 		scoutRaidEach:    scoutEach,
257 257
 		scoutRaidRepeats: scoutRepeats,
258 258
 		drs:              drs,
259
-		stats: &Stats{
259
+		stats: Stats{
260 260
 			k:      StatsDefaultK,
261 261
 			lambda: StatsDefaultLambda,
262 262
 			drs:    drs,

+ 2
- 2
mtglib/internal/doppel/stats.go 查看文件

@@ -112,7 +112,7 @@ func (d *Stats) Size() int {
112 112
 	return TLSRecordSizeMax
113 113
 }
114 114
 
115
-func NewStats(durations []time.Duration, drs bool) *Stats {
115
+func NewStats(durations []time.Duration, drs bool) Stats {
116 116
 	n := float64(len(durations))
117 117
 
118 118
 	// in milliseconds
@@ -162,7 +162,7 @@ func NewStats(durations []time.Duration, drs bool) *Stats {
162 162
 	// λ = (Σxᵢᵏ / n)^(1/k)
163 163
 	lambda := math.Pow(sumXK/n, 1.0/k)
164 164
 
165
-	return &Stats{
165
+	return Stats{
166 166
 		k:      k,
167 167
 		lambda: lambda,
168 168
 		drs:    drs,

+ 13
- 0
mtglib/internal/relay/pool_settings_mips.go 查看文件

@@ -0,0 +1,13 @@
1
+//go:build mips || mipsle
2
+
3
+package relay
4
+
5
+import "github.com/9seconds/mtg/v2/mtglib/internal/tls"
6
+
7
+const (
8
+	// MIPS is quite short in resources, and usually it means that it will run
9
+	// on Microtiks, OpenWRT-based routers or similar hardware. I think it worth
10
+	// to sacrifice a number of read syscalls (read, CPU load) to shrink
11
+	// limited RAM resources.
12
+	bufPoolSize = tls.MaxRecordPayloadSize / 2
13
+)

+ 9
- 0
mtglib/internal/relay/pool_settings_other.go 查看文件

@@ -0,0 +1,9 @@
1
+//go:build !mips && !mipsle
2
+
3
+package relay
4
+
5
+import "github.com/9seconds/mtg/v2/mtglib/internal/tls"
6
+
7
+const (
8
+	bufPoolSize = tls.MaxRecordPayloadSize
9
+)

+ 18
- 0
mtglib/internal/relay/pools.go 查看文件

@@ -0,0 +1,18 @@
1
+package relay
2
+
3
+import "sync"
4
+
5
+var bufPool = sync.Pool{
6
+	New: func() any {
7
+		b := make([]byte, bufPoolSize)
8
+		return &b
9
+	},
10
+}
11
+
12
+func acquireBuffer() *[]byte {
13
+	return bufPool.Get().(*[]byte)
14
+}
15
+
16
+func releaseBuffer(p *[]byte) {
17
+	bufPool.Put(p)
18
+}

+ 3
- 12
mtglib/internal/relay/relay.go 查看文件

@@ -4,19 +4,10 @@ import (
4 4
 	"context"
5 5
 	"errors"
6 6
 	"io"
7
-	"sync"
8 7
 
9 8
 	"github.com/9seconds/mtg/v2/essentials"
10
-	"github.com/9seconds/mtg/v2/mtglib/internal/tls"
11 9
 )
12 10
 
13
-var bufPool = sync.Pool{
14
-	New: func() any {
15
-		b := make([]byte, tls.MaxRecordPayloadSize)
16
-		return &b
17
-	},
18
-}
19
-
20 11
 func Relay(ctx context.Context, log Logger, telegramConn, clientConn essentials.Conn) {
21 12
 	defer telegramConn.Close() //nolint: errcheck
22 13
 	defer clientConn.Close()   //nolint: errcheck
@@ -44,13 +35,13 @@ func Relay(ctx context.Context, log Logger, telegramConn, clientConn essentials.
44 35
 }
45 36
 
46 37
 func pump(log Logger, src, dst essentials.Conn, direction string) {
47
-	bp := bufPool.Get().(*[]byte)
48
-	defer bufPool.Put(bp)
38
+	buf := acquireBuffer()
39
+	defer releaseBuffer(buf)
49 40
 
50 41
 	defer src.CloseRead()  //nolint: errcheck
51 42
 	defer dst.CloseWrite() //nolint: errcheck
52 43
 
53
-	n, err := io.CopyBuffer(src, dst, *bp)
44
+	n, err := io.CopyBuffer(src, dst, *buf)
54 45
 
55 46
 	switch {
56 47
 	case err == nil:

+ 2
- 4
mtglib/proxy.go 查看文件

@@ -111,13 +111,11 @@ func (p *Proxy) ServeConn(conn essentials.Conn) {
111 111
 		return
112 112
 	}
113 113
 
114
-	countedClientConn := newCountingConn(ctx.clientConn, p.stats, ctx.secretName)
115
-
116 114
 	relay.Relay(
117 115
 		ctx,
118 116
 		ctx.logger.Named("relay"),
119
-		ctx.telegramConn,
120
-		countedClientConn,
117
+		connIdleTimeout{Conn: ctx.telegramConn, timeout: p.idleTimeout},
118
+		newCountingConn(connIdleTimeout{Conn: ctx.clientConn, timeout: p.idleTimeout}, p.stats, ctx.secretName),
121 119
 	)
122 120
 }
123 121
 

Loading…
取消
儲存