原理

正常情况下,不使用epoll,每建立一个ws连接我们都会启一个goroutine来处理客户端发来的消息,如果客户端没有消息,这个goroutine就会阻塞在读消息的状态。有多少个连接就会有多少个goroutine,虽然goroutine比较轻量,但在大量连接的情况下还是比较耗费资源还是很大的。 使用epoll后会把所有ws连接的fd(文件描述符)放入一个epoll队列中,只启一个goroutine对epoll wait进行轮询获取缓冲区有内容的ws连接。这样就把之前要起n个goroutine变成了1个,随着连接数的增加性能的提升也会越来越明显。

实践

具体github demo源码 ,参考1m-go-websockets ,原仓库只有linux下的实现,因为平时主要使用mac开发所以参考Linux的实现增加了mac下的实现。 demo中用到了两种websocket框架,分别是gorilla/websocketgobwas/ws。实际应用中发现了一个问题,因为平时主要使用gorilla/websocket,并且它的start数也远超gobwas/ws,所以在引入epoll时首选还是gorilla/websocket,然后开发调试过程中客户端一开始并发发msg给服务端,服务端就会有消息漏掉没处理的情况。 举个例子,客户端多个消息并发通知到服务端,客户端发第三次消息了,服务端才可能处理第二次消息。很明显并不是每次消息都触发了epoll的回调事件,这里epoll使用的水平触发,根据水平触发的定义:当被监控的文件描述符上有可读写事件发生时,就会一直触发epoll回调,这里没有触发说明文件描述上没有可读事件,基本上可以猜测epoll还没来的及通知时,fd对应的内容已经被读取完毕了。 理想中的触发流程: 实际触发流程: 看到这里可能会有同学想问,既然msg2没有处理,那我们主动处理,不等epoll触发可不可以呢?我们看下gorilla/websocket读取消息的源码

	for {
		connections, err := epoller.Wait()
		if err != nil {
			log.Printf("Failed to epoll wait %v", err)
			continue
		}
		time.Sleep(5)
		for _, conn := range connections {
			if conn == nil {
				break
			}
			_, msg, err := conn.ReadMessage()
			if err != nil {
				if err := epoller.Remove(conn); err != nil {
					log.Printf("Failed to remove %v", err)
				}
				conn.Close()
			} else {
				log.Printf("msg: %s", string(msg))
			}
		}
	}
}

从epoll wait拿到可读连接,到底下的读是一一对应的,我们如果想把conn 缓冲区处理完怎么办,只能接着循环 conn.ReadMessage() 但是这个函数并不会返回缓冲区是否处理完毕,当缓冲区和fd都没有东西读时它就被阻塞了,就是下面代码TODO部分无法实现,如果能够实现,epoll 就可以使用边沿触发的模式,理论上相比水平触发更高效。

func Start() {
	for {
		connections, err := epoller.Wait()
		if err != nil {
			log.Printf("Failed to epoll wait %v", err)
			continue
		}
		time.Sleep(5)
        wg := &sync.WatiGroup{}
		for _, conn := range connections {
			if conn == nil {
				break
			}
            wg.Add(1)
			go func() {
                defer wg.done
				for {
					_, msg, err := conn.ReadMessage()
					if err != nil {
						if err := epoller.Remove(conn); err != nil {
							log.Printf("Failed to remove %v", err)
						}
						conn.Close()
					} else {
						log.Printf("msg: %s", string(msg))
					}
					// TODO 没有内容可读时退出,等待下一次epoll触发
				}
			}()
		}
        wg.Wait()
	}
}

源码分析

为什么gorilla/websocket在这种场景下使用会有这种问题,而gobwas/ws没有?下面我们通过分析两者的读实现找下原因。

gorilla/websocket 读实现

conn.ReadMessage() 点进去,可以看到这函数调了c.NextReader

func (c *Conn) ReadMessage() (messageType int, p []byte, err error) {
	var r io.Reader
	messageType, r, err = c.NextReader()
	if err != nil {
		return messageType, nil, err
	}
	p, err = ioutil.ReadAll(r)
	return messageType, p, err
}

再点进c.NextReader可以看到实际返回要读的对象是messageReader

func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
	for c.readErr == nil {
		frameType, err := c.advanceFrame()
		if err != nil {
			c.readErr = hideTempErr(err)
			break
		}

		if frameType == TextMessage || frameType == BinaryMessage {
			c.messageReader = &messageReader{c}
			c.reader = c.messageReader
			if c.readDecompress {
				c.reader = c.newDecompressionReader(c.reader)
			}
            // 返回messageReader
			return frameType, c.reader, nil
		}
	}
}

接着看messageReader的实现, 通过看messageReader实现的Read方法发现,实际读取的内容来自c.br

