Skip to content

Commit

Permalink
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 committed Sep 24, 2024
2 parents bcd5018 + f3e9d9a commit acb4244
Show file tree
Hide file tree
Showing 259 changed files with 4,733 additions and 2,498 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ coverage
go.work*
embedded_assets_handler.go
*.log
*.bin
14 changes: 12 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ linters:
- gofmt
- revive
- errcheck
- exportloopref
- goimports
- depguard
linters-settings:
gocritic:
# Which checks should be disabled; can't be combined with 'enabled-checks'; default is empty
Expand Down Expand Up @@ -205,6 +208,14 @@ linters-settings:
- (net/http.ResponseWriter).Write
- github.com/pingcap/log.Sync
- (github.com/tikv/pd/pkg/ratelimit.Runner).RunTask
depguard:
rules:
denied-deps:
deny:
- pkg: go.uber.org/atomic
desc: "Use 'sync/atomic' instead of 'go.uber.org/atomic'"
- pkg: github.com/pkg/errors
desc: "Use 'github.com/pingcap/errors' instead of 'github.com/pkg/errors'"
issues:
exclude-rules:
- path: (_test\.go|pkg/mock/.*\.go|tests/.*\.go)
Expand All @@ -215,5 +226,4 @@ issues:
linters:
- errcheck
include:
# remove the comment after the path is ready
# - EXC0012
- EXC0012
4 changes: 1 addition & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,7 @@ CLEAN_UT_BINARY := find . -name '*.test.bin'| xargs rm -f

