Skip to content

Commit

Permalink
Fix(migrate): support migrate chunkserver and metaserveruccessed
Browse files Browse the repository at this point in the history
Signed-off-by: caoxianfei1 <[email protected]>
  • Loading branch information
caoxianfei1 committed Dec 20, 2023
1 parent 8d9c430 commit ce2bd4a
Show file tree
Hide file tree
Showing 11 changed files with 293 additions and 64 deletions.
4 changes: 1 addition & 3 deletions cli/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func genDeployPlaybook(curveadm *cli.CurveAdm,
Name: options.poolset,
Type: options.poolsetDiskType,
}
diskType := options.poolsetDiskType

pb := playbook.NewPlaybook(curveadm)
for _, step := range steps {
Expand All @@ -255,8 +254,7 @@ func genDeployPlaybook(curveadm *cli.CurveAdm,
options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs)
} else if step == CREATE_LOGICAL_POOL {
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = diskType
options[comm.KEY_POOLSET] = poolset
options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs)
}

Expand Down
105 changes: 68 additions & 37 deletions cli/command/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"github.com/opencurve/curveadm/internal/configure/topology"
"github.com/opencurve/curveadm/internal/errno"
"github.com/opencurve/curveadm/internal/playbook"
tui "github.com/opencurve/curveadm/internal/tui/common"
"github.com/opencurve/curveadm/internal/task/task/common"
tuicomm "github.com/opencurve/curveadm/internal/tui/common"

cliutil "github.com/opencurve/curveadm/internal/utils"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -71,26 +73,23 @@ var (
// chunkserevr (curvebs)
MIGRATE_CHUNKSERVER_STEPS = []int{
playbook.BACKUP_ETCD_DATA,
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.CREATE_PHYSICAL_POOL, // add machine that migrate to
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.CREATE_PHYSICAL_POOL,
playbook.START_CHUNKSERVER,
playbook.CREATE_LOGICAL_POOL,
playbook.MARK_SERVER_PENGDDING, // start migrate to new server
}

// metaserver (curvefs)
MIGRATE_METASERVER_STEPS = []int{
playbook.BACKUP_ETCD_DATA,
playbook.STOP_SERVICE, // only container
playbook.CLEAN_SERVICE,
playbook.CREATE_LOGICAL_POOL,
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.START_METASERVER,
playbook.CREATE_LOGICAL_POOL,
playbook.STOP_SERVICE, // start migrate to new server
}

MIGRATE_ROLE_STEPS = map[string][]int{
Expand All @@ -100,12 +99,21 @@ var (
topology.ROLE_SNAPSHOTCLONE: MIGRATE_SNAPSHOTCLONE_STEPS,
topology.ROLE_METASERVER: MIGRATE_METASERVER_STEPS,
}

MIGRATE_POST_CLEAN_STEPS = []int{
playbook.STOP_SERVICE, // bs
playbook.CLEAN_SERVICE, // bs, fs
playbook.CREATE_PHYSICAL_POOL, // only for chunkserver, remove server that migrate from
playbook.CREATE_LOGICAL_POOL, // only for metaserver, remove server that migrate from
playbook.UPDATE_TOPOLOGY, // bs, fs
}
)

type migrateOptions struct {
filename string
poolset string
poolsetDiskType string
clean bool
}

func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command {
Expand All @@ -125,7 +133,7 @@ func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command {
flags := cmd.Flags()
flags.StringVar(&options.poolset, "poolset", "default", "Specify the poolset")
flags.StringVar(&options.poolsetDiskType, "poolset-disktype", "ssd", "Specify the disk type of physical pool")

flags.BoolVar(&options.clean, "clean", false, "Clean migrated environment for chunkserver or metaserver")
return cmd
}

Expand Down Expand Up @@ -191,8 +199,21 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
migrates := getMigrates(curveadm, data)
role := migrates[0].From.GetRole()
steps := MIGRATE_ROLE_STEPS[role]
poolset := options.poolset
poolsetDiskType := options.poolsetDiskType

// post clean
if options.clean {
steps = MIGRATE_POST_CLEAN_STEPS
if migrates[0].From.GetKind() == common.KIND_CURVEBS {
steps = append(steps[:3], steps[4:]...)
} else {
steps = append(steps[1:2], steps[3:]...)
}
}

poolset := configure.Poolset{
Name: options.poolset,
Type: options.poolsetDiskType,
}

pb := playbook.NewPlaybook(curveadm)
for _, step := range steps {
Expand All @@ -204,38 +225,40 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
config = dcs2del
case playbook.BACKUP_ETCD_DATA:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD)
case CREATE_PHYSICAL_POOL,
CREATE_LOGICAL_POOL:
case
playbook.CREATE_PHYSICAL_POOL,
playbook.CREATE_LOGICAL_POOL,
playbook.MARK_SERVER_PENGDDING:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)[:1]
}

