Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix(migrate): support migrate service #375

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cli/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func genDeployPlaybook(curveadm *cli.CurveAdm,
Name: options.poolset,
Type: options.poolsetDiskType,
}
diskType := options.poolsetDiskType

pb := playbook.NewPlaybook(curveadm)
for _, step := range steps {
Expand All @@ -255,8 +254,7 @@ func genDeployPlaybook(curveadm *cli.CurveAdm,
options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs)
} else if step == CREATE_LOGICAL_POOL {
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = diskType
options[comm.KEY_POOLSET] = poolset
options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs)
}

Expand Down
147 changes: 104 additions & 43 deletions cli/command/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,39 @@ import (
"github.com/opencurve/curveadm/internal/configure/topology"
"github.com/opencurve/curveadm/internal/errno"
"github.com/opencurve/curveadm/internal/playbook"
tui "github.com/opencurve/curveadm/internal/tui/common"
"github.com/opencurve/curveadm/internal/task/task/common"
tuicomm "github.com/opencurve/curveadm/internal/tui/common"

cliutil "github.com/opencurve/curveadm/internal/utils"
"github.com/spf13/cobra"
)

var (
MIGRATE_ETCD_STEPS = []int{
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.ADD_ETCD_MEMBER,
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.AMEND_ETCD_CONFIG,
playbook.START_ETCD,
playbook.REMOVE_ETCD_MEMBER,
playbook.AMEND_SERVER_CONFIG, // modify the etcd endpoint in mds.conf
playbook.RESTART_SERVICE, // restart all mds then modify the etcd endpoint
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.UPDATE_TOPOLOGY,
}

// mds
MIGRATE_MDS_STEPS = []int{
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.START_MDS,
playbook.AMEND_SERVER_CONFIG, // modify the mds.listen.addr in metaserver.conf
playbook.RESTART_SERVICE, // restart all metaserver then modify the mds.listen.addr
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.UPDATE_TOPOLOGY,
}

Expand All @@ -65,32 +74,31 @@ var (
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.START_SNAPSHOTCLONE,
playbook.AMEND_SERVER_CONFIG, // modify the mds.listen.addr in metaserver.conf
playbook.RESTART_SERVICE, // restart all metaserver then modify the mds.listen.addr
playbook.UPDATE_TOPOLOGY,
}

// chunkserevr (curvebs)
MIGRATE_CHUNKSERVER_STEPS = []int{
playbook.BACKUP_ETCD_DATA,
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.CREATE_PHYSICAL_POOL, // add machine that migrate to
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.CREATE_PHYSICAL_POOL,
playbook.START_CHUNKSERVER,
playbook.CREATE_LOGICAL_POOL,
playbook.MARK_SERVER_PENGDDING, // start migrate to new server
}

// metaserver (curvefs)
MIGRATE_METASERVER_STEPS = []int{
playbook.BACKUP_ETCD_DATA,
playbook.STOP_SERVICE, // only container
playbook.CLEAN_SERVICE,
playbook.CREATE_LOGICAL_POOL,
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.START_METASERVER,
playbook.CREATE_LOGICAL_POOL,
playbook.STOP_SERVICE, // start migrate to new server
}

MIGRATE_ROLE_STEPS = map[string][]int{
Expand All @@ -100,12 +108,21 @@ var (
topology.ROLE_SNAPSHOTCLONE: MIGRATE_SNAPSHOTCLONE_STEPS,
topology.ROLE_METASERVER: MIGRATE_METASERVER_STEPS,
}

MIGRATE_POST_CLEAN_STEPS = []int{
playbook.STOP_SERVICE, // bs
playbook.CLEAN_SERVICE, // bs, fs
playbook.CREATE_PHYSICAL_POOL, // only for chunkserver, remove server that migrate from
playbook.CREATE_LOGICAL_POOL, // only for metaserver, remove server that migrate from
playbook.UPDATE_TOPOLOGY, // bs, fs
}
)

// migrateOptions holds the command-line options of the `migrate` command.
type migrateOptions struct {
	filename        string // path to the topology file describing the migration — presumably a positional arg; binding not visible here, TODO confirm
	poolset         string // poolset name (--poolset, default "default")
	poolsetDiskType string // disk type of the physical pool (--poolset-disktype, default "ssd")
	clean           bool   // clean the migrated environment for chunkserver or metaserver (--clean)
}

func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command {
Expand All @@ -125,7 +142,7 @@ func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command {
flags := cmd.Flags()
flags.StringVar(&options.poolset, "poolset", "default", "Specify the poolset")
flags.StringVar(&options.poolsetDiskType, "poolset-disktype", "ssd", "Specify the disk type of physical pool")

flags.BoolVar(&options.clean, "clean", false, "Clean migrated environment for chunkserver or metaserver")
return cmd
}

Expand All @@ -149,7 +166,7 @@ func checkMigrateTopology(curveadm *cli.CurveAdm, data string) error {
} else if len(dcs2add) < len(dcs2del) {
return errno.ERR_DELETE_SERVICE_WHILE_MIGRATING_IS_DENIED
}
// len(dcs2add) == len(dcs2del)

if len(dcs2add) == 0 {
return errno.ERR_NO_SERVICES_FOR_MIGRATING
}
Expand Down Expand Up @@ -191,51 +208,87 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
migrates := getMigrates(curveadm, data)
role := migrates[0].From.GetRole()
steps := MIGRATE_ROLE_STEPS[role]
poolset := options.poolset
poolsetDiskType := options.poolsetDiskType
etcdDCs := curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD)

// post clean
if options.clean {
steps = MIGRATE_POST_CLEAN_STEPS
if migrates[0].From.GetKind() == common.KIND_CURVEBS {
steps = append(steps[:3], steps[4:]...)
} else {
steps = append(steps[1:2], steps[3:]...)
}
}

poolset := configure.Poolset{
Name: options.poolset,
Type: options.poolsetDiskType,
}

pb := playbook.NewPlaybook(curveadm)
for _, step := range steps {
// configs
config := dcs2add
switch step {
case playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE:
playbook.CLEAN_SERVICE,
playbook.ADD_ETCD_MEMBER,
playbook.REMOVE_ETCD_MEMBER:
config = dcs2del
case playbook.BACKUP_ETCD_DATA:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD)
case CREATE_PHYSICAL_POOL,
CREATE_LOGICAL_POOL:
// 1. migrate etcd, need to override mds config and restart all mds
// 2. (FS)migrate mds, need to override metaserver config and restart all metaservers
// 3. (BS)migrate mds, need to override chunkserver and snapshot config and restart all chunkservers and snapshotclones
case playbook.AMEND_SERVER_CONFIG,
playbook.RESTART_SERVICE:
if role == topology.ROLE_ETCD {
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)
} else if role == topology.ROLE_MDS && dcs[0].GetKind() == topology.KIND_CURVEFS {
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_METASERVER)
} else if role == topology.ROLE_MDS && dcs[0].GetKind() == topology.KIND_CURVEBS {
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_CHUNKSERVER)
config = append(config, curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_SNAPSHOTCLONE)...)
}
case
playbook.CREATE_PHYSICAL_POOL,
playbook.CREATE_LOGICAL_POOL,
playbook.MARK_SERVER_PENGDDING:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)[:1]
}

