diff --git a/region/client.go b/region/client.go index 79048e9e..8103d681 100644 --- a/region/client.go +++ b/region/client.go @@ -196,14 +196,7 @@ type client struct { // QueueRPC will add an rpc call to the queue for processing by the writer goroutine func (c *client) QueueRPC(rpc hrpc.Call) { if b, ok := rpc.(hrpc.Batchable); ok && c.rpcQueueSize > 1 && !b.SkipBatch() { - // queue up the rpc - select { - case <-rpc.Context().Done(): - // rpc timed out before being processed - case <-c.done: - returnResult(rpc, nil, ErrClientClosed) - case c.rpcs <- []hrpc.Call{rpc}: - } + c.QueueBatch(rpc.Context(), []hrpc.Call{rpc}) } else { select { case <-c.done: @@ -223,6 +216,21 @@ func (c *client) QueueRPC(rpc hrpc.Call) { } } +// QueueBatch will add rpcs to the queue for processing by the writer +// goroutine +func (c *client) QueueBatch(ctx context.Context, rpcs []hrpc.Call) { + select { + case <-ctx.Done(): + case <-c.done: + // return error results + res := hrpc.RPCResult{Error: ErrClientClosed} + for _, c := range rpcs { + c.ResultChan() <- res + } + case c.rpcs <- rpcs: + } +} + // Close asks this region.Client to close its connection to the RegionServer. // All queued and outstanding RPCs, if any, will be failed as if a connection // error had happened.