| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- package main
-
- import (
- "crypto/rand"
- "flag"
- "fmt"
- "io"
- "net"
- "os"
- "runtime"
- "runtime/debug"
- "sync"
- "sync/atomic"
- "time"
- )
-
- const (
- maxRecordPayloadSize = 16379
- maxRecordSize = 16384
- )
-
- // --- Buffer strategies ---
-
- type bufStrategy interface {
- Name() string
- Pump(src, dst net.Conn) (int64, error)
- }
-
- // Stack-allocated buffer (current mtg code)
- type stackStrategy struct{}
-
- func (stackStrategy) Name() string { return "stack" }
-
- func (stackStrategy) Pump(src, dst net.Conn) (int64, error) {
- var buf [maxRecordPayloadSize]byte
- return io.CopyBuffer(dst, src, buf[:])
- }
-
- // Pool-allocated buffer
- var relayPool = sync.Pool{
- New: func() any {
- b := make([]byte, maxRecordPayloadSize)
- return &b
- },
- }
-
- type poolStrategy struct{}
-
- func (poolStrategy) Name() string { return "pool" }
-
- func (poolStrategy) Pump(src, dst net.Conn) (int64, error) {
- bp := relayPool.Get().(*[]byte)
- defer relayPool.Put(bp)
- return io.CopyBuffer(dst, src, *bp)
- }
-
- // --- Memory measurement ---
-
- type memSnapshot struct {
- StackInuse uint64
- HeapInuse uint64
- HeapAlloc uint64
- NumGC uint32
- PauseTotalNs uint64
- NumGoroutine int
- }
-
- func snapMem() memSnapshot {
- runtime.GC()
- var m runtime.MemStats
- runtime.ReadMemStats(&m)
- return memSnapshot{
- StackInuse: m.StackInuse,
- HeapInuse: m.HeapInuse,
- HeapAlloc: m.HeapAlloc,
- NumGC: m.NumGC,
- PauseTotalNs: m.PauseTotalNs,
- NumGoroutine: runtime.NumGoroutine(),
- }
- }
-
- // --- Test harness ---
-
- func runTest(strat bufStrategy, conns int, dataPerConn int64, reportInterval time.Duration) {
- fmt.Printf("\n=== %s strategy, %d connections, %s per conn ===\n",
- strat.Name(), conns, formatBytes(dataPerConn))
-
- // Start "telegram" echo servers - one listener, accepts all
- echoLn, err := net.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- fmt.Fprintf(os.Stderr, "echo listen: %v\n", err)
- return
- }
- defer echoLn.Close()
- echoAddr := echoLn.Addr().String()
-
- // Echo server goroutines
- var echoWg sync.WaitGroup
- go func() {
- for {
- c, err := echoLn.Accept()
- if err != nil {
- return
- }
- echoWg.Add(1)
- go func(c net.Conn) {
- defer echoWg.Done()
- defer c.Close()
- io.Copy(c, c) //nolint: errcheck
- }(c)
- }
- }()
-
- // Start relay listener
- relayLn, err := net.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- fmt.Fprintf(os.Stderr, "relay listen: %v\n", err)
- return
- }
- defer relayLn.Close()
- relayAddr := relayLn.Addr().String()
-
- // Relay server
- var relayWg sync.WaitGroup
- go func() {
- for {
- client, err := relayLn.Accept()
- if err != nil {
- return
- }
- relayWg.Add(1)
- go func(client net.Conn) {
- defer relayWg.Done()
- defer client.Close()
-
- tg, err := net.Dial("tcp", echoAddr)
- if err != nil {
- return
- }
- defer tg.Close()
-
- // Bidirectional relay (like mtg relay.Relay)
- done := make(chan struct{})
- go func() {
- defer close(done)
- strat.Pump(client, tg) //nolint: errcheck
- // When one direction is done, close both to unblock the other
- client.Close() //nolint: errcheck
- tg.Close() //nolint: errcheck
- }()
- strat.Pump(tg, client) //nolint: errcheck
- client.Close() //nolint: errcheck
- tg.Close() //nolint: errcheck
- <-done
- }(client)
- }
- }()
-
- // Force GC and take baseline
- debug.SetGCPercent(100)
- runtime.GC()
- runtime.GC()
- time.Sleep(50 * time.Millisecond)
- before := snapMem()
-
- // Launch clients
- var (
- totalBytes atomic.Int64
- clientWg sync.WaitGroup
- startSignal = make(chan struct{})
- peakMem atomic.Uint64
- )
-
- // Memory sampler
- samplerDone := make(chan struct{})
- samplerStopped := make(chan struct{})
- go func() {
- defer close(samplerStopped)
- ticker := time.NewTicker(10 * time.Millisecond)
- defer ticker.Stop()
- for {
- select {
- case <-samplerDone:
- return
- case <-ticker.C:
- var m runtime.MemStats
- runtime.ReadMemStats(&m)
- total := m.StackInuse + m.HeapInuse
- for {
- old := peakMem.Load()
- if total <= old || peakMem.CompareAndSwap(old, total) {
- break
- }
- }
- }
- }
- }()
-
- for i := 0; i < conns; i++ {
- clientWg.Add(1)
- go func() {
- defer clientWg.Done()
- <-startSignal
-
- conn, err := net.Dial("tcp", relayAddr)
- if err != nil {
- fmt.Fprintf(os.Stderr, "client dial: %v\n", err)
- return
- }
- defer conn.Close()
-
- // Write data in chunks, read it back (echo)
- chunk := make([]byte, 4096)
- rand.Read(chunk) //nolint: errcheck
- readBuf := make([]byte, 4096)
-
- var written int64
- for written < dataPerConn {
- toWrite := int64(len(chunk))
- if written+toWrite > dataPerConn {
- toWrite = dataPerConn - written
- }
- n, err := conn.Write(chunk[:toWrite])
- if err != nil {
- return
- }
- written += int64(n)
-
- // Read back echo
- remaining := n
- for remaining > 0 {
- rn, err := conn.Read(readBuf)
- if err != nil {
- return
- }
- remaining -= rn
- }
- totalBytes.Add(int64(n * 2)) // write + read
- }
- }()
- }
-
- start := time.Now()
- close(startSignal)
-
- // Progress reporter
- reporterDone := make(chan struct{})
- if reportInterval > 0 {
- go func() {
- ticker := time.NewTicker(reportInterval)
- defer ticker.Stop()
- for {
- select {
- case <-reporterDone:
- return
- case <-ticker.C:
- elapsed := time.Since(start)
- bytes := totalBytes.Load()
- fmt.Printf(" [%.1fs] %s transferred, %.1f MB/s\n",
- elapsed.Seconds(), formatBytes(bytes),
- float64(bytes)/elapsed.Seconds()/1024/1024)
- }
- }
- }()
- }
-
- clientWg.Wait()
- close(reporterDone)
- elapsed := time.Since(start)
-
- // Stop sampler
- close(samplerDone)
- <-samplerStopped
-
- after := snapMem()
-
- // Results
- bytes := totalBytes.Load()
- throughput := float64(bytes) / elapsed.Seconds() / 1024 / 1024
-
- gcCycles := after.NumGC - before.NumGC
- gcPause := time.Duration(after.PauseTotalNs - before.PauseTotalNs)
-
- peak := peakMem.Load()
- baseMem := before.StackInuse + before.HeapInuse
-
- fmt.Printf("\nResults:\n")
- fmt.Printf(" Duration: %v\n", elapsed.Round(time.Millisecond))
- fmt.Printf(" Total data: %s\n", formatBytes(bytes))
- fmt.Printf(" Throughput: %.1f MB/s\n", throughput)
- fmt.Printf(" Peak memory: %s (baseline %s, delta %s)\n",
- formatBytes(int64(peak)), formatBytes(int64(baseMem)),
- formatBytes(int64(peak)-int64(baseMem)))
- fmt.Printf(" Stack (before): %s → (after): %s\n",
- formatBytes(int64(before.StackInuse)), formatBytes(int64(after.StackInuse)))
- fmt.Printf(" Heap (before): %s → (after): %s\n",
- formatBytes(int64(before.HeapInuse)), formatBytes(int64(after.HeapInuse)))
- fmt.Printf(" Goroutines: %d → %d\n", before.NumGoroutine, after.NumGoroutine)
- fmt.Printf(" GC cycles: %d\n", gcCycles)
- fmt.Printf(" GC total pause: %v\n", gcPause)
- if gcCycles > 0 {
- fmt.Printf(" GC avg pause: %v\n", gcPause/time.Duration(gcCycles))
- }
-
- // Cleanup
- relayLn.Close()
- echoLn.Close()
- relayWg.Wait()
- echoWg.Wait()
- runtime.GC()
- time.Sleep(100 * time.Millisecond)
- }
-
- func formatBytes(b int64) string {
- switch {
- case b >= 1024*1024*1024:
- return fmt.Sprintf("%.1f GB", float64(b)/1024/1024/1024)
- case b >= 1024*1024:
- return fmt.Sprintf("%.1f MB", float64(b)/1024/1024)
- case b >= 1024:
- return fmt.Sprintf("%.1f KB", float64(b)/1024)
- default:
- return fmt.Sprintf("%d B", b)
- }
- }
-
- func main() {
- conns := flag.Int("conns", 500, "number of concurrent connections")
- dataMB := flag.Int("data", 1, "MB of data per connection")
- strategy := flag.String("strategy", "both", "buffer strategy: stack, pool, or both")
- flag.Parse()
-
- dataPerConn := int64(*dataMB) * 1024 * 1024
-
- fmt.Printf("Real network relay benchmark\n")
- fmt.Printf("GOMAXPROCS=%d, OS=%s/%s\n", runtime.GOMAXPROCS(0), runtime.GOOS, runtime.GOARCH)
- fmt.Printf("Connections: %d, Data per conn: %s\n\n", *conns, formatBytes(dataPerConn))
-
- switch *strategy {
- case "stack":
- runTest(stackStrategy{}, *conns, dataPerConn, 2*time.Second)
- case "pool":
- runTest(poolStrategy{}, *conns, dataPerConn, 2*time.Second)
- case "both":
- runTest(stackStrategy{}, *conns, dataPerConn, 2*time.Second)
- fmt.Println("\n" + "============================================================")
- runTest(poolStrategy{}, *conns, dataPerConn, 2*time.Second)
- }
- }
|