// options
options := map[string]interface{}{}
optionsKV := map[string]interface{}{}
switch step {
case playbook.CLEAN_SERVICE:
options[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER}
options[comm.KEY_CLEAN_BY_RECYCLE] = true
optionsKV[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER}
optionsKV[comm.KEY_CLEAN_BY_RECYCLE] = true
optionsKV[comm.KEY_REMOVE_MIGRATED_SERVER] = true
case playbook.CREATE_PHYSICAL_POOL:
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL
options[comm.KEY_MIGRATE_SERVERS] = migrates
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = poolsetDiskType
optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL
optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
optionsKV[comm.KEY_POOLSET] = poolset
case playbook.CREATE_LOGICAL_POOL:
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
options[comm.KEY_MIGRATE_SERVERS] = migrates
options[comm.KEY_NEW_TOPOLOGY_DATA] = data
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = poolsetDiskType
optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data
optionsKV[comm.KEY_IF_UPDATE_TOPOLOG] = false
optionsKV[comm.KEY_POOLSET] = poolset
case playbook.UPDATE_TOPOLOGY:
options[comm.KEY_NEW_TOPOLOGY_DATA] = data
optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data
case playbook.ADD_ETCD_MEMBER,
playbook.AMEND_ETCD_CONFIG,
playbook.AMEND_SERVER_CONFIG:
optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
optionsKV[comm.KEY_CLUSTER_DCS] = etcdDCs
}

