Skip to content

Commit

Permalink
Fix(migrate): support migrate other service
Browse files Browse the repository at this point in the history
  • Loading branch information
caoxianfei1 committed Jan 2, 2024
1 parent b1a3a4e commit aace2bb
Show file tree
Hide file tree
Showing 14 changed files with 650 additions and 11 deletions.
42 changes: 36 additions & 6 deletions cli/command/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,30 @@ import (

var (
MIGRATE_ETCD_STEPS = []int{
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.ADD_ETCD_MEMBER,
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.AMEND_ETCD_CONFIG,
playbook.START_ETCD,
playbook.REMOVE_ETCD_MEMBER,
playbook.AMEND_SERVER_CONFIG, // modify the etcd endpoint in mds.conf
playbook.RESTART_SERVICE, // restart all mds then modify the etcd endpoint
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.UPDATE_TOPOLOGY,
}

// mds
MIGRATE_MDS_STEPS = []int{
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.START_MDS,
playbook.AMEND_SERVER_CONFIG, // modify the mds.listen.addr in metaserver.conf
playbook.RESTART_SERVICE, // restart all metaserver then modify the mds.listen.addr
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.UPDATE_TOPOLOGY,
}

Expand All @@ -67,6 +74,8 @@ var (
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.START_SNAPSHOTCLONE,
playbook.AMEND_SERVER_CONFIG, // modify the mds.listen.addr in metaserver.conf
playbook.RESTART_SERVICE, // restart all metaserver then modify the mds.listen.addr
playbook.UPDATE_TOPOLOGY,
}

Expand Down Expand Up @@ -157,7 +166,7 @@ func checkMigrateTopology(curveadm *cli.CurveAdm, data string) error {
} else if len(dcs2add) < len(dcs2del) {
return errno.ERR_DELETE_SERVICE_WHILE_MIGRATING_IS_DENIED
}
// len(dcs2add) == len(dcs2del)

if len(dcs2add) == 0 {
return errno.ERR_NO_SERVICES_FOR_MIGRATING
}
Expand Down Expand Up @@ -199,6 +208,7 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
migrates := getMigrates(curveadm, data)
role := migrates[0].From.GetRole()
steps := MIGRATE_ROLE_STEPS[role]
etcdDCs := curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD)

// post clean
if options.clean {
Expand All @@ -221,10 +231,25 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
config := dcs2add
switch step {
case playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE:
playbook.CLEAN_SERVICE,
playbook.ADD_ETCD_MEMBER,
playbook.REMOVE_ETCD_MEMBER:
config = dcs2del
case playbook.BACKUP_ETCD_DATA:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD)
// 1. migrate etcd, need to override mds config and restart all mds
// 2. (FS)migrate mds, need to override metaserver config and restart all metaservers
// 3. (BS)migrate mds, need to override chunkserver and snapshot config and restart all chunkservers and snapshotclones
case playbook.AMEND_SERVER_CONFIG,
playbook.RESTART_SERVICE:
if role == topology.ROLE_ETCD {
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)
} else if role == topology.ROLE_MDS && dcs[0].GetKind() == topology.KIND_CURVEFS {
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_METASERVER)
} else if role == topology.ROLE_MDS && dcs[0].GetKind() == topology.KIND_CURVEBS {
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_CHUNKSERVER)
config = append(config, curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_SNAPSHOTCLONE)...)
}
case
playbook.CREATE_PHYSICAL_POOL,
playbook.CREATE_LOGICAL_POOL,
Expand All @@ -251,6 +276,11 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
optionsKV[comm.KEY_POOLSET] = poolset
case playbook.UPDATE_TOPOLOGY:
optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data
case playbook.ADD_ETCD_MEMBER,
playbook.AMEND_ETCD_CONFIG,
playbook.AMEND_SERVER_CONFIG:
optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
optionsKV[comm.KEY_CLUSTER_DCS] = etcdDCs
}

