diff --git a/CHANGELOG.md b/CHANGELOG.md index fea66493e..4e8cde26d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - `tt cluster replicaset roles add`: command to add roles in config scope provided by flags. - `tt replicaset roles remove`: command to remove roles in the tarantool replicaset with cluster config (3.0) or cartridge orchestrator. +- `tt replicaset roles add`: command to add roles in the tarantool replicaset with + cluster config (3.0) or cartridge orchestrator. ### Fixed diff --git a/cli/cmd/replicaset.go b/cli/cmd/replicaset.go index 3cc426207..42cd989ad 100644 --- a/cli/cmd/replicaset.go +++ b/cli/cmd/replicaset.go @@ -39,7 +39,10 @@ var ( replicasetIntegrityPrivateKey string replicasetBootstrapVshard bool replicasetCartridgeReplicasetsFile string + replicasetGroupName string replicasetReplicasetName string + replicasetInstanceName string + replicasetIsGlobal bool rebootstrapConfirmed bool replicasetUriHelp = " The URI can be specified in the following formats:\n" + @@ -242,6 +245,54 @@ func newRebootstrapCmd() *cobra.Command { return cmd } +func newRolesCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "roles", + Short: "Adds or removes roles for Cartridge and Tarantool 3 orchestrator", + } + + cmd.AddCommand(newRolesAddCmd()) + return cmd +} + +// newRolesAddCmd creates a "replicaset roles add" command. 
+func newRolesAddCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "add [--cartridge|--config|--custom] [-f] [--timeout secs]" + + " [flags]", + Short: "Adds a role for Cartridge and Tarantool 3 orchestrator", + Long: "Adds a role for Cartridge and Tarantool 3 orchestrator", + Run: func(cmd *cobra.Command, args []string) { + cmdCtx.CommandName = cmd.Name() + err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo, + internalReplicasetRolesAddModule, args) + util.HandleCmdErr(cmd, err) + }, + Args: cobra.ExactArgs(2), + } + + cmd.Flags().StringVarP(&replicasetReplicasetName, "replicaset", "r", "", + "name of a target replicaset") + cmd.Flags().StringVarP(&replicasetGroupName, "group", "g", "", + "name of a target group (vshard-group in the Cartridge case)") + cmd.Flags().StringVarP(&replicasetInstanceName, "instance", "i", "", + "name of a target instance") + cmd.Flags().BoolVarP(&replicasetIsGlobal, "global", "G", false, + "global config context") + + addOrchestratorFlags(cmd) + addTarantoolConnectFlags(cmd) + cmd.Flags().BoolVarP(&replicasetForce, "force", "f", false, + "to force a promotion:\n"+ + " * config: skip instances not found locally\n"+ + " * cartridge: force inconsistency") + cmd.Flags().IntVarP(&replicasetTimeout, "timeout", "", + replicasetcmd.DefaultTimeout, "adding timeout") + integrity.RegisterWithIntegrityFlag(cmd.Flags(), &replicasetIntegrityPrivateKey) + + return cmd +} + // NewReplicasetCmd creates a replicaset command. func NewReplicasetCmd() *cobra.Command { cmd := &cobra.Command{ @@ -257,6 +308,7 @@ func NewReplicasetCmd() *cobra.Command { cmd.AddCommand(newVShardCmd()) cmd.AddCommand(newBootstrapCmd()) cmd.AddCommand(newRebootstrapCmd()) + cmd.AddCommand(newRolesCmd()) return cmd } @@ -350,6 +402,31 @@ func replicasetFillCtx(cmdCtx *cmdcontext.CmdCtx, ctx *replicasetCtx, args []str } } } + // In case of adding a role when user may not provide an instance. 
+ if cmdCtx.CommandName == "add" && ctx.InstName == "" { + if len(ctx.RunningCtx.Instances) == 0 { + return fmt.Errorf("there are no running instances") + } + // Trying to find alive instance to create connection with it. + var err error + for _, i := range ctx.RunningCtx.Instances { + connOpts = makeConnOpts( + connector.UnixNetwork, + i.ConsoleSocket, + connectCtx, + ) + var conn connector.Connector + conn, err = connector.Connect(connOpts) + if err == nil { + ctx.IsInstanceConnect = true + conn.Close() + break + } + } + if err != nil { + return fmt.Errorf("cannot connect to any instance from replicaset") + } + } } else { if isRunningCtxRequired { return err @@ -570,3 +647,46 @@ func internalReplicasetRebootstrapModule(cmdCtx *cmdcontext.CmdCtx, args []strin Confirmed: rebootstrapConfirmed, }) } + +// internalReplicasetRolesAddModule is a "roles add" command for the replicaset module. +func internalReplicasetRolesAddModule(cmdCtx *cmdcontext.CmdCtx, args []string) error { + var ctx replicasetCtx + if err := replicasetFillCtx(cmdCtx, &ctx, args, false); err != nil { + return err + } + defer ctx.Conn.Close() + if ctx.IsApplication && replicasetInstanceName == "" && ctx.InstName == "" && + !replicasetIsGlobal && replicasetGroupName == "" && replicasetReplicasetName == "" { + return fmt.Errorf("there is no destination provided in which to add role") + } + if ctx.InstName != "" && replicasetInstanceName != "" && + replicasetInstanceName != ctx.InstName { + return fmt.Errorf("there are different instance names passed after" + + " app name and in flag arg") + } + if replicasetInstanceName != "" { + ctx.InstName = replicasetInstanceName + } + + collectors, publishers, err := createDataCollectorsAndDataPublishers( + cmdCtx.Integrity, replicasetIntegrityPrivateKey) + if err != nil { + return err + } + + return replicasetcmd.RolesAdd(replicasetcmd.RolesAddCtx{ + InstName: ctx.InstName, + GroupName: replicasetGroupName, + ReplicasetName: replicasetReplicasetName, + IsGlobal: 
replicasetIsGlobal, + RoleName: args[1], + Collectors: collectors, + Publishers: publishers, + IsApplication: ctx.IsApplication, + Conn: ctx.Conn, + RunningCtx: ctx.RunningCtx, + Orchestrator: ctx.Orchestrator, + Force: replicasetForce, + Timeout: replicasetTimeout, + }) +} diff --git a/cli/replicaset/cartridge.go b/cli/replicaset/cartridge.go index ef869e694..f2e6c5d9c 100644 --- a/cli/replicaset/cartridge.go +++ b/cli/replicaset/cartridge.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "slices" "strings" "github.com/apex/log" @@ -205,6 +206,11 @@ func (c *CartridgeInstance) BootstrapVShard(ctx VShardBootstrapCtx) error { return nil } +// RolesAdd adds role for a single instance by the Cartridge orchestrator. +func (c *CartridgeInstance) RolesAdd(ctx RolesChangeCtx) error { + return newErrRolesAddByInstanceNotSupported(OrchestratorCartridge) +} + // CartridgeApplication is an application with the Cartridge orchestrator. type CartridgeApplication struct { cachedDiscoverer @@ -662,6 +668,85 @@ func (c *CartridgeApplication) BootstrapVShard(ctx VShardBootstrapCtx) error { return nil } +// RolesAdd adds role for an application by the Cartridge orchestrator. 
+func (c *CartridgeApplication) RolesAdd(ctx RolesChangeCtx) error { + if len(c.runningCtx.Instances) == 0 { + return fmt.Errorf("failed to add role: there are no running instances") + } + + targetReplicaset, err := getReplicasetByAlias(c.replicasets.Replicasets, ctx.ReplicasetName) + if err != nil { + return err + } + + instances := filterInstances(c.runningCtx.Instances, func( + inst running.InstanceCtx) bool { + return slices.ContainsFunc(targetReplicaset.Instances, func(i Instance) bool { + return i.Alias == inst.InstName + }) + }) + if slices.Contains(targetReplicaset.Roles, ctx.RoleName) { + return fmt.Errorf("role %q already exists in replicaset %q", + ctx.RoleName, ctx.ReplicasetName) + } + targetReplicaset.Roles = append(targetReplicaset.Roles, ctx.RoleName) + + cartridgeEditOpt := cartridgeEditReplicasetsOpts{ + UUID: &targetReplicaset.UUID, + Roles: targetReplicaset.Roles, + } + if ctx.GroupName != "" { + cartridgeEditOpt.VshardGroup = &ctx.GroupName + } + + eval := func(instance running.InstanceCtx, evaler connector.Evaler) (bool, error) { + return true, cartridgeEditReplicasets(evaler, []cartridgeEditReplicasetsOpts{ + cartridgeEditOpt, + }, ctx.Timeout) + } + if err := EvalForeach(instances, InstanceEvalFunc(eval)); err != nil { + return err + } + + newReplicasets, err := c.Discovery(SkipCache) + if err != nil { + return err + } + targetReplicaset, err = getReplicasetByAlias(newReplicasets.Replicasets, ctx.ReplicasetName) + if err != nil { + return err + } + + if len(targetReplicaset.Roles) == 0 { + log.Infof("Now replicaset %s has no roles enabled", ctx.ReplicasetName) + } else { + log.Infof( + "Replicaset %s now has these roles enabled:", + ctx.ReplicasetName, + ) + + for _, role := range targetReplicaset.Roles { + if targetReplicaset.VshardGroup != "" { + log.Infof(" %s (%s)", role, targetReplicaset.VshardGroup) + } else { + log.Infof(" %s", role) + } + } + } + return nil +} + +// getReplicasetByAlias searches for a replicaset by its alias in 
discovered slice +// of replicasets. +func getReplicasetByAlias(replicasets []Replicaset, alias string) (Replicaset, error) { + for _, r := range replicasets { + if r.Alias == alias { + return r, nil + } + } + return Replicaset{}, fmt.Errorf("failed to find replicaset %q", alias) +} + // getCartridgeInstanceInfo returns an additional instance information. func getCartridgeInstanceInfo( evaler connector.Evaler) (uuid string, rw bool, err error) { diff --git a/cli/replicaset/cartridge_test.go b/cli/replicaset/cartridge_test.go index 7c8204def..61fc71f7e 100644 --- a/cli/replicaset/cartridge_test.go +++ b/cli/replicaset/cartridge_test.go @@ -19,12 +19,14 @@ var _ replicaset.Demoter = &replicaset.CartridgeInstance{} var _ replicaset.Expeller = &replicaset.CartridgeInstance{} var _ replicaset.VShardBootstrapper = &replicaset.CartridgeInstance{} var _ replicaset.Bootstrapper = &replicaset.CartridgeInstance{} +var _ replicaset.RolesAdder = &replicaset.CartridgeInstance{} var _ replicaset.Discoverer = &replicaset.CartridgeApplication{} var _ replicaset.Promoter = &replicaset.CartridgeApplication{} var _ replicaset.Demoter = &replicaset.CartridgeApplication{} var _ replicaset.Expeller = &replicaset.CartridgeApplication{} var _ replicaset.Bootstrapper = &replicaset.CartridgeApplication{} +var _ replicaset.RolesAdder = &replicaset.CartridgeApplication{} func TestCartridgeApplication_Demote(t *testing.T) { app := replicaset.NewCartridgeApplication(running.RunningCtx{}) @@ -975,3 +977,10 @@ func TestCartridgeInstance_Expel(t *testing.T) { assert.EqualError(t, err, `expel is not supported for a single instance by "cartridge" orchestrator`) } + +func TestCartridgeInstance_RolesAdd(t *testing.T) { + inst := replicaset.NewCartridgeInstance(nil) + err := inst.RolesAdd(replicaset.RolesChangeCtx{}) + assert.EqualError(t, err, + `roles add is not supported for a single instance by "cartridge" orchestrator`) +} diff --git a/cli/replicaset/cconfig.go b/cli/replicaset/cconfig.go index 
44daccbc3..99079a0d7 100644 --- a/cli/replicaset/cconfig.go +++ b/cli/replicaset/cconfig.go @@ -4,6 +4,7 @@ import ( _ "embed" "errors" "fmt" + "slices" "strings" "github.com/apex/log" @@ -62,11 +63,11 @@ type CConfigInstance struct { evaler connector.Evaler } -// patchRoleTarget describes a content to patch a config. +// patchRoleTarget describes a role content to patch a config. type patchRoleTarget struct { // path is a destination to a patch target in config. path []string - // roleNames are roles to add by path in config. + // roleNames are roles to set by path in config. roleNames []string } @@ -135,6 +136,12 @@ func (c *CConfigInstance) BootstrapVShard(ctx VShardBootstrapCtx) error { return nil } +// RolesAdd is not supported for a single instance by the centralized config +// orchestrator. +func (c *CConfigInstance) RolesAdd(ctx RolesChangeCtx) error { + return newErrRolesAddByInstanceNotSupported(OrchestratorCentralizedConfig) +} + // CConfigApplication is an application with the centralized config // orchestrator. type CConfigApplication struct { @@ -465,6 +472,59 @@ func (c *CConfigApplication) Bootstrap(BootstrapCtx) error { return newErrBootstrapByAppNotSupported(OrchestratorCentralizedConfig) } +// RolesAdd adds role for an application by the centralized config orchestrator. 
+func (c *CConfigApplication) RolesAdd(ctx RolesChangeCtx) error { + replicasets, err := c.Discovery(UseCache) + if err != nil { + return fmt.Errorf("failed to get replicasets: %w", err) + } + + var ( + instances []running.InstanceCtx + unfound []string + ) + + if ctx.InstName != "" { + targetReplicaset, targetInstance, found := findInstanceByAlias(replicasets, ctx.InstName) + if !found { + return fmt.Errorf("instance %q not found in a configured replicaset", ctx.InstName) + } + if !targetInstance.InstanceCtxFound { + return fmt.Errorf("instance %q should be online", ctx.InstName) + } + for _, inst := range targetReplicaset.Instances { + if !inst.InstanceCtxFound { + unfound = append(unfound, inst.Alias) + continue + } + instances = append(instances, inst.InstanceCtx) + } + } else { + for _, r := range c.replicasets.Replicasets { + for _, i := range r.Instances { + if !i.InstanceCtxFound { + unfound = append(unfound, i.Alias) + continue + } + instances = append(instances, i.InstanceCtx) + } + } + } + if len(unfound) > 0 { + msg := "could not connect to: " + strings.Join(unfound, ",") + if !ctx.Force { + return fmt.Errorf("all instances in the target replicaset should be online, %s", msg) + } + log.Warn(msg) + } + + isConfigPublished, err := c.rolesAdd(ctx) + if isConfigPublished { + err = errors.Join(err, reloadCConfig(instances)) + } + return err +} + // cconfigPromoteElection tries to promote an instance via `box.ctl.promote()`. 
func cconfigPromoteElection(evaler connector.Evaler, timeout int) error { args := []any{} @@ -691,6 +751,61 @@ func (c *CConfigApplication) demoteElection(instanceCtx running.InstanceCtx, return } +func (c *CConfigApplication) rolesAdd(ctx RolesChangeCtx) (bool, error) { + if len(c.runningCtx.Instances) == 0 { + return false, fmt.Errorf("there are no running instances") + } + clusterCfgPath := c.runningCtx.Instances[0].ClusterConfigPath + clusterCfg, err := cluster.GetClusterConfig( + libcluster.NewCollectorFactory(c.collectors), clusterCfgPath) + if err != nil { + return false, fmt.Errorf("failed to get cluster config: %w", err) + } + + paths, err := getCConfigRolesPath(clusterCfg, ctx) + if err != nil { + return false, err + } + + pRoleTarget := make([]patchRoleTarget, 0, len(paths)) + for _, path := range paths { + value, err := clusterCfg.RawConfig.Get(path.path) + var notExistErr libcluster.NotExistError + if err != nil && !errors.As(err, ¬ExistErr) { + return false, err + } + var existingRoles []string + if value != nil { + existingRoles, err = parseRoles(value) + if err != nil { + return false, err + } + } + if len(existingRoles) > 0 && slices.Index(existingRoles, ctx.RoleName) != -1 { + return false, fmt.Errorf("role %q already exists in %s", + ctx.RoleName, strings.Join(path.path, "/")) + } + // If the role does not exist in requested path, append it. + existingRoles = append(existingRoles, ctx.RoleName) + + pRoleTarget = append(pRoleTarget, patchRoleTarget{ + path: path.path, + roleNames: existingRoles, + }) + } + if err := patchLocalCConfig( + clusterCfgPath, + c.collectors, + c.publishers, + func(config *libcluster.Config) (*libcluster.Config, error) { + return patchCConfigEditRole(config, pRoleTarget) + }, + ); err != nil { + return false, err + } + return true, nil +} + // patchLocalConfig patches the local cluster config. 
func patchLocalCConfig(path string, collectors libcluster.DataCollectorFactory, @@ -862,8 +977,8 @@ func patchCConfigElectionMode(config *libcluster.Config, } func patchCConfigEditRole(config *libcluster.Config, - prt []patchRoleTarget) (*libcluster.Config, error) { - for _, p := range prt { + targets []patchRoleTarget) (*libcluster.Config, error) { + for _, p := range targets { if err := config.Set(p.path, p.roleNames); err != nil { return nil, err } diff --git a/cli/replicaset/cconfig_test.go b/cli/replicaset/cconfig_test.go index 875744fba..ed2780dd8 100644 --- a/cli/replicaset/cconfig_test.go +++ b/cli/replicaset/cconfig_test.go @@ -18,6 +18,7 @@ var _ replicaset.Demoter = &replicaset.CConfigInstance{} var _ replicaset.Expeller = &replicaset.CConfigInstance{} var _ replicaset.VShardBootstrapper = &replicaset.CConfigInstance{} var _ replicaset.Bootstrapper = &replicaset.CConfigInstance{} +var _ replicaset.RolesAdder = &replicaset.CConfigInstance{} var _ replicaset.Discoverer = &replicaset.CConfigApplication{} var _ replicaset.Promoter = &replicaset.CConfigApplication{} @@ -25,6 +26,7 @@ var _ replicaset.Demoter = &replicaset.CConfigApplication{} var _ replicaset.Expeller = &replicaset.CConfigApplication{} var _ replicaset.VShardBootstrapper = &replicaset.CConfigApplication{} var _ replicaset.Bootstrapper = &replicaset.CConfigApplication{} +var _ replicaset.RolesAdder = &replicaset.CConfigApplication{} func TestCconfigApplication_Bootstrap(t *testing.T) { app := replicaset.NewCConfigApplication(running.RunningCtx{}, nil, nil) @@ -466,3 +468,10 @@ func TestCConfigInstance_Expel(t *testing.T) { assert.EqualError(t, err, `expel is not supported for a single instance by "centralized config" orchestrator`) } + +func TestCConfigInstance_RolesAdd(t *testing.T) { + instance := replicaset.NewCConfigInstance(nil) + err := instance.RolesAdd(replicaset.RolesChangeCtx{}) + assert.EqualError(t, err, + `roles add is not supported for a single instance by "centralized config" 
orchestrator`) +} diff --git a/cli/replicaset/cmd/common.go b/cli/replicaset/cmd/common.go index 546d04e69..a1846de88 100644 --- a/cli/replicaset/cmd/common.go +++ b/cli/replicaset/cmd/common.go @@ -22,6 +22,7 @@ type replicasetOrchestrator interface { replicaset.Expeller replicaset.VShardBootstrapper replicaset.Bootstrapper + replicaset.RolesAdder } // makeApplicationOrchestrator creates an orchestrator for the application. diff --git a/cli/replicaset/cmd/roles.go b/cli/replicaset/cmd/roles.go new file mode 100644 index 000000000..e30ccae4a --- /dev/null +++ b/cli/replicaset/cmd/roles.go @@ -0,0 +1,117 @@ +package replicasetcmd + +import ( + "fmt" + + "github.com/apex/log" + "github.com/tarantool/tt/cli/connector" + "github.com/tarantool/tt/cli/replicaset" + "github.com/tarantool/tt/cli/running" + libcluster "github.com/tarantool/tt/lib/cluster" +) + +// RolesAddCtx describes the context to add a role to +// provided config scope. +type RolesAddCtx struct { + // InstName is an instance name in which add or remove role. + InstName string + // GroupName is a replicaset name in which add or remove role. + GroupName string + // ReplicasetName is a replicaset name in which add or remove role. + ReplicasetName string + // IsGlobal is a boolean value if role needs to add in global scope. + IsGlobal bool + // RoleName is a name of role to add. + RoleName string + // Publishers is a data publisher factory. + Publishers libcluster.DataPublisherFactory + // Collectors is a data collector factory. + Collectors libcluster.DataCollectorFactory + // IsApplication is true if an application passed. + IsApplication bool + // Orchestrator is a forced orchestator choice. + Orchestrator replicaset.Orchestrator + // Conn is an active connection to a passed instance. + Conn connector.Connector + // RunningCtx is an application running context. + RunningCtx running.RunningCtx + // Force is true if unfound instances can be skipped. + Force bool + // Timeout describes a timeout in seconds. 
+ // We keep int as it can be passed to the target instance. + Timeout int +} + +// RolesAdd adds role with provided path target to config. +func RolesAdd(ctx RolesAddCtx) error { + orchestratorType, err := getInstanceOrchestrator(ctx.Orchestrator, ctx.Conn) + if err != nil { + return err + } + + if orchestratorType == replicaset.OrchestratorCartridge { + if ctx.ReplicasetName == "" { + return fmt.Errorf( + "in cartridge replicaset name must be specified via --replicaset flag") + } + } + + var orchestrator replicasetOrchestrator + if ctx.IsApplication { + if orchestrator, err = makeApplicationOrchestrator( + orchestratorType, ctx.RunningCtx, ctx.Collectors, ctx.Publishers); err != nil { + return err + } + } else { + if orchestrator, err = makeInstanceOrchestrator(orchestratorType, ctx.Conn); err != nil { + return err + } + } + + log.Info("Discovery application...") + fmt.Println() + + // Get and print status. + replicasets, err := orchestrator.Discovery(replicaset.SkipCache) + if err != nil { + return err + } + statusReplicasets(replicasets) + fmt.Println() + + if ctx.IsGlobal { + if orchestratorType == replicaset.OrchestratorCartridge { + return fmt.Errorf("cannot pass --global (-G) flag due to cluster with cartridge") + } else { + log.Infof("Add role %s to global scope", ctx.RoleName) + } + } + if ctx.GroupName != "" && orchestratorType != replicaset.OrchestratorCartridge { + log.Infof("Add role %s to group: %s", ctx.RoleName, ctx.GroupName) + } + if ctx.InstName != "" { + if orchestratorType == replicaset.OrchestratorCartridge { + return fmt.Errorf("cannot pass the instance or --instance (-i) flag due to cluster" + + " with cartridge orchestrator can't add role into instance scope") + } else { + log.Infof("Add role %s to instance: %s", ctx.RoleName, ctx.InstName) + } + } + if ctx.ReplicasetName != "" { + log.Infof("Add role %s to replicaset: %s", ctx.RoleName, ctx.ReplicasetName) + } + + err = orchestrator.RolesAdd(replicaset.RolesChangeCtx{ + InstName: 
ctx.InstName, + GroupName: ctx.GroupName, + ReplicasetName: ctx.ReplicasetName, + IsGlobal: ctx.IsGlobal, + RoleName: ctx.RoleName, + Force: ctx.Force, + Timeout: ctx.Timeout, + }) + if err == nil { + log.Info("Done.") + } + return err +} diff --git a/cli/replicaset/custom.go b/cli/replicaset/custom.go index c524292e4..e22c7f76e 100644 --- a/cli/replicaset/custom.go +++ b/cli/replicaset/custom.go @@ -93,6 +93,11 @@ func (c *CustomInstance) Bootstrap(BootstrapCtx) error { return newErrBootstrapByInstanceNotSupported(OrchestratorCustom) } +// RolesAdd is not supported for a single instance by the Custom orchestrator. +func (c *CustomInstance) RolesAdd(RolesChangeCtx) error { + return newErrRolesAddByInstanceNotSupported(OrchestratorCustom) +} + // CustomApplication is an application with a custom orchestrator. type CustomApplication struct { cachedDiscoverer @@ -162,6 +167,11 @@ func (c *CustomApplication) Bootstrap(BootstrapCtx) error { return newErrBootstrapByAppNotSupported(OrchestratorCustom) } +// RolesAdd is not supported for an application by the Custom orchestrator. +func (c *CustomApplication) RolesAdd(RolesChangeCtx) error { + return newErrRolesAddByAppNotSupported(OrchestratorCustom) +} + // getCustomInstanceTopology returns a topology for an instance. 
func getCustomInstanceTopology(name string, evaler connector.Evaler) (customTopology, error) { diff --git a/cli/replicaset/custom_test.go b/cli/replicaset/custom_test.go index d01bf302e..7e962d109 100644 --- a/cli/replicaset/custom_test.go +++ b/cli/replicaset/custom_test.go @@ -17,6 +17,7 @@ var _ replicaset.Demoter = &replicaset.CustomInstance{} var _ replicaset.Expeller = &replicaset.CustomInstance{} var _ replicaset.VShardBootstrapper = &replicaset.CustomInstance{} var _ replicaset.Bootstrapper = &replicaset.CustomInstance{} +var _ replicaset.RolesAdder = &replicaset.CustomInstance{} var _ replicaset.Discoverer = &replicaset.CustomApplication{} var _ replicaset.Promoter = &replicaset.CustomApplication{} @@ -24,6 +25,7 @@ var _ replicaset.Demoter = &replicaset.CustomApplication{} var _ replicaset.Expeller = &replicaset.CustomApplication{} var _ replicaset.VShardBootstrapper = &replicaset.CustomApplication{} var _ replicaset.Bootstrapper = &replicaset.CustomApplication{} +var _ replicaset.RolesAdder = &replicaset.CustomApplication{} func TestCustomApplication_Promote(t *testing.T) { app := replicaset.NewCustomApplication(running.RunningCtx{}) @@ -60,6 +62,13 @@ func TestCustomApplication_Bootstrap(t *testing.T) { `bootstrap is not supported for an application by "custom" orchestrator`) } +func TestCustomApplication_RolesAdd(t *testing.T) { + instance := replicaset.NewCustomApplication(running.RunningCtx{}) + err := instance.RolesAdd(replicaset.RolesChangeCtx{}) + assert.EqualError(t, err, + `roles add is not supported for an application by "custom" orchestrator`) +} + func TestCustomInstance_Discovery(t *testing.T) { cases := []struct { Name string @@ -442,3 +451,10 @@ func TestCustomInstance_Bootstrap(t *testing.T) { assert.EqualError(t, err, `bootstrap is not supported for a single instance by "custom" orchestrator`) } + +func TestCustomInstance_RolesAdd(t *testing.T) { + instance := replicaset.NewCustomInstance(nil) + err := 
instance.RolesAdd(replicaset.RolesChangeCtx{}) + assert.EqualError(t, err, + `roles add is not supported for a single instance by "custom" orchestrator`) +} diff --git a/cli/replicaset/lua/cartridge/get_topology_replicasets_body.lua b/cli/replicaset/lua/cartridge/get_topology_replicasets_body.lua index 47201defd..563c1d649 100644 --- a/cli/replicaset/lua/cartridge/get_topology_replicasets_body.lua +++ b/cli/replicaset/lua/cartridge/get_topology_replicasets_body.lua @@ -21,6 +21,7 @@ local function format_topology(replicaset) leaderuuid = leader_uuid, alias = replicaset.alias, roles = replicaset.roles, + vshard_group = replicaset.vshard_group, instances = instances, } diff --git a/cli/replicaset/replicaset.go b/cli/replicaset/replicaset.go index 11a938d9f..22a153165 100644 --- a/cli/replicaset/replicaset.go +++ b/cli/replicaset/replicaset.go @@ -11,6 +11,8 @@ type Replicaset struct { Alias string // Roles is a list of roles of the replicaset. Roles []string + // VShardGroup is a vshard group for cartridge cluster. + VshardGroup string `mapstructure:"vshard_group"` // Master is a current master mode. Master Master // Failover is a configured failover. diff --git a/cli/replicaset/roles.go b/cli/replicaset/roles.go index 58c0c7676..8674ee3a9 100644 --- a/cli/replicaset/roles.go +++ b/cli/replicaset/roles.go @@ -48,6 +48,26 @@ type RolesChangeCtx struct { Timeout int } +// RolesAdder is an interface for adding roles to a replicaset. +type RolesAdder interface { + // RolesAdd adds role to a replicasets by its name. + RolesAdd(ctx RolesChangeCtx) error +} + +// newErrRolesAddByInstanceNotSupported creates a new error that 'roles add' is not +// supported by the orchestrator for a single instance. 
+func newErrRolesAddByInstanceNotSupported(orchestrator Orchestrator) error { + return fmt.Errorf("roles add is not supported for a single instance by %q orchestrator", + orchestrator) +} + +// newErrRolesAddByAppNotSupported creates a new error that 'roles add' by URI is not +// supported by the orchestrator for an application. +func newErrRolesAddByAppNotSupported(orchestrator Orchestrator) error { + return fmt.Errorf("roles add is not supported for an application by %q orchestrator", + orchestrator) +} + // parseRoles is a function to convert roles type 'any' // from yaml config. Returns slice of roles and error. func parseRoles(value any) ([]string, error) { diff --git a/cli/running/running.go b/cli/running/running.go index b961435a3..94bc44f47 100644 --- a/cli/running/running.go +++ b/cli/running/running.go @@ -679,7 +679,8 @@ func FillCtx(cliOpts *config.CliOpts, cmdCtx *cmdcontext.CmdCtx, runningCtx *RunningCtx, args []string) error { var err error - if len(args) > 1 && cmdCtx.CommandName != "run" && cmdCtx.CommandName != "connect" { + if len(args) > 1 && cmdCtx.CommandName != "run" && cmdCtx.CommandName != "connect" && + cmdCtx.CommandName != "add" { return util.NewArgError("currently, you can specify only one instance at a time") } diff --git a/test/integration/replicaset/replicaset_helpers.py b/test/integration/replicaset/replicaset_helpers.py index d0fe3c0c0..0cb66421b 100644 --- a/test/integration/replicaset/replicaset_helpers.py +++ b/test/integration/replicaset/replicaset_helpers.py @@ -20,15 +20,17 @@ def start_application(tt_cmd, workdir, app_name, instances): assert file != "" -def stop_application(tt_cmd, app_name, workdir, instances): +def stop_application(tt_cmd, app_name, workdir, instances, force=False): stop_cmd = [tt_cmd, "stop", app_name] stop_rc, stop_out = run_command_and_get_output(stop_cmd, cwd=workdir) assert stop_rc == 0 - for inst in instances: - assert re.search(rf"The Instance {app_name}:{inst} \(PID = \d+\) has been terminated.", 
- stop_out) - assert not os.path.exists(os.path.join(workdir, run_path, app_name, inst, "tarantool.pid")) + if not force: + for inst in instances: + assert re.search(rf"The Instance {app_name}:{inst} \(PID = \d+\) has been terminated.", + stop_out) + assert not os.path.exists(os.path.join(workdir, run_path, app_name, + inst, "tarantool.pid")) def eval_on_instance(tt_cmd, app_name, inst_name, workdir, eval): @@ -107,3 +109,19 @@ def cut(line=None, c=":"): def parse_yml(input): return yaml.safe_load(input) + + +def get_group_by_replicaset_name(cluster_cfg, replicaset): + for gk, g in cluster_cfg["groups"].items(): + for r in g["replicasets"].keys(): + if r == replicaset: + return gk + + +def get_group_replicaset_by_instance_name(cluster_cfg, instance): + for gk, g in cluster_cfg["groups"].items(): + for rk, r in g["replicasets"].items(): + for i in r["instances"].keys(): + if i == instance: + return gk, rk + return "", "" diff --git a/test/integration/replicaset/test_ccluster_app/greeter.lua b/test/integration/replicaset/test_ccluster_app/greeter.lua new file mode 100644 index 000000000..e49da0d05 --- /dev/null +++ b/test/integration/replicaset/test_ccluster_app/greeter.lua @@ -0,0 +1,6 @@ +-- greeter.lua -- +return { + validate = function() end, + apply = function() require('log').info("Hi from the 'greeter' role!") end, + stop = function() end, +} diff --git a/test/integration/replicaset/test_replicaset_roles_add.py b/test/integration/replicaset/test_replicaset_roles_add.py new file mode 100644 index 000000000..31f27004c --- /dev/null +++ b/test/integration/replicaset/test_replicaset_roles_add.py @@ -0,0 +1,347 @@ +import io +import os +import shutil + +import pytest +from cartridge_helper import (cartridge_name, cartridge_password, + cartridge_username) +from integration.replicaset.replicaset_helpers import ( + get_group_by_replicaset_name, get_group_replicaset_by_instance_name, + parse_status, parse_yml, start_application, stop_application) + +from utils import 
get_tarantool_version, read_kv, run_command_and_get_output + +tarantool_major_version, tarantool_minor_version = get_tarantool_version() + + +TEST_ROLES_ADD_PARAMS_CCONFIG = ("role_name, inst, inst_flg, group, rs, is_uri, is_global, err_msg," + " stop_instance, is_force, is_add_role") +TEST_ROLES_ADD_PARAMS_CARTRIDGE = ("role_name, inst_flg, group, rs, is_uri, is_global," + " is_custom, err_msg") + + +def make_test_roles_add_param( + is_cartridge_orchestrator, + role_name, + inst=None, + inst_flg=None, + group=None, + rs=None, + is_global=False, + err_msg="", + stop_instance=None, + is_uri=False, + is_force=False, + is_add_role=False, + is_custom=False +): + if is_cartridge_orchestrator: + return pytest.param(role_name, inst_flg, group, rs, is_uri, is_global, is_custom, err_msg) + return pytest.param(role_name, inst, inst_flg, group, rs, is_uri, is_global, err_msg, + stop_instance, is_force, is_add_role) + + +@pytest.mark.parametrize("args, err_msg", [ + pytest.param(["some_role"], "Error: accepts 2 arg(s), received 1"), + pytest.param(["some_app", "some_role"], "can't collect instance information for some_app"), +]) +def test_roles_add_missing_args(tt_cmd, tmpdir_with_cfg, args, err_msg): + cmd = [tt_cmd, "rs", "roles", "add"] + cmd.extend(args) + rc, out = run_command_and_get_output(cmd, cwd=tmpdir_with_cfg) + assert rc != 0 + assert err_msg in out + + +@pytest.mark.skipif(tarantool_major_version < 3, + reason="skip centralized config test for Tarantool < 3") +@pytest.mark.parametrize(TEST_ROLES_ADD_PARAMS_CCONFIG, [ + make_test_roles_add_param( + False, + inst="instance-001", + role_name="greeter", + ), + make_test_roles_add_param( + False, + is_global=True, + role_name="greeter", + ), + make_test_roles_add_param( + False, + group="group-001", + role_name="greeter", + ), + make_test_roles_add_param( + False, + rs="replicaset-001", + role_name="greeter", + ), + make_test_roles_add_param( + False, + inst_flg="instance-002", + role_name="greeter", + ), + 
make_test_roles_add_param( + False, + is_global=True, + group="group-001", + rs="replicaset-001", + inst_flg="instance-002", + role_name="greeter", + ), + make_test_roles_add_param( + False, + is_global=True, + inst="instance-001", + group="group-001", + rs="replicaset-001", + inst_flg="instance-002", + role_name="greeter", + err_msg="there are different instance names passed after app name and in flag arg", + ), + make_test_roles_add_param( + False, + role_name="greeter", + err_msg="there is no destination provided in which to add role", + ), + make_test_roles_add_param( + False, + inst="instance-002", + role_name="greeter", + stop_instance="instance-001", + is_force=True, + ), + make_test_roles_add_param( + False, + inst="instance-002", + role_name="greeter", + rs="replicaset-001", + stop_instance="instance-001", + err_msg="all instances in the target replicaset should be online," + + " could not connect to: instance-001", + ), + make_test_roles_add_param( + False, + inst="instance-001", + role_name="greeter", + is_add_role=True, + err_msg="role \"greeter\" already exists in groups/group-001/replicasets/" + + "replicaset-001/instances/instance-001/roles" + ), + make_test_roles_add_param( + False, + role_name="greeter", + is_uri=True, + err_msg="roles add is not supported for a single instance by" + + " \"centralized config\" orchestrator", + ), +]) +def test_replicaset_cconfig_roles_add( + role_name, + inst, + inst_flg, + group, + rs, + is_global, + err_msg, + stop_instance, + tt_cmd, + tmpdir_with_cfg, + is_uri, + is_force, + is_add_role, +): + app_name = "test_ccluster_app" + app_path = os.path.join(tmpdir_with_cfg, app_name) + shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path) + + kv = read_kv(app_path) + instances = parse_yml(kv["instances"]).keys() + + try: + start_application(tt_cmd, tmpdir_with_cfg, app_name, instances) + + if stop_instance: + stop_cmd = [tt_cmd, "stop", f"{app_name}:{stop_instance}"] + rc, _ = 
run_command_and_get_output(stop_cmd, cwd=tmpdir_with_cfg)
+            assert rc == 0
+        if is_add_role:
+            add_first_role_cmd = [tt_cmd, "rs", "roles", "add",
+                                  f"{app_name}:{inst}" if inst else app_name, role_name]
+            rc, _ = run_command_and_get_output(add_first_role_cmd, cwd=tmpdir_with_cfg)
+            assert rc == 0
+
+        flags = []
+        if is_force:
+            flags.extend(["-f"])
+        if is_global:
+            flags.extend(["-G"])
+        if group:
+            flags.extend(["-g", group])
+        if rs:
+            flags.extend(["-r", rs])
+        if inst_flg:
+            flags.extend(["-i", inst_flg])
+
+        uri = None
+        if is_uri:
+            uri = f"client:secret@{tmpdir_with_cfg}/{app_name}/{list(instances)[0]}.iproto"
+
+        roles_add_cmd = [tt_cmd, "rs", "roles", "add",
+                         (f"{app_name}:{inst}" if inst else app_name
+                          if not is_uri else uri), role_name]
+        if len(flags) != 0:
+            roles_add_cmd.extend(flags)
+        rc, out = run_command_and_get_output(roles_add_cmd, cwd=tmpdir_with_cfg)
+        if err_msg == "":
+            assert rc == 0
+            kv = read_kv(app_path)
+            cluster_cfg = parse_yml(kv["config"])
+            if is_global:
+                assert "Add role to global scope" in out
+                assert role_name in cluster_cfg["roles"]
+            if group:
+                assert f"Add role to group: {group}" in out
+                assert role_name in cluster_cfg["groups"][group]["roles"]
+            if rs:
+                assert f"Add role to replicaset: {rs}" in out
+                gr = get_group_by_replicaset_name(cluster_cfg, rs)
+                assert role_name in cluster_cfg["groups"][gr]["replicasets"][rs]["roles"]
+            if inst_flg or inst:
+                i = inst if inst else inst_flg
+                assert f"Add role to instance: {i}" in out
+                g, r = get_group_replicaset_by_instance_name(cluster_cfg, i)
+                assert (role_name in
+                        cluster_cfg["groups"][g]["replicasets"][r]["instances"][i]["roles"])
+        else:
+            assert rc == 1
+            assert err_msg in out
+    finally:
+        stop_application(tt_cmd,
+                         app_name,
+                         tmpdir_with_cfg, instances,
+                         force=True if stop_instance else False)
+
+
+@pytest.mark.skipif(tarantool_major_version >= 3,
+                    reason="skip cartridge tests for Tarantool 3.0")
+@pytest.mark.parametrize(TEST_ROLES_ADD_PARAMS_CARTRIDGE, [
+    make_test_roles_add_param(
+        
True, + rs="s-1", + group="default", + role_name="failover-coordinator", + ), + make_test_roles_add_param( + True, + rs="s-1", + group="default", + role_name="failover-coordinator", + inst_flg="s1-master", + err_msg=("cannot pass the instance or --instance (-i) flag due to cluster" + " with cartridge orchestrator can't add role into instance scope") + ), + make_test_roles_add_param( + True, + rs="s-1", + group="default", + role_name="failover-coordinator", + is_global=True, + err_msg="cannot pass --global (-G) flag due to cluster with cartridge" + ), + make_test_roles_add_param( + True, + inst_flg="s-1.master", + role_name="failover-coordinator", + err_msg="in cartridge replicaset name must be specified via --replicaset flag" + ), + make_test_roles_add_param( + True, + role_name="vshard-storage", + rs="s-1", + err_msg="role \"vshard-storage\" already exists in replicaset \"s-1\"", + ), + make_test_roles_add_param( + True, + rs="unknown", + role_name="some_role", + err_msg="failed to find replicaset \"unknown\"" + ), + make_test_roles_add_param( + True, + rs="r", + role_name="role", + is_custom=True, + err_msg="roles add is not supported for an application by \"custom\" orchestrator", + ), + make_test_roles_add_param( + True, + rs="r", + role_name="role", + is_custom=True, + err_msg="roles add is not supported for an application by \"custom\" orchestrator", + ), + make_test_roles_add_param( + True, + rs="r", + role_name="role", + is_uri=True, + err_msg="roles add is not supported for a single instance by \"cartridge\" orchestrator", + ), +]) +def test_replicaset_cartridge_roles_add(tt_cmd, + cartridge_app, + role_name, + rs, + inst_flg, + group, + is_uri, + is_global, + is_custom, + err_msg): + flags = [] + if is_global: + flags.extend(["-G"]) + if group: + flags.extend(["-g", group]) + if rs: + flags.extend(["-r", rs]) + if inst_flg: + flags.extend(["-i", inst_flg]) + + uri = None + if is_uri: + uri = 
cartridge_app.instances_cfg[f"{cartridge_name}.s1-master"]["advertise_uri"]
+        uri = f"{cartridge_username}:{cartridge_password}@{uri}"
+
+    roles_add_cmd = [tt_cmd, "rs", "roles", "add"]
+    if is_custom:
+        roles_add_cmd.append("--custom")
+    roles_add_cmd.extend([cartridge_name if not is_uri else uri, role_name])
+    roles_add_cmd.extend(flags)
+    rc, out = run_command_and_get_output(roles_add_cmd, cwd=cartridge_app.workdir)
+
+    if err_msg == "":
+        assert rc == 0
+
+        buf = io.StringIO(out)
+        assert "• Discovery application..." in buf.readline()
+        buf.readline()
+        # Skip init status in the output.
+        parse_status(buf)
+        assert f"Add role {role_name} to replicaset: {rs}" in buf.readline()
+        assert f"Replicaset {rs} now has these roles enabled:" in buf.readline()
+        assert (role_name if group == "" else f"{role_name} ({group})") in buf.readline()
+        assert "vshard-storage (default)" in buf.readline()
+        assert "Done." in buf.readline()
+
+        status_cmd = [tt_cmd, "rs", "status", cartridge_name]
+        rc, out = run_command_and_get_output(status_cmd, cwd=cartridge_app.workdir)
+        assert rc == 0
+
+        # Check status. 
+ actual_roles = parse_status(io.StringIO(out))["replicasets"][rs]["roles"] + assert role_name in actual_roles + else: + assert rc == 1 + assert err_msg in out diff --git a/test/integration/replicaset/test_replicaset_vshard.py b/test/integration/replicaset/test_replicaset_vshard.py index 2bd458cfa..d58fd9c4c 100644 --- a/test/integration/replicaset/test_replicaset_vshard.py +++ b/test/integration/replicaset/test_replicaset_vshard.py @@ -8,8 +8,9 @@ cartridge_username) from replicaset_helpers import eval_on_instance, parse_status, stop_application -from utils import (create_tt_config, get_tarantool_version, - run_command_and_get_output, wait_event, wait_file) +from utils import (create_tt_config, get_tarantool_version, log_file, log_path, + run_command_and_get_output, wait_event, wait_file, + wait_string_in_file) tarantool_major_version, tarantool_minor_version = get_tarantool_version() @@ -193,6 +194,15 @@ def stop_and_clean(): for inst in instances: file = wait_file(app_path, f'ready-{inst}', []) assert file != "" + + wait_string_in_file(app_path / log_path / "router-001-a" / log_file, + "All replicas are ok") + for inst in ["storage-001-a", "storage-002-a"]: + wait_string_in_file(app_path / log_path / inst / log_file, + "leaving orphan mode") + for inst in ["storage-001-b", "storage-002-b"]: + wait_string_in_file(app_path / log_path / inst / log_file, + "subscribed replica") return tmpdir diff --git a/test/utils.py b/test/utils.py index c3893ed31..3bd4cc0ef 100644 --- a/test/utils.py +++ b/test/utils.py @@ -507,6 +507,8 @@ def get_tarantool_version(): def read_kv(dirname): kvs = {} for filename in os.listdir(dirname): + if not os.path.isfile(os.path.join(dirname, filename)): + continue key, _ = os.path.splitext(filename) with open(os.path.join(dirname, filename), "r") as f: kvs[key] = f.read()