From 7dbe6079ca2f303a2f5886ee7e8aac6c54c2532d Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 13 Nov 2023 13:41:13 +0800 Subject: [PATCH] client: introduce the HTTP client (#7304) ref tikv/pd#7300 Introduce the HTTP client. Signed-off-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 4 +- client/http/api.go | 54 +++ client/http/client.go | 337 ++++++++++++++++++ client/http/types.go | 178 +++++++++ client/http/types_test.go | 49 +++ tests/integrations/client/http_client_test.go | 87 +++++ 6 files changed, 707 insertions(+), 2 deletions(-) create mode 100644 client/http/api.go create mode 100644 client/http/client.go create mode 100644 client/http/types.go create mode 100644 client/http/types_test.go create mode 100644 tests/integrations/client/http_client_test.go diff --git a/client/client.go b/client/client.go index 067872d2d39..56923b697e2 100644 --- a/client/client.go +++ b/client/client.go @@ -74,7 +74,7 @@ type GlobalConfigItem struct { PayLoad []byte } -// Client is a PD (Placement Driver) client. +// Client is a PD (Placement Driver) RPC client. // It should not be used after calling Close(). type Client interface { // GetClusterID gets the cluster ID from PD. @@ -1062,7 +1062,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) defer span.Finish() } start := time.Now() - defer cmdDurationScanRegions.Observe(time.Since(start).Seconds()) + defer func() { cmdDurationScanRegions.Observe(time.Since(start).Seconds()) }() var cancel context.CancelFunc scanCtx := ctx diff --git a/client/http/api.go b/client/http/api.go new file mode 100644 index 00000000000..5326919561d --- /dev/null +++ b/client/http/api.go @@ -0,0 +1,54 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "fmt" + "net/url" +) + +// The following constants are the paths of PD HTTP APIs. +const ( + HotRead = "/pd/api/v1/hotspot/regions/read" + HotWrite = "/pd/api/v1/hotspot/regions/write" + Regions = "/pd/api/v1/regions" + regionByID = "/pd/api/v1/region/id" + regionByKey = "/pd/api/v1/region/key" + regionsByKey = "/pd/api/v1/regions/key" + regionsByStoreID = "/pd/api/v1/regions/store" + Stores = "/pd/api/v1/stores" + MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts" +) + +// RegionByID returns the path of PD HTTP API to get region by ID. +func RegionByID(regionID uint64) string { + return fmt.Sprintf("%s/%d", regionByID, regionID) +} + +// RegionByKey returns the path of PD HTTP API to get region by key. +func RegionByKey(key []byte) string { + return fmt.Sprintf("%s/%s", regionByKey, url.QueryEscape(string(key))) +} + +// RegionsByKey returns the path of PD HTTP API to scan regions with given start key, end key and limit parameters. +func RegionsByKey(startKey, endKey []byte, limit int) string { + return fmt.Sprintf("%s?start_key=%s&end_key=%s&limit=%d", + regionsByKey, url.QueryEscape(string(startKey)), url.QueryEscape(string(endKey)), limit) +} + +// RegionsByStoreID returns the path of PD HTTP API to get regions by store ID. +func RegionsByStoreID(storeID uint64) string { + return fmt.Sprintf("%s/%d", regionsByStoreID, storeID) +} diff --git a/client/http/client.go b/client/http/client.go new file mode 100644 index 00000000000..6cb1277dfcb --- /dev/null +++ b/client/http/client.go @@ -0,0 +1,337 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +const ( + httpScheme = "http" + httpsScheme = "https" + networkErrorStatus = "network error" + + defaultTimeout = 30 * time.Second +) + +// Client is a PD (Placement Driver) HTTP client. +type Client interface { + GetRegionByID(context.Context, uint64) (*RegionInfo, error) + GetRegionByKey(context.Context, []byte) (*RegionInfo, error) + GetRegions(context.Context) (*RegionsInfo, error) + GetRegionsByKey(context.Context, []byte, []byte, int) (*RegionsInfo, error) + GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error) + GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error) + GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error) + GetStores(context.Context) (*StoresInfo, error) + GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error) + Close() +} + +var _ Client = (*client)(nil) + +type client struct { + pdAddrs []string + tlsConf *tls.Config + cli *http.Client + + requestCounter *prometheus.CounterVec + executionDuration *prometheus.HistogramVec +} + +// ClientOption configures the HTTP client. +type ClientOption func(c *client) + +// WithHTTPClient configures the client with the given initialized HTTP client. +func WithHTTPClient(cli *http.Client) ClientOption { + return func(c *client) { + c.cli = cli + } +} + +// WithTLSConfig configures the client with the given TLS config. +// This option won't work if the client is configured with WithHTTPClient. +func WithTLSConfig(tlsConf *tls.Config) ClientOption { + return func(c *client) { + c.tlsConf = tlsConf + } +} + +// WithMetrics configures the client with metrics. +func WithMetrics( + requestCounter *prometheus.CounterVec, + executionDuration *prometheus.HistogramVec, +) ClientOption { + return func(c *client) { + c.requestCounter = requestCounter + c.executionDuration = executionDuration + } +} + +// NewClient creates a PD HTTP client with the given PD addresses and TLS config. +func NewClient( + pdAddrs []string, + opts ...ClientOption, +) Client { + c := &client{} + // Apply the options first. + for _, opt := range opts { + opt(c) + } + // Normalize the addresses with correct scheme prefix. + for i, addr := range pdAddrs { + if !strings.HasPrefix(addr, httpScheme) { + var scheme string + if c.tlsConf != nil { + scheme = httpsScheme + } else { + scheme = httpScheme + } + pdAddrs[i] = fmt.Sprintf("%s://%s", scheme, addr) + } + } + c.pdAddrs = pdAddrs + // Init the HTTP client if it's not configured. + if c.cli == nil { + c.cli = &http.Client{Timeout: defaultTimeout} + if c.tlsConf != nil { + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.TLSClientConfig = c.tlsConf + c.cli.Transport = transport + } + } + + return c +} + +// Close closes the HTTP client. +func (c *client) Close() { + if c.cli != nil { + c.cli.CloseIdleConnections() + } + log.Info("[pd] http client closed") +} + +func (c *client) reqCounter(name, status string) { + if c.requestCounter == nil { + return + } + c.requestCounter.WithLabelValues(name, status).Inc() +} + +func (c *client) execDuration(name string, duration time.Duration) { + if c.executionDuration == nil { + return + } + c.executionDuration.WithLabelValues(name).Observe(duration.Seconds()) +} + +// At present, we will use the retry strategy of polling by default to keep +// it consistent with the current implementation of some clients (e.g. TiDB). +func (c *client) requestWithRetry( + ctx context.Context, + name, uri string, + res interface{}, +) error { + var ( + err error + addr string + ) + for idx := 0; idx < len(c.pdAddrs); idx++ { + addr = c.pdAddrs[idx] + err = c.request(ctx, name, addr, uri, res) + if err == nil { + break + } + log.Debug("[pd] request one addr failed", + zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err)) + } + return err +} + +func (c *client) request( + ctx context.Context, + name, addr, uri string, + res interface{}, +) error { + reqURL := fmt.Sprintf("%s%s", addr, uri) + logFields := []zap.Field{ + zap.String("name", name), + zap.String("url", reqURL), + } + log.Debug("[pd] request the http url", logFields...) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) + if err != nil { + log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...) + return errors.Trace(err) + } + start := time.Now() + resp, err := c.cli.Do(req) + if err != nil { + c.reqCounter(name, networkErrorStatus) + log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...) + return errors.Trace(err) + } + c.execDuration(name, time.Since(start)) + c.reqCounter(name, resp.Status) + defer func() { + err = resp.Body.Close() + if err != nil { + log.Warn("[pd] close http response body failed", append(logFields, zap.Error(err))...) + } + }() + + if resp.StatusCode != http.StatusOK { + logFields = append(logFields, zap.String("status", resp.Status)) + + bs, readErr := io.ReadAll(resp.Body) + if readErr != nil { + logFields = append(logFields, zap.NamedError("read-body-error", err)) + } else { + logFields = append(logFields, zap.ByteString("body", bs)) + } + + log.Error("[pd] request failed with a non-200 status", logFields...) + return errors.Errorf("request pd http api failed with status: '%s'", resp.Status) + } + + err = json.NewDecoder(resp.Body).Decode(res) + if err != nil { + return errors.Trace(err) + } + return nil +} + +// GetRegionByID gets the region info by ID. +func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) { + var region RegionInfo + err := c.requestWithRetry(ctx, "GetRegionByID", RegionByID(regionID), ®ion) + if err != nil { + return nil, err + } + return ®ion, nil +} + +// GetRegionByKey gets the region info by key. +func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, error) { + var region RegionInfo + err := c.requestWithRetry(ctx, "GetRegionByKey", RegionByKey(key), ®ion) + if err != nil { + return nil, err + } + return ®ion, nil +} + +// GetRegions gets the regions info. +func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) { + var regions RegionsInfo + err := c.requestWithRetry(ctx, "GetRegions", Regions, ®ions) + if err != nil { + return nil, err + } + return ®ions, nil +} + +// GetRegionsByKey gets the regions info by key range. If the limit is -1, it will return all regions within the range. +func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) { + var regions RegionsInfo + err := c.requestWithRetry(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), ®ions) + if err != nil { + return nil, err + } + return ®ions, nil +} + +// GetRegionsByStoreID gets the regions info by store ID. +func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*RegionsInfo, error) { + var regions RegionsInfo + err := c.requestWithRetry(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), ®ions) + if err != nil { + return nil, err + } + return ®ions, nil +} + +// GetHotReadRegions gets the hot read region statistics info. +func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) { + var hotReadRegions StoreHotPeersInfos + err := c.requestWithRetry(ctx, "GetHotReadRegions", HotRead, &hotReadRegions) + if err != nil { + return nil, err + } + return &hotReadRegions, nil +} + +// GetHotWriteRegions gets the hot write region statistics info. +func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) { + var hotWriteRegions StoreHotPeersInfos + err := c.requestWithRetry(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions) + if err != nil { + return nil, err + } + return &hotWriteRegions, nil +} + +// GetStores gets the stores info. +func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) { + var stores StoresInfo + err := c.requestWithRetry(ctx, "GetStores", Stores, &stores) + if err != nil { + return nil, err + } + return &stores, nil +} + +// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs. +func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) { + uri := MinResolvedTSPrefix + // scope is an optional parameter, it can be `cluster` or specified store IDs. + // - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil. + // - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled. + // - When scope given a list of stores, min_resolved_ts will be provided for each store + // and the scope-specific min_resolved_ts will be returned. + if len(storeIDs) != 0 { + storeIDStrs := make([]string, len(storeIDs)) + for idx, id := range storeIDs { + storeIDStrs[idx] = fmt.Sprintf("%d", id) + } + uri = fmt.Sprintf("%s?scope=%s", uri, strings.Join(storeIDStrs, ",")) + } + resp := struct { + MinResolvedTS uint64 `json:"min_resolved_ts"` + IsRealTime bool `json:"is_real_time,omitempty"` + StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"` + }{} + err := c.requestWithRetry(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp) + if err != nil { + return 0, nil, err + } + if !resp.IsRealTime { + return 0, nil, errors.Trace(errors.New("min resolved ts is not enabled")) + } + return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil +} diff --git a/client/http/types.go b/client/http/types.go new file mode 100644 index 00000000000..66eb31ec3a1 --- /dev/null +++ b/client/http/types.go @@ -0,0 +1,178 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import "time" + +// NOTICE: the structures below are copied from the PD API definitions. +// Please make sure the consistency if any change happens to the PD API. + +// RegionInfo stores the information of one region. +type RegionInfo struct { + ID int64 `json:"id"` + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` + Epoch RegionEpoch `json:"epoch"` + Peers []RegionPeer `json:"peers"` + Leader RegionPeer `json:"leader"` + DownPeers []RegionPeerStat `json:"down_peers"` + PendingPeers []RegionPeer `json:"pending_peers"` + WrittenBytes uint64 `json:"written_bytes"` + ReadBytes uint64 `json:"read_bytes"` + ApproximateSize int64 `json:"approximate_size"` + ApproximateKeys int64 `json:"approximate_keys"` + + ReplicationStatus *ReplicationStatus `json:"replication_status,omitempty"` +} + +// GetStartKey gets the start key of the region. +func (r *RegionInfo) GetStartKey() string { return r.StartKey } + +// GetEndKey gets the end key of the region. +func (r *RegionInfo) GetEndKey() string { return r.EndKey } + +// RegionEpoch stores the information about its epoch. +type RegionEpoch struct { + ConfVer int64 `json:"conf_ver"` + Version int64 `json:"version"` +} + +// RegionPeer stores information of one peer. +type RegionPeer struct { + ID int64 `json:"id"` + StoreID int64 `json:"store_id"` + IsLearner bool `json:"is_learner"` +} + +// RegionPeerStat stores one field `DownSec` which indicates how long it's down than `RegionPeer`. +type RegionPeerStat struct { + Peer RegionPeer `json:"peer"` + DownSec int64 `json:"down_seconds"` +} + +// ReplicationStatus represents the replication mode status of the region. +type ReplicationStatus struct { + State string `json:"state"` + StateID int64 `json:"state_id"` +} + +// RegionsInfo stores the information of regions. +type RegionsInfo struct { + Count int64 `json:"count"` + Regions []RegionInfo `json:"regions"` +} + +// Merge merges two RegionsInfo together and returns a new one. +func (ri *RegionsInfo) Merge(other *RegionsInfo) *RegionsInfo { + newRegionsInfo := &RegionsInfo{ + Regions: make([]RegionInfo, 0, ri.Count+other.Count), + } + m := make(map[int64]RegionInfo, ri.Count+other.Count) + for _, region := range ri.Regions { + m[region.ID] = region + } + for _, region := range other.Regions { + m[region.ID] = region + } + for _, region := range m { + newRegionsInfo.Regions = append(newRegionsInfo.Regions, region) + } + newRegionsInfo.Count = int64(len(newRegionsInfo.Regions)) + return newRegionsInfo +} + +// StoreHotPeersInfos is used to get human-readable description for hot regions. +type StoreHotPeersInfos struct { + AsPeer StoreHotPeersStat `json:"as_peer"` + AsLeader StoreHotPeersStat `json:"as_leader"` +} + +// StoreHotPeersStat is used to record the hot region statistics group by store. +type StoreHotPeersStat map[uint64]*HotPeersStat + +// HotPeersStat records all hot regions statistics +type HotPeersStat struct { + StoreByteRate float64 `json:"store_bytes"` + StoreKeyRate float64 `json:"store_keys"` + StoreQueryRate float64 `json:"store_query"` + TotalBytesRate float64 `json:"total_flow_bytes"` + TotalKeysRate float64 `json:"total_flow_keys"` + TotalQueryRate float64 `json:"total_flow_query"` + Count int `json:"regions_count"` + Stats []HotPeerStatShow `json:"statistics"` +} + +// HotPeerStatShow records the hot region statistics for output +type HotPeerStatShow struct { + StoreID uint64 `json:"store_id"` + Stores []uint64 `json:"stores"` + IsLeader bool `json:"is_leader"` + IsLearner bool `json:"is_learner"` + RegionID uint64 `json:"region_id"` + HotDegree int `json:"hot_degree"` + ByteRate float64 `json:"flow_bytes"` + KeyRate float64 `json:"flow_keys"` + QueryRate float64 `json:"flow_query"` + AntiCount int `json:"anti_count"` + LastUpdateTime time.Time `json:"last_update_time,omitempty"` +} + +// StoresInfo represents the information of all TiKV/TiFlash stores. +type StoresInfo struct { + Count int `json:"count"` + Stores []StoreInfo `json:"stores"` +} + +// StoreInfo represents the information of one TiKV/TiFlash store. +type StoreInfo struct { + Store MetaStore `json:"store"` + Status StoreStatus `json:"status"` +} + +// MetaStore represents the meta information of one store. +type MetaStore struct { + ID int64 `json:"id"` + Address string `json:"address"` + State int64 `json:"state"` + StateName string `json:"state_name"` + Version string `json:"version"` + Labels []StoreLabel `json:"labels"` + StatusAddress string `json:"status_address"` + GitHash string `json:"git_hash"` + StartTimestamp int64 `json:"start_timestamp"` +} + +// StoreLabel stores the information of one store label. +type StoreLabel struct { + Key string `json:"key"` + Value string `json:"value"` +} + +// StoreStatus stores the detail information of one store. +type StoreStatus struct { + Capacity string `json:"capacity"` + Available string `json:"available"` + LeaderCount int64 `json:"leader_count"` + LeaderWeight float64 `json:"leader_weight"` + LeaderScore float64 `json:"leader_score"` + LeaderSize int64 `json:"leader_size"` + RegionCount int64 `json:"region_count"` + RegionWeight float64 `json:"region_weight"` + RegionScore float64 `json:"region_score"` + RegionSize int64 `json:"region_size"` + StartTS time.Time `json:"start_ts"` + LastHeartbeatTS time.Time `json:"last_heartbeat_ts"` + Uptime string `json:"uptime"` +} diff --git a/client/http/types_test.go b/client/http/types_test.go new file mode 100644 index 00000000000..0dfebacbdcf --- /dev/null +++ b/client/http/types_test.go @@ -0,0 +1,49 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMergeRegionsInfo(t *testing.T) { + re := require.New(t) + regionsInfo1 := &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + } + regionsInfo2 := &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 2, + StartKey: "a", + EndKey: "", + }, + }, + } + regionsInfo := regionsInfo1.Merge(regionsInfo2) + re.Equal(int64(2), regionsInfo.Count) + re.Equal(2, len(regionsInfo.Regions)) + re.Equal(append(regionsInfo1.Regions, regionsInfo2.Regions...), regionsInfo.Regions) +} diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go new file mode 100644 index 00000000000..03d90c6cd32 --- /dev/null +++ b/tests/integrations/client/http_client_test.go @@ -0,0 +1,87 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client_test + +import ( + "context" + "math" + "testing" + + "github.com/stretchr/testify/suite" + pd "github.com/tikv/pd/client/http" + "github.com/tikv/pd/tests" +) + +type httpClientTestSuite struct { + suite.Suite + ctx context.Context + cancelFunc context.CancelFunc + cluster *tests.TestCluster + client pd.Client +} + +func TestHTTPClientTestSuite(t *testing.T) { + suite.Run(t, new(httpClientTestSuite)) +} + +func (suite *httpClientTestSuite) SetupSuite() { + re := suite.Require() + var err error + suite.ctx, suite.cancelFunc = context.WithCancel(context.Background()) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) + re.NoError(err) + err = suite.cluster.RunInitialServers() + re.NoError(err) + leader := suite.cluster.WaitLeader() + re.NotEmpty(leader) + err = suite.cluster.GetLeaderServer().BootstrapCluster() + re.NoError(err) + var ( + testServers = suite.cluster.GetServers() + endpoints = make([]string, 0, len(testServers)) + ) + for _, s := range testServers { + endpoints = append(endpoints, s.GetConfig().AdvertiseClientUrls) + } + suite.client = pd.NewClient(endpoints) +} + +func (suite *httpClientTestSuite) TearDownSuite() { + suite.cancelFunc() + suite.client.Close() + suite.cluster.Destroy() +} + +func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { + re := suite.Require() + // Get the cluster-level min resolved TS. + minResolvedTS, storeMinResolvedTSMap, err := suite.client.GetMinResolvedTSByStoresIDs(suite.ctx, nil) + re.NoError(err) + re.Greater(minResolvedTS, uint64(0)) + re.Empty(storeMinResolvedTSMap) + // Get the store-level min resolved TS. + minResolvedTS, storeMinResolvedTSMap, err = suite.client.GetMinResolvedTSByStoresIDs(suite.ctx, []uint64{1}) + re.NoError(err) + re.Greater(minResolvedTS, uint64(0)) + re.Len(storeMinResolvedTSMap, 1) + re.Equal(minResolvedTS, storeMinResolvedTSMap[1]) + // Get the store-level min resolved TS with an invalid store ID. + minResolvedTS, storeMinResolvedTSMap, err = suite.client.GetMinResolvedTSByStoresIDs(suite.ctx, []uint64{1, 2}) + re.NoError(err) + re.Greater(minResolvedTS, uint64(0)) + re.Len(storeMinResolvedTSMap, 2) + re.Equal(minResolvedTS, storeMinResolvedTSMap[1]) + re.Equal(uint64(math.MaxUint64), storeMinResolvedTSMap[2]) +}