Skip to content

Commit

Permalink
update test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Aug 19, 2024
1 parent 2d79d12 commit bf42d3f
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 104 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ ok github.com/lxzan/gws 17.231s
- [x] Broadcast
- [x] Dial via Proxy
- [x] Context-Takeover
- [x] Passed Autobahn Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)
- [x] Concurrent & Asynchronous Non-Blocking Write
- [x] Segmented Writing of Large Files
- [x] Passed Autobahn Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)

### Attention

Expand Down
1 change: 1 addition & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ ok github.com/lxzan/gws 17.231s
- [x] 广播
- [x] 代理拨号
- [x] 上下文接管
- [x] 大文件分段写入
- [x] 支持并发和异步非阻塞写入
- [x] 通过所有 Autobahn 测试用例 [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)

Expand Down
14 changes: 12 additions & 2 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
conn: &benchConn{},
config: upgrader.option.getConfig(),
}
var buf, _ = conn1.genFrame(OpcodeText, true, false, internal.Bytes(githubData), false)
var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), frameConfig{
fin: true,
compress: conn1.pd.Enabled,
broadcast: false,
checkEncoding: false,
})

var reader = bytes.NewBuffer(buf.Bytes())
var conn2 = &Conn{
Expand Down Expand Up @@ -98,7 +103,12 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
deflater: new(deflater),
}
conn1.deflater.initialize(false, conn1.pd, config.ReadMaxPayloadSize)
var buf, _ = conn1.genFrame(OpcodeText, true, true, internal.Bytes(githubData), false)
var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), frameConfig{
fin: true,
compress: conn1.pd.Enabled,
broadcast: false,
checkEncoding: false,
})

