Skip to content

Commit

Permalink
feat: move migration logics to tool
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy00000000000000 committed Jan 30, 2024
1 parent 759fa78 commit a5caba1
Show file tree
Hide file tree
Showing 6 changed files with 482 additions and 279 deletions.
275 changes: 0 additions & 275 deletions src/scene_server/admin_server/service/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,275 +17,16 @@ import (
"fmt"
"net/http"
"path/filepath"
"time"

"configcenter/src/common"
"configcenter/src/common/blog"
"configcenter/src/common/mapstr"
"configcenter/src/common/metadata"
"configcenter/src/common/types"
"configcenter/src/common/util"
"configcenter/src/common/version"
"configcenter/src/common/watch"
"configcenter/src/scene_server/admin_server/upgrader"
"configcenter/src/source_controller/cacheservice/event"
daltypes "configcenter/src/storage/dal/types"
streamtypes "configcenter/src/storage/stream/types"

"github.com/emicklei/go-restful/v3"
"go.mongodb.org/mongo-driver/bson"
)

func (s *Service) migrate(req *restful.Request, resp *restful.Response) {
rHeader := req.Request.Header
rid := util.GetHTTPCCRequestID(rHeader)
defErr := s.CCErr.CreateDefaultCCErrorIf(util.GetLanguage(rHeader))
ownerID := common.BKDefaultOwnerID
updateCfg := &upgrader.Config{
OwnerID: ownerID,
User: common.CCSystemOperatorUserName,
}

if err := s.createWatchDBChainCollections(rid); err != nil {
blog.Errorf("create watch db chain collections failed, err: %v, rid: %s", err, rid)
result := &metadata.RespError{
Msg: defErr.Errorf(common.CCErrCommMigrateFailed, err.Error()),
}
resp.WriteError(http.StatusInternalServerError, result)
return
}

preVersion, finishedVersions, err := upgrader.Upgrade(s.ctx, s.db, s.cache, s.iam, updateCfg)
if err != nil {
blog.Errorf("db upgrade failed, err: %+v, rid: %s", err, rid)
result := &metadata.RespError{
Msg: defErr.Errorf(common.CCErrCommMigrateFailed, err.Error()),
}
resp.WriteError(http.StatusInternalServerError, result)
return
}

currentVersion := preVersion
if len(finishedVersions) > 0 {
currentVersion = finishedVersions[len(finishedVersions)-1]
}

result := MigrationResponse{
BaseResp: metadata.BaseResp{
Result: true,
Code: 0,
ErrMsg: "",
Permissions: nil,
},
Data: "migrate success",
PreVersion: preVersion,
CurrentVersion: currentVersion,
FinishedVersions: finishedVersions,
}
resp.WriteEntity(result)
}

// dbChainTTLTime the ttl time seconds of the db event chain, used to set the ttl index of mongodb
const dbChainTTLTime = 5 * 24 * 60 * 60

func (s *Service) createWatchDBChainCollections(rid string) error {
// create watch token table to store the last watch token info for every collections
exists, err := s.watchDB.HasTable(s.ctx, common.BKTableNameWatchToken)
if err != nil {
blog.Errorf("check if table %s exists failed, err: %v, rid: %s", common.BKTableNameWatchToken, err, rid)
return err
}

if !exists {
err = s.watchDB.CreateTable(s.ctx, common.BKTableNameWatchToken)
if err != nil && !s.watchDB.IsDuplicatedError(err) {
blog.Errorf("create table %s failed, err: %v, rid: %s", common.BKTableNameWatchToken, err, rid)
return err
}
}

// create watch chain node table and init the last token info as empty for all collections
cursorTypes := watch.ListCursorTypes()
for _, cursorType := range cursorTypes {
key, err := event.GetResourceKeyWithCursorType(cursorType)
if err != nil {
blog.Errorf("get resource key with cursor type %s failed, err: %v, rid: %s", cursorType, err, rid)
return err
}

exists, err := s.watchDB.HasTable(s.ctx, key.ChainCollection())
if err != nil {
blog.Errorf("check if table %s exists failed, err: %v, rid: %s", key.ChainCollection(), err, rid)
return err
}

if !exists {
err = s.watchDB.CreateTable(s.ctx, key.ChainCollection())
if err != nil && !s.watchDB.IsDuplicatedError(err) {
blog.Errorf("create table %s failed, err: %v, rid: %s", key.ChainCollection(), err, rid)
return err
}
}

if err = s.createWatchIndexes(cursorType, key, rid); err != nil {
return err
}

if err = s.createWatchToken(key); err != nil {
return err
}
}
return nil
}

