原理
正常情况下,不使用epoll,每建立一个ws连接我们都会启一个goroutine来处理客户端发来的消息,如果客户端没有消息,这个goroutine就会阻塞在读消息的状态。有多少个连接就会有多少个goroutine,虽然goroutine比较轻量,但在大量连接的情况下还是比较耗费资源还是很大的。 使用epoll后会把所有ws连接的fd(文件描述符)放入一个epoll队列中,只启一个goroutine对epoll wait进行轮询获取缓冲区有内容的ws连接。这样就把之前要起n个goroutine变成了1个,随着连接数的增加性能的提升也会越来越明显。

实践
具体github demo源码 ,参考1m-go-websockets ,原仓库只有linux下的实现,因为平时主要使用mac开发所以参考Linux的实现增加了mac下的实现。
demo中用到了两种websocket框架,分别是gorilla/websocket 和 gobwas/ws。实际应用中发现了一个问题,因为平时主要使用gorilla/websocket,并且它的start数也远超gobwas/ws,所以在引入epoll时首选还是gorilla/websocket,然后开发调试过程中客户端一开始并发发msg给服务端,服务端就会有消息漏掉没处理的情况。
举个例子,客户端多个消息并发通知到服务端,客户端发第三次消息了,服务端才可能处理第二次消息。很明显并不是每次消息都触发了epoll的回调事件,这里epoll使用的水平触发,根据水平触发的定义:当被监控的文件描述符上有可读写事件发生时,就会一直触发epoll回调,这里没有触发说明文件描述上没有可读事件,基本上可以猜测epoll还没来的及通知时,fd对应的内容已经被读取完毕了。
理想中的触发流程:
实际触发流程:
看到这里可能会有同学想问,既然msg2没有处理,那我们主动处理,不等epoll触发可不可以呢?我们看下gorilla/websocket读取消息的源码
for {
connections, err := epoller.Wait()
if err != nil {
log.Printf("Failed to epoll wait %v", err)
continue
}
time.Sleep(5)
for _, conn := range connections {
if conn == nil {
break
}
_, msg, err := conn.ReadMessage()
if err != nil {
if err := epoller.Remove(conn); err != nil {
log.Printf("Failed to remove %v", err)
}
conn.Close()
} else {
log.Printf("msg: %s", string(msg))
}
}
}
}
从epoll wait拿到可读连接,到底下的读是一一对应的,我们如果想把conn 缓冲区处理完怎么办,只能接着循环 conn.ReadMessage() 但是这个函数并不会返回缓冲区是否处理完毕,当缓冲区和fd都没有东西读时它就被阻塞了,就是下面代码TODO部分无法实现,如果能够实现,epoll 就可以使用边沿触发的模式,理论上相比水平触发更高效。
func Start() {
for {
connections, err := epoller.Wait()
if err != nil {
log.Printf("Failed to epoll wait %v", err)
continue
}
time.Sleep(5)
wg := &sync.WatiGroup{}
for _, conn := range connections {
if conn == nil {
break
}
wg.Add(1)
go func() {
defer wg.done
for {
_, msg, err := conn.ReadMessage()
if err != nil {
if err := epoller.Remove(conn); err != nil {
log.Printf("Failed to remove %v", err)
}
conn.Close()
} else {
log.Printf("msg: %s", string(msg))
}
// TODO 没有内容可读时退出,等待下一次epoll触发
}
}()
}
wg.Wait()
}
}
源码分析
为什么gorilla/websocket在这种场景下使用会有这种问题,而gobwas/ws没有?下面我们通过分析两者的读实现找下原因。
gorilla/websocket 读实现
从 conn.ReadMessage() 点进去,可以看到这函数调了c.NextReader
func (c *Conn) ReadMessage() (messageType int, p []byte, err error) {
var r io.Reader
messageType, r, err = c.NextReader()
if err != nil {
return messageType, nil, err
}
p, err = ioutil.ReadAll(r)
return messageType, p, err
}
再点进c.NextReader可以看到实际返回要读的对象是messageReader
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
for c.readErr == nil {
frameType, err := c.advanceFrame()
if err != nil {
c.readErr = hideTempErr(err)
break
}
if frameType == TextMessage || frameType == BinaryMessage {
c.messageReader = &messageReader{c}
c.reader = c.messageReader
if c.readDecompress {
c.reader = c.newDecompressionReader(c.reader)
}
// 返回messageReader
return frameType, c.reader, nil
}
}
}
接着看messageReader的实现, 通过看messageReader实现的Read方法发现,实际读取的内容来自c.br
func (r *messageReader) Read(b []byte) (int, error) {
for c.readErr == nil {
if c.readRemaining > 0 {
if int64(len(b)) > c.readRemaining {
b = b[:c.readRemaining]
}
// 实际从c.br读取的内容
n, err := c.br.Read(b)
c.readErr = hideTempErr(err)
if c.isServer {
c.readMaskPos = maskBytes(c.readMaskKey, c.readMaskPos, b[:n])
}
rem := c.readRemaining
rem -= int64(n)
c.setReadRemaining(rem)
if c.readRemaining > 0 && c.readErr == io.EOF {
c.readErr = errUnexpectedEOF
}
return n, c.readErr
}
}
c.br是go自身提供的缓冲读io的对象,看到这里已经很明朗了,就是这里在读的时候,把fd里要读的内容读到了本地缓冲区,导致上图msg2的时候epoll没有再次触发。
type Conn struct {
conn net.Conn
// Read fields
reader io.ReadCloser // the current reader returned to the application
readErr error
br *bufio.Reader
}
上面还有一个用到比较多的函数 c.advanceFrame(),这个函数是用来处理每次要读多少字节的,因为websocket消息是一帧一帧的,读的时候也不能读多或者读少。
gobwas/ws读实现
gobwas/ws 是用的utils包里的ReadClientData方法读取client发的消息的,最终是调的readData方法
func ReadClientData(rw io.ReadWriter) ([]byte, ws.OpCode, error) {
return ReadData(rw, ws.StateServerSide)
}
func ReadData(rw io.ReadWriter, s ws.State) ([]byte, ws.OpCode, error) {
return readData(rw, s, ws.OpText|ws.OpBinary)
}
readData具体实现,这里根据net.Conn连接创建了一个Reader
func readData(rw io.ReadWriter, s ws.State, want ws.OpCode) ([]byte, ws.OpCode, error) {
controlHandler := ControlFrameHandler(rw, s)
// rw 就是 net.Conn
rd := Reader{
Source: rw,
State: s,
CheckUTF8: true,
SkipHeaderCheck: false,
OnIntermediate: controlHandler,
}
for {
hdr, err := rd.NextFrame()
if err != nil {
return nil, 0, err
}
if hdr.OpCode.IsControl() {
if err := controlHandler(hdr, &rd); err != nil {
return nil, 0, err
}
continue
}
if hdr.OpCode&want == 0 {
if err := rd.Discard(); err != nil {
return nil, 0, err
}
continue
}
bts, err := ioutil.ReadAll(&rd)
return bts, hdr.OpCode, err
}
}
Reader 结构, 可以看到这个是gobwas/ws自己实现的一个无buf Reader
type Reader struct {
Source io.Reader
State ws.State
// SkipHeaderCheck disables checking header bits to be RFC6455 compliant.
SkipHeaderCheck bool
// CheckUTF8 enables UTF-8 checks for text frames payload. If incoming
// bytes are not valid UTF-8 sequence, ErrInvalidUTF8 returned.
CheckUTF8 bool
// TODO(gobwas): add max frame size limit here.
OnContinuation FrameHandlerFunc
OnIntermediate FrameHandlerFunc
opCode ws.OpCode // Used to store message op code on fragmentation.
frame io.Reader // Used to as frame reader.
raw io.LimitedReader // Used to discard frames without cipher.
utf8 UTF8Reader // Used to check UTF8 sequences if CheckUTF8 is true.
}
具体怎么读,读多少字节是在rd.NextFrame()方法中实现的,这里可以看到gobwas/ws每次都是直接读net.Conn也就是fd,并且每次当前帧的内容,所以刚好对应上图理想中的触发流程。
func (r *Reader) NextFrame() (hdr ws.Header, err error) {
// 解析当前帧报文
hdr, err = ws.ReadHeader(r.Source)
if err == io.EOF && r.fragmented() {
err = io.ErrUnexpectedEOF
}
if err == nil && !r.SkipHeaderCheck {
err = ws.CheckHeader(hdr, r.State)
}
if err != nil {
return hdr, err
}
// 设置读取长度, 这里r.Source 就是net.Conn
r.raw = io.LimitedReader{r.Source, hdr.Length}
frame := io.Reader(&r.raw)
if hdr.Masked {
frame = NewCipherReader(frame, hdr.Mask)
}
if r.fragmented() {
if hdr.OpCode.IsControl() {
if cb := r.OnIntermediate; cb != nil {
err = cb(hdr, frame)
}
if err == nil {
// Ensure that src is empty.
// 直接从net.Conn读取hdr.Length长度的内容
_, err = io.Copy(ioutil.Discard, &r.raw)
}
return
}
} else {
r.opCode = hdr.OpCode
}
if r.CheckUTF8 && (hdr.OpCode == ws.OpText || (r.fragmented() && r.opCode == ws.OpText)) {
r.utf8.Source = frame
frame = &r.utf8
}
// Save reader with ciphering and other streaming checks.
r.frame = frame
if hdr.OpCode == ws.OpContinuation {
if cb := r.OnContinuation; cb != nil {
err = cb(hdr, frame)
}
}
if hdr.Fin {
r.State = r.State.Clear(ws.StateFragmented)
} else {
r.State = r.State.Set(ws.StateFragmented)
}
return
}
两种方式的读比较
gorilla/websocket加了buf读的方式理论上比gobwas/ws性能更高,如果能提供个buf和fd已读完读返回就好了,这样就可以直接使用epoll边沿触发模式。如果实在想用gorilla/websocket,这里也有一个解决方案,就是客户端加一个ping的控制帧消息,gorilla/websocket在接收到ping/pong之类的控制帧消息时,不会把这些消息暴漏给用户处理,而是在内部自己处理,处理完之后会接着处理剩下的消息,直到处理的消息不是ping/pong为止,这样就可以使用ping控制型帧强制触发epoll事件,保证业务消息能够及时处理。