| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- package hub
-
- import (
- "time"
-
- "go.uber.org/zap"
-
- "github.com/9seconds/mtg/protocol"
- )
-
// hubGCEvery is the period of the ticker that makes the hub's run loop call
// runGC.
const hubGCEvery time.Duration = time.Minute
-
- type connectionHubRequest struct {
- request *protocol.TelegramRequest
- response chan<- *connection
- }
-
- type connectionHub struct {
- sockets map[int]*connection
- logger *zap.SugaredLogger
-
- channelBrokenSockets chan int
- channelConnectionRequests chan *connectionHubRequest
- channelReturnConnections chan *connection
- }
-
- func (c *connectionHub) run() {
- ticker := time.NewTicker(hubGCEvery)
- defer ticker.Stop()
-
- for {
- select {
- case <-ticker.C:
- c.runGC()
- case request := <-c.channelConnectionRequests:
- c.runConnectionRequest(request)
- case id := <-c.channelBrokenSockets:
- c.runBrokenSocket(id)
- case conn := <-c.channelReturnConnections:
- c.runReturnConnection(conn)
- }
- }
- }
-
- func (c *connectionHub) runGC() {
- logger := c.logger.Named("gc")
-
- for key, conn := range c.sockets {
- switch {
- case conn.closed():
- logger.Debugw("Delete closed socket", "key", key)
- delete(c.sockets, key)
- case conn.idle():
- logger.Debugw("Delete idle socket", "key", key)
- conn.shutdown()
- delete(c.sockets, key)
-
- return
- }
- }
- }
-
- func (c *connectionHub) runConnectionRequest(req *connectionHubRequest) {
- logger := c.logger.Named("request").With("connection-id", req.request.ConnID)
-
- for key, conn := range c.sockets {
- delete(c.sockets, key)
-
- if !conn.closed() {
- logger.Debugw("Choose connection",
- "id", conn.id,
- "remote_addr", conn.conn.RemoteAddr())
- req.response <- conn
- close(req.response)
-
- return
- }
- }
-
- if conn, err := newConnection(req.request, c); err == nil {
- logger.Debugw("New connection",
- "id", conn.id,
- "remote_addr", conn.conn.RemoteAddr())
- req.response <- conn
- }
-
- close(req.response)
- }
-
- func (c *connectionHub) runBrokenSocket(id int) {
- c.logger.Named("broken-socket").Debugw("Delete broken socket", "id", id)
- delete(c.sockets, id)
- }
-
- func (c *connectionHub) runReturnConnection(conn *connection) {
- c.logger.Named("return-connection").Debugw("Return connection",
- "id", conn.id,
- "remote_addr", conn.conn.RemoteAddr())
-
- c.sockets[conn.id] = conn
- }
-
- func newConnectionHub(logger *zap.SugaredLogger) *connectionHub {
- rv := &connectionHub{
- logger: logger.Named("connection-hub"),
- sockets: map[int]*connection{},
- channelBrokenSockets: make(chan int, 1),
- channelConnectionRequests: make(chan *connectionHubRequest),
- channelReturnConnections: make(chan *connection, 1),
- }
- go rv.run()
-
- return rv
- }
|