Procházet zdrojové kódy

Create internal DC package

tags/v2.1.9^2
9seconds před 2 měsíci
rodič
revize
0a5a45b32d

+ 10
- 0
mtglib/internal/dc/addr.go Zobrazit soubor

@@ -0,0 +1,10 @@
1
+package dc
2
+
3
+type Addr struct {
4
+	Network string
5
+	Address string
6
+}
7
+
8
+func (d Addr) String() string {
9
+	return d.Address
10
+}

+ 33
- 0
mtglib/internal/dc/addr_set.go Zobrazit soubor

@@ -0,0 +1,33 @@
1
+package dc
2
+
3
+import "math/rand/v2"
4
+
5
+type dcAddrSet struct {
6
+	v4 map[int][]Addr
7
+	v6 map[int][]Addr
8
+}
9
+
10
+func (d dcAddrSet) getV4(dc int) []Addr {
11
+	if d.v4 == nil {
12
+		return nil
13
+	}
14
+	return d.get(d.v4[dc])
15
+}
16
+
17
+func (d dcAddrSet) getV6(dc int) []Addr {
18
+	if d.v6 == nil {
19
+		return nil
20
+	}
21
+	return d.get(d.v6[dc])
22
+}
23
+
24
+func (d dcAddrSet) get(addrs []Addr) []Addr {
25
+	otherSet := make([]Addr, 0, len(addrs))
26
+	otherSet = append(otherSet, addrs...)
27
+
28
+	rand.Shuffle(len(otherSet), func(i, j int) {
29
+		otherSet[i], otherSet[j] = otherSet[j], otherSet[i]
30
+	})
31
+
32
+	return otherSet
33
+}

+ 16
- 0
mtglib/internal/dc/addr_test.go Zobrazit soubor

@@ -0,0 +1,16 @@
1
+package dc_test
2
+
3
+import (
4
+	"testing"
5
+
6
+	"github.com/9seconds/mtg/v2/mtglib/internal/dc"
7
+	"github.com/stretchr/testify/assert"
8
+)
9
+
10
+func TestAddr(t *testing.T) {
11
+	t.Parallel()
12
+
13
+	addr := dc.Addr{Network: "tcp4", Address: "127.0.0.1:443"}
14
+
15
+	assert.Equal(t, "127.0.0.1:443", addr.String())
16
+}

+ 79
- 0
mtglib/internal/dc/init.go Zobrazit soubor

@@ -0,0 +1,79 @@
1
+package dc
2
+
3
+import "time"
4
+
5
+type preferIP uint8
6
+
7
+const (
8
+	preferIPOnlyIPv4 preferIP = iota
9
+	preferIPOnlyIPv6
10
+	preferIPPreferIPv4
11
+	preferIPPreferIPv6
12
+)
13
+
14
+const (
15
+	DefaultDC                    = 2
16
+	DefaultUpdateDCAddressesEach = time.Hour
17
+
18
+	defaultAppID   = 123456
19
+	defaultAppHash = ""
20
+)
21
+
22
+type Logger interface {
23
+	Info(msg string)
24
+	WarningError(msg string, err error)
25
+}
26
+
27
+var (
28
+	// https://github.com/telegramdesktop/tdesktop/blob/master/Telegram/SourceFiles/mtproto/mtproto_dc_options.cpp#L30
29
+	defaultDCAddrSet = dcAddrSet{
30
+		v4: map[int][]Addr{
31
+			1: {
32
+				{Network: "tcp4", Address: "149.154.175.50:443"},
33
+			},
34
+			2: {
35
+				{Network: "tcp4", Address: "149.154.167.51:443"},
36
+				{Network: "tcp4", Address: "95.161.76.100:443"},
37
+			},
38
+			3: {
39
+				{Network: "tcp4", Address: "149.154.175.100:443"},
40
+			},
41
+			4: {
42
+				{Network: "tcp4", Address: "149.154.167.91:443"},
43
+			},
44
+			5: {
45
+				{Network: "tcp4", Address: "149.154.171.5:443"},
46
+			},
47
+		},
48
+		v6: map[int][]Addr{
49
+			1: {
50
+				{Network: "tcp6", Address: "[2001:b28:f23d:f001::a]:443"},
51
+			},
52
+			2: {
53
+				{Network: "tcp6", Address: "[2001:67c:04e8:f002::a]:443"},
54
+			},
55
+			3: {
56
+				{Network: "tcp6", Address: "[2001:b28:f23d:f003::a]:443"},
57
+			},
58
+			4: {
59
+				{Network: "tcp6", Address: "[2001:67c:04e8:f004::a]:443"},
60
+			},
61
+			5: {
62
+				{Network: "tcp6", Address: "[2001:b28:f23f:f005::a]:443"},
63
+			},
64
+		},
65
+	}
66
+
67
+	defaultDCOverridesAddrSet = dcAddrSet{
68
+		v4: map[int][]Addr{
69
+			203: {
70
+				{Network: "tcp4", Address: "91.105.192.100:443"},
71
+			},
72
+		},
73
+		v6: map[int][]Addr{
74
+			203: {
75
+				{Network: "tcp6", Address: "[2a0a:f280:0203:000a:5000:0000:0000:0100]:443"},
76
+			},
77
+		},
78
+	}
79
+)

