Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mocktikv: impl mock pd service discovery #1137

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/client-go/v2
go 1.21

require (
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/gogo/protobuf v1.3.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *testCommitterSuite) TearDownSuite() {
}

func (s *testCommitterSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil, nil)
s.Require().Nil(err)
testutils.BootstrapWithMultiRegions(cluster, []byte("a"), []byte("b"), []byte("c"))
s.cluster = cluster
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/delete_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type testDeleteRangeSuite struct {
}

func (s *testDeleteRangeSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil, nil)
s.Require().Nil(err)
testutils.BootstrapWithMultiRegions(cluster, []byte("b"), []byte("c"), []byte("d"))
s.cluster = cluster
Expand Down
1 change: 1 addition & 0 deletions integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudfoundry/gosigar v1.3.6 // indirect
Expand Down
2 changes: 2 additions & 0 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.1581/go.mod h1:RcDobYh8k5VP6TNybz9m
github.com/apache/thrift v0.13.1-0.20201008052519-daf620915714 h1:Jz3KVLYY5+JO7rDiX0sAuRGtuv2vG01r17Y9nLMWNUw=
github.com/apache/thrift v0.13.1-0.20201008052519-daf620915714/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/aws/aws-sdk-go v1.45.25 h1:c4fLlh5sLdK2DCRTY1z0hyuJZU4ygxX8m1FswL6/nF4=
github.com/aws/aws-sdk-go v1.45.25/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *apiTestSuite) SetupTest() {
rpcClient := tikv.NewRPCClient()
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
// Set PD HTTP client.
s.store, err = tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient("pd-api-test", addrs))
s.store, err = tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient("pd-api-test"))
require.NoError(err)
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/prewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestSetMinCommitTSInAsyncCommit(t *testing.T) {
func TestIsRetryRequestFlagWithRegionError(t *testing.T) {
require := require.New(t)

client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil, nil)
require.Nil(err)
_, peerID, regionID := testutils.BootstrapWithSingleStore(cluster)
store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/range_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (s *testRangeTaskSuite) SetupTest() {
}
allRegionRanges = append(allRegionRanges, makeRange("z", ""))

client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil, nil)
s.Require().Nil(err)
testutils.BootstrapWithMultiRegions(cluster, splitKeys...)
s.cluster = cluster
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/raw/api_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type testRawKVSuite struct {
}

func (s *testRawKVSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil, nil)
s.Require().NoError(err)
s.Require().Nil(err)
testutils.BootstrapWithSingleStore(cluster)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type testSplitSuite struct {
}

func (s *testSplitSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil, nil)
s.Require().Nil(err)
testutils.BootstrapWithSingleStore(cluster)
s.cluster = cluster
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewTestStore(t *testing.T) *tikv.KVStore {
if *withTiKV {
return newTiKVStore(t)
}
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil, nil)
require.NoError(t, err)
testutils.BootstrapWithSingleStore(cluster)
store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
Expand Down
2 changes: 1 addition & 1 deletion internal/mockstore/mocktikv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
)

