Quellcode durchsuchen

Add fetching of addresses from proxyGetConfig endpoint

tags/v2.1.11^2^2
9seconds vor 2 Monaten
Ursprung
Commit
94d46d2c65

+ 0
- 16
mtglib/internal/dc/addr_test.go Datei anzeigen

@@ -1,16 +0,0 @@
1
-package dc_test
2
-
3
-import (
4
-	"testing"
5
-
6
-	"github.com/9seconds/mtg/v2/mtglib/internal/dc"
7
-	"github.com/stretchr/testify/assert"
8
-)
9
-
10
-func TestAddr(t *testing.T) {
11
-	t.Parallel()
12
-
13
-	addr := dc.Addr{Network: "tcp4", Address: "127.0.0.1:443"}
14
-
15
-	assert.Equal(t, "127.0.0.1:443", addr.String())
16
-}

+ 20
- 13
mtglib/internal/dc/init.go Datei anzeigen

@@ -1,5 +1,10 @@
1 1
 package dc
2 2
 
3
+import (
4
+	"context"
5
+	"time"
6
+)
7
+
3 8
 type preferIP uint8
4 9
 
5 10
 const (
@@ -10,7 +15,18 @@ const (
10 15
 )
11 16
 
12 17
 const (
18
+	// Default DC to connect to if not sure.
13 19
 	DefaultDC = 2
20
+
21
+	// How often should we request updates from
22
+	// https://core.telegram.org/getProxyConfig
23
+	PublicConfigUpdateEach  = time.Hour
24
+	PublicConfigUpdateURLv4 = "https://core.telegram.org/getProxyConfig"
25
+	PublicConfigUpdateURLv6 = "https://core.telegram.org/getProxyConfigV6"
26
+
27
+	// How often should we extract hosts from Telegram using help.getConfig
28
+	// method.
29
+	OwnConfigUpdateEach = time.Hour
14 30
 )
15 31
 
16 32
 type Logger interface {
@@ -18,6 +34,10 @@ type Logger interface {
18 34
 	WarningError(msg string, err error)
19 35
 }
20 36
 
37
+type Updater interface {
38
+	Run(ctx context.Context)
39
+}
40
+
21 41
 var (
22 42
 	// https://github.com/telegramdesktop/tdesktop/blob/master/Telegram/SourceFiles/mtproto/mtproto_dc_options.cpp#L30
23 43
 	defaultDCAddrSet = dcAddrSet{
@@ -57,17 +77,4 @@ var (
57 77
 			},
58 78
 		},
59 79
 	}
60
-
61
-	defaultDCOverridesAddrSet = dcAddrSet{
62
-		v4: map[int][]Addr{
63
-			203: {
64
-				{Network: "tcp4", Address: "91.105.192.100:443"},
65
-			},
66
-		},
67
-		v6: map[int][]Addr{
68
-			203: {
69
-				{Network: "tcp6", Address: "[2a0a:f280:0203:000a:5000:0000:0000:0100]:443"},
70
-			},
71
-		},
72
-	}
73 80
 )

+ 43
- 0
mtglib/internal/dc/init_test.go Datei anzeigen

@@ -0,0 +1,43 @@
1
+package dc
2
+
3
+import (
4
+	"context"
5
+
6
+	"github.com/stretchr/testify/mock"
7
+	"github.com/stretchr/testify/suite"
8
+)
9
+
10
+type LoggerMock struct {
11
+	mock.Mock
12
+}
13
+
14
+func (m *LoggerMock) Info(msg string) {
15
+	m.Called(msg)
16
+}
17
+
18
+func (m *LoggerMock) WarningError(msg string, err error) {
19
+	m.Called(msg, err)
20
+}
21
+
22
+type UpdaterTestSuiteBase struct {
23
+	suite.Suite
24
+
25
+	ctx        context.Context
26
+	ctxCancel  context.CancelFunc
27
+	loggerMock *LoggerMock
28
+}
29
+
30
+func (s *UpdaterTestSuiteBase) SetupTest() {
31
+	ctx, cancel := context.WithCancel(context.Background())
32
+
33
+	s.loggerMock = &LoggerMock{}
34
+	s.loggerMock.On("Info", mock.AnythingOfType("string"))
35
+	s.loggerMock.On("WarningError", mock.AnythingOfType("string"), mock.Anything)
36
+
37
+	s.ctx = ctx
38
+	s.ctxCancel = cancel
39
+}
40
+
41
+func (s *UpdaterTestSuiteBase) TearDownTest() {
42
+	s.ctxCancel()
43
+}

+ 93
- 0
mtglib/internal/dc/public_config_updater.go Datei anzeigen

@@ -0,0 +1,93 @@
1
+package dc
2
+
3
+import (
4
+	"bufio"
5
+	"context"
6
+	"fmt"
7
+	"io"
8
+	"net/http"
9
+	"regexp"
10
+	"strconv"
11
+)
12
+
13
+var publicConfigRe = regexp.MustCompile(`^\s*proxy_for\s+(\d+)\s+(\S+?)?;\s*$`)
14
+
15
+type PublicConfigUpdater struct {
16
+	updater
17
+
18
+	http *http.Client
19
+	tg   *Telegram
20
+}
21
+
22
+func (p PublicConfigUpdater) Run(ctx context.Context, url, network string) {
23
+	p.run(ctx, func() error {
24
+		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
25
+		if err != nil {
26
+			panic(err)
27
+		}
28
+
29
+		resp, err := p.http.Do(req)
30
+		if err != nil {
31
+			if resp != nil {
32
+				io.Copy(io.Discard, resp.Body)
33
+				resp.Body.Close()
34
+			}
35
+			return fmt.Errorf("cannot fetch url %s: %w", url, err)
36
+		}
37
+
38
+		if resp.StatusCode >= http.StatusBadRequest {
39
+			return fmt.Errorf("unexpected status code from %s: %d", url, resp.StatusCode)
40
+		}
41
+
42
+		scanner := bufio.NewScanner(resp.Body)
43
+		addrs := map[int][]Addr{}
44
+
45
+		for scanner.Scan() {
46
+			matches := publicConfigRe.FindStringSubmatch(scanner.Text())
47
+			if len(matches) != 3 {
48
+				continue
49
+			}
50
+
51
+			dc, err := strconv.Atoi(matches[1])
52
+			if err != nil {
53
+				continue
54
+			}
55
+
56
+			switch dc {
57
+			// this is a list of DC we currently support. Other are ignored.
58
+			case 203: // CDN DC
59
+				p.logger.Info(fmt.Sprintf("found %s address for DC %d", matches[2], dc))
60
+				addrs[dc] = append(addrs[dc], Addr{
61
+					Network: network,
62
+					Address: matches[2],
63
+				})
64
+			}
65
+		}
66
+
67
+		if err := scanner.Err(); err != nil {
68
+			return fmt.Errorf("cannot read response body from %s: %w", url, err)
69
+		}
70
+
71
+		p.tg.lock.Lock()
72
+		defer p.tg.lock.Unlock()
73
+
74
+		if network == "tcp4" {
75
+			p.tg.view.publicConfigs.v4 = addrs
76
+		} else {
77
+			p.tg.view.publicConfigs.v6 = addrs
78
+		}
79
+
80
+		return nil
81
+	})
82
+}
83
+
84
+func NewPublicConfigUpdater(tg *Telegram, logger Logger, client *http.Client) PublicConfigUpdater {
85
+	return PublicConfigUpdater{
86
+		updater: updater{
87
+			logger: logger,
88
+			period: PublicConfigUpdateEach,
89
+		},
90
+		http: client,
91
+		tg:   tg,
92
+	}
93
+}

+ 135
- 0
mtglib/internal/dc/public_config_updater_test.go Datei anzeigen

@@ -0,0 +1,135 @@
1
+package dc
2
+
3
+import (
4
+	"net/http"
5
+	"net/http/httptest"
6
+	"sync"
7
+	"testing"
8
+	"time"
9
+
10
+	"github.com/stretchr/testify/require"
11
+	"github.com/stretchr/testify/suite"
12
+)
13
+
14
+type PublicConfigUpdaterTestSuite struct {
15
+	UpdaterTestSuiteBase
16
+
17
+	u               PublicConfigUpdater
18
+	lock            sync.Mutex
19
+	srv             *httptest.Server
20
+	responseHandler func(w http.ResponseWriter)
21
+}
22
+
23
+func (s *PublicConfigUpdaterTestSuite) SetupSuite() {
24
+	s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
25
+		s.lock.Lock()
26
+		s.responseHandler(w)
27
+		s.lock.Unlock()
28
+	}))
29
+}
30
+
31
+func (s *PublicConfigUpdaterTestSuite) TearDownSuite() {
32
+	s.srv.Close()
33
+}
34
+
35
+func (s *PublicConfigUpdaterTestSuite) SetupTest() {
36
+	s.UpdaterTestSuiteBase.SetupTest()
37
+
38
+	tg, err := New("prefer-ipv4")
39
+	require.NoError(s.T(), err)
40
+
41
+	s.u = NewPublicConfigUpdater(tg, s.loggerMock, s.srv.Client())
42
+}
43
+
44
+func (s *PublicConfigUpdaterTestSuite) Test502StatusCode() {
45
+	done := false
46
+
47
+	s.responseHandler = func(w http.ResponseWriter) {
48
+		w.WriteHeader(http.StatusBadGateway)
49
+		done = true
50
+	}
51
+	go s.u.Run(s.ctx, s.srv.URL, "tcp4")
52
+
53
+	s.Eventually(func() bool {
54
+		s.lock.Lock()
55
+		defer s.lock.Unlock()
56
+
57
+		return done
58
+	}, time.Second, 10*time.Millisecond)
59
+
60
+	s.Len(s.u.tg.view.publicConfigs.v4, 0)
61
+}
62
+
63
+func (s *PublicConfigUpdaterTestSuite) TestEmptyFile() {
64
+	done := false
65
+
66
+	s.responseHandler = func(w http.ResponseWriter) {
67
+		done = true
68
+		w.WriteHeader(http.StatusOK)
69
+	}
70
+	go s.u.Run(s.ctx, s.srv.URL, "tcp4")
71
+
72
+	s.Eventually(func() bool {
73
+		s.lock.Lock()
74
+		defer s.lock.Unlock()
75
+
76
+		return done
77
+	}, time.Second, 10*time.Millisecond)
78
+
79
+	s.Len(s.u.tg.view.publicConfigs.v4, 0)
80
+}
81
+
82
+func (s *PublicConfigUpdaterTestSuite) TestGarbage() {
83
+	result := `
84
+proxy_for -1 -1;
85
+proxy_for 100 100.10.0.0:3333;
86
+lala 0 0
87
+`
88
+	done := false
89
+
90
+	s.responseHandler = func(w http.ResponseWriter) {
91
+		done = true
92
+		w.WriteHeader(http.StatusOK)
93
+		w.Write([]byte(result))
94
+	}
95
+	go s.u.Run(s.ctx, s.srv.URL, "tcp4")
96
+
97
+	s.Eventually(func() bool {
98
+		s.lock.Lock()
99
+		defer s.lock.Unlock()
100
+
101
+		return done
102
+	}, time.Second, 10*time.Millisecond)
103
+
104
+	s.Len(s.u.tg.view.publicConfigs.v4, 0)
105
+}
106
+
107
+func (s *PublicConfigUpdaterTestSuite) TestOk() {
108
+	result := `
109
+proxy_for 203 100.10.0.0:3333;
110
+proxy_for -100 101.10.0.0:3333;
111
+`
112
+	done := false
113
+
114
+	s.responseHandler = func(w http.ResponseWriter) {
115
+		done = true
116
+		w.WriteHeader(http.StatusOK)
117
+		w.Write([]byte(result))
118
+	}
119
+	go s.u.Run(s.ctx, s.srv.URL, "tcp4")
120
+
121
+	s.Eventually(func() bool {
122
+		s.lock.Lock()
123
+		defer s.lock.Unlock()
124
+
125
+		return done
126
+	}, time.Second, 10*time.Millisecond)
127
+
128
+	s.Len(s.u.tg.view.publicConfigs.v4, 1)
129
+	s.Len(s.u.tg.view.publicConfigs.v4[203], 1)
130
+	s.Equal("100.10.0.0:3333", s.u.tg.view.publicConfigs.v4[203][0].Address)
131
+}
132
+
133
+func TestPublicConfigUpdater(t *testing.T) {
134
+	suite.Run(t, &PublicConfigUpdaterTestSuite{})
135
+}

