From 217e6eb5471f9a70ea30ebd8b44dc4b60d5804d5 Mon Sep 17 00:00:00 2001 From: "Diablo.luo" Date: Wed, 12 May 2021 14:13:48 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9EReadMessageAsync?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3,=E8=BF=9B=E8=A1=8C=E4=B8=8D=E9=98=BB?= =?UTF-8?q?=E5=A1=9E=E8=AF=BB=E5=8F=96=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conn.go | 53 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/conn.go b/conn.go index ca46d2f79..5d949376a 100644 --- a/conn.go +++ b/conn.go @@ -967,28 +967,12 @@ func (c *Conn) handleProtocolError(message string) error { // permanent. Once this method returns a non-nil error, all subsequent calls to // this method return the same error. func (c *Conn) NextReader() (messageType int, r io.Reader, err error) { - // Close previous reader, only relevant for decompression. - if c.reader != nil { - c.reader.Close() - c.reader = nil - } - - c.messageReader = nil - c.readLength = 0 - for c.readErr == nil { - frameType, err := c.advanceFrame() + frameType, _, err := c.nextReader() if err != nil { - c.readErr = hideTempErr(err) break } - if frameType == TextMessage || frameType == BinaryMessage { - c.messageReader = &messageReader{c} - c.reader = c.messageReader - if c.readDecompress { - c.reader = c.newDecompressionReader(c.reader) - } return frameType, c.reader, nil } } @@ -1004,6 +988,29 @@ func (c *Conn) NextReader() (messageType int, r io.Reader, err error) { return noFrame, nil, c.readErr } +func (c *Conn) nextReader() (messageType int, r io.Reader, err error) { + if c.reader != nil { + c.reader.Close() + c.reader = nil + } + + c.messageReader = nil + c.readLength = 0 + + frameType, err := c.advanceFrame() + if err != nil { + c.readErr = hideTempErr(err) + } else if frameType == TextMessage || frameType == BinaryMessage { + c.messageReader = &messageReader{c} + c.reader = c.messageReader + if c.readDecompress { + c.reader = c.newDecompressionReader(c.reader) + } + return frameType, c.reader, nil + } + return noFrame, nil, c.readErr +} + type messageReader struct{ c *Conn } func (r *messageReader) Read(b []byte) (int, error) { @@ -1069,6 +1076,18 @@ func (c *Conn) ReadMessage() (messageType int, p []byte, err error) { return messageType, p, err } +// ReadMessage is a helper method for getting a reader using NextReader and +// reading from that reader to a buffer. +func (c *Conn) ReadMessageAsync() (messageType int, p []byte, err error) { + var r io.Reader + messageType, r, err = c.nextReader() + if err != nil { + return messageType, nil, err + } + p, err = ioutil.ReadAll(r) + return messageType, p, err +} + // SetReadDeadline sets the read deadline on the underlying network connection. // After a read has timed out, the websocket connection state is corrupt and // all future reads will return an error. A zero value for t means reads will