Kaynağa Gözat

Add abridged rwc

tags/0.9
9seconds 7 yıl önce
ebeveyn
işleme
4264fdadf8
3 değiştirilmiş dosya ile 128 ekleme ve 14 silme
  1. 120
    0
      mtproto/wrappers/abridged.go
  2. 3
    3
      telegram/middle.go
  3. 5
    11
      wrappers/buffered_reader.go

+ 120
- 0
mtproto/wrappers/abridged.go Dosyayı Görüntüle

1
+package wrappers
2
+
3
+import (
4
+	"bytes"
5
+	"encoding/binary"
6
+	"io"
7
+	"net"
8
+
9
+	"github.com/juju/errors"
10
+
11
+	"github.com/9seconds/mtg/mtproto"
12
+	"github.com/9seconds/mtg/wrappers"
13
+)
14
+
15
+type uint24 [3]byte
16
+
17
+const (
18
+	abridgedSmallPacketLength = 0x7f
19
+	abridgedQuickAckLength    = 0x80
20
+	abridgedLargePacketLength = 16777216 // 256 ^ 3
21
+)
22
+
23
+type AbridgedReadWriteCloserAddr struct {
24
+	wrappers.BufferedReader
25
+
26
+	conn wrappers.ReadWriteCloserWithAddr
27
+	opts *mtproto.ConnectionOpts
28
+}
29
+
30
+func (a *AbridgedReadWriteCloserAddr) Read(p []byte) (int, error) {
31
+	return a.BufferedRead(p, func() error {
32
+		var msgLength uint8
33
+		if err := binary.Read(a.conn, binary.LittleEndian, &msgLength); err != nil {
34
+			return errors.Annotate(err, "Cannot read message length")
35
+		}
36
+
37
+		a.opts.QuickAck = false
38
+		if msgLength >= abridgedQuickAckLength {
39
+			a.opts.QuickAck = true
40
+			msgLength -= 0x80
41
+		}
42
+
43
+		msgLength32 := uint32(msgLength)
44
+		if msgLength == abridgedSmallPacketLength {
45
+			buf := &bytes.Buffer{}
46
+			buf.Grow(3)
47
+
48
+			if _, err := io.CopyN(buf, a.conn, 3); err != nil {
49
+				return errors.Annotate(err, "Cannot read correct message length")
50
+			}
51
+			number := uint24{}
52
+			copy(number[:], buf.Bytes())
53
+			msgLength32 = fromUint24(number)
54
+		}
55
+		msgLength32 *= 4
56
+
57
+		if _, err := io.CopyN(a.Buffer, a.conn, int64(msgLength32)); err != nil {
58
+			return errors.Annotate(err, "Cannot read message")
59
+		}
60
+
61
+		return nil
62
+	})
63
+}
64
+
65
+func (a *AbridgedReadWriteCloserAddr) Write(p []byte) (int, error) {
66
+	if len(p)%4 != 0 {
67
+		return 0, errors.Errorf("Incorrect packet length %d", len(p))
68
+	}
69
+	if a.opts.SimpleAck {
70
+		return a.conn.Write(reverseBytes(p))
71
+	}
72
+
73
+	packetLength := len(p) / 4
74
+	switch {
75
+	case packetLength < abridgedSmallPacketLength:
76
+		newData := append([]byte{byte(packetLength)}, p...)
77
+		return a.conn.Write(newData)
78
+
79
+	case packetLength < abridgedLargePacketLength:
80
+		length24 := toUint24(uint32(packetLength))
81
+
82
+		buf := &bytes.Buffer{}
83
+		buf.Grow(1 + 3 + len(p))
84
+		buf.WriteByte(byte(abridgedSmallPacketLength))
85
+		buf.Write(length24[:])
86
+		buf.Write(p)
87
+
88
+		return a.conn.Write(buf.Bytes())
89
+
90
+	default:
91
+		return 0, errors.Errorf("Packet is too big %d", len(p))
92
+	}
93
+}
94
+
95
+func (a *AbridgedReadWriteCloserAddr) Close() error {
96
+	return a.conn.Close()
97
+}
98
+
99
+func (a *AbridgedReadWriteCloserAddr) LocalAddr() *net.TCPAddr {
100
+	return a.conn.LocalAddr()
101
+}
102
+
103
+func (a *AbridgedReadWriteCloserAddr) RemoteAddr() *net.TCPAddr {
104
+	return a.conn.RemoteAddr()
105
+}
106
+
107
+func toUint24(number uint32) uint24 {
108
+	return uint24{byte(number), byte(number >> 8), byte(number >> 16)}
109
+}
110
+
111
+func fromUint24(number uint24) uint32 {
112
+	return uint32(number[0]) + (uint32(number[1]) << 8) + (uint32(number[2]) << 16)
113
+}
114
+
115
+func NewAbridgedRWC(conn wrappers.ReadWriteCloserWithAddr, connOpts *mtproto.ConnectionOpts) wrappers.ReadWriteCloserWithAddr {
116
+	return &AbridgedReadWriteCloserAddr{
117
+		conn: conn,
118
+		opts: connOpts,
119
+	}
120
+}

