Selaa lähdekoodia

Make session drop even more responsive

tags/0.12
9seconds 7 vuotta sitten
vanhempi
commit
753639beb8
1 muutettua tiedostoa jossa 35 lisäystä ja 10 poistoa
  1. 35
    10
      wrappers/conn.go

+ 35
- 10
wrappers/conn.go Näytä tiedosto

38
 	connTimeoutWrite = 2 * time.Minute
38
 	connTimeoutWrite = 2 * time.Minute
39
 )
39
 )
40
 
40
 
41
+type ioResult struct {
42
+	n   int
43
+	err error
44
+}
45
+
46
+type ioFunc func([]byte) (int, error)
47
+
41
 // Conn is a basic wrapper for net.Conn providing the most low-level
48
 // Conn is a basic wrapper for net.Conn providing the most low-level
42
 // logic and management as possible.
49
 // logic and management as possible.
43
 type Conn struct {
50
 type Conn struct {
56
 	case <-c.ctx.Done():
63
 	case <-c.ctx.Done():
57
 		return 0, errors.Annotate(c.ctx.Err(), "Cannot write because context was closed")
64
 		return 0, errors.Annotate(c.ctx.Err(), "Cannot write because context was closed")
58
 	default:
65
 	default:
59
-		c.conn.SetWriteDeadline(time.Now().Add(connTimeoutWrite)) // nolint: errcheck
60
-		n, err := c.conn.Write(p)
61
-		if err != nil {
62
-			c.cancel()
63
-		}
66
+		n, err := c.doIO(c.conn.Write, p, connTimeoutWrite)
64
 
67
 
65
 		c.logger.Debugw("Write to stream", "bytes", n, "error", err)
68
 		c.logger.Debugw("Write to stream", "bytes", n, "error", err)
66
 		stats.EgressTraffic(n)
69
 		stats.EgressTraffic(n)
74
 	case <-c.ctx.Done():
77
 	case <-c.ctx.Done():
75
 		return 0, errors.Annotate(c.ctx.Err(), "Cannot read because context was closed")
78
 		return 0, errors.Annotate(c.ctx.Err(), "Cannot read because context was closed")
76
 	default:
79
 	default:
77
-		c.conn.SetReadDeadline(time.Now().Add(connTimeoutRead)) // nolint: errcheck
78
-		n, err := c.conn.Read(p)
79
-		if err != nil {
80
-			c.cancel()
81
-		}
80
+		n, err := c.doIO(c.conn.Read, p, connTimeoutRead)
82
 
81
 
83
 		c.logger.Debugw("Read from stream", "bytes", n, "error", err)
82
 		c.logger.Debugw("Read from stream", "bytes", n, "error", err)
84
 		stats.IngressTraffic(n)
83
 		stats.IngressTraffic(n)
87
 	}
86
 	}
88
 }
87
 }
89
 
88
 
89
+func (c *Conn) doIO(callback ioFunc, p []byte, timeout time.Duration) (int, error) {
90
+	resChan := make(chan ioResult, 1)
91
+	timer := time.NewTimer(timeout)
92
+
93
+	go func() {
94
+		n, err := callback(p)
95
+		resChan <- ioResult{n: n, err: err}
96
+	}()
97
+
98
+	select {
99
+	case res := <-resChan:
100
+		timer.Stop()
101
+		if res.err != nil {
102
+			c.Close()
103
+		}
104
+		return res.n, res.err
105
+	case <-c.ctx.Done():
106
+		timer.Stop()
107
+		c.Close()
108
+		return 0, errors.Annotate(c.ctx.Err(), "Cannot do IO because context is closed")
109
+	case <-timer.C:
110
+		c.Close()
111
+		return 0, errors.Annotate(c.ctx.Err(), "Timeout on IO operation")
112
+	}
113
+}
114
+
90
 // Close closes underlying net.Conn instance.
115
 // Close closes underlying net.Conn instance.
91
 func (c *Conn) Close() error {
116
 func (c *Conn) Close() error {
92
 	defer c.logger.Debugw("Close connection")
117
 	defer c.logger.Debugw("Close connection")

Loading…
Peruuta
Tallenna