Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

prometheus.go 5.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package stats
  2. import (
  3. "context"
  4. "net"
  5. "net/http"
  6. "github.com/9seconds/mtg/v2/events"
  7. "github.com/9seconds/mtg/v2/mtglib"
  8. "github.com/prometheus/client_golang/prometheus"
  9. "github.com/prometheus/client_golang/prometheus/promhttp"
  10. )
  11. type prometheusProcessor struct {
  12. streams map[string]*streamInfo
  13. factory *PrometheusFactory
  14. }
  15. func (p prometheusProcessor) EventStart(evt mtglib.EventStart) {
  16. info := acquireStreamInfo()
  17. info.SetStartTime(evt.CreatedAt)
  18. info.SetClientIP(evt.RemoteIP)
  19. p.streams[evt.StreamID()] = info
  20. p.factory.metricClientConnections.
  21. WithLabelValues(info.V(TagIPFamily)).
  22. Inc()
  23. }
  24. func (p prometheusProcessor) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
  25. info, ok := p.streams[evt.StreamID()]
  26. if !ok {
  27. return
  28. }
  29. info.SetTelegramIP(evt.RemoteIP)
  30. info.SetDC(evt.DC)
  31. p.factory.metricTelegramConnections.
  32. WithLabelValues(info.V(TagTelegramIP), info.V(TagDC)).
  33. Inc()
  34. }
  35. func (p prometheusProcessor) EventTraffic(evt mtglib.EventTraffic) {
  36. info, ok := p.streams[evt.StreamID()]
  37. if !ok {
  38. return
  39. }
  40. p.factory.metricTelegramTraffic.
  41. WithLabelValues(info.V(TagTelegramIP), info.V(TagDC), getDirection(evt.IsRead)).
  42. Add(float64(evt.Traffic))
  43. }
  44. func (p prometheusProcessor) EventFinish(evt mtglib.EventFinish) {
  45. info, ok := p.streams[evt.StreamID()]
  46. if !ok {
  47. return
  48. }
  49. defer func() {
  50. delete(p.streams, evt.StreamID())
  51. releaseStreamInfo(info)
  52. }()
  53. p.factory.metricClientConnections.
  54. WithLabelValues(info.V(TagIPFamily)).
  55. Dec()
  56. if info.V(TagTelegramIP) != "" {
  57. p.factory.metricTelegramConnections.
  58. WithLabelValues(info.V(TagTelegramIP), info.V(TagDC)).
  59. Dec()
  60. }
  61. }
  62. func (p prometheusProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {
  63. p.factory.metricConcurrencyLimited.Inc()
  64. }
  65. func (p prometheusProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
  66. p.factory.metricIPBlocklisted.Inc()
  67. }
  68. func (p prometheusProcessor) Shutdown() {
  69. p.streams = make(map[string]*streamInfo)
  70. }
  71. type PrometheusFactory struct {
  72. httpServer *http.Server
  73. metricClientConnections *prometheus.GaugeVec
  74. metricTelegramConnections *prometheus.GaugeVec
  75. metricDomainDisguisingConnections *prometheus.GaugeVec
  76. metricTelegramTraffic *prometheus.CounterVec
  77. metricDomainDisguisingTraffic *prometheus.CounterVec
  78. metricDomainDisguising prometheus.Counter
  79. metricConcurrencyLimited prometheus.Counter
  80. metricIPBlocklisted prometheus.Counter
  81. metricReplayAttacks prometheus.Counter
  82. }
  83. func (p *PrometheusFactory) Make() events.Observer {
  84. return prometheusProcessor{
  85. streams: make(map[string]*streamInfo),
  86. factory: p,
  87. }
  88. }
  89. func (p *PrometheusFactory) Serve(listener net.Listener) error {
  90. return p.httpServer.Serve(listener)
  91. }
  92. func (p *PrometheusFactory) Close() error {
  93. return p.httpServer.Shutdown(context.Background())
  94. }
  95. func NewPrometheus(metricPrefix, httpPath string) *PrometheusFactory { // nolint: funlen
  96. registry := prometheus.NewPedanticRegistry()
  97. httpHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{
  98. EnableOpenMetrics: true,
  99. })
  100. mux := http.NewServeMux()
  101. mux.Handle(httpPath, httpHandler)
  102. factory := &PrometheusFactory{
  103. httpServer: &http.Server{
  104. Handler: mux,
  105. },
  106. metricClientConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
  107. Namespace: metricPrefix,
  108. Name: MetricClientConnections,
  109. Help: "A number of actively processing client connections.",
  110. }, []string{TagIPFamily}),
  111. metricTelegramConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
  112. Namespace: metricPrefix,
  113. Name: MetricTelegramConnections,
  114. Help: "A number of connections to Telegram servers.",
  115. }, []string{TagTelegramIP, TagDC}),
  116. metricDomainDisguisingConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
  117. Namespace: metricPrefix,
  118. Name: MetricDomainDisguisingConnections,
  119. Help: "A number of connections which talk with disguising domain.",
  120. }, []string{TagIPFamily}),
  121. metricTelegramTraffic: prometheus.NewCounterVec(prometheus.CounterOpts{
  122. Namespace: metricPrefix,
  123. Name: MetricTelegramTraffic,
  124. Help: "Traffic which is generated talking with Telegram servers.",
  125. }, []string{TagTelegramIP, TagDC, TagDirection}),
  126. metricDomainDisguisingTraffic: prometheus.NewCounterVec(prometheus.CounterOpts{
  127. Namespace: metricPrefix,
  128. Name: MetricDomainDisguisingTraffic,
  129. Help: "Traffic which is generated talking with disguising domain.",
  130. }, []string{TagDirection}),
  131. metricDomainDisguising: prometheus.NewCounter(prometheus.CounterOpts{
  132. Namespace: metricPrefix,
  133. Name: MetricDomainDisguising,
  134. Help: "A number of routings to disguising domain.",
  135. }),
  136. metricConcurrencyLimited: prometheus.NewCounter(prometheus.CounterOpts{
  137. Namespace: metricPrefix,
  138. Name: MetricConcurrencyLimited,
  139. Help: "A number of sessions that were rejected by concurrency limiter.",
  140. }),
  141. metricIPBlocklisted: prometheus.NewCounter(prometheus.CounterOpts{
  142. Namespace: metricPrefix,
  143. Name: MetricIPBlocklisted,
  144. Help: "A number of rejected sessions due to ip blocklisting.",
  145. }),
  146. metricReplayAttacks: prometheus.NewCounter(prometheus.CounterOpts{
  147. Namespace: metricPrefix,
  148. Name: MetricReplayAttacks,
  149. Help: "A number of detected replay attacks.",
  150. }),
  151. }
  152. registry.MustRegister(factory.metricClientConnections)
  153. registry.MustRegister(factory.metricTelegramConnections)
  154. registry.MustRegister(factory.metricDomainDisguisingConnections)
  155. registry.MustRegister(factory.metricTelegramTraffic)
  156. registry.MustRegister(factory.metricDomainDisguisingTraffic)
  157. registry.MustRegister(factory.metricDomainDisguising)
  158. registry.MustRegister(factory.metricConcurrencyLimited)
  159. registry.MustRegister(factory.metricIPBlocklisted)
  160. registry.MustRegister(factory.metricReplayAttacks)
  161. return factory
  162. }