func (s *Service) createWatchIndexes(cursorType watch.CursorType, key event.Key, rid string) error {
indexes := []daltypes.Index{
{Name: "index_id", Keys: bson.D{{common.BKFieldID, -1}}, Background: true, Unique: true},
{Name: "index_cursor", Keys: bson.D{{common.BKCursorField, -1}}, Background: true, Unique: true},
{Name: "index_cluster_time", Keys: bson.D{{common.BKClusterTimeField, -1}}, Background: true,
ExpireAfterSeconds: dbChainTTLTime},
}

if cursorType == watch.ObjectBase || cursorType == watch.MainlineInstance || cursorType == watch.InstAsst {
subResourceIndex := daltypes.Index{
Name: "index_sub_resource", Keys: bson.D{{common.BKSubResourceField, 1}}, Background: true,
}
indexes = append(indexes, subResourceIndex)
}

existIndexArr, err := s.watchDB.Table(key.ChainCollection()).Indexes(s.ctx)
if err != nil {
blog.Errorf("get exist indexes for table %s failed, err: %v, rid: %s", key.ChainCollection(), err, rid)
return err
}

existIdxMap := make(map[string]bool)
for _, index := range existIndexArr {
existIdxMap[index.Name] = true
}

for _, index := range indexes {
if _, exist := existIdxMap[index.Name]; exist {
continue
}

err = s.watchDB.Table(key.ChainCollection()).CreateIndex(s.ctx, index)
if err != nil && !s.watchDB.IsDuplicatedError(err) {
blog.Errorf("create indexes for table %s failed, err: %v, rid: %s", key.ChainCollection(), err, rid)
return err
}
}
return nil
}

func (s *Service) createWatchToken(key event.Key) error {
filter := map[string]interface{}{
"_id": key.Collection(),
}

count, err := s.watchDB.Table(common.BKTableNameWatchToken).Find(filter).Count(s.ctx)
if err != nil {
blog.Errorf("check if last watch token exists failed, err: %v, filter: %+v", err, filter)
return err
}

if count > 0 {
return nil
}

if key.Collection() == event.HostIdentityKey.Collection() {
// host identity's watch token is different with other identity.
// only set coll is ok, the other fields is useless
data := mapstr.MapStr{
"_id": key.Collection(),
common.BKTableNameBaseHost: watch.LastChainNodeData{Coll: common.BKTableNameBaseHost},
common.BKTableNameModuleHostConfig: watch.LastChainNodeData{Coll: common.BKTableNameModuleHostConfig},
common.BKTableNameBaseProcess: watch.LastChainNodeData{Coll: common.BKTableNameBaseProcess},
}
if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(s.ctx, data); err != nil {
blog.Errorf("init last watch token failed, err: %v, data: %+v", err, data)
return err
}
return nil
}

if key.Collection() == event.BizSetRelationKey.Collection() {
// biz set relation's watch token is generated in the same way with the host identity's watch token
data := mapstr.MapStr{
"_id": key.Collection(),
common.BKTableNameBaseApp: watch.LastChainNodeData{Coll: common.BKTableNameBaseApp},
common.BKTableNameBaseBizSet: watch.LastChainNodeData{Coll: common.BKTableNameBaseBizSet},
common.BKFieldID: 0,
common.BKTokenField: "",
}
if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(s.ctx, data); err != nil {
blog.Errorf("init last biz set relation watch token failed, err: %v, data: %+v", err, data)
return err
}
return nil
}

data := watch.LastChainNodeData{
Coll: key.Collection(),
Token: "",
StartAtTime: streamtypes.TimeStamp{
Sec: uint32(time.Now().Unix()),
Nano: 0,
},
}
if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(s.ctx, data); err != nil {
blog.Errorf("init last watch token failed, err: %v, data: %+v", err, data)
return err
}
return nil
}