+ 160
- 0
mtglib/internal/dc/telegram.go Zobrazit soubor

@@ -0,0 +1,160 @@
1
+package dc
2
+
3
+import (
4
+	"context"
5
+	"fmt"
6
+	"net"
7
+	"strconv"
8
+	"strings"
9
+	"sync"
10
+	"time"
11
+
12
+	"github.com/gotd/td/telegram"
13
+)
14
+
15
+type Telegram struct {
16
+	logger   Logger
17
+	lock     sync.RWMutex
18
+	view     dcView
19
+	preferIP preferIP
20
+	client   *telegram.Client
21
+}
22
+
23
+func (t *Telegram) GetAddresses(dc int) []Addr {
24
+	t.lock.RLock()
25
+	defer t.lock.RUnlock()
26
+
27
+	switch t.preferIP {
28
+	case preferIPOnlyIPv4:
29
+		return t.view.getV4(dc)
30
+	case preferIPOnlyIPv6:
31
+		return t.view.getV4(dc)
32
+	case preferIPPreferIPv4:
33
+		return append(t.view.getV4(dc), t.view.getV6(dc)...)
34
+	}
35
+
36
+	return append(t.view.getV6(dc), t.view.getV4(dc)...)
37
+}
38
+
39
+func (t *Telegram) Run(ctx context.Context, updateEach time.Duration) {
40
+	if updateEach == 0 {
41
+		updateEach = DefaultUpdateDCAddressesEach
42
+	}
43
+
44
+	t.update(ctx)
45
+
46
+	ticker := time.NewTicker(updateEach)
47
+	defer func() {
48
+		ticker.Stop()
49
+
50
+		select {
51
+		case <-ctx.Done():
52
+		case <-ticker.C:
53
+		default:
54
+		}
55
+	}()
56
+
57
+	for {
58
+		select {
59
+		case <-ctx.Done():
60
+			return
61
+		case <-ticker.C:
62
+			t.update(ctx)
63
+		}
64
+	}
65
+}
66
+
67
+func (t *Telegram) update(ctx context.Context) {
68
+	collected := dcAddrSet{
69
+		v4: map[int][]Addr{},
70
+		v6: map[int][]Addr{},
71
+	}
72
+
73
+	err := t.client.Run(ctx, func(tgctx context.Context) error {
74
+		conf, err := t.client.API().HelpGetConfig(tgctx)
75
+		if err != nil {
76
+			return err
77
+		}
78
+
79
+		for _, opt := range conf.DCOptions {
80
+			addr := net.JoinHostPort(opt.IPAddress, strconv.Itoa(opt.Port))
81
+
82
+			if opt.Ipv6 {
83
+				collected.v6[opt.ID] = append(collected.v6[opt.ID], Addr{
84
+					Network: "tcp6",
85
+					Address: addr,
86
+				})
87
+			} else {
88
+				collected.v4[opt.ID] = append(collected.v4[opt.ID], Addr{
89
+					Network: "tcp4",
90
+					Address: addr,
91
+				})
92
+			}
93
+		}
94
+
95
+		return nil
96
+	})
97
+	if err != nil {
98
+		t.logger.WarningError("update has failed", err)
99
+		return
100
+	}
101
+
102
+	t.lock.Lock()
103
+	t.view.collected = collected
104
+	t.lock.Unlock()
105
+
106
+	t.logger.Info(fmt.Sprintf("updated DC list: %v", collected))
107
+}
108
+
109
+func New(logger Logger, ipPreference string, userOverrides map[int][]string) (*Telegram, error) {
110
+	var pref preferIP
111
+
112
+	switch strings.ToLower(ipPreference) {
113
+	case "prefer-ipv4":
114
+		pref = preferIPPreferIPv4
115
+	case "prefer-ipv6":
116
+		pref = preferIPPreferIPv6
117
+	case "only-ipv4":
118
+		pref = preferIPOnlyIPv4
119
+	case "only-ipv6":
120
+		pref = preferIPOnlyIPv6
121
+	default:
122
+		return nil, fmt.Errorf("unknown ip preference %s", ipPreference)
123
+	}
124
+
125
+	overrides := dcAddrSet{}
126
+	for dc, addrs := range userOverrides {
127
+		for _, addr := range addrs {
128
+			host, _, err := net.SplitHostPort(addr)
129
+			if err != nil {
130
+				return nil, fmt.Errorf("incorrect host %s: %w", addr, err)
131
+			}
132
+
133
+			parsed := net.ParseIP(host)
134
+			if parsed == nil {
135
+				return nil, fmt.Errorf("incorrect host %s", addr)
136
+			}
137
+
138
+			if parsed.To4() != nil {
139
+				overrides.v4[dc] = append(overrides.v4[dc], Addr{
140
+					Network: "tcp4",
141
+					Address: addr,
142
+				})
143
+			} else {
144
+				overrides.v6[dc] = append(overrides.v6[dc], Addr{
145
+					Network: "tcp6",
146
+					Address: addr,
147
+				})
148
+			}
149
+		}
150
+	}
151
+
152
+	return &Telegram{
153
+		view: dcView{
154
+			overrides: overrides,
155
+		},
156
+		logger:   logger,
157
+		client:   telegram.NewClient(defaultAppID, defaultAppHash, telegram.Options{}),
158
+		preferIP: pref,
159
+	}, nil
160
+}

