Przeglądaj źródła

fix: use shared idle tracker for relay connections

connIdleTimeout previously set per-direction deadlines independently.
During media downloads the client→telegram direction can be idle at the
application level while telegram→client is actively streaming data.
After IdleTimeout (default 1 min) the idle direction's ReadDeadline
fires, tearing down the entire relay and breaking media transfers.

Replace the per-direction timeout with a shared atomic timestamp that
both pump goroutines update on any successful Read or Write. When a
ReadDeadline fires on the idle direction, we check the shared tracker:
if the other direction was recently active, we retry instead of closing.
The connection is only torn down when both directions are idle for the
full timeout period.

This matches the documented IdleTimeout contract: "if we have any
message which will pass to either direction, a timer is reset."

Overhead: one atomic.Int64 (8 bytes) per connection pair, one
atomic.Store (~1 ns) per Read/Write with data, zero extra goroutines.

Fixes #423
tags/v2.2.6^2^2
Alexey Dolotov 1 miesiąc temu
rodzic
commit
4627910238
2 zmienionych plików z 61 dodań i 9 usunięć
  1. 53
    5
      mtglib/conns.go
  2. 8
    4
      mtglib/proxy.go

+ 53
- 5
mtglib/conns.go Wyświetl plik

@@ -6,6 +6,7 @@ import (
6 6
 	"fmt"
7 7
 	"io"
8 8
 	"net"
9
+	"sync/atomic"
9 10
 	"time"
10 11
 
11 12
 	"github.com/9seconds/mtg/v2/essentials"
@@ -97,20 +98,67 @@ func newConnProxyProtocol(source, target essentials.Conn) *connProxyProtocol {
97 98
 	}
98 99
 }
99 100
 
101
+// idleTracker is a shared idle tracker for a pair of relay connections.
102
+// Both directions update the same timestamp so that activity in one direction
103
+// prevents the other (idle) direction from timing out.
104
+type idleTracker struct {
105
+	lastActive atomic.Int64 // unix nanos
106
+	timeout    time.Duration
107
+}
108
+
109
+func newIdleTracker(timeout time.Duration) *idleTracker {
110
+	t := &idleTracker{timeout: timeout}
111
+	t.touch()
112
+
113
+	return t
114
+}
115
+
116
+func (t *idleTracker) touch() {
117
+	t.lastActive.Store(time.Now().UnixNano())
118
+}
119
+
120
+func (t *idleTracker) isIdle() bool {
121
+	last := time.Unix(0, t.lastActive.Load())
122
+
123
+	return time.Since(last) >= t.timeout
124
+}
125
+
100 126
 type connIdleTimeout struct {
101 127
 	essentials.Conn
102 128
 
103
-	timeout time.Duration
129
+	tracker *idleTracker
104 130
 }
105 131
 
106 132
 func (c connIdleTimeout) Read(b []byte) (int, error) {
107
-	c.SetReadDeadline(time.Now().Add(c.timeout)) //nolint: errcheck
133
+	for {
134
+		c.SetReadDeadline(time.Now().Add(c.tracker.timeout)) //nolint: errcheck
135
+
136
+		n, err := c.Conn.Read(b)
137
+		if n > 0 {
138
+			c.tracker.touch()
108 139
 
109
-	return c.Conn.Read(b) //nolint: wrapcheck
140
+			return n, err //nolint: wrapcheck
141
+		}
142
+
143
+		if err != nil {
144
+			if netErr, ok := err.(net.Error); ok && netErr.Timeout() && !c.tracker.isIdle() { //nolint: errorlint
145
+				continue
146
+			}
147
+
148
+			return 0, err //nolint: wrapcheck
149
+		}
150
+
151
+		return 0, nil
152
+	}
110 153
 }
111 154
 
112 155
 func (c connIdleTimeout) Write(b []byte) (int, error) {
113
-	c.SetWriteDeadline(time.Now().Add(c.timeout)) //nolint: errcheck
156
+	c.SetWriteDeadline(time.Now().Add(c.tracker.timeout)) //nolint: errcheck
114 157
 
115
-	return c.Conn.Write(b) //nolint: wrapcheck
158
+	n, err := c.Conn.Write(b)
159
+	if n > 0 {
160
+		c.tracker.touch()
161
+	}
162
+
163
+	return n, err //nolint: wrapcheck
116 164
 }

+ 8
- 4
mtglib/proxy.go Wyświetl plik

@@ -102,11 +102,13 @@ func (p *Proxy) ServeConn(conn essentials.Conn) {
102 102
 		return
103 103
 	}
104 104
 
105
+	tracker := newIdleTracker(p.idleTimeout)
106
+
105 107
 	relay.Relay(
106 108
 		ctx,
107 109
 		ctx.logger.Named("relay"),
108
-		connIdleTimeout{Conn: ctx.telegramConn, timeout: p.idleTimeout},
109
-		connIdleTimeout{Conn: ctx.clientConn, timeout: p.idleTimeout},
110
+		connIdleTimeout{Conn: ctx.telegramConn, tracker: tracker},
111
+		connIdleTimeout{Conn: ctx.clientConn, tracker: tracker},
110 112
 	)
111 113
 }
112 114
 
@@ -305,11 +307,13 @@ func (p *Proxy) doDomainFronting(ctx *streamContext, conn *connRewind) {
305 307
 		stream:   p.eventStream,
306 308
 	}
307 309
 
310
+	tracker := newIdleTracker(p.idleTimeout)
311
+
308 312
 	relay.Relay(
309 313
 		ctx,
310 314
 		ctx.logger.Named("domain-fronting"),
311
-		connIdleTimeout{Conn: frontConn, timeout: p.idleTimeout},
312
-		connIdleTimeout{Conn: conn, timeout: p.idleTimeout},
315
+		connIdleTimeout{Conn: frontConn, tracker: tracker},
316
+		connIdleTimeout{Conn: conn, tracker: tracker},
313 317
 	)
314 318
 }
315 319
 

Ładowanie…
Anuluj
Zapisz