소스 검색

Add skeleton of the proxy

tags/v2.0.0-rc1
9seconds 5 년 전
부모
커밋
23519913f2
7개의 변경된 파일224개의 추가작업 그리고 13개의 파일을 삭제
  1. 8
    9
      go.mod
  2. 7
    2
      go.sum
  3. 1
    1
      ipblocklist/firehol.go
  4. 13
    1
      mtglib/init.go
  5. 120
    0
      mtglib/proxy.go
  6. 12
    0
      mtglib/proxy_opts.go
  7. 63
    0
      mtglib/stream_context.go

+ 8
- 9
go.mod 파일 보기

@@ -3,26 +3,25 @@ module github.com/9seconds/mtg/v2
3 3
 go 1.16
4 4
 
5 5
 require (
6
-	github.com/OneOfOne/xxhash v1.2.8 // indirect
6
+	github.com/OneOfOne/xxhash v1.2.8
7 7
 	github.com/alecthomas/kong v0.2.16
8 8
 	github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
9 9
 	github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
10 10
 	github.com/babolivier/go-doh-client v0.0.0-20201028162107-a76cff4cb8b6
11
+	github.com/d4l3k/messagediff v1.2.1 // indirect
11 12
 	github.com/jarcoal/httpmock v1.0.8
12
-	github.com/kentik/patricia v0.0.0-20201202224819-f9447a6e25f1 // indirect
13
-	github.com/kr/pretty v0.1.0 // indirect
13
+	github.com/kentik/patricia v0.0.0-20201202224819-f9447a6e25f1
14 14
 	github.com/libp2p/go-reuseport v0.0.2
15 15
 	github.com/mccutchen/go-httpbin v1.1.1
16
-	github.com/panjf2000/ants v1.3.0 // indirect
16
+	github.com/panjf2000/ants/v2 v2.4.3
17 17
 	github.com/pelletier/go-toml v1.8.1
18
-	github.com/prometheus/client_golang v1.9.0 // indirect
19
-	github.com/rs/zerolog v1.20.0 // indirect
20
-	github.com/smira/go-statsd v1.3.2 // indirect
18
+	github.com/prometheus/client_golang v1.9.0
19
+	github.com/rs/zerolog v1.20.0
20
+	github.com/smira/go-statsd v1.3.2
21 21
 	github.com/stretchr/objx v0.3.0 // indirect
22 22
 	github.com/stretchr/testify v1.7.0
23
-	github.com/tylertreat/BoomFilters v0.0.0-20200520150052-42a7b4300c0c // indirect
23
+	github.com/tylertreat/BoomFilters v0.0.0-20200520150052-42a7b4300c0c
24 24
 	github.com/xeipuuv/gojsonschema v1.2.0
25 25
 	golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
26 26
 	golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 // indirect
27
-	gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
28 27
 )

+ 7
- 2
go.sum 파일 보기

@@ -50,6 +50,8 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
50 50
 github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
51 51
 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
52 52
 github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
53
+github.com/d4l3k/messagediff v1.2.1 h1:ZcAIMYsUg0EAp9X+tt8/enBE/Q8Yd5kzPynLyKptt9U=
54
+github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkEQxENCrlLo=
53 55
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
54 56
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
55 57
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -100,6 +102,7 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ
100 102
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
101 103
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
102 104
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
105
+github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
103 106
 github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
104 107
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
105 108
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
@@ -210,8 +213,8 @@ github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJ
210 213
 github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
211 214
 github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
212 215
 github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
213
-github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
214
-github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
216
+github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eBXU=
217
+github.com/panjf2000/ants/v2 v2.4.3/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
215 218
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
216 219
 github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
217 220
 github.com/pelletier/go-toml v1.8.1 h1:1Nf83orprkJyknT6h7zbuEGUEjcyVlCxSUGTENmNCRM=
@@ -402,6 +405,7 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn
402 405
 golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
403 406
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
404 407
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
408
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
405 409
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
406 410
 google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
407 411
 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
@@ -445,6 +449,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
445 449
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
446 450
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
447 451
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
452
+gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
448 453
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
449 454
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
450 455
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 1
- 1
ipblocklist/firehol.go 파일 보기

