Skip to content

Commit

Permalink
region: Implement QueueBatch
Browse files Browse the repository at this point in the history
Allows passing a batch of RPCs to the region client, which will be
added to the in progress multi and then sent to HBASE.

A note here is that it's possible for a multi to grow beyond the size
of the configured size. This feels like the right way to go because
the user of the Batch API has specifically requested the rpcs be
batched together, so splitting the batch across multiple multi's
breaks that bit of atomicity.
  • Loading branch information
aaronbee committed Jun 23, 2023
1 parent cef242a commit 6ffc0ac
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions region/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,7 @@ type client struct {
// QueueRPC will add an rpc call to the queue for processing by the writer goroutine
func (c *client) QueueRPC(rpc hrpc.Call) {
if b, ok := rpc.(hrpc.Batchable); ok && c.rpcQueueSize > 1 && !b.SkipBatch() {
// queue up the rpc
select {
case <-rpc.Context().Done():
// rpc timed out before being processed
case <-c.done:
returnResult(rpc, nil, ErrClientClosed)
case c.rpcs <- []hrpc.Call{rpc}:
}
c.QueueBatch(rpc.Context(), []hrpc.Call{rpc})
} else {
select {
case <-c.done:
Expand All @@ -223,6 +216,21 @@ func (c *client) QueueRPC(rpc hrpc.Call) {
}
}

// QueueBatch will add rpcs to the queue for processing by the writer
// goroutine
func (c *client) QueueBatch(ctx context.Context, rpcs []hrpc.Call) {
select {
case <-ctx.Done():
case <-c.done:
// return error results
res := hrpc.RPCResult{Error: ErrClientClosed}
for _, c := range rpcs {
c.ResultChan() <- res
}
case c.rpcs <- rpcs:
}
}

// Close asks this region.Client to close its connection to the RegionServer.
// All queued and outstanding RPCs, if any, will be failed as if a connection
// error had happened.
Expand Down

0 comments on commit 6ffc0ac

Please sign in to comment.