+ 24
- 0
mtglib/internal/dc/view.go Zobrazit soubor

@@ -0,0 +1,24 @@
1
+package dc
2
+
3
+type dcView struct {
4
+	overrides dcAddrSet
5
+	collected dcAddrSet
6
+}
7
+
8
+func (d dcView) getV4(dc int) []Addr {
9
+	addrs := d.overrides.getV4(dc)
10
+	addrs = append(addrs, defaultDCOverridesAddrSet.getV4(dc)...)
11
+	addrs = append(addrs, d.collected.getV4(dc)...)
12
+	addrs = append(addrs, defaultDCAddrSet.getV4(dc)...)
13
+
14
+	return addrs
15
+}
16
+
17
+func (d dcView) getV6(dc int) []Addr {
18
+	addrs := d.overrides.getV6(dc)
19
+	addrs = append(addrs, defaultDCOverridesAddrSet.getV6(dc)...)
20
+	addrs = append(addrs, d.collected.getV6(dc)...)
21
+	addrs = append(addrs, defaultDCAddrSet.getV6(dc)...)
22
+
23
+	return addrs
24
+}

+ 92
- 0
mtglib/internal/dc/view_test.go Zobrazit soubor

@@ -0,0 +1,92 @@
1
+package dc
2
+
3
+import (
4
+	"fmt"
5
+	"testing"
6
+
7
+	"github.com/stretchr/testify/assert"
8
+	"github.com/stretchr/testify/suite"
9
+)
10
+
11
+type ViewTestSuite struct {
12
+	suite.Suite
13
+
14
+	view dcView
15
+}
16
+
17
+func (suite *ViewTestSuite) SetupSuite() {
18
+	suite.view = dcView{
19
+		overrides: dcAddrSet{
20
+			v4: map[int][]Addr{
21
+				111: {
22
+					{Network: "tcp4", Address: "127.0.0.1:443"},
23
+				},
24
+				203: {
25
+					{Network: "tcp4", Address: "127.0.0.2:443"},
26
+				},
27
+			},
28
+			v6: map[int][]Addr{
29
+				203: {
30
+					{Network: "tcp6", Address: "xxx"},
31
+				},
32
+			},
33
+		},
34
+		collected: dcAddrSet{
35
+			v4: map[int][]Addr{
36
+				1: {
37
+					{Network: "tcp4", Address: "127.1.0.1:443"},
38
+				},
39
+			},
40
+		},
41
+	}
42
+}
43
+
44
+func (suite *ViewTestSuite) TestGetV4() {
45
+	testData := map[int][]Addr{
46
+		111: {
47
+			{"tcp4", "127.0.0.1:443"},
48
+		},
49
+		203: {
50
+			{"tcp4", "127.0.0.2:443"},
51
+			{"tcp4", "91.105.192.100:443"},
52
+		},
53
+		2: {
54
+			{"tcp4", "149.154.167.51:443"},
55
+			{"tcp4", "95.161.76.100:443"},
56
+		},
57
+		1: {
58
+			{"tcp4", "127.1.0.1:443"},
59
+			{"tcp4", "149.154.175.50:443"},
60
+		},
61
+	}
62
+
63
+	for dc, addresses := range testData {
64
+		suite.T().Run(fmt.Sprintf("dc%d", dc), func(t *testing.T) {
65
+			assert.ElementsMatch(t, addresses, suite.view.getV4(dc))
66
+		})
67
+	}
68
+}
69
+
70
+func (suite *ViewTestSuite) TestGetV6() {
71
+	testData := map[int][]Addr{
72
+		111: {},
73
+		203: {
74
+			{"tcp6", "xxx"},
75
+			{"tcp6", "[2a0a:f280:0203:000a:5000:0000:0000:0100]:443"},
76
+		},
77
+		1: {
78
+			{"tcp6", "[2001:b28:f23d:f001::a]:443"},
79
+		},
80
+	}
81
+
82
+	for dc, addresses := range testData {
83
+		suite.T().Run(fmt.Sprintf("dc%d", dc), func(t *testing.T) {
84
+			assert.ElementsMatch(t, addresses, suite.view.getV6(dc))
85
+		})
86
+	}
87
+}
88
+
89
+func TestView(t *testing.T) {
90
+	t.Parallel()
91
+	suite.Run(t, &ViewTestSuite{})
92
+}

