| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- package stats
-
- import (
- "context"
- "net"
- "net/http"
- "strconv"
- "time"
-
- "github.com/9seconds/mtg/v2/events"
- "github.com/9seconds/mtg/v2/mtglib"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- )
-
- type prometheusProcessor struct {
- streams map[string]*streamInfo
- factory *PrometheusFactory
- }
-
- func (p prometheusProcessor) EventStart(evt mtglib.EventStart) {
- sInfo := &streamInfo{
- createdAt: evt.CreatedAt,
- clientIP: evt.RemoteIP,
- }
- p.streams[evt.StreamID()] = sInfo
-
- p.factory.metricClientConnections.WithLabelValues(sInfo.GetClientIPType()).Inc()
- }
-
- func (p prometheusProcessor) EventConnectedToDC(evt mtglib.EventConnectedToDC) {
- sInfo, ok := p.streams[evt.StreamID()]
- if !ok {
- return
- }
-
- sInfo.remoteIP = evt.RemoteIP
- sInfo.dc = evt.DC
-
- p.factory.metricTelegramConnections.WithLabelValues(
- sInfo.GetRemoteIPType(),
- sInfo.remoteIP.String(),
- strconv.Itoa(sInfo.dc)).Inc()
- }
-
- func (p prometheusProcessor) EventTraffic(evt mtglib.EventTraffic) {
- sInfo, ok := p.streams[evt.StreamID()]
- if !ok {
- return
- }
-
- labels := []string{
- sInfo.GetRemoteIPType(),
- sInfo.remoteIP.String(),
- strconv.Itoa(sInfo.dc),
- }
-
- if evt.IsRead {
- sInfo.bytesRecvFromTelegram += evt.Traffic
-
- labels = append(labels, TagDirectionClient)
- } else {
- sInfo.bytesSentToTelegram += evt.Traffic
-
- labels = append(labels, TagDirectionTelegram)
- }
-
- p.factory.metricTraffic.WithLabelValues(labels...).Add(float64(evt.Traffic))
- }
-
- func (p prometheusProcessor) EventFinish(evt mtglib.EventFinish) {
- sInfo, ok := p.streams[evt.StreamID()]
- if !ok {
- return
- }
-
- defer delete(p.streams, evt.StreamID())
-
- duration := evt.CreatedAt.Sub(sInfo.createdAt)
-
- p.factory.metricClientConnections.WithLabelValues(sInfo.GetRemoteIPType()).Dec()
- p.factory.metricSessionDuration.Observe(float64(duration) / float64(time.Second))
-
- if sInfo.remoteIP == nil {
- return
- }
-
- labels := []string{
- sInfo.GetRemoteIPType(),
- sInfo.remoteIP.String(),
- strconv.Itoa(sInfo.dc),
- }
-
- p.factory.metricTelegramConnections.WithLabelValues(labels...).Dec()
-
- labels = append(labels, TagDirectionClient)
- p.factory.metricSessionTraffic.
- WithLabelValues(labels...).
- Observe(float64(sInfo.bytesRecvFromTelegram))
-
- labels[3] = TagDirectionTelegram
- p.factory.metricSessionTraffic.
- WithLabelValues(labels...).
- Observe(float64(sInfo.bytesSentToTelegram))
- }
-
- func (p prometheusProcessor) EventConcurrencyLimited(_ mtglib.EventConcurrencyLimited) {
- p.factory.metricConcurrencyLimited.Inc()
- }
-
- func (p prometheusProcessor) EventIPBlocklisted(evt mtglib.EventIPBlocklisted) {
- if evt.RemoteIP.To4() == nil {
- p.factory.metricIPBlocklisted.WithLabelValues(TagIPTypeIPv6).Inc()
- } else {
- p.factory.metricIPBlocklisted.WithLabelValues(TagIPTypeIPv4).Inc()
- }
- }
-
- func (p prometheusProcessor) Shutdown() {
- p.streams = make(map[string]*streamInfo)
- }
-
- type PrometheusFactory struct {
- httpServer *http.Server
-
- metricClientConnections *prometheus.GaugeVec
- metricTelegramConnections *prometheus.GaugeVec
- metricTraffic *prometheus.CounterVec
- metricIPBlocklisted *prometheus.CounterVec
- metricSessionTraffic *prometheus.HistogramVec
- metricConcurrencyLimited prometheus.Counter
- metricSessionDuration prometheus.Histogram
- }
-
- func (p *PrometheusFactory) Make() events.Observer {
- return prometheusProcessor{
- streams: make(map[string]*streamInfo),
- factory: p,
- }
- }
-
- func (p *PrometheusFactory) Serve(listener net.Listener) error {
- return p.httpServer.Serve(listener)
- }
-
- func (p *PrometheusFactory) Close() error {
- return p.httpServer.Shutdown(context.Background())
- }
-
- func NewPrometheus(metricPrefix, httpPath string) *PrometheusFactory { // nolint: funlen
- registry := prometheus.NewPedanticRegistry()
- httpHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{
- EnableOpenMetrics: true,
- })
- mux := http.NewServeMux()
-
- mux.Handle(httpPath, httpHandler)
-
- factory := &PrometheusFactory{
- httpServer: &http.Server{
- Handler: mux,
- },
-
- metricClientConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
- Namespace: metricPrefix,
- Name: MetricClientConnections,
- Help: "A number of connections under active processing.",
- }, []string{TagIPType}),
- metricTelegramConnections: prometheus.NewGaugeVec(prometheus.GaugeOpts{
- Namespace: metricPrefix,
- Name: MetricTelegramConnections,
- Help: "A number of connections to Telegram servers.",
- }, []string{TagIPType, TagTelegramIP, TagDC}),
- metricSessionDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
- Namespace: metricPrefix,
- Name: MetricSessionDuration,
- Help: "Session duration.",
- Buckets: []float64{ // per 30 seconds
- 30,
- 60,
- 90,
- 120,
- 150,
- 180,
- 210,
- 240,
- 270,
- 300,
- },
- }),
- metricSessionTraffic: prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Namespace: metricPrefix,
- Name: MetricSessionTraffic,
- Help: "A traffic size which flew via proxy within a single session.",
- Buckets: []float64{ // per 1mb
- 1 * 1024 * 1024,
- 2 * 1024 * 1024,
- 3 * 1024 * 1024,
- 4 * 1024 * 1024,
- 5 * 1024 * 1024,
- 6 * 1024 * 1024,
- 7 * 1024 * 1024,
- 8 * 1024 * 1024,
- 9 * 1024 * 1024,
- },
- }, []string{TagIPType, TagTelegramIP, TagDC, TagDirection}),
- metricTraffic: prometheus.NewCounterVec(prometheus.CounterOpts{
- Namespace: metricPrefix,
- Name: MetricTraffic,
- Help: "Traffic which is sent through this proxy.",
- }, []string{TagIPType, TagTelegramIP, TagDC, TagDirection}),
- metricConcurrencyLimited: prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: metricPrefix,
- Name: MetricConcurrencyLimited,
- Help: "A number of sessions that were rejected by concurrency limiter.",
- }),
- metricIPBlocklisted: prometheus.NewCounterVec(prometheus.CounterOpts{
- Namespace: metricPrefix,
- Name: MetricIPBlocklisted,
- Help: "A number of rejected sessions due to ip blocklisting",
- }, []string{TagIPType}),
- }
-
- registry.MustRegister(factory.metricClientConnections)
- registry.MustRegister(factory.metricTelegramConnections)
- registry.MustRegister(factory.metricTraffic)
- registry.MustRegister(factory.metricSessionTraffic)
- registry.MustRegister(factory.metricSessionDuration)
- registry.MustRegister(factory.metricConcurrencyLimited)
- registry.MustRegister(factory.metricIPBlocklisted)
-
- return factory
- }
|