tools/simulator: Make simulator work with large scale cluster #8269

Merged · 14 commits · Jul 26, 2024
8 changes: 5 additions & 3 deletions conf/simconfig.toml
@@ -1,8 +1,10 @@
# PD Simulator Configuration

[tick]
## the tick interval when starting PD inside (default: "100ms")
sim-tick-interval = "100ms"
total-store = 300
total-region = 10000000
case-name = "stable"
Member (reviewer): Do other cases work, e.g. scale in/out?

@HuSharp (Member, Author), Jun 18, 2024: Yes, I have tested with the balance-leader case as well (screenshots attached). The stable case is convenient for adding and removing nodes.

[store]
## the capacity size of a new store in GB (default: 1024)
@@ -11,8 +13,8 @@ store-capacity = 1024
store-available = 1024
## the io rate of a new store in MB/s (default: 40)
store-io-per-second = 40
## the version of a new store (default: "2.1.0")
store-version = "2.1.0"
## the version of a new store (default: "8.1.0")
store-version = "8.1.0"

## the meaning of these configurations below are similar with config.toml
[server]
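For a sense of the scale these new keys describe: with 300 stores, 10,000,000 regions, and 3 replicas per region (the replica count comes from max-replicas, which is not set in this file, so 3 is an assumption), each store ends up hosting roughly 100,000 peers and about 33,000 leaders. A quick back-of-the-envelope check in Go:

package main

import "fmt"

func main() {
	totalStore := 300         // total-store in simconfig.toml
	totalRegion := 10_000_000 // total-region in simconfig.toml
	replicas := 3             // assumed max-replicas default; not set in this file

	fmt.Println("peers per store:", totalRegion*replicas/totalStore) // 100000
	fmt.Println("leaders per store:", totalRegion/totalStore)        // 33333
}
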
2 changes: 2 additions & 0 deletions pkg/mcs/resourcemanager/server/manager.go
@@ -129,7 +129,9 @@ func (m *Manager) Init(ctx context.Context) error {
return err
}
// Load resource group meta info from storage.
m.Lock()
m.groups = make(map[string]*ResourceGroup)
m.Unlock()
handler := func(k, v string) {
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal([]byte(v), group); err != nil {
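The two added lines wrap the reinitialization of m.groups in the manager's mutex, so readers that take the read lock never observe the map while it is being replaced during Init. A minimal sketch of the locking pattern, with simplified types rather than the real Manager:

package main

import "sync"

type resourceGroup struct{ name string }

type manager struct {
	sync.RWMutex
	groups map[string]*resourceGroup
}

// resetGroups replaces the map under the write lock, mirroring the change above.
func (m *manager) resetGroups() {
	m.Lock()
	m.groups = make(map[string]*resourceGroup)
	m.Unlock()
}

// getGroup reads under the read lock; without the write lock in resetGroups,
// the race detector would flag concurrent use of these two methods.
func (m *manager) getGroup(name string) (*resourceGroup, bool) {
	m.RLock()
	defer m.RUnlock()
	g, ok := m.groups[name]
	return g, ok
}

func main() {
	m := &manager{groups: map[string]*resourceGroup{"default": {name: "default"}}}
	go m.resetGroups()
	_, _ = m.getGroup("default")
}
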
6 changes: 6 additions & 0 deletions tools/pd-simulator/main.go
@@ -151,6 +151,7 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon
}
tickInterval := simConfig.SimTickInterval.Duration

ctx, cancel := context.WithCancel(context.Background())
tick := time.NewTicker(tickInterval)
defer tick.Stop()
sc := make(chan os.Signal, 1)
@@ -164,6 +165,10 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon

simResult := "FAIL"

go driver.StoresHeartbeat(ctx)
go driver.RegionsHeartbeat(ctx)
go driver.StepRegions(ctx)

EXIT:
for {
select {
@@ -178,6 +183,7 @@
}
}

cancel()
driver.Stop()
if len(clean) != 0 && clean[0] != nil {
clean[0]()
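The shape of this change is a common Go lifecycle pattern: the main loop keeps ticking, long-lived worker goroutines consume the tick events, and everything is torn down through context cancellation when the simulation exits. A small self-contained sketch of the pattern (not the simulator's actual types):

package main

import (
	"context"
	"fmt"
	"time"
)

type driver struct {
	tickc chan struct{}
}

// stepRegions consumes tick events until the context is canceled.
func (d *driver) stepRegions(ctx context.Context) {
	for {
		select {
		case <-d.tickc:
			fmt.Println("step regions for one tick")
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	d := &driver{tickc: make(chan struct{}, 1)}
	go d.stepRegions(ctx)

	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()
	for i := 0; i < 5; i++ {
		<-tick.C
		d.tickc <- struct{}{} // in the PR, Driver.Tick pushes this from a goroutine
	}
	cancel() // stop the workers, mirroring the cancel() added before driver.Stop()
}
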
1 change: 1 addition & 0 deletions tools/pd-simulator/simulator/cases/cases.go
@@ -100,6 +100,7 @@ var CaseMap = map[string]func(*config.SimConfig) *Case{
"diagnose-label-not-match1": newLabelNotMatch1,
"diagnose-label-isolation1": newLabelIsolation1,
"diagnose-label-isolation2": newLabelIsolation2,
"stable": newStableEnv,
}

// NewCase creates a new case.
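With "stable" registered in CaseMap, selecting the case is a plain map lookup keyed by the configured case name. A hedged sketch of how NewCase presumably resolves it, in the context of cases.go (the real function may handle a missing name differently, e.g. by logging or falling back to a default case):

// Hedged sketch: resolve a case name such as "stable" through CaseMap.
func NewCase(name string, simConfig *config.SimConfig) *Case {
	if constructor, ok := CaseMap[name]; ok {
		return constructor(simConfig)
	}
	return nil
}
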
70 changes: 70 additions & 0 deletions tools/pd-simulator/simulator/cases/stable_env.go
@@ -0,0 +1,70 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cases

import (
"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
sc "github.com/tikv/pd/tools/pd-simulator/simulator/config"
"github.com/tikv/pd/tools/pd-simulator/simulator/info"
"github.com/tikv/pd/tools/pd-simulator/simulator/simutil"
)

// newStableEnv provides a stable environment for test.
func newStableEnv(config *sc.SimConfig) *Case {
Member (reviewer): Is it possible that we just create a cluster with a fixed region and store rather than cases?

@HuSharp (Member, Author): That may not be easy with the current simulator code structure.

var simCase Case

totalStore := config.TotalStore
totalRegion := config.TotalRegion
allStores := make(map[uint64]struct{}, totalStore)
arrStoresID := make([]uint64, 0, totalStore)
replica := int(config.ServerConfig.Replication.MaxReplicas)
for i := 0; i < totalStore; i++ {
id := simutil.IDAllocator.NextID()
simCase.Stores = append(simCase.Stores, &Store{
ID: id,
Status: metapb.StoreState_Up,
})
allStores[id] = struct{}{}
arrStoresID = append(arrStoresID, id)
}

for i := 0; i < totalRegion; i++ {
peers := make([]*metapb.Peer, 0, replica)
for j := 0; j < replica; j++ {
peers = append(peers, &metapb.Peer{
Id: simutil.IDAllocator.NextID(),
StoreId: arrStoresID[(i+j)%totalStore],
})
}
simCase.Regions = append(simCase.Regions, Region{
ID: simutil.IDAllocator.NextID(),
Peers: peers,
Leader: peers[0],
Size: 96 * units.MiB,
Keys: 960000,
})
}

simCase.Checker = func(_ *core.RegionsInfo, _ []info.StoreStats) bool {
if len(allStores) == 0 {
return false
}

return false
}
return &simCase
}
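The placement loop above is a simple round-robin: region i puts its j-th replica on store index (i+j) % totalStore, so peers and first-peer leaders are spread evenly from the start and the checker can treat the cluster as already balanced. A tiny worked example with deliberately small, assumed numbers:

package main

import "fmt"

func main() {
	totalStore, replica := 5, 3 // small assumed numbers; the shipped config uses 300 stores
	for i := 0; i < 4; i++ {
		idx := make([]int, 0, replica)
		for j := 0; j < replica; j++ {
			idx = append(idx, (i+j)%totalStore)
		}
		fmt.Printf("region %d -> store indexes %v\n", i, idx)
	}
	// region 0 -> store indexes [0 1 2]
	// region 1 -> store indexes [1 2 3]
	// region 2 -> store indexes [2 3 4]
	// region 3 -> store indexes [3 4 0]
}
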
12 changes: 6 additions & 6 deletions tools/pd-simulator/simulator/client.go
@@ -37,12 +37,12 @@ import (
// Client is a PD (Placement Driver) client.
// It should not be used after calling Close().
type Client interface {
GetClusterID(ctx context.Context) uint64
AllocID(ctx context.Context) (uint64, error)
Bootstrap(ctx context.Context, store *metapb.Store, region *metapb.Region) error
PutStore(ctx context.Context, store *metapb.Store) error
StoreHeartbeat(ctx context.Context, stats *pdpb.StoreStats) error
RegionHeartbeat(ctx context.Context, region *core.RegionInfo) error
GetClusterID(context.Context) uint64
AllocID(context.Context) (uint64, error)
Bootstrap(context.Context, *metapb.Store, *metapb.Region) error
PutStore(context.Context, *metapb.Store) error
StoreHeartbeat(context.Context, *pdpb.StoreStats) error
RegionHeartbeat(context.Context, *core.RegionInfo) error
PutPDConfig(*sc.PDConfig) error

Close()
100 changes: 84 additions & 16 deletions tools/pd-simulator/simulator/drive.go
@@ -49,6 +49,10 @@ type Driver struct {
conn *Connection
simConfig *config.SimConfig
pdConfig *config.PDConfig

regionTickc chan int64
storeTickc chan int64
tickc chan struct{}
}

// NewDriver returns a driver.
@@ -66,6 +70,9 @@ func NewDriver(pdAddr, statusAddress, caseName string, simConfig *config.SimConf
simCase: simCase,
simConfig: simConfig,
pdConfig: pdConfig,
tickc: make(chan struct{}, 1),
regionTickc: make(chan int64, 1),
storeTickc: make(chan int64, 1),
}, nil
}

@@ -147,28 +154,89 @@ func (d *Driver) Prepare() error {
// Tick invokes nodes' Tick.
func (d *Driver) Tick() {
d.tickCount++
d.raftEngine.stepRegions()
d.eventRunner.Tick(d.tickCount)
for _, n := range d.conn.Nodes {
n.reportRegionChange()
d.wg.Add(1)
go n.Tick(&d.wg)
go func() {
d.tickc <- struct{}{}
}()
go func() {
d.regionTickc <- d.tickCount
}()
go func() {
d.storeTickc <- d.tickCount
}()
}

func (d *Driver) StepRegions(ctx context.Context) {
for {
select {
case <-d.tickc:
d.raftEngine.stepRegions()
d.eventRunner.Tick(d.tickCount)
for _, n := range d.conn.Nodes {
n.reportRegionChange()
d.wg.Add(1)
go n.Tick(&d.wg)
}
d.wg.Wait()
case <-ctx.Done():
return
}
}
d.wg.Wait()
}

// Check checks if the simulation is completed.
func (d *Driver) Check() bool {
length := uint64(len(d.conn.Nodes) + 1)
for index := range d.conn.Nodes {
if index >= length {
length = index + 1
func (d *Driver) StoresHeartbeat(ctx context.Context) {
config := d.raftEngine.storeConfig
storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration)
var wg sync.WaitGroup
for {
select {
case tick := <-d.storeTickc:
if uint64(tick)%storeInterval == 0 {
for _, n := range d.conn.Nodes {
wg.Add(1)
go n.storeHeartBeat(&wg)
}
wg.Wait()
}
case <-ctx.Done():
return
}
}
stats := make([]info.StoreStats, length)
for index, node := range d.conn.Nodes {
stats[index] = *node.stats
}

var schedule sync.Once

func (d *Driver) RegionsHeartbeat(ctx context.Context) {
config := d.raftEngine.storeConfig
regionInterval := uint64(config.RaftStore.RegionHeartBeatInterval.Duration / config.SimTickInterval.Duration)
var wg sync.WaitGroup
// The simulator doesn't need any schedulers until all regions have sent their heartbeats.
ChooseToHaltPDSchedule(true)
for {
select {
case tick := <-d.regionTickc:
if uint64(tick)%regionInterval == 0 {
for _, n := range d.conn.Nodes {
wg.Add(1)
go n.regionHeartBeat(&wg)
}
wg.Wait()
schedule.Do(func() {
ChooseToHaltPDSchedule(false)
})
}
case <-ctx.Done():
return
}
}
}

// Check checks if the simulation is completed.
func (d *Driver) Check() bool {
var stats []info.StoreStats
for _, n := range d.conn.Nodes {
stats = append(stats, *n.stats)
}

return d.simCase.Checker(d.raftEngine.regionsInfo, stats)
}

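In the new heartbeat loops the cadence is expressed in ticks: the heartbeat interval is divided by the simulator tick interval, and a heartbeat round fires whenever the tick count is a multiple of that ratio. With the 100ms sim tick from simconfig.toml and TiKV-style defaults of 10s store heartbeats and 60s region heartbeats (the heartbeat defaults are assumptions, not taken from this diff), the arithmetic works out as follows:

package main

import (
	"fmt"
	"time"
)

func main() {
	simTick := 100 * time.Millisecond   // sim-tick-interval from simconfig.toml
	storeHeartbeat := 10 * time.Second  // assumed StoreHeartBeatInterval default
	regionHeartbeat := 60 * time.Second // assumed RegionHeartBeatInterval default

	storeInterval := uint64(storeHeartbeat / simTick)   // 100 -> store heartbeats every 100 ticks
	regionInterval := uint64(regionHeartbeat / simTick) // 600 -> region heartbeats every 600 ticks
	fmt.Println(storeInterval, regionInterval)
}
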
23 changes: 4 additions & 19 deletions tools/pd-simulator/simulator/node.go
@@ -147,7 +147,6 @@ func (n *Node) Tick(wg *sync.WaitGroup) {
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
return
}
n.stepHeartBeat()
n.stepCompaction()
n.stepTask()
n.tick++
@@ -172,29 +171,14 @@ func (n *Node) stepTask() {
}
}

var schedulerCheck sync.Once

func (n *Node) stepHeartBeat() {
config := n.raftEngine.storeConfig

period := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration)
if n.tick%period == 0 {
n.storeHeartBeat()
}
period = uint64(config.RaftStore.RegionHeartBeatInterval.Duration / config.SimTickInterval.Duration)
if n.tick%period == 0 {
n.regionHeartBeat()
schedulerCheck.Do(func() { ChooseToHaltPDSchedule(false) })
}
}

func (n *Node) stepCompaction() {
if n.tick%compactionDelayPeriod == 0 {
n.compaction()
}
}

func (n *Node) storeHeartBeat() {
func (n *Node) storeHeartBeat(wg *sync.WaitGroup) {
defer wg.Done()
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
return
}
@@ -220,7 +204,8 @@ func (n *Node) compaction() {
n.stats.ToCompactionSize = 0
}

func (n *Node) regionHeartBeat() {
func (n *Node) regionHeartBeat(wg *sync.WaitGroup) {
defer wg.Done()
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
return
}