+ 0
- 19
mtglib/internal/telegram/dc_addresses.go Zobrazit soubor

@@ -1,19 +0,0 @@
1
-package telegram
2
-
3
-type dcAddresses struct {
4
-	v4 map[int][]tgAddr
5
-	v6 map[int][]tgAddr
6
-}
7
-
8
-func (a dcAddresses) getV4(dc int) []tgAddr {
9
-	return a.v4[dc]
10
-}
11
-
12
-func (a dcAddresses) getV6(dc int) []tgAddr {
13
-	return a.v6[dc]
14
-}
15
-
16
-func (a dcAddresses) isValidDC(dc int) bool {
17
-	_, ok := a.v4[dc]
18
-	return ok
19
-}

+ 0
- 80
mtglib/internal/telegram/init.go Zobrazit soubor

@@ -1,80 +0,0 @@
1
-package telegram
2
-
3
-import (
4
-	"context"
5
-	"errors"
6
-	"time"
7
-
8
-	"github.com/9seconds/mtg/v2/essentials"
9
-)
10
-
11
-var errNoAddresses = errors.New("no addresses")
12
-
13
-type preferIP uint8
14
-
15
-const (
16
-	preferIPOnlyIPv4 preferIP = iota
17
-	preferIPOnlyIPv6
18
-	preferIPPreferIPv4
19
-	preferIPPreferIPv6
20
-)
21
-
22
-const (
23
-	defaultDC                    = 2
24
-	defaultUpdateDCAddressesEach = time.Hour
25
-	defaultAppID                 = 123456
26
-	defaultAppHash               = ""
27
-)
28
-
29
-type loggerInterface interface {
30
-	Info(msg string)
31
-	WarningError(msg string, err error)
32
-}
33
-
34
-type tgAddr struct {
35
-	network string
36
-	address string
37
-}
38
-
39
-// https://github.com/telegramdesktop/tdesktop/blob/master/Telegram/SourceFiles/mtproto/mtproto_dc_options.cpp#L30
40
-var (
41
-	defaultV4Addresses = map[int][]tgAddr{
42
-		1: {
43
-			{network: "tcp4", address: "149.154.175.50:443"},
44
-		},
45
-		2: {
46
-			{network: "tcp4", address: "149.154.167.51:443"},
47
-			{network: "tcp4", address: "95.161.76.100:443"},
48
-		},
49
-		3: {
50
-			{network: "tcp4", address: "149.154.175.100:443"},
51
-		},
52
-		4: {
53
-			{network: "tcp4", address: "149.154.167.91:443"},
54
-		},
55
-		5: {
56
-			{network: "tcp4", address: "149.154.171.5:443"},
57
-		},
58
-	}
59
-	defaultV6Addresses = map[int][]tgAddr{
60
-		1: {
61
-			{network: "tcp6", address: "[2001:b28:f23d:f001::a]:443"},
62
-		},
63
-		2: {
64
-			{network: "tcp6", address: "[2001:67c:04e8:f002::a]:443"},
65
-		},
66
-		3: {
67
-			{network: "tcp6", address: "[2001:b28:f23d:f003::a]:443"},
68
-		},
69
-		4: {
70
-			{network: "tcp6", address: "[2001:67c:04e8:f004::a]:443"},
71
-		},
72
-		5: {
73
-			{network: "tcp6", address: "[2001:b28:f23f:f005::a]:443"},
74
-		},
75
-	}
76
-)
77
-
78
-type Dialer interface {
79
-	DialContext(ctx context.Context, network, address string) (essentials.Conn, error)
80
-}