clean-test:
# Cleaning test tmp...
rm -rf /tmp/test_pd*
rm -rf /tmp/pd-tests*
rm -rf /tmp/test_etcd*
rm -rf /tmp/pd_tests*
rm -f $(REAL_CLUSTER_TEST_PATH)/playground.log
go clean -testcache
@$(CLEAN_UT_BINARY)
Expand Down
4 changes: 2 additions & 2 deletions OWNERS_ALIASES
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Sort the member alphabetically.
aliases:
sig-critical-approvers-config:
- easonn7
- kevin-xianliu
- BenMeadowcroft
- niubell
- yudongusa
4 changes: 2 additions & 2 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7
github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
Expand All @@ -34,6 +33,7 @@ require (
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31 h1:6BY+3T6Hqpw9UZ/D7Om/xB+Xik3NkkYxBV6qCzUdUvU=
github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
9 changes: 8 additions & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
// Scheduler
Schedulers = "/pd/api/v1/schedulers"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-scheduler-"
// Admin
ResetTS = "/pd/api/v1/admin/reset-ts"
BaseAllocID = "/pd/api/v1/admin/base-alloc-id"
Expand All @@ -79,6 +79,7 @@ const (
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
operators = "/pd/api/v1/operators"
safepoint = "/pd/api/v1/gc/safepoint"
// Micro Service
microServicePrefix = "/pd/api/v2/ms"
// Keyspace
Expand Down Expand Up @@ -182,6 +183,7 @@ func SchedulerByName(name string) string {
}

// ScatterRangeSchedulerWithName returns the scatter range scheduler API with name parameter.
// It is used in https://github.com/pingcap/tidb/blob/2a3352c45dd0f8dd5102adb92879bbfa964e7f5f/pkg/server/handler/tikvhandler/tikv_handler.go#L1252.
func ScatterRangeSchedulerWithName(name string) string {
return fmt.Sprintf("%s%s", scatterRangeScheduler, name)
}
Expand Down Expand Up @@ -215,3 +217,8 @@ func GetUpdateKeyspaceConfigURL(keyspaceName string) string {
func GetKeyspaceMetaByNameURL(keyspaceName string) string {
return fmt.Sprintf(GetKeyspaceMetaByName, keyspaceName)
}

// GetDeleteSafePointURI returns the URI for delete safepoint service
func GetDeleteSafePointURI(serviceID string) string {
return fmt.Sprintf("%s/%s", safepoint, serviceID)
}
7 changes: 4 additions & 3 deletions client/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"context"
"net/http"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"go.uber.org/atomic"
)

func TestPDAllowFollowerHandleHeader(t *testing.T) {
Expand Down Expand Up @@ -53,7 +53,8 @@ func TestPDAllowFollowerHandleHeader(t *testing.T) {
func TestWithCallerID(t *testing.T) {
re := require.New(t)
checked := 0
expectedVal := atomic.NewString(defaultCallerID)
var expectedVal atomic.Value
expectedVal.Store(defaultCallerID)
httpClient := NewHTTPClientWithRequestChecker(func(req *http.Request) error {
val := req.Header.Get(xCallerIDKey)
// Exclude the request sent by the inner client.
Expand All @@ -68,7 +69,7 @@ func TestWithCallerID(t *testing.T) {
defer c.Close()
c.GetRegions(context.Background())
expectedVal.Store("test")
c.WithCallerID(expectedVal.Load()).GetRegions(context.Background())
c.WithCallerID(expectedVal.Load().(string)).GetRegions(context.Background())
re.Equal(2, checked)
}

Expand Down
30 changes: 30 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type Client interface {
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetPDVersion(context.Context) (string, error)
GetGCSafePoint(context.Context) (ListServiceGCSafepoint, error)
DeleteGCSafePoint(context.Context, string) (string, error)
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error)
GetMicroServicePrimary(context.Context, string) (string, error)
Expand Down Expand Up @@ -1024,3 +1026,31 @@ func (c *client) GetKeyspaceMetaByName(ctx context.Context, keyspaceName string)
}
return &keyspaceMetaPB, nil
}

// GetGCSafePoint gets the GC safe point list.
func (c *client) GetGCSafePoint(ctx context.Context) (ListServiceGCSafepoint, error) {
var gcSafePoint ListServiceGCSafepoint
err := c.request(ctx, newRequestInfo().
WithName(GetGCSafePointName).
WithURI(safepoint).
WithMethod(http.MethodGet).
WithResp(&gcSafePoint))
if err != nil {
return gcSafePoint, err
}
return gcSafePoint, nil
}

// DeleteGCSafePoint deletes a GC safe point with the given service ID.
func (c *client) DeleteGCSafePoint(ctx context.Context, serviceID string) (string, error) {
var msg string
err := c.request(ctx, newRequestInfo().
WithName(DeleteGCSafePointName).
WithURI(GetDeleteSafePointURI(serviceID)).
WithMethod(http.MethodDelete).
WithResp(&msg))
if err != nil {
return msg, err
}
return msg, nil
}
2 changes: 2 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const (
deleteOperators = "DeleteOperators"
UpdateKeyspaceGCManagementTypeName = "UpdateKeyspaceGCManagementType"
GetKeyspaceMetaByNameName = "GetKeyspaceMetaByName"
GetGCSafePointName = "GetGCSafePoint"
DeleteGCSafePointName = "DeleteGCSafePoint"
)

type requestInfo struct {
Expand Down
16 changes: 16 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ import (
pd "github.com/tikv/pd/client"
)

// ServiceSafePoint is the safepoint for a specific service
// NOTE: This type is in sync with pd/pkg/storage/endpoint/gc_safe_point.go
type ServiceSafePoint struct {
ServiceID string `json:"service_id"`
ExpiredAt int64 `json:"expired_at"`
SafePoint uint64 `json:"safe_point"`
}

// ListServiceGCSafepoint is the response for list service GC safepoint.
// NOTE: This type is in sync with pd/server/api/service_gc_safepoint.go
type ListServiceGCSafepoint struct {
ServiceGCSafepoints []*ServiceSafePoint `json:"service_gc_safe_points"`
MinServiceGcSafepoint uint64 `json:"min_service_gc_safe_point,omitempty"`
GCSafePoint uint64 `json:"gc_safe_point"`
}

// ClusterState saves some cluster state information.
// NOTE: This type sync with https://github.com/tikv/pd/blob/5eae459c01a797cbd0c416054c6f0cad16b8740a/server/cluster/cluster.go#L173
type ClusterState struct {
Expand Down
24 changes: 17 additions & 7 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ func initAndRegisterMetrics(constLabels prometheus.Labels) {
}

var (
cmdDuration *prometheus.HistogramVec
cmdFailedDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
tsoBestBatchSize prometheus.Histogram
tsoBatchSize prometheus.Histogram
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
cmdDuration *prometheus.HistogramVec
cmdFailedDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
tsoBestBatchSize prometheus.Histogram
tsoBatchSize prometheus.Histogram
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
ongoingRequestCountGauge *prometheus.GaugeVec
)

func initMetrics(constLabels prometheus.Labels) {
Expand Down Expand Up @@ -117,6 +118,15 @@ func initMetrics(constLabels prometheus.Labels) {
Help: "The status to indicate if the request is forwarded",
ConstLabels: constLabels,
}, []string{"host", "delegate"})

ongoingRequestCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "ongoing_requests_count",
Help: "Current count of ongoing batch tso requests",
ConstLabels: constLabels,
}, []string{"stream"})
}

var (
Expand Down
4 changes: 2 additions & 2 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type pdServiceClient struct {
}

// NOTE: In the current implementation, the URL passed in is bound to have a scheme,
// because it is processed in `newPDServiceDiscovery`, and the url returned by etcd member owns the sheme.
// because it is processed in `newPDServiceDiscovery`, and the url returned by etcd member owns the scheme.
// When testing, the URL is also bound to have a scheme.
func newPDServiceClient(url, leaderURL string, conn *grpc.ClientConn, isLeader bool) ServiceClient {
cli := &pdServiceClient{
Expand Down Expand Up @@ -1074,7 +1074,7 @@ func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader
leaderURL := pickMatchedURL(leader.GetClientUrls(), c.tlsCfg)
leaderChanged, err := c.switchLeader(leaderURL)
followerChanged := c.updateFollowers(members, leader.GetMemberId(), leaderURL)
// don't need to recreate balancer if no changess.
// don't need to recreate balancer if no changes.
if !followerChanged && !leaderChanged {
return err
}
Expand Down
12 changes: 4 additions & 8 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
Expand All @@ -57,7 +56,7 @@ const (
lowToken selectType = 1
)

var enableControllerTraceLog = atomicutil.NewBool(false)
var enableControllerTraceLog atomic.Bool

func logControllerTrace(msg string, fields ...zap.Field) {
if enableControllerTraceLog.Load() {
Expand Down Expand Up @@ -250,16 +249,13 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
defer emergencyTokenAcquisitionTicker.Stop()

failpoint.Inject("fastCleanup", func() {
cleanupTicker.Stop()
cleanupTicker = time.NewTicker(100 * time.Millisecond)
cleanupTicker.Reset(100 * time.Millisecond)
// because of checking `gc.run.consumption` in cleanupTicker,
// so should also change the stateUpdateTicker.
stateUpdateTicker.Stop()
stateUpdateTicker = time.NewTicker(200 * time.Millisecond)
stateUpdateTicker.Reset(200 * time.Millisecond)
})
failpoint.Inject("acceleratedReportingPeriod", func() {
stateUpdateTicker.Stop()
stateUpdateTicker = time.NewTicker(time.Millisecond * 100)
stateUpdateTicker.Reset(time.Millisecond * 100)
})

_, metaRevision, err := c.provider.LoadResourceGroups(ctx)
Expand Down
Loading

0 comments on commit acb4244

Please sign in to comment.