Миллион WebSocket и Go

image







! , Mail.Ru.







, WebSocket- Go.







WebSocket , Go — , , .







1.



, , .







Mail.Ru , . , . — — . (polling), — — .







, , — . Polling — 50 HTTP- , 60% 304, .







, , publisher-subscriber ( bus, message-broker event-channel), , , , — .







:







+-----------+           +-----------+           +-----------+
|           | ◄-------+ |           | ◄-------+ |           |
|  Storage  |           |    API    |    HTTP   |  Browser  |
|           | +-------► |           | +-------► |           |
+-----------+           +-----------+           +-----------+
      
      





:







 +-------------+     +---------+   WebSocket   +-----------+
 |   Storage   |     |   API * | +-----------► |  Browser  |
 +-------------+     +---------+         (3)   +-----------+
        +             (2) ▲
        |                 |
    (1) ▼                 +     
+---------------------------------+                                 
|               Bus               |
+---------------------------------+
      
      





, . API Storage ( ).







— . WebSocket- API, Storage. API Bus ( ; , ). Storage Bus (1), Bus — (2). API , , (3).







, API, WebSocket-. , , 3 . .







2. Idiomatic way



, , Go, .







net/http



, . , WebSocket (, json-), . Channel



, WebSocket-.







2.1. Channel struct



// Packet represents application level data.
type Packet struct {
    ...
}

// Channel wraps user connection.
type Channel struct {
    conn net.Conn    // WebSocket connection.
    send chan Packet // Outgoing packets queue.
}

func NewChannel(conn net.Conn) *Channel {
    c := &Channel{
        conn: conn,
        send: make(chan Packet, N),
    }

    go c.reader()
    go c.writer()

    return c
}
      
      





. , Go 2 8 . , (3 ), 24 ( 4 ). , Channel



, ch.send



.







2.2. I/O



«» :







func (c *Channel) reader() {
    // We make buffered read to reduce read syscalls.
    buf := bufio.NewReader(c.conn)

    for {
        pkt, _ := readPacket(buf)
        c.handle(pkt)
    }
}
      
      





, ? , syscall’ , buf



. . : .







, , . buf



: 4 12 . «»:







func (c *Channel) writer() {
    // We make buffered write to reduce write syscalls. 
    buf := bufio.NewWriter(c.conn)

    for pkt := range c.send {
        _ := writePacket(buf, pkt)
        buf.Flush()
    }
}
      
      





c.send



. , , 4 12 3 .







2.3. HTTP



Channel



, WebSocket-, . Idiomatic way, .







, WebSocket, , WebSocket HTTP, Upgrade. Upgrade- TCP- WebSocket-.



.


import (
    "net/http"
    "some/websocket"
)

http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
    conn, _ := websocket.Upgrade(r, w)
    ch := NewChannel(conn)
    //...
})
      
      





, http.ResponseWriter



bufio.Writer



4 , *http.Request



bufio.Reader



4 .







WebSocket Upgrade- I/O TCP- responseWriter.Hijack()



.







Hint: go:linkname



net/http



net/http.putBufio{Reader,Writer}



.

, 24 3 .







72 , !







3.



, , , . WebSocket — . . . ( ping/pong



) .







.

, Channel.reader()



Channel.writer()



. I/O, 4 .







, , ?







3.1. netpoll



Channel.reader()



, , conn.Read()



bufio.Reader



? runtime go «» . . , runtime go , «».







conn.Read()



, , net.netFD.Read()



:







// net/fd_unix.go

func (fd *netFD) Read(p []byte) (n int, err error) {
    //...
    for {
        n, err = syscall.Read(fd.sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN {
                if err = fd.pd.waitRead(); err == nil {
                    continue
                }
            }
        }
        //...
        break
    }
    //...
}
      
      





go . EAGAIN , , , , .

, read()



. EAGAIN



, runtime pollDesc.waitRead()



:







// net/fd_poll_runtime.go

func (pd *pollDesc) waitRead() error {
   return pd.wait('r')
}

func (pd *pollDesc) wait(mode int) error {
   res := runtime_pollWait(pd.runtimeCtx, mode)
   //...
}
      
      





, , Linux netpoll



epoll



. ? , : .