+ 0
- 42
mtglib/internal/telegram/rpc_client.go Zobrazit soubor

@@ -1,42 +0,0 @@
1
-package telegram
2
-
3
-import (
4
-	"context"
5
-	"net"
6
-	"strconv"
7
-
8
-	"github.com/gotd/td/telegram"
9
-)
10
-
11
-type rpcClient struct {
12
-	*telegram.Client
13
-}
14
-
15
-func (r rpcClient) getDCAddresses(logger loggerInterface, ctx context.Context) (dcAddresses, error) {
16
-	addrs := dcAddresses{
17
-		v4: map[int][]tgAddr{},
18
-		v6: map[int][]tgAddr{},
19
-	}
20
-
21
-	err := r.Client.Run(ctx, func(_ context.Context) error {
22
-		for _, opt := range r.Client.Config().DCOptions {
23
-			addr := net.JoinHostPort(opt.IPAddress, strconv.Itoa(opt.Port))
24
-
25
-			if opt.Ipv6 {
26
-				addrs.v6[opt.ID] = append(addrs.v6[opt.ID], tgAddr{
27
-					network: "tcp6",
28
-					address: addr,
29
-				})
30
-			} else {
31
-				addrs.v4[opt.ID] = append(addrs.v4[opt.ID], tgAddr{
32
-					network: "tcp4",
33
-					address: addr,
34
-				})
35
-			}
36
-		}
37
-
38
-		return nil
39
-	})
40
-
41
-	return addrs, err
42
-}

+ 0
- 139
mtglib/internal/telegram/telegram.go Zobrazit soubor

