Skip to content

Commit

Permalink
persist cluster-enabled status in RocksDB
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisxu333 committed Jan 15, 2024
1 parent 9d656f1 commit f3b8266
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 10 deletions.
7 changes: 5 additions & 2 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ daemonize no
# If you enable cluster, kvrocks will encode key with its slot id calculated by
# CRC16 and modulo 16384, encoding key with its slot id makes it efficient to
# migrate keys based on the slot. So if you enabled at first time, cluster mode must
# not be disabled after restarting, and vice versa. That is to say, data is not
# compatible between standalone mode with cluster mode, you must migrate data
# not be disabled after restarting, and vice versa. Currently, kvrocks will keep
# using the cluster-enabled status that is persisted at first time, regardless of
# what cluster-enabled status is provided afterwards.
# Note that even if kvrocks has such protection, you should also be aware that data
# is not compatible between standalone mode with cluster mode, you must migrate data
# if you want to change mode, otherwise, kvrocks will make data corrupt.
#
# Default: no
Expand Down
25 changes: 25 additions & 0 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ Status Server::Start() {
}
}

s = overrideClusterMode();
if (!s.IsOK()) {
return s;
}

if (config_->cluster_enabled) {
if (config_->persist_cluster_nodes_enabled) {
auto s = cluster->LoadClusterNodes(config_->NodesFilePath());
Expand Down Expand Up @@ -1839,6 +1844,26 @@ void Server::cleanupExitedWorkerThreads(bool force) {
}
}

Status Server::overrideClusterMode() {
std::string value;
auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
rocksdb::Status check_cluster_enabled =
storage->Get(rocksdb::ReadOptions(), cf, rocksdb::Slice(engine::kClusterEnabledKey), &value);

if (check_cluster_enabled.IsNotFound()) {
return storage->WriteToPropagateCF(engine::kClusterEnabledKey, std::to_string(config_->cluster_enabled));
}
if (check_cluster_enabled.ok()) {
if (config_->cluster_enabled != stoi(value)) {
LOG(ERROR) << "cluster_enable status from config file is inconsistent with the persisted one";
return {Status::NotOK, "mismatching cluster_enable status"};
}
return Status::OK();
} else {
return {Status::NotOK, "failed to load cluster_enable from storage: " + check_cluster_enabled.ToString()};
}
}

std::string ServerLogData::Encode() const {
if (type_ == kReplIdLog) {
return std::string(1, kReplIdTag) + " " + content_;
Expand Down
1 change: 1 addition & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ class Server {
void increaseWorkerThreads(size_t delta);
void decreaseWorkerThreads(size_t delta);
void cleanupExitedWorkerThreads(bool force);
Status overrideClusterMode();

std::atomic<bool> stop_ = false;
std::atomic<bool> is_loading_ = false;
Expand Down
2 changes: 2 additions & 0 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ constexpr const char *kLuaFuncSHAPrefix = "lua_f_";
constexpr const char *kLuaFuncLibPrefix = "lua_func_lib_";
constexpr const char *kLuaLibCodePrefix = "lua_lib_code_";

const std::string kClusterEnabledKey = "config_cluster_enabled";

struct CompressionOption {
rocksdb::CompressionType type;
const std::string name;
Expand Down
11 changes: 11 additions & 0 deletions tests/gocase/integration/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ func TestClusterNodes(t *testing.T) {
require.EqualValues(t, []redis.ClusterNode{{ID: nodeID, Addr: srv.HostPort()}}, slots[0].Nodes)
})

t.Run("enable/disable cluster-enabled option", func(t *testing.T) {
// force change cluster-enabled status in kvrocks.conf file
srv.ForceChangeClusterMode(false)
defer func() {
srv.ForceChangeClusterMode(true)
srv.Restart(util.RestartOpt{Nowait: false, Noclose: true})
}()
srv.Restart(util.RestartOpt{Nowait: true, Noclose: false})
require.ErrorContains(t, rdb.Do(ctx, "clusterx", "version").Err(), "connection refused")
})

t.Run("enable/disable the persist cluster nodes", func(t *testing.T) {
require.NoError(t, rdb.ConfigSet(ctx, "persist-cluster-nodes-enabled", "yes").Err())
srv.Restart()
Expand Down
54 changes: 46 additions & 8 deletions tests/gocase/util/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"testing"
Expand All @@ -39,6 +40,11 @@ import (
"golang.org/x/exp/slices"
)

type RestartOpt struct {
Nowait bool
Noclose bool
}

type KvrocksServer struct {
t testing.TB
cmd *exec.Cmd
Expand Down Expand Up @@ -134,8 +140,38 @@ func (s *KvrocksServer) close(keepDir bool) {
s.clean(keepDir)
}

func (s *KvrocksServer) Restart() {
s.close(true)
func (s *KvrocksServer) ForceChangeClusterMode(enable bool) {
dir := s.configs["dir"]
f, err := os.OpenFile(filepath.Join(dir, "kvrocks.conf"), os.O_RDWR, 0666)
require.NoError(s.t, err)
defer func() { require.NoError(s.t, f.Close()) }()

// change the line containing cluster-enabled to no
data, err := os.ReadFile(filepath.Join(dir, "kvrocks.conf"))
require.NoError(s.t, err)

content := string(data)
var newContent string
if !enable {
newContent = strings.ReplaceAll(content, "cluster-enabled yes", "cluster-enabled no")
} else {
newContent = strings.ReplaceAll(content, "cluster-enabled no", "cluster-enabled yes")
}
err = os.WriteFile(filepath.Join(dir, "kvrocks.conf"), []byte(newContent), 0666)
require.NoError(s.t, err)
}

func (s *KvrocksServer) Restart(opt ...RestartOpt) {
nowait := false
noclose := false
if len(opt) >= 1 {
nowait = opt[0].Nowait
noclose = opt[0].Noclose
}

if !noclose {
s.close(true)
}

b := *binPath
require.NotEmpty(s.t, b, "please set the binary path by `-binPath`")
Expand All @@ -157,12 +193,14 @@ func (s *KvrocksServer) Restart() {

require.NoError(s.t, cmd.Start())

c := redis.NewClient(&redis.Options{Addr: s.addr.String()})
defer func() { require.NoError(s.t, c.Close()) }()
require.Eventually(s.t, func() bool {
err := c.Ping(context.Background()).Err()
return err == nil || err.Error() == "NOAUTH Authentication required."
}, time.Minute, time.Second)
if !nowait {
c := redis.NewClient(&redis.Options{Addr: s.addr.String()})
defer func() { require.NoError(s.t, c.Close()) }()
require.Eventually(s.t, func() bool {
err := c.Ping(context.Background()).Err()
return err == nil || err.Error() == "NOAUTH Authentication required."
}, time.Minute, time.Second)
}

s.cmd = cmd
s.clean = func(keepDir bool) {
Expand Down

0 comments on commit f3b8266

Please sign in to comment.