Kaynağa Gözat

Remove contexts

tags/0.9
9seconds 7 yıl önce
ebeveyn
işleme
1fe3319988
6 değiştirilmiş dosya ile 22 ekleme ve 100 silme
  1. 1
    2
      client/client.go
  2. 1
    4
      client/direct.go
  3. 2
    4
      client/middle.go
  4. 17
    13
      proxy/proxy.go
  5. 0
    76
      wrappers/ctx.go
  6. 1
    1
      wrappers/wrap.go

+ 1
- 2
client/client.go Dosyayı Görüntüle

@@ -1,7 +1,6 @@
1 1
 package client
2 2
 
3 3
 import (
4
-	"context"
5 4
 	"net"
6 5
 
7 6
 	"github.com/9seconds/mtg/config"
@@ -9,4 +8,4 @@ import (
9 8
 	"github.com/9seconds/mtg/wrappers"
10 9
 )
11 10
 
12
-type Init func(context.Context, context.CancelFunc, net.Conn, string, *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error)
11
+type Init func(net.Conn, string, *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error)

+ 1
- 4
client/direct.go Dosyayı Görüntüle

@@ -1,7 +1,6 @@
1 1
 package client
2 2
 
3 3
 import (
4
-	"context"
5 4
 	"net"
6 5
 	"time"
7 6
 
@@ -15,8 +14,7 @@ import (
15 14
 
16 15
 const handshakeTimeout = 10 * time.Second
17 16
 
18
-func DirectInit(ctx context.Context, cancel context.CancelFunc, socket net.Conn, connID string,
19
-	conf *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error) {
17
+func DirectInit(socket net.Conn, connID string, conf *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error) {
20 18
 	if err := config.SetSocketOptions(socket); err != nil {
21 19
 		return nil, nil, errors.Annotate(err, "Cannot set socket options")
22 20
 	}
@@ -37,7 +35,6 @@ func DirectInit(ctx context.Context, cancel context.CancelFunc, socket net.Conn,
37 35
 	connOpts.ClientAddr = conn.RemoteAddr()
38 36
 
39 37
 	conn = wrappers.NewStreamCipher(conn, obfs2.Encryptor, obfs2.Decryptor)
40
-	conn = wrappers.NewCtx(ctx, cancel, conn)
41 38
 
42 39
 	return conn, connOpts, nil
43 40
 }

+ 2
- 4
client/middle.go Dosyayı Görüntüle

@@ -1,7 +1,6 @@
1 1
 package client
2 2
 
3 3
 import (
4
-	"context"
5 4
 	"net"
6 5
 
7 6
 	"github.com/9seconds/mtg/config"
@@ -9,9 +8,8 @@ import (
9 8
 	"github.com/9seconds/mtg/wrappers"
10 9
 )
11 10
 
12
-func MiddleInit(ctx context.Context, cancel context.CancelFunc, socket net.Conn, connID string,
13
-	conf *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error) {
14
-	conn, opts, err := DirectInit(ctx, cancel, socket, connID, conf)
11
+func MiddleInit(socket net.Conn, connID string, conf *config.Config) (wrappers.Wrap, *mtproto.ConnectionOpts, error) {
12
+	conn, opts, err := DirectInit(socket, connID, conf)
15 13
 	if err != nil {
16 14
 		return nil, nil, err
17 15
 	}

+ 17
- 13
proxy/proxy.go Dosyayı Görüntüle

@@ -1,7 +1,6 @@
1 1
 package proxy
2 2
 
3 3
 import (
4
-	"context"
5 4
 	"io"
6 5
 	"net"
7 6
 	"sync"
@@ -52,15 +51,14 @@ func (p *Proxy) accept(conn net.Conn) {
52 51
 
53 52
 	log.Infow("Client connected", "addr", conn.RemoteAddr())
54 53
 
55
-	ctx, cancel := context.WithCancel(context.Background())
56
-	client, opts, err := p.clientInit(ctx, cancel, conn, connID, p.conf)
54
+	client, opts, err := p.clientInit(conn, connID, p.conf)
57 55
 	if err != nil {
58 56
 		log.Errorw("Cannot initialize client connection", "error", err)
59 57
 		return
60 58
 	}
61 59
 	defer client.(io.Closer).Close()
62 60
 
63
-	server, err := p.getTelegramConn(ctx, cancel, opts, connID)
61
+	server, err := p.getTelegramConn(opts, connID)
64 62
 	if err != nil {
65 63
 		log.Errorw("Cannot initialize server connection", "error", err)
66 64
 		return
@@ -82,19 +80,16 @@ func (p *Proxy) accept(conn net.Conn) {
82 80
 		go p.directPipe(serverStream, clientStream, wait)
83 81
 	}
84 82
 
85
-	<-ctx.Done()
86 83
 	wait.Wait()
87 84
 
88 85
 	log.Infow("Client disconnected", "addr", conn.RemoteAddr())
89 86
 }
90 87
 
91
-func (p *Proxy) getTelegramConn(ctx context.Context, cancel context.CancelFunc, opts *mtproto.ConnectionOpts,
92
-	connID string) (wrappers.Wrap, error) {
88
+func (p *Proxy) getTelegramConn(opts *mtproto.ConnectionOpts, connID string) (wrappers.Wrap, error) {
93 89
 	streamConn, err := p.tg.Dial(connID, opts)
94 90
 	if err != nil {
95 91
 		return nil, errors.Annotate(err, "Cannot dial to Telegram")
96 92
 	}
97
-	streamConn = wrappers.NewCtx(ctx, cancel, streamConn)
98 93
 
99 94
 	packetConn, err := p.tg.Init(opts, streamConn)
100 95
 	if err != nil {
@@ -104,8 +99,13 @@ func (p *Proxy) getTelegramConn(ctx context.Context, cancel context.CancelFunc,
104 99
 	return packetConn, nil
105 100
 }
106 101
 
107
-func (p *Proxy) middlePipe(src wrappers.PacketReader, dst wrappers.PacketWriter, wait *sync.WaitGroup, hacks *mtproto.Hacks) {
108
-	defer wait.Done()
102
+func (p *Proxy) middlePipe(src wrappers.PacketReadCloser, dst wrappers.PacketWriteCloser, wait *sync.WaitGroup, hacks *mtproto.Hacks) {
103
+	defer func() {
104
+		src.Close()
105
+		dst.Close()
106
+		wait.Done()
107
+	}()
108
+
109 109
 	for {
110 110
 		hacks.SimpleAck = false
111 111
 		hacks.QuickAck = false
@@ -120,10 +120,14 @@ func (p *Proxy) middlePipe(src wrappers.PacketReader, dst wrappers.PacketWriter,
120 120
 	}
121 121
 }
122 122
 
123
-func (p *Proxy) directPipe(src io.Reader, dst io.Writer, wait *sync.WaitGroup) {
124
-	defer wait.Done()
125
-	io.Copy(dst, src)
123
+func (p *Proxy) directPipe(src io.ReadCloser, dst io.WriteCloser, wait *sync.WaitGroup) {
124
+	defer func() {
125
+		src.Close()
126
+		dst.Close()
127
+		wait.Done()
128
+	}()
126 129
 
130
+	io.Copy(dst, src)
127 131
 }
128 132
 
129 133
 func NewProxy(conf *config.Config) *Proxy {

+ 0
- 76
wrappers/ctx.go Dosyayı Görüntüle

@@ -1,76 +0,0 @@
1
-package wrappers
2
-
3
-import (
4
-	"context"
5
-	"net"
6
-
7
-	"github.com/juju/errors"
8
-)
9
-
10
-type Ctx struct {
11
-	cancel context.CancelFunc
12
-	conn   StreamReadWriteCloser
13
-	ctx    context.Context
14
-}
15
-
16
-func (c *Ctx) Read(p []byte) (int, error) {
17
-	select {
18
-	case <-c.ctx.Done():
19
-		return 0, errors.Annotate(c.ctx.Err(), "Read is failed because of closed context")
20
-	default:
21
-		n, err := c.conn.Read(p)
22
-		if err != nil {
23
-			c.cancel()
24
-		}
25
-		return n, err
26
-	}
27
-}
28
-
29
-func (c *Ctx) Write(p []byte) (int, error) {
30
-	select {
31
-	case <-c.ctx.Done():
32
-		return 0, errors.Annotate(c.ctx.Err(), "Write is failed because of closed context")
33
-	default:
34
-		n, err := c.conn.Write(p)
35
-		if err != nil {
36
-			c.cancel()
37
-		}
38
-		return n, err
39
-	}
40
-}
41
-
42
-func (c *Ctx) LogDebug(msg string, data ...interface{}) {
43
-	c.conn.LogDebug(msg, data...)
44
-}
45
-
46
-func (c *Ctx) LogInfo(msg string, data ...interface{}) {
47
-	c.conn.LogInfo(msg, data...)
48
-}
49
-
50
-func (c *Ctx) LogWarn(msg string, data ...interface{}) {
51
-	c.conn.LogWarn(msg, data...)
52
-}
53
-
54
-func (c *Ctx) LogError(msg string, data ...interface{}) {
55
-	c.conn.LogError(msg, data...)
56
-}
57
-
58
-func (c *Ctx) LocalAddr() *net.TCPAddr {
59
-	return c.conn.LocalAddr()
60
-}
61
-
62
-func (c *Ctx) RemoteAddr() *net.TCPAddr {
63
-	return c.conn.RemoteAddr()
64
-}
65
-
66
-func (c *Ctx) Close() error {
67
-	return c.conn.Close()
68
-}
69
-
70
-func NewCtx(ctx context.Context, cancel context.CancelFunc, conn StreamReadWriteCloser) StreamReadWriteCloser {
71
-	return &Ctx{
72
-		ctx:    ctx,
73
-		cancel: cancel,
74
-		conn:   conn,
75
-	}
76
-}

+ 1
- 1
wrappers/wrap.go Dosyayı Görüntüle

@@ -71,7 +71,7 @@ type PacketReadWriter interface {
71 71
 	PacketReader
72 72
 }
73 73
 
74
-type BlockReadCloser interface {
74
+type PacketReadCloser interface {
75 75
 	io.Closer
76 76
 	PacketReader
77 77
 }

Loading…
İptal
Kaydet