@@ -1,139 +0,0 @@
1
-package telegram
2
-
3
-import (
4
-	"context"
5
-	"fmt"
6
-	"strings"
7
-	"sync"
8
-	"time"
9
-
10
-	"github.com/9seconds/mtg/v2/essentials"
11
-	"github.com/gotd/td/telegram"
12
-)
13
-
14
-type Telegram struct {
15
-	ctx       context.Context
16
-	ctxCancel context.CancelFunc
17
-	lock      sync.RWMutex
18
-
19
-	dialer    Dialer
20
-	preferIP  preferIP
21
-	addresses dcAddresses
22
-	rpc       rpcClient
23
-}
24
-
25
-func (t *Telegram) Dial(ctx context.Context, dc int) (essentials.Conn, error) {
26
-	var addresses []tgAddr
27
-
28
-	t.lock.RLock()
29
-	switch t.preferIP {
30
-	case preferIPOnlyIPv4:
31
-		addresses = t.addresses.getV4(dc)
32
-	case preferIPOnlyIPv6:
33
-		addresses = t.addresses.getV6(dc)
34
-	case preferIPPreferIPv4:
35
-		addresses = append(t.addresses.getV4(dc), t.addresses.getV6(dc)...)
36
-	case preferIPPreferIPv6:
37
-		addresses = append(t.addresses.getV6(dc), t.addresses.getV4(dc)...)
38
-	}
39
-	t.lock.RUnlock()
40
-
41
-	var conn essentials.Conn
42
-
43
-	err := errNoAddresses
44
-
45
-	for _, v := range addresses {
46
-		conn, err = t.dialer.DialContext(ctx, v.network, v.address)
47
-		if err == nil {
48
-			return conn, nil
49
-		}
50
-	}
51
-
52
-	return nil, fmt.Errorf("cannot dial to %d dc: %w", dc, err)
53
-}
54
-
55
-func (t *Telegram) IsKnownDC(dc int) bool {
56
-	return t.addresses.isValidDC(dc)
57
-}
58
-
59
-func (t *Telegram) GetFallbackDC() int {
60
-	return defaultDC
61
-}
62
-
63
-func (t *Telegram) Shutdown() {
64
-	t.ctxCancel()
65
-}
66
-
67
-func (t *Telegram) Run(logger loggerInterface, updateEach time.Duration) {
68
-	if updateEach == 0 {
69
-		updateEach = defaultUpdateDCAddressesEach
70
-	}
71
-
72
-	t.update(logger)
73
-
74
-	ticker := time.NewTicker(updateEach)
75
-	defer func() {
76
-		ticker.Stop()
77
-
78
-		select {
79
-		case <-ticker.C:
80
-		default:
81
-		}
82
-	}()
83
-
84
-	for {
85
-		select {
86
-		case <-t.ctx.Done():
87
-			return
88
-		case <-ticker.C:
89
-			t.update(logger)
90
-		}
91
-	}
92
-}
93
-
94
-func (t *Telegram) update(logger loggerInterface) {
95
-	otherAddresses, err := t.rpc.getDCAddresses(logger, t.ctx)
96
-	if err != nil {
97
-		logger.WarningError("Cannot update DC list", err)
98
-		return
99
-	}
100
-
101
-	t.lock.Lock()
102
-	t.addresses = otherAddresses
103
-	t.lock.Unlock()
104
-
105
-	logger.Info(fmt.Sprintf("DC are updated: %v", t.addresses))
106
-}
107
-
108
-func New(dialer Dialer, ipPreference string) (*Telegram, error) {
109
-	var pref preferIP
110
-
111
-	switch strings.ToLower(ipPreference) {
112
-	case "prefer-ipv4":
113
-		pref = preferIPPreferIPv4
114
-	case "prefer-ipv6":
115
-		pref = preferIPPreferIPv6
116
-	case "only-ipv4":
117
-		pref = preferIPOnlyIPv4
118
-	case "only-ipv6":
119
-		pref = preferIPOnlyIPv6
120
-	default:
121
-		return nil, fmt.Errorf("unknown ip preference %s", ipPreference)
122
-	}
123
-
124
-	ctx, cancel := context.WithCancel(context.Background())
125
-
126
-	return &Telegram{
127
-		ctx:       ctx,
128
-		ctxCancel: cancel,
129
-		dialer:    dialer,
130
-		preferIP:  pref,
131
-		addresses: dcAddresses{
132
-			v4: defaultV4Addresses,
133
-			v6: defaultV6Addresses,
134
-		},
135
-		rpc: rpcClient{
136
-			Client: telegram.NewClient(defaultAppID, defaultAppHash, telegram.Options{}),
137
-		},
138
-	}, nil
139
-}

+ 0
- 159
mtglib/internal/telegram/telegram_internal_test.go Zobrazit soubor