+ 8
- 35
mtglib/internal/dc/telegram.go Datei anzeigen

@@ -1,17 +1,23 @@
1 1
 package dc
2 2
 
3 3
 import (
4
+	"context"
4 5
 	"fmt"
5
-	"net"
6 6
 	"strings"
7
+	"sync"
7 8
 )
8 9
 
9 10
 type Telegram struct {
11
+	ctx      context.Context
12
+	lock     sync.RWMutex
10 13
 	view     dcView
11 14
 	preferIP preferIP
12 15
 }
13 16
 
14 17
 func (t *Telegram) GetAddresses(dc int) []Addr {
18
+	t.lock.RLock()
19
+	defer t.lock.RUnlock()
20
+
15 21
 	switch t.preferIP {
16 22
 	case preferIPOnlyIPv4:
17 23
 		return t.view.getV4(dc)
@@ -24,7 +30,7 @@ func (t *Telegram) GetAddresses(dc int) []Addr {
24 30
 	return append(t.view.getV6(dc), t.view.getV4(dc)...)
25 31
 }
26 32
 
27
-func New(ipPreference string, userOverrides map[int][]string) (*Telegram, error) {
33
+func New(ipPreference string) (*Telegram, error) {
28 34
 	var pref preferIP
29 35
 
30 36
 	switch strings.ToLower(ipPreference) {
@@ -40,40 +46,7 @@ func New(ipPreference string, userOverrides map[int][]string) (*Telegram, error)
40 46
 		return nil, fmt.Errorf("unknown ip preference %s", ipPreference)
41 47
 	}
42 48
 
43
-	overrides := dcAddrSet{
44
-		v4: map[int][]Addr{},
45
-		v6: map[int][]Addr{},
46
-	}
47
-	for dc, addrs := range userOverrides {
48
-		for _, addr := range addrs {
49
-			host, _, err := net.SplitHostPort(addr)
50
-			if err != nil {
51
-				return nil, fmt.Errorf("incorrect host %s: %w", addr, err)
52
-			}
53
-
54
-			parsed := net.ParseIP(host)
55
-			if parsed == nil {
56
-				return nil, fmt.Errorf("incorrect host %s", addr)
57
-			}
58
-
59
-			if parsed.To4() != nil {
60
-				overrides.v4[dc] = append(overrides.v4[dc], Addr{
61
-					Network: "tcp4",
62
-					Address: addr,
63
-				})
64
-			} else {
65
-				overrides.v6[dc] = append(overrides.v6[dc], Addr{
66
-					Network: "tcp6",
67
-					Address: addr,
68
-				})
69
-			}
70
-		}
71
-	}
72
-
73 49
 	return &Telegram{
74
-		view: dcView{
75
-			overrides: overrides,
76
-		},
77 50
 		preferIP: pref,
78 51
 	}, nil
79 52
 }

+ 39
- 0
mtglib/internal/dc/updater.go Datei anzeigen

@@ -0,0 +1,39 @@
1
+package dc
2
+
3
+import (
4
+	"context"
5
+	"time"
6
+)
7
+
8
+type updater struct {
9
+	logger Logger
10
+	period time.Duration
11
+}
12
+
13
+func (u updater) run(ctx context.Context, callback func() error) {
14
+	ticker := time.NewTicker(u.period)
15
+
16
+	defer func() {
17
+		ticker.Stop()
18
+
19
+		select {
20
+		case <-ticker.C:
21
+		default:
22
+		}
23
+	}()
24
+
25
+	for {
26
+		u.logger.Info("start update")
27
+		if err := callback(); err != nil {
28
+			u.logger.WarningError("cannot update: %w", err)
29
+		}
30
+		u.logger.Info("updated")
31
+
32
+		select {
33
+		case <-ctx.Done():
34
+			u.logger.Info("stop updating")
35
+			return
36
+		case <-ticker.C:
37
+		}
38
+	}
39
+}

+ 55
- 0
mtglib/internal/dc/updater_test.go Datei anzeigen

@@ -0,0 +1,55 @@
1
+package dc
2
+
3
+import (
4
+	"sync"
5
+	"testing"
6
+	"time"
7
+
8
+	"github.com/stretchr/testify/suite"
9
+)
10
+
11
+type UpdaterTestSuite struct {
12
+	UpdaterTestSuiteBase
13
+
14
+	u updater
15
+}
16
+
17
+func (s *UpdaterTestSuite) SetupTest() {
18
+	s.UpdaterTestSuiteBase.SetupTest()
19
+	s.u = updater{
20
+		logger: s.loggerMock,
21
+		period: 100 * time.Millisecond,
22
+	}
23
+}
24
+
25
+func (s *UpdaterTestSuite) TestPeriodicUpdates() {
26
+	ticker := time.NewTicker(10 * time.Millisecond)
27
+	defer ticker.Stop()
28
+
29
+	lock := &sync.Mutex{}
30
+	collected := []time.Time{}
31
+
32
+	go s.u.run(s.ctx, func() error {
33
+		select {
34
+		case <-s.ctx.Done():
35
+		case value := <-ticker.C:
36
+			lock.Lock()
37
+			collected = append(collected, value)
38
+			lock.Unlock()
39
+		}
40
+
41
+		return nil
42
+	})
43
+
44
+	s.Eventually(func() bool {
45
+		lock.Lock()
46
+		defer lock.Unlock()
47
+
48
+		return len(collected) == 3
49
+	}, time.Second, 10*time.Millisecond)
50
+}
51
+
52
+func TestUpdater(t *testing.T) {
53
+	t.Parallel()
54
+	suite.Run(t, &UpdaterTestSuite{})
55
+}

+ 6
- 5
mtglib/internal/dc/view.go Datei anzeigen

@@ -1,20 +1,21 @@
1 1
 package dc
2 2
 
3 3
 type dcView struct {
4
-	overrides dcAddrSet
4
+	publicConfigs dcAddrSet
5
+	ownConfigs    dcAddrSet
5 6
 }
6 7
 
7 8
 func (d dcView) getV4(dc int) []Addr {
8
-	addrs := d.overrides.getV4(dc)
9
-	addrs = append(addrs, defaultDCOverridesAddrSet.getV4(dc)...)
9
+	addrs := d.publicConfigs.getV4(dc)
10
+	addrs = append(addrs, d.ownConfigs.getV4(dc)...)
10 11
 	addrs = append(addrs, defaultDCAddrSet.getV4(dc)...)
11 12
 
12 13
 	return addrs
13 14
 }
14 15
 
15 16
 func (d dcView) getV6(dc int) []Addr {
16
-	addrs := d.overrides.getV6(dc)
17
-	addrs = append(addrs, defaultDCOverridesAddrSet.getV6(dc)...)
17
+	addrs := d.publicConfigs.getV6(dc)
18
+	addrs = append(addrs, d.ownConfigs.getV6(dc)...)
18 19
 	addrs = append(addrs, defaultDCAddrSet.getV6(dc)...)
19 20
 
20 21
 	return addrs

+ 7
- 9
mtglib/internal/dc/view_test.go Datei anzeigen

@@ -16,7 +16,7 @@ type ViewTestSuite struct {
16 16
 
17 17
 func (suite *ViewTestSuite) SetupSuite() {
18 18
 	suite.view = dcView{
19
-		overrides: dcAddrSet{
19
+		publicConfigs: dcAddrSet{
20 20
 			v4: map[int][]Addr{
21 21
 				111: {
22 22
 					{Network: "tcp4", Address: "127.0.0.1:443"},
@@ -37,15 +37,14 @@ func (suite *ViewTestSuite) SetupSuite() {
37 37
 func (suite *ViewTestSuite) TestGetV4() {
38 38
 	testData := map[int][]Addr{
39 39
 		111: {
40
-			{"tcp4", "127.0.0.1:443"},
40
+			{Network: "tcp4", Address: "127.0.0.1:443"},
41 41
 		},
42 42
 		203: {
43
-			{"tcp4", "127.0.0.2:443"},
44
-			{"tcp4", "91.105.192.100:443"},
43
+			{Network: "tcp4", Address: "127.0.0.2:443"},
45 44
 		},
46 45
 		2: {
47
-			{"tcp4", "149.154.167.51:443"},
48
-			{"tcp4", "95.161.76.100:443"},
46
+			{Network: "tcp4", Address: "149.154.167.51:443"},
47
+			{Network: "tcp4", Address: "95.161.76.100:443"},
49 48
 		},
50 49
 	}
51 50
 
@@ -60,11 +59,10 @@ func (suite *ViewTestSuite) TestGetV6() {
60 59
 	testData := map[int][]Addr{
61 60
 		111: {},
62 61
 		203: {
63
-			{"tcp6", "xxx"},
64
-			{"tcp6", "[2a0a:f280:0203:000a:5000:0000:0000:0100]:443"},
62
+			{Network: "tcp6", Address: "xxx"},
65 63
 		},
66 64
 		1: {
67
-			{"tcp6", "[2001:b28:f23d:f001::a]:443"},
65
+			{Network: "tcp6", Address: "[2001:b28:f23d:f001::a]:443"},
68 66
 		},
69 67
 	}
70 68
 

+ 13
- 2
mtglib/proxy.go Datei anzeigen

@@ -297,12 +297,15 @@ func NewProxy(opts ProxyOpts) (*Proxy, error) {
297 297
 		return nil, fmt.Errorf("invalid settings: %w", err)
298 298
 	}
299 299
 
300
-	tg, err := dc.New(opts.getPreferIP(), opts.DCOverrides)
300
+	tg, err := dc.New(opts.getPreferIP())
301 301
 	if err != nil {
302 302
 		return nil, fmt.Errorf("cannot build telegram dc fetcher: %w", err)
303 303
 	}
304 304
 
305 305
 	ctx, cancel := context.WithCancel(context.Background())
306
+	logger := opts.getLogger("proxy")
307
+	updatersLogger := logger.Named("telegram-updaters")
308
+
306 309
 	proxy := &Proxy{
307 310
 		ctx:                      ctx,
308 311
 		ctxCancel:                cancel,
@@ -312,7 +315,7 @@ func NewProxy(opts ProxyOpts) (*Proxy, error) {
312 315
 		blocklist:                opts.IPBlocklist,
313 316
 		allowlist:                opts.IPAllowlist,
314 317
 		eventStream:              opts.EventStream,
315
-		logger:                   opts.getLogger("proxy"),
318
+		logger:                   logger,
316 319
 		domainFrontingPort:       opts.getDomainFrontingPort(),
317 320
 		tolerateTimeSkewness:     opts.getTolerateTimeSkewness(),
318 321
 		allowFallbackOnUnknownDC: opts.AllowFallbackOnUnknownDC,
@@ -322,6 +325,14 @@ func NewProxy(opts ProxyOpts) (*Proxy, error) {
322 325
 		},
323 326
 	}
324 327
 
328
+	publicConfigUpdater := dc.NewPublicConfigUpdater(
329
+		tg,
330
+		updatersLogger.Named("public-config"),
331
+		opts.Network.MakeHTTPClient(nil),
332
+	)
333
+	go publicConfigUpdater.Run(ctx, dc.PublicConfigUpdateURLv4, "tcp4")
334
+	go publicConfigUpdater.Run(ctx, dc.PublicConfigUpdateURLv6, "tcp6")
335
+
325 336
 	pool, err := ants.NewPoolWithFunc(opts.getConcurrency(),
326 337
 		func(arg any) {
327 338
 			proxy.ServeConn(arg.(essentials.Conn)) //nolint: forcetypeassert

Laden…
Abbrechen
Speichern