// NewTiKVAndPDClient creates a TiKV client and PD client from options.
func NewTiKVAndPDClient(path string, coprHandler CoprRPCHandler) (*RPCClient, *Cluster, pd.Client, error) {
func NewTiKVAndPDClient(path string, pdAddrs []string, coprHandler CoprRPCHandler) (*RPCClient, *Cluster, pd.Client, error) {
mvccStore, err := NewMVCCLevelDB(path)
if err != nil {
return nil, nil, nil, err
Expand Down
116 changes: 115 additions & 1 deletion internal/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ import (
"context"
"fmt"
"math"
"strings"
"sync"
"time"

"github.com/asaskevich/govalidator"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/meta_storagepb"
"github.com/pingcap/kvproto/pkg/metapb"
Expand All @@ -50,6 +52,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"google.golang.org/grpc"
)

// Use global variables to prevent pdClients from creating duplicate timestamps.
Expand Down Expand Up @@ -85,6 +88,8 @@ type pdClient struct {
groups map[string]*rmpb.ResourceGroup

delay *atomic.Bool

pdAddrs []string
}

// NewPDClient creates a mock pd.Client that uses local timestamp and meta data
Expand Down Expand Up @@ -425,4 +430,113 @@ func (m *pdClient) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGrou
return nil, 0, nil
}

func (m *pdClient) GetServiceDiscovery() pd.ServiceDiscovery { return nil }
func (m *pdClient) GetServiceDiscovery() pd.ServiceDiscovery {
return newMockPDServiceDiscovery(m.pdAddrs)
}

var _ pd.ServiceDiscovery = (*mockPDServiceDiscovery)(nil)
var _ pd.ServiceClient = (*mockPDServiceClient)(nil)

type mockPDServiceClient struct {
addr string
}

func newMockPDServiceClient(addr string) pd.ServiceClient {
if !strings.HasPrefix(addr, "http") {
addr = fmt.Sprintf("%s://%s", "http", addr)
}
return &mockPDServiceClient{addr: addr}
}

func (c *mockPDServiceClient) GetAddress() string {
return c.addr
}

func (c *mockPDServiceClient) GetHTTPAddress() string {
return c.addr
}

func (c *mockPDServiceClient) GetClientConn() *grpc.ClientConn {
return nil
}

func (c *mockPDServiceClient) BuildGRPCTargetContext(ctx context.Context, _ bool) context.Context {
return ctx
}

func (c *mockPDServiceClient) Available() bool {
return true
}

func (c *mockPDServiceClient) NeedRetry(*pdpb.Error, error) bool {
return false
}

func (c *mockPDServiceClient) IsConnectedToLeader() bool {
return true
}

type mockPDServiceDiscovery struct {
addrs []string
clis []pd.ServiceClient
}

func newMockPDServiceDiscovery(addrs []string) pd.ServiceDiscovery {
addresses := make([]string, 0)
clis := make([]pd.ServiceClient, 0)
for _, addr := range addrs {
if check := govalidator.IsURL(addr); !check {
continue
}
addresses = append(addresses, addr)
clis = append(clis, newMockPDServiceClient(addr))
}
return &mockPDServiceDiscovery{addrs: addresses, clis: clis}
}

func (c *mockPDServiceDiscovery) Init() error {
return nil
}

func (c *mockPDServiceDiscovery) Close() {}

func (c *mockPDServiceDiscovery) GetClusterID() uint64 { return 0 }

func (c *mockPDServiceDiscovery) GetKeyspaceID() uint32 { return 0 }

func (c *mockPDServiceDiscovery) GetKeyspaceGroupID() uint32 { return 0 }

func (c *mockPDServiceDiscovery) GetServiceURLs() []string {
return c.addrs
}

func (c *mockPDServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { return nil }

func (c *mockPDServiceDiscovery) GetClientConns() *sync.Map { return nil }

func (c *mockPDServiceDiscovery) GetServingAddr() string { return "" }

func (c *mockPDServiceDiscovery) GetBackupAddrs() []string { return nil }

func (c *mockPDServiceDiscovery) GetServiceClient() pd.ServiceClient {
if len(c.clis) > 0 {
return c.clis[0]
}
return nil
}

func (c *mockPDServiceDiscovery) GetAllServiceClients() []pd.ServiceClient {
return c.clis
}

func (c *mockPDServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
return nil, nil
}

func (c *mockPDServiceDiscovery) ScheduleCheckMemberChanged() {}

func (c *mockPDServiceDiscovery) CheckMemberChanged() error { return nil }

func (c *mockPDServiceDiscovery) AddServingAddrSwitchedCallback(callbacks ...func()) {}

func (c *mockPDServiceDiscovery) AddServiceAddrsSwitchedCallback(callbacks ...func()) {}
62 changes: 62 additions & 0 deletions internal/mockstore/mocktikv/pd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/mockstore/mocktikv/pd.go
//

// Copyright 2016 PingCAP, Inc.
//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant and stale Copyright

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mocktikv

import (
"fmt"
"testing"

"github.com/asaskevich/govalidator"
"github.com/stretchr/testify/require"
)

func TestMockPDServiceDiscovery(t *testing.T) {
re := require.New(t)
pdAddrs := []string{"invalid_pd_address", "127.0.0.1:2379", "http://172.32.21.32:2379"}
for i, addr := range pdAddrs {
check := govalidator.IsURL(addr)
fmt.Println(i)
if i > 0 {
re.True(check)
} else {
re.False(check)
}
}
sd := newMockPDServiceDiscovery(pdAddrs)
clis := sd.GetAllServiceClients()
re.Len(clis, 2)
re.Equal(clis[0].GetHTTPAddress(), "http://127.0.0.1:2379")
re.Equal(clis[1].GetHTTPAddress(), "http://172.32.21.32:2379")
}
4 changes: 2 additions & 2 deletions testutils/mockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type MockClient = mocktikv.RPCClient
type RPCSession = mocktikv.Session

// NewMockTiKV creates a TiKV client and PD client from options.
func NewMockTiKV(path string, coprHandler CoprRPCHandler) (*MockClient, *MockCluster, pd.Client, error) {
return mocktikv.NewTiKVAndPDClient(path, coprHandler)
func NewMockTiKV(path string, pdAddrs []string, coprHandler CoprRPCHandler) (*MockClient, *MockCluster, pd.Client, error) {
return mocktikv.NewTiKVAndPDClient(path, pdAddrs, coprHandler)
}

// BootstrapWithSingleStore initializes a Cluster with 1 Region and 1 Store.
Expand Down
19 changes: 5 additions & 14 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,23 +194,14 @@ func WithPool(gp Pool) Option {
// Source is to mark where the HTTP client is created, which is used for metrics and logs.
func WithPDHTTPClient(
source string,
pdAddrs []string,
opts ...pdhttp.ClientOption,
) Option {
return func(o *KVStore) {
if cli := o.GetPDClient(); cli != nil {
o.pdHttpClient = pdhttp.NewClientWithServiceDiscovery(
source,
o.pdClient.GetServiceDiscovery(),
opts...,
)
} else {
o.pdHttpClient = pdhttp.NewClient(
source,
pdAddrs,
opts...,
)
}
o.pdHttpClient = pdhttp.NewClientWithServiceDiscovery(
source,
o.pdClient.GetServiceDiscovery(),
opts...,
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion tikv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type testKVSuite struct {
}

func (s *testKVSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil, nil)
s.Require().Nil(err)
testutils.BootstrapWithSingleStore(cluster)
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
Expand Down
Loading