// options
options := map[string]interface{}{}
optionsKV := map[string]interface{}{}
switch step {
case playbook.CLEAN_SERVICE:
options[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER}
options[comm.KEY_CLEAN_BY_RECYCLE] = true
optionsKV[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER}
optionsKV[comm.KEY_CLEAN_BY_RECYCLE] = true
optionsKV[comm.KEY_REMOVE_MIGRATED_SERVER] = true
case playbook.CREATE_PHYSICAL_POOL:
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL
options[comm.KEY_MIGRATE_SERVERS] = migrates
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = poolsetDiskType
optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL
optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
optionsKV[comm.KEY_POOLSET] = poolset
case playbook.CREATE_LOGICAL_POOL:
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
options[comm.KEY_MIGRATE_SERVERS] = migrates
options[comm.KEY_NEW_TOPOLOGY_DATA] = data
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = poolsetDiskType
optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data
optionsKV[comm.KEY_IF_UPDATE_TOPOLOG] = false
optionsKV[comm.KEY_POOLSET] = poolset
case playbook.UPDATE_TOPOLOGY:
options[comm.KEY_NEW_TOPOLOGY_DATA] = data
optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data
}

pb.AddStep(&playbook.PlaybookStep{
Type: step,
Configs: config,
Options: options,
Type: step,
Configs: config,
Options: optionsKV,
ExecOptions: playbook.ExecOptions{
SilentSubBar: step == playbook.UPDATE_TOPOLOGY,
// SilentSubBar: step == playbook.UPDATE_TOPOLOGY,
},
})
}
Expand All @@ -261,7 +284,10 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
}

// 2) read topology from file
data, err := readTopology(curveadm, options.filename)
data, err := readTopology(curveadm,
options.filename,
options.clean,
)
if err != nil {
return err
}
Expand All @@ -272,13 +298,15 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
return err
}

// 4) display title
displayMigrateTitle(curveadm, data)
if !options.clean {
// 4) display title
displayMigrateTitle(curveadm, data)

// 5) confirm by user
if pass := tui.ConfirmYes(tui.DEFAULT_CONFIRM_PROMPT); !pass {
curveadm.WriteOutln(tui.PromptCancelOpetation("migrate service"))
return errno.ERR_CANCEL_OPERATION
// 5) confirm by user
if pass := tuicomm.ConfirmYes(tuicomm.DEFAULT_CONFIRM_PROMPT); !pass {
curveadm.WriteOutln(tuicomm.PromptCancelOpetation("migrate service"))
return errno.ERR_CANCEL_OPERATION
}
}

// 6) generate migrate playbook
Expand All @@ -294,6 +322,9 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
}

// 9) print success prompt
if options.clean {
return nil
}
curveadm.WriteOutln("")
curveadm.WriteOutln(color.GreenString("Services successfully migrateed ^_^."))
// TODO(P1): warning iff there is changed configs
Expand Down
8 changes: 5 additions & 3 deletions cli/command/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewScaleOutCommand(curveadm *cli.CurveAdm) *cobra.Command {
return cmd
}