pb.AddStep(&playbook.PlaybookStep{
Expand Down
1 change: 1 addition & 0 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
// migrate
KEY_MIGRATE_STATUS = "MIGRATE_STATUS"
KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS"
KEY_CLUSTER_DCS = "CLUSTER_DCS"

// check
KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK"
Expand Down
6 changes: 5 additions & 1 deletion internal/configure/topology/dc_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ func (dc *DeployConfig) GetInstances() int { return dc.instanc
func (dc *DeployConfig) GetHostSequence() int { return dc.hostSequence }
func (dc *DeployConfig) GetInstancesSequence() int { return dc.instancesSequence }
func (dc *DeployConfig) GetServiceConfig() map[string]string { return dc.serviceConfig }
func (dc *DeployConfig) GetVariables() *variable.Variables { return dc.variables }
func (dc *DeployConfig) SetServiceConfig(key, value string) {
dc.serviceConfig[key] = value
}

func (dc *DeployConfig) GetVariables() *variable.Variables { return dc.variables }

// (2): config item
func (dc *DeployConfig) GetPrefix() string { return dc.getString(CONFIG_PREFIX) }
Expand Down
4 changes: 2 additions & 2 deletions internal/configure/topology/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ var (
{name: "cluster_mds_dummy_addr"},
{name: "cluster_mds_dummy_port"},
{name: "cluster_chunkserver_addr", kind: []string{KIND_CURVEBS}},
{name: "cluster_snapshotclone_addr", kind: []string{KIND_CURVEBS}},
{name: "cluster_snapshotclone_addr"},
{name: "cluster_snapshotclone_proxy_addr", kind: []string{KIND_CURVEBS}},
{name: "cluster_snapshotclone_dummy_port", kind: []string{KIND_CURVEBS}},
{name: "cluster_snapshotclone_dummy_port"},
{name: "cluster_snapshotclone_nginx_upstream", kind: []string{KIND_CURVEBS}},
{name: "cluster_snapshot_addr"}, // tools-v2: compatible with some old version image
{name: "cluster_snapshot_dummy_addr"}, // tools-v2
Expand Down
3 changes: 3 additions & 0 deletions internal/errno/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,9 @@ var (
ERR_GET_CHUNKSERVER_COPYSET = EC(410026, "failed to get chunkserver copyset")
ERR_GET_MIGRATE_COPYSET = EC(410027, "migrate chunkserver copyset info must be 2")
ERR_CONTAINER_NOT_REMOVED = EC(410027, "container not removed")
ERR_GET_CLUSTER_ETCD_ADDR = EC(410028, "failed to get cluster_etcd_addr variable")
ERR_ADD_ETCD_MEMEBER = EC(410029, "failed to add etcd member to existing etcd cluster")
ERR_REMOVE_ETCD_MEMBER = EC(410030, "failed to remove etcd member from existing etcd cluster")
// 420: common (curvebs client)
ERR_VOLUME_ALREADY_MAPPED = EC(420000, "volume already mapped")
ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed")
Expand Down
12 changes: 12 additions & 0 deletions internal/playbook/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ const (
INSTALL_CLIENT
UNINSTALL_CLIENT
ATTACH_LEADER_OR_RANDOM_CONTAINER
ADD_ETCD_MEMBER
AMEND_ETCD_CONFIG
AMEND_SERVER_CONFIG
REMOVE_ETCD_MEMBER

// bs
FORMAT_CHUNKFILE_POOL
Expand Down Expand Up @@ -251,6 +255,14 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) {
t, err = comm.NewInstallClientTask(curveadm, config.GetCC(i))
case UNINSTALL_CLIENT:
t, err = comm.NewUninstallClientTask(curveadm, nil)
case ADD_ETCD_MEMBER:
t, err = comm.NewAddEtcdMemberTask(curveadm, config.GetDC(i))
case AMEND_ETCD_CONFIG:
t, err = comm.NewAmendEtcdConfigTask(curveadm, config.GetDC(i))
case AMEND_SERVER_CONFIG:
t, err = comm.NewAmendServerConfigTask(curveadm, config.GetDC(i))
case REMOVE_ETCD_MEMBER:
t, err = comm.NewRemoveEtcdMemberTask(curveadm, config.GetDC(i))
// bs
case FORMAT_CHUNKFILE_POOL:
t, err = bs.NewFormatChunkfilePoolTask(curveadm, config.GetFC(i))
Expand Down
4 changes: 2 additions & 2 deletions internal/task/scripts/enable_etcd_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
*/

/*
* Project: Curveadm
* Created Date: 2023-08-02
* Author: wanghai (SeanHai)
*/
*/

package scripts

Expand Down
4 changes: 4 additions & 0 deletions internal/task/scripts/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ var (
WAIT string
//go:embed shell/report.sh
REPORT string
//go:embed shell/add_etcd.sh
ADD_ETCD string
//go:embed shell/remove_etcd.sh
REMOVE_ETCD string

// CurveBS

Expand Down
47 changes: 47 additions & 0 deletions internal/task/scripts/shell/add_etcd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env bash

# Usage:
# Example:
# Created Date: 2023-12-15
# Author: Caoxianfei

etcdctl=$1
endpoints=$2
old_name=$3
new_name=$4
new_peer_url=$5

tmplog=/tmp/_curveadm_add_etcd_

output=$(${etcdctl} --endpoints=${endpoints} member list)
if [ $? -ne 0 ]; then
echo "failed to list all etcd members"
exit 1
fi

# if member has added, then skip
id=$(echo "$output" | awk -v name="$new_name" -F ', ' '$3 == name {print $1}')
if [ -z "${id}" ]; then
echo "EXIST"
exit 0
fi

${etcdctl} --endpoints=${endpoints} member add ${new_name} --peer-urls ${new_peer_url} > ${tmplog} 2>&1
if [ $? -ne 0 ]; then
if cat ${tmplog} | grep -q "Peer URLs already exists"; then
exit 0
else
exit 1
fi
fi


# ${etcdctl} --endpoints=${endpoints} member remove ${id}
# if [ $? -ne 0 ]; then
# echo "failed to remove member ${old_name}"
# exit 1
# fi




33 changes: 33 additions & 0 deletions internal/task/scripts/shell/remove_etcd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env bash

# Usage:
# Example:
# Created Date: 2023-12-15
# Author: Caoxianfei

etcdctl=$1
endpoints=$2
old_name=$3

output=$(${etcdctl} --endpoints=${endpoints} member list)
if [ $? -ne 0 ]; then
echo "failed to list all etcd members"
exit 1
fi

id=$(echo "$output" | awk -v name="$old_name" -F ', ' '$3 == name {print $1}')
# if not found the name then exit 0
if [ -z "${id}" ]; then
echo "NOTEXIST"
exit 0
fi

${etcdctl} --endpoints=${endpoints} member remove ${id}
if [ $? -ne 0 ]; then
echo "failed to remove member ${old_name}"
exit 1
fi




119 changes: 119 additions & 0 deletions internal/task/task/common/add_etcd_mem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2023 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Project: CurveAdm
* Created Date: 2023-12-20
* Author: Caoxianfei
*/

package common

import (
"fmt"
"strconv"

"github.com/opencurve/curveadm/cli/cli"
comm "github.com/opencurve/curveadm/internal/common"
"github.com/opencurve/curveadm/internal/configure"
"github.com/opencurve/curveadm/internal/configure/topology"
"github.com/opencurve/curveadm/internal/errno"
"github.com/opencurve/curveadm/internal/task/context"
"github.com/opencurve/curveadm/internal/task/scripts"
"github.com/opencurve/curveadm/internal/task/step"
"github.com/opencurve/curveadm/internal/task/task"
tui "github.com/opencurve/curveadm/internal/tui/common"
)

func checkAddEtcdMemberStatus(success *bool, out *string) step.LambdaType {
return func(ctx *context.Context) error {
if !*success {
return errno.ERR_ADD_ETCD_MEMEBER.S(*out)
}
if *out == "EXIST" {
return task.ERR_SKIP_TASK
}
return nil
}
}

func NewAddEtcdMemberTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) {
serviceId := curveadm.GetServiceId(dc.GetId())
containerId, err := curveadm.GetContainerId(serviceId)
if curveadm.IsSkip(dc) {
return nil, nil
} else if err != nil {
return nil, err
}
hc, err := curveadm.GetHost(dc.GetHost())
if err != nil {
return nil, err
}

subname := fmt.Sprintf("host=%s role=%s containerId=%s",
dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId))
t := task.NewTask("Add Etcd Member", subname, hc.GetSSHConfig())

host, role := dc.GetHost(), dc.GetRole()
script := scripts.ADD_ETCD
layout := dc.GetProjectLayout()
scriptPath := fmt.Sprintf("%s/add_etcd.sh", layout.ServiceBinDir)
etcdctlPath := layout.ServiceBinDir + "/etcdctl"
endpoints, err := dc.GetVariables().Get("cluster_etcd_addr")
if err != nil {
return nil, errno.ERR_GET_CLUSTER_ETCD_ADDR
}
oldName := fmt.Sprint("etcd", strconv.Itoa(dc.GetHostSequence()), strconv.Itoa(dc.GetInstancesSequence()))
newName := fmt.Sprint("etcd", strconv.Itoa(dc.GetHostSequence()+3), strconv.Itoa(dc.GetInstancesSequence()))
migrates := []*configure.MigrateServer{}
if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil {
migrates = curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer)
}
toService := migrates[0].To
peerUrl := fmt.Sprint("http://", toService.GetListenIp(), ":", strconv.Itoa(toService.GetListenPort()))
addEtcdCmd := fmt.Sprintf("/bin/bash %s %s %s %s %s %s", scriptPath, etcdctlPath, endpoints, oldName, newName, peerUrl)

var success bool
var out string
t.AddStep(&step.ListContainers{
ShowAll: true,
Format: `"{{.ID}}"`,
Filter: fmt.Sprintf("id=%s", containerId),
Out: &out,
ExecOptions: curveadm.ExecOptions(),
})
t.AddStep(&step.Lambda{
Lambda: CheckContainerExist(host, role, containerId, &out),
})
t.AddStep(&step.InstallFile{
ContainerId: &containerId,
ContainerDestPath: scriptPath,
Content: &script,
ExecOptions: curveadm.ExecOptions(),
})
t.AddStep(&step.ContainerExec{
ContainerId: &containerId,
Success: &success,
Out: &out,
Command: addEtcdCmd,
ExecOptions: curveadm.ExecOptions(),
})
t.AddStep(&step.Lambda{
Lambda: checkAddEtcdMemberStatus(&success, &out),
})

return t, nil
}
Loading

0 comments on commit aace2bb

Please sign in to comment.