! , Mail.Ru.
, WebSocket- Go.
WebSocket , Go — , , .
1.
, , .
Mail.Ru , . , . — — . (polling), — — .
, , — . Polling — 50 HTTP- , 60% 304, .
, , publisher-subscriber ( bus, message-broker event-channel), , , , — .
:
+-----------+           +-----------+           +-----------+
|           | ◄-------+ |           | ◄-------+ |           |
|  Storage  |           |    API    |    HTTP   |  Browser  |
|           | +-------► |           | +-------► |           |
+-----------+           +-----------+           +-----------+
:
+-------------+     +---------+   WebSocket   +-----------+
|   Storage   |     |  API *  | +-----------► |  Browser  |
+-------------+     +---------+      (3)      +-----------+
       +              (2) ▲
       |                  |
   (1) ▼                  +
+---------------------------------+
|               Bus               |
+---------------------------------+
, . API Storage ( ).
— . WebSocket- API, Storage. API Bus ( ; , ). Storage Bus (1), Bus — (2). API , , (3).
, API, WebSocket-. , , 3 . .
2. Idiomatic way
, , Go, .
net/http
, . , WebSocket (, json-), . Channel
, WebSocket-.
2.1. Channel struct
// Packet represents application level data.
// Its fields are elided in this listing (the "..." placeholder below).
type Packet struct {
	...
}
// Channel wraps user connection.
// It pairs the connection with a queue of Packet values that are waiting
// to be sent.
type Channel struct {
	conn net.Conn    // WebSocket connection.
	send chan Packet // Outgoing packets queue.
}
// NewChannel builds a Channel around conn, gives it an outgoing queue with
// room for N packets, and starts the reader and writer goroutines for it
// before returning.
func NewChannel(conn net.Conn) *Channel {
	ch := new(Channel)
	ch.conn = conn
	ch.send = make(chan Packet, N)

	// One goroutine per direction.
	go ch.reader()
	go ch.writer()

	return ch
}
. , Go 2 8 . , (3 ), 24 ( 4 ). , Channel
, ch.send
.
2.2. I/O
«» :
// reader reads packets from the connection one after another and passes each
// of them to c.handle. It returns as soon as readPacket reports an error;
// previously the error was discarded, so a closed or broken connection made
// the loop spin forever on failing reads.
func (c *Channel) reader() {
	// We make buffered read to reduce read syscalls.
	buf := bufio.NewReader(c.conn)

	for {
		pkt, err := readPacket(buf)
		if err != nil {
			// The packet value is meaningless on error; stop reading.
			return
		}
		c.handle(pkt)
	}
}
, ? , syscall’ , buf
. . : .
, , . buf
: 4 12 . «»:
// writer takes packets from c.send and writes them to the connection,
// flushing after every packet. It returns when c.send is closed or when a
// write or flush fails; after it returns nothing drains c.send any more.
func (c *Channel) writer() {
	// We make buffered write to reduce write syscalls.
	buf := bufio.NewWriter(c.conn)

	for pkt := range c.send {
		if err := writePacket(buf, pkt); err != nil {
			return
		}
		// bufio.Writer keeps reporting the first write error, so there is
		// no point in continuing once Flush has failed.
		if err := buf.Flush(); err != nil {
			return
		}
	}
}
c.send
. , , 4 12 3 .
2.3. HTTP
Channel
, WebSocket-, . Idiomatic way, .
, WebSocket, , WebSocket HTTP, Upgrade. Upgrade- TCP- WebSocket-.
.
import (
"net/http"
"some/websocket"
)
// Upgrade every request on /v1/ws and wrap the resulting connection in a
// Channel.
http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
	conn, err := websocket.Upgrade(r, w)
	if err != nil {
		// Do not build a Channel on a connection that was never upgraded.
		// NOTE(review): confirm whether websocket.Upgrade already writes
		// the HTTP error response itself.
		return
	}
	ch := NewChannel(conn)
	//...
})
, http.ResponseWriter
bufio.Writer
4 , *http.Request
bufio.Reader
4 .
WebSocket Upgrade- I/O TCP- responseWriter.Hijack()
.
Hint: go:linkname
net/http
net/http.putBufio{Reader,Writer}
.
, 24 3 .
72 , !
3.
, , , . WebSocket — . . . ( ping/pong
) .
.
, Channel.reader()
Channel.writer()
. I/O, 4 .
, , ?
3.1. netpoll
Channel.reader()
, , conn.Read()
bufio.Reader
? runtime go «» . . , runtime go , «».
conn.Read()
, , net.netFD.Read()
:
// net/fd_unix.go

