diff --git a/client.go b/client.go index 76132896..00bd3e85 100644 --- a/client.go +++ b/client.go @@ -43,6 +43,7 @@ type Client interface { CheckAndPut(p *hrpc.Mutate, family string, qualifier string, expectedValue []byte) (bool, error) SendBatch(ctx context.Context, batch []hrpc.Call) (res []hrpc.RPCResult, allOK bool) + CacheRegions(table []byte) error Close() } @@ -391,3 +392,10 @@ func (c *client) CheckAndPut(p *hrpc.Mutate, family string, return r.GetProcessed(), nil } + +// CacheRegions scan the meta region to get all the regions and populate to cache. +// This can be used to warm up region cache +func (c *client) CacheRegions(table []byte) error { + _, err := c.findAllRegions(context.Background(), table) + return err +} diff --git a/integration_test.go b/integration_test.go index 63389ed9..c7560281 100644 --- a/integration_test.go +++ b/integration_test.go @@ -2374,3 +2374,40 @@ func TestDebugState(t *testing.T) { assert.Equal(t, 1, len(keyRegionCache.(map[string]interface{}))) assert.Equal(t, 1, len(clientRegionCache.(map[string]interface{}))) // only have one client } + +type regionInfoAndAddr struct { + regionInfo hrpc.RegionInfo + addr string +} + +// Test loading region cache +func TestCacheRegions(t *testing.T) { + c := gohbase.NewClient(*host) + defer c.Close() + + // make sure region cache is empty at startup + var jsonUnMarshal map[string]interface{} + jsonVal, err := gohbase.DebugState(c) + err = json.Unmarshal(jsonVal, &jsonUnMarshal) + if err != nil { + t.Fatalf("Encoutered eror when Unmarshalling: %v", err) + } + cacheLength := len(jsonUnMarshal["KeyRegionCache"].(map[string]interface{})) + if cacheLength != 0 { + t.Fatal("expected empty region cache when creating a new client") + } + + // load regions to cache + c.CacheRegions([]byte(table)) + + jsonVal, err = gohbase.DebugState(c) + err = json.Unmarshal(jsonVal, &jsonUnMarshal) + if err != nil { + t.Fatalf("Encoutered eror when Unmarshalling: %v", err) + } + // CreateTable init function starts hbase with 4 regions + cacheLength = len(jsonUnMarshal["KeyRegionCache"].(map[string]interface{})) + if cacheLength != 4 { + t.Fatalf("Expect 4 regions but got: %v", cacheLength) + } +} diff --git a/rpc.go b/rpc.go index e9166a38..0b20ce27 100644 --- a/rpc.go +++ b/rpc.go @@ -24,6 +24,11 @@ import ( "google.golang.org/protobuf/proto" ) +type regionInfoAndAddr struct { + regionInfo hrpc.RegionInfo + addr string +} + // Constants var ( // Name of the meta region. @@ -554,6 +559,85 @@ func (c *client) lookupRegion(ctx context.Context, } } +func (c *client) lookupAllRegions(ctx context.Context, + table []byte) ([]regionInfoAndAddr, error) { + var regs []regionInfoAndAddr + var err error + backoff := backoffStart + for { + // If it takes longer than regionLookupTimeout, fail so that we can sleep + lookupCtx, cancel := context.WithTimeout(ctx, c.regionLookupTimeout) + log.WithFields(log.Fields{ + "table": strconv.Quote(string(table)), + }).Debug("looking up regions") + + regs, err = c.metaLookupForTable(lookupCtx, table) + cancel() + if err == TableNotFound { + log.WithFields(log.Fields{ + "table": strconv.Quote(string(table)), + "err": err, + }).Debug("hbase:meta does not know about this table") + + return nil, err + } else if err == ErrClientClosed { + return nil, err + } + + if err == nil { + log.WithFields(log.Fields{ + "table": strconv.Quote(string(table)), + "regionsAndAddr": regs, + }).Debug("looked up all regions") + + return regs, nil + } + + log.WithFields(log.Fields{ + "table": strconv.Quote(string(table)), + "backoff": backoff, + "err": err, + }).Error("failed looking up regions") + + // This will be hit if there was an error locating the region + backoff, err = sleepAndIncreaseBackoff(ctx, backoff) + if err != nil { + return nil, err + } + } +} + +func (c *client) findAllRegions(ctx context.Context, table []byte) ([]regionInfoAndAddr, error) { + regs, err := c.lookupAllRegions(ctx, table) + if err != nil { + return nil, err + } + for _, regaddr := range regs { + reg, addr := regaddr.regionInfo, regaddr.addr + reg.MarkUnavailable() + + if reg != c.metaRegionInfo && reg != c.adminRegionInfo { + // Check that the region wasn't added to + // the cache while we were looking it up. + overlaps, replaced := c.regions.put(reg) + if !replaced { + // the same or younger regions are already in cache + continue + } + + // otherwise, new region in cache, delete overlaps from client's cache + for _, r := range overlaps { + c.clients.del(r) + } + } + + // Start a goroutine to connect to the region + go c.establishRegion(reg, addr) + } + + return regs, nil +} + func (c *client) findRegion(ctx context.Context, table, key []byte) (hrpc.RegionInfo, error) { // The region was not in the cache, it // must be looked up in the meta table @@ -678,6 +762,52 @@ func (c *client) metaLookup(ctx context.Context, return reg, addr, nil } +// Creates the META key to search for all regions +func createAllRegionSearchKey(table []byte) []byte { + metaKey := make([]byte, 0, len(table)+1) + metaKey = append(metaKey, table...) + // '.' is the first byte greater than ','. Meta table entry has + // the format table,key,timestamp. By adding '.' to the stop row + // we scan all keys for table + metaKey = append(metaKey, '.') + return metaKey +} + +// metaLookupForTable checks meta table for all the region in which the given table is. +func (c *client) metaLookupForTable(ctx context.Context, + table []byte) ([]regionInfoAndAddr, error) { + metaKey := createAllRegionSearchKey(table) + rpc, err := hrpc.NewScanRange(ctx, metaTableName, table, metaKey, + hrpc.Families(infoFamily)) + if err != nil { + return nil, err + } + + var regions []regionInfoAndAddr + scanner := c.Scan(rpc) + for { + resp, err := scanner.Next() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + + reg, addr, err := region.ParseRegionInfo(resp) + if err != nil { + return nil, err + } + + regions = append(regions, regionInfoAndAddr{regionInfo: reg, addr: addr}) + } + + if len(regions) == 0 { + return nil, TableNotFound + } + return regions, nil +} + func fullyQualifiedTable(reg hrpc.RegionInfo) []byte { namespace := reg.Namespace() table := reg.Table() diff --git a/rpc_test.go b/rpc_test.go index 17727f69..c87b2e8c 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -702,6 +702,20 @@ func TestMetaLookupCanceledContext(t *testing.T) { } } +func TestMetaLookupAllRegionsCanceledContext(t *testing.T) { + c := newMockClient(nil) + // pretend regionserver:0 has meta table + rc := c.clients.put("regionserver:0", c.metaRegionInfo, newRegionClientFn("regionserver:0")) + c.metaRegionInfo.SetClient(rc) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := c.metaLookupForTable(ctx, []byte("tablenotfound")) + if err != context.Canceled { + t.Errorf("Expected error %v, got error %v", context.Canceled, err) + } +} + func TestConcurrentRetryableError(t *testing.T) { ctrl := test.NewController(t) defer ctrl.Finish() diff --git a/test/mock/client.go b/test/mock/client.go index cdb6fb60..fb3e926c 100644 --- a/test/mock/client.go +++ b/test/mock/client.go @@ -50,6 +50,20 @@ func (mr *MockClientMockRecorder) Append(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Append", reflect.TypeOf((*MockClient)(nil).Append), arg0) } +// CacheRegions mocks base method. +func (m *MockClient) CacheRegions(arg0 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CacheRegions", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CacheRegions indicates an expected call of CacheRegions. +func (mr *MockClientMockRecorder) CacheRegions(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CacheRegions", reflect.TypeOf((*MockClient)(nil).CacheRegions), arg0) +} + // CheckAndPut mocks base method. func (m *MockClient) CheckAndPut(arg0 *hrpc.Mutate, arg1, arg2 string, arg3 []byte) (bool, error) { m.ctrl.T.Helper()