diff --git a/writer.go b/writer.go index 9ac248ff..d4a27576 100644 --- a/writer.go +++ b/writer.go @@ -78,6 +78,15 @@ func (c *Conn) WritevAsync(opcode Opcode, payloads [][]byte, callback func(error }) } +// Async 异步 +// 将任务加入发送队列(并发度为1), 执行异步操作 +// 注意: 不要加入长时间阻塞的任务 +// Add the task to the send queue (concurrency 1), perform asynchronous operation. +// Note: Don't add tasks that are blocking for a long time. +func (c *Conn) Async(f func()) { + c.writeQueue.Push(f) +} + // 执行写入逻辑, 注意妥善维护压缩字典 func (c *Conn) doWrite(opcode Opcode, payload internal.Payload) error { c.mu.Lock() diff --git a/writer_test.go b/writer_test.go index 25119bb0..8d31ba6d 100644 --- a/writer_test.go +++ b/writer_test.go @@ -489,3 +489,23 @@ func TestConn_Writev(t *testing.T) { assert.Error(t, err) }) } + +func TestConn_Async(t *testing.T) { + var conn = &Conn{writeQueue: workerQueue{maxConcurrency: 1}} + var wg = sync.WaitGroup{} + wg.Add(100) + var arr1, arr2 []int64 + var mu = &sync.Mutex{} + for i := 1; i <= 100; i++ { + var x = int64(i) + arr1 = append(arr1, x) + conn.Async(func() { + mu.Lock() + arr2 = append(arr2, x) + mu.Unlock() + wg.Done() + }) + } + wg.Wait() + assert.ElementsMatch(t, arr1, arr2) +}