// Read is an abridged excerpt; the parts left out are marked with "//...".
// It calls syscall.Read in a loop and retries only when the read failed with
// EAGAIN and the subsequent wait on the poll descriptor succeeded.
func (fd *netFD) Read(p []byte) (n int, err error) {
	//...
	for {
		n, err = syscall.Read(fd.sysfd, p)
		if err != nil {
			// A failed read reports zero bytes.
			n = 0
			if err == syscall.EAGAIN {
				// Wait via fd.pd.waitRead; if that wait returns no error,
				// try the read again.
				if err = fd.pd.waitRead(); err == nil {
					continue
				}
			}
		}
		//...
		break
	}
	//...
}
go . EAGAIN , , , , .
, read()
. EAGAIN
, runtime pollDesc.waitRead()
:
// net/fd_poll_runtime.go

// waitRead waits on the descriptor in read mode by delegating to
// pd.wait('r') and returns its result.
func (pd *pollDesc) waitRead() error {
	return pd.wait('r')
}
// wait passes the descriptor's runtime context and the requested mode to
// runtime_pollWait. The handling of the result is elided in this excerpt.
func (pd *pollDesc) wait(mode int) error {
	res := runtime_pollWait(pd.runtimeCtx, mode)
	//...
}
, , Linux netpoll
epoll
. ? , : .
github.com/golang/go issue netpoll.
3.2.
, netpoll Go. Channel.reader()
, «» :
ch := NewChannel(conn)

// Make conn to be observed by netpoll instance.
// Note that EventRead is identical to EPOLLIN on Linux.
poller.Start(conn, netpoll.EventRead, func() {
	// We spawn goroutine here to prevent poller wait loop
	// to become locked during receiving packet from ch.
	// Receive is a method of *Channel, so it is called on ch.
	go ch.Receive()
})
// Receive reads a packet from conn and handles it somehow.
// If the packet cannot be read, nothing is handled.
func (ch *Channel) Receive() {
	buf := bufio.NewReader(ch.conn)

	// Same (packet, error) call shape as in reader().
	pkt, err := readPacket(buf)
	if err != nil {
		return
	}
	// The receiver is named ch; the original referred to an undefined c.
	ch.handle(pkt)
}
Channel.writer()
— , :
// Send queues p for writing. If no writer goroutine exists yet for this
// channel, it starts one first.
func (ch *Channel) Send(p Packet) {
	// The receiver is named ch; the original referred to an undefined c.
	if ch.noWriterYet() {
		go ch.writer()
	}
	ch.send <- p
}
ch.send
( ) writer .
! 48 — I/O «» .
3.3.
— . race condition’ deadlock’, self-DDoS — , .
, - ping/pong
, idle- (, , ), , , , N .
, , (, nginx) .
, , , 48 — , .
3.3.1 Goroutine pool
. :
package gpool
// New returns a Pool whose work channel is unbuffered and whose sem channel
// has capacity size.
func New(size int) *Pool {
	p := new(Pool)
	p.work = make(chan func())
	p.sem = make(chan struct{}, size)
	return p
}
// Schedule runs task on a pool worker. It either hands the task to a worker
// that is waiting on p.work, or, if p.sem still has a free slot, takes the
// slot and starts a new worker for the task. It blocks until one of the two
// is possible. The returned error is always nil.
func (p *Pool) Schedule(task func()) error {
	select {
	case p.work <- task:
		// An existing worker picked the task up.
	case p.sem <- struct{}{}:
		// A slot was free: start a new worker with this task.
		go p.worker(task)
	}
	// The function is declared to return an error, so it must return one.
	return nil
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }
for {
task()
task = <-p.work
}
}
netpoll :
pool := gpool.New(128)