@@ -1,159 +0,0 @@
1
-package telegram
2
-
3
-import (
4
-	"context"
5
-	"errors"
6
-	"io"
7
-	"net"
8
-	"strconv"
9
-	"testing"
10
-
11
-	"github.com/9seconds/mtg/v2/internal/testlib"
12
-	"github.com/stretchr/testify/assert"
13
-	"github.com/stretchr/testify/mock"
14
-	"github.com/stretchr/testify/suite"
15
-)
16
-
17
-type TelegramTestSuite struct {
18
-	suite.Suite
19
-
20
-	dialerMock *testlib.MtglibNetworkMock
21
-	t          *Telegram
22
-}
23
-
24
-func (suite *TelegramTestSuite) SetupTest() {
25
-	suite.dialerMock = &testlib.MtglibNetworkMock{}
26
-	suite.t, _ = New(suite.dialerMock, "prefer-ipv4", false)
27
-}
28
-
29
-func (suite *TelegramTestSuite) TearDownTest() {
30
-	suite.dialerMock.AssertExpectations(suite.T())
31
-}
32
-
33
-func (suite *TelegramTestSuite) TestUnknownDC() {
34
-	testData := []int{
35
-		-1,
36
-		0,
37
-		6,
38
-		100,
39
-	}
40
-
41
-	for _, v := range testData {
42
-		value := v
43
-
44
-		suite.T().Run(strconv.Itoa(value), func(t *testing.T) {
45
-			_, err := suite.t.Dial(context.Background(), value)
46
-			assert.Error(t, err)
47
-			assert.False(t, suite.t.IsKnownDC(value))
48
-		})
49
-	}
50
-}
51
-
52
-func (suite *TelegramTestSuite) TestDialToCorrectIPs() {
53
-	testData := map[int][]tgAddr{}
54
-
55
-	for i := 1; i <= 5; i++ {
56
-		testData[i] = []tgAddr{}
57
-		testData[i] = append(testData[i], productionV4Addresses[i-1]...)
58
-		testData[i] = append(testData[i], productionV6Addresses[i-1]...)
59
-	}
60
-
61
-	for i, v := range testData {
62
-		idx := i
63
-		addresses := v
64
-
65
-		suite.T().Run(strconv.Itoa(idx), func(t *testing.T) {
66
-			for _, addr := range addresses {
67
-				suite.dialerMock.
68
-					On("DialContext", mock.Anything, addr.network, addr.address).
69
-					Once().
70
-					Return((*net.TCPConn)(nil), io.EOF)
71
-			}
72
-
73
-			_, err := suite.t.Dial(context.Background(), idx)
74
-			assert.True(t, errors.Is(err, io.EOF))
75
-			assert.True(t, suite.t.IsKnownDC(idx))
76
-		})
77
-	}
78
-}
79
-
80
-func (suite *TelegramTestSuite) TestDialPreferIPRange() {
81
-	testData := map[string][]tgAddr{
82
-		"prefer-ipv4": {testV4Addresses[0][0], testV6Addresses[0][0]},
83
-		"prefer-ipv6": {testV6Addresses[0][0], testV4Addresses[0][0]},
84
-		"only-ipv4":   {testV4Addresses[0][0]},
85
-		"only-ipv6":   {testV6Addresses[0][0]},
86
-	}
87
-
88
-	for k, v := range testData {
89
-		name := k
90
-		addresses := v
91
-
92
-		suite.T().Run(name, func(t *testing.T) {
93
-			for _, addr := range addresses {
94
-				suite.dialerMock.
95
-					On("DialContext", mock.Anything, addr.network, addr.address).
96
-					Once().
97
-					Return((*net.TCPConn)(nil), io.EOF)
98
-			}
99
-
100
-			tg, _ := New(suite.dialerMock, name, true)
101
-			_, err := tg.Dial(context.Background(), 1)
102
-
103
-			assert.True(t, errors.Is(err, io.EOF))
104
-		})
105
-	}
106
-}
107
-
108
-func (suite *TelegramTestSuite) TestDialPreferIPPriority() {
109
-	testData := map[string]tgAddr{
110
-		"prefer-ipv4": productionV4Addresses[0][0],
111
-		"prefer-ipv6": productionV6Addresses[0][0],
112
-	}
113
-
114
-	for k, v := range testData {
115
-		name := k
116
-		addr := v
117
-
118
-		suite.T().Run(name, func(t *testing.T) {
119
-			conn := &net.TCPConn{}
120
-
121
-			suite.dialerMock.
122
-				On("DialContext", mock.Anything, addr.network, addr.address).
123
-				Once().
124
-				Return(conn, nil)
125
-
126
-			tg, _ := New(suite.dialerMock, name, false)
127
-
128
-			res, err := tg.Dial(context.Background(), 1)
129
-			assert.NoError(t, err)
130
-			assert.Equal(t, conn, res)
131
-		})
132
-	}
133
-}
134
-
135
-func (suite *TelegramTestSuite) TestUnknownPreferIP() {
136
-	_, err := New(suite.dialerMock, "xxx", false)
137
-	suite.Error(err)
138
-}
139
-
140
-func (suite *TelegramTestSuite) TestFallbackDC() {
141
-	dcs := make([]int, 10)
142
-
143
-	for i := 0; i < len(dcs); i++ {
144
-		dcs[i] = suite.t.GetFallbackDC()
145
-	}
146
-
147
-	for _, v := range dcs {
148
-		value := v
149
-
150
-		suite.T().Run(strconv.Itoa(value), func(t *testing.T) {
151
-			assert.True(t, suite.t.IsKnownDC(value))
152
-		})
153
-	}
154
-}
155
-
156
-func TestTelegram(t *testing.T) {
157
-	t.Parallel()
158
-	suite.Run(t, &TelegramTestSuite{})
159
-}

+ 26
- 13
mtglib/proxy.go Zobrazit soubor

