diff --git a/region/client.go b/region/client.go index 4d81cd5b..79df2985 100644 --- a/region/client.go +++ b/region/client.go @@ -6,6 +6,7 @@ package region import ( + "context" "encoding/binary" "encoding/json" "errors" @@ -167,7 +168,7 @@ type client struct { // failOnce used for concurrent calls to fail failOnce sync.Once - rpcs chan hrpc.Call + rpcs chan []hrpc.Call done chan struct{} // sent contains the mapping of sent call IDs to RPC calls, so that when @@ -201,7 +202,7 @@ func (c *client) QueueRPC(rpc hrpc.Call) { // rpc timed out before being processed case <-c.done: returnResult(rpc, nil, ErrClientClosed) - case c.rpcs <- rpc: + case c.rpcs <- []hrpc.Call{rpc}: } } else { select { @@ -363,9 +364,9 @@ func (c *client) processRPCs() { select { case <-c.done: return - case rpc := <-c.rpcs: + case rpcs := <-c.rpcs: // have things queued up, batch them - if !m.add(rpc) { + if !m.add(rpcs) { // can still put more rpcs into batch continue } @@ -380,11 +381,11 @@ func (c *client) processRPCs() { select { case <-c.done: return - case rpc := <-c.rpcs: - m.add(rpc) + case rpcs := <-c.rpcs: + m.add(rpcs) } continue - } else if l == c.rpcQueueSize || c.flushInterval == 0 { + } else if l >= c.rpcQueueSize || c.flushInterval == 0 { // batch is full, flush flush("queue full") continue @@ -402,8 +403,8 @@ func (c *client) processRPCs() { case <-timer.C: reason = "timeout" // time to flush - case rpc := <-c.rpcs: - if !m.add(rpc) { + case rpcs := <-c.rpcs: + if !m.add(rpcs) { // can still put more rpcs into batch continue } diff --git a/region/client_test.go b/region/client_test.go index c22bb636..c191ed1a 100644 --- a/region/client_test.go +++ b/region/client_test.go @@ -128,7 +128,7 @@ func TestFail(t *testing.T) { c := &client{ conn: mockConn, done: make(chan struct{}), - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), sent: make(map[uint32]hrpc.Call), } expectedErr := errors.New("oooups") @@ -161,7 +161,7 @@ func TestFail(t *testing.T) { // check that 
failing undialed client doesn't panic c = &client{ done: make(chan struct{}), - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), sent: make(map[uint32]hrpc.Call), } c.fail(expectedErr) @@ -196,7 +196,7 @@ func TestQueueRPCMultiWithClose(t *testing.T) { c := &client{ conn: &mc{MockConn: mockConn}, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: 10, @@ -268,7 +268,7 @@ func TestProcessRPCsWithFail(t *testing.T) { c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), // never send anything @@ -338,7 +338,7 @@ func TestQueueRPC(t *testing.T) { mockConn.EXPECT().SetReadDeadline(gomock.Any()).AnyTimes() c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: queueSize, @@ -424,7 +424,7 @@ func TestServerErrorWrite(t *testing.T) { mockConn := mock.NewMockConn(ctrl) c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: queueSize, @@ -465,7 +465,7 @@ func TestServerErrorRead(t *testing.T) { mockConn := mock.NewMockConn(ctrl) c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: queueSize, @@ -504,7 +504,7 @@ func TestServerErrorExceptionResponse(t *testing.T) { mockConn.EXPECT().SetReadDeadline(gomock.Any()).Times(2) c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: 1, @@ -693,7 +693,7 @@ func TestUnexpectedSendError(t *testing.T) { mockConn := mock.NewMockConn(ctrl) c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: 
make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: queueSize, @@ -749,7 +749,7 @@ func TestProcessRPCs(t *testing.T) { c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: tcase.qsize, @@ -824,7 +824,7 @@ func TestRPCContext(t *testing.T) { mockConn.EXPECT().Close() c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: queueSize, @@ -881,7 +881,7 @@ func TestSanity(t *testing.T) { mockConn := mock.NewMockConn(ctrl) c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: 1, // size one to skip sendBatch @@ -972,7 +972,7 @@ func TestSanityCompressor(t *testing.T) { mockConn := mock.NewMockConn(ctrl) c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: 1, // size one to skip sendBatch @@ -1097,7 +1097,7 @@ func BenchmarkSendBatchMemory(b *testing.B) { mockConn := mock.NewMockConn(ctrl) c := &client{ conn: mockConn, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), // queue size is 1 so that all QueueRPC calls trigger sendBatch, @@ -1216,7 +1216,7 @@ func TestMarshalJSON(t *testing.T) { conn: mockConn, addr: "testRegionServerAddress", ctype: RegionClient, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), rpcQueueSize: 1, // size one to skip sendBatch diff --git a/region/info_test.go b/region/info_test.go index baec5a6d..d59b091a 100644 --- a/region/info_test.go +++ b/region/info_test.go @@ -171,7 +171,7 @@ func 
TestRegionInfoMarshalJson(t *testing.T) { conn: mockConn, addr: "testAddr", ctype: RegionClient, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), inFlight: 20, diff --git a/region/multi.go b/region/multi.go index 48dbe964..22bbf249 100644 --- a/region/multi.go +++ b/region/multi.go @@ -273,9 +273,9 @@ func (m *multi) returnResults(msg proto.Message, err error) { } // add adds the calls and returns whether the batch is full. -func (m *multi) add(call hrpc.Call) bool { - m.calls = append(m.calls, call) - return len(m.calls) == m.size +func (m *multi) add(calls []hrpc.Call) bool { + m.calls = append(m.calls, calls...) + return len(m.calls) >= m.size } // len returns number of batched calls. diff --git a/region/multi_test.go b/region/multi_test.go index aa7d1956..3833ae90 100644 --- a/region/multi_test.go +++ b/region/multi_test.go @@ -320,10 +320,8 @@ func TestMultiToProto(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) { m := newMulti(1000) - for _, c := range tcase.calls { - if m.add(c) { - t.Fatal("multi is full") - } + if m.add(tcase.calls) { + t.Fatal("multi is full") } if tcase.panicMsg != "" { @@ -366,10 +364,8 @@ func TestMultiToProto(t *testing.T) { // test cellblocks m = newMulti(1000) - for _, c := range tcase.calls { - if m.add(c) { - t.Fatal("multi is full") - } + if m.add(tcase.calls) { + t.Fatal("multi is full") } cellblocksProto, cellblocks, cellblocksLen := m.SerializeCellBlocks() out, ok = cellblocksProto.(*pb.MultiRequest) @@ -692,10 +688,8 @@ func TestMultiReturnResults(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) { m := newMulti(1000) - for _, c := range tcase.calls { - if m.add(c) { - t.Fatal("multi is full") - } + if m.add(tcase.calls) { + t.Fatal("multi is full") } m.regions = tcase.regions @@ -986,10 +980,8 @@ func TestMultiDeserializeCellBlocks(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) { m := newMulti(1000) - for _, c := range
tcase.calls { - if m.add(c) { - t.Fatal("multi is full") - } + if m.add(tcase.calls) { + t.Fatal("multi is full") } n, err := m.DeserializeCellBlocks(tcase.response, tcase.cellblocks) @@ -1028,19 +1020,19 @@ func BenchmarkMultiToProto(b *testing.B) { var c hrpc.Call c, _ = hrpc.NewGetStr(context.Background(), "reg0", "call0") c.SetRegion(reg0) - m.add(c) + m.add([]hrpc.Call{c}) c, _ = hrpc.NewPutStr(context.Background(), "reg0", "call1", values) c.SetRegion(reg0) - m.add(c) + m.add([]hrpc.Call{c}) c, _ = hrpc.NewAppStr(context.Background(), "reg1", "call2", values) c.SetRegion(reg1) - m.add(c) + m.add([]hrpc.Call{c}) c, _ = hrpc.NewDelStr(context.Background(), "reg1", "call3", delValues) c.SetRegion(reg1) - m.add(c) + m.add([]hrpc.Call{c}) c, _ = hrpc.NewIncStr(context.Background(), "reg2", "call4", delValues) c.SetRegion(reg2) - m.add(c) + m.add([]hrpc.Call{c}) b.Run("cellblocks", func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { @@ -1097,7 +1089,7 @@ func BenchmarkMultiToProtoLarge(b *testing.B) { b.Fatal(err) } rpc.SetRegion(regions[i%10]) - m.add(rpc) + m.add([]hrpc.Call{rpc}) } b.Run("cellblocks", func(b *testing.B) { b.ReportAllocs() diff --git a/region/new.go b/region/new.go index 6481e64f..4156396c 100644 --- a/region/new.go +++ b/region/new.go @@ -28,7 +28,7 @@ func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time. flushInterval: flushInterval, effectiveUser: effectiveUser, readTimeout: readTimeout, - rpcs: make(chan hrpc.Call), + rpcs: make(chan []hrpc.Call), done: make(chan struct{}), sent: make(map[uint32]hrpc.Call), }