Kaynağa Gözat

Add statsd

tags/v2.0.0-rc1
9seconds 5 yıl önce
ebeveyn
işleme
8e7207d975
5 değiştirilmiş dosya ile 256 ekleme ve 0 silme
  1. 1
    0
      go.mod
  2. 2
    0
      go.sum
  3. 12
    0
      stats/init.go
  4. 115
    0
      stats/statsd.go
  5. 126
    0
      stats/statsd_test.go

+ 1
- 0
go.mod Dosyayı Görüntüle

@@ -16,6 +16,7 @@ require (
16 16
 	github.com/panjf2000/ants v1.3.0 // indirect
17 17
 	github.com/pelletier/go-toml v1.8.1
18 18
 	github.com/rs/zerolog v1.20.0 // indirect
19
+	github.com/smira/go-statsd v1.3.2 // indirect
19 20
 	github.com/stretchr/objx v0.3.0 // indirect
20 21
 	github.com/stretchr/testify v1.7.0
21 22
 	github.com/tylertreat/BoomFilters v0.0.0-20200520150052-42a7b4300c0c // indirect

+ 2
- 0
go.sum Dosyayı Görüntüle

@@ -37,6 +37,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
37 37
 github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
38 38
 github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs=
39 39
 github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
40
+github.com/smira/go-statsd v1.3.2 h1:1EeuzxNZ/TD9apbTOFSM9nulqfcsQFmT4u1A2DREabI=
41
+github.com/smira/go-statsd v1.3.2/go.mod h1:1srXJ9/pbnN04G8f4F1jUzsGOnwkPKXciyqpewGlkC4=
40 42
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
41 43
 github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As=
42 44
 github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=

+ 12
- 0
stats/init.go Dosyayı Görüntüle

@@ -0,0 +1,12 @@
1
+package stats
2
+
3
+const (
4
+	MetricActiveConnection   = "active_connections"
5
+	MetricSessionDuration    = "session_duration"
6
+	MetricConcurrencyLimited = "concurrency_limited"
7
+
8
+	TagIPType = "ip_type"
9
+
10
+	TagIPTypeIPv4 = "ipv4"
11
+	TagIPTypeIPv6 = "ipv4"
12
+)

+ 115
- 0
stats/statsd.go Dosyayı Görüntüle

@@ -0,0 +1,115 @@
1
+package stats
2
+
3
+import (
4
+	"fmt"
5
+	"net"
6
+	"strings"
7
+	"time"
8
+
9
+	"github.com/9seconds/mtg/v2/events"
10
+	"github.com/9seconds/mtg/v2/mtglib"
11
+	statsd "github.com/smira/go-statsd"
12
+)
13
+
14
+type statsdFakeLogger struct{}
15
+
16
+func (s statsdFakeLogger) Printf(msg string, args ...interface{}) {}
17
+
18
+type statsdStreamInfo struct {
19
+	createdAt time.Time
20
+	clientIP  net.IP
21
+}
22
+
23
+func (s *statsdStreamInfo) ClientIPTag() statsd.Tag {
24
+	if s.clientIP.To4() == nil {
25
+		return statsd.StringTag(TagIPType, TagIPTypeIPv6)
26
+	} else {
27
+		return statsd.StringTag(TagIPType, TagIPTypeIPv4)
28
+	}
29
+}
30
+
31
+type statsdProcessor struct {
32
+	streams map[string]*statsdStreamInfo
33
+	client  *statsd.Client
34
+}
35
+
36
+func (s statsdProcessor) EventStart(evt mtglib.EventStart) {
37
+	clientInfo := &statsdStreamInfo{
38
+		createdAt: evt.CreatedAt,
39
+		clientIP:  evt.RemoteIP,
40
+	}
41
+	s.streams[evt.StreamID()] = clientInfo
42
+
43
+	s.client.GaugeDelta(MetricActiveConnection, 1, clientInfo.ClientIPTag())
44
+}
45
+
46
+func (s statsdProcessor) EventFinish(evt mtglib.EventFinish) {
47
+	clientInfo, ok := s.streams[evt.StreamID()]
48
+	if !ok {
49
+		return
50
+	}
51
+
52
+	defer delete(s.streams, evt.StreamID())
53
+
54
+	duration := evt.CreatedAt.Sub(clientInfo.createdAt)
55
+
56
+	s.client.GaugeDelta(MetricActiveConnection, -1, clientInfo.ClientIPTag())
57
+	s.client.PrecisionTiming(MetricSessionDuration, duration)
58
+}
59
+
60
+func (s statsdProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {
61
+	s.client.Incr(MetricConcurrencyLimited, 1)
62
+}
63
+
64
+func (s statsdProcessor) Shutdown() {
65
+	now := time.Now()
66
+	events := make([]mtglib.EventFinish, 0, len(s.streams))
67
+
68
+	for k := range s.streams {
69
+		events = append(events, mtglib.EventFinish{
70
+			CreatedAt: now,
71
+			ConnID:    k,
72
+		})
73
+	}
74
+
75
+	for i := range events {
76
+		s.EventFinish(events[i])
77
+	}
78
+}
79
+
80
+type StatsdFactory struct {
81
+	client *statsd.Client
82
+}
83
+
84
+func (s StatsdFactory) Close() error {
85
+	return s.client.Close()
86
+}
87
+
88
+func (s StatsdFactory) Make() events.Observer {
89
+	return statsdProcessor{
90
+		client:  s.client,
91
+		streams: make(map[string]*statsdStreamInfo),
92
+	}
93
+}
94
+
95
+func NewStatsd(address, metricPrefix, tagFormat string) (StatsdFactory, error) {
96
+	options := []statsd.Option{
97
+		statsd.MetricPrefix(metricPrefix),
98
+		statsd.Logger(statsdFakeLogger{}),
99
+	}
100
+
101
+	switch strings.ToLower(tagFormat) {
102
+	case "datadog":
103
+		options = append(options, statsd.TagStyle(statsd.TagFormatDatadog))
104
+	case "influxdb":
105
+		options = append(options, statsd.TagStyle(statsd.TagFormatInfluxDB))
106
+	case "graphite":
107
+		options = append(options, statsd.TagStyle(statsd.TagFormatGraphite))
108
+	default:
109
+		return StatsdFactory{}, fmt.Errorf("unknown tag format %s", tagFormat)
110
+	}
111
+
112
+	return StatsdFactory{
113
+		client: statsd.NewClient(address, options...),
114
+	}, nil
115
+}

+ 126
- 0
stats/statsd_test.go Dosyayı Görüntüle

@@ -0,0 +1,126 @@
1
+package stats_test
2
+
3
+import (
4
+	"bytes"
5
+	"net"
6
+	"strings"
7
+	"testing"
8
+	"time"
9
+
10
+	"github.com/9seconds/mtg/v2/events"
11
+	"github.com/9seconds/mtg/v2/mtglib"
12
+	"github.com/9seconds/mtg/v2/stats"
13
+	statsd "github.com/smira/go-statsd"
14
+	"github.com/stretchr/testify/suite"
15
+)
16
+
17
+type statsdFakeServer struct {
18
+	conn *net.UDPConn
19
+	buf  *bytes.Buffer
20
+}
21
+
22
+func (s statsdFakeServer) Addr() string {
23
+	return s.conn.LocalAddr().String()
24
+}
25
+
26
+func (s statsdFakeServer) Close() error {
27
+	if s.conn != nil {
28
+		return s.conn.Close()
29
+	}
30
+
31
+	return nil
32
+}
33
+
34
+func (s statsdFakeServer) String() string {
35
+	return strings.TrimSpace(s.buf.String())
36
+}
37
+
38
+func statsdNewFakeServer() statsdFakeServer {
39
+	conn, err := net.ListenUDP("udp", &net.UDPAddr{
40
+		IP:   net.ParseIP("127.0.0.1"),
41
+		Port: 0,
42
+	})
43
+	if err != nil {
44
+		panic(err)
45
+	}
46
+
47
+	buf := &bytes.Buffer{}
48
+
49
+	go func() {
50
+		currentBuffer := make([]byte, 4096)
51
+
52
+		for {
53
+			n, _, err := conn.ReadFromUDP(currentBuffer)
54
+			if n > 0 {
55
+				buf.Write(currentBuffer[:n])
56
+			}
57
+
58
+			if err != nil {
59
+				return
60
+			}
61
+		}
62
+	}()
63
+
64
+	return statsdFakeServer{
65
+		conn: conn,
66
+		buf:  buf,
67
+	}
68
+}
69
+
70
+type StatsdTestSuite struct {
71
+	suite.Suite
72
+
73
+	statsdServer statsdFakeServer
74
+	factory      stats.StatsdFactory
75
+	statsd       events.Observer
76
+}
77
+
78
+func (suite *StatsdTestSuite) SetupTest() {
79
+	suite.statsdServer = statsdNewFakeServer()
80
+
81
+	factory, err := stats.NewStatsd(suite.statsdServer.Addr(), "mtg.", "datadog")
82
+	if err != nil {
83
+		panic(err)
84
+	}
85
+
86
+	suite.factory = factory
87
+	suite.statsd = suite.factory.Make()
88
+}
89
+
90
+func (suite *StatsdTestSuite) TearDownTest() {
91
+	suite.statsd.Shutdown()
92
+	suite.factory.Close()
93
+	suite.statsdServer.Close()
94
+}
95
+
96
+func (suite *StatsdTestSuite) TestEventStartFinish() {
97
+	suite.statsd.EventStart(mtglib.EventStart{
98
+		CreatedAt: time.Now(),
99
+		ConnID:    "connID",
100
+	})
101
+
102
+	time.Sleep(2 * statsd.DefaultFlushInterval)
103
+	suite.Equal("mtg.active_connections:+1|g|#ip_type:ipv4", suite.statsdServer.String())
104
+
105
+	suite.statsd.EventFinish(mtglib.EventFinish{
106
+		CreatedAt: time.Now(),
107
+		ConnID:    "connID",
108
+	})
109
+
110
+	time.Sleep(2 * statsd.DefaultFlushInterval)
111
+	suite.Contains(suite.statsdServer.String(), "mtg.session_duration")
112
+}
113
+
114
+func (suite *StatsdTestSuite) TestEventConcurrencyLimited() {
115
+	suite.statsd.EventConcurrencyLimited(mtglib.EventConcurrencyLimited{
116
+		CreatedAt: time.Now(),
117
+	})
118
+
119
+	time.Sleep(2 * statsd.DefaultFlushInterval)
120
+	suite.Equal("mtg.concurrency_limited:1|c", suite.statsdServer.String())
121
+}
122
+
123
+func TestStatsd(t *testing.T) {
124
+	t.Parallel()
125
+	suite.Run(t, &StatsdTestSuite{})
126
+}

Loading…
İptal
Kaydet