Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or upgrade broke you proxy, please read the chapter Version 2
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

prometheus.go 6.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package stats
  2. import (
  3. "context"
  4. "net"
  5. "net/http"
  6. "strconv"
  7. "time"
  8. "github.com/9seconds/mtg/v2/events"
  9. "github.com/9seconds/mtg/v2/mtglib"
  10. "github.com/prometheus/client_golang/prometheus"
  11. "github.com/prometheus/client_golang/prometheus/promhttp"
  12. )
  13. type prometheusProcessor struct {
  14. streams map[string]*streamInfo
  15. factory *PrometheusFactory
  16. }
  17. func (p prometheusProcessor) EventStart(evt mtglib.EventStart) {
  18. sInfo := &streamInfo{
  19. createdAt: evt.CreatedAt,
  20. clientIP: evt.RemoteIP,
  21. }
  22. p.streams[evt.StreamID()] = sInfo
  23. p.factory.metricClientConnections.WithLabelValues(sInfo.GetClientIPType()).Inc()
  24. }
  25. func (p prometheusProcessor) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
  26. sInfo, ok := p.streams[evt.StreamID()]
  27. if !ok {
  28. return
  29. }
  30. sInfo.remoteIP = evt.RemoteIP
  31. sInfo.dc = evt.DC
  32. p.factory.metricTelegramConnections.WithLabelValues(
  33. sInfo.GetRemoteIPType(),
  34. sInfo.remoteIP.String(),
  35. strconv.Itoa(sInfo.dc)).Inc()
  36. }
  37. func (p prometheusProcessor) EventTraffic(evt mtglib.EventTraffic) {
  38. sInfo, ok := p.streams[evt.StreamID()]
  39. if !ok {
  40. return
  41. }
  42. labels := []string{
  43. sInfo.GetRemoteIPType(),
  44. sInfo.remoteIP.String(),
  45. strconv.Itoa(sInfo.dc),
  46. }
  47. if evt.IsRead {
  48. sInfo.bytesRecvFromTelegram += evt.Traffic
  49. labels = append(labels, TagDirectionClient)
  50. } else {
  51. sInfo.bytesSentToTelegram += evt.Traffic
  52. labels = append(labels, TagDirectionTelegram)
  53. }
  54. p.factory.metricTraffic.WithLabelValues(labels...).Add(float64(evt.Traffic))
  55. }
  56. func (p prometheusProcessor) EventFinish(evt mtglib.EventFinish) {
  57. sInfo, ok := p.streams[evt.StreamID()]
  58. if !ok {
  59. return
  60. }
  61. defer delete(p.streams, evt.StreamID())
  62. duration := evt.CreatedAt.Sub(sInfo.createdAt)
  63. p.factory.metricClientConnections.WithLabelValues(sInfo.GetClientIPType()).Dec()
  64. p.factory.metricSessionDuration.Observe(float64(duration) / float64(time.Second))
  65. if sInfo.remoteIP == nil {
  66. return
  67. }
  68. labels := []string{
  69. sInfo.GetRemoteIPType(),
  70. sInfo.remoteIP.String(),
  71. strconv.Itoa(sInfo.dc),
  72. }
  73. p.factory.metricTelegramConnections.WithLabelValues(labels...).Dec()
  74. labels = append(labels, TagDirectionClient)
  75. p.factory.metricSessionTraffic.
  76. WithLabelValues(labels...).
  77. Observe(float64(sInfo.bytesRecvFromTelegram))
  78. labels[3] = TagDirectionTelegram
  79. p.factory.metricSessionTraffic.
  80. WithLabelValues(labels...).
  81. Observe(float64(sInfo.bytesSentToTelegram))
  82. }
  83. func (p prometheusProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {
  84. p.factory.metricConcurrencyLimited.Inc()
  85. }
  86. func (p prometheusProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
  87. if evt.RemoteIP.To4() == nil {
  88. p.factory.metricIPBlocklisted.WithLabelValues(TagIPTypeIPv6).Inc()
  89. } else {
  90. p.factory.metricIPBlocklisted.WithLabelValues(TagIPTypeIPv4).Inc()
  91. }
  92. }
  93. func (p prometheusProcessor) Shutdown() {
  94. p.streams = make(map[string]*streamInfo)
  95. }
  96. type PrometheusFactory struct {
  97. httpServer *http.Server
  98. metricClientConnections *prometheus.GaugeVec
  99. metricTelegramConnections *prometheus.GaugeVec
  100. metricTraffic *prometheus.CounterVec
  101. metricIPBlocklisted *prometheus.CounterVec
  102. metricSessionTraffic *prometheus.HistogramVec
  103. metricConcurrencyLimited prometheus.Counter
  104. metricSessionDuration prometheus.Histogram
  105. }
  106. func (p *PrometheusFactory) Make() events.Observer {
  107. return prometheusProcessor{
  108. streams: make(map[string]*streamInfo),
  109. factory: p,
  110. }
  111. }
  112. func (p *PrometheusFactory) Serve(listener net.Listener) error {
  113. return p.httpServer.Serve(listener)
  114. }
  115. func (p *PrometheusFactory) Close() error {
  116. return p.httpServer.Shutdown(context.Background())
  117. }
  118. func NewPrometheus(metricPrefix, httpPath string) *PrometheusFactory { // nolint: funlen
  119. registry := prometheus.NewPedanticRegistry()
  120. httpHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{
  121. EnableOpenMetrics: true,
  122. })
  123. mux := http.NewServeMux()
  124. mux.Handle(httpPath, httpHandler)
  125. factory := &PrometheusFactory{
  126. httpServer: &http.Server{
  127. Handler: mux,
  128. },
  129. metricClientConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
  130. Namespace: metricPrefix,
  131. Name: MetricClientConnections,
  132. Help: "A number of connections under active processing.",
  133. }, []string{TagIPType}),
  134. metricTelegramConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
  135. Namespace: metricPrefix,
  136. Name: MetricTelegramConnections,
  137. Help: "A number of connections to Telegram servers.",
  138. }, []string{TagIPType, TagTelegramIP, TagDC}),
  139. metricSessionDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
  140. Namespace: metricPrefix,
  141. Name: MetricSessionDuration,
  142. Help: "Session duration.",
  143. Buckets: []float64{ // per 30 seconds
  144. 30,
  145. 60,
  146. 90,
  147. 120,
  148. 150,
  149. 180,
  150. 210,
  151. 240,
  152. 270,
  153. 300,
  154. },
  155. }),
  156. metricSessionTraffic: prometheus.NewHistogramVec(prometheus.HistogramOpts{
  157. Namespace: metricPrefix,
  158. Name: MetricSessionTraffic,
  159. Help: "A traffic size which flew via proxy within a single session.",
  160. Buckets: []float64{ // per 1mb
  161. 1 * 1024 * 1024,
  162. 2 * 1024 * 1024,
  163. 3 * 1024 * 1024,
  164. 4 * 1024 * 1024,
  165. 5 * 1024 * 1024,
  166. 6 * 1024 * 1024,
  167. 7 * 1024 * 1024,
  168. 8 * 1024 * 1024,
  169. 9 * 1024 * 1024,
  170. },
  171. }, []string{TagIPType, TagTelegramIP, TagDC, TagDirection}),
  172. metricTraffic: prometheus.NewCounterVec(prometheus.CounterOpts{
  173. Namespace: metricPrefix,
  174. Name: MetricTraffic,
  175. Help: "Traffic which is sent through this proxy.",
  176. }, []string{TagIPType, TagTelegramIP, TagDC, TagDirection}),
  177. metricConcurrencyLimited: prometheus.NewCounter(prometheus.CounterOpts{
  178. Namespace: metricPrefix,
  179. Name: MetricConcurrencyLimited,
  180. Help: "A number of sessions that were rejected by concurrency limiter.",
  181. }),
  182. metricIPBlocklisted: prometheus.NewCounterVec(prometheus.CounterOpts{
  183. Namespace: metricPrefix,
  184. Name: MetricIPBlocklisted,
  185. Help: "A number of rejected sessions due to ip blocklisting",
  186. }, []string{TagIPType}),
  187. }
  188. registry.MustRegister(factory.metricClientConnections)
  189. registry.MustRegister(factory.metricTelegramConnections)
  190. registry.MustRegister(factory.metricTraffic)
  191. registry.MustRegister(factory.metricSessionTraffic)
  192. registry.MustRegister(factory.metricSessionDuration)
  193. registry.MustRegister(factory.metricConcurrencyLimited)
  194. registry.MustRegister(factory.metricIPBlocklisted)
  195. return factory
  196. }