@@ -10,11 +10,11 @@ import (
10 10
 	"time"
11 11
 
12 12
 	"github.com/9seconds/mtg/v2/essentials"
13
+	"github.com/9seconds/mtg/v2/mtglib/internal/dc"
13 14
 	"github.com/9seconds/mtg/v2/mtglib/internal/faketls"
14 15
 	"github.com/9seconds/mtg/v2/mtglib/internal/faketls/record"
15 16
 	"github.com/9seconds/mtg/v2/mtglib/internal/obfuscated2"
16 17
 	"github.com/9seconds/mtg/v2/mtglib/internal/relay"
17
-	"github.com/9seconds/mtg/v2/mtglib/internal/telegram"
18 18
 	"github.com/panjf2000/ants/v2"
19 19
 )
20 20
 
@@ -28,7 +28,7 @@ type Proxy struct {
28 28
 	tolerateTimeSkewness     time.Duration
29 29
 	domainFrontingPort       int
30 30
 	workerPool               *ants.PoolWithFunc
31
-	telegram                 *telegram.Telegram
31
+	telegram                 *dc.Telegram
32 32
 
33 33
 	secret          Secret
34 34
 	network         Network
@@ -144,7 +144,6 @@ func (p *Proxy) Shutdown() {
144 144
 	p.ctxCancel()
145 145
 	p.streamWaitGroup.Wait()
146 146
 	p.workerPool.Release()
147
-	p.telegram.Shutdown()
148 147
 
149 148
 	p.allowlist.Shutdown()
150 149
 	p.blocklist.Shutdown()
@@ -220,18 +219,26 @@ func (p *Proxy) doObfuscated2Handshake(ctx *streamContext) error {
220 219
 }
221 220
 
222 221
 func (p *Proxy) doTelegramCall(ctx *streamContext) error {
223
-	dc := ctx.dc
224
-
225
-	if p.allowFallbackOnUnknownDC && !p.telegram.IsKnownDC(dc) {
226
-		dc = p.telegram.GetFallbackDC()
227
-		ctx.logger = ctx.logger.BindInt("fallback_dc", dc)
222
+	dcid := ctx.dc
228 223
 
224
+	addresses := p.telegram.GetAddresses(dcid)
225
+	if len(addresses) == 0 && p.allowFallbackOnUnknownDC {
226
+		ctx.logger = ctx.logger.BindInt("fallback_dc", dc.DefaultDC)
229 227
 		ctx.logger.Warning("unknown DC, fallbacks")
228
+		addresses = p.telegram.GetAddresses(dc.DefaultDC)
230 229
 	}
231 230
 
232
-	conn, err := p.telegram.Dial(ctx, dc)
231
+	var conn essentials.Conn
232
+	var err error
233
+
234
+	for _, addr := range addresses {
235
+		conn, err = p.network.Dial(addr.Network, addr.Address)
236
+		if err == nil {
237
+			break
238
+		}
239
+	}
233 240
 	if err != nil {
234
-		return fmt.Errorf("cannot dial to Telegram: %w", err)
241
+		return fmt.Errorf("no addresses to call: %w", err)
235 242
 	}
236 243
 
237 244
 	encryptor, decryptor, err := obfuscated2.ServerHandshake(conn)
@@ -293,9 +300,15 @@ func NewProxy(opts ProxyOpts) (*Proxy, error) {
293 300
 		return nil, fmt.Errorf("invalid settings: %w", err)
294 301
 	}
295 302
 
296
-	tg, err := telegram.New(opts.Network, opts.getPreferIP())
303
+	logger := opts.getLogger("proxy")
304
+
305
+	tg, err := dc.New(
306
+		logger.Named("telegram"),
307
+		opts.getPreferIP(),
308
+		map[int][]string{},
309
+	) // TODO: propagate value
297 310
 	if err != nil {
298
-		return nil, fmt.Errorf("cannot build telegram dialer: %w", err)
311
+		return nil, fmt.Errorf("cannot build telegram dc fetcher: %w", err)
299 312
 	}
300 313
 
301 314
 	ctx, cancel := context.WithCancel(context.Background())
@@ -315,7 +328,7 @@ func NewProxy(opts ProxyOpts) (*Proxy, error) {
315 328
 		telegram:                 tg,
316 329
 	}
317 330
 
318
-	go tg.Run(proxy.logger.Named("telegram"), 0)
331
+	go tg.Run(ctx, 0) // TODO: propagate value
319 332
 
320 333
 	pool, err := ants.NewPoolWithFunc(opts.getConcurrency(),
321 334
 		func(arg interface{}) {

Načítá se…
Zrušit
Uložit