Highly-opinionated (ex-bullshit-free) MTPROTO proxy for Telegram. If you use v1.0 or an upgrade broke your proxy, please read the Version 2 chapter.

stress_bench_test.go 11KB

package relay

import (
	"fmt"
	"io"
	"net"
	"runtime"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/dolonet/mtg-multi/mtglib/internal/tls"
)
// ============================================================
// Stress test: N concurrent connections, each transferring dataPerConn bytes.
// Measures total wall-clock time, aggregate throughput, peak memory, GC pauses.
// This is the closest simulation to real proxy load.
// ============================================================
type stressResult struct {
	totalBytes    int64
	wallTime      time.Duration
	gcPauseTotal  time.Duration
	numGC         uint32
	peakStackMB   float64
	peakHeapMB    float64
	peakTotalMB   float64
	throughputMBs float64
}
func runStressTest(b *testing.B, numConns int, dataPerConn int, getBuf func() []byte, putBuf func([]byte)) stressResult {
	b.Helper()

	// Force GC before measuring.
	runtime.GC()
	runtime.GC()

	var memBefore runtime.MemStats
	runtime.ReadMemStats(&memBefore)

	var totalTransferred atomic.Int64
	var wg sync.WaitGroup

	start := time.Now()

	// Launch all connections concurrently.
	for i := 0; i < numConns; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			serverConn, clientConn := net.Pipe()

			// Writer goroutine: send data in 32 KB chunks.
			go func() {
				data := make([]byte, 32*1024)
				written := 0

				for written < dataPerConn {
					toWrite := len(data)
					if dataPerConn-written < toWrite {
						toWrite = dataPerConn - written
					}

					n, err := serverConn.Write(data[:toWrite])
					written += n

					if err != nil {
						break
					}
				}

				serverConn.Close()
			}()

			// Reader goroutine (the relay pump simulation). io.Discard implements
			// io.ReaderFrom, which would make CopyBuffer ignore buf entirely, so
			// wrap it in a plain io.Writer to force the copy through the provided buffer.
			buf := getBuf()
			n, _ := io.CopyBuffer(struct{ io.Writer }{io.Discard}, clientConn, buf)
			putBuf(buf)

			totalTransferred.Add(n)
			clientConn.Close()
		}()
	}

	wg.Wait()

	elapsed := time.Since(start)

	var memAfter runtime.MemStats
	runtime.ReadMemStats(&memAfter)

	gcPause := time.Duration(memAfter.PauseTotalNs-memBefore.PauseTotalNs) * time.Nanosecond
	numGC := memAfter.NumGC - memBefore.NumGC
	total := totalTransferred.Load()
	throughput := float64(total) / elapsed.Seconds() / (1024 * 1024)

	return stressResult{
		totalBytes:    total,
		wallTime:      elapsed,
		gcPauseTotal:  gcPause,
		numGC:         numGC,
		peakStackMB:   float64(memAfter.StackInuse) / (1024 * 1024),
		peakHeapMB:    float64(memAfter.HeapInuse) / (1024 * 1024),
		peakTotalMB:   float64(memAfter.StackInuse+memAfter.HeapInuse) / (1024 * 1024),
		throughputMBs: throughput,
	}
}
func reportStress(b *testing.B, r stressResult) {
	b.ReportMetric(r.throughputMBs, "MB/s")
	b.ReportMetric(r.peakStackMB, "peak_stack_MB")
	b.ReportMetric(r.peakHeapMB, "peak_heap_MB")
	b.ReportMetric(r.peakTotalMB, "peak_total_MB")
	b.ReportMetric(float64(r.gcPauseTotal.Microseconds()), "gc_pause_us")
	b.ReportMetric(float64(r.numGC), "gc_cycles")
}
// BenchmarkStress_ConcurrentRelays runs N concurrent relay pumps with different
// buffer strategies and measures aggregate throughput + memory + GC.
func BenchmarkStress_ConcurrentRelays(b *testing.B) {
	type bufStrategy struct {
		name   string
		getBuf func() []byte
		putBuf func([]byte)
	}

	pool16 := &sync.Pool{New: func() any { buf := make([]byte, tls.MaxRecordPayloadSize); return &buf }}
	pool4 := &sync.Pool{New: func() any { buf := make([]byte, 4096); return &buf }}

	strategies := []bufStrategy{
		{
			name:   "stack_16KB",
			getBuf: func() []byte { buf := make([]byte, tls.MaxRecordPayloadSize); return buf },
			putBuf: func([]byte) {},
		},
		{
			name:   "pool_16KB",
			getBuf: func() []byte { return *pool16.Get().(*[]byte) },
			putBuf: func(b []byte) { pool16.Put(&b) },
		},
		{
			name:   "pool_4KB",
			getBuf: func() []byte { return *pool4.Get().(*[]byte) },
			putBuf: func(b []byte) { pool4.Put(&b) },
		},
	}

	// Test scenarios.
	type scenario struct {
		conns       int
		dataPerConn int
		label       string
	}

	scenarios := []scenario{
		{100, 10 * 1024 * 1024, "100conn_10MB"},   // 100 connections × 10 MB = 1 GB total
		{500, 10 * 1024 * 1024, "500conn_10MB"},   // 500 × 10 MB = 5 GB total
		{1000, 10 * 1024 * 1024, "1000conn_10MB"}, // 1000 × 10 MB = 10 GB total
		{2000, 1 * 1024 * 1024, "2000conn_1MB"},   // 2000 × 1 MB = 2 GB (many short conns)
		{500, 50 * 1024 * 1024, "500conn_50MB"},   // 500 × 50 MB = 25 GB (big files)
	}

	for _, sc := range scenarios {
		for _, strat := range strategies {
			name := fmt.Sprintf("%s/%s", sc.label, strat.name)
			getBuf := strat.getBuf
			putBuf := strat.putBuf
			sc := sc

			b.Run(name, func(b *testing.B) {
				for i := 0; i < b.N; i++ {
					r := runStressTest(b, sc.conns, sc.dataPerConn, getBuf, putBuf)
					reportStress(b, r)
				}
			})
		}
	}
}
// BenchmarkStress_PoolContention specifically tests sync.Pool under heavy
// concurrent access — many goroutines doing Get/Put rapidly.
func BenchmarkStress_PoolContention(b *testing.B) {
	pool := &sync.Pool{New: func() any { buf := make([]byte, tls.MaxRecordPayloadSize); return &buf }}

	for _, numWorkers := range []int{100, 500, 1000, 2000} {
		b.Run(fmt.Sprintf("workers=%d", numWorkers), func(b *testing.B) {
			// RunParallel spawns p*GOMAXPROCS goroutines; scale p so the total
			// roughly matches numWorkers (it cannot drop below GOMAXPROCS).
			b.SetParallelism((numWorkers + runtime.GOMAXPROCS(0) - 1) / runtime.GOMAXPROCS(0))

			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					bp := pool.Get().(*[]byte)

					// Simulate minimal work with the buffer.
					(*bp)[0] = 1
					(*bp)[len(*bp)-1] = 1

					pool.Put(bp)
				}
			})
		})
	}
}
// BenchmarkStress_TinyPackets simulates massive amounts of tiny packets
// (chat messages, typing indicators, status updates, ACKs).
// Each connection sends many small writes — this maximizes per-read overhead.
func BenchmarkStress_TinyPackets(b *testing.B) {
	type bufStrategy struct {
		name   string
		getBuf func() []byte
		putBuf func([]byte)
	}

	pool16 := &sync.Pool{New: func() any { buf := make([]byte, tls.MaxRecordPayloadSize); return &buf }}
	pool4 := &sync.Pool{New: func() any { buf := make([]byte, 4096); return &buf }}

	strategies := []bufStrategy{
		{
			name:   "stack_16KB",
			getBuf: func() []byte { return make([]byte, tls.MaxRecordPayloadSize) },
			putBuf: func([]byte) {},
		},
		{
			name:   "pool_16KB",
			getBuf: func() []byte { return *pool16.Get().(*[]byte) },
			putBuf: func(b []byte) { pool16.Put(&b) },
		},
		{
			name:   "pool_4KB",
			getBuf: func() []byte { return *pool4.Get().(*[]byte) },
			putBuf: func(b []byte) { pool4.Put(&b) },
		},
	}

	type scenario struct {
		conns       int
		pktSize     int
		pktsPerConn int
		label       string
	}

	scenarios := []scenario{
		// Chat-like: 100 connections, 50K tiny packets each (50 bytes = typing indicator / small ACK).
		{100, 50, 50000, "100conn_50B_x50K"},
		// Heavy chat: 500 connections, 10K packets of 200 bytes.
		{500, 200, 10000, "500conn_200B_x10K"},
		// Extreme: 1000 connections, 20K packets of 100 bytes each.
		{1000, 100, 20000, "1000conn_100B_x20K"},
		// Burst of tiny: 2000 connections, 5K packets of 50 bytes.
		{2000, 50, 5000, "2000conn_50B_x5K"},
	}

	for _, sc := range scenarios {
		for _, strat := range strategies {
			name := fmt.Sprintf("%s/%s", sc.label, strat.name)
			getBuf := strat.getBuf
			putBuf := strat.putBuf
			sc := sc

			b.Run(name, func(b *testing.B) {
				totalBytes := int64(sc.conns) * int64(sc.pktSize) * int64(sc.pktsPerConn)
				b.SetBytes(totalBytes)

				for i := 0; i < b.N; i++ {
					runtime.GC()

					var memBefore runtime.MemStats
					runtime.ReadMemStats(&memBefore)

					var totalRead atomic.Int64
					var totalReads atomic.Int64
					var wg sync.WaitGroup

					start := time.Now()

					for c := 0; c < sc.conns; c++ {
						wg.Add(1)

						go func() {
							defer wg.Done()

							serverConn, clientConn := net.Pipe()

							go func() {
								pkt := make([]byte, sc.pktSize)
								for p := 0; p < sc.pktsPerConn; p++ {
									serverConn.Write(pkt)
								}
								serverConn.Close()
							}()

							buf := getBuf()
							var reads int64

							for {
								n, err := clientConn.Read(buf)
								if n > 0 {
									totalRead.Add(int64(n))
									reads++
								}
								if err != nil {
									break
								}
							}

							putBuf(buf)
							totalReads.Add(reads)
							clientConn.Close()
						}()
					}

					wg.Wait()

					elapsed := time.Since(start)

					var memAfter runtime.MemStats
					runtime.ReadMemStats(&memAfter)

					throughput := float64(totalRead.Load()) / elapsed.Seconds() / (1024 * 1024)
					pps := float64(totalReads.Load()) / elapsed.Seconds()

					b.ReportMetric(throughput, "MB/s")
					b.ReportMetric(pps, "packets/s")
					b.ReportMetric(float64(totalReads.Load()), "total_reads")
					b.ReportMetric(float64(memAfter.StackInuse)/(1024*1024), "peak_stack_MB")
					b.ReportMetric(float64(memAfter.HeapInuse)/(1024*1024), "peak_heap_MB")
					b.ReportMetric(float64(memAfter.NumGC-memBefore.NumGC), "gc_cycles")
					b.ReportMetric(float64(memAfter.PauseTotalNs-memBefore.PauseTotalNs)/1000, "gc_pause_us")
				}
			})
		}
	}
}
// BenchmarkStress_GCPressure measures how GC behaves under load.
// Stack-allocated buffers don't create GC work; pool buffers do.
// This tests whether pool-induced GC pressure hurts throughput.
func BenchmarkStress_GCPressure(b *testing.B) {
	numConns := 500
	dataPerConn := 10 * 1024 * 1024

	pool16 := &sync.Pool{New: func() any { buf := make([]byte, tls.MaxRecordPayloadSize); return &buf }}

	b.Run("stack_16KB", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			runtime.GC()

			var memBefore runtime.MemStats
			runtime.ReadMemStats(&memBefore)

			r := runStressTest(b, numConns, dataPerConn, func() []byte {
				buf := make([]byte, tls.MaxRecordPayloadSize)
				return buf
			}, func([]byte) {})

			var memAfter runtime.MemStats
			runtime.ReadMemStats(&memAfter)

			b.ReportMetric(r.throughputMBs, "MB/s")
			b.ReportMetric(float64(memAfter.NumGC-memBefore.NumGC), "gc_cycles")
			b.ReportMetric(float64(memAfter.PauseTotalNs-memBefore.PauseTotalNs)/1000, "gc_pause_us")
			b.ReportMetric(float64(memAfter.StackInuse)/(1024*1024), "final_stack_MB")
			b.ReportMetric(float64(memAfter.HeapInuse)/(1024*1024), "final_heap_MB")
		}
	})

	b.Run("pool_16KB", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			runtime.GC()

			var memBefore runtime.MemStats
			runtime.ReadMemStats(&memBefore)

			r := runStressTest(b, numConns, dataPerConn, func() []byte {
				return *pool16.Get().(*[]byte)
			}, func(buf []byte) {
				pool16.Put(&buf)
			})

			var memAfter runtime.MemStats
			runtime.ReadMemStats(&memAfter)

			b.ReportMetric(r.throughputMBs, "MB/s")
			b.ReportMetric(float64(memAfter.NumGC-memBefore.NumGC), "gc_cycles")
			b.ReportMetric(float64(memAfter.PauseTotalNs-memBefore.PauseTotalNs)/1000, "gc_pause_us")
			b.ReportMetric(float64(memAfter.StackInuse)/(1024*1024), "final_stack_MB")
			b.ReportMetric(float64(memAfter.HeapInuse)/(1024*1024), "final_heap_MB")
		}
	})
}
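
To try these stress benchmarks locally, the standard Go tooling is enough. Assuming the file sits in the relay package under mtglib/internal/relay (a guess based on the package clause and the internal tls import; adjust the path to wherever the package actually lives), a single pass over every scenario looks roughly like this:

	go test -run '^$' -bench 'BenchmarkStress_' -benchtime 1x ./mtglib/internal/relay/

The -run '^$' part skips regular tests, and -benchtime 1x limits each benchmark to a single iteration, which matters here because one iteration of the larger scenarios already pushes tens of gigabytes through net.Pipe.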