poller.Start(conn, netpoll.EventRead, func() {
	// We will block poller wait loop when
	// all pool workers are busy.
	pool.Schedule(func() {
		// Receive is a method of *Channel, so it is called on ch.
		ch.Receive()
	})
})
, .
Send()
:
pool := gpool.New(128)
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
pool.Schedule(ch.writer)
}
ch.send <- p
}
go ch.writer()
. , N
, N N + 1
N + 1
. Accept()
Upgrade()
DDoS.
3.4. Zero-copy upgrade
WebSocket. , WebSocket HTTP- Upgrade. :
GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket
HTTP- , WebSocket. , , http.Request
, , , , HTTP- net/http
.
http.Request
, ,Header
, . , ,Cookie
.
?
3.4.1. WebSocket
, upgrade net/http
-. , ( ) , . , API WebSocket. , :
// ReadFrame takes an io.Reader and returns a Frame or an error.
// Declaration only; the body is not part of this listing.
func ReadFrame(io.Reader) (Frame, error)

// WriteFrame takes an io.Writer and a Frame and returns an error.
// Declaration only; the body is not part of this listing.
func WriteFrame(io.Writer, Frame) error
API, ( ):
// getReadBuf, putReadBuf are intended to
// reuse *bufio.Reader (with sync.Pool for example).
// Both are declarations only; their bodies are not part of this listing.

// getReadBuf takes an io.Reader and returns a *bufio.Reader.
func getReadBuf(io.Reader) *bufio.Reader

// putReadBuf takes back a *bufio.Reader.
func putReadBuf(*bufio.Reader)
// readPacket must be called when data could be read from conn.
// It borrows a reader via getReadBuf, reads one frame through it and passes
// the frame payload to parsePacket. It returns the ReadFrame error, if any.
func readPacket(conn io.Reader) error {
	// getReadBuf is declared to take the source reader, so pass conn.
	buf := getReadBuf(conn)
	defer putReadBuf(buf)

	// Kept from the original; presumably redundant if getReadBuf already
	// attaches conn, and harmless either way since nothing is buffered yet.
	buf.Reset(conn)

	frame, err := ReadFrame(buf)
	if err != nil {
		// The frame is meaningless on error; do not parse it.
		return err
	}
	parsePacket(frame.Payload)
	//...
	return nil
}
, .
3.4.2. github.com/gobwas/ws
ws
, . io.Reader
io.Writer
, , I/O.
upgrade- net/http
, ws
zero-copy upgrade — upgrade- WebSocket . ws.Upgrade()
io.ReadWriter
(net.Conn
) — . . net.Listen()
ln.Accept()
ws.Upgrade()
. (, Cookie
).
upgrade-: net/http
- net.Listen()
zero-copy upgrade:
BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op
BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op
ws
zero-copy upgrade 24 — , I/O net/http
.
3.5.
, .
- — .
: netpoll (epoll, kqueue); . - — .
: , ; . - netpoll .
: . net/http
Upgrade WebSocket.
: zero-copy upgrade «» TCP-.
:
import (
"net"
"github.com/gobwas/ws"
)
ln, err := net.Listen("tcp", ":8080")
if err != nil {
	// Without a listener there is nothing to serve; fail with the real
	// cause instead of a nil dereference on ln.Accept below.
	panic(err)
}
for {
	// Try to accept incoming connection inside free pool worker.
	// If there no free workers for 1ms, do not accept anything and try later.
	// This will help us to prevent many self-ddos or out of resource limit cases.
	err := pool.ScheduleTimeout(time.Millisecond, func() {
		// Accept returns the connection together with an error.
		conn, err := ln.Accept()
		if err != nil {
			// Nothing was accepted; the outer loop schedules the next attempt.
			return
		}
		// ws.Upgrade returns the handshake result together with an error.
		if _, err := ws.Upgrade(conn); err != nil {
			// The handshake failed: close the socket instead of leaking it.
			conn.Close()
			return
		}

		// Wrap WebSocket connection with our Channel struct.
		// This will help us to handle/send our app's packets.
		ch := NewChannel(conn)

		// Wait for incoming bytes from connection.
		poller.Start(conn, netpoll.EventRead, func() {
			// Do not cross the resource limits.
			pool.Schedule(func() {
				// Read and handle incoming packet(s).
				ch.Receive()
			})
		})
	})
	if err != nil {
		// No worker became free in time; back off briefly before retrying.
		time.Sleep(time.Millisecond)
	}
}
4.
Premature optimization is the root of all evil (or at least most of it) in programming. — Donald Knuth
, . , (, CPU) , , . , , , .
!