func (s *Service) migrateSpecifyVersion(req *restful.Request, resp *restful.Response) {
rHeader := req.Request.Header
rid := util.GetHTTPCCRequestID(rHeader)
defErr := s.CCErr.CreateDefaultCCErrorIf(util.GetLanguage(rHeader))
ownerID := common.BKDefaultOwnerID
updateCfg := &upgrader.Config{
OwnerID: ownerID,
User: common.CCSystemOperatorUserName,
}

input := new(MigrateSpecifyVersionRequest)
if err := json.NewDecoder(req.Request.Body).Decode(input); err != nil {
blog.Errorf("migrateSpecifyVersion failed, decode body err: %v, body:%+v,rid:%s", err, req.Request.Body, rid)
_ = resp.WriteError(http.StatusOK, &metadata.RespError{Msg: defErr.Error(common.CCErrCommJSONUnmarshalFailed)})
return
}

if input.CommitID != version.CCGitHash {
_ = resp.WriteError(http.StatusOK,
&metadata.RespError{Msg: defErr.Errorf(common.CCErrCommParamsInvalid, "commit_id")})
return
}

err := upgrader.UpgradeSpecifyVersion(s.ctx, s.db, s.cache, s.iam, updateCfg, input.Version)
if err != nil {
blog.Errorf("db upgrade specify failed, err: %+v, rid: %s", err, rid)
result := &metadata.RespError{
Msg: defErr.Errorf(common.CCErrCommMigrateFailed, err.Error()),
}
resp.WriteError(http.StatusInternalServerError, result)
return
}

result := MigrationResponse{
BaseResp: metadata.BaseResp{
Result: true,
Code: 0,
ErrMsg: "",
Permissions: nil,
},
Data: "migrate success. version: " + input.Version,
}
resp.WriteEntity(result)

}

var allConfigNames = map[string]bool{
"redis": true,
"mongodb": true,
Expand Down Expand Up @@ -350,19 +91,3 @@ func (s *Service) refreshConfig(req *restful.Request, resp *restful.Response) {
blog.Infof("refresh config success, input:%#v", input)
resp.WriteEntity(metadata.NewSuccessResp("refresh config success"))
}

// MigrationResponse TODO
type MigrationResponse struct {
metadata.BaseResp `json:",inline"`
Data interface{} `json:"data"`
PreVersion string `json:"pre_version"`
CurrentVersion string `json:"current_version"`
FinishedVersions []string `json:"finished_migrations"`
}

// MigrateSpecifyVersionRequest TODO
type MigrateSpecifyVersionRequest struct {
CommitID string `json:"commit_id"`
TimeStamp int64 `json:"time_stamp"`
Version string `json:"version"`
}
2 changes: 0 additions & 2 deletions src/scene_server/admin_server/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (s *Service) WebService() *restful.Container {

api.Route(api.POST("/authcenter/init").To(s.InitAuthCenter))
api.Route(api.POST("/authcenter/register").To(s.RegisterAuthAccount))
api.Route(api.POST("/migrate/{distribution}/{ownerID}").To(s.migrate))
api.Route(api.POST("/migrate/system/hostcrossbiz/{ownerID}").To(s.SetSystemConfiguration))
api.Route(api.POST("/migrate/system/user_config/{key}/{can}").To(s.UserConfigSwitch))
api.Route(api.GET("/find/system/config_admin").To(s.SearchConfigAdmin))
Expand All @@ -105,7 +104,6 @@ func (s *Service) WebService() *restful.Container {
api.Route(api.PUT("/update/system_config/platform_setting").To(s.UpdatePlatformSettingConfig))
api.Route(api.GET("/find/system_config/platform_setting/{type}").To(s.SearchPlatformSettingConfig))

api.Route(api.POST("/migrate/specify/version/{distribution}/{ownerID}").To(s.migrateSpecifyVersion))
api.Route(api.POST("/migrate/config/refresh").To(s.refreshConfig))
api.Route(api.POST("/migrate/dataid").To(s.migrateDataID))
api.Route(api.POST("/migrate/old/dataid").To(s.migrateOldDataID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* limitations under the License.
*/

package main
package imports

import (

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* limitations under the License.
*/

package main
package imports

import (
_ "configcenter/src/scene_server/admin_server/upgrader/history/v3.0.8"
Expand Down
Loading

0 comments on commit a5caba1

Please sign in to comment.