Skip to content

Commit

Permalink
feat: 新增ReadMessageAsync接口,进行不阻塞读取消息
Browse files Browse the repository at this point in the history
  • Loading branch information
Diablo.luo committed May 12, 2021
1 parent e8629af commit 217e6eb
Showing 1 changed file with 36 additions and 17 deletions.
53 changes: 36 additions & 17 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,28 +967,12 @@ func (c *Conn) handleProtocolError(message string) error {
// permanent. Once this method returns a non-nil error, all subsequent calls to
// this method return the same error.
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
// Close previous reader, only relevant for decompression.
if c.reader != nil {
c.reader.Close()
c.reader = nil
}

c.messageReader = nil
c.readLength = 0

for c.readErr == nil {
frameType, err := c.advanceFrame()
frameType, _, err := c.nextReader()
if err != nil {
c.readErr = hideTempErr(err)
break
}

if frameType == TextMessage || frameType == BinaryMessage {
c.messageReader = &messageReader{c}
c.reader = c.messageReader
if c.readDecompress {
c.reader = c.newDecompressionReader(c.reader)
}
return frameType, c.reader, nil
}
}
Expand All @@ -1004,6 +988,29 @@ func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
return noFrame, nil, c.readErr
}

func (c *Conn) nextReader() (messageType int, r io.Reader, err error) {
if c.reader != nil {
c.reader.Close()
c.reader = nil
}

c.messageReader = nil
c.readLength = 0

frameType, err := c.advanceFrame()
if err != nil {
c.readErr = hideTempErr(err)
} else if frameType == TextMessage || frameType == BinaryMessage {
c.messageReader = &messageReader{c}
c.reader = c.messageReader
if c.readDecompress {
c.reader = c.newDecompressionReader(c.reader)
}
return frameType, c.reader, nil
}
return noFrame, nil, c.readErr
}

type messageReader struct{ c *Conn }

func (r *messageReader) Read(b []byte) (int, error) {
Expand Down Expand Up @@ -1069,6 +1076,18 @@ func (c *Conn) ReadMessage() (messageType int, p []byte, err error) {
return messageType, p, err
}

// ReadMessage is a helper method for getting a reader using NextReader and
// reading from that reader to a buffer.
func (c *Conn) ReadMessageAsync() (messageType int, p []byte, err error) {
var r io.Reader
messageType, r, err = c.nextReader()
if err != nil {
return messageType, nil, err
}
p, err = ioutil.ReadAll(r)
return messageType, p, err
}

// SetReadDeadline sets the read deadline on the underlying network connection.
// After a read has timed out, the websocket connection state is corrupt and
// all future reads will return an error. A zero value for t means reads will
Expand Down

0 comments on commit 217e6eb

Please sign in to comment.