+ 3
- 3
telegram/middle.go Dosyayı Görüntüle

20
 type middleTelegram struct {
20
 type middleTelegram struct {
21
 	middleTelegramCaller
21
 	middleTelegramCaller
22
 
22
 
23
-	adtag []byte
23
+	conf *config.Config
24
 }
24
 }
25
 
25
 
26
 func NewMiddleTelegram(conf *config.Config, logger *zap.SugaredLogger) Telegram {
26
 func NewMiddleTelegram(conf *config.Config, logger *zap.SugaredLogger) Telegram {
38
 			},
38
 			},
39
 			dialerMutex: &sync.RWMutex{},
39
 			dialerMutex: &sync.RWMutex{},
40
 		},
40
 		},
41
-		adtag: conf.AdTag,
41
+		conf: conf,
42
 	}
42
 	}
43
 
43
 
44
 	if err := tg.update(); err != nil {
44
 	if err := tg.update(); err != nil {
73
 		return nil, err
73
 		return nil, err
74
 	}
74
 	}
75
 
75
 
76
-	return mtwrappers.NewProxyRequestRWC(secureConn, connOpts, t.adtag)
76
+	return mtwrappers.NewProxyRequestRWC(secureConn, connOpts, t.conf.AdTag)
77
 }
77
 }
78
 
78
 
79
 func (t *middleTelegram) sendRPCNonceRequest(conn io.Writer) (*rpc.RPCNonceRequest, error) {
79
 func (t *middleTelegram) sendRPCNonceRequest(conn io.Writer) (*rpc.RPCNonceRequest, error) {

+ 5
- 11
wrappers/buffered_reader.go Dosyayı Görüntüle

17
 }
17
 }
18
 
18
 
19
 func (b *BufferedReader) flush(p []byte) (int, error) {
19
 func (b *BufferedReader) flush(p []byte) (int, error) {
20
-	sizeToRead := len(p)
21
-	if b.Buffer.Len() < sizeToRead {
22
-		sizeToRead = b.Buffer.Len()
23
-	}
24
-
25
-	data := b.Buffer.Bytes()
26
-	copy(p, data[:sizeToRead])
27
-	if sizeToRead == b.Buffer.Len() {
20
+	if b.Buffer.Len() < len(p) {
21
+		sizeToReturn := b.Buffer.Len()
22
+		copy(p, b.Buffer.Bytes())
28
 		b.Buffer.Reset()
23
 		b.Buffer.Reset()
29
-	} else {
30
-		b.Buffer = bytes.NewBuffer(data[sizeToRead:])
24
+		return sizeToReturn, nil
31
 	}
25
 	}
32
 
26
 
33
-	return sizeToRead, nil
27
+	return b.Buffer.Read(p)
34
 }
28
 }
35
 
29
 
36
 func NewBufferedReader() BufferedReader {
30
 func NewBufferedReader() BufferedReader {

Loading…
İptal
Kaydet