pb.AddStep(&playbook.PlaybookStep{
Type: step,
Configs: config,
Options: options,
Type: step,
Configs: config,
Options: optionsKV,
ExecOptions: playbook.ExecOptions{
SilentSubBar: step == playbook.UPDATE_TOPOLOGY,
// SilentSubBar: step == playbook.UPDATE_TOPOLOGY,
},
})
}
Expand All @@ -261,7 +314,10 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
}

// 2) read topology from file
data, err := readTopology(curveadm, options.filename)
data, err := readTopology(curveadm,
options.filename,
options.clean,
)
if err != nil {
return err
}
Expand All @@ -272,13 +328,15 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
return err
}

// 4) display title
displayMigrateTitle(curveadm, data)
if !options.clean {
// 4) display title
displayMigrateTitle(curveadm, data)

// 5) confirm by user
if pass := tui.ConfirmYes(tui.DEFAULT_CONFIRM_PROMPT); !pass {
curveadm.WriteOutln(tui.PromptCancelOpetation("migrate service"))
return errno.ERR_CANCEL_OPERATION
// 5) confirm by user
if pass := tuicomm.ConfirmYes(tuicomm.DEFAULT_CONFIRM_PROMPT); !pass {
curveadm.WriteOutln(tuicomm.PromptCancelOpetation("migrate service"))
return errno.ERR_CANCEL_OPERATION
}
}

// 6) generate migrate playbook
Expand All @@ -294,6 +352,9 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
}

// 9) print success prompt
if options.clean {
return nil
}
curveadm.WriteOutln("")
curveadm.WriteOutln(color.GreenString("Services successfully migrateed ^_^."))
// TODO(P1): warning iff there is changed configs
Expand Down
8 changes: 5 additions & 3 deletions cli/command/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewScaleOutCommand(curveadm *cli.CurveAdm) *cobra.Command {
return cmd
}

func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) {
func readTopology(curveadm *cli.CurveAdm, filename string, clean bool) (string, error) {
if !utils.PathExist(filename) {
return "", errno.ERR_TOPOLOGY_FILE_NOT_FOUND.
F("%s: no such file", utils.AbsPath(filename))
Expand All @@ -156,7 +156,9 @@ func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) {
}

oldData := curveadm.ClusterTopologyData()
curveadm.WriteOut("%s", utils.Diff(oldData, data))
if !clean {
curveadm.WriteOut("%s", utils.Diff(oldData, data))
}
return data, nil
}

Expand Down Expand Up @@ -384,7 +386,7 @@ func runScaleOut(curveadm *cli.CurveAdm, options scaleOutOptions) error {
}

// 2) read topology from file
data, err := readTopology(curveadm, options.filename)
data, err := readTopology(curveadm, options.filename, false)
if err != nil {
return err
}
Expand Down
19 changes: 13 additions & 6 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ const (
// format
KEY_ALL_FORMAT_STATUS = "ALL_FORMAT_STATUS"

// migrate
KEY_MIGRATE_STATUS = "MIGRATE_STATUS"
KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS"
KEY_CLUSTER_DCS = "CLUSTER_DCS"

// check
KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK"
KEY_CHECK_KERNEL_MODULE_NAME = "CHECK_KERNEL_MODULE_NAME"
Expand All @@ -64,6 +69,7 @@ const (
KEY_SCALE_OUT_CLUSTER = "SCALE_OUT_CLUSTER"
KEY_MIGRATE_SERVERS = "MIGRATE_SERVERS"
KEY_NEW_TOPOLOGY_DATA = "NEW_TOPOLOGY_DATA"
KEY_IF_UPDATE_TOPOLOG = "IF_UPDATE_TOPOTLOY"

// status
KEY_ALL_SERVICE_STATUS = "ALL_SERVICE_STATUS"
Expand All @@ -72,12 +78,13 @@ const (
SERVICE_STATUS_UNKNOWN = "Unknown"

// clean
KEY_CLEAN_ITEMS = "CLEAN_ITEMS"
KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE"
CLEAN_ITEM_LOG = "log"
CLEAN_ITEM_DATA = "data"
CLEAN_ITEM_CONTAINER = "container"
CLEANED_CONTAINER_ID = "-"
KEY_CLEAN_ITEMS = "CLEAN_ITEMS"
KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE"
CLEAN_ITEM_LOG = "log"
CLEAN_ITEM_DATA = "data"
CLEAN_ITEM_CONTAINER = "container"
CLEANED_CONTAINER_ID = "-"
KEY_REMOVE_MIGRATED_SERVER = "REMOVE_MIGRATED_SERVER"

// client
KEY_CLIENT_HOST = "CLIENT_HOST"
Expand Down
Loading