Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter Version 2
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package stats
  2. import (
  3. "fmt"
  4. "strconv"
  5. "strings"
  6. "time"
  7. "github.com/9seconds/mtg/v2/events"
  8. "github.com/9seconds/mtg/v2/logger"
  9. "github.com/9seconds/mtg/v2/mtglib"
  10. statsd "github.com/smira/go-statsd"
  11. )
  12. type statsdProcessor struct {
  13. streams map[string]*streamInfo
  14. client *statsd.Client
  15. }
  16. func (s statsdProcessor) EventStart(evt mtglib.EventStart) {
  17. info := acquireStreamInfo()
  18. if evt.RemoteIP.To4() != nil {
  19. info.tags[TagIPFamily] = TagIPFamilyIPv4
  20. } else {
  21. info.tags[TagIPFamily] = TagIPFamilyIPv6
  22. }
  23. s.streams[evt.StreamID()] = info
  24. s.client.GaugeDelta(MetricClientConnections,
  25. 1,
  26. info.T(TagIPFamily))
  27. }
  28. func (s statsdProcessor) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
  29. info, ok := s.streams[evt.StreamID()]
  30. if !ok {
  31. return
  32. }
  33. info.tags[TagTelegramIP] = evt.RemoteIP.String()
  34. info.tags[TagDC] = strconv.Itoa(evt.DC)
  35. s.client.GaugeDelta(MetricTelegramConnections,
  36. 1,
  37. info.T(TagTelegramIP),
  38. info.T(TagDC))
  39. }
  40. func (s statsdProcessor) EventDomainFronting(evt mtglib.EventDomainFronting) {
  41. info, ok := s.streams[evt.StreamID()]
  42. if !ok {
  43. return
  44. }
  45. info.isDomainFronted = true
  46. s.client.Incr(MetricDomainFronting, 1)
  47. s.client.GaugeDelta(MetricDomainFrontingConnections,
  48. 1,
  49. info.T(TagIPFamily))
  50. }
  51. func (s statsdProcessor) EventTraffic(evt mtglib.EventTraffic) {
  52. info, ok := s.streams[evt.StreamID()]
  53. if !ok {
  54. return
  55. }
  56. directionTag := statsd.StringTag(TagDirection, getDirection(evt.IsRead))
  57. if info.isDomainFronted {
  58. s.client.Incr(MetricDomainFrontingTraffic,
  59. int64(evt.Traffic),
  60. directionTag)
  61. } else {
  62. s.client.Incr(MetricTelegramTraffic,
  63. int64(evt.Traffic),
  64. info.T(TagTelegramIP),
  65. info.T(TagDC),
  66. directionTag)
  67. }
  68. }
  69. func (s statsdProcessor) EventFinish(evt mtglib.EventFinish) {
  70. info, ok := s.streams[evt.StreamID()]
  71. if !ok {
  72. return
  73. }
  74. defer func() {
  75. delete(s.streams, evt.StreamID())
  76. releaseStreamInfo(info)
  77. }()
  78. s.client.GaugeDelta(MetricClientConnections,
  79. -1,
  80. info.T(TagIPFamily))
  81. if info.isDomainFronted {
  82. s.client.GaugeDelta(MetricDomainFrontingConnections,
  83. -1,
  84. info.T(TagIPFamily))
  85. } else if _, ok := info.tags[TagTelegramIP]; ok {
  86. s.client.GaugeDelta(MetricTelegramConnections,
  87. -1,
  88. info.T(TagTelegramIP),
  89. info.T(TagDC))
  90. }
  91. }
  92. func (s statsdProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {
  93. s.client.Incr(MetricConcurrencyLimited, 1)
  94. }
  95. func (s statsdProcessor) EventIPBlocklisted(_ mtglib.EventIPBlocklisted) {
  96. s.client.Incr(MetricIPBlocklisted, 1)
  97. }
  98. func (s statsdProcessor) EventReplayAttack(_ mtglib.EventReplayAttack) {
  99. s.client.Incr(MetricReplayAttacks, 1)
  100. }
  101. func (s statsdProcessor) Shutdown() {
  102. now := time.Now()
  103. events := make([]mtglib.EventFinish, 0, len(s.streams))
  104. for k := range s.streams {
  105. events = append(events, mtglib.EventFinish{
  106. CreatedAt: now,
  107. ConnID: k,
  108. })
  109. }
  110. for i := range events {
  111. s.EventFinish(events[i])
  112. }
  113. }
  114. // StatsdFactory is a factory of events.Observers which dumps
  115. // information to statsd.
  116. //
  117. // Please beware that we support ONLY UDP endpoints there. And this
  118. // factory won't use mtglib.Network so it won't use a proxy if you
  119. // provide any. If you need it, I would recommend starting a local
  120. // statsd and route metrics further by features of the chosen server.
  121. type StatsdFactory struct {
  122. client *statsd.Client
  123. }
  124. // Close stops sending requests to statsd.
  125. func (s StatsdFactory) Close() error {
  126. return s.client.Close()
  127. }
  128. // Make build a new observer.
  129. func (s StatsdFactory) Make() events.Observer {
  130. return statsdProcessor{
  131. client: s.client,
  132. streams: make(map[string]*streamInfo),
  133. }
  134. }
  135. // NewStatsd builds an events.ObserverFactory that sends events
  136. // to statsd.
  137. //
  138. // Valid tagFormats are 'datadog', 'influxdb' and 'graphite'.
  139. func NewStatsd(address string, log logger.StdLikeLogger,
  140. metricPrefix, tagFormat string) (StatsdFactory, error) {
  141. options := []statsd.Option{
  142. statsd.MetricPrefix(metricPrefix),
  143. statsd.Logger(log),
  144. }
  145. switch strings.ToLower(tagFormat) {
  146. case "datadog":
  147. options = append(options, statsd.TagStyle(statsd.TagFormatDatadog))
  148. case "influxdb":
  149. options = append(options, statsd.TagStyle(statsd.TagFormatInfluxDB))
  150. case "graphite":
  151. options = append(options, statsd.TagStyle(statsd.TagFormatGraphite))
  152. default:
  153. return StatsdFactory{}, fmt.Errorf("unknown tag format %s", tagFormat)
  154. }
  155. return StatsdFactory{
  156. client: statsd.NewClient(address, options...),
  157. }, nil
  158. }