Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter Version 2
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

statsd.go 4.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package stats
  2. import (
  3. "fmt"
  4. "strconv"
  5. "strings"
  6. "github.com/9seconds/mtg/v2/events"
  7. "github.com/9seconds/mtg/v2/logger"
  8. "github.com/9seconds/mtg/v2/mtglib"
  9. statsd "github.com/smira/go-statsd"
  10. )
  11. type statsdProcessor struct {
  12. streams map[string]*streamInfo
  13. client *statsd.Client
  14. }
  15. func (s statsdProcessor) EventStart(evt mtglib.EventStart) {
  16. info := acquireStreamInfo()
  17. if evt.RemoteIP.To4() != nil {
  18. info.tags[TagIPFamily] = TagIPFamilyIPv4
  19. } else {
  20. info.tags[TagIPFamily] = TagIPFamilyIPv6
  21. }
  22. s.streams[evt.StreamID()] = info
  23. s.client.GaugeDelta(MetricClientConnections,
  24. 1,
  25. info.T(TagIPFamily))
  26. }
  27. func (s statsdProcessor) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
  28. info, ok := s.streams[evt.StreamID()]
  29. if !ok {
  30. return
  31. }
  32. info.tags[TagTelegramIP] = evt.RemoteIP.String()
  33. info.tags[TagDC] = strconv.Itoa(evt.DC)
  34. s.client.GaugeDelta(MetricTelegramConnections,
  35. 1,
  36. info.T(TagTelegramIP),
  37. info.T(TagDC))
  38. }
  39. func (s statsdProcessor) EventDomainFronting(evt mtglib.EventDomainFronting) {
  40. info, ok := s.streams[evt.StreamID()]
  41. if !ok {
  42. return
  43. }
  44. info.isDomainFronted = true
  45. s.client.Incr(MetricDomainFronting, 1)
  46. s.client.GaugeDelta(MetricDomainFrontingConnections,
  47. 1,
  48. info.T(TagIPFamily))
  49. }
  50. func (s statsdProcessor) EventTraffic(evt mtglib.EventTraffic) {
  51. info, ok := s.streams[evt.StreamID()]
  52. if !ok {
  53. return
  54. }
  55. directionTag := statsd.StringTag(TagDirection, getDirection(evt.IsRead))
  56. if info.isDomainFronted {
  57. s.client.Incr(MetricDomainFrontingTraffic,
  58. int64(evt.Traffic),
  59. directionTag)
  60. } else {
  61. s.client.Incr(MetricTelegramTraffic,
  62. int64(evt.Traffic),
  63. info.T(TagTelegramIP),
  64. info.T(TagDC),
  65. directionTag)
  66. }
  67. }
  68. func (s statsdProcessor) EventFinish(evt mtglib.EventFinish) {
  69. info, ok := s.streams[evt.StreamID()]
  70. if !ok {
  71. return
  72. }
  73. defer func() {
  74. delete(s.streams, evt.StreamID())
  75. releaseStreamInfo(info)
  76. }()
  77. s.client.GaugeDelta(MetricClientConnections,
  78. -1,
  79. info.T(TagIPFamily))
  80. if info.isDomainFronted {
  81. s.client.GaugeDelta(MetricDomainFrontingConnections,
  82. -1,
  83. info.T(TagIPFamily))
  84. } else if _, ok := info.tags[TagTelegramIP]; ok {
  85. s.client.GaugeDelta(MetricTelegramConnections,
  86. -1,
  87. info.T(TagTelegramIP),
  88. info.T(TagDC))
  89. }
  90. }
  91. func (s statsdProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {
  92. s.client.Incr(MetricConcurrencyLimited, 1)
  93. }
  94. func (s statsdProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
  95. tag := TagIPListBlock
  96. if !evt.IsBlockList {
  97. tag = TagIPListAllow
  98. }
  99. s.client.Incr(MetricIPBlocklisted, 1, statsd.StringTag(TagIPList, tag))
  100. }
  101. func (s statsdProcessor) EventReplayAttack(_ mtglib.EventReplayAttack) {
  102. s.client.Incr(MetricReplayAttacks, 1)
  103. }
  104. func (s statsdProcessor) EventIPListSize(evt mtglib.EventIPListSize) {
  105. tag := TagIPListBlock
  106. if !evt.IsBlockList {
  107. tag = TagIPListAllow
  108. }
  109. s.client.Gauge(MetricIPListSize, int64(evt.Size), statsd.StringTag(TagIPList, tag))
  110. }
  111. func (s statsdProcessor) Shutdown() {
  112. events := make([]mtglib.EventFinish, 0, len(s.streams))
  113. for k := range s.streams {
  114. events = append(events, mtglib.NewEventFinish(k))
  115. }
  116. for i := range events {
  117. s.EventFinish(events[i])
  118. }
  119. }
  120. // StatsdFactory is a factory of [events.Observer] which dumps information to
  121. // statsd.
  122. //
  123. // Please beware that we support ONLY UDP endpoints there. And this factory
  124. // won't use [mtglib.Network] so it won't use a proxy if you provide any. If
  125. // you need it, I would recommend starting a local statsd and route metrics
  126. // further by features of the chosen server.
  127. type StatsdFactory struct {
  128. client *statsd.Client
  129. }
  130. // Close stops sending requests to statsd.
  131. func (s StatsdFactory) Close() error {
  132. return s.client.Close() //nolint: wrapcheck
  133. }
  134. // Make build a new observer.
  135. func (s StatsdFactory) Make() events.Observer {
  136. return statsdProcessor{
  137. client: s.client,
  138. streams: make(map[string]*streamInfo),
  139. }
  140. }
  141. // NewStatsd builds an [events.ObserverFactory] that sends events to statsd.
  142. //
  143. // Valid tagFormats are 'datadog', 'influxdb' and 'graphite'.
  144. func NewStatsd(address string, log logger.StdLikeLogger,
  145. metricPrefix, tagFormat string,
  146. ) (StatsdFactory, error) {
  147. options := []statsd.Option{
  148. statsd.MetricPrefix(metricPrefix),
  149. statsd.Logger(log),
  150. }
  151. switch strings.ToLower(tagFormat) {
  152. case "datadog":
  153. options = append(options, statsd.TagStyle(statsd.TagFormatDatadog))
  154. case "influxdb":
  155. options = append(options, statsd.TagStyle(statsd.TagFormatInfluxDB))
  156. case "graphite":
  157. options = append(options, statsd.TagStyle(statsd.TagFormatGraphite))
  158. default:
  159. return StatsdFactory{}, fmt.Errorf("unknown tag format %s", tagFormat)
  160. }
  161. return StatsdFactory{
  162. client: statsd.NewClient(address, options...),
  163. }, nil
  164. }