var reader = bytes.NewBuffer(buf.Bytes())
var conn2 = &Conn{
Expand Down
77 changes: 47 additions & 30 deletions bigfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,38 @@ import (
"bytes"
"encoding/binary"
"errors"
"github.com/klauspost/compress/flate"
"github.com/lxzan/gws/internal"
"io"
"math"

"github.com/klauspost/compress/flate"
"github.com/lxzan/gws/internal"
)

const segmentSize = 128 * 1024

// 获取大文件压缩器
// Get bigDeflater
func (c *Conn) getBigDeflater() *bigDeflater {
if c.isServer {
return c.config.bdPool.Get()
}
return c.deflater.ToBigDeflater()
return (*bigDeflater)(c.deflater.cpsWriter)
}

// 回收大文件压缩器
// Recycle bigDeflater
func (c *Conn) putBigDeflater(d *bigDeflater) {
if c.isServer {
c.config.bdPool.Put(d)
}
}

// 拆分io.Reader为小切片
// Split io.Reader into small slices
func (c *Conn) splitReader(r io.Reader, f func(index int, eof bool, p []byte) error) error {
var buf = binaryPool.Get(segmentSize)
defer binaryPool.Put(buf)

var p = buf.Bytes()[:segmentSize]
var n, index = 0, 0
var err error
Expand All @@ -46,21 +52,29 @@ func (c *Conn) splitReader(r io.Reader, f func(index int, eof bool, p []byte) er
return err
}

// WriteReader 大文件写入
// 采用分段写入技术, 大大减少内存占用
func (c *Conn) WriteReader(opcode Opcode, payload io.Reader) error {
err := c.doWriteReader(opcode, payload)
// WriteFile 大文件写入
// 采用分段写入技术, 减少写入过程中的内存占用
// Segmented write technology to reduce memory usage during write process
func (c *Conn) WriteFile(opcode Opcode, payload io.Reader) error {
err := c.doWriteFile(opcode, payload)
c.emitError(err)
return err
}

func (c *Conn) doWriteReader(opcode Opcode, payload io.Reader) error {
func (c *Conn) doWriteFile(opcode Opcode, payload io.Reader) error {
c.mu.Lock()
defer c.mu.Unlock()

var cb = func(index int, eof bool, p []byte) error {
op := internal.SelectValue(index == 0, opcode, OpcodeContinuation)
frame, err := c.genFrame(op, eof, false, internal.Bytes(p), false)
if index > 0 {
opcode = OpcodeContinuation
}
frame, err := c.genFrame(opcode, internal.Bytes(p), frameConfig{
fin: eof,
compress: false,
broadcast: false,
checkEncoding: false,
})
if err != nil {
return err
}
Expand All @@ -87,39 +101,43 @@ func (c *Conn) doWriteReader(opcode Opcode, payload io.Reader) error {
}

// 大文件压缩器
type bigDeflater struct {
cpsWriter *flate.Writer
}
type bigDeflater flate.Writer

// 初始化大文件压缩器
// Initialize the bigDeflater
func (c *bigDeflater) initialize(isServer bool, options PermessageDeflate) *bigDeflater {
// 创建大文件压缩器
// Create a bigDeflater
func newBigDeflater(isServer bool, options PermessageDeflate) *bigDeflater {
windowBits := internal.SelectValue(isServer, options.ServerMaxWindowBits, options.ClientMaxWindowBits)
if windowBits == 15 {
c.cpsWriter, _ = flate.NewWriter(nil, options.Level)
cpsWriter, _ := flate.NewWriter(nil, options.Level)
return (*bigDeflater)(cpsWriter)
} else {
c.cpsWriter, _ = flate.NewWriterWindow(nil, internal.BinaryPow(windowBits))
cpsWriter, _ := flate.NewWriterWindow(nil, internal.BinaryPow(windowBits))
return (*bigDeflater)(cpsWriter)
}
return c
}

func (c *bigDeflater) FlateWriter() *flate.Writer { return (*flate.Writer)(c) }

// Compress 压缩
func (c *bigDeflater) Compress(src io.Reader, dst *flateWriter, dict []byte, sw *slideWindow) error {
if err := compressTo(c.cpsWriter, &readerWrapper{r: src, sw: sw}, dst, dict); err != nil {
if err := compressTo(c.FlateWriter(), &readerWrapper{r: src, sw: sw}, dst, dict); err != nil {
return err
}
return dst.Flush()
}

// 写入代理
// 将切片透传给回调函数, 以实现分段写入功能
// Write proxy
// Passthrough slices to the callback function for segmented writes.
type flateWriter struct {
index int
buffers []*bytes.Buffer
cb func(index int, eof bool, p []byte) error
}

// 是否可以执行回调函数
// Whether the callback function can be executed
func (c *flateWriter) shouldCall() bool {
var n = len(c.buffers)
if n < 2 {
Expand All @@ -132,18 +150,17 @@ func (c *flateWriter) shouldCall() bool {
return sum >= 4
}

// 聚合写入, 减少syscall.write次数
// 聚合写入, 减少syscall.write调用次数
// Aggregate writes, reducing the number of syscall.write calls
func (c *flateWriter) write(p []byte) {
if len(c.buffers) == 0 {
var buf = binaryPool.Get(segmentSize)
c.buffers = append(c.buffers, buf)
c.buffers = append(c.buffers, binaryPool.Get(segmentSize))
}
var n = len(c.buffers)
var tail = c.buffers[n-1]
if tail.Len()+len(p) >= segmentSize {
var buf = binaryPool.Get(segmentSize)
c.buffers = append(c.buffers, buf)
tail = buf
if tail.Len()+len(p)+frameHeaderSize > tail.Cap() {
tail = binaryPool.Get(segmentSize)
c.buffers = append(c.buffers, tail)
}
tail.Write(p)
}
Expand All @@ -166,8 +183,7 @@ func (c *flateWriter) Flush() error {
binaryPool.Put(c.buffers[i])
}
if n := buf.Len(); n >= 4 {
compressedContent := buf.Bytes()
if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
if tail := buf.Bytes()[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
buf.Truncate(n - 4)
}
}
Expand All @@ -178,12 +194,14 @@ func (c *flateWriter) Flush() error {
}

// 将io.Reader包装为io.WriterTo
// Wrapping io.Reader as io.WriterTo
type readerWrapper struct {
r io.Reader
sw *slideWindow
}

// WriteTo 写入内容, 并更新字典
// Write the contents, and update the dictionary
func (c *readerWrapper) WriteTo(w io.Writer) (int64, error) {
var buf = binaryPool.Get(segmentSize)
defer binaryPool.Put(buf)
Expand All @@ -205,7 +223,6 @@ func (c *readerWrapper) WriteTo(w io.Writer) (int64, error) {
return int64(sum), err
}

// 压缩公共函数
func compressTo(cpsWriter *flate.Writer, r io.WriterTo, w io.Writer, dict []byte) error {
cpsWriter.ResetDict(w, dict)
if _, err := r.WriteTo(cpsWriter); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ func (c *connector) handshake() (*Conn, *http.Response, error) {
writeQueue: workerQueue{maxConcurrency: 1},
readQueue: make(channel, c.option.ParallelGolimit),
}

// 压缩字典和解压字典内存开销比较大, 故使用懒加载
// Compressing and decompressing dictionaries has a large memory overhead, so use lazy loading.
if pd.Enabled {
socket.deflater.initialize(false, pd, c.option.ReadMaxPayloadSize)
if pd.ServerContextTakeover {
Expand Down
5 changes: 1 addition & 4 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,13 @@ func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte
return err
}
if n := dst.Len(); n >= 4 {
compressedContent := dst.Bytes()
if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
if tail := dst.Bytes()[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
dst.Truncate(n - 4)
}
}
return nil
}

func (c *deflater) ToBigDeflater() *bigDeflater { return &bigDeflater{cpsWriter: c.cpsWriter} }

// 滑动窗口
// Sliding window
type slideWindow struct {
Expand Down
2 changes: 1 addition & 1 deletion examples/chatroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func main() {

func MustLoad[T any](session gws.SessionStorage, key string) (v T) {
if value, exist := session.Load(key); exist {
v = value.(T)
v, _ = value.(T)
}
return
}
Expand Down
10 changes: 3 additions & 7 deletions examples/echo/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package main

import (
"github.com/lxzan/gws"
"log"
"net/http"
"os"

"github.com/lxzan/gws"
)

func main() {
Expand Down Expand Up @@ -41,9 +41,5 @@ func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
defer message.Close()
//file, _ := os.OpenFile("C:\\msys64\\home\\lxzan\\Open\\gws\\assets\\github.json", os.O_RDONLY, 0644)
file, _ := os.OpenFile("C:\\Users\\lxzan\\Pictures\\mg.png", os.O_RDONLY, 0644)
defer file.Close()
_ = socket.WriteReader(gws.OpcodeBinary, file)
//_ = socket.WriteReader(message.Opcode, message)
_ = socket.WriteMessage(message.Opcode, message.Bytes())
}
Loading

0 comments on commit bf42d3f

Please sign in to comment.