func (r *messageReader) Read(b []byte) (int, error) {
	for c.readErr == nil {
		if c.readRemaining > 0 {
			if int64(len(b)) > c.readRemaining {
				b = b[:c.readRemaining]
			}
            // 实际从c.br读取的内容
			n, err := c.br.Read(b)
			c.readErr = hideTempErr(err)
			if c.isServer {
				c.readMaskPos = maskBytes(c.readMaskKey, c.readMaskPos, b[:n])
			}
			rem := c.readRemaining
			rem -= int64(n)
			c.setReadRemaining(rem)
			if c.readRemaining > 0 && c.readErr == io.EOF {
				c.readErr = errUnexpectedEOF
			}
			return n, c.readErr
		}
}

c.br是go自身提供的缓冲读io的对象,看到这里已经很明朗了,就是这里在读的时候,把fd里要读的内容读到了本地缓冲区,导致上图msg2的时候epoll没有再次触发。

type Conn struct {
	conn        net.Conn
	// Read fields
	reader  io.ReadCloser // the current reader returned to the application
	readErr error
	br      *bufio.Reader
}

上面还有一个用到比较多的函数 c.advanceFrame(),这个函数是用来处理每次要读多少字节的,因为websocket消息是一帧一帧的,读的时候也不能读多或者读少。

gobwas/ws读实现

gobwas/ws 是用的utils包里的ReadClientData方法读取client发的消息的,最终是调的readData方法

func ReadClientData(rw io.ReadWriter) ([]byte, ws.OpCode, error) {
	return ReadData(rw, ws.StateServerSide)
}

func ReadData(rw io.ReadWriter, s ws.State) ([]byte, ws.OpCode, error) {
	return readData(rw, s, ws.OpText|ws.OpBinary)
}

readData具体实现,这里根据net.Conn连接创建了一个Reader

func readData(rw io.ReadWriter, s ws.State, want ws.OpCode) ([]byte, ws.OpCode, error) {
	controlHandler := ControlFrameHandler(rw, s)
    // rw 就是 net.Conn
	rd := Reader{
		Source:          rw,
		State:           s,
		CheckUTF8:       true,
		SkipHeaderCheck: false,
		OnIntermediate:  controlHandler,
	}
	for {
		hdr, err := rd.NextFrame()
		if err != nil {
			return nil, 0, err
		}
		if hdr.OpCode.IsControl() {
			if err := controlHandler(hdr, &rd); err != nil {
				return nil, 0, err
			}
			continue
		}
		if hdr.OpCode&want == 0 {
			if err := rd.Discard(); err != nil {
				return nil, 0, err
			}
			continue
		}

		bts, err := ioutil.ReadAll(&rd)

		return bts, hdr.OpCode, err
	}
}

Reader 结构, 可以看到这个是gobwas/ws自己实现的一个无buf Reader

type Reader struct {
	Source io.Reader
	State  ws.State

	// SkipHeaderCheck disables checking header bits to be RFC6455 compliant.
	SkipHeaderCheck bool

	// CheckUTF8 enables UTF-8 checks for text frames payload. If incoming
	// bytes are not valid UTF-8 sequence, ErrInvalidUTF8 returned.
	CheckUTF8 bool

	// TODO(gobwas): add max frame size limit here.

	OnContinuation FrameHandlerFunc
	OnIntermediate FrameHandlerFunc

	opCode ws.OpCode        // Used to store message op code on fragmentation.
	frame  io.Reader        // Used to as frame reader.
	raw    io.LimitedReader // Used to discard frames without cipher.
	utf8   UTF8Reader       // Used to check UTF8 sequences if CheckUTF8 is true.
}

具体怎么读,读多少字节是在rd.NextFrame()方法中实现的,这里可以看到gobwas/ws每次都是直接读net.Conn也就是fd,并且每次当前帧的内容,所以刚好对应上图理想中的触发流程。

func (r *Reader) NextFrame() (hdr ws.Header, err error) {
    // 解析当前帧报文
	hdr, err = ws.ReadHeader(r.Source)
	if err == io.EOF && r.fragmented() {
		err = io.ErrUnexpectedEOF
	}
	if err == nil && !r.SkipHeaderCheck {
		err = ws.CheckHeader(hdr, r.State)
	}
	if err != nil {
		return hdr, err
	}

	// 设置读取长度, 这里r.Source 就是net.Conn
	r.raw = io.LimitedReader{r.Source, hdr.Length}

	frame := io.Reader(&r.raw)
	if hdr.Masked {
		frame = NewCipherReader(frame, hdr.Mask)
	}
	if r.fragmented() {
		if hdr.OpCode.IsControl() {
			if cb := r.OnIntermediate; cb != nil {
				err = cb(hdr, frame)
			}
			if err == nil {
				// Ensure that src is empty.
                // 直接从net.Conn读取hdr.Length长度的内容
				_, err = io.Copy(ioutil.Discard, &r.raw)
			}
			return
		}
	} else {
		r.opCode = hdr.OpCode
	}
	if r.CheckUTF8 && (hdr.OpCode == ws.OpText || (r.fragmented() && r.opCode == ws.OpText)) {
		r.utf8.Source = frame
		frame = &r.utf8
	}

	// Save reader with ciphering and other streaming checks.
	r.frame = frame

	if hdr.OpCode == ws.OpContinuation {
		if cb := r.OnContinuation; cb != nil {
			err = cb(hdr, frame)
		}
	}

	if hdr.Fin {
		r.State = r.State.Clear(ws.StateFragmented)
	} else {
		r.State = r.State.Set(ws.StateFragmented)
	}

	return
}

两种方式的读比较

gorilla/websocket加了buf读的方式理论上比gobwas/ws性能更高,如果能提供个buf和fd已读完读返回就好了,这样就可以直接使用epoll边沿触发模式。如果实在想用gorilla/websocket,这里也有一个解决方案,就是客户端加一个ping的控制帧消息,gorilla/websocket在接收到ping/pong之类的控制帧消息时,不会把这些消息暴漏给用户处理,而是在内部自己处理,处理完之后会接着处理剩下的消息,直到处理的消息不是ping/pong为止,这样就可以使用ping控制型帧强制触发epoll事件,保证业务消息能够及时处理。

参考以及源码