Skip to content

Commit

Permalink
region: Make rpcs chan []hrpc.Call
Browse files Browse the repository at this point in the history
Change the rpcs chan from holding hrpc.Call to []hrpc.Call. This is
required for batch support.
  • Loading branch information
aaronbee committed Jun 23, 2023
1 parent ea9efae commit cef242a
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 51 deletions.
19 changes: 10 additions & 9 deletions region/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package region

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
Expand Down Expand Up @@ -167,7 +168,7 @@ type client struct {
// failOnce used for concurrent calls to fail
failOnce sync.Once

rpcs chan hrpc.Call
rpcs chan []hrpc.Call
done chan struct{}

// sent contains the mapping of sent call IDs to RPC calls, so that when
Expand Down Expand Up @@ -201,7 +202,7 @@ func (c *client) QueueRPC(rpc hrpc.Call) {
// rpc timed out before being processed
case <-c.done:
returnResult(rpc, nil, ErrClientClosed)
case c.rpcs <- rpc:
case c.rpcs <- []hrpc.Call{rpc}:
}
} else {
select {
Expand Down Expand Up @@ -363,9 +364,9 @@ func (c *client) processRPCs() {
select {
case <-c.done:
return
case rpc := <-c.rpcs:
case rpcs := <-c.rpcs:
// have things queued up, batch them
if !m.add(rpc) {
if !m.add(rpcs) {
// can still put more rpcs into batch
continue
}
Expand All @@ -380,11 +381,11 @@ func (c *client) processRPCs() {
select {
case <-c.done:
return
case rpc := <-c.rpcs:
m.add(rpc)
case rpcs := <-c.rpcs:
m.add(rpcs)
}
continue
} else if l == c.rpcQueueSize || c.flushInterval == 0 {
} else if l >= c.rpcQueueSize || c.flushInterval == 0 {
// batch is full, flush
flush("queue full")
continue
Expand All @@ -402,8 +403,8 @@ func (c *client) processRPCs() {
case <-timer.C:
reason = "timeout"
// time to flush
case rpc := <-c.rpcs:
if !m.add(rpc) {
case rpcs := <-c.rpcs:
if !m.add(rpcs) {
// can still put more rpcs into batch
continue
}
Expand Down
30 changes: 15 additions & 15 deletions region/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestFail(t *testing.T) {
c := &client{
conn: mockConn,
done: make(chan struct{}),
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
sent: make(map[uint32]hrpc.Call),
}
expectedErr := errors.New("oooups")
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestFail(t *testing.T) {
// check that failing undialed client doesn't panic
c = &client{
done: make(chan struct{}),
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
sent: make(map[uint32]hrpc.Call),
}
c.fail(expectedErr)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestQueueRPCMultiWithClose(t *testing.T) {

c := &client{
conn: &mc{MockConn: mockConn},
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: 10,
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestProcessRPCsWithFail(t *testing.T) {

c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
// never send anything
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestQueueRPC(t *testing.T) {
mockConn.EXPECT().SetReadDeadline(gomock.Any()).AnyTimes()
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: queueSize,
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestServerErrorWrite(t *testing.T) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: queueSize,
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestServerErrorRead(t *testing.T) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: queueSize,
Expand Down Expand Up @@ -504,7 +504,7 @@ func TestServerErrorExceptionResponse(t *testing.T) {
mockConn.EXPECT().SetReadDeadline(gomock.Any()).Times(2)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: 1,
Expand Down Expand Up @@ -693,7 +693,7 @@ func TestUnexpectedSendError(t *testing.T) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: queueSize,
Expand Down Expand Up @@ -749,7 +749,7 @@ func TestProcessRPCs(t *testing.T) {

c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: tcase.qsize,
Expand Down Expand Up @@ -824,7 +824,7 @@ func TestRPCContext(t *testing.T) {
mockConn.EXPECT().Close()
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: queueSize,
Expand Down Expand Up @@ -881,7 +881,7 @@ func TestSanity(t *testing.T) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: 1, // size one to skip sendBatch
Expand Down Expand Up @@ -972,7 +972,7 @@ func TestSanityCompressor(t *testing.T) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: 1, // size one to skip sendBatch
Expand Down Expand Up @@ -1097,7 +1097,7 @@ func BenchmarkSendBatchMemory(b *testing.B) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
// queue size is 1 so that all QueueRPC calls trigger sendBatch,
Expand Down Expand Up @@ -1216,7 +1216,7 @@ func TestMarshalJSON(t *testing.T) {
conn: mockConn,
addr: "testRegionServerAddress",
ctype: RegionClient,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: 1, // size one to skip sendBatch
Expand Down
2 changes: 1 addition & 1 deletion region/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestRegionInfoMarshalJson(t *testing.T) {
conn: mockConn,
addr: "testAddr",
ctype: RegionClient,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
inFlight: 20,
Expand Down
6 changes: 3 additions & 3 deletions region/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,9 @@ func (m *multi) returnResults(msg proto.Message, err error) {
}

// add adds the call and returns wether the batch is full.
func (m *multi) add(call hrpc.Call) bool {
m.calls = append(m.calls, call)
return len(m.calls) == m.size
func (m *multi) add(calls []hrpc.Call) bool {
m.calls = append(m.calls, calls...)
return len(m.calls) >= m.size
}

// len returns number of batched calls.
Expand Down
36 changes: 14 additions & 22 deletions region/multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,8 @@ func TestMultiToProto(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
m := newMulti(1000)

for _, c := range tcase.calls {
if m.add(c) {
t.Fatal("multi is full")
}
if m.add(tcase.calls) {
t.Fatal("multi is full")
}

if tcase.panicMsg != "" {
Expand Down Expand Up @@ -366,10 +364,8 @@ func TestMultiToProto(t *testing.T) {

// test cellblocks
m = newMulti(1000)
for _, c := range tcase.calls {
if m.add(c) {
t.Fatal("multi is full")
}
if m.add(tcase.calls) {
t.Fatal("multi is full")
}
cellblocksProto, cellblocks, cellblocksLen := m.SerializeCellBlocks(nil)
out, ok = cellblocksProto.(*pb.MultiRequest)
Expand Down Expand Up @@ -692,10 +688,8 @@ func TestMultiReturnResults(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
m := newMulti(1000)

for _, c := range tcase.calls {
if m.add(c) {
t.Fatal("multi is full")
}
if m.add(tcase.calls) {
t.Fatal("multi is full")
}
m.regions = tcase.regions

Expand Down Expand Up @@ -986,10 +980,8 @@ func TestMultiDeserializeCellBlocks(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
m := newMulti(1000)

for _, c := range tcase.calls {
if m.add(c) {
t.Fatal("multi is full")
}
if m.add(tcase.calls) {
t.Fatal("multi is full")
}

n, err := m.DeserializeCellBlocks(tcase.response, tcase.cellblocks)
Expand Down Expand Up @@ -1028,19 +1020,19 @@ func BenchmarkMultiToProto(b *testing.B) {
var c hrpc.Call
c, _ = hrpc.NewGetStr(context.Background(), "reg0", "call0")
c.SetRegion(reg0)
m.add(c)
m.add([]hrpc.Call{c})
c, _ = hrpc.NewPutStr(context.Background(), "reg0", "call1", values)
c.SetRegion(reg0)
m.add(c)
m.add([]hrpc.Call{c})
c, _ = hrpc.NewAppStr(context.Background(), "reg1", "call2", values)
c.SetRegion(reg1)
m.add(c)
m.add([]hrpc.Call{c})
c, _ = hrpc.NewDelStr(context.Background(), "reg1", "call3", delValues)
c.SetRegion(reg1)
m.add(c)
m.add([]hrpc.Call{c})
c, _ = hrpc.NewIncStr(context.Background(), "reg2", "call4", delValues)
c.SetRegion(reg2)
m.add(c)
m.add([]hrpc.Call{c})
b.Run("cellblocks", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -1097,7 +1089,7 @@ func BenchmarkMultiToProtoLarge(b *testing.B) {
b.Fatal(err)
}
rpc.SetRegion(regions[i%10])
m.add(rpc)
m.add([]hrpc.Call{rpc})
}
b.Run("cellblocks", func(b *testing.B) {
b.ReportAllocs()
Expand Down
2 changes: 1 addition & 1 deletion region/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time.
flushInterval: flushInterval,
effectiveUser: effectiveUser,
readTimeout: readTimeout,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
}
Expand Down

0 comments on commit cef242a

Please sign in to comment.