github.com/golang/go issue netpoll.


3.2.



, netpoll Go. Channel.reader()



, «» :







ch := NewChannel(conn)

// Make conn to be observed by netpoll instance.
// Note that EventRead is identical to EPOLLIN on Linux.
poller.Start(conn, netpoll.EventRead, func() {
    // We spawn goroutine here to prevent poller wait loop
    // to become locked during receiving packet from ch.
    go Receive(ch)
})

// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
    buf := bufio.NewReader(ch.conn)
    pkt := readPacket(buf)
    c.handle(pkt)
}
      
      





Channel.writer()



— , :







func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        go ch.writer()
    }
    ch.send <- p
}
      
      





ch.send



( ) writer .







! 48 — I/O «» .







3.3.



— . race condition’ deadlock’, self-DDoS — , .







, - ping/pong



, idle- (, , ), , , , N .







, , (, nginx) .







, , , 48 — , .







3.3.1 Goroutine pool



. :







package gpool

func New(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

func (p *Pool) Schedule(task func()) error {
    select {
    case p.work <- task:
    case p.sem <- struct{}{}:
        go p.worker(task)
    }
}

func (p *Pool) worker(task func()) {
    defer func() { <-p.sem }
    for {
        task()
        task = <-p.work
    }
}
      
      





netpoll :







pool := gpool.New(128)

poller.Start(conn, netpoll.EventRead, func() {
    // We will block poller wait loop when
    // all pool workers are busy.
    pool.Schedule(func() {
        Receive(ch)
    })
})
      
      





, .







Send()



:







pool := gpool.New(128)

func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        pool.Schedule(ch.writer)
    }
    ch.send <- p
}
      
      





go ch.writer()



. , N



, N N + 1



N + 1



. Accept()



Upgrade()



DDoS.







3.4. Zero-copy upgrade



WebSocket. , WebSocket HTTP- Upgrade. :







GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket
      
      





HTTP- , WebSocket. , , http.Request



, , , , HTTP- net/http



.







http.Request



, , Header



, . , , Cookie



.

?







3.4.1. WebSocket



, upgrade net/http



-. , ( ) , . , API WebSocket. , :







func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error
      
      





API, ( ):







// getReadBuf, putReadBuf are intended to 
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)

// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
    buf := getReadBuf()
    defer putReadBuf(buf)

    buf.Reset(conn)
    frame, _ := ReadFrame(buf)
    parsePacket(frame.Payload)
    //...
}
      
      





, .







3.4.2. github.com/gobwas/ws



ws



, . io.Reader



io.Writer



, , I/O.







upgrade- net/http



, ws



zero-copy upgrade — upgrade- WebSocket . ws.Upgrade()



io.ReadWriter



(net.Conn



) — . . net.Listen()



ln.Accept()



ws.Upgrade()



. (, Cookie



).







upgrade-: net/http



- net.Listen()



zero-copy upgrade:







BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op
BenchmarkUpgradeTCP     973 ns/op     0 B/op       0 allocs/op
      
      





ws



zero-copy upgrade 24 — , I/O net/http



.







3.5.



, .









:







import (
    "net"
    "github.com/gobwas/ws"
)

ln, _ := net.Listen("tcp", ":8080")

for {
    // Try to accept incoming connection inside free pool worker.
    // If there no free workers for 1ms, do not accept anything and try later.
    // This will help us to prevent many self-ddos or out of resource limit cases.
    err := pool.ScheduleTimeout(time.Millisecond, func() {
        conn := ln.Accept()
        _ = ws.Upgrade(conn)

        // Wrap WebSocket connection with our Channel struct.
        // This will help us to handle/send our app's packets.
        ch := NewChannel(conn)

        // Wait for incoming bytes from connection.
        poller.Start(conn, netpoll.EventRead, func() {
            // Do not cross the resource limits.
            pool.Schedule(func() {
                // Read and handle incoming packet(s).
                ch.Recevie()
            })
        })
    })
    if err != nil {   
        time.Sleep(time.Millisecond)
    }
}
      
      





4.



Premature optimization is the root of all evil (or at least most of it) in programming. Donald Knuth

, . , (, CPU) , , . , , , .







!







5.






All Articles