Commit 001c2e9

Merge branch 'release-7.1' into cherry-pick-7629-to-release-7.1

ti-chi-bot[bot] authored Feb 21, 2024
2 parents 6639f9d + e00f31e commit 001c2e9
Showing 14 changed files with 543 additions and 204 deletions.
3 changes: 2 additions & 1 deletion cmd/pd-server/main.go

@@ -28,6 +28,7 @@ import (
 	"github.com/tikv/pd/pkg/errs"
 	resource_manager "github.com/tikv/pd/pkg/mcs/resource_manager/server"
 	tso "github.com/tikv/pd/pkg/mcs/tso/server"
+	"github.com/tikv/pd/pkg/memory"
 	"github.com/tikv/pd/pkg/schedule/schedulers"
 	"github.com/tikv/pd/pkg/swaggerserver"
 	"github.com/tikv/pd/pkg/utils/configutil"
@@ -185,7 +186,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
 	}
 	// Flushing any buffered log entries
 	defer log.Sync()
-
+	memory.InitMemoryHook()
 	if len(services) != 0 {
 		versioninfo.Log(server.APIServiceMode)
 	} else {
4 changes: 2 additions & 2 deletions pkg/mcs/resource_manager/server/manager.go

@@ -278,7 +278,7 @@ func (m *Manager) GetResourceGroup(name string) *ResourceGroup {
 	m.RLock()
 	defer m.RUnlock()
 	if group, ok := m.groups[name]; ok {
-		return group.Copy()
+		return group.Clone()
 	}
 	return nil
 }
@@ -298,7 +298,7 @@ func (m *Manager) GetResourceGroupList() []*ResourceGroup {
 	m.RLock()
 	res := make([]*ResourceGroup, 0, len(m.groups))
 	for _, group := range m.groups {
-		res = append(res, group.Copy())
+		res = append(res, group.Clone())
 	}
 	m.RUnlock()
 	sort.Slice(res, func(i, j int) bool {
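
Both getters hand back a clone rather than the shared pointer, so the read lock only needs to cover the map access and callers cannot mutate manager state. A minimal, runnable sketch of the pattern with simplified types (Group and Manager here are illustrative stand-ins, not the PD definitions):

    package main

    import (
    	"fmt"
    	"sync"
    )

    type Group struct{ Name string }

    // Clone returns an independent copy; a value copy is enough for a flat struct.
    func (g *Group) Clone() *Group { c := *g; return &c }

    type Manager struct {
    	sync.RWMutex
    	groups map[string]*Group
    }

    // GetGroup holds the read lock only for the lookup and returns a copy,
    // so the caller can use the result without racing concurrent writers.
    func (m *Manager) GetGroup(name string) *Group {
    	m.RLock()
    	defer m.RUnlock()
    	if g, ok := m.groups[name]; ok {
    		return g.Clone()
    	}
    	return nil
    }

    func main() {
    	m := &Manager{groups: map[string]*Group{"rg1": {Name: "rg1"}}}
    	g := m.GetGroup("rg1")
    	g.Name = "scratch"                  // mutates only the copy
    	fmt.Println(m.GetGroup("rg1").Name) // prints "rg1"
    }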
34 changes: 22 additions & 12 deletions pkg/mcs/resource_manager/server/resource_group.go

@@ -42,6 +42,20 @@ type RequestUnitSettings struct {
 	RU *GroupTokenBucket `json:"r_u,omitempty"`
 }
 
+// Clone returns a deep copy of the RequestUnitSettings.
+func (rus *RequestUnitSettings) Clone() *RequestUnitSettings {
+	if rus == nil {
+		return nil
+	}
+	var ru *GroupTokenBucket
+	if rus.RU != nil {
+		ru = rus.RU.Clone()
+	}
+	return &RequestUnitSettings{
+		RU: ru,
+	}
+}
+
 // NewRequestUnitSettings creates a new RequestUnitSettings with the given token bucket.
 func NewRequestUnitSettings(tokenBucket *rmpb.TokenBucket) *RequestUnitSettings {
 	return &RequestUnitSettings{
@@ -58,21 +72,17 @@ func (rg *ResourceGroup) String() string {
 	return string(res)
 }
 
-// Copy copies the resource group.
-func (rg *ResourceGroup) Copy() *ResourceGroup {
-	// TODO: use a better way to copy
+// Clone copies the resource group.
+func (rg *ResourceGroup) Clone() *ResourceGroup {
 	rg.RLock()
 	defer rg.RUnlock()
-	res, err := json.Marshal(rg)
-	if err != nil {
-		panic(err)
-	}
-	var newRG ResourceGroup
-	err = json.Unmarshal(res, &newRG)
-	if err != nil {
-		panic(err)
+	newRG := &ResourceGroup{
+		Name:       rg.Name,
+		Mode:       rg.Mode,
+		Priority:   rg.Priority,
+		RUSettings: rg.RUSettings.Clone(),
 	}
-	return &newRG
+	return newRG
 }
 
 func (rg *ResourceGroup) getRUToken() float64 {
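
The old Copy round-tripped the group through json.Marshal/json.Unmarshal, which allocates and reparses on every call, silently skips unexported fields, and can only panic on failure; the new Clone builds the copy field by field and delegates nested state to the nested Clone methods. A minimal runnable sketch contrasting the two approaches (Group and Settings are simplified stand-ins, not the PD types):

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    type Settings struct{ FillRate uint64 }

    func (s *Settings) Clone() *Settings {
    	if s == nil {
    		return nil
    	}
    	c := *s
    	return &c
    }

    type Group struct {
    	Name     string
    	Settings *Settings
    }

    // copyViaJSON mirrors the old approach: simple, but it allocates and
    // reparses, loses anything JSON cannot see, and must panic on error.
    func (g *Group) copyViaJSON() *Group {
    	b, err := json.Marshal(g)
    	if err != nil {
    		panic(err)
    	}
    	var out Group
    	if err := json.Unmarshal(b, &out); err != nil {
    		panic(err)
    	}
    	return &out
    }

    // clone mirrors the new approach: explicit, cheap, and it cannot fail.
    func (g *Group) clone() *Group {
    	return &Group{Name: g.Name, Settings: g.Settings.Clone()}
    }

    func main() {
    	g := &Group{Name: "rg1", Settings: &Settings{FillRate: 1000}}
    	fmt.Println(g.copyViaJSON().Settings.FillRate, g.clone().Settings.FillRate)
    }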
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/resource_group_test.go

@@ -29,7 +29,7 @@ func TestPatchResourceGroup(t *testing.T) {
 		re.NoError(err)
 		err = rg.PatchSettings(patch)
 		re.NoError(err)
-		res, err := json.Marshal(rg.Copy())
+		res, err := json.Marshal(rg.Clone())
 		re.NoError(err)
 		re.Equal(ca.expectJSONString, string(res))
 	}
31 changes: 30 additions & 1 deletion pkg/mcs/resource_manager/server/token_buckets_test.go

@@ -48,11 +48,40 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) {
 		},
 	}
 	tb.patch(tbSetting)
-
+	time.Sleep(10 * time.Millisecond)
 	time2 := time.Now()
 	tb.request(time2, 0, 0, clientUniqueID)
 	re.LessOrEqual(math.Abs(100000-tb.Tokens), time2.Sub(time1).Seconds()*float64(tbSetting.Settings.FillRate)+1e7)
 	re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate)
+
+	tbSetting = &rmpb.TokenBucket{
+		Tokens: 0,
+		Settings: &rmpb.TokenLimitSettings{
+			FillRate:   2000,
+			BurstLimit: -1,
+		},
+	}
+	tb = NewGroupTokenBucket(tbSetting)
+	tb.request(time2, 0, 0, clientUniqueID)
+	re.LessOrEqual(math.Abs(tbSetting.Tokens), 1e-7)
+	time3 := time.Now()
+	tb.request(time3, 0, 0, clientUniqueID)
+	re.LessOrEqual(math.Abs(tbSetting.Tokens), 1e-7)
+
+	tbSetting = &rmpb.TokenBucket{
+		Tokens: 200000,
+		Settings: &rmpb.TokenLimitSettings{
+			FillRate:   2000,
+			BurstLimit: -1,
+		},
+	}
+	tb = NewGroupTokenBucket(tbSetting)
+	tb.request(time3, 0, 0, clientUniqueID)
+	re.LessOrEqual(math.Abs(tbSetting.Tokens-200000), 1e-7)
+	time.Sleep(10 * time.Millisecond)
+	time4 := time.Now()
+	tb.request(time4, 0, 0, clientUniqueID)
+	re.LessOrEqual(math.Abs(tbSetting.Tokens-200000), 1e-7)
 }
 
 func TestGroupTokenBucketRequest(t *testing.T) {
48 changes: 35 additions & 13 deletions pkg/mcs/resource_manager/server/token_bukets.go

@@ -49,6 +49,22 @@ type GroupTokenBucket struct {
 	GroupTokenBucketState `json:"state,omitempty"`
 }
 
+// Clone returns the deep copy of GroupTokenBucket
+func (gtb *GroupTokenBucket) Clone() *GroupTokenBucket {
+	if gtb == nil {
+		return nil
+	}
+	var settings *rmpb.TokenLimitSettings
+	if gtb.Settings != nil {
+		settings = proto.Clone(gtb.Settings).(*rmpb.TokenLimitSettings)
+	}
+	stateClone := *gtb.GroupTokenBucketState.Clone()
+	return &GroupTokenBucket{
+		Settings:              settings,
+		GroupTokenBucketState: stateClone,
+	}
+}
+
 func (gtb *GroupTokenBucket) setState(state *GroupTokenBucketState) {
 	gtb.Tokens = state.Tokens
 	gtb.LastUpdate = state.LastUpdate
@@ -85,10 +101,14 @@ type GroupTokenBucketState struct {
 
 // Clone returns the copy of GroupTokenBucketState
 func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState {
-	tokenSlots := make(map[uint64]*TokenSlot)
-	for id, tokens := range gts.tokenSlots {
-		tokenSlots[id] = tokens
+	var tokenSlots map[uint64]*TokenSlot
+	if gts.tokenSlots != nil {
+		tokenSlots = make(map[uint64]*TokenSlot)
+		for id, tokens := range gts.tokenSlots {
+			tokenSlots[id] = tokens
+		}
 	}
+
 	var lastUpdate *time.Time
 	if gts.LastUpdate != nil {
 		newLastUpdate := *gts.LastUpdate
@@ -272,7 +292,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) {
 	if gtb.Settings.FillRate == 0 {
 		gtb.Settings.FillRate = defaultRefillRate
 	}
-	if gtb.Tokens < defaultInitialTokens {
+	if gtb.Tokens < defaultInitialTokens && gtb.Settings.BurstLimit > 0 {
 		gtb.Tokens = defaultInitialTokens
 	}
 	// init slot
@@ -291,21 +311,23 @@ func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clien
 	var elapseTokens float64
 	if !gtb.Initialized {
 		gtb.init(now, clientUniqueID)
-	} else if delta := now.Sub(*gtb.LastUpdate); delta > 0 {
-		elapseTokens = float64(gtb.Settings.GetFillRate())*delta.Seconds() + gtb.lastBurstTokens
-		gtb.lastBurstTokens = 0
-		gtb.Tokens += elapseTokens
-		gtb.LastUpdate = &now
+	} else if burst := float64(burstLimit); burst > 0 {
+		if delta := now.Sub(*gtb.LastUpdate); delta > 0 {
+			elapseTokens = float64(gtb.Settings.GetFillRate())*delta.Seconds() + gtb.lastBurstTokens
+			gtb.lastBurstTokens = 0
+			gtb.Tokens += elapseTokens
+		}
+		if gtb.Tokens > burst {
+			elapseTokens -= gtb.Tokens - burst
+			gtb.Tokens = burst
+		}
 	}
+	gtb.LastUpdate = &now
 	// Reloan when setting changed
 	if gtb.settingChanged && gtb.Tokens <= 0 {
 		elapseTokens = 0
 		gtb.resetLoan()
 	}
-	if burst := float64(burstLimit); burst > 0 && gtb.Tokens > burst {
-		elapseTokens -= gtb.Tokens - burst
-		gtb.Tokens = burst
-	}
 	// Balance each slots.
 	gtb.balanceSlotTokens(clientUniqueID, gtb.Settings, consumptionToken, elapseTokens)
 }
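
The reworked updateTokens only refills and clamps when the burst limit is positive; a non-positive BurstLimit (the tests above use -1) is treated as "unlimited", so the token count is left untouched rather than topped up to defaultInitialTokens or capped. A standalone sketch of just that refill-and-clamp step (illustrative names, not the PD implementation):

    package main

    import (
    	"fmt"
    	"time"
    )

    // refill advances a token count by fillRate*elapsed and clamps it to burst,
    // but only when burst is positive; burst <= 0 means "unlimited", in which
    // case the count is deliberately left as-is.
    func refill(tokens, fillRate float64, elapsed time.Duration, burst float64) float64 {
    	if burst <= 0 {
    		return tokens // unlimited: no accumulation, no clamp
    	}
    	tokens += fillRate * elapsed.Seconds()
    	if tokens > burst {
    		tokens = burst // never hold more than the burst limit
    	}
    	return tokens
    }

    func main() {
    	fmt.Println(refill(0, 2000, time.Second, 100000)) // 2000
    	fmt.Println(refill(0, 2000, time.Second, -1))     // 0: an unlimited bucket stays put
    }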
38 changes: 37 additions & 1 deletion pkg/memory/meminfo.go

@@ -52,9 +52,13 @@ func MemTotalNormal() (uint64, error) {
 	if time.Since(t) < 60*time.Second {
 		return total, nil
 	}
+	return totalMem()
+}
+
+func totalMem() (uint64, error) {
 	v, err := mem.VirtualMemory()
 	if err != nil {
-		return v.Total, err
+		return 0, err
 	}
 	memLimit.set(v.Total, time.Now())
 	return v.Total, nil
@@ -182,6 +186,38 @@ func init() {
 	mustNil(err)
 }
 
+// InitMemoryHook initializes the memory hook.
+// It is to solve the problem that tidb cannot read cgroup in the systemd.
+// so if we are not in the container, we compare the cgroup memory limit and the physical memory,
+// the cgroup memory limit is smaller, we use the cgroup memory hook.
+// ref https://github.com/pingcap/tidb/pull/48096/
+func InitMemoryHook() {
+	if cgroup.InContainer() {
+		log.Info("use cgroup memory hook because pd is in the container")
+		return
+	}
+	cgroupValue, err := cgroup.GetMemoryLimit()
+	if err != nil {
+		return
+	}
+	physicalValue, err := totalMem()
+	if err != nil {
+		return
+	}
+	if physicalValue > cgroupValue && cgroupValue != 0 {
+		MemTotal = MemTotalCGroup
+		MemUsed = MemUsedCGroup
+		sysutil.RegisterGetMemoryCapacity(MemTotalCGroup)
+		log.Info("use cgroup memory hook", zap.Int64("cgroupMemorySize", int64(cgroupValue)), zap.Int64("physicalMemorySize", int64(physicalValue)))
+	} else {
+		log.Info("use physical memory hook", zap.Int64("cgroupMemorySize", int64(cgroupValue)), zap.Int64("physicalMemorySize", int64(physicalValue)))
+	}
+	_, err = MemTotal()
+	mustNil(err)
+	_, err = MemUsed()
+	mustNil(err)
+}
+
 // InstanceMemUsed returns the memory usage of this process
 func InstanceMemUsed() (uint64, error) {
 	used, t := serverMemUsage.get()
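
Outside a container the hook probes both limits and switches to cgroup-based accounting only when a non-zero cgroup limit is actually smaller than physical RAM, which is the systemd-slice case the comment describes. A minimal sketch of the decision, with hypothetical probes standing in for cgroup.GetMemoryLimit and totalMem:

    package main

    import "fmt"

    // Hypothetical probes; the real ones are cgroup.GetMemoryLimit and totalMem.
    func cgroupLimit() (uint64, error)   { return 8 << 30, nil }  // e.g. an 8 GiB MemoryMax slice
    func physicalTotal() (uint64, error) { return 32 << 30, nil } // e.g. 32 GiB of RAM

    func main() {
    	cg, err := cgroupLimit()
    	if err != nil {
    		return // cannot read the cgroup: keep the physical-memory readers
    	}
    	phys, err := physicalTotal()
    	if err != nil {
    		return
    	}
    	if phys > cg && cg != 0 {
    		// The cgroup limit is the one that actually binds.
    		fmt.Println("use cgroup memory hook")
    	} else {
    		fmt.Println("use physical memory hook")
    	}
    }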
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/evict_leader.go

@@ -372,7 +372,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
 		handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
 		return
 	}
-	handler.rd.JSON(w, http.StatusOK, nil)
+	handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.")
 }
 
 func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/grant_leader.go

@@ -274,7 +274,7 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
 		handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
 		return
 	}
-	handler.rd.JSON(w, http.StatusOK, nil)
+	handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.")
 }
 
 func (handler *grantLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
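
With both handlers changed, a successful config update now returns a human-readable confirmation instead of a JSON null body. A hedged sketch of what a client sees; the URL and payload below are illustrative and depend on how the scheduler config routes are mounted (see the /scheduler-config registration in the next file) and on the scheduler's config schema:

    package main

    import (
    	"fmt"
    	"io"
    	"net/http"
    	"strings"
    )

    func main() {
    	// Illustrative endpoint and payload; adjust to the actual PD address,
    	// scheduler name, and config schema in your cluster.
    	url := "http://127.0.0.1:2379/pd/api/v1/scheduler-config/evict-leader-scheduler/config"
    	resp, err := http.Post(url, "application/json", strings.NewReader(`{"store_id": 1}`))
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	body, _ := io.ReadAll(resp.Body)
    	// On success the body is now the JSON string
    	// "The scheduler has been applied to the store." rather than null.
    	fmt.Println(resp.Status, string(body))
    }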
9 changes: 5 additions & 4 deletions server/api/router.go

@@ -85,10 +85,10 @@ func getFunctionName(f interface{}) string {
 // @BasePath /pd/api/v1
 func createRouter(prefix string, svr *server.Server) *mux.Router {
 	serviceMiddle := newServiceMiddlewareBuilder(svr)
-	registerPrefix := func(router *mux.Router, prefixPath string,
+	registerPrefix := func(router *mux.Router, prefixPath, name string,
 		handleFunc func(http.ResponseWriter, *http.Request), opts ...createRouteOption) {
 		routeCreateFunc(router.PathPrefix(prefixPath), serviceMiddle.createHandler(handleFunc),
-			getFunctionName(handleFunc), opts...)
+			name, opts...)
 	}
 	registerFunc := func(router *mux.Router, path string,
 		handleFunc func(http.ResponseWriter, *http.Request), opts ...createRouteOption) {
@@ -148,7 +148,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
 	registerFunc(clusterRouter, "/schedulers/diagnostic/{name}", diagnosticHandler.GetDiagnosticResult, setMethods(http.MethodGet), setAuditBackend(prometheus))
 
 	schedulerConfigHandler := newSchedulerConfigHandler(svr, rd)
-	registerPrefix(apiRouter, "/scheduler-config", schedulerConfigHandler.GetSchedulerConfig, setAuditBackend(prometheus))
+	registerPrefix(apiRouter, "/scheduler-config", "HandleSchedulerConfig", schedulerConfigHandler.HandleSchedulerConfig, setMethods(http.MethodPost, http.MethodDelete, http.MethodPut, http.MethodPatch), setAuditBackend(localLog, prometheus))
+	registerPrefix(apiRouter, "/scheduler-config", "GetSchedulerConfig", schedulerConfigHandler.HandleSchedulerConfig, setMethods(http.MethodGet), setAuditBackend(prometheus))
 
 	clusterHandler := newClusterHandler(svr, rd)
 	registerFunc(apiRouter, "/cluster", clusterHandler.GetCluster, setMethods(http.MethodGet), setAuditBackend(prometheus))
@@ -365,7 +366,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
 	// API to set or unset failpoints
 	failpoint.Inject("enableFailpointAPI", func() {
 		// this function will be named to "func2". It may be used in test
-		registerPrefix(apiRouter, "/fail", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		registerPrefix(apiRouter, "/fail", "FailPoint", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 			// The HTTP handler of failpoint requires the full path to be the failpoint path.
 			r.URL.Path = strings.TrimPrefix(r.URL.Path, prefix+apiPrefix+"/fail")
 			new(failpoint.HttpHandler).ServeHTTP(w, r)
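
registerPrefix now takes an explicit route name, which lets the same /scheduler-config prefix be registered twice: once for GET with Prometheus-only auditing, and once for the mutating methods with local-log auditing as well, each under its own service label. A minimal gorilla/mux sketch of that split (handler and router names are illustrative, and PD's createRouteOption plumbing is omitted):

    package main

    import (
    	"net/http"

    	"github.com/gorilla/mux"
    )

    func main() {
    	h := func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }

    	r := mux.NewRouter()
    	// Read-only traffic gets its own named route...
    	r.PathPrefix("/scheduler-config").
    		Methods(http.MethodGet).
    		Name("GetSchedulerConfig").
    		HandlerFunc(h)
    	// ...while mutating methods are registered separately, so per-route
    	// behavior (such as audit backends) can be keyed off the route name.
    	r.PathPrefix("/scheduler-config").
    		Methods(http.MethodPost, http.MethodDelete, http.MethodPut, http.MethodPatch).
    		Name("HandleSchedulerConfig").
    		HandlerFunc(h)

    	http.ListenAndServe(":8080", r)
    }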
2 changes: 1 addition & 1 deletion server/api/scheduler.go

@@ -372,7 +372,7 @@ func newSchedulerConfigHandler(svr *server.Server, rd *render.Render) *scheduler
 	}
 }
 
-func (h *schedulerConfigHandler) GetSchedulerConfig(w http.ResponseWriter, r *http.Request) {
+func (h *schedulerConfigHandler) HandleSchedulerConfig(w http.ResponseWriter, r *http.Request) {
 	handler := h.svr.GetHandler()
 	sh, err := handler.GetSchedulerConfigHandler()
 	if err == nil && sh != nil {
5 changes: 4 additions & 1 deletion server/api/server_test.go

@@ -173,7 +173,10 @@ func (suite *serviceTestSuite) TestServiceLabels() {
 	accessPaths = suite.svr.GetServiceLabels("GetSchedulerConfig")
 	suite.Len(accessPaths, 1)
 	suite.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path)
-	suite.Equal("", accessPaths[0].Method)
+	suite.Equal("GET", accessPaths[0].Method)
+	accessPaths = suite.svr.GetServiceLabels("HandleSchedulerConfig")
+	suite.Len(accessPaths, 4)
+	suite.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path)
 
 	accessPaths = suite.svr.GetServiceLabels("ResignLeader")
 	suite.Len(accessPaths, 1)
[The remaining 2 of the 14 changed files were not loaded in this view.]