Bladeren bron

Add event stream module

tags/v2.0.0-rc1
9seconds 5 jaren geleden
bovenliggende
commit
b08d945d7c

+ 1
- 0
antireplay/noop.go Bestand weergeven

@@ -5,6 +5,7 @@ import "github.com/9seconds/mtg/v2/mtglib"
5 5
 type noop struct{}
6 6
 
7 7
 func (n noop) SeenBefore(_ []byte) bool { return false }
8
+func (n noop) Shutdown()                {}
8 9
 
9 10
 func NewNoop() mtglib.AntiReplayCache {
10 11
 	return noop{}

+ 2
- 0
antireplay/noop_test.go Bestand weergeven

@@ -18,6 +18,8 @@ func (suite *NoopTestSuite) TestOp() {
18 18
 	suite.False(filter.SeenBefore([]byte{4, 5, 6}))
19 19
 	suite.False(filter.SeenBefore([]byte{1, 2, 3}))
20 20
 	suite.False(filter.SeenBefore([]byte{4, 5, 6}))
21
+
22
+	filter.Shutdown()
21 23
 }
22 24
 
23 25
 func TestNoop(t *testing.T) {

+ 2
- 0
antireplay/stable_bloom_filter.go Bestand weergeven

@@ -20,6 +20,8 @@ func (s *stableBloomFilter) SeenBefore(digest []byte) bool {
20 20
 	return s.filter.TestAndAdd(digest)
21 21
 }
22 22
 
23
+func (s *stableBloomFilter) Shutdown() {}
24
+
23 25
 func NewStableBloomFilter(byteSize uint, errorRate float64) mtglib.AntiReplayCache {
24 26
 	sf := boom.NewDefaultStableBloomFilter(byteSize*8, errorRate) // nolint: gomnd
25 27
 	sf.SetHash(xxhash.New64())

+ 2
- 0
antireplay/stable_bloom_filter_test.go Bestand weergeven

@@ -18,6 +18,8 @@ func (suite *StableBloomFilterTestSuite) TestOp() {
18 18
 	suite.False(filter.SeenBefore([]byte{4, 5, 6}))
19 19
 	suite.True(filter.SeenBefore([]byte{1, 2, 3}))
20 20
 	suite.True(filter.SeenBefore([]byte{4, 5, 6}))
21
+
22
+	filter.Shutdown()
21 23
 }
22 24
 
23 25
 func TestStableBloomFilter(t *testing.T) {

+ 64
- 0
events/event_stream.go Bestand weergeven

@@ -0,0 +1,64 @@
1
+package events
2
+
3
+import (
4
+	"context"
5
+	"runtime"
6
+
7
+	"github.com/9seconds/mtg/v2/mtglib"
8
+	"github.com/OneOfOne/xxhash"
9
+)
10
+
11
+type eventStream struct {
12
+	ctx       context.Context
13
+	ctxCancel context.CancelFunc
14
+	chans     []chan mtglib.Event
15
+}
16
+
17
+func (e eventStream) Send(ctx context.Context, evt mtglib.Event) {
18
+	chanNo := int(xxhash.ChecksumString32(evt.ConnectionID())) % len(e.chans)
19
+
20
+	select {
21
+	case <-ctx.Done():
22
+	case <-e.ctx.Done():
23
+	case e.chans[chanNo] <- evt:
24
+	}
25
+}
26
+
27
+func (e eventStream) Shutdown() {
28
+	e.ctxCancel()
29
+}
30
+
31
+func NewEventStream(observerFactories []ObserverFactory) mtglib.EventStream {
32
+	ctx, cancel := context.WithCancel(context.Background())
33
+	rv := eventStream{
34
+		ctx:       ctx,
35
+		ctxCancel: cancel,
36
+		chans:     make([]chan mtglib.Event, runtime.NumCPU()),
37
+	}
38
+
39
+	for i := 0; i < runtime.NumCPU(); i++ {
40
+		rv.chans[i] = make(chan mtglib.Event, 1)
41
+
42
+		go eventStreamProcessor(ctx, rv.chans[i], newMultiObserver(observerFactories))
43
+	}
44
+
45
+	return rv
46
+}
47
+
48
+func eventStreamProcessor(ctx context.Context, eventChan <-chan mtglib.Event, observer Observer) {
49
+	defer observer.Shutdown()
50
+
51
+	for {
52
+		select {
53
+		case <-ctx.Done():
54
+			return
55
+		case evt := <-eventChan:
56
+			switch typedEvt := evt.(type) {
57
+			case mtglib.EventStart:
58
+				observer.EventStart(typedEvt)
59
+			case mtglib.EventFinish:
60
+				observer.EventFinish(typedEvt)
61
+			}
62
+		}
63
+	}
64
+}

+ 12
- 0
events/init.go Bestand weergeven

@@ -0,0 +1,12 @@
1
+package events
2
+
3
+import "github.com/9seconds/mtg/v2/mtglib"
4
+
5
+type Observer interface {
6
+	EventStart(mtglib.EventStart)
7
+	EventFinish(mtglib.EventFinish)
8
+
9
+	Shutdown()
10
+}
11
+
12
+type ObserverFactory func() Observer

+ 59
- 0
events/multi_observer.go Bestand weergeven

@@ -0,0 +1,59 @@
1
+package events
2
+
3
+import (
4
+	"sync"
5
+
6
+	"github.com/9seconds/mtg/v2/mtglib"
7
+)
8
+
9
+type multiObserver struct {
10
+	observers []Observer
11
+}
12
+
13
+func (m multiObserver) EventStart(evt mtglib.EventStart) {
14
+	wg := &sync.WaitGroup{}
15
+	wg.Add(len(m.observers))
16
+
17
+	for _, v := range m.observers {
18
+		go func(obs Observer) {
19
+			defer wg.Done()
20
+
21
+			obs.EventStart(evt)
22
+		}(v)
23
+	}
24
+
25
+	wg.Wait()
26
+}
27
+
28
+func (m multiObserver) EventFinish(evt mtglib.EventFinish) {
29
+	wg := &sync.WaitGroup{}
30
+	wg.Add(len(m.observers))
31
+
32
+	for _, v := range m.observers {
33
+		go func(obs Observer) {
34
+			defer wg.Done()
35
+
36
+			obs.EventFinish(evt)
37
+		}(v)
38
+	}
39
+
40
+	wg.Wait()
41
+}
42
+
43
+func (m multiObserver) Shutdown() {
44
+	for _, v := range m.observers {
45
+		v.Shutdown()
46
+	}
47
+}
48
+
49
+func newMultiObserver(factories []ObserverFactory) Observer {
50
+	observers := make([]Observer, len(factories))
51
+
52
+	for i, v := range factories {
53
+		observers[i] = v()
54
+	}
55
+
56
+	return multiObserver{
57
+		observers: observers,
58
+	}
59
+}

+ 23
- 10
ipblocklist/firehol.go Bestand weergeven

@@ -29,14 +29,20 @@ const (
29 29
 var fireholRegexpComment = regexp.MustCompile(`\s*#.*?$`)
30 30
 
31 31
 type Firehol struct {
32
-	logger     mtglib.Logger
33
-	rwMutex    sync.RWMutex
32
+	ctx       context.Context
33
+	ctxCancel context.CancelFunc
34
+	logger    mtglib.Logger
35
+
36
+	rwMutex sync.RWMutex
37
+
34 38
 	remoteURLs []string
35 39
 	localFiles []string
40
+
36 41
 	httpClient *http.Client
37 42
 	workerPool *ants.Pool
38
-	treeV4     *bool_tree.TreeV4
39
-	treeV6     *bool_tree.TreeV6
43
+
44
+	treeV4 *bool_tree.TreeV4
45
+	treeV6 *bool_tree.TreeV6
40 46
 }
41 47
 
42 48
 func (f *Firehol) Contains(ip net.IP) bool {
@@ -76,7 +82,7 @@ func (f *Firehol) containsIPv6(addr net.IP) bool {
76 82
 	return false
77 83
 }
78 84
 
79
-func (f *Firehol) Run(ctx context.Context, updateEach time.Duration) {
85
+func (f *Firehol) Run(updateEach time.Duration) {
80 86
 	ticker := time.NewTicker(updateEach)
81 87
 
82 88
 	defer func() {
@@ -88,24 +94,28 @@ func (f *Firehol) Run(ctx context.Context, updateEach time.Duration) {
88 94
 		}
89 95
 	}()
90 96
 
91
-	if err := f.update(ctx); err != nil {
97
+	if err := f.update(); err != nil {
92 98
 		f.logger.WarningError("cannot update blocklist", err)
93 99
 	}
94 100
 
95 101
 	for {
96 102
 		select {
97
-		case <-ctx.Done():
103
+		case <-f.ctx.Done():
98 104
 			return
99 105
 		case <-ticker.C:
100
-			if err := f.update(ctx); err != nil {
106
+			if err := f.update(); err != nil {
101 107
 				f.logger.WarningError("cannot update blocklist", err)
102 108
 			}
103 109
 		}
104 110
 	}
105 111
 }
106 112
 
107
-func (f *Firehol) update(ctx context.Context) error { // nolint: funlen, cyclop
108
-	ctx, cancel := context.WithCancel(ctx)
113
+func (f *Firehol) Shutdown() {
114
+	f.ctxCancel()
115
+}
116
+
117
+func (f *Firehol) update() error { // nolint: funlen, cyclop
118
+	ctx, cancel := context.WithCancel(f.ctx)
109 119
 	defer cancel()
110 120
 
111 121
 	wg := &sync.WaitGroup{}
@@ -314,8 +324,11 @@ func NewFirehol(logger mtglib.Logger, network mtglib.Network,
314 324
 	}
315 325
 
316 326
 	workerPool, _ := ants.NewPool(int(downloadConcurrency))
327
+	ctx, cancel := context.WithCancel(context.Background())
317 328
 
318 329
 	return &Firehol{
330
+		ctx:        ctx,
331
+		ctxCancel:  cancel,
319 332
 		logger:     logger.Named("firehol"),
320 333
 		httpClient: network.MakeHTTPClient(nil),
321 334
 		treeV4:     bool_tree.NewTreeV4(),

+ 16
- 5
ipblocklist/firehol_test.go Bestand weergeven

@@ -1,7 +1,6 @@
1 1
 package ipblocklist_test
2 2
 
3 3
 import (
4
-	"context"
5 4
 	"io"
6 5
 	"net"
7 6
 	"net/http"
@@ -72,12 +71,15 @@ func (suite *FireholTestSuite) TestLocalFail() {
72 71
 
73 72
 	suite.NoError(err)
74 73
 
75
-	go blocklist.Run(context.Background(), time.Hour)
74
+	go blocklist.Run(time.Hour)
76 75
 
77 76
 	time.Sleep(500 * time.Millisecond)
78 77
 
79 78
 	suite.False(blocklist.Contains(net.ParseIP("10.0.0.10")))
80 79
 	suite.False(blocklist.Contains(net.ParseIP("127.0.0.1")))
80
+
81
+	blocklist.Shutdown()
82
+	time.Sleep(500 * time.Millisecond)
81 83
 }
82 84
 
83 85
 func (suite *FireholTestSuite) TestLocalOk() {
@@ -87,12 +89,15 @@ func (suite *FireholTestSuite) TestLocalOk() {
87 89
 
88 90
 	suite.NoError(err)
89 91
 
90
-	go blocklist.Run(context.Background(), time.Hour)
92
+	go blocklist.Run(time.Hour)
91 93
 
92 94
 	time.Sleep(500 * time.Millisecond)
93 95
 
94 96
 	suite.True(blocklist.Contains(net.ParseIP("10.0.0.10")))
95 97
 	suite.False(blocklist.Contains(net.ParseIP("127.0.0.1")))
98
+
99
+	blocklist.Shutdown()
100
+	time.Sleep(500 * time.Millisecond)
96 101
 }
97 102
 
98 103
 func (suite *FireholTestSuite) TestRemoteFail() {
@@ -102,11 +107,14 @@ func (suite *FireholTestSuite) TestRemoteFail() {
102 107
 
103 108
 	suite.NoError(err)
104 109
 
105
-	go blocklist.Run(context.Background(), time.Hour)
110
+	go blocklist.Run(time.Hour)
106 111
 
107 112
 	time.Sleep(500 * time.Millisecond)
108 113
 
109 114
 	suite.False(blocklist.Contains(net.ParseIP("10.2.2.2")))
115
+
116
+	blocklist.Shutdown()
117
+	time.Sleep(500 * time.Millisecond)
110 118
 }
111 119
 
112 120
 func (suite *FireholTestSuite) TestMixed() {
@@ -123,12 +131,15 @@ func (suite *FireholTestSuite) TestMixed() {
123 131
 
124 132
 	suite.NoError(err)
125 133
 
126
-	go blocklist.Run(context.Background(), time.Hour)
134
+	go blocklist.Run(time.Hour)
127 135
 
128 136
 	time.Sleep(500 * time.Millisecond)
129 137
 
130 138
 	suite.True(blocklist.Contains(net.ParseIP("10.2.2.2")))
131 139
 	suite.True(blocklist.Contains(net.ParseIP("10.1.0.100")))
140
+
141
+	blocklist.Shutdown()
142
+	time.Sleep(500 * time.Millisecond)
132 143
 }
133 144
 
134 145
 func TestFirehol(t *testing.T) {

+ 1
- 0
ipblocklist/noop.go Bestand weergeven

@@ -9,6 +9,7 @@ import (
9 9
 type noop struct{}
10 10
 
11 11
 func (n noop) Contains(ip net.IP) bool { return false }
12
+func (n noop) Shutdown()               {}
12 13
 
13 14
 func NewNoop() mtglib.IPBlocklist {
14 15
 	return noop{}

+ 25
- 0
mtglib/events.go Bestand weergeven

@@ -0,0 +1,25 @@
1
+package mtglib
2
+
3
+import (
4
+	"net"
5
+	"time"
6
+)
7
+
8
+type eventBase struct {
9
+	CreatedAt time.Time
10
+	ConnID    string
11
+}
12
+
13
+func (e eventBase) ConnectionID() string {
14
+	return e.ConnID
15
+}
16
+
17
+type EventStart struct {
18
+	eventBase
19
+
20
+	RemoteIP net.IP
21
+}
22
+
23
+type EventFinish struct {
24
+	eventBase
25
+}

+ 11
- 0
mtglib/init.go Bestand weergeven

@@ -19,10 +19,21 @@ type Network interface {
19 19
 
20 20
 type AntiReplayCache interface {
21 21
 	SeenBefore(data []byte) bool
22
+	Shutdown()
22 23
 }
23 24
 
24 25
 type IPBlocklist interface {
25 26
 	Contains(net.IP) bool
27
+	Shutdown()
28
+}
29
+
30
+type Event interface {
31
+	ConnectionID() string
32
+}
33
+
34
+type EventStream interface {
35
+	Send(context.Context, Event)
36
+	Shutdown()
26 37
 }
27 38
 
28 39
 type Logger interface {

+ 7
- 0
testlib/events_observer_mock.go Bestand weergeven

@@ -0,0 +1,7 @@
1
+package testlib
2
+
3
+import "github.com/stretchr/testify/mock"
4
+
5
+type EventsObserverMock struct {
6
+	mock.Mock
7
+}

Laden…
Annuleren
Opslaan