diff --git a/README.md b/README.md index 244d0269..45f36267 100755 --- a/README.md +++ b/README.md @@ -102,8 +102,9 @@ ok github.com/lxzan/gws 17.231s - [x] Broadcast - [x] Dial via Proxy - [x] Context-Takeover -- [x] Passed Autobahn Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/) - [x] Concurrent & Asynchronous Non-Blocking Write +- [x] Segmented Writing of Large Files +- [x] Passed Autobahn Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/) ### Attention diff --git a/README_CN.md b/README_CN.md index 4a4c871d..e05395fd 100755 --- a/README_CN.md +++ b/README_CN.md @@ -92,6 +92,7 @@ ok github.com/lxzan/gws 17.231s - [x] 广播 - [x] 代理拨号 - [x] 上下文接管 +- [x] 大文件分段写入 - [x] 支持并发和异步非阻塞写入 - [x] 通过所有 Autobahn 测试用例 [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/) diff --git a/benchmark_test.go b/benchmark_test.go index f2ed3504..bd098e8f 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -68,7 +68,12 @@ func BenchmarkConn_ReadMessage(b *testing.B) { conn: &benchConn{}, config: upgrader.option.getConfig(), } - var buf, _ = conn1.genFrame(OpcodeText, true, false, internal.Bytes(githubData), false) + var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), frameConfig{ + fin: true, + compress: conn1.pd.Enabled, + broadcast: false, + checkEncoding: false, + }) var reader = bytes.NewBuffer(buf.Bytes()) var conn2 = &Conn{ @@ -98,7 +103,12 @@ func BenchmarkConn_ReadMessage(b *testing.B) { deflater: new(deflater), } conn1.deflater.initialize(false, conn1.pd, config.ReadMaxPayloadSize) - var buf, _ = conn1.genFrame(OpcodeText, true, true, internal.Bytes(githubData), false) + var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), frameConfig{ + fin: true, + compress: conn1.pd.Enabled, + broadcast: false, + checkEncoding: false, + }) var reader = bytes.NewBuffer(buf.Bytes()) var conn2 = &Conn{ diff --git a/bigfile.go b/bigfile.go index 64bd945f..d3b387c4 100644 --- a/bigfile.go +++ b/bigfile.go @@ -4,23 +4,26 @@ import ( "bytes" "encoding/binary" "errors" - "github.com/klauspost/compress/flate" - "github.com/lxzan/gws/internal" "io" "math" + + "github.com/klauspost/compress/flate" + "github.com/lxzan/gws/internal" ) const segmentSize = 128 * 1024 // 获取大文件压缩器 +// Get bigDeflater func (c *Conn) getBigDeflater() *bigDeflater { if c.isServer { return c.config.bdPool.Get() } - return c.deflater.ToBigDeflater() + return (*bigDeflater)(c.deflater.cpsWriter) } // 回收大文件压缩器 +// Recycle bigDeflater func (c *Conn) putBigDeflater(d *bigDeflater) { if c.isServer { c.config.bdPool.Put(d) @@ -28,8 +31,11 @@ func (c *Conn) putBigDeflater(d *bigDeflater) { } // 拆分io.Reader为小切片 +// Split io.Reader into small slices func (c *Conn) splitReader(r io.Reader, f func(index int, eof bool, p []byte) error) error { var buf = binaryPool.Get(segmentSize) + defer binaryPool.Put(buf) + var p = buf.Bytes()[:segmentSize] var n, index = 0, 0 var err error @@ -46,21 +52,29 @@ func (c *Conn) splitReader(r io.Reader, f func(index int, eof bool, p []byte) er return err } -// WriteReader 大文件写入 -// 采用分段写入技术, 大大减少内存占用 -func (c *Conn) WriteReader(opcode Opcode, payload io.Reader) error { - err := c.doWriteReader(opcode, payload) +// WriteFile 大文件写入 +// 采用分段写入技术, 减少写入过程中的内存占用 +// Segmented write technology to reduce memory usage during write process +func (c *Conn) WriteFile(opcode Opcode, payload io.Reader) error { + err := c.doWriteFile(opcode, payload) c.emitError(err) return err } -func (c *Conn) doWriteReader(opcode Opcode, payload io.Reader) error { +func (c *Conn) doWriteFile(opcode Opcode, payload io.Reader) error { c.mu.Lock() defer c.mu.Unlock() var cb = func(index int, eof bool, p []byte) error { - op := internal.SelectValue(index == 0, opcode, OpcodeContinuation) - frame, err := c.genFrame(op, eof, false, internal.Bytes(p), false) + if index > 0 { + opcode = OpcodeContinuation + } + frame, err := c.genFrame(opcode, internal.Bytes(p), frameConfig{ + fin: eof, + compress: false, + broadcast: false, + checkEncoding: false, + }) if err != nil { return err } @@ -87,25 +101,26 @@ func (c *Conn) doWriteReader(opcode Opcode, payload io.Reader) error { } // 大文件压缩器 -type bigDeflater struct { - cpsWriter *flate.Writer -} +type bigDeflater flate.Writer -// 初始化大文件压缩器 -// Initialize the bigDeflater -func (c *bigDeflater) initialize(isServer bool, options PermessageDeflate) *bigDeflater { +// 创建大文件压缩器 +// Create a bigDeflater +func newBigDeflater(isServer bool, options PermessageDeflate) *bigDeflater { windowBits := internal.SelectValue(isServer, options.ServerMaxWindowBits, options.ClientMaxWindowBits) if windowBits == 15 { - c.cpsWriter, _ = flate.NewWriter(nil, options.Level) + cpsWriter, _ := flate.NewWriter(nil, options.Level) + return (*bigDeflater)(cpsWriter) } else { - c.cpsWriter, _ = flate.NewWriterWindow(nil, internal.BinaryPow(windowBits)) + cpsWriter, _ := flate.NewWriterWindow(nil, internal.BinaryPow(windowBits)) + return (*bigDeflater)(cpsWriter) } - return c } +func (c *bigDeflater) FlateWriter() *flate.Writer { return (*flate.Writer)(c) } + // Compress 压缩 func (c *bigDeflater) Compress(src io.Reader, dst *flateWriter, dict []byte, sw *slideWindow) error { - if err := compressTo(c.cpsWriter, &readerWrapper{r: src, sw: sw}, dst, dict); err != nil { + if err := compressTo(c.FlateWriter(), &readerWrapper{r: src, sw: sw}, dst, dict); err != nil { return err } return dst.Flush() @@ -113,6 +128,8 @@ func (c *bigDeflater) Compress(src io.Reader, dst *flateWriter, dict []byte, sw // 写入代理 // 将切片透传给回调函数, 以实现分段写入功能 +// Write proxy +// Passthrough slices to the callback function for segmented writes. type flateWriter struct { index int buffers []*bytes.Buffer @@ -120,6 +137,7 @@ type flateWriter struct { } // 是否可以执行回调函数 +// Whether the callback function can be executed func (c *flateWriter) shouldCall() bool { var n = len(c.buffers) if n < 2 { @@ -132,18 +150,17 @@ func (c *flateWriter) shouldCall() bool { return sum >= 4 } -// 聚合写入, 减少syscall.write次数 +// 聚合写入, 减少syscall.write调用次数 +// Aggregate writes, reducing the number of syscall.write calls func (c *flateWriter) write(p []byte) { if len(c.buffers) == 0 { - var buf = binaryPool.Get(segmentSize) - c.buffers = append(c.buffers, buf) + c.buffers = append(c.buffers, binaryPool.Get(segmentSize)) } var n = len(c.buffers) var tail = c.buffers[n-1] - if tail.Len()+len(p) >= segmentSize { - var buf = binaryPool.Get(segmentSize) - c.buffers = append(c.buffers, buf) - tail = buf + if tail.Len()+len(p)+frameHeaderSize > tail.Cap() { + tail = binaryPool.Get(segmentSize) + c.buffers = append(c.buffers, tail) } tail.Write(p) } @@ -166,8 +183,7 @@ func (c *flateWriter) Flush() error { binaryPool.Put(c.buffers[i]) } if n := buf.Len(); n >= 4 { - compressedContent := buf.Bytes() - if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 { + if tail := buf.Bytes()[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 { buf.Truncate(n - 4) } } @@ -178,12 +194,14 @@ func (c *flateWriter) Flush() error { } // 将io.Reader包装为io.WriterTo +// Wrapping io.Reader as io.WriterTo type readerWrapper struct { r io.Reader sw *slideWindow } // WriteTo 写入内容, 并更新字典 +// Write the contents, and update the dictionary func (c *readerWrapper) WriteTo(w io.Writer) (int64, error) { var buf = binaryPool.Get(segmentSize) defer binaryPool.Put(buf) @@ -205,7 +223,6 @@ func (c *readerWrapper) WriteTo(w io.Writer) (int64, error) { return int64(sum), err } -// 压缩公共函数 func compressTo(cpsWriter *flate.Writer, r io.WriterTo, w io.Writer, dict []byte) error { cpsWriter.ResetDict(w, dict) if _, err := r.WriteTo(cpsWriter); err != nil { diff --git a/client.go b/client.go index 6c59ac20..83f79bf2 100644 --- a/client.go +++ b/client.go @@ -190,6 +190,9 @@ func (c *connector) handshake() (*Conn, *http.Response, error) { writeQueue: workerQueue{maxConcurrency: 1}, readQueue: make(channel, c.option.ParallelGolimit), } + + // 压缩字典和解压字典内存开销比较大, 故使用懒加载 + // Compressing and decompressing dictionaries has a large memory overhead, so use lazy loading. if pd.Enabled { socket.deflater.initialize(false, pd, c.option.ReadMaxPayloadSize) if pd.ServerContextTakeover { diff --git a/compress.go b/compress.go index 3117743b..9d4b8750 100644 --- a/compress.go +++ b/compress.go @@ -104,16 +104,13 @@ func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte return err } if n := dst.Len(); n >= 4 { - compressedContent := dst.Bytes() - if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 { + if tail := dst.Bytes()[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 { dst.Truncate(n - 4) } } return nil } -func (c *deflater) ToBigDeflater() *bigDeflater { return &bigDeflater{cpsWriter: c.cpsWriter} } - // 滑动窗口 // Sliding window type slideWindow struct { diff --git a/examples/chatroom/main.go b/examples/chatroom/main.go index ef00f904..0937fe5f 100644 --- a/examples/chatroom/main.go +++ b/examples/chatroom/main.go @@ -61,7 +61,7 @@ func main() { func MustLoad[T any](session gws.SessionStorage, key string) (v T) { if value, exist := session.Load(key); exist { - v = value.(T) + v, _ = value.(T) } return } diff --git a/examples/echo/main.go b/examples/echo/main.go index e7374ec7..497210e2 100644 --- a/examples/echo/main.go +++ b/examples/echo/main.go @@ -1,10 +1,10 @@ package main import ( - "github.com/lxzan/gws" "log" "net/http" - "os" + + "github.com/lxzan/gws" ) func main() { @@ -41,9 +41,5 @@ func (c *Handler) OnPing(socket *gws.Conn, payload []byte) { func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) { defer message.Close() - //file, _ := os.OpenFile("C:\\msys64\\home\\lxzan\\Open\\gws\\assets\\github.json", os.O_RDONLY, 0644) - file, _ := os.OpenFile("C:\\Users\\lxzan\\Pictures\\mg.png", os.O_RDONLY, 0644) - defer file.Close() - _ = socket.WriteReader(gws.OpcodeBinary, file) - //_ = socket.WriteReader(message.Opcode, message) + _ = socket.WriteMessage(message.Opcode, message.Bytes()) } diff --git a/examples/push/main.go b/examples/push/main.go index 1d23c05f..b7a9edac 100644 --- a/examples/push/main.go +++ b/examples/push/main.go @@ -1,67 +1,80 @@ package main import ( - "bufio" + "fmt" "log" - "net" "net/http" "github.com/lxzan/gws" ) func main() { - var app = gws.NewServer(new(Handler), nil) + var h = &Handler{conns: gws.NewConcurrentMap[string, *gws.Conn]()} - app.OnRequest = func(conn net.Conn, br *bufio.Reader, r *http.Request) { - socket, err := app.GetUpgrader().UpgradeFromConn(conn, br, r) + var upgrader = gws.NewUpgrader(h, &gws.ServerOption{ + PermessageDeflate: gws.PermessageDeflate{ + Enabled: true, + ServerContextTakeover: true, + ClientContextTakeover: true, + }, + }) + + http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { + socket, err := upgrader.Upgrade(writer, request) if err != nil { - log.Print(err.Error()) + log.Println(err.Error()) return } - var channel = make(chan []byte, 8) - var closer = make(chan struct{}) - socket.Session().Store("channel", channel) - socket.Session().Store("closer", closer) - go socket.ReadLoop() + websocketKey := request.Header.Get("Sec-WebSocket-Key") + socket.Session().Store("websocketKey", websocketKey) + h.conns.Store(websocketKey, socket) go func() { - for { - select { - case p := <-channel: - _ = socket.WriteMessage(gws.OpcodeText, p) - case <-closer: - return - } - } + socket.ReadLoop() }() + }) + + go func() { + if err := http.ListenAndServe(":8000", nil); err != nil { + return + } + }() + + for { + var msg = "" + if _, err := fmt.Scanf("%s\n", &msg); err != nil { + log.Println(err.Error()) + return + } + h.Broadcast(msg) } +} - log.Fatalf("%v", app.Run(":8000")) +func getSession[T any](s gws.SessionStorage, key string) (val T) { + if v, ok := s.Load(key); ok { + val, _ = v.(T) + } + return } type Handler struct { gws.BuiltinEventHandler + conns *gws.ConcurrentMap[string, *gws.Conn] } -func (c *Handler) getSession(socket *gws.Conn, key string) any { - v, _ := socket.Session().Load(key) - return v -} - -func (c *Handler) Send(socket *gws.Conn, payload []byte) { - var channel = c.getSession(socket, "channel").(chan []byte) - select { - case channel <- payload: - default: - return - } +func (c *Handler) Broadcast(msg string) { + var b = gws.NewBroadcaster(gws.OpcodeText, []byte(msg)) + c.conns.Range(func(key string, conn *gws.Conn) bool { + _ = b.Broadcast(conn) + return true + }) + _ = b.Close() } func (c *Handler) OnClose(socket *gws.Conn, err error) { - var closer = c.getSession(socket, "closer").(chan struct{}) - closer <- struct{}{} + websocketKey := getSession[string](socket.Session(), "websocketKey") + c.conns.Delete(websocketKey) } func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) { defer message.Close() - _ = socket.WriteMessage(message.Opcode, message.Bytes()) } diff --git a/option.go b/option.go index e3c517f6..52bfd3c3 100644 --- a/option.go +++ b/option.go @@ -105,6 +105,7 @@ type ( brPool *internal.Pool[*bufio.Reader] // 大文件压缩器 + // Big File Compressor bdPool *internal.Pool[*bigDeflater] // 压缩器滑动窗口内存池 @@ -324,7 +325,7 @@ func initServerOption(c *ServerOption) *ServerOption { if c.PermessageDeflate.Enabled { c.config.bdPool = internal.NewPool[*bigDeflater](func() *bigDeflater { - return new(bigDeflater).initialize(true, c.PermessageDeflate) + return newBigDeflater(true, c.PermessageDeflate) }) if c.PermessageDeflate.ServerContextTakeover { windowSize := internal.BinaryPow(c.PermessageDeflate.ServerMaxWindowBits) diff --git a/reader.go b/reader.go index d53a82a3..7235ef19 100644 --- a/reader.go +++ b/reader.go @@ -165,7 +165,7 @@ func (c *Conn) emitMessage(msg *Message) (err error) { if err != nil { return internal.NewError(internal.CloseInternalServerErr, err) } - c.dpsWindow.Write(msg.Bytes()) + _, _ = c.dpsWindow.Write(msg.Bytes()) } if !c.isTextValid(msg.Opcode, msg.Bytes()) { return internal.NewError(internal.CloseUnsupportedData, ErrTextEncoding) diff --git a/reader_test.go b/reader_test.go index 23e00593..36855ba1 100644 --- a/reader_test.go +++ b/reader_test.go @@ -287,7 +287,12 @@ func TestSegments(t *testing.T) { go client.ReadLoop() go func() { - frame, _ := client.genFrame(OpcodeText, true, true, internal.Bytes(testdata), false) + frame, _ := client.genFrame(OpcodeText, internal.Bytes(testdata), frameConfig{ + fin: true, + compress: client.pd.Enabled, + broadcast: false, + checkEncoding: client.config.CheckUtf8Enabled, + }) data := frame.Bytes() data[20] = 'x' client.conn.Write(data) @@ -366,7 +371,12 @@ func TestConn_ReadMessage(t *testing.T) { var serverHandler = &webSocketMocker{} serverHandler.onOpen = func(socket *Conn) { var p = []byte("123") - frame, _ := socket.genFrame(OpcodePing, true, socket.pd.Enabled, internal.Bytes(p), false) + frame, _ := socket.genFrame(OpcodePing, internal.Bytes(p), frameConfig{ + fin: true, + compress: socket.pd.Enabled, + broadcast: false, + checkEncoding: socket.config.CheckUtf8Enabled, + }) socket.conn.Write(frame.Bytes()[:2]) socket.conn.Close() } @@ -391,7 +401,12 @@ func TestConn_ReadMessage(t *testing.T) { var serverHandler = &webSocketMocker{} serverHandler.onOpen = func(socket *Conn) { var p = []byte("123") - frame, _ := socket.genFrame(OpcodeText, true, socket.pd.Enabled, internal.Bytes(p), false) + frame, _ := socket.genFrame(OpcodeText, internal.Bytes(p), frameConfig{ + fin: true, + compress: socket.pd.Enabled, + broadcast: false, + checkEncoding: false, + }) socket.conn.Write(frame.Bytes()[:2]) socket.conn.Close() } diff --git a/upgrader.go b/upgrader.go index 5e4a255a..def7b67a 100644 --- a/upgrader.go +++ b/upgrader.go @@ -244,6 +244,9 @@ func (c *Upgrader) doUpgradeFromConn(netConn net.Conn, br *bufio.Reader, r *http writeQueue: workerQueue{maxConcurrency: 1}, readQueue: make(channel, c.option.ParallelGolimit), } + + // 压缩字典和解压字典内存开销比较大, 故使用懒加载 + // Compressing and decompressing dictionaries has a large memory overhead, so use lazy loading. if pd.Enabled { socket.deflater = c.deflaterPool.Select() if c.option.PermessageDeflate.ServerContextTakeover { diff --git a/writer.go b/writer.go index e3a43718..7ec569b9 100644 --- a/writer.go +++ b/writer.go @@ -104,26 +104,53 @@ func (c *Conn) doWrite(opcode Opcode, payload internal.Payload) error { return ErrConnClosed } - frame, err := c.genFrame(opcode, true, c.pd.Enabled, payload, false) + // 生成帧, 向连接写入内容, 最后更新压缩字典 + // 为了使上下文接管模式正常工作, 压缩, 写入和更新字典三个操作的上下文必须保持同步 + // Generate frames, write to the connection, and update the compression dictionary + // For context_takeover mode to work correctly, the contexts of compression, writing, and dictionary updating must be synchronized. + frame, err := c.genFrame(opcode, payload, frameConfig{ + fin: true, + compress: c.pd.Enabled, + broadcast: false, + checkEncoding: c.config.CheckUtf8Enabled, + }) if err != nil { return err } - err = internal.WriteN(c.conn, frame.Bytes()) _, _ = payload.WriteTo(&c.cpsWindow) binaryPool.Put(frame) return err } +// WebSocket帧配置, 用于重写连接里面的配置, 以适配各种场景 +// WebSocket frame configuration, used to rewrite the configuration inside the connection, to adapt to various scenarios +type frameConfig struct { + // 结束标志位 + // Finish flag + fin bool + + // 是否开启压缩 + // Whether to enable compression + compress bool + + // 帧生成动作是否由广播发起 + // Whether the frame generation action is initiated by a broadcast + broadcast bool + + // 是否检查文本编码 + // Whether to check text encoding + checkEncoding bool +} + // 生成帧数据 // Generates the frame data -func (c *Conn) genFrame(opcode Opcode, fin bool, compress bool, payload internal.Payload, isBroadcast bool) (*bytes.Buffer, error) { - if opcode == OpcodeText && fin && !payload.CheckEncoding(c.config.CheckUtf8Enabled, uint8(opcode)) { +func (c *Conn) genFrame(opcode Opcode, payload internal.Payload, cfg frameConfig) (*bytes.Buffer, error) { + if opcode == OpcodeText && !payload.CheckEncoding(cfg.checkEncoding, uint8(opcode)) { return nil, internal.NewError(internal.CloseUnsupportedData, ErrTextEncoding) } var n = payload.Len() - if n > c.config.WriteMaxPayloadSize { return nil, internal.CloseMessageTooLarge } @@ -131,12 +158,12 @@ func (c *Conn) genFrame(opcode Opcode, fin bool, compress bool, payload internal var buf = binaryPool.Get(n + frameHeaderSize) buf.Write(framePadding[0:]) - if compress && opcode.isDataFrame() && n >= c.pd.Threshold { - return c.compressData(buf, opcode, fin, payload, isBroadcast) + if cfg.compress && opcode.isDataFrame() && n >= c.pd.Threshold { + return c.compressData(buf, opcode, cfg.fin, payload, cfg.broadcast) } var header = frameHeader{} - headerLength, maskBytes := header.GenerateHeader(c.isServer, fin, false, opcode, n) + headerLength, maskBytes := header.GenerateHeader(c.isServer, cfg.fin, false, opcode, n) _, _ = payload.WriteTo(buf) var contents = buf.Bytes() if !c.isServer { @@ -218,7 +245,12 @@ func (c *Broadcaster) Broadcast(socket *Conn) error { var msg = c.msgs[idx] msg.once.Do(func() { - msg.frame, msg.err = socket.genFrame(c.opcode, true, socket.pd.Enabled, internal.Bytes(c.payload), true) + msg.frame, msg.err = socket.genFrame(c.opcode, internal.Bytes(c.payload), frameConfig{ + fin: true, + compress: socket.pd.Enabled, + broadcast: true, + checkEncoding: socket.config.CheckUtf8Enabled, + }) }) if msg.err != nil { return msg.err diff --git a/writer_test.go b/writer_test.go index 4bb1528a..66e45d33 100644 --- a/writer_test.go +++ b/writer_test.go @@ -510,7 +510,7 @@ func TestConn_Async(t *testing.T) { assert.True(t, internal.IsSameSlice(arr1, arr2)) } -func TestConn_WriteReader(t *testing.T) { +func TestConn_WriteFile(t *testing.T) { t.Run("context_take_over 1", func(t *testing.T) { var pd = PermessageDeflate{ Enabled: true, @@ -540,7 +540,7 @@ func TestConn_WriteReader(t *testing.T) { go server.ReadLoop() go client.ReadLoop() - var err = server.WriteReader(OpcodeBinary, bytes.NewReader(content)) + var err = server.WriteFile(OpcodeBinary, bytes.NewReader(content)) assert.NoError(t, err) wg.Wait() }) @@ -576,7 +576,7 @@ func TestConn_WriteReader(t *testing.T) { go server.ReadLoop() go client.ReadLoop() - var err = server.WriteReader(OpcodeBinary, bytes.NewReader(content)) + var err = server.WriteFile(OpcodeBinary, bytes.NewReader(content)) assert.NoError(t, err) wg.Wait() }) @@ -613,7 +613,7 @@ func TestConn_WriteReader(t *testing.T) { for i := 0; i < count; i++ { var length = 128*1024 + internal.AlphabetNumeric.Intn(10) var content = internal.AlphabetNumeric.Generate(length) - var err = server.WriteReader(OpcodeBinary, bytes.NewReader(content)) + var err = server.WriteFile(OpcodeBinary, bytes.NewReader(content)) assert.NoError(t, err) } wg.Wait() @@ -648,7 +648,7 @@ func TestConn_WriteReader(t *testing.T) { go server.ReadLoop() go client.ReadLoop() - var err = client.WriteReader(OpcodeBinary, bytes.NewReader(content)) + var err = client.WriteFile(OpcodeBinary, bytes.NewReader(content)) assert.NoError(t, err) wg.Wait() }) @@ -679,7 +679,7 @@ func TestConn_WriteReader(t *testing.T) { go server.ReadLoop() go client.ReadLoop() - var err = client.WriteReader(OpcodeBinary, bytes.NewReader(content)) + var err = client.WriteFile(OpcodeBinary, bytes.NewReader(content)) assert.NoError(t, err) wg.Wait() }) @@ -711,7 +711,7 @@ func TestConn_WriteReader(t *testing.T) { go client.ReadLoop() client.WriteClose(1000, nil) - var err = client.WriteReader(OpcodeBinary, bytes.NewReader(content)) + var err = client.WriteFile(OpcodeBinary, bytes.NewReader(content)) assert.Error(t, err) wg.Wait() }) @@ -741,13 +741,13 @@ func TestConn_WriteReader(t *testing.T) { go server.ReadLoop() go client.ReadLoop() - var err = client.WriteReader(OpcodeBinary, bytes.NewReader(content)) + var err = client.WriteFile(OpcodeBinary, bytes.NewReader(content)) assert.Error(t, err) wg.Wait() }) t.Run("", func(t *testing.T) { - deflater := new(bigDeflater).initialize(true, PermessageDeflate{ + deflater := newBigDeflater(true, PermessageDeflate{ Enabled: true, ServerMaxWindowBits: 12, ClientMaxWindowBits: 12, @@ -760,7 +760,7 @@ func TestConn_WriteReader(t *testing.T) { }) t.Run("", func(t *testing.T) { - deflater := new(bigDeflater).initialize(true, PermessageDeflate{ + deflater := newBigDeflater(true, PermessageDeflate{ Enabled: true, ServerMaxWindowBits: 12, ClientMaxWindowBits: 12,