| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- package hub
-
- import (
- "encoding/binary"
- "fmt"
- "strings"
- "sync"
-
- "go.uber.org/zap"
-
- "github.com/9seconds/mtg/conntypes"
- "github.com/9seconds/mtg/protocol"
- )
-
- type hub struct {
- logger *zap.SugaredLogger
- subs map[string]*connectionHub
- mutex sync.RWMutex
- }
-
- func (h *hub) Write(packet conntypes.Packet, req *protocol.TelegramRequest) error {
- sub := h.getHub(req)
- connections := make(chan *connection)
- sub.channelConnectionRequests <- &connectionHubRequest{
- request: req,
- response: connections,
- }
-
- conn, ok := <-connections
- if !ok {
- return ErrCannotCreateConnection
- }
-
- if err := conn.write(packet); err != nil {
- conn.shutdown()
- return fmt.Errorf("cannot send packet: %w", err)
- }
- sub.channelReturnConnections <- conn
-
- return nil
- }
-
- func (h *hub) getHub(req *protocol.TelegramRequest) *connectionHub {
- keyBuilder := strings.Builder{}
- binary.Write(&keyBuilder, binary.LittleEndian, int16(req.ClientProtocol.DC())) // nolint: errcheck
- keyBuilder.WriteRune('_')
- binary.Write(&keyBuilder, binary.LittleEndian, uint8(req.ClientProtocol.ConnectionProtocol())) // nolint: errcheck
- key := keyBuilder.String()
-
- h.mutex.RLock()
- rv, ok := h.subs[key]
- h.mutex.RUnlock()
-
- if !ok {
- h.mutex.Lock()
- defer h.mutex.Unlock()
-
- rv, ok = h.subs[key]
- if !ok {
- h.logger.Debugw("Create new connection hub",
- "dc", req.ClientProtocol.DC(),
- "protocol", req.ClientProtocol.ConnectionProtocol())
-
- rv = newConnectionHub(h.logger.With(
- "dc", req.ClientProtocol.DC(),
- "protocol", req.ClientProtocol.ConnectionProtocol(),
- ))
- h.subs[key] = rv
- }
- }
-
- return rv
- }
|