From 00358c1777b11fa6539df1555af8b8d6102bbdfc Mon Sep 17 00:00:00 2001 From: Aaron Beitch Date: Thu, 6 Oct 2022 14:00:32 -0700 Subject: [PATCH] region: Implement QueueBatch Allows passing a batch of RPCs to the region client, which will be added to the in progress multi and then sent to HBASE. A note here is that it's possible for a multi to grow beyond the size of the configured size. This feels like the right way to go because the user of the Batch API has specifically requested the rpcs be batched together, so splitting the batch across multiple multi's breaks that bit of atomicity. --- region/client.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/region/client.go b/region/client.go index 79df2985..33d4557e 100644 --- a/region/client.go +++ b/region/client.go @@ -196,14 +196,7 @@ type client struct { // QueueRPC will add an rpc call to the queue for processing by the writer goroutine func (c *client) QueueRPC(rpc hrpc.Call) { if b, ok := rpc.(hrpc.Batchable); ok && c.rpcQueueSize > 1 && !b.SkipBatch() { - // queue up the rpc - select { - case <-rpc.Context().Done(): - // rpc timed out before being processed - case <-c.done: - returnResult(rpc, nil, ErrClientClosed) - case c.rpcs <- []hrpc.Call{rpc}: - } + c.QueueBatch(rpc.Context(), []hrpc.Call{rpc}) } else { select { case <-c.done: @@ -223,6 +216,21 @@ func (c *client) QueueRPC(rpc hrpc.Call) { } } +// QueueBatch will add rpcs to the queue for processing by the writer +// goroutine +func (c *client) QueueBatch(ctx context.Context, rpcs []hrpc.Call) { + select { + case <-ctx.Done(): + case <-c.done: + // return error results + res := hrpc.RPCResult{Error: ErrClientClosed} + for _, c := range rpcs { + c.ResultChan() <- res + } + case c.rpcs <- rpcs: + } +} + // Close asks this region.Client to close its connection to the RegionServer. // All queued and outstanding RPCs, if any, will be failed as if a connection // error had happened.