func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) {
func readTopology(curveadm *cli.CurveAdm, filename string, clean bool) (string, error) {
if !utils.PathExist(filename) {
return "", errno.ERR_TOPOLOGY_FILE_NOT_FOUND.
F("%s: no such file", utils.AbsPath(filename))
Expand All @@ -156,7 +156,9 @@ func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) {
}

oldData := curveadm.ClusterTopologyData()
curveadm.WriteOut("%s", utils.Diff(oldData, data))
if !clean {
curveadm.WriteOut("%s", utils.Diff(oldData, data))
}
return data, nil
}

Expand Down Expand Up @@ -384,7 +386,7 @@ func runScaleOut(curveadm *cli.CurveAdm, options scaleOutOptions) error {
}

// 2) read topology from file
data, err := readTopology(curveadm, options.filename)
data, err := readTopology(curveadm, options.filename, false)
if err != nil {
return err
}
Expand Down
18 changes: 12 additions & 6 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ const (
// format
KEY_ALL_FORMAT_STATUS = "ALL_FORMAT_STATUS"

// migrate
KEY_MIGRATE_STATUS = "MIGRATE_STATUS"
KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS"

// check
KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK"
KEY_CHECK_KERNEL_MODULE_NAME = "CHECK_KERNEL_MODULE_NAME"
Expand All @@ -64,6 +68,7 @@ const (
KEY_SCALE_OUT_CLUSTER = "SCALE_OUT_CLUSTER"
KEY_MIGRATE_SERVERS = "MIGRATE_SERVERS"
KEY_NEW_TOPOLOGY_DATA = "NEW_TOPOLOGY_DATA"
KEY_IF_UPDATE_TOPOLOG = "IF_UPDATE_TOPOTLOY"

// status
KEY_ALL_SERVICE_STATUS = "ALL_SERVICE_STATUS"
Expand All @@ -72,12 +77,13 @@ const (
SERVICE_STATUS_UNKNOWN = "Unknown"

// clean
KEY_CLEAN_ITEMS = "CLEAN_ITEMS"
KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE"
CLEAN_ITEM_LOG = "log"
CLEAN_ITEM_DATA = "data"
CLEAN_ITEM_CONTAINER = "container"
CLEANED_CONTAINER_ID = "-"
KEY_CLEAN_ITEMS = "CLEAN_ITEMS"
KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE"
CLEAN_ITEM_LOG = "log"
CLEAN_ITEM_DATA = "data"
CLEAN_ITEM_CONTAINER = "container"
CLEANED_CONTAINER_ID = "-"
KEY_REMOVE_MIGRATED_SERVER = "REMOVE_MIGRATED_SERVER"

// client
KEY_CLIENT_HOST = "CLIENT_HOST"
Expand Down
38 changes: 26 additions & 12 deletions internal/configure/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,26 +263,40 @@ func ScaleOutClusterPool(old *CurveClusterTopo, dcs []*topology.DeployConfig, po
old.NPools = old.NPools + 1
}

func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer) {
func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer, removeMigratedServer bool) {
m := map[string]*topology.DeployConfig{} // key: from.Name, value: to.DeployConfig
for _, migrate := range migrates {
m[formatName(migrate.From)] = migrate.To
}

for i, server := range old.Servers {
dc, ok := m[server.Name]
if !ok {
continue
// add server that will migrate to
for fromName, toDc := range m {
server := Server{}
server.InternalIp = toDc.GetListenIp()
server.ExternalIp = toDc.GetListenExternalIp()
server.InternalPort = toDc.GetListenPort()
server.ExternalPort = toDc.GetListenExternalPort()
server.Name = formatName(toDc)

for _, oldServer := range old.Servers {
if oldServer.Name == fromName {
server.PhysicalPool = oldServer.PhysicalPool
server.Poolset = oldServer.Poolset
server.Pool = oldServer.Pool
server.Zone = oldServer.Zone
}
}
old.Servers = append(old.Servers, server)
}

server.InternalIp = dc.GetListenIp()
server.ExternalIp = dc.GetListenExternalIp()
server.Name = formatName(dc)
if server.InternalPort != 0 && server.ExternalPort != 0 {
server.InternalPort = dc.GetListenPort()
server.ExternalPort = dc.GetListenExternalPort()
// remove server that has migrated
if removeMigratedServer {
for i := 0; i < len(old.Servers); i++ {
_, ok := m[old.Servers[i].Name]
if ok {
old.Servers = append(old.Servers[:i], old.Servers[i+1:]...)
}
}
old.Servers[i] = server
}
}

Expand Down
6 changes: 5 additions & 1 deletion internal/errno/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,11 @@ var (
ERR_ENCRYPT_FILE_FAILED = EC(410021, "encrypt file failed")
ERR_CLIENT_ID_NOT_FOUND = EC(410022, "client id not found")
ERR_ENABLE_ETCD_AUTH_FAILED = EC(410023, "enable etcd auth failed")

ERR_MARK_CHUNKSERVER_PENDDING = EC(410024, "mark chunkserver pendding status failed when migrate")
RRR_GET_CLUSTER_MDSADDR = EC(410025, "failed to get cluster mds addr")
ERR_GET_CHUNKSERVER_COPYSET = EC(410026, "failed to get chunkserver copyset")
ERR_GET_MIGRATE_COPYSET = EC(410027, "migrate chunkserver copyset info must be 2")
ERR_CONTAINER_NOT_REMOVED = EC(410027, "container not removed")
// 420: common (curvebs client)
ERR_VOLUME_ALREADY_MAPPED = EC(420000, "volume already mapped")
ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed")
Expand Down
3 changes: 3 additions & 0 deletions internal/playbook/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const (
CREATE_VOLUME
MAP_IMAGE
UNMAP_IMAGE
MARK_SERVER_PENGDDING

// monitor
PULL_MONITOR_IMAGE
Expand Down Expand Up @@ -278,6 +279,8 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) {
t, err = bs.NewDeleteTargetTask(curveadm, nil)
case LIST_TARGETS:
t, err = bs.NewListTargetsTask(curveadm, nil)
case MARK_SERVER_PENGDDING:
t, err = bs.NewMarkServerPendding(curveadm, config.GetDC(i))
// fs
case CHECK_CLIENT_S3:
t, err = checker.NewClientS3ConfigureTask(curveadm, config.GetCC(i))
Expand Down
3 changes: 3 additions & 0 deletions internal/task/scripts/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,7 @@ var (

//go:embed shell/create_fs.sh
CREATE_FS string

//go:embed shell/mark_server_pendding.sh
MARK_SERVER_PENDDING string
)
Loading

0 comments on commit ce2bd4a

Please sign in to comment.