Skip to content

Commit

Permalink
Add SetBufferThreshold Method
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Aug 16, 2024
1 parent c96b189 commit f55a412
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 302 deletions.
4 changes: 1 addition & 3 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ linters:
# Disable specific linter
# https://golangci-lint.run/usage/linters/#disabled-by-default
disable:
- mnd
- testpackage
- nosnakecase
- nlreturn
- gomnd
- forcetypeassert
Expand All @@ -14,15 +14,13 @@ linters:
- ineffassign
- lll
- funlen
- scopelint
- dupl
- gofumpt
- gofmt
- godot
- gci
- goimports
- gocognit
- ifshort
- gochecknoinits
- goconst
- depguard
Expand Down
22 changes: 11 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
)

// Dialer 拨号器接口
// dialer interface
// Dialer interface
type Dialer interface {
// Dial 连接到指定网络上的地址
// connects to the address on the named network
// Connects to the address on the named network
Dial(network, addr string) (c net.Conn, err error)
}

Expand All @@ -31,7 +31,7 @@ type connector struct {
}

// NewClient 创建一个新的 WebSocket 客户端连接
// creates a new WebSocket client connection
// Creates a new WebSocket client connection
func NewClient(handler Event, option *ClientOption) (*Conn, *http.Response, error) {
option = initClientOption(option)
c := &connector{option: option, eventHandler: handler}
Expand Down Expand Up @@ -84,7 +84,7 @@ func NewClientFromConn(handler Event, option *ClientOption, conn net.Conn) (*Con
return client, resp, err
}

// request 发送HTTP请求, 即WebSocket握手
// 发送HTTP请求, 即WebSocket握手
// Sends an http request, i.e., websocket handshake
func (c *connector) request() (*http.Response, *bufio.Reader, error) {
_ = c.conn.SetDeadline(time.Now().Add(c.option.HandshakeTimeout))
Expand Down Expand Up @@ -138,7 +138,7 @@ func (c *connector) request() (*http.Response, *bufio.Reader, error) {
return resp, br, err
}

// getPermessageDeflate 获取压缩拓展结果
// 获取压缩拓展结果
// Get compression expansion results
func (c *connector) getPermessageDeflate(extensions string) PermessageDeflate {
serverPD := permessageNegotiation(extensions)
Expand All @@ -157,8 +157,8 @@ func (c *connector) getPermessageDeflate(extensions string) PermessageDeflate {
return pd
}

// handshake 执行 WebSocket 握手操作
// performs the WebSocket handshake operation
// 执行 WebSocket 握手操作
// Performs the WebSocket handshake operation
func (c *connector) handshake() (*Conn, *http.Response, error) {
resp, br, err := c.request()
if err != nil {
Expand Down Expand Up @@ -202,8 +202,8 @@ func (c *connector) handshake() (*Conn, *http.Response, error) {
return socket, resp, c.conn.SetDeadline(time.Time{})
}

// getSubProtocol 从响应中获取子协议
// retrieves the subprotocol from the response
// 从响应中获取子协议
// Retrieves the subprotocol from the response
func (c *connector) getSubProtocol(resp *http.Response) (string, error) {
a := internal.Split(c.option.RequestHeader.Get(internal.SecWebSocketProtocol.Key), ",")
b := internal.Split(resp.Header.Get(internal.SecWebSocketProtocol.Key), ",")
Expand All @@ -214,8 +214,8 @@ func (c *connector) getSubProtocol(resp *http.Response) (string, error) {
return subprotocol, nil
}

// checkHeaders 检查响应头以验证握手是否成功
// checks the response headers to verify if the handshake was successful
// 检查响应头以验证握手是否成功
// Checks the response headers to verify if the handshake was successful
func (c *connector) checkHeaders(resp *http.Response) error {
if resp.StatusCode != http.StatusSwitchingProtocols {
return ErrHandshake
Expand Down
46 changes: 23 additions & 23 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/lxzan/gws/internal"
)

// flateTail deflate压缩算法的尾部标记
// the tail marker of the deflate compression algorithm
// deflate压缩算法的尾部标记
// The tail marker of the deflate compression algorithm
var flateTail = []byte{0x00, 0x00, 0xff, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff}

type deflaterPool struct {
Expand All @@ -24,8 +24,8 @@ type deflaterPool struct {
pool []*deflater
}

// initialize 初始化deflaterPool
// initialize the deflaterPool
// 初始化deflaterPool
// Initialize the deflaterPool
func (c *deflaterPool) initialize(options PermessageDeflate, limit int) *deflaterPool {
c.num = uint64(options.PoolSize)
for i := uint64(0); i < c.num; i++ {
Expand All @@ -35,7 +35,7 @@ func (c *deflaterPool) initialize(options PermessageDeflate, limit int) *deflate
}

// Select 从deflaterPool中选择一个deflater对象
// select a deflater object from the deflaterPool
// Select a deflater object from the deflaterPool
func (c *deflaterPool) Select() *deflater {
var j = atomic.AddUint64(&c.serial, 1) & (c.num - 1)
return c.pool[j]
Expand All @@ -51,8 +51,8 @@ type deflater struct {
cpsWriter *flate.Writer
}

// initialize 初始化deflater
// initialize the deflater
// 初始化deflater
// Initialize the deflater
func (c *deflater) initialize(isServer bool, options PermessageDeflate, limit int) *deflater {
c.dpsReader = flate.NewReader(nil)
c.dpsBuffer = bytes.NewBuffer(nil)
Expand All @@ -67,19 +67,19 @@ func (c *deflater) initialize(isServer bool, options PermessageDeflate, limit in
return c
}

// resetFR 重置deflate reader
// reset the deflate reader
// 重置deflate reader
// Reset the deflate reader
func (c *deflater) resetFR(r io.Reader, dict []byte) {
resetter := c.dpsReader.(flate.Resetter)
_ = resetter.Reset(r, dict) // must return a null pointer
if c.dpsBuffer.Cap() > 256*1024 {
if c.dpsBuffer.Cap() > int(bufferThreshold) {
c.dpsBuffer = bytes.NewBuffer(nil)
}
c.dpsBuffer.Reset()
}

// Decompress 解压
// decompress data
// Decompress data
func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, error) {
c.dpsLocker.Lock()
defer c.dpsLocker.Unlock()
Expand All @@ -96,7 +96,7 @@ func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, er
}

// Compress 压缩
// compress data
// Compress data
func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte) error {
c.cpsLocker.Lock()
defer c.cpsLocker.Unlock()
Expand All @@ -117,16 +117,16 @@ func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte
return nil
}

// slideWindow 滑动窗口
// sliding window
// 滑动窗口
// Sliding window
type slideWindow struct {
enabled bool
dict []byte
size int
}

// initialize 初始化滑动窗口
// initialize the sliding window
// 初始化滑动窗口
// Initialize the sliding window
func (c *slideWindow) initialize(pool *internal.Pool[[]byte], windowBits int) *slideWindow {
c.enabled = true
c.size = internal.BinaryPow(windowBits)
Expand All @@ -139,7 +139,7 @@ func (c *slideWindow) initialize(pool *internal.Pool[[]byte], windowBits int) *s
}

// Write 将数据写入滑动窗口
// write data to the sliding window
// Write data to the sliding window
func (c *slideWindow) Write(p []byte) (int, error) {
if !c.enabled {
return 0, nil
Expand Down Expand Up @@ -169,8 +169,8 @@ func (c *slideWindow) Write(p []byte) (int, error) {
return total, nil
}

// genRequestHeader 生成请求头
// generate request headers
// 生成请求头
// Generate request headers
func (c *PermessageDeflate) genRequestHeader() string {
var options = make([]string, 0, 5)
options = append(options, internal.PermessageDeflate)
Expand All @@ -191,8 +191,8 @@ func (c *PermessageDeflate) genRequestHeader() string {
return strings.Join(options, "; ")
}

// genResponseHeader 生成响应头
// generate response headers
// 生成响应头
// Generate response headers
func (c *PermessageDeflate) genResponseHeader() string {
var options = make([]string, 0, 5)
options = append(options, internal.PermessageDeflate)
Expand All @@ -211,7 +211,7 @@ func (c *PermessageDeflate) genResponseHeader() string {
return strings.Join(options, "; ")
}

// permessageNegotiation 压缩拓展协商
// 压缩拓展协商
// Negotiation of compression parameters
func permessageNegotiation(str string) PermessageDeflate {
var options = PermessageDeflate{
Expand Down Expand Up @@ -250,7 +250,7 @@ func permessageNegotiation(str string) PermessageDeflate {
return options
}

// limitReader 限制从io.Reader中最多读取m个字节
// 限制从io.Reader中最多读取m个字节
// Limit reading up to m bytes from io.Reader
func limitReader(r io.Reader, m int) io.Reader { return &limitedReader{R: r, M: m} }

Expand Down
28 changes: 14 additions & 14 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/lxzan/gws/internal"
)

// Conn 结构体表示一个 WebSocket 连接
// Conn struct represents a WebSocket connection
// Conn WebSocket连接
// WebSocket connection
type Conn struct {
// 互斥锁,用于保护共享资源
// Mutex to protect shared resources
Expand All @@ -26,7 +26,7 @@ type Conn struct {

// 用于存储错误的原子值
// Atomic value for storing errors
err atomic.Value
ev atomic.Value

// 标识是否为服务器端
// Indicates if this is a server-side connection
Expand Down Expand Up @@ -76,16 +76,16 @@ type Conn struct {
// Deflater
deflater *deflater

// 数据包发送窗口
// Data packet send window
// 解压字典滑动窗口
// Decompressing dictionary sliding window
dpsWindow slideWindow

// 数据包接收窗口
// Data packet receive window
// 压缩字典滑动窗口
// Compressed dictionary sliding window
cpsWindow slideWindow

// 每消息压缩
// Per-message deflate
// 压缩拓展配置
// Compression extension configuration
pd PermessageDeflate
}

Expand All @@ -105,7 +105,7 @@ func (c *Conn) ReadLoop() {
}
}

err, ok := c.err.Load().(error)
err, ok := c.ev.Load().(error)
c.handler.OnClose(c, internal.SelectValue(ok, err, errEmpty))

// 回收资源
Expand Down Expand Up @@ -185,7 +185,7 @@ func (c *Conn) isClosed() bool {
// 关闭连接并存储错误信息
// Closes the connection and stores the error information
func (c *Conn) close(reason []byte, err error) {
c.err.Store(err)
c.ev.Store(err)
_ = c.doWrite(OpcodeCloseConnection, internal.Bytes(reason))
_ = c.conn.Close()
}
Expand Down Expand Up @@ -310,9 +310,9 @@ func (c *Conn) NetConn() net.Conn {
return c.conn
}

// SetNoDelay
// 控制操作系统是否应该延迟数据包传输以期望发送更少的数据包(Nagle 算法)。
// 默认值是 true(无延迟),这意味着数据在 Write 之后尽快发送
// SetNoDelay 设置无延迟
// 控制操作系统是否应该延迟数据包传输以期望发送更少的数据包(Nagle算法).
// 默认值是 true(无延迟),这意味着数据在 Write 之后尽快发送.
// Controls whether the operating system should delay packet transmission in hopes of sending fewer packets (Nagle's algorithm).
// The default is true (no delay), meaning that data is sent as soon as possible after a Write.
func (c *Conn) SetNoDelay(noDelay bool) error {
Expand Down
18 changes: 15 additions & 3 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,19 @@ package gws
import "github.com/lxzan/gws/internal"

var (
framePadding = frameHeader{} // 帧头填充物
binaryPool = internal.NewBufferPool(128, 256*1024) // 内存池
defaultLogger = new(stdLogger) // 默认日志工具
framePadding = frameHeader{} // 帧头填充物
defaultLogger = new(stdLogger) // 默认日志工具
bufferThreshold = uint32(256 * 1024) // buffer阈值
binaryPool = new(internal.BufferPool) // 内存池
)

func init() {
SetBufferThreshold(bufferThreshold)
}

// SetBufferThreshold 设置buffer阈值, x=pow(2,n), 超过x个字节的buffer不会被回收
// Set the buffer threshold, x=pow(2,n), that buffers larger than x bytes are not reclaimed.
func SetBufferThreshold(x uint32) {
bufferThreshold = internal.ToBinaryNumber(x)
binaryPool = internal.NewBufferPool(128, bufferThreshold)
}
Loading

0 comments on commit f55a412

Please sign in to comment.