Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the chapter "Version 2".
Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package stats
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "github.com/9seconds/mtg/v2/events"
  7. "github.com/9seconds/mtg/v2/logger"
  8. "github.com/9seconds/mtg/v2/mtglib"
  9. statsd "github.com/smira/go-statsd"
  10. )
  11. type statsdProcessor struct {
  12. streams map[string]*streamInfo
  13. client *statsd.Client
  14. }
  15. func (s statsdProcessor) EventStart(evt mtglib.EventStart) {
  16. sInfo := &streamInfo{
  17. createdAt: evt.CreatedAt,
  18. clientIP: evt.RemoteIP,
  19. }
  20. s.streams[evt.StreamID()] = sInfo
  21. ipTypeTag := statsd.StringTag(TagIPType, sInfo.IPType())
  22. s.client.GaugeDelta(MetricActiveConnection, 1, ipTypeTag)
  23. }
  24. func (s statsdProcessor) EventFinish(evt mtglib.EventFinish) {
  25. sInfo, ok := s.streams[evt.StreamID()]
  26. if !ok {
  27. return
  28. }
  29. defer delete(s.streams, evt.StreamID())
  30. duration := evt.CreatedAt.Sub(sInfo.createdAt)
  31. ipTypeTag := statsd.StringTag(TagIPType, sInfo.IPType())
  32. s.client.GaugeDelta(MetricActiveConnection, -1, ipTypeTag)
  33. s.client.PrecisionTiming(MetricSessionDuration, duration)
  34. }
  35. func (s statsdProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {
  36. s.client.Incr(MetricConcurrencyLimited, 1)
  37. }
  38. func (s statsdProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
  39. var tag statsd.Tag
  40. if evt.RemoteIP.To4() == nil {
  41. tag = statsd.StringTag(TagIPType, TagIPTypeIPv6)
  42. } else {
  43. tag = statsd.StringTag(TagIPType, TagIPTypeIPv4)
  44. }
  45. s.client.Incr(MetricIPBlocklisted, 1, tag)
  46. }
  47. func (s statsdProcessor) Shutdown() {
  48. now := time.Now()
  49. events := make([]mtglib.EventFinish, 0, len(s.streams))
  50. for k := range s.streams {
  51. events = append(events, mtglib.EventFinish{
  52. CreatedAt: now,
  53. ConnID: k,
  54. })
  55. }
  56. for i := range events {
  57. s.EventFinish(events[i])
  58. }
  59. }
  60. type StatsdFactory struct {
  61. client *statsd.Client
  62. }
  63. func (s StatsdFactory) Close() error {
  64. return s.client.Close()
  65. }
  66. func (s StatsdFactory) Make() events.Observer {
  67. return statsdProcessor{
  68. client: s.client,
  69. streams: make(map[string]*streamInfo),
  70. }
  71. }
  72. func NewStatsd(address string, log logger.StdLikeLogger,
  73. metricPrefix, tagFormat string) (StatsdFactory, error) {
  74. options := []statsd.Option{
  75. statsd.MetricPrefix(metricPrefix),
  76. statsd.Logger(log),
  77. }
  78. switch strings.ToLower(tagFormat) {
  79. case "datadog":
  80. options = append(options, statsd.TagStyle(statsd.TagFormatDatadog))
  81. case "influxdb":
  82. options = append(options, statsd.TagStyle(statsd.TagFormatInfluxDB))
  83. case "graphite":
  84. options = append(options, statsd.TagStyle(statsd.TagFormatGraphite))
  85. default:
  86. return StatsdFactory{}, fmt.Errorf("unknown tag format %s", tagFormat)
  87. }
  88. return StatsdFactory{
  89. client: statsd.NewClient(address, options...),
  90. }, nil
  91. }