@@ -18,7 +18,7 @@ import (
18 18
 	"github.com/9seconds/mtg/v2/mtglib"
19 19
 	"github.com/kentik/patricia"
20 20
 	"github.com/kentik/patricia/bool_tree"
21
-	"github.com/panjf2000/ants"
21
+	"github.com/panjf2000/ants/v2"
22 22
 )
23 23
 
24 24
 const (

+ 13
- 1
mtglib/init.go 파일 보기

@@ -8,7 +8,19 @@ import (
8 8
 	"time"
9 9
 )
10 10
 
11
-var ErrSecretEmpty = errors.New("secret is empty")
11
+var (
12
+	ErrSecretEmpty                 = errors.New("secret is empty")
13
+	ErrSecretInvalid               = errors.New("secret is invalid")
14
+	ErrNetworkIsNotDefined         = errors.New("network is not defined")
15
+	ErrAntiReplayCacheIsNotDefined = errors.New("anti-replay cache is not defined")
16
+	ErrIPBlocklistIsNotDefined     = errors.New("ip blocklist is not defined")
17
+	ErrEventStreamIsNotDefined     = errors.New("event stream is not defined")
18
+	ErrLoggerIsNotDefined          = errors.New("logger is not defined")
19
+)
20
+
21
+const (
22
+	DefaultConcurrency = 4096
23
+)
12 24
 
13 25
 type Network interface {
14 26
 	Dial(network, address string) (net.Conn, error)

+ 120
- 0
mtglib/proxy.go 파일 보기

@@ -0,0 +1,120 @@
1
+package mtglib
2
+
3
+import (
4
+	"context"
5
+	"errors"
6
+	"fmt"
7
+	"net"
8
+	"sync"
9
+	"time"
10
+
11
+	"github.com/panjf2000/ants/v2"
12
+)
13
+
14
+type Proxy struct {
15
+	ctx             context.Context
16
+	ctxCancel       context.CancelFunc
17
+	streamWaitGroup sync.WaitGroup
18
+	workerPool      *ants.PoolWithFunc
19
+
20
+	secret          Secret
21
+	network         Network
22
+	antiReplayCache AntiReplayCache
23
+	ipBlocklist     IPBlocklist
24
+	eventStream     EventStream
25
+	logger          Logger
26
+}
27
+
28
+func (p *Proxy) ServeConn(conn net.Conn) {
29
+	ctx := newStreamContext(p.ctx, p.logger, conn)
30
+	defer ctx.Close()
31
+
32
+	p.eventStream.Send(ctx, EventStart{
33
+		CreatedAt: time.Now(),
34
+		ConnID:    ctx.connID,
35
+		RemoteIP:  ctx.ClientIP(),
36
+	})
37
+	ctx.logger.Info("Stream has been started")
38
+
39
+	defer func() {
40
+		p.eventStream.Send(ctx, EventFinish{
41
+			CreatedAt: time.Now(),
42
+			ConnID:    ctx.connID,
43
+		})
44
+		ctx.logger.Info("Stream has been finished")
45
+	}()
46
+}
47
+
48
+func (p *Proxy) Serve(listener net.Listener) error {
49
+	for {
50
+		conn, err := listener.Accept()
51
+		if err != nil {
52
+			return fmt.Errorf("cannot accept a new connection: %w", err)
53
+		}
54
+
55
+		err = p.workerPool.Invoke(conn)
56
+
57
+		switch {
58
+		case err == nil:
59
+		case errors.Is(err, ants.ErrPoolClosed):
60
+			return nil
61
+		case errors.Is(err, ants.ErrPoolOverload):
62
+			p.eventStream.Send(p.ctx, EventConcurrencyLimited{})
63
+		}
64
+	}
65
+}
66
+
67
+func (p *Proxy) Shutdown() {
68
+	p.ctxCancel()
69
+	p.streamWaitGroup.Wait()
70
+	p.workerPool.Release()
71
+}
72
+
73
+type antsLogger struct{}
74
+
75
+func (a antsLogger) Printf(msg string, args ...interface{}) {}
76
+
77
+func NewProxy(opts ProxyOpts) (*Proxy, error) {
78
+	switch {
79
+	case opts.Network == nil:
80
+		return nil, ErrNetworkIsNotDefined
81
+	case opts.AntiReplayCache == nil:
82
+		return nil, ErrAntiReplayCacheIsNotDefined
83
+	case opts.IPBlocklist == nil:
84
+		return nil, ErrIPBlocklistIsNotDefined
85
+	case opts.EventStream == nil:
86
+		return nil, ErrEventStreamIsNotDefined
87
+	case opts.Logger == nil:
88
+		return nil, ErrLoggerIsNotDefined
89
+	case !opts.Secret.Valid():
90
+		return nil, ErrSecretInvalid
91
+	}
92
+
93
+	concurrency := opts.Concurrency
94
+	if concurrency == 0 {
95
+		concurrency = DefaultConcurrency
96
+	}
97
+
98
+	ctx, cancel := context.WithCancel(context.Background())
99
+	proxy := &Proxy{
100
+		ctx:             ctx,
101
+		ctxCancel:       cancel,
102
+		secret:          opts.Secret,
103
+		network:         opts.Network,
104
+		antiReplayCache: opts.AntiReplayCache,
105
+		ipBlocklist:     opts.IPBlocklist,
106
+		eventStream:     opts.EventStream,
107
+		logger:          opts.Logger.Named("proxy"),
108
+	}
109
+
110
+	pool, err := ants.NewPoolWithFunc(int(concurrency), func(arg interface{}) {
111
+		proxy.ServeConn(arg.(net.Conn))
112
+	}, ants.WithLogger(antsLogger{}))
113
+	if err != nil {
114
+		return nil, fmt.Errorf("cannot initialize a pool: %w", err)
115
+	}
116
+
117
+	proxy.workerPool = pool
118
+
119
+	return proxy, nil
120
+}

+ 12
- 0
mtglib/proxy_opts.go 파일 보기

@@ -0,0 +1,12 @@
1
+package mtglib
2
+
3
+type ProxyOpts struct {
4
+	Secret          Secret
5
+	Network         Network
6
+	AntiReplayCache AntiReplayCache
7
+	IPBlocklist     IPBlocklist
8
+	EventStream     EventStream
9
+	Logger          Logger
10
+
11
+	Concurrency uint
12
+}

+ 63
- 0
mtglib/stream_context.go 파일 보기

@@ -0,0 +1,63 @@
1
+package mtglib
2
+
3
+import (
4
+	"context"
5
+	"crypto/rand"
6
+	"encoding/base64"
7
+	"net"
8
+	"time"
9
+)
10
+
11
+type streamContext struct {
12
+	ctx        context.Context
13
+	ctxCancel  context.CancelFunc
14
+	clientConn net.Conn
15
+	connID     string
16
+	logger     Logger
17
+}
18
+
19
+func (s *streamContext) Deadline() (time.Time, bool) {
20
+	return s.ctx.Deadline()
21
+}
22
+
23
+func (s *streamContext) Done() <-chan struct{} {
24
+	return s.ctx.Done()
25
+}
26
+
27
+func (s *streamContext) Err() error {
28
+	return s.ctx.Err()
29
+}
30
+
31
+func (s *streamContext) Value(key interface{}) interface{} {
32
+	return s.ctx.Value(key)
33
+}
34
+
35
+func (s *streamContext) Close() {
36
+	s.ctxCancel()
37
+	s.clientConn.Close()
38
+}
39
+
40
+func (s *streamContext) ClientIP() net.IP {
41
+	return s.clientConn.RemoteAddr().(*net.TCPAddr).IP
42
+}
43
+
44
+func newStreamContext(ctx context.Context, logger Logger, clientConn net.Conn) *streamContext {
45
+	connIDBytes := make([]byte, 16)
46
+
47
+	if _, err := rand.Read(connIDBytes); err != nil {
48
+		panic(err)
49
+	}
50
+
51
+	ctx, cancel := context.WithCancel(ctx)
52
+	streamCtx := &streamContext{
53
+		ctx:        ctx,
54
+		ctxCancel:  cancel,
55
+		clientConn: clientConn,
56
+		connID:     base64.RawURLEncoding.EncodeToString(connIDBytes),
57
+	}
58
+	streamCtx.logger = logger.
59
+		BindStr("stream-id", streamCtx.connID).
60
+		BindStr("client-ip", streamCtx.ClientIP().String())
61
+
62
+	return streamCtx
63
+}

Loading…
취소
저장