Add SendBatch API #202

Merged · 4 commits · Jul 27, 2023
1 change: 1 addition & 0 deletions hrpc/call.go
@@ -43,6 +43,7 @@ type RegionClient interface {
Close()
Addr() string
QueueRPC(Call)
QueueBatch(context.Context, []Call)
String() string
}

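The new `QueueBatch(context.Context, []Call)` method lets a caller hand a whole slice of RPCs to a region client in one operation. A minimal caller-side sketch, assuming the usual gohbase import path and that the calls have already been grouped for this client (the function name is hypothetical):

```go
package example

import (
	"context"

	"github.com/tsuna/gohbase/hrpc" // assumed import path
)

// flushToRegion is a hypothetical helper: one QueueBatch call covers
// the whole slice instead of one QueueRPC channel send per call.
// Per-call errors (e.g. a closed client) still arrive on each call's
// ResultChan.
func flushToRegion(ctx context.Context, rc hrpc.RegionClient, calls []hrpc.Call) {
	rc.QueueBatch(ctx, calls)
}
```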
4 changes: 4 additions & 0 deletions mockrc_test.go
@@ -256,4 +256,8 @@ func (c *testClient) QueueRPC(call hrpc.Call) {
}
}

func (c *testClient) QueueBatch(ctx context.Context, batch []hrpc.Call) {
// do nothing. Let the test fill in result.
}

func (c *testClient) Close() {}
41 changes: 25 additions & 16 deletions region/client.go
@@ -6,6 +6,7 @@
package region

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
@@ -167,7 +168,7 @@ type client struct {
// failOnce used for concurrent calls to fail
failOnce sync.Once

rpcs chan hrpc.Call
rpcs chan []hrpc.Call
done chan struct{}

// sent contains the mapping of sent call IDs to RPC calls, so that when
@@ -195,14 +196,7 @@ type client struct {
// QueueRPC will add an rpc call to the queue for processing by the writer goroutine
func (c *client) QueueRPC(rpc hrpc.Call) {
if b, ok := rpc.(hrpc.Batchable); ok && c.rpcQueueSize > 1 && !b.SkipBatch() {
// queue up the rpc
select {
case <-rpc.Context().Done():
// rpc timed out before being processed
case <-c.done:
returnResult(rpc, nil, ErrClientClosed)
case c.rpcs <- rpc:
}
c.QueueBatch(rpc.Context(), []hrpc.Call{rpc})
} else {
select {
case <-c.done:
@@ -222,6 +216,21 @@ func (c *client) QueueRPC(rpc hrpc.Call) {
}
}

// QueueBatch will add rpcs to the queue for processing by the writer
// goroutine
func (c *client) QueueBatch(ctx context.Context, rpcs []hrpc.Call) {
select {
case <-ctx.Done():
case <-c.done:
// return error results
res := hrpc.RPCResult{Error: ErrClientClosed}
for _, c := range rpcs {
c.ResultChan() <- res
}
case c.rpcs <- rpcs:
}
}

// Close asks this region.Client to close its connection to the RegionServer.
// All queued and outstanding RPCs, if any, will be failed as if a connection
// error had happened.
@@ -364,9 +373,9 @@ func (c *client) processRPCs() {
select {
case <-c.done:
return
case rpc := <-c.rpcs:
case rpcs := <-c.rpcs:
// have things queued up, batch them
if !m.add(rpc) {
if !m.add(rpcs) {
// can still put more rpcs into batch
continue
}
@@ -381,11 +390,11 @@
select {
case <-c.done:
return
case rpc := <-c.rpcs:
m.add(rpc)
case rpcs := <-c.rpcs:
m.add(rpcs)
}
continue
} else if l == c.rpcQueueSize || c.flushInterval == 0 {
} else if l >= c.rpcQueueSize || c.flushInterval == 0 {
// batch is full, flush
flush("queue full")
continue
@@ -403,8 +412,8 @@
case <-timer.C:
reason = "timeout"
// time to flush
case rpc := <-c.rpcs:
if !m.add(rpc) {
case rpcs := <-c.rpcs:
if !m.add(rpcs) {
// can still put more rpcs into batch
continue
}
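With `rpcs` now a `chan []hrpc.Call`, the `processRPCs` loop drains slices and flushes when the accumulated batch reaches `rpcQueueSize` (note the change to `>=`, since one slice can overshoot the limit) or when the flush interval expires. A simplified, self-contained sketch of that batcher pattern, with hypothetical names rather than the exact gohbase loop:

```go
package main

import (
	"fmt"
	"time"
)

// batcher drains slices of items from in, flushing when the batch
// reaches size ("queue full") or when flushInterval passes with work
// pending ("timeout"). The real loop reuses a single timer rather
// than calling time.After on every iteration.
func batcher(in <-chan []int, done <-chan struct{}, size int, flushInterval time.Duration) {
	var batch []int
	flush := func(reason string) {
		fmt.Printf("flushed %d items (%s)\n", len(batch), reason)
		batch = batch[:0]
	}
	for {
		select {
		case <-done:
			return
		case items := <-in:
			// A whole slice arrives at once, so the batch can
			// overshoot size; hence the >= comparison.
			batch = append(batch, items...)
			if len(batch) >= size {
				flush("queue full")
			}
		case <-time.After(flushInterval):
			if len(batch) > 0 {
				flush("timeout")
			}
		}
	}
}

func main() {
	in := make(chan []int)
	done := make(chan struct{})
	go batcher(in, done, 3, 50*time.Millisecond)

	in <- []int{1, 2}
	in <- []int{3, 4} // 4 items >= size 3: immediate "queue full" flush
	in <- []int{5}
	time.Sleep(100 * time.Millisecond) // "timeout" flush of the last item
	close(done)
	time.Sleep(10 * time.Millisecond)
}
```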
30 changes: 15 additions & 15 deletions region/client_test.go
@@ -128,7 +128,7 @@ func TestFail(t *testing.T) {
c := &client{
conn: mockConn,
done: make(chan struct{}),
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
sent: make(map[uint32]hrpc.Call),
}
expectedErr := errors.New("oooups")
@@ -161,7 +161,7 @@ func TestFail(t *testing.T) {
// check that failing undialed client doesn't panic
c = &client{
done: make(chan struct{}),
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
sent: make(map[uint32]hrpc.Call),
}
c.fail(expectedErr)
@@ -196,7 +196,7 @@ func TestQueueRPCMultiWithClose(t *testing.T) {

c := &client{
conn: &mc{MockConn: mockConn},
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: 10,
@@ -268,7 +268,7 @@ func TestProcessRPCsWithFail(t *testing.T) {

c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
// never send anything
@@ -338,7 +338,7 @@ func TestQueueRPC(t *testing.T) {
mockConn.EXPECT().SetReadDeadline(gomock.Any()).AnyTimes()
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: queueSize,
@@ -424,7 +424,7 @@ func TestServerErrorWrite(t *testing.T) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: queueSize,
@@ -465,7 +465,7 @@ func TestServerErrorRead(t *testing.T) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: queueSize,
@@ -504,7 +504,7 @@ func TestServerErrorExceptionResponse(t *testing.T) {
mockConn.EXPECT().SetReadDeadline(gomock.Any()).Times(2)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: 1,
@@ -693,7 +693,7 @@ func TestUnexpectedSendError(t *testing.T) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: queueSize,
@@ -749,7 +749,7 @@ func TestProcessRPCs(t *testing.T) {

c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: tcase.qsize,
@@ -824,7 +824,7 @@ func TestRPCContext(t *testing.T) {
mockConn.EXPECT().Close()
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: queueSize,
@@ -881,7 +881,7 @@ func TestSanity(t *testing.T) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: 1, // size one to skip sendBatch
@@ -972,7 +972,7 @@ func TestSanityCompressor(t *testing.T) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: 1, // size one to skip sendBatch
@@ -1097,7 +1097,7 @@ func BenchmarkSendBatchMemory(b *testing.B) {
mockConn := mock.NewMockConn(ctrl)
c := &client{
conn: mockConn,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
// queue size is 1 so that all QueueRPC calls trigger sendBatch,
@@ -1216,7 +1216,7 @@ func TestMarshalJSON(t *testing.T) {
conn: mockConn,
addr: "testRegionServerAddress",
ctype: RegionClient,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
rpcQueueSize: 1, // size one to skip sendBatch
2 changes: 1 addition & 1 deletion region/info_test.go
@@ -171,7 +171,7 @@ func TestRegionInfoMarshalJson(t *testing.T) {
conn: mockConn,
addr: "testAddr",
ctype: RegionClient,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
inFlight: 20,
6 changes: 3 additions & 3 deletions region/multi.go
@@ -279,9 +279,9 @@ func (m *multi) returnResults(msg proto.Message, err error) {
}

// add adds the calls and returns whether the batch is full.
func (m *multi) add(call hrpc.Call) bool {
m.calls = append(m.calls, call)
return len(m.calls) == m.size
func (m *multi) add(calls []hrpc.Call) bool {
m.calls = append(m.calls, calls...)
return len(m.calls) >= m.size
}

// len returns number of batched calls.
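Because `add` now appends a whole slice, the batch can jump past `size` in a single call, which is why the fullness check changed from `==` to `>=`. A standalone sketch of the overshoot case, with stand-in types (`hrpc.Call` is an interface in gohbase; `call` here is hypothetical):

```go
package main

import "fmt"

// call is a stand-in for hrpc.Call.
type call struct{ id int }

// multi mirrors the batching container: size is the target batch size.
type multi struct {
	size  int
	calls []call
}

// add appends all calls and reports whether the batch is now full.
// Checking len(m.calls) == m.size would miss the overshoot case below.
func (m *multi) add(calls []call) bool {
	m.calls = append(m.calls, calls...)
	return len(m.calls) >= m.size
}

func main() {
	m := &multi{size: 3}
	fmt.Println(m.add([]call{{1}, {2}}))      // false: 2 of 3
	fmt.Println(m.add([]call{{3}, {4}, {5}})) // true: 5 >= 3, overshot
}
```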
36 changes: 14 additions & 22 deletions region/multi_test.go
@@ -320,10 +320,8 @@ func TestMultiToProto(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
m := newMulti(1000)

for _, c := range tcase.calls {
if m.add(c) {
t.Fatal("multi is full")
}
if m.add(tcase.calls) {
t.Fatal("multi is full")
}

if tcase.panicMsg != "" {
@@ -366,10 +364,8 @@

// test cellblocks
m = newMulti(1000)
for _, c := range tcase.calls {
if m.add(c) {
t.Fatal("multi is full")
}
if m.add(tcase.calls) {
t.Fatal("multi is full")
}
cellblocksProto, cellblocks, cellblocksLen := m.SerializeCellBlocks(nil)
out, ok = cellblocksProto.(*pb.MultiRequest)
@@ -692,10 +688,8 @@ func TestMultiReturnResults(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
m := newMulti(1000)

for _, c := range tcase.calls {
if m.add(c) {
t.Fatal("multi is full")
}
if m.add(tcase.calls) {
t.Fatal("multi is full")
}
m.regions = tcase.regions

@@ -986,10 +980,8 @@ func TestMultiDeserializeCellBlocks(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
m := newMulti(1000)

for _, c := range tcase.calls {
if m.add(c) {
t.Fatal("multi is full")
}
if m.add(tcase.calls) {
t.Fatal("multi is full")
}

n, err := m.DeserializeCellBlocks(tcase.response, tcase.cellblocks)
@@ -1028,19 +1020,19 @@ func BenchmarkMultiToProto(b *testing.B) {
var c hrpc.Call
c, _ = hrpc.NewGetStr(context.Background(), "reg0", "call0")
c.SetRegion(reg0)
m.add(c)
m.add([]hrpc.Call{c})
c, _ = hrpc.NewPutStr(context.Background(), "reg0", "call1", values)
c.SetRegion(reg0)
m.add(c)
m.add([]hrpc.Call{c})
c, _ = hrpc.NewAppStr(context.Background(), "reg1", "call2", values)
c.SetRegion(reg1)
m.add(c)
m.add([]hrpc.Call{c})
c, _ = hrpc.NewDelStr(context.Background(), "reg1", "call3", delValues)
c.SetRegion(reg1)
m.add(c)
m.add([]hrpc.Call{c})
c, _ = hrpc.NewIncStr(context.Background(), "reg2", "call4", delValues)
c.SetRegion(reg2)
m.add(c)
m.add([]hrpc.Call{c})
b.Run("cellblocks", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
@@ -1097,7 +1089,7 @@ func BenchmarkMultiToProtoLarge(b *testing.B) {
b.Fatal(err)
}
rpc.SetRegion(regions[i%10])
m.add(rpc)
m.add([]hrpc.Call{rpc})
}
b.Run("cellblocks", func(b *testing.B) {
b.ReportAllocs()
2 changes: 1 addition & 1 deletion region/new.go
@@ -28,7 +28,7 @@ func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time.
flushInterval: flushInterval,
effectiveUser: effectiveUser,
readTimeout: readTimeout,
rpcs: make(chan hrpc.Call),
rpcs: make(chan []hrpc.Call),
done: make(chan struct{}),
sent: make(map[uint32]hrpc.Call),
}